@@ -27,39 +27,59 @@ (define (server:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) -(define *server-loop-heart-beat* (list 'start (current-seconds))) +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) +(define (server:self-ping iface port) + (let ((zsocket (server:client-connect iface port))) + (let loop () + (thread-sleep! 5) + (cdb:client-call zsocket 'ping #t) + (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") + (mutex-lock! *heartbeat-mutex*) + (set! *server-loop-heart-beat* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) + (loop)))) + (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-socket #f) + (zmq-socket-dat #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostname)))) + (if ipstr ipstr hostname))) + (actual-port #f)) ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0)) - (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") + (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") (string->number (args:get-arg "-port")) - 5555) + (+ 5000 (random 1001))) 0)) + (set! zmq-socket (cadr zmq-socket-dat)) + (set! actual-port (caddr zmq-socket-dat)) (set! *cache-on* #t) + + ;; (set! th1 (make-thread (lambda () + ;; (server:self-ping ipaddrstr actual-port)))) + ;; (thread-start! th1) ;; what to do when we quit ;; (on-exit (lambda () - (if (and *toppath* *server-id*) + (if (and *toppath* *server-info*) (begin (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) @@ -72,21 +92,21 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - ;; Ugly yuk. - (mutex-lock! *incoming-mutex*) - (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) - (mutex-unlock! *incoming-mutex*) + ;; ;; Ugly yuk. + ;; (mutex-lock! *incoming-mutex*) + ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) + ;; (mutex-unlock! *incoming-mutex*) (let* ((rawmsg (receive-message* zmq-socket)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) ;;; Ugly yuk. - (mutex-lock! *incoming-mutex*) - (set! *server-loop-heart-beat* (list 'working (current-seconds))) - (mutex-unlock! *incoming-mutex*) + ;; (mutex-lock! *incoming-mutex*) + ;; (set! *server-loop-heart-beat* (list 'working (current-seconds))) + ;; (mutex-unlock! *incoming-mutex*) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed res=" res) (send-message zmq-socket (db:obj->string res)) (if (not *time-to-exit*) @@ -93,11 +113,13 @@ (loop) (begin (open-run-close tasks:server-deregister-self tasks:open-db #f) (db:write-cached-data) (exit) - )))))) + )))) + (thread-join! th1))) + ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -108,33 +130,43 @@ (db:write-cached-data) ;; (print "Server running, count is " count) (if (< count 2) ;; 3x3 = 9 secs aprox (loop (+ count 1)) (let ((numrunning (open-run-close db:get-count-tests-running #f)) - (server-loop-heartbeat #f)) - ;;; Ugly yuk. - (mutex-lock! *incoming-mutex*) + (server-loop-heartbeat #f) + (server-info #f) + (pulse 0)) + ;; BUG add a wait on server alive here!! + ;; ;; Ugly yuk. + (mutex-lock! *heartbeat-mutex*) (set! server-loop-heartbeat *server-loop-heart-beat*) - (mutex-unlock! *incoming-mutex*) + (set! server-info *server-info*) + (mutex-unlock! *heartbeat-mutex*) ;; The logic here is that if the server loop gets stuck blocked in working ;; we don't want to update our heartbeat - (let ((server-state (car server-loop-heartbeat)) - (server-update (cadr server-loop-heartbeat))) - (if (or (eq? server-state 'waiting) - (< (- (current-seconds) server-update) 10)) - (open-run-close tasks:server-update-heartbeat tasks:open-db *server-id*) - (debug:print "ERROR: No heartbeat update, server appears stuck"))) + (set! pulse (- (current-seconds) server-loop-heartbeat)) + (debug:print-info 1 "Heartbeat period is " pulse " on " (cadr server-info) ":" (caddr server-info)) + (if (> pulse 11) ;; must stay less than 10 seconds + (begin + (debug:print 0 "ERROR: Heartbeat failed, committing servercide") + (exit)) + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) (if (or (> numrunning 0) ;; stay alive for two days after last access - (> (+ *last-db-access* (* 48 60 60))(current-seconds))) + (> (+ *last-db-access* + ;; (* 48 60 60) ;; 48 hrs + ;; 60 ;; one minute + (* 60 60) ;; one hour + ) + (current-seconds))) (begin (debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) - (open-run-close tasks:server-deregister-self tasks:open-db) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) @@ -156,12 +188,14 @@ (let ((zmq-url (conc "tcp://" iface ":" p))) (print "Trying to start server on " zmq-url) (bind-socket s zmq-url) (set! *runremote* #f) (debug:print 0 "Server started on " zmq-url) - (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) - s)))) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) + (mutex-unlock! *heartbeat-mutex*) + (list iface s port))))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -197,29 +231,32 @@ (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) ;; Do all the connection work, start a server if not already running -(define (server:client-setup #!key (numtries 50)(do-ping #f)) +(define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (car hostinfo)) (iface (cadr hostinfo)) (port (caddr hostinfo))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin + ;; something went wrong in connecting to the server. In this scenario it is ok + ;; to try again (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host port: port) + (server:client-setup (- numtries 1)) #f) (let* ((zmq-socket (server:client-connect iface port)) (login-res (server:client-login zmq-socket)) (connect-ok (if (null? login-res) #f (car login-res))) (conurl (server:make-server-url (list iface port)))) @@ -234,15 +271,17 @@ #f))))) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) - (sleep 2) - ;; not doing ping, assume the server started and registered itself - (server:client-setup numtries: (- numtries 1) do-ping: #f)) + (sleep 5) ;; give server time to start + ;; we are starting a server, do not try again! That can lead to + ;; recursively starting many processes!!! + (server:client-setup numtries: 0)) (debug:print-info 1 "Too many attempts, giving up"))))) +;; all routes though here end in exit ... (define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") @@ -250,19 +289,34 @@ (debug:print-info 0 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run (args:get-arg "-server"))))) + (let* ((th1 (make-thread (lambda () + (let ((server-info #f)) + ;; wait for the server to be online and available + (let loop () + (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") + (thread-sleep! 5) + (mutex-lock! *heartbeat-mutex*) + (set! server-info *server-info* ) + (mutex-unlock! *heartbeat-mutex*) + (if (not server-info)(loop))) + (debug:print 1 "Server alive, starting self-ping") + (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping")) + (th2 (make-thread (lambda () + (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda () - (server:keep-running))))) + (server:keep-running)) "Keep running"))) + (set! *client-non-blocking-mode* #t) + (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th3)) - (debug:print 0 "ERROR: Failed to setup for megatest"))))) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + (exit))) (define (server:client-launch #!key (do-ping #f)) (if (server:client-setup do-ping: do-ping) (debug:print-info 2 "connected as client") (begin @@ -287,22 +341,24 @@ (close-socket zmq-socket))) (set! res (list #t numclients (if return-socket zmq-socket #f)))) (begin ;; (close-socket zmq-socket) (set! res (list #f "CAN'T LOGIN" #f)))) - (set! res (list #f "CAN'T CONNECT" #f))))))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) (th2 (make-thread (lambda () (let loop ((count 1)) (debug:print-info 1 "Ping " count " server on " host " at port " port) (thread-sleep! 2) (if (< count (/ secs 2)) (loop (+ count 1)))) ;; (thread-terminate! th1) - (set! res (list #f "TIMED OUT" #f)))))) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) (thread-start! th2) (thread-start! th1) (handle-exceptions exn (set! res (list #f "TIMED OUT" #f)) (thread-join! th1 secs)) res))))