Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -347,10 +347,93 @@ (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) (server-dat (vector iface port api-uri api-url api-req (current-seconds)))) server-dat)) + +;;; factored out of http-transport:keep-running +;; return #t if a bad sync occurred and a retry is warranted +;; return #f otherwise +;; side effect - cleans up and exits on exception. +(define (http-transport:sync-inmemdb-to-db tdbdat server-state run-id server-id bad-sync-count) + (if *inmemdb* + (let ((start-time (current-milliseconds)) + (sync-time #f) + (rem-time #f) + (sync-retry #f)) + ;; inmemdb is a dbstruct + (condition-case + (db:sync-touched *inmemdb* *run-id* force-sync: #t) + ((sync-failed)(cond + ((> bad-sync-count 10) ;; time to give up + (http-transport:server-shutdown server-id port)) + (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + (thread-sleep! 5) + (set! sync-retry #t)))) + ((exn) + (debug:print 0 "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running crashed") + (exit))) + (if sync-retry + #t ; return true - retry + (begin + (set! sync-time (- (current-milliseconds) start-time)) + (set! rem-time (quotient (- 4000 sync-time) 1000)) + (debug:print 4 "SYNC: time= " sync-time ", rem-time=" rem-time) + + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time) + (thread-sleep! 4)) ;; fallback for if the math is changed ... + + ;; + ;; no *inmemdb* yet, set running after our first pass through and start the db + ;; + (if (eq? server-state 'available) + (let ((new-server-id (tasks:server-am-i-the-server? (db:delay-if-busy tdbdat) run-id))) ;; try to ensure no double registering of servers + (if (equal? new-server-id server-id) + (begin + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access + (set! *inmemdb* (db:setup run-id)) + ;; force initialization + ;; (db:get-db *inmemdb* #t) + (db:get-db *inmemdb* run-id) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")) + (begin ;; gotta exit nicely + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") + (http-transport:server-shutdown server-id port))))) + #f))) ; return #f - don't retry + #f)) ; return #f - don't retry since there is no inmemdb + + +;;; factored out of http-transport:keep-running +(define (http-transport:get-server-info tdbdat server-start-time server-id run-id) + (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) + (let ((sdat #f)) + (thread-sleep! 0.01) + (debug:print-info 0 "Waiting for server alive signature") + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if (and sdat + (not changed) + (> (- (current-seconds) start-time) 2)) + sdat + (begin + (debug:print-info 0 "Still waiting, last-sdat=" last-sdat) + (sleep 4) + (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes + (begin + (debug:print 0 "ERROR: transport appears to have died, exiting server " server-id " for run " run-id) + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (exit)) + (loop start-time + (equal? sdat last-sdat) + sdat))))))) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (http-transport:keep-running server-id run-id) @@ -358,87 +441,25 @@ ;; server last used then start shutdown ;; This thread waits for the server to come alive (debug:print-info 0 "Starting the sync-back, keep alive thread in server for run-id=" run-id) (let* ((tdbdat (tasks:open-db)) (server-start-time (current-seconds)) - (server-info (let loop ((start-time (current-seconds)) - (changed #t) - (last-sdat "not this")) - (let ((sdat #f)) - (thread-sleep! 0.01) - (debug:print-info 0 "Waiting for server alive signature") - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - (if (and sdat - (not changed) - (> (- (current-seconds) start-time) 2)) - sdat - (begin - (debug:print-info 0 "Still waiting, last-sdat=" last-sdat) - (sleep 4) - (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes - (begin - (debug:print 0 "ERROR: transport appears to have died, exiting server " server-id " for run " run-id) - (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") - (exit)) - (loop start-time - (equal? sdat last-sdat) - sdat))))))) + (server-info (http-transport:get-server-info tdbdat server-start-time server-id run-id)) (iface (car server-info)) (port (cadr server-info)) (last-access 0) (server-timeout (server:get-timeout))) + (let loop ((count 0) (server-state 'available) (bad-sync-count 0)) ;; Use this opportunity to sync the inmemdb to db - (if *inmemdb* - (let ((start-time (current-milliseconds)) - (sync-time #f) - (rem-time #f)) - ;; inmemdb is a dbstruct - (condition-case - (db:sync-touched *inmemdb* *run-id* force-sync: #t) - ((sync-failed)(cond - ((> bad-sync-count 10) ;; time to give up - (http-transport:server-shutdown server-id port)) - (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop - (thread-sleep! 5) - (loop count server-state (+ bad-sync-count 1))))) - ((exn) - (debug:print 0 "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") - (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running crashed") - (exit))) - (set! sync-time (- (current-milliseconds) start-time)) - (set! rem-time (quotient (- 4000 sync-time) 1000)) - (debug:print 4 "SYNC: time= " sync-time ", rem-time=" rem-time) - - (if (and (<= rem-time 4) - (> rem-time 0)) - (thread-sleep! rem-time) - (thread-sleep! 4))) ;; fallback for if the math is changed ... - - ;; - ;; no *inmemdb* yet, set running after our first pass through and start the db - ;; - (if (eq? server-state 'available) - (let ((new-server-id (tasks:server-am-i-the-server? (db:delay-if-busy tdbdat) run-id))) ;; try to ensure no double registering of servers - (if (equal? new-server-id server-id) - (begin - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") - (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access - (set! *inmemdb* (db:setup run-id)) - ;; force initialization - ;; (db:get-db *inmemdb* #t) - (db:get-db *inmemdb* run-id) - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")) - (begin ;; gotta exit nicely - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") - (http-transport:server-shutdown server-id port)))))) - + (let ((sync-retry (http-transport:sync-inmemdb-to-db tdbdat server-state run-id server-id bad-sync-count))) + (if sync-retry + (loop count server-state (+ bad-sync-count 1)))) + (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1) 'running bad-sync-count)) ;; Check that iface and port have not changed (can happen if server port collides) (mutex-lock! *heartbeat-mutex*)