Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -97,12 +97,11 @@ -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -listservers : list the servers - -killserver host:port|pid : kill server specified by host:port or pid + -list-servers : list the servers -repl : start a repl (useful for extending megatest) Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html @@ -121,10 +120,11 @@ Called as " (string-intersperse (argv) " ") " Built from " megatest-fossil-hash )) ;; -gui : start a gui interface ;; -config fname : override the runconfig file with fname +;; -kill-server host:port|pid : kill server specified by host:port or pid ;; process args (define remargs (args:get-args (argv) (list "-runtests" ;; run a specific test @@ -157,11 +157,11 @@ ":expected" ":tol" ":units" ;; misc "-server" - "-killserver" + "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" "-setvars" @@ -184,11 +184,11 @@ ;; misc "-archive" "-repl" "-lock" "-unlock" - "-listservers" + "-list-servers" ;; queries "-test-paths" ;; get path(s) to a test, ordered by youngest first "-runall" ;; run all tests "-remove-runs" @@ -270,52 +270,57 @@ (if (args:get-arg "-server") (begin (debug:print 1 "Launching server...") (server:launch))) -(if (or (args:get-arg "-listservers") - (args:get-arg "-killserver")) +(if (args:get-arg "-list-servers") + ;; (args:get-arg "-kill-server")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) - (fmtstr "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n") + (fmtstr 
"~5a~8a~8a~20a~20a~10a~10a~20a~10a~10a\n") (servers-to-kill '())) - (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State") - (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====") + (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "Time" "LastBeat" "State") + (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "====" "========" "=====") (for-each (lambda (server) - (let* ((killinfo (args:get-arg "-killserver")) - (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) - (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) + (let* (;; (killinfo (args:get-arg "-kill-server")) + ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) + ;; (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) (id (vector-ref server 0)) (pid (vector-ref server 1)) (hostname (vector-ref server 2)) (interface (vector-ref server 3)) - (port (vector-ref server 4)) - (start-time (vector-ref server 5)) - (priority (vector-ref server 6)) - (state (vector-ref server 7)) - (mt-ver (vector-ref server 8)) - (status (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) + (pullport (vector-ref server 4)) + (pubport (vector-ref server 5)) + (start-time (vector-ref server 6)) + (priority (vector-ref server 7)) + (state (vector-ref server 8)) + (mt-ver (vector-ref server 9)) + (last-update (vector-ref server 10)) ;; (open-run-close tasks:server-alive? 
tasks:open-db #f hostname: hostname port: port)) (killed #f) - (zmq-socket (if status (server:client-connect hostname port) #f))) + (status (< last-update 20))) + ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server - (if (not status) ;; no point in keeping dead records in the db - (open-run-close tasks:server-deregister tasks:open-db hostname port: port pid: pid)) - - (if (and khost-port ;; kill by host/port - (equal? hostname (car khost-port)) - (equal? port (string->number (cadr khost-port)))) - (tasks:kill-server status hostname port pid)) - - (if (and kpid - (equal? hostname (get-host-name)) - (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!! - (tasks:kill-server status hostname #f pid)) - - (format #t fmtstr id mt-ver pid hostname interface port start-time priority + (if (equal? state "dead") + (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. + (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete)) + (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds + (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid))) + +;; (if (and khost-port ;; kill by host/port +;; (equal? hostname (car khost-port)) +;; (equal? port (string->number (cadr khost-port)))) +;; (tasks:kill-server status hostname port pid)) +;; +;; (if (and kpid +;; (equal? hostname (get-host-name)) +;; (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!! +;; (tasks:kill-server status hostname #f pid)) +;; + (format #t fmtstr id mt-ver pid hostname interface pullport pubport start-time last-update (if status "alive" "dead")))) servers) (debug:print-info 1 "Done with listservers") (set! 
*didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below @@ -323,11 +328,11 @@ (exit))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") - + (server:client-launch))) ;;====================================================================== ;; Remove old run(s) ;;====================================================================== Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -36,14 +36,14 @@ ;; ;; Done Tested ;; [x] [ ] 1. Add columns pullport pubport to servers table ;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports -;; [ ] [ ] 4. Add client compose of request -;; [ ] [ ] - name of client: testname/itempath-test_id-hostname -;; [ ] [ ] - name of request: callname, params -;; [ ] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) ;; [ ] [ ] 5. Add processing of subscription hits ;; [ ] [ ] - done when get key ;; [ ] [ ] - return results ;; [ ] [ ] 6. Add timeout processing ;; [ ] [ ] - after 60 seconds @@ -57,24 +57,24 @@ (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) -(define (server:self-ping server-info) - ;; server-info: server-id interface pullport pubport - (let ((iface (list-ref server-info 1)) - (pullport (list-ref server-info 2)) - (pubport (list-ref server-info 3))) - (server:client-connect iface pullport pubport) - (let loop () - (thread-sleep! 
2) - (cdb:client-call *runremote* 'ping #t) - (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") - (mutex-lock! *heartbeat-mutex*) - (set! *server-loop-heart-beat* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) - (loop)))) +;; (define (server:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (server:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) (define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) (define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) -(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) -(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s)) +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s)) @@ -137,11 +137,11 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - (print "GOT HERE EH?") + ;; (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) @@ -176,24 +176,27 @@ (server-loop-heartbeat #f) (server-info #f) (pulse 0)) ;; BUG add a wait on server alive here!! ;; ;; Ugly yuk. - (mutex-lock! *heartbeat-mutex*) - (set! server-loop-heartbeat *server-loop-heart-beat*) - (set! server-info *server-info*) - (mutex-unlock! *heartbeat-mutex*) + ;; == (mutex-lock! *heartbeat-mutex*) + ;; == (set! server-loop-heartbeat *server-loop-heart-beat*) + ;; == (set! server-info *server-info*) + ;; == (mutex-unlock! 
*heartbeat-mutex*) ;; The logic here is that if the server loop gets stuck blocked in working ;; we don't want to update our heartbeat - (set! pulse (- (current-seconds) server-loop-heartbeat)) - (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") - (if (> pulse 15) ;; must stay less than 10 seconds - (begin - (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info)) - (debug:print 0 "ERROR: Heartbeat failed, committing servercide") - (exit)) - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) + ;; == (set! pulse (- (current-seconds) server-loop-heartbeat)) + ;; == (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") + ;; == (if (> pulse 15) ;; must stay less than 10 seconds + ;; == (begin + ;; == (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info)) + ;; == (debug:print 0 "ERROR: Heartbeat failed, committing servercide") + ;; == (exit)) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (if *server-info* (open-run-close tasks:server-update-heartbeat tasks:open-db (car *server-info*))) + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* ;; (* 48 60 60) ;; 48 hrs ;; 60 ;; one minute (* 60 60) ;; one hour @@ -316,31 +319,31 @@ (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (pullport (list-ref hostinfo 2)) (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin ;; something went wrong in connecting to the server. 
In this scenario it is ok ;; to try again (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) (server:client-setup (- numtries 1)) #f) - (server:client-connect iface pullport pubport))) - (if (> numtries 0) - (let ((exe (car (argv)))) - (debug:print-info 1 "No server available, attempting to start one...") - (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) - (sleep 5) ;; give server time to start - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) + (server:client-connect iface pullport pubport)))))) + ;; (if (> numtries 0) + ;; (let ((exe (car (argv)))) + ;; (debug:print-info 1 "No server available, attempting to start one...") + ;; (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) + ;; (sleep 5) ;; give server time to start + ;; ;; we are starting a server, do not try again! That can lead to + ;; ;; recursively starting many processes!!! + ;; (server:client-setup numtries: 0)) + ;; (debug:print-info 1 "Too many attempts, giving up"))))) ;; all routes though here end in exit ... 
(define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) @@ -350,30 +353,30 @@ (debug:print-info 1 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th1 (make-thread (lambda () - (let ((server-info #f)) - ;; wait for the server to be online and available - (let loop () - (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") - (thread-sleep! 2) - (mutex-lock! *heartbeat-mutex*) - (set! server-info *server-info* ) - (mutex-unlock! *heartbeat-mutex*) - (if (not server-info)(loop))) - (debug:print 1 "Server alive, starting self-ping") - ;; (server:self-ping server-info) - )) - "Self ping")) + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 1 "Server alive, starting self-ping") + ;; (server:self-ping server-info) + ;; )) + ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda () (server:keep-running)) "Keep running"))) (set! *client-non-blocking-mode* #t) - (thread-start! th1) + ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! 
th3)) (debug:print 0 "ERROR: Failed to setup for megatest"))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -98,18 +98,20 @@ interface pullport pubport)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)) +(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) (if pid - ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) - (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid) + (case action + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) (if pullport - ;; (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) - (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport) + (case action + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pullport=?;" hostname pullport)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) @@ -233,18 +235,20 @@ (debug:print-info 1 "Sending signal/term to " pid " on " hostname) (process-signal pid signal/term) ;; local machine, send sig term (thread-sleep! 
5) ;; give it five seconds to die peacefully then do a brutal kill (process-signal pid signal/kill)) (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) + + (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface pullport pubport start-time priority state mt-version) - (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version) res))) + (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update) + (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res))) mdb - "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors