Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1147,10 +1147,12 @@ ((set-verbosity) (set! *verbosity* (caddr params)) *verbosity*) ((get-verbosity) *verbosity*) + ((ping) + 'hi) (else (mutex-lock! *incoming-mutex*) (set! *last-db-access* (current-seconds)) (set! *incoming-data* (cons (vector qry-name @@ -1176,10 +1178,11 @@ (set! *client-non-blocking-mode* #t) (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) +;; params = 'target cached remparams (define (cdb:client-call zmq-socket . params) (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f)) (send-message zmq-socket zdat) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -291,41 +291,26 @@ (port (vector-ref server 4)) (start-time (vector-ref server 5)) (priority (vector-ref server 6)) (state (vector-ref server 7)) (mt-ver (vector-ref server 8)) - (status (open-run-close tasks:server-alive? tasks:open-db hostname port: port)) + (status (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) (killed #f) (zmq-socket (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server (if (or (not status) ;; no point in keeping dead records in the db (and khost-port ;; kill by host/port (equal? hostname (car khost-port)) (equal? port (string->number (cadr khost-port))))) - (begin - (open-run-close tasks:server-deregister tasks:open-db hostname port: port) - (if status ;; #t means alive - (begin - (if (equal? hostname (get-host-name)) - (process-signal pid signal/term) ;; local machine, send sig term - (cdb:kill-server zmq-socket)) ;; remote machine, try telling server to commit suicide - (debug:print-info 1 "Killed server by host:port at " hostname ":" port)) - (debug:print-info 1 "Removing defunct server record for " hostname ":" port)) - (set! killed #t))) + (tasks:kill-server status hostname port pid)) + (if (and kpid - ;; (equal? hostname (car khost-port)) + (equal? hostname (car khost-port)) (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!! - (begin - (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid) - (set! killed #t) - (if status - (if (equal? hostname (get-host-name)) - (process-signal pid signal/term) ;; local machine, send sig term - (debug:print 0 "WARNING: Can't kill a dead server on host " hostname))) - (debug:print-info 1 "Killed server by pid at " hostname ":" port))) - ;; (if zmq-socket (close-socket zmq-socket)) + (tasks:kill-server status hostname #f pid)) + (format #t fmtstr id mt-ver pid hostname interface port start-time priority (if status "alive" "dead")))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -27,39 +27,59 @@ (define (server:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) -(define *server-loop-heart-beat* (list 'start (current-seconds))) +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) +(define (server:self-ping iface port) + (let ((zsocket (server:client-connect iface port))) + (let loop () + (thread-sleep! 5) + (cdb:client-call zsocket 'ping #t) + (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") + (mutex-lock! *heartbeat-mutex*) + (set! *server-loop-heart-beat* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) + (loop)))) + (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-socket #f) + (zmq-socket-dat #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostname)))) + (if ipstr ipstr hostname))) + (actual-port #f)) ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0)) - (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") + (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") (string->number (args:get-arg "-port")) - 5555) + (+ 5000 (random 1001))) 0)) + (set! zmq-socket (cadr zmq-socket-dat)) + (set! actual-port (caddr zmq-socket-dat)) (set! *cache-on* #t) + + ;; (set! th1 (make-thread (lambda () + ;; (server:self-ping ipaddrstr actual-port)))) + ;; (thread-start! th1) ;; what to do when we quit ;; (on-exit (lambda () - (if (and *toppath* *server-id*) + (if (and *toppath* *server-info*) (begin (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) @@ -72,21 +92,21 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - ;; Ugly yuk. - (mutex-lock! *incoming-mutex*) - (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) - (mutex-unlock! *incoming-mutex*) + ;; ;; Ugly yuk. + ;; (mutex-lock! *incoming-mutex*) + ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) + ;; (mutex-unlock! *incoming-mutex*) (let* ((rawmsg (receive-message* zmq-socket)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) ;;; Ugly yuk. - (mutex-lock! *incoming-mutex*) - (set! *server-loop-heart-beat* (list 'working (current-seconds))) - (mutex-unlock! *incoming-mutex*) + ;; (mutex-lock! *incoming-mutex*) + ;; (set! *server-loop-heart-beat* (list 'working (current-seconds))) + ;; (mutex-unlock! *incoming-mutex*) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed res=" res) (send-message zmq-socket (db:obj->string res)) (if (not *time-to-exit*) @@ -93,11 +113,13 @@ (loop) (begin (open-run-close tasks:server-deregister-self tasks:open-db #f) (db:write-cached-data) (exit) - )))))) + )))) + (thread-join! th1))) + ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -108,33 +130,43 @@ (db:write-cached-data) ;; (print "Server running, count is " count) (if (< count 2) ;; 3x3 = 9 secs aprox (loop (+ count 1)) (let ((numrunning (open-run-close db:get-count-tests-running #f)) - (server-loop-heartbeat #f)) - ;;; Ugly yuk. - (mutex-lock! *incoming-mutex*) + (server-loop-heartbeat #f) + (server-info #f) + (pulse 0)) + ;; BUG add a wait on server alive here!! + ;; ;; Ugly yuk. + (mutex-lock! *heartbeat-mutex*) (set! server-loop-heartbeat *server-loop-heart-beat*) - (mutex-unlock! *incoming-mutex*) + (set! server-info *server-info*) + (mutex-unlock! *heartbeat-mutex*) ;; The logic here is that if the server loop gets stuck blocked in working ;; we don't want to update our heartbeat - (let ((server-state (car server-loop-heartbeat)) - (server-update (cadr server-loop-heartbeat))) - (if (or (eq? server-state 'waiting) - (< (- (current-seconds) server-update) 10)) - (open-run-close tasks:server-update-heartbeat tasks:open-db *server-id*) - (debug:print "ERROR: No heartbeat update, server appears stuck"))) + (set! pulse (- (current-seconds) server-loop-heartbeat)) + (debug:print-info 1 "Heartbeat period is " pulse " on " (cadr server-info) ":" (caddr server-info)) + (if (> pulse 11) ;; must stay less than 10 seconds + (begin + (debug:print 0 "ERROR: Heartbeat failed, committing servercide") + (exit)) + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) (if (or (> numrunning 0) ;; stay alive for two days after last access - (> (+ *last-db-access* (* 48 60 60))(current-seconds))) + (> (+ *last-db-access* + ;; (* 48 60 60) ;; 48 hrs + ;; 60 ;; one minute + (* 60 60) ;; one hour + ) + (current-seconds))) (begin (debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop 0)) (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) - (open-run-close tasks:server-deregister-self tasks:open-db) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) @@ -156,12 +188,14 @@ (let ((zmq-url (conc "tcp://" iface ":" p))) (print "Trying to start server on " zmq-url) (bind-socket s zmq-url) (set! *runremote* #f) (debug:print 0 "Server started on " zmq-url) - (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) - s)))) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) + (mutex-unlock! *heartbeat-mutex*) + (list iface s port))))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -197,29 +231,32 @@ (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) ;; Do all the connection work, start a server if not already running -(define (server:client-setup #!key (numtries 50)(do-ping #f)) +(define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (car hostinfo)) (iface (cadr hostinfo)) (port (caddr hostinfo))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin + ;; something went wrong in connecting to the server. In this scenario it is ok + ;; to try again (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host port: port) + (server:client-setup (- numtries 1)) #f) (let* ((zmq-socket (server:client-connect iface port)) (login-res (server:client-login zmq-socket)) (connect-ok (if (null? login-res) #f (car login-res))) (conurl (server:make-server-url (list iface port)))) @@ -234,15 +271,17 @@ #f))))) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) - (sleep 2) - ;; not doing ping, assume the server started and registered itself - (server:client-setup numtries: (- numtries 1) do-ping: #f)) + (sleep 5) ;; give server time to start + ;; we are starting a server, do not try again! That can lead to + ;; recursively starting many processes!!! + (server:client-setup numtries: 0)) (debug:print-info 1 "Too many attempts, giving up"))))) +;; all routes though here end in exit ... (define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") @@ -250,19 +289,34 @@ (debug:print-info 0 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run (args:get-arg "-server"))))) + (let* ((th1 (make-thread (lambda () + (let ((server-info #f)) + ;; wait for the server to be online and available + (let loop () + (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") + (thread-sleep! 5) + (mutex-lock! *heartbeat-mutex*) + (set! server-info *server-info* ) + (mutex-unlock! *heartbeat-mutex*) + (if (not server-info)(loop))) + (debug:print 1 "Server alive, starting self-ping") + (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping")) + (th2 (make-thread (lambda () + (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda () - (server:keep-running))))) + (server:keep-running)) "Keep running"))) + (set! *client-non-blocking-mode* #t) + (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th3)) - (debug:print 0 "ERROR: Failed to setup for megatest"))))) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + (exit))) (define (server:client-launch #!key (do-ping #f)) (if (server:client-setup do-ping: do-ping) (debug:print-info 2 "connected as client") (begin @@ -287,22 +341,24 @@ (close-socket zmq-socket))) (set! res (list #t numclients (if return-socket zmq-socket #f)))) (begin ;; (close-socket zmq-socket) (set! res (list #f "CAN'T LOGIN" #f)))) - (set! res (list #f "CAN'T CONNECT" #f))))))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) (th2 (make-thread (lambda () (let loop ((count 1)) (debug:print-info 1 "Ping " count " server on " host " at port " port) (thread-sleep! 2) (if (< count (/ secs 2)) (loop (+ count 1)))) ;; (thread-terminate! th1) - (set! res (list #f "TIMED OUT" #f)))))) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) (thread-start! th2) (thread-start! th1) (handle-exceptions exn (set! res (list #f "TIMED OUT" #f)) (thread-join! th1 secs)) res)))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -80,19 +80,24 @@ (define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) - (tasks:server-get-server-id mdb (get-host-name) port pid)) + (list + (tasks:server-get-server-id mdb (get-host-name) port pid) + interface + port)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid - (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) + ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) + (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid) (if port - (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) + ;; (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) + (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) @@ -119,17 +124,18 @@ (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) + (debug:print 1 "Found heartbeat-delta of " heartbeat-delta " for server with id " server-id) (< heartbeat-delta 10))) (define (tasks:client-register mdb pid hostname cmdline) (sqlite3:execute mdb "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") - (tasks:server-get-server-id mdb) + (tasks:server-get-server-id mdb hostname #f pid) pid hostname cmdline) (define (tasks:client-logout mdb pid hostname cmdline) (sqlite3:execute mdb @@ -148,17 +154,17 @@ (define (tasks:have-clients? mdb server-id) (null? (tasks:get-logged-in-clients mdb server-id))) ;; ping each server in the db and return first found that responds. ;; remove any others. will not necessarily remove all! -(define (tasks:get-best-server mdb #!key (do-ping #f)) +(define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) - (debug:print-info 1 "Found " hostname ":" port)) + (debug:print-info 1 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) ;; (print "res=" res) (if (null? res) #f (let loop ((hed (car res)) @@ -166,34 +172,49 @@ ;; (print "hed=" hed ", tal=" tal) (let* ((host (car hed)) (iface (cadr hed)) (port (caddr hed)) (pid (cadddr hed)) - ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f))) - (alive (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res)) - ;; (reason (cadr ping-res)) - ;; (zsocket (caddr ping-res)) - ) + (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) (if alive - ;; (if (server:ping iface port) - (list host iface port) - ;; ;; not actually alive, destroy! - ;; (begin - ;; (if (equal? host (get-host-name)) - ;; (begin - ;; (debug:print-info 0 "Killing process " pid " on host " host " with signal/term") - ;; (send-signal pid signal/term)) - ;; (debug:print 0 "WARNING: Can't kill process " pid " on host " host)) - ;; (open-run-close tasks:server-deregister tasks:open-db host port: port) - ;; #f)) - ;; remove defunct server from table + (begin + (debug:print 1 "Found an existing, alive, server " host ":" port ".") + (list host iface port)) (begin - (open-run-close tasks:server-deregister tasks:open-db host port: port) + (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead") + (tasks:kill-server #f host port pid) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) +(define (tasks:kill-server status hostname port pid) + (debug:print-info 1 "Removing defunct server record for " hostname ":" port) + (if port + (open-run-close tasks:server-deregister tasks:open-db hostname port: port) + (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) + + (if status ;; #t means alive + (begin + (if (equal? hostname (get-host-name)) + (begin + (debug:print 1 "Sending signal/term to " pid " on " hostname) + (process-signal pid signal/term) + (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill + (process-signal pid signal/kill)) ;; local machine, send sig term + (begin + (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide") + (cdb:kill-server zmq-socket)))) ;; remote machine, try telling server to commit suicide + (begin + (if status + (if (equal? hostname (get-host-name)) + (begin + (debug:print-info 1 "Sending signal/term to " pid " on " hostname) + (process-signal pid signal/term) ;; local machine, send sig term + (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill + (process-signal pid signal/kill)) + (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) + (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port start-time priority state mt-version) (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))