Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -195,12 +195,13 @@ vg.o dashboard.o : vg_records.scm megatest-version.scm dcommon.o : mofiles/dcommonmod.o run_records.scm -mofiles/stml2.o : mofiles/cookie.o -mofiles/dbmod.o : mofiles/ods.o +mofiles/stml2.o : mofiles/cookie.o +mofiles/dbmod.o : mofiles/ods.o +mofiles/rmtmod.o : mofiles/dbmod.o # # special include based modules # mofiles/pkts.o : pkts/pkts.scm # mofiles/stml2.o : cookie.o # # mofiles/mtargs.o : mtargs/mtargs.scm Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -54,10 +54,13 @@ (declare (uses dbmod)) (import dbmod) (declare (uses configfmod)) (import configfmod) + +(declare (uses servermod)) +(import servermod) (include "common_records.scm") ;;====================================================================== ;; (require-library margs) Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -67,10 +67,13 @@ ;; (import ods) ;; (declare (uses dbmod)) (import dbmod) ;; (declare (uses dbmod.import)) + +(declare (uses servermod)) +(import servermod) (include "common_records.scm") (include "db_records.scm") (include "run_records.scm") (include "task_records.scm") Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -755,29 +755,29 @@ ;; if last-update specified ("field-name" . time-in-seconds) ;; then sync only records where field-name >= time-in-seconds ;; IFF field-name exists ;; (define (db:sync-tables tbls last-update fromdb todb . 
slave-dbs) - (handle-exceptions - exn - (begin - (debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.") - (print-call-chain (current-error-port)) - (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) - (debug:print 5 *default-log-port* "exn=" (condition->list exn)) - (debug:print 0 *default-log-port* " status: " ((condition-property-accessor 'sqlite3 'status) exn)) - (debug:print 0 *default-log-port* " src db: " (db:dbdat-get-path fromdb)) - (for-each (lambda (dbdat) - (let ((dbpath (db:dbdat-get-path dbdat))) - (debug:print 0 *default-log-port* " dbpath: " dbpath) - (if (not (db:repair-db dbdat)) - (begin - (debug:print-error 0 *default-log-port* "Failed to rebuild " dbpath ", exiting now.") - (exit))))) - (cons todb slave-dbs)) - - 0) + ;;(handle-exceptions + ;;exn + ;;(begin + ;; (debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.") + ;; (print-call-chain (current-error-port)) + ;; (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + ;; (debug:print 5 *default-log-port* "exn=" (condition->list exn)) + ;; (debug:print 0 *default-log-port* " status: " ((condition-property-accessor 'sqlite3 'status) exn)) + ;; (debug:print 0 *default-log-port* " src db: " (db:dbdat-get-path fromdb)) + ;; (for-each (lambda (dbdat) + ;; (let ((dbpath (db:dbdat-get-path dbdat))) + ;; (debug:print 0 *default-log-port* " dbpath: " dbpath) + ;; (if (not (db:repair-db dbdat)) + ;; (begin + ;; (debug:print-error 0 *default-log-port* "Failed to rebuild " dbpath ", exiting now.") + ;; (exit))))) + ;; (cons todb slave-dbs)) + ;; + ;; 0) ;; this is the work to be done (cond ((not fromdb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with fromdb missing") -1) ((not todb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with todb missing") @@ -949,11 +949,11 
@@ (count (cdr dat))) (set! tot-count (+ tot-count count)) (if (> count 0) (if should-print (debug:print 0 *default-log-port* (format #f " ~10a ~5a" tblname count)))))) (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) - tot-count))))) + tot-count)))) (define db:trigger-list (list (list "update_runs_trigger" "CREATE TRIGGER IF NOT EXISTS update_runs_trigger AFTER UPDATE ON runs FOR EACH ROW BEGIN @@ -1991,14 +1991,14 @@ (sqlite3:with-transaction db (lambda () (handle-exceptions exn - (let ((lock-time (current-seconds))) - (debug:print-info 2 *default-log-port* "db:no-sync-get-lock keyname=" keyname ", lock-time=" lock-time ", exn=" exn) - (sqlite3:execute db "INSERT INTO no_sync_metadat (var,val) VALUES(?,?);" keyname lock-time) - `(#t . ,lock-time)) + (let ((lock-time (current-seconds))) + (debug:print-info 2 *default-log-port* "db:no-sync-get-lock keyname=" keyname ", lock-time=" lock-time ", exn=" exn) + (sqlite3:execute db "INSERT INTO no_sync_metadat (var,val) VALUES(?,?);" keyname lock-time) + `(#t . ,lock-time)) `(#f . 
,(sqlite3:first-result db "SELECT val FROM no_sync_metadat WHERE var=?;" keyname))))))) ;; use a global for some primitive caching, it is just silly to ;; re-read the db over and over again for the keys since they never ;; change @@ -4119,11 +4119,11 @@ (loop (car tal)(cdr tal)))))))))) ;; Function recursively checks if .journal exists; if yes means db busy; call itself after delayed interval ;; return the sqlite3 db handle if possible ;; -(define (db:delay-if-busy dbdat #!key (count 6)) +#;(define (db:delay-if-busy dbdat #!key (count 6)) (if (not (configf:lookup *configdat* "server" "delay-on-busy")) (and dbdat (db:dbdat-get-db dbdat)) (if dbdat (let* ((dbpath (db:dbdat-get-path dbdat)) (db (db:dbdat-get-db dbdat)) ;; we'll return this so (db:delay--if-busy can be called inline @@ -4629,9 +4629,17 @@ (process-run fullcmd) (if prev-nbfake-log (setenv "NBFAKE_LOG" prev-nbfake-log) (unsetenv "NBFAKE_LOG")) )) ;; )) + +;; is dbdir writeable and or is dbdir/dbname writeable +;; +(define (db:writeable dbdir dbname) + (let* ((dbfile (conc dbdir "/" dbname))) + (if (file-exists? dbfile) + (file-write-access? dbfile) + (file-write-access? 
*toppath*)))) ;;======================================================================the end ) Index: dcommon.scm ================================================================== --- dcommon.scm +++ dcommon.scm @@ -39,10 +39,13 @@ (declare (uses configfmod)) (import configfmod) (declare (uses dcommonmod)) (import dcommonmod) + +(declare (uses servermod)) +(import servermod) ;; (declare (uses synchash)) (include "megatest-version.scm") (include "common_records.scm") Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -37,10 +37,13 @@ (declare (uses dbmod)) (import dbmod) (declare (uses configfmod)) (import configfmod) + +(declare (uses servermod)) +(import servermod) (include "common_records.scm") ;; (declare (uses rmtmod)) ;; (import rmtmod) @@ -104,11 +107,22 @@ ;;DOT node [shape="box"]; ;;DOT "rmt:send-receive" -> MUTEXLOCK; ;;DOT { edge [style=invis];"case 1" -> "case 2" -> "case 3" -> "case 4" -> "case 5" -> "case 6" -> "case 7" -> "case 8" -> "case 9" -> "case 10" -> "case 11"; } ;; do all the prep locked under the rmt-mutex (mutex-lock! *rmt-mutex*) - + + ;; set up runremote record earlier than the loop below + (if (not area-dat) ;; can remove this one. should never get here. + (begin + (set! *runremote* (create-remote-record)) + (let* ((server-info (remote-server-info *runremote*))) + (if server-info + (begin + (remote-server-url-set! *runremote* (server:record->url server-info)) + (remote-server-id-set! *runremote* (server:record->id server-info))))) + (set! area-dat *runremote*))) ;; new runremote will come from this on next iteration + ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in runremote ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds. ;; 3. 
do the query, if on homehost use local access ;; (let* ((start-time (current-seconds)) ;; snapshot time so all use cases get same value @@ -120,20 +134,10 @@ ;; DOT INIT_RUNREMOTE; // leaving off - doesn't really add to the clarity ;; DOT MUTEXLOCK -> INIT_RUNREMOTE [label="no remote?"]; ;; DOT INIT_RUNREMOTE -> MUTEXLOCK; ;; ensure we have a record for our connection for given area - (if (not runremote) ;; can remove this one. should never get here. - (begin - (set! *runremote* (create-remote-record)) - (let* ((server-info (remote-server-info *runremote*))) - (if server-info - (begin - (remote-server-url-set! *runremote* (server:record->url server-info)) - (remote-server-id-set! *runremote* (server:record->id server-info))))) - (set! runremote *runremote*))) ;; new runremote will come from this on next iteration - ;; DOT SET_HOMEHOST; // leaving off - doesn't really add to the clarity ;; DOT MUTEXLOCK -> SET_HOMEHOST [label="no homehost?"]; ;; DOT SET_HOMEHOST -> MUTEXLOCK; ;; ensure we have a homehost record (if (not (pair? (remote-hh-dat runremote))) ;; not on homehost Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -19,81 +19,34 @@ ;;====================================================================== (declare (unit rmtmod)) (declare (uses commonmod)) (declare (uses apimod)) -;; (declare (uses apimod.import)) -;; (declare (uses ulex)) - -;; (include "ulex/ulex.scm") +(declare (uses dbmod)) (module rmtmod * (import scheme chicken data-structures extras) (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18) (import commonmod) +(import dbmod) (import apimod) -;; (import (prefix ulex ulex:)) (defstruct alldat (areapath #f) (ulexdat #f) ) - (define (rmtmod:calc-ro-mode runremote *toppath*) (if (and runremote (remote-ro-mode-checked runremote)) (remote-ro-mode runremote) - (let* ((dbfile (conc *toppath* "/megatest.db")) - (ro-mode (not (file-write-access? 
dbfile)))) ;; TODO: use dbstruct or runremote to figure this out in future + (let* ((ro-mode (not (db:writeable *toppath* "megatest.db")))) (if runremote (begin (remote-ro-mode-set! runremote ro-mode) (remote-ro-mode-checked-set! runremote #t) ro-mode) ro-mode)))) - -;; return the handle struct for sending queries to a specific database -;; - initializes the connection object if this is the first access -;; - finds the "captain" and asks who to talk to for the given dbfname -;; - establishes the connection to the current dbowner -;; -#;(define (rmt:connect alldat dbfname dbtype) - (let* ((ulexdat (or (alldat-ulexdat alldat) - (rmt:setup-ulex alldat)))) - (ulex:connect ulexdat dbfname dbtype))) - -;; setup the remote calls -#;(define (rmt:setup-ulex alldat) - (let* ((udata (ulex:setup))) ;; establish connection to ulex - (alldat-ulexdat-set! alldat udata) - ;; register all needed procs - (ulex:register-handler udata 'ping cmod:get-full-version) ;; override ping with get-full-version - (ulex:register-handler udata 'login cmod:get-full-version) ;; force setup of the connection - (ulex:register-handler udata 'execute api:execute-requests) - udata)) - -;; set up a connection to the current owner of the dbfile associated with rid -;; then send the query to that dbfile owner and wait for a response. -;; -#;(define (rmt:send-receive alldat cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected - (let* (;; (alldat *alldat*) - (areapath (alldat-areapath alldat)) - (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" - "main" "runs")) - (dbfname (if (equal? 
dbtype "main") - "main.db" - (conc rid ".db"))) - (dbfile (conc areapath "/.db/" dbfname)) - (ulexconn (rmt:connect alldat dbfname dbtype)) ;; ulexconn is our new *runremote*, it is a dbowner struct < pdat lastrefresh > - (udata (alldat-ulexdat alldat))) - (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid params))) - ;; need to call this on the other side - ;; (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))) - - #;(with-input-from-string - (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid (with-output-to-string (lambda ()(serialize params)))) - (lambda ()(deserialize))) ) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -40,19 +40,17 @@ (declare (uses dbmod)) (import dbmod) (declare (uses configfmod)) (import configfmod) + +(declare (uses servermod)) +(import servermod) (include "common_records.scm") (include "db_records.scm") -(define (server:make-server-url hostport) - (if (not hostport) - #f - (conc "http://" (car hostport) ":" (cadr hostport)))) - (define *server-loop-heart-beat* (current-seconds)) ;;====================================================================== ;; P K T S S T U F F ;;====================================================================== @@ -81,49 +79,10 @@ ((http)(http-transport:launch)) ;;((nmsg)(nmsg-transport:launch run-id)) ;;((rpc) (rpc-transport:launch run-id)) (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))) -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;; Get the transport -(define (server:get-transport) - (if *transport-type* - *transport-type* - (let ((ttype (string->symbol - (or (args:get-arg "-transport") - (configf:lookup *configdat* "server" "transport") - "rpc")))) - (set! 
*transport-type* ttype) - ttype))) - -;; Generate a unique signature for this server -(define (server:mk-signature) - (message-digest-string (md5-primitive) - (with-output-to-string - (lambda () - (write (list (current-directory) - (current-process-id) - (argv))))))) - -;; When using zmq this would send the message back (two step process) -;; with spiffy or rpc this simply returns the return data to be returned -;; -(define (server:reply return-addr query-sig success/fail result) - (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (case (server:get-transport) - ((rpc) (db:obj->string (vector success/fail query-sig result))) - ((http) (db:obj->string (vector success/fail query-sig result))) - ((fs) result) - (else - (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*) - result))) - ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; incidental: rotate logs in logs/ dir. ;; @@ -172,220 +131,59 @@ (unsetenv "TARGETHOST_LOGF") (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST")) (thread-join! 
log-rotate) (pop-directory))) -;; given a path to a server log return: host port startseconds -;; any changes to number of elements returned by this fuction will dirctly affect server:record->url,server:record->id,server:kill,server:get-num-alive which uses match let - -(define (server:logf-get-start-info logf) - (let ((rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+)"))) ;; SERVER STARTED: host:port AT timesecs server id - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get server info from " logf ", exn=" exn) - (list #f #f #f #f)) ;; no idea what went wrong, call it a bad server - (with-input-from-file - logf - (lambda () - (let loop ((inl (read-line)) - (lnum 0)) - (if (not (eof-object? inl)) - (let ((mlst (string-match rx inl))) - (if (not mlst) - (if (< lnum 500) ;; give up if more than 500 lines of server log read - (loop (read-line)(+ lnum 1)) - (begin - (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf ) - (list #f #f #f #f))) - (let ((dat (cdr mlst))) - (list (car dat) ;; host - (string->number (cadr dat)) ;; port - (string->number (caddr dat)) - (cadr (cddr dat)))))) - (begin - (debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (current-seconds)) - (list #f #f #f #f))))))))) - -;; get a list of servers with all relevant data -;; ( mod-time host port start-time pid ) -;; -(define (server:get-list areapath #!key (limit #f)) - (let ((fname-rx (regexp "^(|.*/)server-(\\d+)-(\\S+).log$")) - (day-seconds (* 24 60 60))) - ;; if the directory exists continue to get the list - ;; otherwise attempt to create the logs dir and then - ;; continue - (if (if (directory-exists? (conc areapath "/logs")) - '() - (if (file-write-access? 
areapath) - (begin - (condition-case - (create-directory (conc areapath "/logs") #t) - (exn (i/o file)(debug:print 0 *default-log-port* "ERROR: Cannot create directory at " (conc areapath "/logs"))) - (exn ()(debug:print 0 *default-log-port* "ERROR: Unknown error attemtping to get server list. exn=" exn))) - (directory-exists? (conc areapath "/logs"))) - '())) - (let* ((server-logs (glob (conc areapath "/logs/server-*.log"))) - (num-serv-logs (length server-logs))) - (if (null? server-logs) - '() - (let loop ((hed (car server-logs)) - (tal (cdr server-logs)) - (res '())) - (let* ((mod-time (handle-exceptions - exn - (begin - (debug:print 0 *default-log-port* "failed to get modification time on " hed ", exn=" exn) - (current-seconds)) ;; 0 - (file-modification-time hed))) ;; default to *very* old so log gets ignored if deleted - (down-time (- (current-seconds) mod-time)) - (serv-dat (if (or (< num-serv-logs 10) - (< down-time 900)) ;; day-seconds)) - (server:logf-get-start-info hed) - '())) ;; don't waste time processing server files not touched in the 15 minutes if there are more than ten servers to look at - (serv-rec (cons mod-time serv-dat)) - (fmatch (string-match fname-rx hed)) - (pid (if fmatch (string->number (list-ref fmatch 2)) #f)) - (new-res (if (null? serv-dat) - res - (cons (append serv-rec (list pid)) res)))) ;; any changes to number of elements in new-res will dirctly affect server:record->url,server:record->id,server:kill,server:get-num-alive which uses match let - (if (null? 
tal) - (if (and limit - (> (length new-res) limit)) - new-res ;; (take new-res limit) <= need intelligent sorting before this will work - new-res) - (loop (car tal)(cdr tal) new-res))))))))) - -(define (server:get-num-alive srvlst) - (let ((num-alive 0)) - (for-each - (lambda (server) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get server start-time and/or mod-time from " server ", exn=" exn)) - (match-let (((mod-time host port start-time server-id pid) - server)) - (let* ((uptime (- (current-seconds) mod-time)) - (runtime (if start-time - (- mod-time start-time) - 0))) - (if (< uptime 5)(set! num-alive (+ num-alive 1))))))) - srvlst) - num-alive)) - -;; given a list of servers get a list of valid servers, i.e. at least -;; 10 seconds old, has started and is less than 1 hour old and is -;; active (i.e. mod-time < 10 seconds -;; -;; mod-time host port start-time pid -;; -;; sort by start-time descending. I.e. get the oldest first. Young servers will thus drop off -;; and servers should stick around for about two hours or so. -;; -(define (server:get-best srvlst) - (let* ((nums (server:get-num-servers)) - (now (current-seconds)) - (slst (sort - (filter (lambda (rec) - (if (and (list? 
rec) - (> (length rec) 2)) - (let ((start-time (list-ref rec 3)) - (mod-time (list-ref rec 0))) - ;; (print "start-time: " start-time " mod-time: " mod-time) - (and start-time mod-time - (> (- now start-time) 0) ;; been running at least 0 seconds - (< (- now mod-time) 16) ;; still alive - file touched in last 16 seconds - (or (not (configf:lookup *configdat* "server" "runtime")) ;; skip if not set - (< (- now start-time) - (+ (- (string->number (configf:lookup *configdat* "server" "runtime")) - 180) - (random 360)))) ;; under one hour running time +/- 180 - )) - #f)) - srvlst) - (lambda (a b) - (< (list-ref a 3) - (list-ref b 3)))))) - (if (> (length slst) nums) - (take slst nums) - slst))) - -(define (server:get-first-best areapath) - (let ((srvrs (server:get-best (server:get-list areapath)))) - (if (and srvrs - (not (null? srvrs))) - (car srvrs) - #f))) - -(define (server:get-rand-best areapath) - (let ((srvrs (server:get-best (server:get-list areapath)))) - (if (and (list? srvrs) - (not (null? srvrs))) - (let* ((len (length srvrs)) - (idx (random len))) - (list-ref srvrs idx)) - #f))) - -(define (server:record->id servr) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get server id from " servr ", exn=" exn) - #f) - (match-let (((mod-time host port start-time server-id pid) - servr)) - (if server-id - server-id - #f)))) - -(define (server:record->url servr) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn) - #f) - (match-let (((mod-time host port start-time server-id pid) - servr)) - (if (and host port) - (conc host ":" port) - #f)))) - -(define (server:get-client-signature) ;; BB> why is this proc named "get-"? it returns nothing -- set! has not return value. - (if *my-client-signature* *my-client-signature* - (let ((sig (server:mk-signature))) - (set! 
*my-client-signature* sig) - *my-client-signature*))) - -;; wait for server=start-last to be three seconds old -;; -(define (server:wait-for-server-start-last-flag areapath) - (let* ((start-flag (conc areapath "/logs/server-start-last")) - ;;; THIS INTERACTS WITH [server] timeout. Suggest using 0.1 or above for timeout (6 seconds) - (reftime (configf:lookup-number *configdat* "server" "idletime" default: 4)) - (server-key (conc (get-host-name) "-" (current-process-id)))) - (if (file-exists? start-flag) - (let* ((fmodtime (file-modification-time start-flag)) - (delta (- (current-seconds) fmodtime)) - (all-go (> delta reftime))) - (if (and all-go - (begin - (with-output-to-file start-flag - (lambda () - (print server-key))) - (thread-sleep! 0.25) - (let ((res (with-input-from-file start-flag - (lambda () - (read-line))))) - (equal? server-key res)))) - #t ;; (system (conc "touch " start-flag)) ;; lazy but safe - (begin - (debug:print-info 0 *default-log-port* "Gating server start, last start: " - fmodtime ", delta: " delta ", reftime: " reftime ", all-go=" all-go) - (thread-sleep! reftime) - (server:wait-for-server-start-last-flag areapath))))))) + + + + + + + + + + +(define (server:ping host-port-in server-id #!key (do-exit #f)) + (let ((host:port (if (not host-port-in) ;; use read-dotserver to find + #f ;; (server:check-if-running *toppath*) + ;; (if (number? host-port-in) ;; we were handed a server-id + ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in))) + ;; ;; (print "srec: " srec " host-port-in: " host-port-in) + ;; (if srec + ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4)) + ;; (conc "no such server-id " host-port-in))) + host-port-in))) ;; ) + (let* ((host-port (if host:port + (let ((slst (string-split host:port ":"))) + (if (eq? 
(length slst) 2) + (list (car slst)(string->number (cadr slst))) + #f)) + #f))) +;; (toppath (launch:setup))) + ;; (print "host-port=" host-port) + (if (not host-port) + (begin + (if host-port-in + (debug:print 0 *default-log-port* "ERROR: bad host:port")) + (if do-exit (exit 1)) + #f) + (let* ((iface (car host-port)) + (port (cadr host-port)) + (server-dat (http-transport:client-connect iface port server-id)) + (login-res (rmt:login-no-auto-client-setup server-dat))) + (if (and (list? login-res) + (car login-res)) + (begin + ;; (print "LOGIN_OK") + (if do-exit (exit 0)) + #t) + (begin + ;; (print "LOGIN_FAILED") + (if do-exit (exit 1)) + #f))))))) + ;; kind start up of servers, wait 40 seconds before allowing another server for a given ;; run-id to be launched ;; (define (server:kind-run areapath) @@ -427,16 +225,10 @@ (server:kind-run areapath)) (thread-sleep! 5) (loop (server:check-if-running areapath) (+ try-num 1))))))) -(define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG. - -(define (server:get-num-servers #!key (numservers 2)) - (let ((ns (string->number - (or (configf:lookup *configdat* "server" "numservers") "notanumber")))) - (or ns numservers))) ;; no longer care if multiple servers are started by accident. older servers will drop off in time. ;; (define (server:check-if-running areapath) ;; #!key (numservers "2")) (let* ((ns (server:get-num-servers)) @@ -467,226 +259,26 @@ ))) (if res server-url #f))) -(define (server:kill servr) - (handle-exceptions - exn - (begin - (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn) - #f) - (match-let (((mod-time hostname port start-time server-id pid) - servr)) - (tasks:kill-server hostname pid)))) - -;; called in megatest.scm, host-port is string hostname:port -;; -;; NOTE: This is NOT called directly from clients as not all transports support a client running -;; in the same process as the server. 
-;; -(define (server:ping host-port-in server-id #!key (do-exit #f)) - (let ((host:port (if (not host-port-in) ;; use read-dotserver to find - #f ;; (server:check-if-running *toppath*) - ;; (if (number? host-port-in) ;; we were handed a server-id - ;; (let ((srec (tasks:get-server-by-id (db:delay-if-busy (tasks:open-db)) host-port-in))) - ;; ;; (print "srec: " srec " host-port-in: " host-port-in) - ;; (if srec - ;; (conc (vector-ref srec 3) ":" (vector-ref srec 4)) - ;; (conc "no such server-id " host-port-in))) - host-port-in))) ;; ) - (let* ((host-port (if host:port - (let ((slst (string-split host:port ":"))) - (if (eq? (length slst) 2) - (list (car slst)(string->number (cadr slst))) - #f)) - #f))) -;; (toppath (launch:setup))) - ;; (print "host-port=" host-port) - (if (not host-port) - (begin - (if host-port-in - (debug:print 0 *default-log-port* "ERROR: bad host:port")) - (if do-exit (exit 1)) - #f) - (let* ((iface (car host-port)) - (port (cadr host-port)) - (server-dat (http-transport:client-connect iface port server-id)) - (login-res (rmt:login-no-auto-client-setup server-dat))) - (if (and (list? login-res) - (car login-res)) - (begin - ;; (print "LOGIN_OK") - (if do-exit (exit 0)) - #t) - (begin - ;; (print "LOGIN_FAILED") - (if do-exit (exit 1)) - #f))))))) - -;; run ping in separate process, safest way in some cases -;; -(define (server:ping-server ifaceport) - (with-input-from-pipe - (conc (common:get-megatest-exe) " -ping " ifaceport) - (lambda () - (let loop ((inl (read-line)) - (res "NOREPLY")) - (if (eof-object? inl) - (case (string->symbol res) - ((NOREPLY) #f) - ((LOGIN_OK) #t) - (else #f)) - (loop (read-line) inl)))))) - -;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). -;; -(define (server:login toppath) - (lambda (toppath) - (set! *db-last-access* (current-seconds)) ;; might not be needed. - (if (equal? 
*toppath* toppath) - #t - #f))) - -;; timeout is hms string: 1h 5m 3s, default is 1 minute -;; -(define (server:expiration-timeout) - (let ((tmo (configf:lookup *configdat* "server" "timeout"))) - (if (and (string? tmo) - (common:hms-string->seconds tmo)) ;; BUG: hms-string->seconds is broken, if given "10" returns 0. Also, it doesn't belong in this logic unless the string->number is changed below - (* 3600 (string->number tmo)) - 60))) - -(define (server:get-best-guess-address hostname) - (let ((res #f)) - (for-each - (lambda (adr) - (if (not (eq? (u8vector-ref adr 0) 127)) - (set! res adr))) - ;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME - (vector->list (hostinfo-addresses (hostname->hostinfo hostname)))) - (string-intersperse - (map number->string - (u8vector->list - (if res res (hostname->ip hostname)))) "."))) - -;; (define server:sync-lock-token "SERVER_SYNC_LOCK") -;; (define (server:release-sync-lock) -;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) -;; (define (server:have-sync-lock?) -;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) -;; (have-lock? (car have-lock-pair)) -;; (lock-time (cdr have-lock-pair)) -;; (lock-age (- (current-seconds) lock-time))) -;; (cond -;; (have-lock? #t) -;; ((>lock-age -;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) -;; (server:release-sync-lock) -;; (server:have-sync-lock?)) -;; (else #f)))) - -;; moving this here as it needs access to db and cannot be in common. 
-;; - -(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f)) - (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE"))) ;; defined in cfg.sh - (sync-log (or (args:get-arg "-sync-log") (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log"))) - (tmp-area (common:get-db-tmp-area)) - (tmp-db (conc tmp-area "/megatest.db")) - (staging-file (conc *toppath* "/.megatest.db")) - (mtdbfile (conc *toppath* "/megatest.db")) - (lockfile (common:get-sync-lock-filepath)) - (sync-cmd-core (conc sqlite-exe" " tmp-db " .dump | "sqlite-exe" " staging-file "&>"sync-log)) - (sync-cmd (if fork-to-background - (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"") - sync-cmd-core)) - (default-min-intersync-delay 2) - (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay)) - (default-duty-cycle 0.1) - (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle)) - (last-sync-seconds 10) ;; we will adjust this to a measurement and delay last-sync-seconds * (1 - duty-cycle) - (calculate-off-time (lambda (work-duration duty-cycle) - (* (/ (- 1 duty-cycle) duty-cycle) last-sync-seconds))) - (off-time min-intersync-delay) ;; adjusted in closure below. - (do-a-sync - (lambda () - (BB> "Start do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync) - (let* ((finalres - (let retry-loop ((num-tries 0)) - (if (common:simple-file-lock lockfile) - (begin - (cond - ((not (or fork-to-background persist-until-sync)) - (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay - " , off-time="off-time" seconds ]") - (thread-sleep! 
(max off-time min-intersync-delay))) - (else - (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit..."))) - - (if (not (configf:lookup *configdat* "server" "disable-db-snapshot")) - (common:snapshot-file mtdbfile subdir: ".db-snapshot")) - (delete-file* staging-file) - (let* ((start-time (current-milliseconds)) - (res (system sync-cmd)) - (dbbackupfile (conc mtdbfile ".backup")) - (res2 - (cond - ((eq? 0 res ) - (handle-exceptions - exn - #f - (if (file-exists? dbbackupfile) - (delete-file* dbbackupfile) - ) - (if (eq? 0 (file-size sync-log)) - (delete-file* sync-log)) - (system (conc "/bin/mv " staging-file " " mtdbfile)) - - (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000)) - (set! off-time (calculate-off-time - last-sync-seconds - (cond - ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1)) - duty-cycle) - (else - (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle) - default-duty-cycle)))) - - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec") - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time) - 'sync-completed)) - (else - (system (conc "/bin/cp "sync-log" "sync-log".fail")) - (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail") - (if (file-exists? (conc mtdbfile ".backup")) - (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile))) - #f)))) - (common:simple-file-release-lock lockfile) - (BB> "released lockfile: " lockfile) - (when (common:file-exists? 
lockfile) - (BB> "DID NOT ACTUALLY RELEASE LOCKFILE")) - res2) ;; end let - );; end begin - ;; else - (cond - (persist-until-sync - (thread-sleep! 1) - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)") - (retry-loop (add1 num-tries))) - (else - (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay))) - (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.") - 'parallel-sync-in-progress)) - ) ;; end if got lockfile - ) - )) - (BB> "End do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync" and result="finalres) - finalres) - ) ;; end lambda - )) - do-a-sync)) + +(define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG. + + + + + + + + + + + + + + (define (server:writable-watchdog-bruteforce dbstruct) (thread-sleep! 1) ;; delay for startup (let* ((do-a-sync (server:get-bruteforce-syncer dbstruct)) (final-sync (server:get-bruteforce-syncer dbstruct fork-to-background: #t persist-until-sync: #t))) Index: servermod.scm ================================================================== --- servermod.scm +++ servermod.scm @@ -17,19 +17,467 @@ ;; along with Megatest. If not, see . 
;;======================================================================

 (declare (unit servermod))
