Index: apimod.scm ================================================================== --- apimod.scm +++ apimod.scm @@ -185,11 +185,10 @@ (api:run-server-process apath dbname) 'server-started) (else (debug:print-info 0 *default-log-port* "api:start-server called with wrong params: "params) 'bad-params))))) - (define (api:dispatch-cmd dbstruct cmd params) (case cmd ;;=============================================== ;; READ/WRITE QUERIES @@ -199,10 +198,11 @@ ;; SERVERS ;; ((start-server) (apply server:kind-run params)) ((kill-server) (set! *server-run* #f)) ((get-server) (api:start-server dbstruct params)) + ((register-server) (apply db:register-server dbstruct params) );; dbstruct host port servkey pid ipaddr dbpath) ;; TESTS ;;((test-set-state-status-by-id) (apply mt:test-set-state-status-by-id dbstruct params)) ;;BB - commented out above because it was calling below, eventually, incorrectly (dbstruct passed to mt:test-set-state-status-by-id, which previosly did more, but now only passes thru to db:set-state-status-and-roll-up-items. Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -1141,11 +1141,11 @@ (if (args:get-arg "-server") (if (not (args:get-arg "-db")) (debug:print 0 *default-log-port* "ERROR: -db required to start server") (let ((tl (launch:setup)) (dbname (args:get-arg "-db"))) ;; transport-type (string->symbol (or (args:get-arg "-transport") "http")))) - (server:launch dbname) + (rmt:launch dbname) (set! *didsomething* #t)))) ;; The adjutant is a bit different, it does NOT run (launch:setup) as it is not necessarily tied to ;; a specific Megatest area. Detail are being hashed out and this may change. 
;; Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -258,29 +258,39 @@ )) #t) (start-main-srv))) (start-main-srv)))) +(define (rmt:main-open-connection remote apath) + (rmt:open-main-connection remote apath) + (rmt:get-connection remote apath (db:run-id->dbname #f))) + ;; NB// remote is a rmt:remote struct ;; -(define (rmt:general-open-connection remote apath dbname) +(define (rmt:general-open-connection remote apath dbname #!key (num-tries 5)) (let ((mainconn (rmt:get-connection remote apath (db:run-id->dbname #f)))) ;; (debug:print 0 *default-log-port* "remote: " remote) (if (not mainconn) (begin (rmt:open-main-connection remote apath) (thread-sleep! 1) (rmt:general-open-connection remote apath dbname)) ;; we have a connection to main, ask for contact info for dbname (let* ((res (rmt:send-receive 'get-server #f `(,apath ,dbname)))) - ;; (print "rmt:general-open-connection got res="res) - res)))) - + (case res + ((server-started) + (if (> num-tries 0) + (begin + (thread-sleep! 2) + (rmt:general-open-connection remote apath dbname num-tries: (- num-tries 1))) + 'failed)) + (else + res)))))) ;;====================================================================== -;; Defaults to +;; Defaults to current area ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) (if (not *rmt:remote*)(set! 
*rmt:remote* (make-rmt:remote))) (let* ((apath *toppath*) (conns *rmt:remote*) @@ -1483,17 +1493,25 @@ "/" (servdat-uuid *server-info*) ".pkt")) (dbfile (servdat-dbfile *server-info*))) (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) (delete-file* pkt-file) - (if (and dbfile - (string-match ".*/main.db$" dbfile)) - (begin - (debug:print-info 0 *default-log-port* "Releasing lock for "dbfile) - (db:with-lock-db (servdat-dbfile *server-info*) - (lambda (dbh dbfile) - (db:release-lock dbh dbfile))))))) + (if dbfile + (if (string-match ".*/main.db$" dbfile) + (begin + (debug:print-info 0 *default-log-port* "Releasing lock for "dbfile) + (db:with-lock-db (servdat-dbfile *server-info*) + (lambda (dbh dbfile) + (db:release-lock dbh dbfile)))) + (let* ((sdat *server-info*)) ;; we have a run-id server + (rmt:send-receive-real *rmt:remote* *toppath* + (db:run-id->dbname #f) + #f 'register-server + `(,(servdat-uuid sdat) + ,(current-process-id) + ,(servdat-host sdat) ;; iface + ,(servdat-port sdat)))))))) (if (bdat-task-db *bdat*) ;; TODO: Check that this is correct for task db (let ((db (cdr (bdat-task-db *bdat*)))) (if (sqlite3:database? db) (begin (sqlite3:interrupt! db) @@ -1878,24 +1896,24 @@ ((equal? 
(uri-path (request-uri (current-request))) '(/ "dashboard")) (send-response body: ((http-get-function 'http-transport:html-dboard) $) headers: '((content-type text/HTML)))) (else (continue)))))))) - #;(handle-exceptions - exn - (debug:print 0 *default-log-port* "Failed to create file " start-file ", exn=" exn) - (with-output-to-file start-file (lambda ()(print (current-process-id))))) (http-transport:try-start-server ipaddrstr start-port))) -;; This is recursively run by http-transport:run until sucessful +;; This is recursively run by http-transport:run until successful; it then runs until the server is stopped ;; (define (http-transport:try-start-server ipaddrstr portnum) (let ((config-hostname (configf:lookup *configdat* "server" "hostname")) (config-use-proxy (equal? (configf:lookup *configdat* "client" "use-http_proxy") "yes"))) (if (not config-use-proxy) (determine-proxy (constantly #f))) - (debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" (seconds->time-string (current-seconds)) " ipaddrsstr=" ipaddrstr " portnum=" portnum " config-hostname=" config-hostname) + (debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" + (seconds->time-string (current-seconds)) + " ipaddrsstr=" ipaddrstr + " portnum=" portnum + " config-hostname=" config-hostname) (handle-exceptions exn (begin (print-error-message exn) (if (< portnum 64000) @@ -1973,48 +1991,10 @@ (define (http-transport:inc-requests-and-prep-to-close-all-connections) (mutex-lock! *http-mutex*) (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*))) -;; serverdat contains uuid to be used for connection validation -;; -;; NOTE: serverdat must be initialized or created by servdat-init -;; -;; DO NOT USE. 
Moved to rmt:set-receive-real -;; -;; (define (http-transport:send-receive conn qry-key cmd params #!key (numretries 3)) -;; (let* ((res #f) -;; (success #t) -;; (sparams (with-output-to-string -;; (lambda ()(write params))))) -;; ;; send the data and get the response extract the needed info from -;; ;; the http data and process and return it. -;; (let* ((send-recieve (lambda () -;; (set! res -;; (with-input-from-request -;; (rmt:conn->uri conn "api") -;; (list (cons 'key qry-key) -;; ;; (cons 'srvid (servdat-uuid sdat)) -;; (cons 'cmd cmd) -;; (cons 'params sparams)) -;; read-string)))) -;; (time-out (lambda () -;; (thread-sleep! 45) -;; (debug:print 0 *default-log-port* "WARNING: send-receive took more than 45 seconds!!") -;; #f)) -;; (th1 (make-thread send-recieve "with-input-from-request")) -;; (th2 (make-thread time-out "time out"))) -;; (thread-start! th1) -;; (thread-start! th2) -;; (thread-join! th1) -;; (close-idle-connections!) -;; (thread-terminate! th2) -;; (if (string? res) -;; (with-input-from-string res -;; (lambda () read)) -;; res)))) - ;; careful closing of connections stored in *runremote* ;; (define (http-transport:close-connections #!key (area-dat #f)) (debug:print-info 0 *default-log-port* "http-transport:close-connections doesn't do anything now!")) ;; (let* ((runremote (or area-dat *runremote*)) @@ -2233,10 +2213,12 @@ ;;====================================================================== ;; END NEW SERVER METHOD ;;====================================================================== +;; if .db/main.db check the pkts +;; (define (http-transport:wait-for-server pkts-dir db-file server-key) (let* ((sdat *server-info*)) (let loop ((start-time (current-seconds)) (changed #t) (last-sdat "not this")) @@ -2297,139 +2279,126 @@ (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") (exit)) (loop start-time (equal? 
sdat last-sdat) sdat)))))))) + +(define (rmt:register-server remote apath iface port server-key db-file) + (rmt:send-receive-real remote apath ;; params: host port servkey pid ipaddr dbpath + (db:run-id->dbname #f) #f 'register-server `(,iface + ,port + ,server-key + ,(current-process-id) + ,iface + ,db-file))) + +(define (http-transport:wait-for-stable-interface #!optional (num-tries-allowed 100)) + ;; poll *server-info* every 1.5 seconds until its host and port match the previous snapshot; returns #t, or exits with status 1 once tries exceeds num-tries-allowed + (let loop ((sdat #f) ;; the *server-info* value seen on the previous pass, #f until the first snapshot is taken + (tries 0)) + ;; compare host and port of the previous snapshot against the current *server-info* + (cond + ((> tries num-tries-allowed) + (debug:print 0 *default-log-port* "http-transport:wait-for-stable-interface, giving up after " tries " tries.") + (exit 1)) + ((not *server-info*) + (thread-sleep! 1.5) + (loop *server-info* (+ tries 1))) + ((not sdat) ;; normal first pass when *server-info* is already set on entry: nothing to compare against yet + (debug:print-info 0 *default-log-port* "http-transport:wait-for-stable-interface, taking first snapshot of host and port") + (thread-sleep! 1.5) + (loop *server-info* (+ tries 1))) + ((or (not (equal? (servdat-host sdat)(servdat-host *server-info*))) + (not (equal? (servdat-port sdat)(servdat-port *server-info*)))) + (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") + (thread-sleep! 1.5) + (loop *server-info* (+ tries 1))) + (else + (if (not *server-id*)(set! *server-id* (server:mk-signature))) + (debug:print 0 *default-log-port* + "SERVER STARTED: " (servdat-host *server-info*) + ":" (servdat-port *server-info*) + " AT " (current-seconds) " server-id: " *server-id*) + (flush-output *default-log-port*) + #t)))) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. 
;; (define (http-transport:keep-running dbname) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") - (let* ((run-id (let ((rid (args:get-arg "-run-id"))) ;; consider getting rid of the -run-id mechanism - (if rid ;; replace with -db - (string->number rid) - #f))) - (db-file (if dbname - (db:dbname->path *toppath* dbname) - (db:run-id->path *toppath* run-id))) - (sdat #f) - ;; (tmp-area (common:get-db-tmp-area)) - (server-start-time (current-seconds)) + + (let* ((server-start-time (current-seconds)) (pkts-dir (get-pkts-dir)) (server-key (server:mk-signature)) - (server-info (http-transport:wait-for-server pkts-dir db-file server-key )) - (iface (servdat-host server-info)) - (port (servdat-port server-info)) - (last-access 0) - (server-timeout (server:expiration-timeout)) - (server-log-file (args:get-arg "-log"))) ;; always set when we are a server - - (let loop ((count 0) - (server-state 'available) - (bad-sync-count 0) - (start-time (current-milliseconds))) - ;; Use this opportunity to sync the tmp db to megatest.db NOTE: This conflicts with the watchdog syncing? - (if (not *dbstruct-db* ) - (let ((watchdog (bdat-watchdog *bdat*))) - (debug:print 0 *default-log-port* "SERVER: dbprep") - - (db:setup dbname) ;; sets *dbstruct-db* as side effect - ;; NOW REGISTER THE SERVER in main.db - - - - - - - - - - - - - - - - - - - - - (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. - (if watchdog - (if (not (member (thread-state watchdog) '(ready running blocked sleeping dead))) - (begin - (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") - (thread-start! 
watchdog))) - (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")))) - - ;; when things go wrong we don't want to be doing the various queries too often - ;; so we strive to run this stuff only every four seconds or so. - (let* ((sync-time (- (current-milliseconds) start-time)) - (rem-time (quotient (- 4000 sync-time) 1000))) - (if (and (<= rem-time 4) - (> rem-time 0)) - (thread-sleep! rem-time))) - - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1) 'running bad-sync-count (current-milliseconds))) - - ;; Check that iface and port have not changed (can happen if server port collides) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - - (if (or (not (equal? (servdat-host sdat) iface)) - (not (equal? (servdat-port sdat) port))) - (let ((new-iface (servdat-host sdat)) - (new-port (servdat-port sdat))) - (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") - (set! iface new-iface) - (set! port new-port) - (if (not *server-id*) - (set! *server-id* (server:mk-signature))) - ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) - (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) - (flush-output *default-log-port*))) - - ;; Transfer *db-last-access* to last-access to use in checking that we are still alive - (mutex-lock! *heartbeat-mutex*) - (set! last-access *db-last-access*) - (mutex-unlock! *heartbeat-mutex*) - - (if (common:low-noise-print 120 (conc "server running on " iface ":" port)) - (begin - (if (not *server-id*) - (set! 
*server-id* (server:mk-signature))) - ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) - (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) - (flush-output *default-log-port*))) - (if (common:low-noise-print 60 "dbstats") - (begin - (debug:print 0 *default-log-port* "Server stats:") - (db:print-current-query-stats))) - (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) - (cond - ((and *server-run* - (> (+ last-access server-timeout) - (current-seconds))) - (if (common:low-noise-print 120 "server continuing") - (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (let ((curr-time (current-seconds))) - (handle-exceptions - exn - (debug:print 0 *default-log-port* "ERROR: Failed to change timestamp on log file " server-log-file ". Are you out of space on that disk? exn=" exn) - (if (and server-log-file (not *server-overloaded*)) - (set-file-times! server-log-file curr-time curr-time))))) - (loop 0 server-state bad-sync-count (current-milliseconds))) - (else - (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) - (http-transport:server-shutdown port))))))) + (is-main (equal? 
(args:get-arg "-db") ".db/main.db")) + (last-access 0) + (server-timeout (server:expiration-timeout))) + ;; exits if nothing found in 100 tries (switch to a duration would be good) + (http-transport:wait-for-stable-interface) + (if is-main (http-transport:wait-for-server pkts-dir dbname server-key)) + (let* ((iface (servdat-host *server-info*)) + (port (servdat-port *server-info*))) + (let loop ((count 0) + (server-state 'available) + (bad-sync-count 0) + (start-time (current-milliseconds))) + ;; set up the database handle + (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate + (let ((watchdog (bdat-watchdog *bdat*))) + (debug:print 0 *default-log-port* "SERVER: dbprep") + (db:setup dbname) ;; sets *dbstruct-db* as side effect + + ;; IFF I'm not main, call into main and register self + (if (not is-main) + (debug:print-info 0 + *default-log-port* + "Register server returned: " + (rmt:register-server *rmt:remote* *toppath* iface port server-key dbname))) + + (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. + (if watchdog + (if (not (member (thread-state watchdog) '(ready running blocked sleeping dead))) + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") + (thread-start! watchdog))) + (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")))) + + ;; when things go wrong we don't want to be doing the various queries too often + ;; so we strive to run this stuff only every four seconds or so. + (let* ((sync-time (- (current-milliseconds) start-time)) + (rem-time (quotient (- 4000 sync-time) 1000))) + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! 
rem-time))) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) 'running bad-sync-count (current-milliseconds))) + + ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + (mutex-lock! *heartbeat-mutex*) + (set! last-access *db-last-access*) + (mutex-unlock! *heartbeat-mutex*) + + (if (common:low-noise-print 60 "dbstats") + (begin + (debug:print 0 *default-log-port* "Server stats:") + (db:print-current-query-stats))) + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) + (cond + ((and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + (loop 0 server-state bad-sync-count (current-milliseconds))) + (else + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (http-transport:server-shutdown port)))))))) (define (http-transport:server-shutdown port) (begin ;;(BB> "http-transport:server-shutdown called") (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id)) @@ -2468,53 +2437,39 @@ (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") (exit))) ;; Call this to start the actual server ;; - ;; all routes though here end in exit ... ;; -;; start_server? 
+;; This is the point at which servers are started ;; -(define (http-transport:launch dbname) - (let* (;; (tmp-area (common:get-db-tmp-area)) - ;; (server-start (conc tmp-area "/.server-start")) - ;; (server-started (conc tmp-area "/.server-started")) - ;; (start-time (common:lazy-modification-time server-start)) - ;; (started-time (common:lazy-modification-time server-started)) - ;; (server-starting (< start-time started-time)) ;; if start-time is less than started-time then a server is still starting - ;; (start-time-old (> (- (current-seconds) start-time) 5)) - (cleanup-proc (lambda (msg) - (let* ((serv-fname (conc "server-" (current-process-id) "-" (get-host-name) ".log")) - (full-serv-fname (conc *toppath* "/logs/" serv-fname)) - (new-serv-fname (conc *toppath* "/logs/" "defunct-" serv-fname))) - (debug:print 0 *default-log-port* msg) - (if (common:file-exists? full-serv-fname) - (system (conc "sleep 1;mv -f " full-serv-fname " " new-serv-fname)) - (debug:print 0 *default-log-port* "INFO: cannot move " full-serv-fname " to " new-serv-fname)) - (exit))))) - #;(common:save-pkt `((action . start) - (T . server) - (pid . ,(current-process-id))) - *configdat* #t) - (let* ((th2 (make-thread (lambda () - (debug:print-info 0 *default-log-port* "Server run thread started") - (http-transport:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-") - )) "Server run")) - (th3 (make-thread (lambda () - (debug:print-info 0 *default-log-port* "Server monitor thread started") - (http-transport:keep-running dbname) - "Keep running")))) - (thread-start! th2) - (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! 
th2) - (exit)))) +(define (rmt:launch dbname) ;; starts the http server thread and the keep-running monitor thread for dbname, joins the server thread and then exits the process + ;;(let* ((tmp-area (common:get-db-tmp-area)) + ;; (server-start (conc tmp-area "/.server-start")) + ;; (server-started (conc tmp-area "/.server-started")) + ;; (start-time (common:lazy-modification-time server-start)) + ;; (started-time (common:lazy-modification-time server-started)) + ;; (server-starting (< start-time started-time)) ;; if start-time is less than started-time then a server is still starting + ;; (start-time-old (> (- (current-seconds) start-time) 5)) + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server run thread started") + (http-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + )) "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server monitor thread started") + (http-transport:keep-running dbname)) ;; thunk ends here so the string below is the thread name, as for th2 + "Keep running"))) + (thread-start! th2) + (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (exit))) ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string @@ -2546,21 +2501,10 @@ ;;====================================================================== ;; S E R V E R ;;====================================================================== - -;; Call this to start the actual server -;; - -;; all routes though here end in exit ... -;; -;; start_server -;; -(define (server:launch dbname) - (http-transport:launch dbname)) - ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). 
Index: tests/unittests/basicserver.scm ================================================================== --- tests/unittests/basicserver.scm +++ tests/unittests/basicserver.scm @@ -20,11 +20,11 @@ ;; Run like this: ;; ;; ./rununittest.sh server 1;(cd simplerun;megatest -stop-server 0) -(import rmtmod trace http-transportmod http-client apimod dbmod) +(import rmtmod trace http-client apimod dbmod) (trace-call-sites #t) (trace ;; db:get-dbdat ;; rmt:find-main-server ;; rmt:send-receive-real @@ -47,22 +47,21 @@ (rmt:conn-port *main*) tdat))) (list 'a '(a "b" 123 1.23 ))) (test #f #t (number? (rmt:send-receive 'ping #f 'hello))) (trace - ;; rmt:send-receive - ;; with-input-from-request - ;; rmt:get-connection - ;; with-input-from-request + rmt:register-server ) (define *db* (db:setup #f)) (test #f 'server-started (api:execute-requests *db* 'get-server (list *toppath* ".db/1.db"))) (set! *dbstruct-db* #f) (test #f 'server-started (rmt:general-open-connection *rmt:remote* *toppath* ".db/1.db")) -(test #f '("SYSTEM" "RELEASE") (rmt:get-keys)) +(thread-sleep! 2) +(test #f 'server-started (rmt:general-open-connection *rmt:remote* *toppath* ".db/1.db")) +(test #f '("SYSTEM" "RELEASE") (rmt:get-keys)) (test #f 1 (rmt:register-run '(("SYSTEM" "a")("RELEASE" "b")) "run1" "new" "n/a" "justme" #f)) ;; (delete-file* "logs/1.log") ;; (define run-id 1)