Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -45,11 +45,11 @@ (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) - +(define *client-non-blocking-mode* #f) (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1163,16 +1163,24 @@ "WRITTEN"))))))) (define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) (define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) +(define (cdb:use-non-blocking-mode proc) + (set! *client-non-blocking-mode* #t) + (let ((res (proc))) + (set! *client-non-blocking-mode* #f) + res)) + (define (cdb:client-call zmq-socket . params) (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f)) (send-message zmq-socket zdat) - (set! res (db:string->obj (receive-message zmq-socket zdat))) + (set! res (db:string->obj (if *client-non-blocking-mode* + (receive-message* zmq-socket zdat) + (receive-message zmq-socket zdat)))) (debug:print-info 11 "zmq-socket " (car params) " res=" res) res)) (define (cdb:set-verbosity zmq-socket val) (cdb:client-call zmq-socket 'set-verbosity #f val)) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -288,43 +288,40 @@ (hostname (vector-ref server 2)) (port (vector-ref server 3)) (start-time (vector-ref server 4)) (priority (vector-ref server 5)) (state (vector-ref server 6)) - (numclients #f) - (stat-numc ;; (handle-exceptions - ;; exn - ;; (list #f (conc "EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))) - (let ((zmq-socket (server:client-connect hostname port))) - (if zmq-socket - (if (server:client-login zmq-socket) - (let ((numclients (cdb:num-clients zmq-socket)) - (killed #f)) - (if (and khost-port ;; kill by host/port - (equal? hostname (car khost-port)) - (equal? port (string->number (cadr khost-port)))) - (begin - (open-run-close tasks:server-deregister tasks:open-db hostname port: port) - (cdb:kill-server zmq-socket) - (debug:print-info 1 "Killed server by host:port at " hostname ":" port) - (set! killed #t)) - (if (and kpid - (equal? kpid pid)) - (begin - (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid) - (set! killed #t) - (cdb:kill-server zmq-socket) - (debug:print-info 1 "Killed server by pid at " hostname ":" port)))) - (if (not killed)(server:client-logout zmq-socket)) - (close-socket zmq-socket) - (list numclients "ACCESSIBLE")) ;; (server:client-logout zmq-socket) - (begin - (close-socket zmq-socket) - (list #f "CAN'T LOGIN"))) - (list #f "CAN'T CONNECT"))))) ;; ) + (stat-numc (server:ping hostname port)) + (status (car stat-numc)) + (numclients (cadr stat-numc)) + (killed #f) + (zmq-socket (if status (server:client-connect hostname port) #f))) + ;; no need to login as status of #t indicates we are connecting to correct + ;; server + (if (or (not status) ;; no point in keeping dead records in the db + (and khost-port ;; kill by host/port + (equal? hostname (car khost-port)) + (equal? port (string->number (cadr khost-port))))) + (begin + (open-run-close tasks:server-deregister tasks:open-db hostname port: port) + (if status ;; #t means alive + (begin + (cdb:kill-server zmq-socket) + (debug:print-info 1 "Killed server by host:port at " hostname ":" port)) + (debug:print-info 1 "Removing defunct server record for " hostname ":" port)) + (set! killed #t))) + (if (and kpid + (equal? hostname (car khost-port)) + (equal? kpid pid)) + (begin + (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid) + (set! killed #t) + (if status (cdb:kill-server zmq-socket)) + (debug:print-info 1 "Killed server by pid at " hostname ":" port))) + ;; (if zmq-socket (close-socket zmq-socket)) (format #t fmtstr id pid hostname port start-time priority - (cadr stat-numc)(car stat-numc)))) + status numclients))) servers) (set! *didsomething* #t)))) ;; if not list or kill then start a client (if appropriate) (if (or (let ((res #f)) (for-each @@ -907,12 +904,13 @@ ;;====================================================================== ;; Exit and clean up ;;====================================================================== ;; this is the socket if we are a client -(if (socket? *runremote*) - (close-socket *runremote*)) +;; (if (and *runremote* +;; (socket? *runremote*)) +;; (close-socket *runremote*)) (if (not *didsomething*) (debug:print 0 help)) ;; (if *runremote* (rpc:close-all-connections!)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -84,27 +84,28 @@ ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) (thread-sleep! 1) ;; no need to do this very often (db:write-cached-data) - (print "Server running, count is " count) + ;; (print "Server running, count is " count) (if (< count 10) (loop (+ count 1)) (let ((numrunning (open-run-close db:get-count-tests-running #f))) (if (or (> numrunning 0) (> (+ *last-db-access* 60)(current-seconds))) (begin (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop 0))) (begin - (debug:print-info 0 "Starting to shutdown the server side") + (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (open-run-close db:del-var #f "SERVER") (thread-sleep! 10) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") - ))))) + (open-run-close tasks:server-deregister-self tasks:open-db) + (exit)))))) (define (server:find-free-port-and-open host s port #!key (trynum 50)) (let ((s (if s s (make-socket 'rep))) (p (if (number? port) port 5555))) (handle-exceptions @@ -137,24 +138,28 @@ (set! *my-client-signature* sig) *my-client-signature*))) ;; (define (server:client-connect host port) + (debug:print 3 "client-connect " host ":" port) (let ((connect-ok #f) (zmq-socket (make-socket 'req)) (conurl (server:make-server-url (list host port)))) - (connect-socket zmq-socket conurl) - zmq-socket)) + (if (socket? zmq-socket) + (begin + (connect-socket zmq-socket conurl) + zmq-socket) + #f))) (define (server:client-login zmq-socket) (cdb:login zmq-socket *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) - (close-socket zmq-socket) + ;; (close-socket zmq-socket) ok)) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 10)) (if (not *toppath*)(setup-for-run)) @@ -197,22 +202,60 @@ (debug:print-info 1 "Too many retries, giving up"))))) (define (server:launch) (let* ((toppath (setup-for-run))) (debug:print-info 0 "Starting the standalone server") - (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run (args:get-arg "-server")))))) - ;; (th3 (make-thread (lambda () - ;; (server:keep-running))))) - (thread-start! th2) - ;; (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2)) - (debug:print 0 "ERROR: Failed to setup for megatest")))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if hostinfo + (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) + (if *toppath* + (let* ((th2 (make-thread (lambda () + (server:run (args:get-arg "-server"))))) + (th3 (make-thread (lambda () + (server:keep-running))))) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th3)) + (debug:print 0 "ERROR: Failed to setup for megatest")))))) (define (server:client-launch) (if (server:client-setup) (debug:print-info 0 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) + +;; ping a server and return number of clients or #f (if no response) +(define (server:ping host port #!key (secs 10)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let ((zmq-socket (server:client-connect host port))) + (if zmq-socket + (if (server:client-login zmq-socket) + (let ((numclients (cdb:num-clients zmq-socket))) + (server:client-logout zmq-socket) + (close-socket zmq-socket) + (set! res (list #t numclients))) + (begin + ;; (close-socket zmq-socket) + (set! res (list #f "CAN'T LOGIN")))) + (set! res (list #f "CAN'T CONNECT"))))))) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT")))))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT")) + (thread-join! th1 secs)) + res)))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -118,18 +118,36 @@ server-id))) (define (tasks:have-clients? mdb server-id) (null? (tasks:get-logged-in-clients mdb server-id))) +;; ping each server in the db and return first found that responds. +;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) - (let ((res #f)) + (let ((res '()) + (best #f)) (sqlite3:for-each-row (lambda (id hostname port) - (set! res (list hostname port))) + (set! res (cons (list hostname port) res)) + (debug:print-info 1 "Found " hostname ":" port)) mdb "SELECT id,hostname,port FROM servers WHERE state='live' ORDER BY start_time DESC LIMIT 1;") - res)) + (print "res=" res) + (if (null? res) #f + (let loop ((hed (car res)) + (tal (cdr res))) + (print "hed=" hed ", tal=" tal) + (let* ((host (car hed)) + (port (cadr hed)) + (ping-res (server:ping host port))) + (if ping-res hed + ;; remove defunct server from table + (begin + (open-run-close tasks:server-deregister tasks:open-db host port: port) + (if (null? tal) + #f + (loop (car tal)(cdr tal)))))))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname port start-time priority state) Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -104,10 +104,12 @@ (test #f #t (socket? *runremote*)) ;; (test #f #t (server:client-setup)) (test #f #t (car (cdb:login *runremote* *toppath* *my-client-signature*))) + +(test #f #t (open-run-close tasks:get-best-server tasks:open-db)) ;;====================================================================== ;; C O N F I G F I L E S ;;======================================================================