Index: nmsg-transport.scm ================================================================== --- nmsg-transport.scm +++ nmsg-transport.scm @@ -36,11 +36,11 @@ (use nanomsg srfi-18) ;;start a server, returns the connection ;; -(define (nmsg:start-server portnum ) +(define (nmsg:start-server portnum) (let ((rep (nn-socket 'rep))) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) (print "ERROR: Failed to start server \"" emsg "\"") @@ -53,19 +53,18 @@ ;; to take an action on failure use proc which is called with the error info ;; (proc exn errormsg) ;; ;; returns the response or #f if no response within timeout ;; -(define (nmsg:open-send-close host-port msg attrib #!key (timeout 3)(proc #f)) ;; default timeout is 3 seconds +(define (nmsg:open-send-close host-port msg #!key (timeout 3)(proc #f)) ;; default timeout is 3 seconds (let ((req (nn-socket 'req)) (uri (conc "tcp://" host-port)) - (res #f) - (mode (alist-ref 'mode attrib))) + (res #f)) (handle-exceptions exn (let ((emsg ((condition-property-accessor 'exn 'message) exn))) - ;; Send notification + ;; call proc on fail (if proc (proc exn emsg)) #f) (nn-connect req uri) (print "Connected to the server " ) (nn-send req msg) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -67,25 +67,32 @@ (conn #f) (port #f) (myaddr #f) (hosts (make-hash-table)) pktid ;; get pkt from hosts table if needed - pktspec pktfile pktsdir mtrah (mutex (make-mutex)) ) ;; make it a global? Well, it is local to area module (define *area-conndat* (make-area)) -(area-pktspec-set! *area-conndat* - `((server (hostname . h) - (port . p) - (pid . i) - ))) +(define *pktspec* + `((server (hostname . h) + (port . p) + (pid . i) + ) + (data (hostname . h) ;; sender hostname + (port . p) ;; sender port + (ip . i) ;; sender ip + (hostkey . k) ;; sending host key - store info at server under this key + (servkey . 
s) ;; server key - this needs to match at server end or reject the msg + (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json + (data . d) ;; base64 encoded slln data + ))) (define (server:get-mtrah) (or (get-environment-variable "MT_RUN_AREA_HOME") (if (file-exists? "megatest.config") (current-directory) @@ -105,11 +112,11 @@ (if (args:any? "-run" "-server") 'main 'passive))) (port-num (portlogger:open-run-close portlogger:find-port)) (area-conn (nmsg:start-server port-num)) - (pktspec (area-pktspec *area-conndat*)) + ;; (pktspec (area-pktspec *area-conndat*)) (mtdir (or (server:get-mtrah) (begin (print "ERROR: megatest.config not found and MT_RUN_AREA_HOME is not set.") #f))) (pktdir (conc mtdir @@ -125,11 +132,11 @@ (write-alist->pkt pktdir `((hostname . ,(get-host-name)) (port . ,port-num) (pid . ,(current-process-id))) - pktspec: pktspec + pktspec: *pktspec* ptype: 'server)) (area-pktfile-set! *area-conndat* (conc pktdir "/" (area-pktid *area-conndat*) ".pkt")))) ;; set all the area info in the (area-pktsdir-set! *area-conndat* pktdir) (area-mtrah-set! *area-conndat* mtdir) @@ -141,19 +148,38 @@ ;; Call this to start the actual server ;; ;; start_server ;; ;; mode: ' +;; handler: proc called with each received pkt as its only argument ;; -(define (server:launch mode) - (let ((start-time (current-seconds))) - (server:start-nmsg mode) - (let loop ((dead-time (- (current-seconds) start-time))) - (thread-sleep! 1) - (if (< dead-time 10) - (loop (- (current-seconds) start-time)) - (print "Timed out. Exiting"))))) +(define (server:launch mode handler) + (let* ((start-time (current-seconds)) + (rep (server:start-nmsg mode)) + (last-msg (current-seconds)) + (th1 (make-thread + (lambda () + (let loop () + (let ((pktdat (nn-recv rep))) + (set! last-msg (current-seconds)) + (if (not (eof-object? pktdat)) + (begin + (handler pktdat) ;; hand the pkt to the caller-supplied handler + (loop)))))) + "message handler")) + (th2 (make-thread + (lambda () + (let loop () + (thread-sleep! 
10) + (if (> (- (current-seconds) last-msg) 60) ;; timeout after 60 seconds + (begin + (print "Waited for 60 seconds and no messages, exiting now.") + (exit)) + (loop))))))) + (thread-start! th1) + (thread-start! th2) ;; th2 exits the process after 60 seconds without a message + (thread-join! th1))) (define (server:shutdown) (let ((conn (area-conn *area-conndat*)) (pktf (area-pktfile *area-conndat*)) (port (area-port *area-conndat*))) @@ -167,27 +193,67 @@ (define (server:send-all msg) #f) ;; given a area record look up all the packets (define (server:get-all-server-pkts rec) - (let ((all-pkt-files (glob (conc (area-pktsdir rec) "/*.pkt"))) - (pktspec (area-pktspec rec))) + (let ((all-pkt-files (glob (conc (area-pktsdir rec) "/*.pkt")))) +;; (pktspec (area-pktspec rec))) (map (lambda (pkt-file) - (read-pkt->alist pkt-file pktspec: pktspec)) + (read-pkt->alist pkt-file pktspec: *pktspec*)) all-pkt-files))) + +#;((Z . "9a0212302295a19610d5796fce0370fa130758e9") + (port . "34827") + (pid . "28748") + (hostname . "zeus") + (T . "server") + (D . "1549427032.0")) + +;; servpkt is the info for the server we wish to send the message to +;; +(define (server:send servpkt data dtype) + (let* ((port (alist-ref 'port servpkt)) + (host (alist-ref 'hostname servpkt)) + (hkey (alist-ref 'Z servpkt)) + (addr (conc host ":" port)) + (myport (area-port *area-conndat*)) + (myhost (area-myaddr *area-conndat*)) + (mykey (area-pktid *area-conndat*)) + (msg (alist->pkt `((hostname . ,myhost) + (port . ,myport) + (servkey . ,hkey) ;; server looks at this to ensure message is for them + (hostkey . ,mykey) + (format . ,dtype) ;; formatting of the message + (data . ,data)) + *pktspec*))) + (if (and port host) + (nmsg:open-send-receive addr msg) + #f))) + +;; is the server alive? +;; returns (elapsed-milliseconds . alive?) +(define (server:ping servpkt) + (let* ((start-time (current-milliseconds)) + (res (server:send servpkt "ping" "t"))) + (cons (- (current-milliseconds) start-time) ;; round-trip time in milliseconds + (equal? 
res "got ping")))) ;; look up all pkts and get the server id (the hash), port, host/ip ;; store this info in the global struct *area-conndat* ;; (define (server:get-all) ;; readll all pkts ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt - '()) + (let ((all-pkts (server:get-all-server-pkts *area-conndat*))) + (for-each + (lambda (servpkt) + (server:ping servpkt)) + all-pkts))) ;; send out an "I'm about to exit notice to all known servers" ;; -(define (server:announce-death) +(define (server:imminent-death) '()) (define (server:get-my-best-address) (ip->string (car (filter (lambda (x) (not (eq? (u8vector-ref x 0) 127))) @@ -204,548 +270,5 @@ ;; get a signature for identifing this process (define (server:get-process-signature) (cons (get-host-name)(current-process-id))) - -;; ;; Get the transport -;; (define (server:get-transport) -;; (if *transport-type* -;; *transport-type* -;; (let ((ttype (string->symbol -;; (or (args:get-arg "-transport") -;; (configf:lookup *configdat* "server" "transport") -;; "rpc")))) -;; (set! 
*transport-type* ttype) -;; ttype))) -;; -;; ;; Generate a unique signature for this server -;; (define (server:mk-signature) -;; (message-digest-string (md5-primitive) -;; (with-output-to-string -;; (lambda () -;; (write (list (current-directory) -;; (argv))))))) -;; -;; ;; When using zmq this would send the message back (two step process) -;; ;; with spiffy or rpc this simply returns the return data to be returned -;; ;; -;; (define (server:reply return-addr query-sig success/fail result) -;; (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result) -;; ;; (send-message pubsock target send-more: #t) -;; ;; (send-message pubsock -;; (case (server:get-transport) -;; ((rpc) (db:obj->string (vector success/fail query-sig result))) -;; ((http) (db:obj->string (vector success/fail query-sig result))) -;; ((fs) result) -;; (else -;; (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*) -;; result))) -;; -;; ;; Given a run id start a server process ### NOTE ### > file 2>&1 -;; ;; if the run-id is zero and the target-host is set -;; ;; try running on that host -;; ;; incidental: rotate logs in logs/ dir. -;; ;; -;; (define (server:run areapath) ;; areapath is *toppath* for a given testsuite area -;; (let* ((curr-host (get-host-name)) -;; ;; (attempt-in-progress (server:start-attempted? areapath)) -;; ;; (dot-server-url (server:check-if-running areapath)) -;; (curr-ip (server:get-best-guess-address curr-host)) -;; (curr-pid (current-process-id)) -;; (homehost (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" )) -;; (target-host (car homehost)) -;; (testsuite (common:get-testsuite-name)) -;; (logfile (conc areapath "/logs/server.log")) ;; -" curr-pid "-" target-host ".log")) -;; (cmdln (conc (common:get-megatest-exe) -;; " -server " (or target-host "-") (if (equal? 
(configf:lookup *configdat* "server" "daemonize") "yes") -;; " -daemonize " -;; "") -;; ;; " -log " logfile -;; " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &"))))) -;; (log-rotate (make-thread common:rotate-logs "server run, rotate logs thread")) -;; (load-limit (configf:lookup-number *configdat* "jobtools" "max-server-start-load" default: 3.0))) -;; ;; we want the remote server to start in *toppath* so push there -;; (push-directory areapath) -;; (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...") -;; (thread-start! log-rotate) -;; -;; ;; host.domain.tld match host? -;; (if (and target-host -;; ;; look at target host, is it host.domain.tld or ip address and does it -;; ;; match current ip or hostname -;; (not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host)) -;; (not (equal? curr-ip target-host))) -;; (begin -;; (debug:print-info 0 *default-log-port* "Starting server on " target-host ", logfile is " logfile) -;; (setenv "TARGETHOST" target-host))) -;; -;; (setenv "TARGETHOST_LOGF" logfile) -;; (thread-sleep! (/ (random 5000) 1000)) ;; add about a random (up to 5 seconds) initial delay. It seems pretty common that many running tests request a server at the same time -;; (common:wait-for-normalized-load load-limit " delaying server start due to load" target-host) ;; do not try starting servers on an already overloaded machine, just wait forever -;; (system (conc "nbfake " cmdln)) -;; (unsetenv "TARGETHOST_LOGF") -;; (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST")) -;; (thread-join! 
log-rotate) -;; (pop-directory))) -;; -;; ;; given a path to a server log return: host port startseconds -;; ;; -;; (define (server:logf-get-start-info logf) -;; (let ((rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+)"))) ;; SERVER STARTED: host:port AT timesecs -;; (handle-exceptions -;; exn -;; (list #f #f #f) ;; no idea what went wrong, call it a bad server -;; (with-input-from-file -;; logf -;; (lambda () -;; (let loop ((inl (read-line)) -;; (lnum 0)) -;; (if (not (eof-object? inl)) -;; (let ((mlst (string-match rx inl))) -;; (if (not mlst) -;; (if (< lnum 500) ;; give up if more than 500 lines of server log read -;; (loop (read-line)(+ lnum 1)) -;; (list #f #f #f)) -;; (let ((dat (cdr mlst))) -;; (list (car dat) ;; host -;; (string->number (cadr dat)) ;; port -;; (string->number (caddr dat)))))) -;; (list #f #f #f)))))))) -;; -;; ;; get a list of servers with all relevant data -;; ;; ( mod-time host port start-time pid ) -;; ;; -;; (define (server:get-list areapath #!key (limit #f)) -;; (let ((fname-rx (regexp "^(|.*/)server-(\\d+)-(\\S+).log$")) -;; (day-seconds (* 24 60 60))) -;; ;; if the directory exists continue to get the list -;; ;; otherwise attempt to create the logs dir and then -;; ;; continue -;; (if (if (directory-exists? (conc areapath "/logs")) -;; '() -;; (if (file-write-access? areapath) -;; (begin -;; (condition-case -;; (create-directory (conc areapath "/logs") #t) -;; (exn (i/o file)(debug:print 0 *default-log-port* "ERROR: Cannot create directory at " (conc areapath "/logs"))) -;; (exn ()(debug:print 0 *default-log-port* "ERROR: Unknown error attemtping to get server list."))) -;; (directory-exists? (conc areapath "/logs"))) -;; '())) -;; (let* ((server-logs (glob (conc areapath "/logs/server-*.log"))) -;; (num-serv-logs (length server-logs))) -;; (if (null? 
server-logs) -;; '() -;; (let loop ((hed (car server-logs)) -;; (tal (cdr server-logs)) -;; (res '())) -;; (let* ((mod-time (handle-exceptions -;; exn -;; (current-seconds) ;; 0 -;; (file-modification-time hed))) ;; default to *very* old so log gets ignored if deleted -;; (down-time (- (current-seconds) mod-time)) -;; (serv-dat (if (or (< num-serv-logs 10) -;; (< down-time 900)) ;; day-seconds)) -;; (server:logf-get-start-info hed) -;; '())) ;; don't waste time processing server files not touched in the 15 minutes if there are more than ten servers to look at -;; (serv-rec (cons mod-time serv-dat)) -;; (fmatch (string-match fname-rx hed)) -;; (pid (if fmatch (string->number (list-ref fmatch 2)) #f)) -;; (new-res (if (null? serv-dat) -;; res -;; (cons (append serv-rec (list pid)) res)))) -;; (if (null? tal) -;; (if (and limit -;; (> (length new-res) limit)) -;; new-res ;; (take new-res limit) <= need intelligent sorting before this will work -;; new-res) -;; (loop (car tal)(cdr tal) new-res))))))))) -;; -;; (define (server:get-num-alive srvlst) -;; (let ((num-alive 0)) -;; (for-each -;; (lambda (server) -;; (match-let (((mod-time host port start-time pid) -;; server)) -;; (let* ((uptime (- (current-seconds) mod-time)) -;; (runtime (if start-time -;; (- mod-time start-time) -;; 0))) -;; (if (< uptime 5)(set! num-alive (+ num-alive 1)))))) -;; srvlst) -;; num-alive)) -;; -;; ;; given a list of servers get a list of valid servers, i.e. at least -;; ;; 10 seconds old, has started and is less than 1 hour old and is -;; ;; active (i.e. mod-time < 10 seconds -;; ;; -;; ;; mod-time host port start-time pid -;; ;; -;; ;; sort by start-time descending. I.e. get the oldest first. Young servers will thus drop off -;; ;; and servers should stick around for about two hours or so. -;; ;; -;; (define (server:get-best srvlst) -;; (let* ((nums (server:get-num-servers)) -;; (now (current-seconds)) -;; (slst (sort -;; (filter (lambda (rec) -;; (if (and (list? 
rec) -;; (> (length rec) 2)) -;; (let ((start-time (list-ref rec 3)) -;; (mod-time (list-ref rec 0))) -;; ;; (print "start-time: " start-time " mod-time: " mod-time) -;; (and start-time mod-time -;; (> (- now start-time) 0) ;; been running at least 0 seconds -;; (< (- now mod-time) 16) ;; still alive - file touched in last 16 seconds -;; (< (- now start-time) -;; (+ (- (string->number (or (configf:lookup *configdat* "server" "runtime") "3600")) -;; 180) -;; (random 360))) ;; under one hour running time +/- 180 -;; )) -;; #f)) -;; srvlst) -;; (lambda (a b) -;; (< (list-ref a 3) -;; (list-ref b 3)))))) -;; (if (> (length slst) nums) -;; (take slst nums) -;; slst))) -;; -;; (define (server:get-first-best areapath) -;; (let ((srvrs (server:get-best (server:get-list areapath)))) -;; (if (and srvrs -;; (not (null? srvrs))) -;; (car srvrs) -;; #f))) -;; -;; (define (server:get-rand-best areapath) -;; (let ((srvrs (server:get-best (server:get-list areapath)))) -;; (if (and (list? srvrs) -;; (not (null? srvrs))) -;; (let* ((len (length srvrs)) -;; (idx (random len))) -;; (list-ref srvrs idx)) -;; #f))) -;; -;; -;; (define (server:record->url servr) -;; (match-let (((mod-time host port start-time pid) -;; servr)) -;; (if (and host port) -;; (conc host ":" port) -;; #f))) -;; -;; (define (server:get-client-signature) ;; BB> why is this proc named "get-"? it returns nothing -- set! has not return value. -;; (if *my-client-signature* *my-client-signature* -;; (let ((sig (server:mk-signature))) -;; (set! *my-client-signature* sig) -;; *my-client-signature*))) -;; -;; ;; kind start up of servers, wait 40 seconds before allowing another server for a given -;; ;; run-id to be launched -;; (define (server:kind-run areapath) -;; (if (not (server:check-if-running areapath)) ;; why try if there is already a server running? 
-;; (let* ((last-run-dat (hash-table-ref/default *server-kind-run* areapath '(0 0))) ;; callnum, whenrun -;; (call-num (car last-run-dat)) -;; (when-run (cadr last-run-dat)) -;; (run-delay (+ (case call-num -;; ((0) 0) -;; ((1) 20) -;; ((2) 300) -;; (else 600)) -;; (random 5))) ;; add a small random number just in case a lot of jobs hit the work hosts simultaneously -;; (lock-file (conc areapath "/logs/server-start.lock"))) -;; (if (> (- (current-seconds) when-run) run-delay) -;; (begin -;; (common:simple-file-lock-and-wait lock-file expire-time: 15) -;; (server:run areapath) -;; (thread-sleep! 5) ;; don't release the lock for at least a few seconds -;; (common:simple-file-release-lock lock-file))) -;; (hash-table-set! *server-kind-run* areapath (list (+ call-num 1)(current-seconds)))))) -;; -;; (define (server:start-and-wait areapath #!key (timeout 60)) -;; (let ((give-up-time (+ (current-seconds) timeout))) -;; (let loop ((server-url (server:check-if-running areapath)) -;; (try-num 0)) -;; (if (or server-url -;; (> (current-seconds) give-up-time)) ;; server-url will be #f if no server available. -;; server-url -;; (let ((num-ok (length (server:get-best (server:get-list areapath))))) -;; (if (and (> try-num 0) ;; first time through simply wait a little while then try again -;; (< num-ok 1)) ;; if there are no decent candidates for servers then try starting a new one -;; (server:kind-run areapath)) -;; (thread-sleep! 5) -;; (loop (server:check-if-running areapath) -;; (+ try-num 1))))))) -;; -;; (define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG. -;; -;; (define (server:get-num-servers #!key (numservers 2)) -;; (let ((ns (string->number -;; (or (configf:lookup *configdat* "server" "numservers") "notanumber")))) -;; (or ns numservers))) -;; -;; ;; no longer care if multiple servers are started by accident. older servers will drop off in time. 
-;; ;; -;; (define (server:check-if-running areapath) ;; #!key (numservers "2")) -;; (let* ((ns (server:get-num-servers)) -;; (servers (server:get-best (server:get-list areapath)))) -;; ;; (print "servers: " servers " ns: " ns) -;; (if (or (and servers -;; (null? servers)) -;; (not servers) -;; (and (list? servers) -;; (< (length servers) (random ns)))) ;; somewhere between 0 and numservers -;; #f -;; (let loop ((hed (car servers)) -;; (tal (cdr servers))) -;; (let ((res (server:check-server hed))) -;; (if res -;; res -;; (if (null? tal) -;; #f -;; (loop (car tal)(cdr tal))))))))) -;; -;; ;; ping the given server -;; ;; -;; (define (server:check-server server-record) -;; (let* ((server-url (server:record->url server-record)) -;; (res (case *transport-type* -;; ((http)(server:ping server-url)) -;; ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) -;; ))) -;; (if res -;; server-url -;; #f))) -;; -;; ;; DO STUFF HERE - BUG -;; (define (server:kill servr) -;; (match-let (((mod-time hostname port start-time pid) -;; servr)) -;; #;(tasks:kill-server hostname pid) -;; #f -;; )) -;; -;; ;; called in megatest.scm, host-port is string hostname:port -;; ;; -;; ;; NOTE: This is NOT called directly from clients as not all transports support a client running -;; ;; in the same process as the server. -;; ;; -;; (define (server:ping host-port-in #!key (do-exit #f)) -;; (let ((host:port (if (not host-port-in) ;; use read-dotserver to find -;; #f ;; (server:check-if-running *toppath*) -;; ;; (if (number? host-port-in) ;; we were handed a server-id -;; ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in))) -;; ;; ;; (print "srec: " srec " host-port-in: " host-port-in) -;; ;; (if srec -;; ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4)) -;; ;; (conc "no such server-id " host-port-in))) -;; host-port-in))) ;; ) -;; (let* ((host-port (if host:port -;; (let ((slst (string-split host:port ":"))) -;; (if (eq? 
(length slst) 2) -;; (list (car slst)(string->number (cadr slst))) -;; #f)) -;; #f))) -;; ;; (toppath (launch:setup))) -;; ;; (print "host-port=" host-port) -;; (if (not host-port) -;; (begin -;; (if host-port-in -;; (debug:print 0 *default-log-port* "ERROR: bad host:port")) -;; (if do-exit (exit 1)) -;; #f) -;; (let* ((iface (car host-port)) -;; (port (cadr host-port)) -;; (server-dat (http-transport:client-connect iface port)) -;; (login-res (rmt:login-no-auto-client-setup server-dat))) -;; (if (and (list? login-res) -;; (car login-res)) -;; (begin -;; ;; (print "LOGIN_OK") -;; (if do-exit (exit 0)) -;; #t) -;; (begin -;; ;; (print "LOGIN_FAILED") -;; (if do-exit (exit 1)) -;; #f))))))) -;; -;; ;; run ping in separate process, safest way in some cases -;; ;; -;; (define (server:ping-server ifaceport) -;; (with-input-from-pipe -;; (conc (common:get-megatest-exe) " -ping " ifaceport) -;; (lambda () -;; (let loop ((inl (read-line)) -;; (res "NOREPLY")) -;; (if (eof-object? inl) -;; (case (string->symbol res) -;; ((NOREPLY) #f) -;; ((LOGIN_OK) #t) -;; (else #f)) -;; (loop (read-line) inl)))))) -;; -;; ;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). -;; ;; -;; (define (server:login toppath) -;; (lambda (toppath) -;; (set! *db-last-access* (current-seconds)) ;; might not be needed. -;; (if (equal? *toppath* toppath) -;; #t -;; #f))) -;; -;; ;; timeout is hms string: 1h 5m 3s, default is 1 minute -;; ;; -;; (define (server:expiration-timeout) -;; (let ((tmo (configf:lookup *configdat* "server" "timeout"))) -;; (if (and (string? tmo) -;; (common:hms-string->seconds tmo)) ;; BUG: hms-string->seconds is broken, if given "10" returns 0. Also, it doesn't belong in this logic unless the string->number is changed below -;; (* 3600 (string->number tmo)) -;; 60))) -;; -;; (define (server:get-best-guess-address hostname) -;; (let ((res #f)) -;; (for-each -;; (lambda (adr) -;; (if (not (eq? (u8vector-ref adr 0) 127)) -;; (set! 
res adr))) -;; ;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME -;; (vector->list (hostinfo-addresses (hostname->hostinfo hostname)))) -;; (string-intersperse -;; (map number->string -;; (u8vector->list -;; (if res res (hostname->ip hostname)))) "."))) -;; -;; ;; moving this here as it needs access to db and cannot be in common. -;; ;; -;; (define (server:writable-watchdog dbstruct) -;; (thread-sleep! 0.05) ;; delay for startup -;; (let ((legacy-sync (common:run-sync?)) -;; (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300)) -;; (debug-mode (debug:debug-mode 1)) -;; (last-time (current-seconds)) -;; (no-sync-db (db:open-no-sync-db)) -;; (sync-duration 0) ;; run time of the sync in milliseconds -;; ;;(this-wd-num (begin (mutex-lock! *wdnum*mutex) (let ((x *wdnum*)) (set! *wdnum* (add1 *wdnum*)) (mutex-unlock! *wdnum*mutex) x))) -;; ) -;; (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls -;; (debug:print-info 2 *default-log-port* "Periodic sync thread started.") -;; (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is " legacy-sync" pid="(current-process-id) );; " this-wd-num="this-wd-num) -;; (if (and legacy-sync (not *time-to-exit*)) -;; (let* (;;(dbstruct (db:setup)) -;; (mtdb (dbr:dbstruct-mtdb dbstruct)) -;; (mtpath (db:dbdat-get-path mtdb)) -;; (tmp-area (common:get-db-tmp-area)) -;; (start-file (conc tmp-area "/.start-sync")) -;; (end-file (conc tmp-area "/.end-sync"))) -;; (debug:print-info 0 *default-log-port* "Server running, periodic sync started.") -;; (let loop () -;; ;; sync for filesystem local db writes -;; ;; -;; (mutex-lock! 
*db-multi-sync-mutex*) -;; (let* ((need-sync (>= *db-last-access* *db-last-sync*)) ;; no sync since last write -;; (sync-in-progress *db-sync-in-progress*) -;; (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 5)) -;; (should-sync (and (not *time-to-exit*) -;; (> (- (current-seconds) *db-last-sync*) min-intersync-delay))) ;; sync every five seconds minimum, deprecated logic, can probably be removed -;; (start-time (current-seconds)) -;; (cpu-load-adj (alist-ref 'adj-proc-load (common:get-normalized-cpu-load #f))) -;; (mt-mod-time (file-modification-time mtpath)) -;; (last-sync-start (if (common:file-exists? start-file) -;; (file-modification-time start-file) -;; 0)) -;; (last-sync-end (if (common:file-exists? end-file) -;; (file-modification-time end-file) -;; 10)) -;; (sync-period (+ 3 (* cpu-load-adj 30))) ;; as adjusted load increases increase the sync period -;; (recently-synced (and (< (- start-time mt-mod-time) sync-period) ;; not useful if sync didn't modify megatest.db! -;; (< mt-mod-time last-sync-start))) -;; (sync-done (<= last-sync-start last-sync-end)) -;; (sync-stale (> start-time (+ last-sync-start sync-stale-seconds))) -;; (will-sync (and (not *time-to-exit*) ;; do not start a sync if we are in the process of exiting -;; (or need-sync should-sync) -;; (or sync-done sync-stale) -;; (not sync-in-progress) -;; (not recently-synced)))) -;; (debug:print-info 13 *default-log-port* "WD writable-watchdog top of loop. 
need-sync="need-sync" sync-in-progress=" sync-in-progress -;; " should-sync="should-sync" start-time="start-time" mt-mod-time="mt-mod-time" recently-synced="recently-synced" will-sync="will-sync -;; " sync-done=" sync-done " sync-period=" sync-period) -;; (if (and (> sync-period 5) -;; (common:low-noise-print 30 "sync-period")) -;; (debug:print-info 0 *default-log-port* "Increased sync period due to long sync times, sync took: " sync-period " seconds.")) -;; ;; (if recently-synced (debug:print-info 0 *default-log-port* "Skipping sync due to recently-synced flag=" recently-synced)) -;; ;; (debug:print-info 0 *default-log-port* "need-sync: " need-sync " sync-in-progress: " sync-in-progress " should-sync: " should-sync " will-sync: " will-sync) -;; (if will-sync (set! *db-sync-in-progress* #t)) -;; (mutex-unlock! *db-multi-sync-mutex*) -;; (if will-sync -;; (let (;; (max-sync-duration (configf:lookup-number *configdat* "server" "max-sync-duration")) ;; KEEPING THIS AVAILABLE BUT SHOULD NOT USE, I'M PRETTY SURE IT DOES NOT WORK! -;; (sync-start (current-milliseconds))) -;; (with-output-to-file start-file (lambda ()(print (current-process-id)))) -;; -;; ;; put lock here -;; -;; ;; (if (or (not max-sync-duration) -;; ;; (< sync-duration max-sync-duration)) ;; NOTE: db:sync-to-megatest.db keeps track of time of last sync and syncs incrementally -;; (let ((res (db:sync-to-megatest.db dbstruct no-sync-db: no-sync-db))) ;; did we sync any data? If so need to set the db touched flag to keep the server alive -;; (set! sync-duration (- (current-milliseconds) sync-start)) -;; (if (> res 0) ;; some records were transferred, keep the db alive -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *db-last-access* (current-seconds)) -;; (mutex-unlock! 
*heartbeat-mutex*) -;; (debug:print-info 0 *default-log-port* "sync called, " res " records transferred.")) -;; (debug:print-info 2 *default-log-port* "sync called but zero records transferred"))))) -;; ;; ;; TODO: factor this next routine out into a function -;; ;; (with-input-from-pipe ;; this should not block other threads but need to verify this -;; ;; (conc "megatest -sync-to-megatest.db -m testsuite:" (common:get-area-name) ":" *toppath*) -;; ;; (lambda () -;; ;; (let loop ((inl (read-line)) -;; ;; (res #f)) -;; ;; (if (eof-object? inl) -;; ;; (begin -;; ;; (set! sync-duration (- (current-milliseconds) sync-start)) -;; ;; (cond -;; ;; ((not res) -;; ;; (debug:print 0 *default-log-port* "ERROR: sync from /tmp db to megatest.db appears to have failed. Recommended that you stop your runs and run \"megatest -cleanup-db\"")) -;; ;; ((> res 0) -;; ;; (mutex-lock! *heartbeat-mutex*) -;; ;; (set! *db-last-access* (current-seconds)) -;; ;; (mutex-unlock! *heartbeat-mutex*)))) -;; ;; (let ((num-synced (let ((matches (string-match "^Synced (\\d+).*$" inl))) -;; ;; (if matches -;; ;; (string->number (cadr matches)) -;; ;; #f)))) -;; ;; (loop (read-line) -;; ;; (or num-synced res)))))))))) -;; (if will-sync -;; (begin -;; (mutex-lock! *db-multi-sync-mutex*) -;; (set! *db-sync-in-progress* #f) -;; (set! *db-last-sync* start-time) -;; (with-output-to-file end-file (lambda ()(print (current-process-id)))) -;; -;; ;; release lock here -;; -;; (mutex-unlock! *db-multi-sync-mutex*))) -;; (if (and debug-mode -;; (> (- start-time last-time) 60)) -;; (begin -;; (set! 
last-time start-time) -;; (debug:print-info 4 *default-log-port* "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*)))))) -;; -;; ;; keep going unless time to exit -;; ;; -;; (if (not *time-to-exit*) -;; (let delay-loop ((count 0)) -;; ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="*time-to-exit*) -;; -;; (if (and (not *time-to-exit*) -;; (< count 6)) ;; was 11, changing to 4. -;; (begin -;; (thread-sleep! 1) -;; (delay-loop (+ count 1)))) -;; (if (not *time-to-exit*) (loop)))) -;; ;; time to exit, close the no-sync db here -;; (db:no-sync-close-db no-sync-db) -;; (if (common:low-noise-print 30) -;; (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id) ))))))) ;;" this-wd-num="this-wd-num))))))) -;;