Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -4,11 +4,11 @@ INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ - http-transport.scm filedb.scm \ + http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ rmt.scm api.scm tdb.scm portlogger.scm # Eggs to install (straightforward ones) Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -121,10 +121,11 @@ (db:with-db dbstruct run-id #t ;; these are all for modifying the db (lambda (db) (db:general-call db stmtname realparams))))) ((sync-inmem->db) (db:sync-touched dbstruct run-id force-sync: #t)) ((sdb-qry) (apply sdb:qry params)) + ((ping) (current-process-id)) ;; TESTMETA ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) ((testmeta-update-field) (apply db:testmeta-update-field dbstruct params)) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -65,12 +65,11 @@ (define *db-access-allowed* #t) ;; flag to allow access (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) -(define *transport-type* 'http) -(define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port +(define *transport-type* 'nm) (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) Index: db.scm ================================================================== --- db.scm +++ 
db.scm @@ -2276,26 +2276,26 @@ ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS ;;====================================================================== ;; NOTE: Can remove the regex and base64 encoding for zmq -(define (db:obj->string obj) - (case *transport-type* +(define (db:obj->string obj #!key (transport 'http)) + (case transport ;; ((fs) obj) ((http fs) (string-substitute (regexp "=") "_" (base64:base64-encode (z3:encode-buffer (with-output-to-string (lambda ()(serialize obj))))) #t)) - ((zmq)(with-output-to-string (lambda ()(serialize obj)))) + ((zmq nm)(with-output-to-string (lambda ()(serialize obj)))) (else obj))) -(define (db:string->obj msg) - (case *transport-type* +(define (db:string->obj msg #!key (transport 'http)) + (case transport ;; ((fs) msg) ((http fs) (if (string? msg) (with-input-from-string (z3:decode-buffer @@ -2304,11 +2304,11 @@ (regexp "_") "=" msg #t))) (lambda ()(deserialize))) (begin (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.") #f))) ;; crude reply for when things go awry - ((zmq)(with-input-from-string msg (lambda ()(deserialize)))) + ((zmq nm)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) (define (db:test-set-status-state dbstruct run-id test-id status state msg) (let ((dbdat (db:get-db dbstruct run-id))) (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) ADDED nmsg-transport.scm Index: nmsg-transport.scm ================================================================== --- /dev/null +++ nmsg-transport.scm @@ -0,0 +1,374 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. 
+
+(require-extension (srfi 18) extras tcp s11n)
+
+(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
+(import (prefix sqlite3 sqlite3:))
+
+(use nanomsg)
+
+(declare (unit nmsg-transport))
+
+(declare (uses common))
+(declare (uses db))
+(declare (uses tests))
+(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
+(declare (uses server))
+
+(include "common_records.scm")
+(include "db_records.scm")
+
+;; Transition to pub --> sub with pull <-- push
+;;
+;; 1. client sends request to server via push to the pull port
+;; 2. server puts request in queue or processes immediately as appropriate
+;; 3. server puts responses from completed requests into pub port
+;;
+;; TODO
+;;
+;; Done Tested
+;; [x]  [ ]    1. Add columns pullport pubport to servers table
+;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012
+;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
+;; [x]  [ ]    4. Add client compose of request
+;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname
+;; [x]  [ ]        - name of request: callname, params
+;; [x]  [ ]        - request key: f(clientname, callname, params)
+;; [x]  [ ]    5. Add processing of subscription hits
+;; [x]  [ ]        - done when get key
+;; [x]  [ ]        - return results
+;; [x]  [ ]    6. Add timeout processing
+;; [x]  [ ]        - after 60 seconds
+;; [ ]  [ ]        i. check server alive, connect to new if necessary
+;; [ ]  [ ]        ii. resend request
+;; [ ]  [ ]    7. Turn self ping back on
+
+(define (nmsg-transport:make-server-url hostport #!key (bindall #f)) ;; hostport is (host port) or #f; returns "tcp://host:port" or #f
+  (if (not hostport)
+      #f
+      (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport))))
+
+(define *server-loop-heart-beat* (current-seconds))
+(define *heartbeat-mutex* (make-mutex))
+
+;;======================================================================
+;; S E R V E R
+;;======================================================================
+;; Start the rep-socket server thread on a free port, ping it, then hand over to keep-running.
+(define (nmsg-transport:run dbstruct hostn run-id server-id)
+  (debug:print 2 "Attempting to start the server ...")
+  (let* ((start-port    (portlogger:open-run-close portlogger:find-port))
+         (server-thread (make-thread (lambda ()
+                                       (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
+                                     "server thread"))
+         (tdbdat        (tasks:open-db)))
+    (thread-start! server-thread)
+    (if (nmsg-transport:ping hostn start-port return-socket: #f) ;; only the yes/no answer is needed; do not leak the req socket
+        (begin
+          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
+          (set! *server-info* (list hostn start-port)) ;; read by nmsg-transport:keep-running as (host port)
+          (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
+          (set! *inmemdb* dbstruct)
+          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
+          (thread-start! nmsg-transport:keep-running)
+          (thread-join! server-thread))
+        (begin
+          (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
+          (portlogger:open-run-close portlogger:set-failed start-port)
+          (nmsg-transport:run dbstruct hostn run-id server-id)))))
+;; Bind a rep socket on portnum and answer requests until the literal message "quit" arrives.
+(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
+  (let ((repsoc (nn-socket 'rep)))
+    (nn-bind repsoc (conc "tcp://*:" portnum))
+    (let loop ((msg-in (nn-recv repsoc)))
+      (cond
+       ((equal? msg-in "quit")
+        (nn-send repsoc "Ok, quitting"))
+       ((and (>= (string-length msg-in) 4)
+             (equal? (substring msg-in 0 4) "ping"))
+        (nn-send repsoc (conc (current-process-id)))
+        (loop (nn-recv repsoc)))
+       (else
+        (let* ((dat    (db:string->obj msg-in transport: 'nm))
+               (cmd    (vector-ref dat 0))
+               (params (vector-ref dat 1))
+               (result (api:execute-requests dbstruct cmd params))
+               (newdat (db:obj->string result transport: 'nm)))
+          (nn-send repsoc newdat)
+          (loop (nn-recv repsoc))))))))
+
+;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;; NOTE: a second definition of nmsg-transport:keep-running later in this file replaces this one at load time.
+(define (nmsg-transport:keep-running)
+  ;; if none running or if > 20 seconds since
+  ;; server last used then start shutdown
+  ;; This thread waits for the server to come alive
+  (let* ((server-info (let loop ()
+                        (let ((sdat #f))
+                          (mutex-lock! *heartbeat-mutex*)
+                          (set! sdat *server-info*)
+                          (mutex-unlock! *heartbeat-mutex*)
+                          (if sdat sdat
+                              (begin
+                                (debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again")
+                                (thread-sleep! 4) ;; thread-sleep! rather than sleep so the server thread keeps running
+                                (loop))))))
+         (iface       (car server-info)) ;; *server-info* is (list host port), as set in nmsg-transport:run
+         (port        (cadr server-info))
+         ;; the former id/interface/pullport/pubport layout no longer applies to the req/rep server
+         ;; (nmsg-sockets (nmsg-transport:client-connect iface pullport pubport))
+         (last-access 0))
+    (debug:print-info 11 "heartbeat started for nmsg server on " iface " " port)
+    (let loop ((count 0))
+      (thread-sleep! 4) ;; no need to do this very often
+      ;; NB// sync currently does NOT return queue-length
+      ;; GET REAL QUEUE LENGTH FROM THE VARIABLE
+      (let ((queue-len 0)) ;; FOR NOW DO NOT DO THIS (cdb:client-call nmsg-sockets 'sync #t 1)))
+        ;; (print "Server running, count is " count)
+        (if (< count 1) ;; 3x3 = 9 secs aprox
+            (loop (+ count 1)))
+
+        ;; NOTE: Get rid of this mechanism! It really is not needed...
+        ;; (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
+
+        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
+        (mutex-lock! *heartbeat-mutex*)
+        (set! last-access *last-db-access*)
+        (mutex-unlock! *heartbeat-mutex*)
+        (if (> (+ last-access
+                  ;; (* 50 60 60)    ;; 48 hrs
+                  ;; 60              ;; one minute
+                  ;; (* 60 60)       ;; one hour
+                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
+                  )
+               (current-seconds))
+            (begin
+              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
+              (loop 0))
+            (begin
+              (debug:print-info 0 "Starting to shutdown the server.")
+              ;; need to delete only *my* server entry (future use)
+              (set! *time-to-exit* #t)
+              (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
+              (thread-sleep! 1)
+              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+              (debug:print-info 0 "Server shutdown complete. Exiting")
+              (exit)))))))
+
+;; all routes through here end in exit ...
+;; Acquire the server slot for run-id (retrying a few times) and run the server; bail out if no slot is obtained.
+(define (nmsg-transport:launch run-id)
+  (let* ((tdbdat   (tasks:open-db))
+         (dbstruct (db:setup run-id))
+         (hostn    (or (args:get-arg "-server") "-")))
+    (set! *run-id* run-id)
+    ;; with nbfake daemonize isn't really needed
+    ;;
+    ;; (if (args:get-arg "-daemonize")
+    ;;     (begin
+    ;;       (daemon:ize)
+    ;;       (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
+    ;;           (begin
+    ;;             (current-error-port *alt-log-file*)
+    ;;             (current-output-port *alt-log-file*)))))
+    (if (server:check-if-running run-id)
+        (begin
+          (debug:print-info 0 "Server for run-id " run-id " already running")
+          (exit 0)))
+    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
+               (remtries  4))
+      (if (not server-id)
+          (if (> remtries 0)
+              (begin
+                (thread-sleep! 2)
+                (if (not (server:check-if-running run-id))
+                    (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
+                          (- remtries 1))
+                    (begin
+                      (debug:print-info 0 "Another server took the slot, exiting")
+                      (exit 0))))
+              (begin
+                ;; since we didn't get the server lock we are going to clean up and bail out
+                (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
+                (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " nmsg-transport:launch")
+                ))
+          (nmsg-transport:run dbstruct hostn run-id server-id)) ;; only start a server when a slot (server-id) was actually obtained
+      (set! *didsomething* #t)
+      (exit))))
+
+;;======================================================================
+;; S E R V E R   U T I L I T I E S
+;;======================================================================
+;; md5 digest of the current directory plus argv, used as a signature for this process invocation
+(define (nmsg-transport:mk-signature)
+  (message-digest-string (md5-primitive)
+                         (with-output-to-string
+                           (lambda ()
+                             (write (list (current-directory)
+                                          (argv)))))))
+
+;;======================================================================
+;; C L I E N T S
+;;======================================================================
+
+;; ping the server at host:port
+;;   return-socket #t: return the open req socket on success, #f (socket closed) on failure
+;;   return-socket #f: close the socket and return #t/#f
+;;   expected-key: if given, the key in the reply payload must match it; our-key is sent as payload
+;;   gives up (and fails) if no reply arrives within roughly ten seconds
+(define (nmsg-transport:ping hostn port #!key (return-socket #t)(expected-key #f)(our-key #f))
+  ;; send a random number along with pid and check that we get it back
+  (let* ((req         (nn-socket 'req))
+         (host        (if (or (not hostn)
+                              (equal? hostn "-")) ;; use localhost
+                          (get-host-name)
+                          hostn))
+         (success     #f)
+         (keepwaiting #t)
+         (dat         (db:obj->string (vector "ping" our-key) transport: 'nm))
+         (ping        (make-thread
+                       (lambda ()
+                         (nn-send req dat)
+                         (let* ((result (nn-recv req))
+                                (key    (vector-ref (db:string->obj result transport: 'nm) 1)))
+                           (if (or (not expected-key) ;; just getting a reply is good enough then
+                                   (equal? (conc key) (conc expected-key))) ;; compare what the server sent back, as text
+                               (begin
+                                 ;; (print "ping, success: received \"" result "\"")
+                                 (set! success #t))
+                               (begin
+                                 ;; (print "ping, failed: received key \"" result "\"")
+                                 (set! keepwaiting #f)
+                                 (set! success #f)))))
+                       "ping"))
+         (timeout     (make-thread (lambda ()
+                                     (let loop ((count 0))
+                                       (thread-sleep! 1)
+                                       (print "still waiting after " count " seconds...")
+                                       (if (and keepwaiting (< count 10))
+                                           (loop (+ count 1))))
+                                     (if keepwaiting
+                                         (begin
+                                           (print "timeout waiting for ping")
+                                           (thread-terminate! ping))))
+                                   "timeout")))
+    (nn-connect req (conc "tcp://" host ":" port))
+    (handle-exceptions
+     exn
+     (begin
+       (print-call-chain)
+       (print " message: " ((condition-property-accessor 'exn 'message #f) exn)) ;; default #f: thread conditions may carry no exn message
+       (print "exn=" (condition->list exn))
+       (print "ping failed to connect to " host ":" port))
+     (thread-start! timeout)
+     (thread-start! ping)
+     (thread-join! ping)
+     (if success (thread-terminate! timeout)))
+    (if return-socket
+        (if success req (begin (nn-close req) #f)) ;; do not leak the socket when the ping failed
+        (begin
+          (nn-close req)
+          success))))
+;; Ping iface:portnum and log in over the resulting req socket; returns the socket, or #f on failure.
+(define (nmsg-transport:client-connect iface portnum)
+  (let* ((conurl    (conc "tcp://" iface ":" portnum))
+         (reqsoc    (nmsg-transport:ping iface portnum)) ;; already connected on success, #f on failure
+         (login-res (begin
+                      (debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
+                      (and reqsoc (client:login (list iface portnum) reqsoc))))) ;; NOTE(review): assumes client:login takes (host port) and the socket - confirm
+    (if (and (pair? login-res)
+             (car login-res))
+        (begin
+          (debug:print-info 2 "Logged in and connected to " conurl ".")
+          (set! *nm-port* reqsoc)
+          reqsoc)
+        (begin
+          (debug:print-info 2 "Failed to login or connect to " conurl)
+          (if reqsoc (nn-close reqsoc)) ;; *runremote* is a hash table keyed by run-id, so it must not be set to #f here
+          #f))))
+
+;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;; Polls every few seconds; exits the process once *last-db-access* is more than 45 minutes old.
+(define (nmsg-transport:keep-running)
+  ;; if none running or if > 20 seconds since
+  ;; server last used then start shutdown
+  ;; This thread waits for the server to come alive
+  (let* ((server-info (let loop ()
+                        (let ((sdat #f))
+                          (mutex-lock! *heartbeat-mutex*)
+                          (set! sdat *server-info*) ;; (host port) list set by nmsg-transport:run; *runremote* is a hash table and always true
+                          (mutex-unlock! *heartbeat-mutex*)
+                          (if sdat sdat
+                              (begin
+                                (thread-sleep! 4) ;; thread-sleep! rather than sleep so the other threads keep running
+                                (loop))))))
+         (iface       (car server-info))
+         (port        (cadr server-info))
+         (last-access 0)
+         (tdb         (tasks:open-db))
+         (spid        (tasks:server-get-server-id (db:delay-if-busy tdb) #f iface port #f))) ;; wrapped like the other tasks:* calls in this file
+    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
+    (let loop ((count 0))
+      (thread-sleep! 4) ;; no need to do this very often
+      ;; NB// sync currently does NOT return queue-length
+      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
+        ;; (print "Server running, count is " count)
+        (if (< count 1) ;; 3x3 = 9 secs aprox
+            (loop (+ count 1)))
+
+        ;; NOTE: Get rid of this mechanism! It really is not needed...
+        (tasks:server-update-heartbeat (db:delay-if-busy tdb) spid)
+
+        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
+        (mutex-lock! *heartbeat-mutex*)
+        (set! last-access *last-db-access*)
+        (mutex-unlock! *heartbeat-mutex*)
+        (if (> (+ last-access
+                  ;; (* 50 60 60)    ;; 48 hrs
+                  ;; 60              ;; one minute
+                  ;; (* 60 60)       ;; one hour
+                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
+                  )
+               (current-seconds))
+            (begin
+              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
+              (loop 0))
+            (begin
+              (debug:print-info 0 "Starting to shutdown the server.")
+              ;; need to delete only *my* server entry (future use)
+              (set! *time-to-exit* #t)
+              (tasks:server-deregister-self (db:delay-if-busy tdb) (get-host-name))
+              (thread-sleep! 1)
+              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+              (debug:print-info 0 "Server shutdown complete. Exiting")
+              (exit)))))))
+
+;; ^C handler: gives a pending reply three seconds to drain, then exits with status 4.
+(define (nmsg-transport:client-signal-handler signum)
+  (handle-exceptions
+   exn
+   (debug:print 0 " ... exiting ...") ;; debug:print takes the debug level as its first argument
+   (let ((th1 (make-thread (lambda ()
+                             (if (not *received-response*)
+                                 (receive-message* *runremote*))) ;; flush out last call if applicable; NOTE(review): receive-message* looks like a zmq leftover - confirm
+                           "eat response"))
+         (th2 (make-thread (lambda ()
+                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
+                             (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
+                             (debug:print 0 " Done.")
+                             (exit 4))
+                           "exit on ^C timer")))
+     (thread-start! th2)
+     (thread-start! th1)
+     (thread-join! th2))))
+
Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -13,10 +13,11 @@ (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; @@ -63,13 +64,10 @@ ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) -;; cmd is a symbol -;; vars is a json string encoding the parameters for the call -;; (define (rmt:send-receive cmd rid params #!key (attemptnum 0)) ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) (let ((expire-time (- (current-seconds) 60))) (for-each @@ -77,33 +75,28 @@ (let ((connection (hash-table-ref/default *runremote* run-id #f))) (if (and connection (< (http-transport:server-dat-get-last-access connection) expire-time)) (begin (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") + ;; SHOULD CLOSE THE CONNECTION HERE (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! 
*db-multi-sync-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id)) (jparams (db:obj->string params))) (if connection-info ;; use the server if have connection info - (let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)) + (let* ((dat (case *transport-type* + ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((nm) (nm-transport:client-api-send-receive run-id connection-info cmd jparams)) + (else (exit)))) (res (if (vector? dat) (vector-ref dat 1) #f)) (success (if (vector? dat) (vector-ref dat 0) #f))) (http-transport:server-dat-update-last-access connection-info) (if success (db:string->obj res) - ;; (if (< attemptnum 100) - ;; (begin - ;; (hash-table-delete! *runremote* run-id) - ;; (thread-sleep! 0.5) - ;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1))) - ;; (begin - ;; (print-call-chain (current-error-port)) - ;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over") - ;; (exit 1))))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection ;; no longer killing the server in http-transport:client-api-send-receive Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -20,10 +20,11 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) (declare (uses launch)) ;; (declare (uses zmq-transport)) (declare (uses daemon)) (include "common_records.scm") @@ -47,11 +48,14 @@ ;; all routes though here end in exit ... 
;; ;; start_server ;; (define (server:launch run-id) - (http-transport:launch run-id)) + (case *transport-type* + ((http)(http-transport:launch run-id)) + ((nm) (nmsg-transport:launch run-id)) + (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== ADDED testnanomsg/basic-req-rep.scm Index: testnanomsg/basic-req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/basic-req-rep.scm @@ -0,0 +1,3 @@ +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) ADDED testnanomsg/mockupclient.scm Index: testnanomsg/mockupclient.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclient.scm @@ -0,0 +1,42 @@ +(use zmq posix numbers) + +(define cname "Bob") +(define runtime 10) +(let ((args (argv))) + (if (< (length args) 3) + (begin + (print "Usage: mockupclient clientname runtime") + (exit)) + (begin + (set! cname (cadr args)) + (set! runtime (string->number (caddr args)))))) + +;; (define start-delay (/ (random 100) 9)) +;; (define runtime (+ 1 (/ (random 200) 2))) + +(print "Starting client " cname " with runtime " runtime) + +(include "mockupclientlib.scm") + +(set! 
endtime (+ (current-seconds) runtime)) + +;; first ping the server to ensure we have a connection +(if (server-ping cname 5) + (print "SUCCESS: Client " cname " connected to server") + (begin + (print "ERROR: Client " cname " failed ping of server, exiting") + (exit))) + +(let loop () + (let ((x (random 15)) + (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) + (case x + ;; ((1)(dbaccess cname 'sync "nodat" #f)) + ((2 3 4 5)(dbaccess cname 'set varname (random 999))) + ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) + (else + (thread-sleep! 0.011))) + (if (< (current-seconds) endtime) + (loop)))) + +(print "Client " cname " all done!!") ADDED testnanomsg/mockupclientlib.scm Index: testnanomsg/mockupclientlib.scm ================================================================== --- /dev/null +++ testnanomsg/mockupclientlib.scm @@ -0,0 +1,58 @@ +(define reqs (nn-socket 'req)) + +(connect-socket reqs "tcp://localhost:6563") + +(thread-sleep! 0.2) + +(define (server-ping cname timeout) + (let ((msg (conc cname ":ping:" timeout)) + (maxtime (+ (current-seconds) timeout))) + (print "pinging server from " cname " with timeout " timeout) + (let loop ((res #f)) + (if (< maxtime (current-seconds)) + #f ;; failed to ping + (if (equal? res "Got ping") + #t + (begin + (print "Ping received from server " res) + (send-message push msg) + (thread-sleep! 0.1) + (loop (receive-message sub non-blocking: #t)))))))) + +(define (dbaccess cname cmd var val #!key (numtries 20)) + (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))) + (res #f) + (mtx1 (make-mutex)) + (do-access (lambda () + (let ((tmpres #f)) + (print "Sending msg: " msg) + (send-message push msg) + (print "Message " msg " sent") + (print "Client " cname " waiting for response to " msg) + (print "Client " cname " received address " (receive-message* sub)) + (set! tmpres (receive-message* sub)) + (mutex-lock! mtx1) + (set! 
res tmpres) + (mutex-unlock! mtx1)))) + (th1 (make-thread do-access "do access")) + (th2 (make-thread (lambda () + (let ((result #f)) + (mutex-lock! mtx1) + (set! result res) + (mutex-unlock! mtx1) + (thread-sleep! 5) + (if (not result) + (if (> numtries 0) + (begin + (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries) + (dbaccess cname cmd var val numtries: (- numtries 1))) + (begin + (print "ERROR: dbaccess timed out. Exiting") + (exit))))) + "timeout thread")))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts")) + res)) + ADDED testnanomsg/mockupserver.scm Index: testnanomsg/mockupserver.scm ================================================================== --- /dev/null +++ testnanomsg/mockupserver.scm @@ -0,0 +1,146 @@ +;; pub/sub with envelope address +;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon +;; as a client disconnects. Also a remaining client may receive tons of +;; messages afterward. + +(use nanomsg srfi-18 sqlite3 numbers) + +(define resp (nn-socket 'rep)) +(define cname "server") +(define total-db-accesses 0) +(define start-time (current-seconds)) + +(nn-bind resp "tcp://*:6563") + +(thread-sleep! 0.2) + +(define (open-db) + (let* ((dbpath "mockup.db") + (dbexists (file-exists? dbpath)) + (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 10))) + (set-busy-handler! 
db handler) + (if (not dbexists) + (for-each + (lambda (stmt) + (execute db stmt)) + (list + "PRAGMA SYNCHRONOUS=0;" + "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" + "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) + db)) + +(define cid-cache (make-hash-table)) + +(define (get-client-id db cname) + (let ((cid (hash-table-ref/default cid-cache cname #f))) + (if cid + cid + (begin + (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) + (for-each-row + (lambda (id) + (set! cid id)) + db + "SELECT id FROM clients WHERE name=?;" cname) + (hash-table-set! cid-cache cname cid) + (set! total-db-accesses (+ total-db-accesses 2)) + cid)))) + +(define (count-client db cname) + (let ((cid (get-client-id db cname))) + (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) + (set! total-db-accesses (+ total-db-accesses 1)) + )) + +(define db (open-db)) +;; (define queuelst '()) +;; (define mx1 (make-mutex)) + +(define max-queue-len 0) + +(define (process-queue queuelst) + (let ((queuelen (length queuelst))) + (if (> queuelen max-queue-len) + (set! max-queue-len queuelen)) + (for-each + (lambda (item) + (let ((cname (vector-ref item 1)) + (clcmd (vector-ref item 2)) + (cdata (vector-ref item 3))) + (send-message pub cname send-more: #t) + (send-message pub (case clcmd + ((sync) + (conc queuelen)) + ((set) + (set! total-db-accesses (+ total-db-accesses 1)) + (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) + "ok") + ((get) + (set! total-db-accesses (+ total-db-accesses 1)) + (let ((res "noval")) + (for-each-row + (lambda (val) + (set! 
res val)) + db + "SELECT val FROM vars WHERE var=?;" cdata) + res)) + (else (conc "unk cmd: " clcmd)))))) + queuelst))) + +;; SERVER THREAD +(define th1 (make-thread + (lambda () + (let ((last-run 0)) ;; current-seconds when run last + (let loop ((queuelst '())) + (let* ((indat (receive-message* pull)) + (parts (string-split indat ":")) + (cname (car parts)) ;; client name + (clcmd (string->symbol (cadr parts))) ;; client cmd + (cdata (caddr parts)) ;; client data + (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue + ;; (print "Server received message: " indat) + (count-client db cname) + (case clcmd + ((ping) + (print "Got ping from " cname) + (send-message pub cname send-more: #t) + (send-message pub "Got ping") + (loop queuelst)) + ((sync) ;; just process the queue + (print "Got sync from " cname) + (process-queue (cons svect queuelst)) + (loop '())) + ((get) + (process-queue (cons svect queuelst)) + (loop '())) + (else + (loop (cons svect queuelst)))))))) + "server thread")) + +(include "mockupclientlib.scm") + +;; SYNC THREAD +;; send a sync to the pull port +(define th2 (make-thread + (lambda () + (let ((last-action-time (current-seconds))) + (let loop () + (thread-sleep! 5) + (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) + (last-action-delta #f)) + (if (> queuelen 1)(set! last-action-time (current-seconds))) + (set! last-action-delta (- (current-seconds) last-action-time)) + (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) + (if (< last-action-delta 60) + (loop) + (print "Server exiting, 25 seconds since last access")))))) + "sync thread")) + +(thread-start! th1) +(thread-start! th2) +(thread-join! th2) + +(let* ((run-time (- (current-seconds) start-time)) + (queries/second (/ total-db-accesses run-time))) + (print "Server exited! 
Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second with max queue length of: " max-queue-len)) ADDED testnanomsg/pipeline.scm Index: testnanomsg/pipeline.scm ================================================================== --- /dev/null +++ testnanomsg/pipeline.scm @@ -0,0 +1,25 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define push (nn-socket 'push)) +(define pull1 (nn-socket 'pull)) +(define pull2 (nn-socket 'pull)) + +(nn-bind push "inproc://test") +(nn-connect pull1 "inproc://test") +(nn-connect pull2 "inproc://test") + +(nn-send push "a") +(nn-send push "b") +(nn-send push "c") +(nn-send push "d") + +(define ((th sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) ": " (nn-recv sock)) + (print (current-thread) " is done")) + +(thread-start! (th pull1)) +(thread-start! (th pull2)) + +(thread-sleep! 1) ADDED testnanomsg/req-rep-client.scm Index: testnanomsg/req-rep-client.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-client.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define req (nn-socket 'req)) + +(nn-connect req "tcp://localhost:22022") + +;; (with-output-to-string (lambda ()(serialize obj))) +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((talk-to-server soc)) + (let loop ((cnt 20)) + (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6)))) + (print "Sending " name) + (print (client-send-receive req name)) + (if (> cnt 0)(loop (- cnt 1))))) + (print (client-send-receive req "quit")) + (nn-close req) + (exit)) + +;; (thread-start! (lambda () +;; (thread-sleep! 20) +;; (print "Give up on waiting for the server") +;; (nn-close req) +;; (exit))) + +(thread-join! (thread-start! 
(talk-to-server req))) + ADDED testnanomsg/req-rep-server.scm Index: testnanomsg/req-rep-server.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-server.scm @@ -0,0 +1,90 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +;; (use trace) +;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close ) + +(define port 22022) +(define host "127.0.0.1") + +(define rep (nn-socket 'rep)) + +(print "connecting, got: " (nn-bind rep (conc "tcp://" "*" ":" port))) + +(define (server soc) + (print "server starting") + (let loop ((msg-in (nn-recv soc))) + (print "server received: " msg-in) + (cond + ((equal? msg-in "quit") + (nn-send soc "Ok, quitting")) + ((and (>= (string-length msg-in) 4) + (equal? (substring msg-in 0 4) "ping")) + (nn-send soc (conc (current-process-id))) + (loop (nn-recv soc))) + ;;((and (>= (string-length msg-in) + (else + (let ((this-task (random 15))) + (thread-sleep! this-task) + (nn-send soc (conc "hello " msg-in " this task took " this-task " seconds to complete")) + (loop (nn-recv soc))))))) + +(define (ping-self host port #!key (return-socket #t)) + ;; send a random number along with pid and check that we get it back + (let* ((req (nn-socket 'req)) + (key "ping") + (success #f) + (keepwaiting #t) + (ping (make-thread + (lambda () + (print "ping: sending string \"" key "\", expecting " (current-process-id)) + (nn-send req key) + (let ((result (nn-recv req))) + (if (equal? (conc (current-process-id)) result) + (begin + (print "ping, success: received \"" result "\"") + (set! success #t)) + (begin + (print "ping, failed: received key \"" result "\"") + (set! keepwaiting #f) + (set! success #f))))) + "ping")) + (timeout (make-thread (lambda () + (let loop ((count 0)) + (thread-sleep! 
1) + (print "still waiting after count seconds...") + (if (and keepwaiting (< count 10)) + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! ping)))) + "timeout"))) + (nn-connect req (conc "tcp://" host ":" port)) + (handle-exceptions + exn + (begin + (print-call-chain) + (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (print "exn=" (condition->list exn)) + (print "ping failed to connect to " host ":" port)) + (thread-start! timeout) + (thread-start! ping) + (thread-join! ping) + (if success (thread-terminate! timeout))) + (if return-socket + (if success req #f) + (begin + (nn-close req) + success)))) + +(let ((server-thread (make-thread (lambda ()(server rep)) "server"))) + (thread-start! server-thread) + ;; (thread-sleep! 1) + (if (ping-self host port) + (begin + (thread-join! server-thread) + (nn-close rep)) + (print "ping failed"))) + +(exit) ADDED testnanomsg/req-rep.scm Index: testnanomsg/req-rep.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define req (nn-socket 'req)) +(define rep (nn-socket 'rep)) + +(nn-bind rep "inproc://test") +(nn-connect req "inproc://test") + +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((server soc)) + (let loop ((msg-in (nn-recv soc))) + (if (not (equal? msg-in "quit")) + (begin + (nn-send soc (conc "hello " msg-in)) + (loop (nn-recv soc)))))) + +(thread-start! 
(server rep)) + +(print (client-send-receive req "Matt")) +(print (client-send-receive req "Tom")) + +;; (client-send-receive req "quit") + +(nn-close req) +(nn-close rep) +(exit) Index: tests/Makefile ================================================================== --- tests/Makefile +++ tests/Makefile @@ -34,11 +34,11 @@ cd ..;make -j && make install cd fullrun;$(MEGATEST) -stop-server 0 repl : cd ..;make -j && make install - cd fullrun;$(MEGATEST) -repl + cd fullrun;$(MEGATEST) -:b -repl test0 : cleanprep cd simplerun ; $(MEGATEST) -server - -debug $(DEBUG) test1 : cleanprep