Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -17,10 +17,13 @@ ;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) + +;; Note, try to remove this dependency +(use zmq) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) @@ -1105,22 +1108,33 @@ ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) +;; NOTE: Can remove the regex and base64 encoding for zmq (define (db:obj->string obj) - (string-substitute - (regexp "=") "_" - (base64:base64-encode (with-output-to-string (lambda ()(serialize obj)))) - #t)) + (case *transport-type* + ((fs) obj) + ((http) + (string-substitute + (regexp "=") "_" + (base64:base64-encode (with-output-to-string (lambda ()(serialize obj)))) + #t)) + ((zmq)(with-output-to-string (lambda ()(serialize obj)))) + (else obj))) (define (db:string->obj msg) - (with-input-from-string - (base64:base64-decode - (string-substitute - (regexp "_") "=" msg #t)) - (lambda ()(deserialize)))) + (case *transport-type* + ((fs) msg) + ((http) + (with-input-from-string + (base64:base64-decode + (string-substitute + (regexp "_") "=" msg #t)) + (lambda ()(deserialize)))) + ((zmq)(with-input-from-string msg (lambda ()(deserialize)))) + (else msg))) (define (cdb:use-non-blocking-mode proc) (set! *client-non-blocking-mode* #t) (let ((res (proc))) (set! *client-non-blocking-mode* #f) @@ -1150,12 +1164,60 @@ (let* ((res #f) (rawdat (server:client-send-receive serverdat zdat)) (tmp #f)) (debug:print-info 11 "Sent " zdat ", received " rawdat) (set! tmp (db:string->obj rawdat)) - (vector-ref tmp 2) - ))))) + (vector-ref tmp 2)))) + ((zmq) + (handle-exceptions + exn + (begin + (thread-sleep! 5) + (if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))) + (let* ((push-socket (vector-ref zmq-sockets 0)) + (sub-socket (vector-ref zmq-sockets 1)) + (client-sig (server:get-client-signature)) + (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) + (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) + (res #f) + (send-receive (lambda () + (debug:print-info 11 "sending message") + (send-message push-socket zdat) + (debug:print-info 11 "message sent") + (let loop () + ;; get the sender info + ;; this should match (server:get-client-signature) + ;; we will need to process "all" messages here some day + (receive-message* sub-socket) + ;; now get the actual message + (let ((myres (db:string->obj (receive-message* sub-socket)))) + (if (equal? query-sig (vector-ref myres 1)) + (set! res (vector-ref myres 2)) + (loop)))))) + (timeout (lambda () + (let loop ((n numretries)) + (thread-sleep! 15) + (if (not res) + (if (> numretries 0) + (begin + (debug:print 2 "WARNING: no reply to query " params ", trying resend") + (debug:print-info 11 "re-sending message") + (send-message push-socket zdat) + (debug:print-info 11 "message re-sent") + (loop (- n 1))) + ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) + (begin + (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") + (exit 5)))))))) + (debug:print-info 11 "Starting threads") + (let ((th1 (make-thread send-receive "send receive")) + (th2 (make-thread timeout "timeout"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (debug:print-info 11 "cdb:client-call returning res=" res) + res)))))) (define (cdb:set-verbosity serverdat val) (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val)) (define (cdb:login serverdat keyval signature) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -61,10 +61,11 @@ (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, port INTEGER, + pubport INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, @@ -84,21 +85,22 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state) +(define (tasks:server-register mdb pid interface port priority state #!key (pubport -1)) (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) + "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" - pid (get-host-name) port priority (conc state) megatest-version interface) + pid (get-host-name) port pubport priority (conc state) megatest-version interface) (list (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface port + pubport )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) @@ -253,14 +255,14 @@ (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port start-time priority state mt-version last-update) - (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res))) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update) + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update) res))) mdb - "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors Index: zmq-transport.scm ================================================================== --- zmq-transport.scm +++ zmq-transport.scm @@ -11,13 +11,11 @@ (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) -(use spiffy uri-common intarweb http-client spiffy-request-vars) - -(tcp-buffer-size 2048) +(use zmq) (declare (unit server)) (declare (uses common)) (declare (uses db)) @@ -24,131 +22,186 @@ (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (include "common_records.scm") (include "db_records.scm") + +;; Transition to pub --> sub with pull <-- push +;; +;; 1. client sends request to server via push to the pull port +;; 2. server puts request in queue or processes immediately as appropriate +;; 3. server puts responses from completed requests into pub port +;; +;; TODO +;; +;; Done Tested +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 5. Add processing of subscription hits +;; [x] [ ] - done when get key +;; [x] [ ] - return results +;; [x] [ ] 6. Add timeout processing +;; [x] [ ] - after 60 seconds +;; [ ] [ ] i. check server alive, connect to new if necessary +;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on (define (server:make-server-url hostport) (if (not hostport) #f - (conc "http://" (car hostport) ":" (cadr hostport)))) + (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== -;; Call this to start the actual server -;; - -(define *db:process-queue-mutex* (make-mutex)) +(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) +(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) (define (server:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* (;; (iface (if (string=? "-" hostn) - ;; #f ;; (get-host-name) - ;; hostn)) - (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (let* ((zmq-sdat1 #f) + (zmq-sdat2 #f) + (pull-socket #f) + (pub-socket #f) + (p1 #f) + (p2 #f) + (zmq-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (if (args:get-arg "-port") + (if ipstr ipstr hostname))) + (last-run 0)) + (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) - (+ 5000 (random 1001)))) - (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (+ 5000 (random 1001))))) + + (set! zmq-sdat1 (car zmq-sockets-dat)) + (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) + (set! p1 (caddr zmq-sdat1)) + + (set! zmq-sdat2 (cadr zmq-sockets-dat)) + (set! pub-socket (cadr zmq-sdat2)) + (set! p2 (caddr zmq-sdat2)) + (set! *cache-on* #t) - (root-path (if link-tree-path - link-tree-path - (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + + ;; what to do when we quit + ;; +;; (on-exit (lambda () +;; (if (and *toppath* *server-info*) +;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) +;; (let loop () +;; (let ((queue-len 0)) +;; (thread-sleep! (random 5)) +;; (mutex-lock! *incoming-mutex*) +;; (set! queue-len (length *incoming-data*)) +;; (mutex-unlock! *incoming-mutex*) +;; (if (> queue-len 0) +;; (begin +;; (debug:print-info 0 "Queue not flushed, waiting ...") +;; (loop)))))))) - ;; Setup the web server and a /ctrl interface + ;; The heavy lifting + ;; + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; - (vhost-map `(((* any) . ,(lambda (continue) - ;; open the db on the first call - (if (not db)(set! db (open-db))) - (let* (($ (request-vars source: 'both)) - (dat ($ 'dat)) - (res #f)) - (cond - ((equal? (uri-path (request-uri (current-request))) - '(/ "hey")) - (send-response body: "hey there!\n" - headers: '((content-type text/plain)))) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ((equal? (uri-path (request-uri (current-request))) - '(/ "ctrl")) - (let* ((packet (db:string->obj dat)) + (let loop ((queue-lst '())) + (let* ((rawmsg (receive-message* pull-socket)) + (packet (db:string->obj rawmsg)) (qtype (cdb:packet-get-qtype packet))) (debug:print-info 12 "server=> received packet=" packet) (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) - ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; (set! res (open-run-close db:process-queue-item open-db packet)) - (set! res (db:process-queue-item db packet)) - ;; (mutex-unlock! *db:process-queue-mutex*) - (debug:print-info 11 "Return value from db:process-queue-item is " res) - (send-response body: (conc "ctrl data\n" - res - "") - headers: '((content-type text/plain))))) - (else (continue)))))))) - (server:try-start-server ipaddrstr start-port) - ;; lite3:finalize! db))) - )) - - - -;; (define (server:main-loop) -;; (print "INFO: Exectuing main server loop") -;; (access-log "megatest-http.log") -;; (server-bind-address #f) -;; (define-page (main-page-path) -;; (lambda () -;; (let ((dat ($ "dat"))) -;; ;; (with-request-variables (dat) -;; (debug:print-info 12 "Got dat=" dat) -;; (let* ((packet (db:string->obj dat)) -;; (qtype (cdb:packet-get-qtype packet))) -;; (debug:print-info 12 "server=> received packet=" packet) -;; (if (not (member qtype '(sync ping))) -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *last-db-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*))) -;; (let ((res (open-run-close db:process-queue-item open-db packet))) -;; (debug:print-info 11 "Return value from db:process-queue-item is " res) -;; res)))))) - -;;; (use spiffy uri-common intarweb) -;;; -;;; (root-path "/var/www") -;;; -;;; (vhost-map `(((* any) . ,(lambda (continue) -;;; (if (equal? (uri-path (request-uri (current-request))) -;;; '(/ "hey")) -;;; (send-response body: "hey there!\n" -;;; headers: '((content-type text/plain))) -;;; (continue)))))) -;;; -;;; (start-server port: 12345) - -;; This is recursively run by server:run until sucessful + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue + (begin + (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + (loop '())) + (loop (cons packet queue-lst))))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. ;; -(define (server:try-start-server ipaddrstr portnum) +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + (zmq-sockets (server:client-connect iface pullport pubport)) + (last-access 0)) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +(define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) + (p (if (number? port) port 5555)) + (old-handler (current-exception-handler))) (handle-exceptions exn (begin (print-error-message exn) (if (< portnum 9000) @@ -197,44 +250,30 @@ (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) -;; -;; -;; 1 Hello, world! Goodbye Dolly -;; Send msg to serverdat and receive result -(define (server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) - (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) - (numretries 0)) - (handle-exceptions - exn - (if (< numretries 200) - (server:client-send-receive serverdat msg)) +;; +(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) + (let ((connect-ok #f) + (zmq-socket (if context + (make-socket type context) + (make-socket type))) + (conurl (server:make-server-url (list iface port)))) + (if (socket? zmq-socket) (begin - (debug:print-info 11 "fullurl=" fullurl "\n") - ;; set up the http-client here - (max-retry-attempts 100) - (retry-request? (lambda (request) - (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) - (set! numretries (+ numretries 1)) - #t)) - ;; send the data and get the response - ;; extract the needed info from the http data and - ;; process and return it. - (let* ((res (with-input-from-request fullurl - ;; #f - ;; msg - (list (cons 'dat msg)) - read-string))) - (debug:print-info 11 "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") res))) - (debug:print-info 11 "match=" match) - (let ((final (cadr match))) - (debug:print-info 11 "final=" final) - final))))))) + ;; first apply subscriptions + (for-each (lambda (subscription) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! zmq-socket 'subscribe subscription)) + subscriptions) + (connect-socket zmq-socket conurl) + zmq-socket) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) (define (server:client-login serverdat) (max-retry-attempts 100) (cdb:login serverdat *toppath* (server:get-client-signature))) @@ -243,22 +282,26 @@ (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) ;; (close-socket serverdat) ok)) -(define (server:client-connect iface port) - (let* ((login-res #f) - (serverdat (list iface port))) - (set! login-res (server:client-login serverdat)) +(define (server:client-connect iface pullport pubport) + (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) + (sub-socket (server:client-socket-connect iface pubport + type: 'sub + subscriptions: (list (server:get-client-signature) "all"))) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f)) + (set! login-res (server:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin - (debug:print-info 2 "Logged in and connected to " iface ":" port) - (set! *runremote* serverdat) - serverdat) + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* zmq-sockets) + zmq-sockets) (begin - (debug:print-info 2 "Failed to login or connect to " iface ":" port) + (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) @@ -270,25 +313,37 @@ (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) + (pubport (list-ref hostinfo 3)) + (pid (list-ref hostinfo 4))) (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) + ;; (handle-exceptions + ;; exn + ;; (begin + ;; ;; something went wrong in connecting to the server. In this scenario it is ok + ;; ;; to try again + ;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) + ;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (debug:print 0 " perhaps jobs killed with -9? Removing server records") + ;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) + ;; (server:client-setup (- numtries 1)) + ;; #f) + (server:client-connect iface port pubport)) ;; ) (if (> numtries 0) (let ((exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") - (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - (string-intersperse *verbosity* ",") - (conc *verbosity*))))) - ;; (set! pid (process-fork (lambda () + ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) + ;; (string-intersperse *verbosity* ",") + ;; (conc *verbosity*))))) + (set! pid (process-fork (lambda () ;; (current-input-port (open-input-file "/dev/null")) ;; (current-output-port (open-output-file "/dev/null")) ;; (current-error-port (open-output-file "/dev/null")) - ;; (server:launch)))) + (server:launch)))) ;; should never get here .... (let loop ((count 0)) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if (not hostinfo) (begin (debug:print-info 0 "Waiting for server pid=" pid " to start") @@ -368,20 +423,37 @@ (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (debug:print 11 "server:launch hostinfo=" hostinfo) (if hostinfo (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th2 (make-thread (lambda () + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (server:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) + ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) @@ -388,15 +460,16 @@ (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () - "") ;; do nothing for now (was flush out last call if applicable) + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable "eat response")) (th2 (make-thread (lambda () (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") - (thread-sleep! 1) ;; give the flush one second to do it's stuff + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) @@ -408,5 +481,70 @@ (debug:print-info 2 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) +;;====================================================================== +;; Defunct functions +;;====================================================================== + +;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! +(define (server:ping host port #!key (secs 10)(return-socket #f)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let* ((zmq-context (make-context 1)) + (zmq-socket (server:client-connect host port context: zmq-context))) + (if zmq-socket + (if (server:client-login zmq-socket) + (let ((numclients (cdb:num-clients zmq-socket))) + (if (not return-socket) + (begin + (server:client-logout zmq-socket) + (close-socket zmq-socket))) + (set! res (list #t numclients (if return-socket zmq-socket #f)))) + (begin + ;; (close-socket zmq-socket) + (set! res (list #f "CAN'T LOGIN" #f)))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT" #f)) + (thread-join! th1 secs)) + res)))) + +;; (define (server:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (server:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define (server:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "server:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) +