@@ -39,11 +39,11 @@ res)) ;; rpc receiver (define (rpc-transport:api-exec cmd params) - (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) + (let* ( (resdat (api:execute-requests *dbstruct-db* (vector cmd params))) ;; #( flag result ) (flag (vector-ref resdat 0)) (res (vector-ref resdat 1))) (mutex-lock! *heartbeat-mutex*) @@ -149,13 +149,13 @@ ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) - ;;(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) - - + ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)) + + (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. @@ -195,62 +195,38 @@ ;; (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it ;; (current-error-port *alt-log-file*) ;; (current-output-port *alt-log-file*))) ;; double check we dont alrady have a running server for this run-id - (when (server:check-if-running run-id) + (when (and (server:read-dotserver *toppath*) + (server:check-if-running run-id)) (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) - - ;; clean up dead servers (duped in megatest.scm in -list-servers processing; may want to consolidate into proc) - (for-each - (lambda (server) - (let* ((id (vector-ref server 0)) - (pid (vector-ref server 1)) - (hostname (vector-ref server 2)) - (interface (vector-ref server 3)) - (pullport (vector-ref server 4)) - (pubport (vector-ref server 5)) - (start-time (vector-ref server 6)) - (priority (vector-ref server 7)) - (state (vector-ref server 8)) - (mt-ver (vector-ref server 9)) - (last-update (vector-ref server 10)) - (transport (vector-ref server 11)) - (killed #f) - (status (< last-update 20))) - - (if (equal? state "dead") - (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. - (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid action: 'delete)) - (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds - (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid))) - ;;(format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update - ;; (if status "alive" "dead") transport) - ;; (if (or (equal? id sid) - ;; (equal? sid 0)) ;; kill all/any - ;; (begin - ;; (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid) - ;; (tasks:kill-server hostname pid kill-switch: kill-switch))) - - ) - - ) - (tasks:get-all-servers (db:delay-if-busy (tasks:open-db)))) - + ;; did not find server running, let's clean up the table of dead servers + (tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy (tasks:open-db)) run-id "notresponding") + + (server:dotserver-starting) + + + + + ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) chatty: #f final-failure-returns-actual: #t retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out + + (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") + (server:dotserver-starting-remove) (exit 1)) ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) ;; all systems go. Proceed to setup rpc server. (rpc-transport:run @@ -408,11 +384,14 @@ ;; It is our handle on the listening tcp port ;; We will attach this to our rpc server with rpc:make-server in thread th1 . (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () - ((rpc:make-server rpc:listener) #t) ) + ;;(BB> "BEFORE rpc:make-server") + ((rpc:make-server rpc:listener) #t) + ;;(BB> "BEFORE rpc:make-server") + ) "rpc:server")) (hostname (if (string=? "-" hostn) (get-host-name) @@ -425,52 +404,64 @@ (hostname->ip hostn))) ".") )) (portnum (let ((res (rpc:default-server-port))) res)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) + (when (not (equal? ipaddrstr (server:get-best-guess-address (get-host-name)))) + + (debug:print 0 *default-log-port* "Error: This host "(ip->string (hostname->ip (get-host-name)))" ("(get-host-name)") is not the homehost "ipaddrstr" ("(ip->hostname (string->ip ipaddrstr))"; Cannot proceed.") + (server:dotserver-starting-remove) + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (exit)) + (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= (thread-start! th1) - (set! db *inmemdb*) + (set! db *dbstruct-db*) (debug:print 0 *default-log-port* "Server started on " host:port) - - ;; (thread-sleep! 5) - + ;;(BB> "before SELF-TEST") (if (retry-thunk (lambda () (rpc-transport:self-test run-id ipaddrstr portnum)) - final-failure-returns-actual: #t + final-failure-returns-actual: #t ;; TODO: remove this line ) (debug:print 0 *default-log-port* "INFO: rpc self test passed!") (begin (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead") (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port (rpc-transport:server-shutdown server-id rpc:listener) + (server:dotserver-starting-remove) (exit))) - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) + + + ;;(on-exit (lambda () ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers (begin ;; i am not the server, another server snuck in and beat this one to the punch (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port - (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision") + (server:dotserver-starting-remove)) (begin ;; i am the server ;; setup the in-memory db - (set! *inmemdb* (db:setup run-id)) - (db:get-db *inmemdb* run-id) + (set! *dbstruct-db* (db:setup run-id)) + (db:get-db *dbstruct-db* run-id) + ;; at this point, satisfied server has started ;; let's make it official + (server:write-dotserver *toppath* (conc ipaddrstr ":" portnum)) + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) (set! *rpc:listener* rpc:listener) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry @@ -477,31 +468,36 @@ ;; this let loop will hold open this thread until we want the server to shut down. ;; if no requests received within the last 20 seconds : ;; database hasnt changed in ?? ;; - ;; begin new loop - ;; keep-running loop: polls last-db-access to see if we have timed out. + + + ;; keep-running loop: polls last-db-access to see if we have timed out. keep running if not. (let loop ((count 0) (bad-sync-count 0)) - + (BB> "keep running: count = "count) ;; Use this opportunity to sync the inmemdb to db + (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) - ;; inmemdb is a dbstruct - (condition-case - (db:sync-touched *inmemdb* *run-id* force-sync: #t) - ((sync-failed)(cond - ((> bad-sync-count 10) ;; time to give up - (rpc-transport:server-shutdown server-id rpc:listener)) - (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop - (thread-sleep! 5) - (loop count (+ bad-sync-count 1))))) - ((exn) - (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") - (rpc-transport:server-shutdown server-id rpc:listener))) + + ;; following is now done in common:watchdog, commenting out. sync-time will now be 0; can live with that. + ;; ;; inmemddb is a dbstruct + ;; (condition-case + ;; (db:sync-touched *dbstruct-db* *run-id* force-sync: #t) + ;; ((sync-failed)(cond + ;; ((> bad-sync-count 10) ;; time to give up + ;; (rpc-transport:server-shutdown server-id rpc:listener)) + ;; (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + ;; (thread-sleep! 5) + ;; (loop count (+ bad-sync-count 1))))) + ;; ((exn) + ;; (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server ") + ;; (rpc-transport:server-shutdown server-id rpc:listener))) + (set! sync-time (- (current-milliseconds) start-time)) (set! rem-time (quotient (- 4000 sync-time) 1000)) (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time) (if (and (<= rem-time 4) @@ -556,10 +552,11 @@ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) ;; (loop 0 bad-sync-count)) (begin ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) + (rpc-transport:server-shutdown server-id rpc:listener))))) ;; end new loop ))))