Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -4,11 +4,11 @@ INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ - http-transport.scm filedb.scm \ + http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ rmt.scm api.scm tdb.scm portlogger.scm # Eggs to install (straightforward ones) Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -121,10 +121,11 @@ (db:with-db dbstruct run-id #t ;; these are all for modifying the db (lambda (db) (db:general-call db stmtname realparams))))) ((sync-inmem->db) (db:sync-touched dbstruct run-id force-sync: #t)) ((sdb-qry) (apply sdb:qry params)) + ((ping) (current-process-id)) ;; TESTMETA ((testmeta-get-record) (apply db:testmeta-get-record dbstruct params)) ((testmeta-add-record) (apply db:testmeta-add-record dbstruct params)) ((testmeta-update-field) (apply db:testmeta-update-field dbstruct params)) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -65,12 +65,11 @@ (define *db-access-allowed* #t) ;; flag to allow access (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) -(define *transport-type* 'http) -(define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port +(define *transport-type* 'nm) (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -2276,26 +2276,26 @@ ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS ;;====================================================================== ;; NOTE: Can remove the regex and base64 encoding for zmq -(define (db:obj->string obj) - (case *transport-type* +(define (db:obj->string obj #!key (transport 'http)) + (case transport ;; ((fs) obj) ((http fs) (string-substitute (regexp "=") "_" (base64:base64-encode (z3:encode-buffer (with-output-to-string (lambda ()(serialize obj))))) #t)) - ((zmq)(with-output-to-string (lambda ()(serialize obj)))) + ((zmq nm)(with-output-to-string (lambda ()(serialize obj)))) (else obj))) -(define (db:string->obj msg) - (case *transport-type* +(define (db:string->obj msg #!key (transport 'http)) + (case transport ;; ((fs) msg) ((http fs) (if (string? msg) (with-input-from-string (z3:decode-buffer @@ -2304,11 +2304,11 @@ (regexp "_") "=" msg #t))) (lambda ()(deserialize))) (begin (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.") #f))) ;; crude reply for when things go awry - ((zmq)(with-input-from-string msg (lambda ()(deserialize)))) + ((zmq nm)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) (define (db:test-set-status-state dbstruct run-id test-id status state msg) (let ((dbdat (db:get-db dbstruct run-id))) (if (member state '("LAUNCHED" "REMOTEHOSTSTART")) Index: nmsg-transport.scm ================================================================== --- nmsg-transport.scm +++ nmsg-transport.scm @@ -49,105 +49,63 @@ ;; [x] [ ] - after 60 seconds ;; [ ] [ ] i. check server alive, connect to new if necessary ;; [ ] [ ] ii. resend request ;; [ ] [ ] 7. Turn self ping back on -(define (nmsg-transport:make-server-url hostport) +(define (nmsg-transport:make-server-url hostport #!key (bindall #f)) (if (not hostport) #f - (conc "tcp://" (car hostport) ":" (cadr hostport)))) + (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== -(define-inline (nmsgsock:get-pub dat)(vector-ref dat 0)) -(define-inline (nmsgsock:get-pull dat)(vector-ref dat 1)) -(define-inline (nmsgsock:set-pub! dat s)(vector-set! dat s 0)) -(define-inline (nmsgsock:set-pull! dat s)(vector-set! dat s 0)) - -(define (nmsg-transport:run hostn) +(define (nmsg-transport:run dbstruct hostn run-id server-id) (debug:print 2 "Attempting to start the server ...") - (if (not *toppath*) - (if (not (launch:setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") - (exit)))) - (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db - (nmsg-sdat1 #f) - (nmsg-sdat2 #f) - (pull-socket #f) - (pub-socket #f) - (p1 #f) - (p2 #f) - (nmsg-sockets-dat #f) - (iface (if (string=? "-" hostn) - "*" ;; (get-host-name) - hostn)) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostname))) - (last-run 0)) - (set! nmsg-sockets-dat (nmsg-transport:setup-ports ipaddrstr (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001))))) - - (set! nmsg-sdat1 (car nmsg-sockets-dat)) - (set! pull-socket (cadr nmsg-sdat1)) ;; (iface s port) - (set! p1 (caddr nmsg-sdat1)) - - (set! nmsg-sdat2 (cadr nmsg-sockets-dat)) - (set! pub-socket (cadr nmsg-sdat2)) - (set! p2 (caddr nmsg-sdat2)) - - (set! *cache-on* #t) - - (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? - - ;; what to do when we quit - ;; -;; (on-exit (lambda () -;; (if (and *toppath* *server-info*) -;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) -;; (let loop () -;; (let ((queue-len 0)) -;; (thread-sleep! (random 5)) -;; (mutex-lock! *incoming-mutex*) -;; (set! queue-len (length *incoming-data*)) -;; (mutex-unlock! *incoming-mutex*) -;; (if (> queue-len 0) -;; (begin -;; (debug:print-info 0 "Queue not flushed, waiting ...") -;; (loop)))))))) - - ;; The heavy lifting - ;; - ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime - ;; - (debug:print-info 11 "Server setup complete, start listening for messages") - (let loop ((queue-lst '())) - (let* ((rawmsg (receive-message* pull-socket)) - (packet (db:string->obj rawmsg)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue - (begin - (db:process-queue-item db packet) - ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) - - (loop '())) - (loop (cons packet queue-lst))))))) + (let* ((start-port (portlogger:open-run-close portlogger:find-port)) + (server-thread (make-thread (lambda () + (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) + "server thread")) + (tdbdat (tasks:open-db))) + (thread-start! server-thread) + (if (nmsg-transport:ping hostn start-port) + (begin + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? + (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access + (set! *inmemdb* dbstruct) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") + (thread-start! nmsg-transport:keep-running) + (thread-join! server-thread)) + (begin + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (portlogger:open-run-close portlogger:set-failed start-port) + (nmsg-transport:run dbstruct hostn run-id server-id))))) + +(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id) + (let ((repsoc (nn-socket 'rep))) + (nn-bind repsoc (conc "tcp://*:" portnum)) + (let loop ((msg-in (nn-recv repsoc))) + (cond + ((equal? msg-in "quit") + (nn-send repsoc "Ok, quitting")) + ((and (>= (string-length msg-in) 4) + (equal? (substring msg-in 0 4) "ping")) + (nn-send repsoc (conc (current-process-id))) + (loop (nn-recv repsoc))) + (else + (let* ((dat (db:string->obj msg-in transport: 'nm)) + (cmd (vector-ref dat 0)) + (params (vector-ref dat 1)) + (result (api:execute-requests dbstruct cmd params)) + (newdat (db:obj->string result transport: 'nm))) + (nn-send repsoc newdat) + (loop (nn-recv repsoc)))))))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (nmsg-transport:keep-running) @@ -178,11 +136,11 @@ ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + ;; (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) @@ -204,101 +162,136 @@ (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) -(define (nmsg-transport:find-free-port-and-open iface s port stype #!key (trynum 50)) - (let ((s (if s s (make-socket stype))) - (p (if (number? port) port 5555)) - (old-handler (current-exception-handler))) - (handle-exceptions - exn - (begin - (debug:print 0 "Failed to bind to port " p ", trying next port") - (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - ;; (old-handler) - ;; (print-call-chain) - (if (> trynum 0) - (nmsg-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) - (debug:print-info 0 "Tried ports up to " p - " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) - (exit)) ;; To exit or not? That is the question. - (let ((nmsg-url (conc "tcp://" iface ":" p))) - (debug:print 2 "Trying to start server on " nmsg-url) - (bind-socket s nmsg-url) - (list iface s port))))) - -(define (nmsg-transport:setup-ports ipaddrstr startport) - (let* ((s1 (nmsg-transport:find-free-port-and-open ipaddrstr #f startport 'pull)) - (p1 (caddr s1)) - (s2 (nmsg-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) - (p2 (caddr s2))) - (set! *runremote* #f) - (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) - (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr p1 - 0 - 'live - 'nmsg - pubport: p2)) - (debug:print-info 11 "*server-info* set to " *server-info*) - (mutex-unlock! *heartbeat-mutex*) - (list s1 s2))) +;; all routes though here end in exit ... +;; +(define (nmsg-transport:launch run-id) + (let* ((tdbdat (tasks:open-db)) + (dbstruct (db:setup run-id)) + (hostn (or (args:get-arg "-server") "-"))) + (set! *run-id* run-id) + ;; with nbfake daemonize isn't really needed + ;; + ;; (if (args:get-arg "-daemonize") + ;; (begin + ;; (daemon:ize) + ;; (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + ;; (begin + ;; (current-error-port *alt-log-file*) + ;; (current-output-port *alt-log-file*))))) + (if (server:check-if-running run-id) + (begin + (debug:print-info 0 "Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 2) + (if (not (server:check-if-running run-id)) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (- remtries 1)) + (begin + (debug:print-info 0 "Another server took the slot, exiting") + (exit 0)))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") + ))) + (nmsg-transport:run dbstruct hostn run-id server-id) + (set! *didsomething* #t) + (exit)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== (define (nmsg-transport:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - ;;====================================================================== ;; C L I E N T S ;;====================================================================== -;; -(define (nmsg-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) - (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) - (let ((connect-ok #f) - (nmsg-socket (if context - (make-socket type context) - (make-socket type))) - (conurl (nmsg-transport:make-server-url (list iface port)))) - (if (socket? nmsg-socket) - (begin - ;; first apply subscriptions - (for-each (lambda (subscription) - (debug:print 2 "Subscribing to " subscription) - (socket-option-set! nmsg-socket 'subscribe subscription)) - subscriptions) - (connect-socket nmsg-socket conurl) - nmsg-socket) - (begin - (debug:print 0 "ERROR: Failed to open socket to " conurl) - #f)))) - -(define (nmsg-transport:client-connect iface pullport pubport) - (let* ((push-socket (nmsg-transport:client-socket-connect iface pullport type: 'push)) - (sub-socket (nmsg-transport:client-socket-connect iface pubport - type: 'sub - subscriptions: (list (client:get-signature) "all"))) - (nmsg-sockets (vector push-socket sub-socket)) - (login-res #f)) +;; ping the server at host:port +;; return the open socket if successful (return-socket == #t) +;; expect the key expected-key returned in payload +;; send our-key or #f as payload +;; +(define (nmsg-transport:ping hostn port #!key (return-socket #t)(expected-key #f)(our-key #f)) + ;; send a random number along with pid and check that we get it back + (let* ((req (nn-socket 'req)) + (host (if (or (not hostn) + (equal? hostn "-")) ;; use localhost + (get-host-name) + hostn)) + (success #f) + (keepwaiting #t) + (dat (db:obj->string (vector "ping" our-key) transport: 'nm)) + (ping (make-thread + (lambda () + (nn-send req dat) + (let* ((result (nn-recv req)) + (key (vector-ref (db:string->obj result transport: 'nm) 1))) + (if (or (not expect-key) ;; just getting a reply is good enough then + (equal? (conc (current-process-id)) expected-key)) + (begin + ;; (print "ping, success: received \"" result "\"") + (set! success #t)) + (begin + ;; (print "ping, failed: received key \"" result "\"") + (set! keepwaiting #f) + (set! success #f))))) + "ping")) + (timeout (make-thread (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (print "still waiting after count seconds...") + (if (and keepwaiting (< count 10)) + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! ping)))) + "timeout"))) + (nn-connect req (conc "tcp://" host ":" port)) + (handle-exceptions + exn + (begin + (print-call-chain) + (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (print "exn=" (condition->list exn)) + (print "ping failed to connect to " host ":" port)) + (thread-start! timeout) + (thread-start! ping) + (thread-join! ping) + (if success (thread-terminate! timeout))) + (if return-socket + (if success req #f) + (begin + (nn-close req) + success)))) + +(define (nmsg-transport:client-connect iface portnum) + (let* ((reqsoc (nmsg-transport:ping iface portnum)) + (login-res #f)) + (nn-connect reqsoc (conc "tcp://" iface ":" portnum)) (debug:print-info 11 "nmsg-transport:client-connect started. Next is login") (set! login-res (client:login serverdat nmsg-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") - (set! *runremote* nmsg-sockets) + (set! *nm-port* nmsg-sockets) nmsg-sockets) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f)))) @@ -358,49 +351,10 @@ (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) -;; all routes though here end in exit ... -(define (nmsg-transport:launch) - (if (not *toppath*) - (if (not (launch:setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, exiting") - (exit)))) - (debug:print-info 2 "Starting nmsg server") - (if *toppath* - (let* (;; (th1 (make-thread (lambda () - ;; (let ((server-info #f)) - ;; ;; wait for the server to be online and available - ;; (let loop () - ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") - ;; (thread-sleep! 2) - ;; (mutex-lock! *heartbeat-mutex*) - ;; (set! server-info *server-info* ) - ;; (mutex-unlock! *heartbeat-mutex*) - ;; (if (not server-info)(loop))) - ;; (debug:print 2 "Server alive, starting self-ping") - ;; (nmsg-transport:self-ping server-info) - ;; )) - ;; "Self ping")) - (th2 (make-thread (lambda () - (nmsg-transport:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - ;; (th3 (make-thread (lambda ()(nmsg-transport:keep-running)) "Keep running")) - ) - (set! *client-non-blocking-mode* #t) - ;; (thread-start! th1) - (thread-start! th2) - ;; (thread-start! th3) - (set! *didsomething* #t) - ;; (thread-join! th3) - (thread-join! th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) (define (nmsg-transport:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") @@ -416,78 +370,5 @@ "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) (thread-join! th2)))) -(define (nmsg-transport:client-launch) - (set-signal-handler! signal/int nmsg-transport:client-signal-handler) - (if (nmsg-transport:client-setup) - (debug:print-info 2 "connected as client") - (begin - (debug:print 0 "ERROR: Failed to connect as client") - (exit)))) - -;;====================================================================== -;; Defunct functions -;;====================================================================== - -;; ping a server and return number of clients or #f (if no response) -;; NOT IN USE! -(define (nmsg-transport:ping host port #!key (secs 10)(return-socket #f)) - (cdb:use-non-blocking-mode - (lambda () - (let* ((res #f) - (th1 (make-thread - (lambda () - (let* ((nmsg-context (make-context 1)) - (nmsg-socket (nmsg-transport:client-connect host port context: nmsg-context))) - (if nmsg-socket - (if (nmsg-transport:client-login nmsg-socket) - (let ((numclients (cdb:num-clients nmsg-socket))) - (if (not return-socket) - (begin - (nmsg-transport:client-logout nmsg-socket) - (close-socket nmsg-socket))) - (set! res (list #t numclients (if return-socket nmsg-socket #f)))) - (begin - ;; (close-socket nmsg-socket) - (set! res (list #f "CAN'T LOGIN" #f)))) - (set! res (list #f "CAN'T CONNECT" #f))))) - "Ping: th1")) - (th2 (make-thread - (lambda () - (let loop ((count 1)) - (debug:print-info 1 "Ping " count " server on " host " at port " port) - (thread-sleep! 2) - (if (< count (/ secs 2)) - (loop (+ count 1)))) - ;; (thread-terminate! th1) - (set! res (list #f "TIMED OUT" #f))) - "Ping: th2"))) - (thread-start! th2) - (thread-start! th1) - (handle-exceptions - exn - (set! res (list #f "TIMED OUT" #f)) - (thread-join! th1 secs)) - res)))) - -;; (define (nmsg-transport:self-ping server-info) -;; ;; server-info: server-id interface pullport pubport -;; (let ((iface (list-ref server-info 1)) -;; (pullport (list-ref server-info 2)) -;; (pubport (list-ref server-info 3))) -;; (nmsg-transport:client-connect iface pullport pubport) -;; (let loop () -;; (thread-sleep! 2) -;; (cdb:client-call *runremote* 'ping #t) -;; (debug:print 4 "nmsg-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *server-loop-heart-beat* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*) -;; (loop)))) - -(define (nmsg-transport:reply pubsock target query-sig success/fail result) - (debug:print-info 11 "nmsg-transport:reply target=" target ", result=" result) - (send-message pubsock target send-more: #t) - (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) - Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -13,10 +13,11 @@ (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; @@ -63,13 +64,10 @@ ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) -;; cmd is a symbol -;; vars is a json string encoding the parameters for the call -;; (define (rmt:send-receive cmd rid params #!key (attemptnum 0)) ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) (let ((expire-time (- (current-seconds) 60))) (for-each @@ -77,33 +75,28 @@ (let ((connection (hash-table-ref/default *runremote* run-id #f))) (if (and connection (< (http-transport:server-dat-get-last-access connection) expire-time)) (begin (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") + ;; SHOULD CLOSE THE CONNECTION HERE (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id)) (jparams (db:obj->string params))) (if connection-info ;; use the server if have connection info - (let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)) + (let* ((dat (case *transport-type* + ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((nm) (nm-transport:client-api-send-receive run-id connection-info cmd jparams)) + (else (exit)))) (res (if (vector? dat) (vector-ref dat 1) #f)) (success (if (vector? dat) (vector-ref dat 0) #f))) (http-transport:server-dat-update-last-access connection-info) (if success (db:string->obj res) - ;; (if (< attemptnum 100) - ;; (begin - ;; (hash-table-delete! *runremote* run-id) - ;; (thread-sleep! 0.5) - ;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1))) - ;; (begin - ;; (print-call-chain (current-error-port)) - ;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over") - ;; (exit 1))))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection ;; no longer killing the server in http-transport:client-api-send-receive Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -20,10 +20,11 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) (declare (uses launch)) ;; (declare (uses zmq-transport)) (declare (uses daemon)) (include "common_records.scm") @@ -47,11 +48,14 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) - (http-transport:launch run-id)) + (case *transport-type* + ((http)(http-transport:launch run-id)) + ((nm) (nmsg-transport:launch run-id)) + (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== ADDED testnanomsg/req-rep-client.scm Index: testnanomsg/req-rep-client.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-client.scm @@ -0,0 +1,30 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +(define req (nn-socket 'req)) + +(nn-connect req "tcp://localhost:22022") + +;; (with-output-to-string (lambda ()(serialize obj))) +(define (client-send-receive soc msg) + (nn-send soc msg) + (nn-recv soc)) + +(define ((talk-to-server soc)) + (let loop ((cnt 20)) + (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6)))) + (print "Sending " name) + (print (client-send-receive req name)) + (if (> cnt 0)(loop (- cnt 1))))) + (print (client-send-receive req "quit")) + (nn-close req) + (exit)) + +;; (thread-start! (lambda () +;; (thread-sleep! 20) +;; (print "Give up on waiting for the server") +;; (nn-close req) +;; (exit))) + +(thread-join! (thread-start! (talk-to-server req))) + ADDED testnanomsg/req-rep-server.scm Index: testnanomsg/req-rep-server.scm ================================================================== --- /dev/null +++ testnanomsg/req-rep-server.scm @@ -0,0 +1,90 @@ +;; watch nanomsg's pipeline load-balancer in action. +(use nanomsg) + +;; (use trace) +;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close ) + +(define port 22022) +(define host "127.0.0.1") + +(define rep (nn-socket 'rep)) + +(print "connecting, got: " (nn-bind rep (conc "tcp://" "*" ":" port))) + +(define (server soc) + (print "server starting") + (let loop ((msg-in (nn-recv soc))) + (print "server received: " msg-in) + (cond + ((equal? msg-in "quit") + (nn-send soc "Ok, quitting")) + ((and (>= (string-length msg-in) 4) + (equal? (substring msg-in 0 4) "ping")) + (nn-send soc (conc (current-process-id))) + (loop (nn-recv soc))) + ;;((and (>= (string-length msg-in) + (else + (let ((this-task (random 15))) + (thread-sleep! this-task) + (nn-send soc (conc "hello " msg-in " this task took " this-task " seconds to complete")) + (loop (nn-recv soc))))))) + +(define (ping-self host port #!key (return-socket #t)) + ;; send a random number along with pid and check that we get it back + (let* ((req (nn-socket 'req)) + (key "ping") + (success #f) + (keepwaiting #t) + (ping (make-thread + (lambda () + (print "ping: sending string \"" key "\", expecting " (current-process-id)) + (nn-send req key) + (let ((result (nn-recv req))) + (if (equal? (conc (current-process-id)) result) + (begin + (print "ping, success: received \"" result "\"") + (set! success #t)) + (begin + (print "ping, failed: received key \"" result "\"") + (set! keepwaiting #f) + (set! success #f))))) + "ping")) + (timeout (make-thread (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (print "still waiting after count seconds...") + (if (and keepwaiting (< count 10)) + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for ping") + (thread-terminate! ping)))) + "timeout"))) + (nn-connect req (conc "tcp://" host ":" port)) + (handle-exceptions + exn + (begin + (print-call-chain) + (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (print "exn=" (condition->list exn)) + (print "ping failed to connect to " host ":" port)) + (thread-start! timeout) + (thread-start! ping) + (thread-join! ping) + (if success (thread-terminate! timeout))) + (if return-socket + (if success req #f) + (begin + (nn-close req) + success)))) + +(let ((server-thread (make-thread (lambda ()(server rep)) "server"))) + (thread-start! server-thread) + ;; (thread-sleep! 1) + (if (ping-self host port) + (begin + (thread-join! server-thread) + (nn-close rep)) + (print "ping failed"))) + +(exit) Index: tests/Makefile ================================================================== --- tests/Makefile +++ tests/Makefile @@ -34,11 +34,11 @@ cd ..;make -j && make install cd fullrun;$(MEGATEST) -stop-server 0 repl : cd ..;make -j && make install - cd fullrun;$(MEGATEST) -repl + cd fullrun;$(MEGATEST) -:b -repl test0 : cleanprep cd simplerun ; $(MEGATEST) -server - -debug $(DEBUG) test1 : cleanprep