ADDED nmsg-transport.scm Index: nmsg-transport.scm ================================================================== --- /dev/null +++ nmsg-transport.scm @@ -0,0 +1,493 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use nanomsg) + +(declare (unit nmsg-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) + +(include "common_records.scm") +(include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on + +(define (nmsg-transport:make-server-url hostport) + (if (not hostport) + #f + (conc "tcp://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +(define-inline (nmsgsock:get-pub dat)(vector-ref dat 0)) +(define-inline (nmsgsock:get-pull dat)(vector-ref dat 1)) +(define-inline (nmsgsock:set-pub! dat s)(vector-set! dat s 0)) +(define-inline (nmsgsock:set-pull! dat s)(vector-set! dat s 0)) + +(define (nmsg-transport:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (launch:setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db + (nmsg-sdat1 #f) + (nmsg-sdat2 #f) + (pull-socket #f) + (pub-socket #f) + (p1 #f) + (p2 #f) + (nmsg-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname))) + (last-run 0)) + (set! nmsg-sockets-dat (nmsg-transport:setup-ports ipaddrstr (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001))))) + + (set! nmsg-sdat1 (car nmsg-sockets-dat)) + (set! pull-socket (cadr nmsg-sdat1)) ;; (iface s port) + (set! p1 (caddr nmsg-sdat1)) + + (set! nmsg-sdat2 (cadr nmsg-sockets-dat)) + (set! pub-socket (cadr nmsg-sdat2)) + (set! p2 (caddr nmsg-sdat2)) + + (set! *cache-on* #t) + + (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? + + ;; what to do when we quit + ;; +;; (on-exit (lambda () +;; (if (and *toppath* *server-info*) +;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) +;; (let loop () +;; (let ((queue-len 0)) +;; (thread-sleep! (random 5)) +;; (mutex-lock! *incoming-mutex*) +;; (set! queue-len (length *incoming-data*)) +;; (mutex-unlock! *incoming-mutex*) +;; (if (> queue-len 0) +;; (begin +;; (debug:print-info 0 "Queue not flushed, waiting ...") +;; (loop)))))))) + + ;; The heavy lifting + ;; + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime + ;; + (debug:print-info 11 "Server setup complete, start listening for messages") + (let loop ((queue-lst '())) + (let* ((rawmsg (receive-message* pull-socket)) + (packet (db:string->obj rawmsg)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue + (begin + (db:process-queue-item db packet) + ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + + (loop '())) + (loop (cons packet queue-lst))))))) + +;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (nmsg-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again") + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + ;; (nmsg-sockets (nmsg-transport:client-connect iface pullport pubport)) + (last-access 0)) + (debug:print-info 11 "heartbeat started for nmsg server on " iface " " pullport " " pubport) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + ;; GET REAL QUEUE LENGTH FROM THE VARIABLE + (let ((queue-len 0)) ;; FOR NOW DO NOT DO THIS (cdb:client-call nmsg-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +(define (nmsg-transport:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) + (p (if (number? port) port 5555)) + (old-handler (current-exception-handler))) + (handle-exceptions + exn + (begin + (debug:print 0 "Failed to bind to port " p ", trying next port") + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (old-handler) + ;; (print-call-chain) + (if (> trynum 0) + (nmsg-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) + (debug:print-info 0 "Tried ports up to " p + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) + (exit)) ;; To exit or not? That is the question. + (let ((nmsg-url (conc "tcp://" iface ":" p))) + (debug:print 2 "Trying to start server on " nmsg-url) + (bind-socket s nmsg-url) + (list iface s port))))) + +(define (nmsg-transport:setup-ports ipaddrstr startport) + (let* ((s1 (nmsg-transport:find-free-port-and-open ipaddrstr #f startport 'pull)) + (p1 (caddr s1)) + (s2 (nmsg-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr p1 + 0 + 'live + 'nmsg + pubport: p2)) + (debug:print-info 11 "*server-info* set to " *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (list s1 s2))) + +(define (nmsg-transport:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +;; +(define (nmsg-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) + (let ((connect-ok #f) + (nmsg-socket (if context + (make-socket type context) + (make-socket type))) + (conurl (nmsg-transport:make-server-url (list iface port)))) + (if (socket? nmsg-socket) + (begin + ;; first apply subscriptions + (for-each (lambda (subscription) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! nmsg-socket 'subscribe subscription)) + subscriptions) + (connect-socket nmsg-socket conurl) + nmsg-socket) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) + +(define (nmsg-transport:client-connect iface pullport pubport) + (let* ((push-socket (nmsg-transport:client-socket-connect iface pullport type: 'push)) + (sub-socket (nmsg-transport:client-socket-connect iface pubport + type: 'sub + subscriptions: (list (client:get-signature) "all"))) + (nmsg-sockets (vector push-socket sub-socket)) + (login-res #f)) + (debug:print-info 11 "nmsg-transport:client-connect started. Next is login") + (set! login-res (client:login serverdat nmsg-sockets)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* nmsg-sockets) + nmsg-sockets) + (begin + (debug:print-info 2 "Failed to login or connect to " conurl) + (set! *runremote* #f) + #f)))) + +;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (nmsg-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (nmsg-transport:launch) + (if (not *toppath*) + (if (not (launch:setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting nmsg server") + (if *toppath* + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (nmsg-transport:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () + (nmsg-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + ;; (th3 (make-thread (lambda ()(nmsg-transport:keep-running)) "Keep running")) + ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) + (thread-start! th2) + ;; (thread-start! th3) + (set! *didsomething* #t) + ;; (thread-join! th3) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + +(define (nmsg-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (nmsg-transport:client-launch) + (set-signal-handler! signal/int nmsg-transport:client-signal-handler) + (if (nmsg-transport:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + +;;====================================================================== +;; Defunct functions +;;====================================================================== + +;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! +(define (nmsg-transport:ping host port #!key (secs 10)(return-socket #f)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let* ((nmsg-context (make-context 1)) + (nmsg-socket (nmsg-transport:client-connect host port context: nmsg-context))) + (if nmsg-socket + (if (nmsg-transport:client-login nmsg-socket) + (let ((numclients (cdb:num-clients nmsg-socket))) + (if (not return-socket) + (begin + (nmsg-transport:client-logout nmsg-socket) + (close-socket nmsg-socket))) + (set! res (list #t numclients (if return-socket nmsg-socket #f)))) + (begin + ;; (close-socket nmsg-socket) + (set! res (list #f "CAN'T LOGIN" #f)))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT" #f)) + (thread-join! th1 secs)) + res)))) + +;; (define (nmsg-transport:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (nmsg-transport:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "nmsg-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define (nmsg-transport:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "nmsg-transport:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) + ADDED testnanomsg/basic-req-rep.scm Index: testnanomsg/basic-req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/basic-req-rep.scm @@ -0,0 +1,3 @@ +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) ADDED testnanomsg/mockupclient.scm Index: testnanomsg/mockupclient.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclient.scm @@ -0,0 +1,42 @@ +(use zmq posix numbers) + +(define cname "Bob") +(define runtime 10) +(let ((args (argv))) + (if (< (length args) 3) + (begin + (print "Usage: mockupclient clientname runtime") + (exit)) + (begin + (set! cname (cadr args)) + (set! runtime (string->number (caddr args)))))) + +;; (define start-delay (/ (random 100) 9)) +;; (define runtime (+ 1 (/ (random 200) 2))) + +(print "Starting client " cname " with runtime " runtime) + +(include "mockupclientlib.scm") + +(set! endtime (+ (current-seconds) runtime)) + +;; first ping the server to ensure we have a connection +(if (server-ping cname 5) + (print "SUCCESS: Client " cname " connected to server") + (begin + (print "ERROR: Client " cname " failed ping of server, exiting") + (exit))) + +(let loop () + (let ((x (random 15)) + (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) + (case x + ;; ((1)(dbaccess cname 'sync "nodat" #f)) + ((2 3 4 5)(dbaccess cname 'set varname (random 999))) + ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) + (else + (thread-sleep! 0.011))) + (if (< (current-seconds) endtime) + (loop)))) + +(print "Client " cname " all done!!") ADDED testnanomsg/mockupclientlib.scm Index: testnanomsg/mockupclientlib.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclientlib.scm @@ -0,0 +1,58 @@ +(define reqs (nn-socket 'req)) + +(connect-socket reqs "tcp://localhost:6563") + +(thread-sleep! 0.2) + +(define (server-ping cname timeout) + (let ((msg (conc cname ":ping:" timeout)) + (maxtime (+ (current-seconds) timeout))) + (print "pinging server from " cname " with timeout " timeout) + (let loop ((res #f)) + (if (< maxtime (current-seconds)) + #f ;; failed to ping + (if (equal? res "Got ping") + #t + (begin + (print "Ping received from server " res) + (send-message push msg) + (thread-sleep! 0.1) + (loop (receive-message sub non-blocking: #t)))))))) + +(define (dbaccess cname cmd var val #!key (numtries 20)) + (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))) + (res #f) + (mtx1 (make-mutex)) + (do-access (lambda () + (let ((tmpres #f)) + (print "Sending msg: " msg) + (send-message push msg) + (print "Message " msg " sent") + (print "Client " cname " waiting for response to " msg) + (print "Client " cname " received address " (receive-message* sub)) + (set! tmpres (receive-message* sub)) + (mutex-lock! mtx1) + (set! res tmpres) + (mutex-unlock! mtx1)))) + (th1 (make-thread do-access "do access")) + (th2 (make-thread (lambda () + (let ((result #f)) + (mutex-lock! mtx1) + (set! result res) + (mutex-unlock! mtx1) + (thread-sleep! 5) + (if (not result) + (if (> numtries 0) + (begin + (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries) + (dbaccess cname cmd var val numtries: (- numtries 1))) + (begin + (print "ERROR: dbaccess timed out. Exiting") + (exit))))) + "timeout thread")))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts")) + res)) + ADDED testnanomsg/mockupserver.scm Index: testnanomsg/mockupserver.scm ================================================================== --- /dev/null +++ testnanomsg/mockupserver.scm @@ -0,0 +1,146 @@ +;; pub/sub with envelope address +;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon +;; as a client disconnects. Also a remaining client may receive tons of +;; messages afterward. + +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) +(define cname "server") +(define total-db-accesses 0) +(define start-time (current-seconds)) + +(nn-bind resp "tcp://*:6563") + +(thread-sleep! 0.2) + +(define (open-db) + (let* ((dbpath "mockup.db") + (dbexists (file-exists? dbpath)) + (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 10))) + (set-busy-handler! db handler) + (if (not dbexists) + (for-each + (lambda (stmt) + (execute db stmt)) + (list + "PRAGMA SYNCHRONOUS=0;" + "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" + "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) + db)) + +(define cid-cache (make-hash-table)) + +(define (get-client-id db cname) + (let ((cid (hash-table-ref/default cid-cache cname #f))) + (if cid + cid + (begin + (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) + (for-each-row + (lambda (id) + (set! cid id)) + db + "SELECT id FROM clients WHERE name=?;" cname) + (hash-table-set! cid-cache cname cid) + (set! total-db-accesses (+ total-db-accesses 2)) + cid)))) + +(define (count-client db cname) + (let ((cid (get-client-id db cname))) + (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) + (set! total-db-accesses (+ total-db-accesses 1)) + )) + +(define db (open-db)) +;; (define queuelst '()) +;; (define mx1 (make-mutex)) + +(define max-queue-len 0) + +(define (process-queue queuelst) + (let ((queuelen (length queuelst))) + (if (> queuelen max-queue-len) + (set! max-queue-len queuelen)) + (for-each + (lambda (item) + (let ((cname (vector-ref item 1)) + (clcmd (vector-ref item 2)) + (cdata (vector-ref item 3))) + (send-message pub cname send-more: #t) + (send-message pub (case clcmd + ((sync) + (conc queuelen)) + ((set) + (set! total-db-accesses (+ total-db-accesses 1)) + (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) + "ok") + ((get) + (set! total-db-accesses (+ total-db-accesses 1)) + (let ((res "noval")) + (for-each-row + (lambda (val) + (set! res val)) + db + "SELECT val FROM vars WHERE var=?;" cdata) + res)) + (else (conc "unk cmd: " clcmd)))))) + queuelst))) + +;; SERVER THREAD +(define th1 (make-thread + (lambda () + (let ((last-run 0)) ;; current-seconds when run last + (let loop ((queuelst '())) + (let* ((indat (receive-message* pull)) + (parts (string-split indat ":")) + (cname (car parts)) ;; client name + (clcmd (string->symbol (cadr parts))) ;; client cmd + (cdata (caddr parts)) ;; client data + (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue + ;; (print "Server received message: " indat) + (count-client db cname) + (case clcmd + ((ping) + (print "Got ping from " cname) + (send-message pub cname send-more: #t) + (send-message pub "Got ping") + (loop queuelst)) + ((sync) ;; just process the queue + (print "Got sync from " cname) + (process-queue (cons svect queuelst)) + (loop '())) + ((get) + (process-queue (cons svect queuelst)) + (loop '())) + (else + (loop (cons svect queuelst)))))))) + "server thread")) + +(include "mockupclientlib.scm") + +;; SYNC THREAD +;; send a sync to the pull port +(define th2 (make-thread + (lambda () + (let ((last-action-time (current-seconds))) + (let loop () + (thread-sleep! 5) + (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) + (last-action-delta #f)) + (if (> queuelen 1)(set! last-action-time (current-seconds))) + (set! last-action-delta (- (current-seconds) last-action-time)) + (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) + (if (< last-action-delta 60) + (loop) + (print "Server exiting, 25 seconds since last access")))))) + "sync thread")) + +(thread-start! th1) +(thread-start! th2) +(thread-join! th2) + +(let* ((run-time (- (current-seconds) start-time)) + (queries/second (/ total-db-accesses run-time))) + (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second with max queue length of: " max-queue-len)) ADDED testnanomsg/pipeline.scm Index: testnanomsg/pipeline.scm ================================================================== --- /dev/null +++ testnanomsg/pipeline.scm @@ -0,0 +1,25 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define push (nn-socket 'push)) +(define pull1 (nn-socket 'pull)) +(define pull2 (nn-socket 'pull)) + +(nn-bind push "inproc://test") +(nn-connect pull1 "inproc://test") +(nn-connect pull2 "inproc://test") + +(nn-send push "a") +(nn-send push "b") +(nn-send push "c") +(nn-send push "d") + +(define ((th sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) " is done")) + +(thread-start! (th pull1)) +(thread-start! (th pull2)) + +(thread-sleep! 1) ADDED testnanomsg/req-rep.scm Index: testnanomsg/req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define req (nn-socket 'req)) +(define rep (nn-socket 'rep)) + +(nn-bind rep "inproc://test") +(nn-connect req "inproc://test") + +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((server soc)) + (let loop ((msg-in (nn-recv soc))) + (if (not (equal? msg-in "quit")) + (begin + (nn-send soc (conc "hello " msg-in)) + (loop (nn-recv soc)))))) + +(thread-start! (server rep)) + +(print (client-send-receive req "Matt")) +(print (client-send-receive req "Tom")) + +;; (client-send-receive req "quit") + +(nn-close req) +(nn-close rep) +(exit)