+(declare (uses commonmod))
+(declare (uses configfmod))
+(declare (uses dbmod))

 (module servermod *

-(import scheme chicken data-structures extras)
-(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18)
+(import scheme chicken data-structures extras ports files)
+(import (prefix sqlite3 sqlite3:) posix
+        typed-records srfi-18 srfi-1 srfi-69 srfi-4
+        message-digest hostinfo
+        regex matchable
+        md5)
+
+(import commonmod)
+(import configfmod)
+(import dbmod)
+
+;; Build "http://host:port" from a (host port ...) list.
+;; Returns #f when hostport is #f.
+;;
+(define (server:make-server-url hostport)
+  (if (not hostport)
+      #f
+      (conc "http://" (car hostport) ":" (cadr hostport))))
+
+;;======================================================================
+;; S E R V E R   U T I L I T I E S
+;;======================================================================
+
+;; Get the transport
+;; (disabled with a datum comment; kept for reference only)
+#;(define (server:get-transport)
+  (if *transport-type*
+      *transport-type*
+      (let ((ttype (string->symbol
+                    (or (args:get-arg "-transport")
+                        (configf:lookup *configdat* "server" "transport")
+                        "rpc"))))
+        (set! *transport-type* ttype)
+        ttype)))
+
+;; Generate a unique signature for this server: the md5 digest of the
+;; written form of (current-directory current-process-id argv).
+;;
+(define (server:mk-signature)
+  (message-digest-string (md5-primitive)
+                         (with-output-to-string
+                           (lambda ()
+                             (write (list (current-directory)
+                                          (current-process-id)
+                                          (argv)))))))
+
+;; When using zmq this would send the message back (two step process)
+;; with spiffy or rpc this simply returns the return data to be returned
+;;
+;; Returns the db:obj->string encoding of #(success/fail query-sig result).
+;; return-addr is only logged.
+;;
+(define (server:reply return-addr query-sig success/fail result)
+  (debug:print-info 11 *default-log-port* "server:reply return-addr=" return-addr ", result=" result)
+  ;; (send-message pubsock target send-more: #t)
+  ;; (send-message pubsock
+  (db:obj->string (vector success/fail query-sig result)))
+;; (case (server:get-transport)
+;;   ((rpc) (db:obj->string (vector success/fail query-sig result)))
+;;   ((http) (db:obj->string (vector success/fail query-sig result)))
+;;   ((fs) result)
+;;   (else
+;;    (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*)
+;;    result)))
+
+;; given a path to a server log return: host port startseconds server-id
+;; (all four are #f when the log cannot be read or no start line is found)
+;; any changes to the number of elements returned by this function will directly affect
+;; server:record->url, server:record->id, server:kill and server:get-num-alive, which use match-let
+;;
+(define (server:logf-get-start-info logf)
+  (let ((rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+)"))) ;; SERVER STARTED: host:port AT timesecs server id
+    (handle-exceptions
+     exn
+     (begin
+       (debug:print-info 0 *default-log-port* "Unable to get server info from " logf ", exn=" exn)
+       (list #f #f #f #f)) ;; no idea what went wrong, call it a bad server
+     (with-input-from-file
+         logf
+       (lambda ()
+         (let loop ((inl  (read-line))
+                    (lnum 0))
+           (if (not (eof-object? inl))
+               (let ((mlst (string-match rx inl)))
+                 (if (not mlst)
+                     (if (< lnum 500) ;; give up if more than 500 lines of server log read
+                         (loop (read-line)(+ lnum 1))
+                         (begin
+                           (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
+                           (list #f #f #f #f)))
+                     (let ((dat (cdr mlst)))
+                       (list (car dat)                      ;; host
+                             (string->number (cadr dat))    ;; port
+                             (string->number (caddr dat))   ;; start time (seconds)
+                             (cadr (cddr dat))))))          ;; server-id
+               (begin
+                 (debug:print-info 0 *default-log-port* "Unable to get server info from " logf " at " (current-seconds))
+                 (list #f #f #f #f)))))))))
+
+;; get a list of servers with all relevant data
+;; ( mod-time host port start-time server-id pid )
+;;
+;; Scans areapath/logs/server-*.log, creating the logs directory first when
+;; it is missing and areapath is writable. Returns '() when there are no
+;; server logs or the logs directory is missing and could not be created.
+;; NOTE: limit is accepted but currently has no effect (see the end of the loop).
+;;
+(define (server:get-list areapath #!key (limit #f))
+  (let ((fname-rx    (regexp "^(|.*/)server-(\\d+)-(\\S+).log$"))
+        (day-seconds (* 24 60 60)))
+    ;; if the directory exists continue to get the list
+    ;; otherwise attempt to create the logs dir and then
+    ;; continue
+    ;; ('() is a true value in Scheme, so the '() branches below mean "continue")
+    (if (if (directory-exists? (conc areapath "/logs"))
+            '()
+            (if (file-write-access? areapath)
+                (begin
+                  (condition-case
+                      (create-directory (conc areapath "/logs") #t)
+                    (exn (i/o file)(debug:print 0 *default-log-port* "ERROR: Cannot create directory at " (conc areapath "/logs")))
+                    (exn ()(debug:print 0 *default-log-port* "ERROR: Unknown error attemtping to get server list. exn=" exn)))
+                  (directory-exists? (conc areapath "/logs")))
+                '()))
+        (let* ((server-logs   (glob (conc areapath "/logs/server-*.log")))
+               (num-serv-logs (length server-logs)))
+          (if (null? server-logs)
+              '()
+              (let loop ((hed (car server-logs))
+                         (tal (cdr server-logs))
+                         (res '()))
+                (let* ((mod-time  (handle-exceptions
+                                   exn
+                                   (begin
+                                     (debug:print 0 *default-log-port* "failed to get modification time on " hed ", exn=" exn)
+                                     (current-seconds)) ;; stat failed: fall back to "now" (was 0)
+                                   (file-modification-time hed)))
+                       (down-time (- (current-seconds) mod-time))
+                       (serv-dat  (if (or (< num-serv-logs 10)
+                                          (< down-time 900)) ;; day-seconds))
+                                      (server:logf-get-start-info hed)
+                                      '())) ;; don't waste time processing server files not touched in the last 15 minutes if there are ten or more server logs to look at
+                       (serv-rec  (cons mod-time serv-dat))
+                       (fmatch    (string-match fname-rx hed))
+                       (pid       (if fmatch (string->number (list-ref fmatch 2)) #f))
+                       (new-res   (if (null? serv-dat)
+                                      res
+                                      (cons (append serv-rec (list pid)) res)))) ;; any changes to the number of elements in new-res will directly affect server:record->url, server:record->id, server:kill and server:get-num-alive, which use match-let
+                  (if (null? tal)
+                      (if (and limit
+                               (> (length new-res) limit))
+                          new-res ;; (take new-res limit) <= need intelligent sorting before this will work
+                          new-res)
+                      (loop (car tal)(cdr tal) new-res))))))
+        ;; logs dir is missing and could not be created: report no servers
+        ;; instead of returning an unspecified value to callers that expect a list
+        '())))
+
+;; Count the records in srvlst whose mod-time is less than 5 seconds old.
+;; Records that do not have the six-element shape
+;; (mod-time host port start-time server-id pid) are logged and skipped.
+;;
+(define (server:get-num-alive srvlst)
+  (let ((num-alive 0))
+    (for-each
+     (lambda (server)
+       (handle-exceptions
+        exn
+        (begin
+          (debug:print-info 0 *default-log-port* "Unable to get server start-time and/or mod-time from " server ", exn=" exn))
+        (match-let (((mod-time host port start-time server-id pid)
+                     server))
+          (let ((uptime (- (current-seconds) mod-time)))
+            (if (< uptime 5)(set! num-alive (+ num-alive 1)))))))
+     srvlst)
+    num-alive))
+
+;; given a list of servers get a list of valid servers, i.e. has started
+;; (start-time is in the past), is active (log file touched in the last
+;; 16 seconds) and, when [server] runtime is set, has been running for
+;; less than runtime - 180 plus a random 0..359 seconds
+;;
+;; mod-time host port start-time server-id pid
+;;
+;; sort by start-time ascending. I.e.
get the oldest first. Young servers will thus drop off
+;; and servers should stick around for about two hours or so.
+;;
+;; Returns at most (server:get-num-servers) records.
+;;
+(define (server:get-best srvlst)
+  (let* ((nums        (server:get-num-servers))
+         (now         (current-seconds))
+         (runtime-str (configf:lookup *configdat* "server" "runtime"))
+         ;; #f when [server] runtime is unset or is not a number => no age limit
+         (runtime     (and runtime-str (string->number runtime-str)))
+         (slst (sort
+                (filter (lambda (rec)
+                          (if (and (list? rec)
+                                   (> (length rec) 3)) ;; index 3 (start-time) is read below
+                              (let ((start-time (list-ref rec 3))
+                                    (mod-time   (list-ref rec 0)))
+                                ;; (print "start-time: " start-time " mod-time: " mod-time)
+                                (and start-time mod-time
+                                     (> (- now start-time) 0)  ;; been running at least 0 seconds
+                                     (< (- now mod-time) 16)   ;; still alive - file touched in last 16 seconds
+                                     (or (not runtime)         ;; skip if not set
+                                         (< (- now start-time)
+                                            (+ (- runtime 180)
+                                               (random 360)))) ;; under the configured running time +/- 180
+                                     ))
+                              #f))
+                        srvlst)
+                (lambda (a b)
+                  (< (list-ref a 3)
+                     (list-ref b 3))))))
+    (if (> (length slst) nums)
+        (take slst nums)
+        slst)))
+
+;; First record from server:get-best for areapath, or #f when there is none.
+;;
+(define (server:get-first-best areapath)
+  (let ((srvrs (server:get-best (server:get-list areapath))))
+    (if (and srvrs
+             (not (null? srvrs)))
+        (car srvrs)
+        #f)))
+
+;; Randomly chosen record from server:get-best for areapath, or #f when there is none.
+;;
+(define (server:get-rand-best areapath)
+  (let ((srvrs (server:get-best (server:get-list areapath))))
+    (if (and (list? srvrs)
+             (not (null? srvrs)))
+        (let* ((len (length srvrs))
+               (idx (random len)))
+          (list-ref srvrs idx))
+        #f)))
+
+;; server-id field of a server record, or #f when it is unset or the
+;; record does not have the expected six-element shape.
+;;
+(define (server:record->id servr)
+  (handle-exceptions
+   exn
+   (begin
+     (debug:print-info 0 *default-log-port* "Unable to get server id from " servr ", exn=" exn)
+     #f)
+   (match-let (((mod-time host port start-time server-id pid)
+                servr))
+     (if server-id
+         server-id
+         #f))))
+
+;; "host:port" string for a server record, or #f when host or port is
+;; missing or the record does not have the expected six-element shape.
+;;
+(define (server:record->url servr)
+  (handle-exceptions
+   exn
+   (begin
+     (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn)
+     #f)
+   (match-let (((mod-time host port start-time server-id pid)
+                servr))
+     (if (and host port)
+         (conc host ":" port)
+         #f))))
+
+;; Return *my-client-signature*, generating and caching it with
+;; server:mk-signature on first use.
+;;
+(define (server:get-client-signature)
+  (if *my-client-signature* *my-client-signature*
+      (let ((sig (server:mk-signature)))
+        (set! *my-client-signature* sig)
+        *my-client-signature*)))
+
+;; wait for logs/server-start-last to be more than [server] idletime
+;; (default 4) seconds old, then claim it by writing host-pid into it and
+;; reading it back after 0.25 seconds. Returns #t once the claim succeeds;
+;; otherwise sleeps idletime seconds and retries.
+;; NOTE(review): when the flag file does not exist the result is unspecified.
+;;
+(define (server:wait-for-server-start-last-flag areapath)
+  (let* ((start-flag (conc areapath "/logs/server-start-last"))
+         ;;; THIS INTERACTS WITH [server] timeout. Suggest using 0.1 or above for timeout (6 seconds)
+         (reftime    (configf:lookup-number *configdat* "server" "idletime" default: 4))
+         (server-key (conc (get-host-name) "-" (current-process-id))))
+    (if (file-exists? start-flag)
+        (let* ((fmodtime (file-modification-time start-flag))
+               (delta    (- (current-seconds) fmodtime))
+               (all-go   (> delta reftime)))
+          (if (and all-go
+                   (begin
+                     (with-output-to-file start-flag
+                       (lambda ()
+                         (print server-key)))
+                     (thread-sleep! 0.25)
+                     (let ((res (with-input-from-file start-flag
+                                  (lambda ()
+                                    (read-line)))))
+                       (equal? server-key res))))
+              #t ;; (system (conc "touch " start-flag)) ;; lazy but safe
+              (begin
+                (debug:print-info 0 *default-log-port* "Gating server start, last start: "
+                                  fmodtime ", delta: " delta ", reftime: " reftime ", all-go=" all-go)
+                (thread-sleep! reftime)
+                (server:wait-for-server-start-last-flag areapath)))))))
+
+;; Number of servers to use: [server] numservers when it parses as a
+;; number, otherwise the numservers keyword (default 2).
+;;
+(define (server:get-num-servers #!key (numservers 2))
+  (let ((ns (string->number
+             (or (configf:lookup *configdat* "server" "numservers") "notanumber"))))
+    (or ns numservers)))
+
+;; Pass the hostname and pid of a server record to tasks:kill-server.
+;; Returns #f (after logging) when the record is malformed or the call raises.
+;;
+(define (server:kill servr)
+  (handle-exceptions
+   exn
+   (begin
+     (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn)
+     #f)
+   (match-let (((mod-time hostname port start-time server-id pid)
+                servr))
+     (tasks:kill-server hostname pid))))
+
+;; called in megatest.scm, host-port is string hostname:port
+;;
+;; NOTE: This is NOT called directly from clients as not all transports support a client running
+;; in the same process as the server.
+;;
+;; run ping in separate process, safest way in some cases
+;;
+;; Returns #t only when the last line printed by the ping process is LOGIN_OK.
+;;
+(define (server:ping-server ifaceport)
+  (with-input-from-pipe
+   (conc (common:get-megatest-exe) " -ping " ifaceport)
+   (lambda ()
+     (let loop ((inl (read-line))
+                (res "NOREPLY"))
+       (if (eof-object? inl)
+           (case (string->symbol res)
+             ((NOREPLY)  #f)
+             ((LOGIN_OK) #t)
+             (else       #f))
+           (loop (read-line) inl))))))
+
+;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+;;
+;; NOTE(review): as written this returns a one-argument procedure (the inner
+;; lambda shadows toppath) rather than a boolean -- confirm against the caller.
+;;
+(define (server:login toppath)
+  (lambda (toppath)
+    (set! *db-last-access* (current-seconds)) ;; might not be needed.
+    (if (equal? *toppath* toppath)
+        #t
+        #f)))
+
+;; [server] timeout is used as a plain number multiplied by 3600;
+;; the default result is 60 (1 minute).
+;;
+;; A value that is not a plain number (e.g. the hms form "1h 5m 3s") used to
+;; raise in (* 3600 #f); it now falls back to the default.
+;;
+(define (server:expiration-timeout)
+  (let* ((tmo   (configf:lookup *configdat* "server" "timeout"))
+         (hours (and (string? tmo)
+                     (common:hms-string->seconds tmo) ;; BUG: hms-string->seconds is broken, if given "10" returns 0. Also, it doesn't belong in this logic unless the string->number is changed below
+                     (string->number tmo))))
+    (if hours
+        (* 3600 hours)
+        60)))
+
+;; Dotted string form of an address for hostname: the last address from
+;; hostinfo whose first byte is not 127, else (hostname->ip hostname).
+;;
+(define (server:get-best-guess-address hostname)
+  (let ((res #f))
+    (for-each
+     (lambda (adr)
+       (if (not (eq? (u8vector-ref adr 0) 127))
+           (set! res adr)))
+     ;; NOTE: This can fail when there is no mention of the host in /etc/hosts. FIXME
+     (vector->list (hostinfo-addresses (hostname->hostinfo hostname))))
+    (string-intersperse
+     (map number->string
+          (u8vector->list
+           (if res res (hostname->ip hostname)))) ".")))
+
+;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; (define (server:release-sync-lock)
+;;   (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; (define (server:have-sync-lock?)
+;;   (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;;          (have-lock? (car have-lock-pair))
+;;          (lock-time (cdr have-lock-pair))
+;;          (lock-age (- (current-seconds) lock-time)))
+;;     (cond
+;;      (have-lock? #t)
+;;      ((>lock-age
+;;        (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;;       (server:release-sync-lock)
+;;       (server:have-sync-lock?))
+;;      (else #f))))
+
+;; moving this here as it needs access to db and cannot be in common.
+;;
+;; Returns a thunk (do-a-sync) that dumps the tmp-area megatest.db through
+;; sqlite into *toppath*/.megatest.db and moves that over *toppath*/megatest.db,
+;; guarded by the sync lock file. The thunk returns 'sync-completed,
+;; 'parallel-sync-in-progress (lock held elsewhere and persist-until-sync is
+;; #f) or #f (sync command failed or post-processing raised).
+;; dbstruct is not referenced in this body.
+;;
+
+(define (server:get-bruteforce-syncer dbstruct #!key (fork-to-background #f) (persist-until-sync #f))
+  (let* ((sqlite-exe (or (get-environment-variable "MT_SQLITE3_EXE") ;; defined in cfg.sh
+                         "sqlite3")) ;; fall back to sqlite3 on PATH rather than splicing #f into the command
+         (sync-log (or ;; (args:get-arg "-sync-log")
+                    (conc *toppath* "/logs/sync-" (current-process-id) "-" (get-host-name) ".log")))
+         (tmp-area (common:get-db-tmp-area))
+         (tmp-db (conc tmp-area "/megatest.db"))
+         (staging-file (conc *toppath* "/.megatest.db"))
+         (mtdbfile (conc *toppath* "/megatest.db"))
+         (lockfile (common:get-sync-lock-filepath))
+         (sync-cmd-core (conc sqlite-exe" " tmp-db " .dump | "sqlite-exe" " staging-file "&>"sync-log))
+         (sync-cmd (if fork-to-background
+                       (conc "/usr/bin/env NBFAKE_LOG="*toppath*"/logs/last-server-sync-"(current-process-id)".log nbfake \""sync-cmd-core" && /bin/mv -f " staging-file " " mtdbfile" \"")
+                       sync-cmd-core))
+         (default-min-intersync-delay 2)
+         (min-intersync-delay (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: default-min-intersync-delay))
+         (default-duty-cycle 0.1)
+         (duty-cycle (configf:lookup-number *configdat* "server" "sync-duty-cycle" default: default-duty-cycle))
+         (last-sync-seconds 10) ;; we will adjust this to a measurement and delay last-sync-seconds * (1 - duty-cycle)
+         ;; off time = work-duration * (1 - duty-cycle) / duty-cycle
+         (calculate-off-time (lambda (work-duration duty-cycle)
+                               (* (/ (- 1 duty-cycle) duty-cycle) work-duration)))
+         (off-time min-intersync-delay) ;; adjusted in closure below.
+         (do-a-sync
+          (lambda ()
+            ;; (BB> "Start do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync)
+            (let* ((finalres
+                    (let retry-loop ((num-tries 0))
+                      (if (common:simple-file-lock lockfile)
+                          (begin
+                            (cond
+                             ((not (or fork-to-background persist-until-sync))
+                              (debug:print 0 *default-log-port* "INFO: syncer thread sleeping for max of (server.minimum-intersync-delay="min-intersync-delay
+                                           " , off-time="off-time" seconds ]")
+                              (thread-sleep! (max off-time min-intersync-delay)))
+                             (else
+                              (debug:print 0 *default-log-port* "INFO: syncer thread NOT sleeping ; maybe time-to-exit...")))

-(define (just-testing)
-  (print "JUST TESTING"))
+                            (if (not (configf:lookup *configdat* "server" "disable-db-snapshot"))
+                                (common:snapshot-file mtdbfile subdir: ".db-snapshot"))
+                            (delete-file* staging-file)
+                            (let* ((start-time (current-milliseconds))
+                                   (res (system sync-cmd))
+                                   (dbbackupfile (conc mtdbfile ".backup"))
+                                   (res2
+                                    (cond
+                                     ((eq? 0 res )
+                                      (handle-exceptions
+                                       exn
+                                       #f
+                                       (if (file-exists? dbbackupfile)
+                                           (delete-file* dbbackupfile)
+                                           )
+                                       (if (eq? 0 (file-size sync-log))
+                                           (delete-file* sync-log))
+                                       (system (conc "/bin/mv " staging-file " " mtdbfile))
+
+                                       (set! last-sync-seconds (/ (- (current-milliseconds) start-time) 1000))
+                                       (set! off-time (calculate-off-time
+                                                       last-sync-seconds
+                                                       (cond
+                                                        ((and (number? duty-cycle) (> duty-cycle 0) (< duty-cycle 1))
+                                                         duty-cycle)
+                                                        (else
+                                                         (debug:print 0 *default-log-port* "WARNING: ["(common:human-time)"] server.sync-duty-cycle is invalid. Should be a number between 0 and 1, but "duty-cycle" was specified. Using default value: "default-duty-cycle)
+                                                         default-duty-cycle))))
+
+                                       (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec")
+                                       (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" SYNC took "last-sync-seconds" sec ; with duty-cycle of "duty-cycle" off time is now "off-time)
+                                       'sync-completed))
+                                     (else
+                                      (system (conc "/bin/cp "sync-log" "sync-log".fail"))
+                                      (debug:print 0 *default-log-port* "ERROR: ["(common:human-time)"] Sync failed. See log at "sync-log".fail")
+                                      (if (file-exists? (conc mtdbfile ".backup"))
+                                          (system (conc "/bin/cp "mtdbfile ".backup " mtdbfile)))
+                                      #f))))
+                              (common:simple-file-release-lock lockfile)
+                              ;; (BB> "released lockfile: " lockfile)
+                              #;(when (common:file-exists? lockfile)
+                                (BB> "DID NOT ACTUALLY RELEASE LOCKFILE"))
+                              res2) ;; end let
+                            );; end begin
+                          ;; else
+                          (cond
+                           (persist-until-sync
+                            (thread-sleep! 1)
+                            (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; we're in a fork-to-background so we need to succeed. Let's wait a jiffy and and try again. num-tries="num-tries" (waiting for lockfile="lockfile" to disappear)")
+                            (retry-loop (add1 num-tries)))
+                           (else
+                            (thread-sleep! (max off-time (+ last-sync-seconds min-intersync-delay)))
+                            (debug:print 1 *default-log-port* "INFO: ["(common:human-time)"] pid="(current-process-id)" other SYNC in progress; not syncing.")
+                            'parallel-sync-in-progress))
+                          ) ;; end if got lockfile
+                      )
+                    ))
+              ;; (BB> "End do-a-sync with fork-to-background="fork-to-background" persist-until-sync="persist-until-sync" and result="finalres)
+              finalres)
+            ) ;; end lambda
+          ))
+    do-a-sync))

 ;; (define (debug:print . params) #f)
 ;; (define (debug:print-info . params) #f)
 ;;
 ;; (define (set-functions dbgp dbgpinfo)