Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -398,12 +398,12 @@ (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access (set! *dbstruct-db* (db:setup)) ;; run-id)) (set! server-going #t) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (server:write-dotserver *toppath* (conc iface ":" port)) - (delete-file* (conc *toppath* "/.starting-server"))) - (begin ;; gotta exit nicely + (server:dotserver-starting-remove)) + (begin ;; gotta exit nicely (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") (http-transport:server-shutdown server-id port)))))) ;; when things go wrong we don't want to be doing the various queries too often ;; so we strive to run this stuff only every four seconds or so. @@ -520,14 +520,11 @@ ;; all routes though here end in exit ... ;; ;; start_server? ;; (define (http-transport:launch run-id) - (with-output-to-file - (conc *toppath* "/.starting-server") - (lambda () - (print (current-process-id) " on " (get-host-name)))) + (server:dotserver-starting) (let* ((tdbdat (tasks:open-db))) (set! *run-id* run-id) (if (args:get-arg "-daemonize") (begin (daemon:ize) @@ -552,11 +549,12 @@ (- remtries 1))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") - (delete-file* (conc *toppath* "/.starting-server")) + + (server:dotserver-starting-remove) )) (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (http-transport:run (if (args:get-arg "-server") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -346,11 +346,11 @@ ;; The watchdog is to keep an eye on things like db sync etc. ;; (define *watchdog* (make-thread common:watchdog "Watchdog thread")) -(thread-start! *watchdog*) +;;(thread-start! *watchdog*) (if (args:get-arg "-log") (let ((oup (open-output-file (args:get-arg "-log")))) (debug:print-info 0 *default-log-port* "Sending log output to " (args:get-arg "-log")) (set! *default-log-port* oup))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -134,11 +134,14 @@ ((and (not (cdr (remote-hh-dat *runremote*))) ;; are we on a homehost? (not (remote-conndat *runremote*))) ;; and no connection (debug:print-info 12 *default-log-port* "rmt:send-receive, case 6 hh-dat: " (remote-hh-dat *runremote*) " conndat: " (remote-conndat *runremote*)) (mutex-unlock! *rmt-mutex*) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) - (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) ;; calls client:setup which calls client:setup-http + (let* ((cinfo (rmt:get-connection-info 0)) + (transport (vector-ref cinfo 6))) ;; TODO: replace with tasks:server-dat-accessor-?? for transport + (remote-conndat-set! *runremote* cinfo) ;; calls client:setup which calls client:setup-http + (remote-transport-set! *runremote* transport)) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! *rmt-mutex*) (debug:print-info 12 *default-log-port* "rmt:send-receive, case 7") @@ -156,11 +159,11 @@ ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (rpc-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) (else - (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (1)") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (debug:print-info 12 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -39,11 +39,11 @@ res)) ;; rpc receiver (define (rpc-transport:api-exec cmd params) - (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) + (let* ( (resdat (api:execute-requests *dbstruct-db* (vector cmd params))) ;; #( flag result ) (flag (vector-ref resdat 0)) (res (vector-ref resdat 1))) (mutex-lock! *heartbeat-mutex*) @@ -149,13 +149,13 @@ ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) - ;;(if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) - - + ;;(if *dbstruct-db* (db:sync-touched *dbstruct-db* *run-id* force-sync: #t)) + + (server:remove-dotserver-file *toppath* "anyhost:anyport" force: #t) (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") ;;(unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. @@ -195,62 +195,38 @@ ;; (when *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it ;; (current-error-port *alt-log-file*) ;; (current-output-port *alt-log-file*))) ;; double check we dont alrady have a running server for this run-id - (when (server:check-if-running run-id) + (when (and (server:read-dotserver *toppath*) + (server:check-if-running run-id)) (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0)) - - ;; clean up dead servers (duped in megatest.scm in -list-servers processing; may want to consolidate into proc) - (for-each - (lambda (server) - (let* ((id (vector-ref server 0)) - (pid (vector-ref server 1)) - (hostname (vector-ref server 2)) - (interface (vector-ref server 3)) - (pullport (vector-ref server 4)) - (pubport (vector-ref server 5)) - (start-time (vector-ref server 6)) - (priority (vector-ref server 7)) - (state (vector-ref server 8)) - (mt-ver (vector-ref server 9)) - (last-update (vector-ref server 10)) - (transport (vector-ref server 11)) - (killed #f) - (status (< last-update 20))) - - (if (equal? state "dead") - (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. - (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid action: 'delete)) - (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds - (tasks:server-deregister (db:delay-if-busy (tasks:open-db)) hostname pullport: pullport pid: pid))) - ;;(format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update - ;; (if status "alive" "dead") transport) - ;; (if (or (equal? id sid) - ;; (equal? sid 0)) ;; kill all/any - ;; (begin - ;; (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid) - ;; (tasks:kill-server hostname pid kill-switch: kill-switch))) - - ) - - ) - (tasks:get-all-servers (db:delay-if-busy (tasks:open-db)))) - + ;; did not find server running, let's clean up the table of dead servers + (tasks:server-force-clean-running-records-for-run-id (db:delay-if-busy (tasks:open-db)) run-id "notresponding") + + (server:dotserver-starting) + + + + + ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) chatty: #f final-failure-returns-actual: #t retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out + + (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") + (server:dotserver-starting-remove) (exit 1)) ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) ;; all systems go. Proceed to setup rpc server. (rpc-transport:run @@ -408,11 +384,14 @@ ;; It is our handle on the listening tcp port ;; We will attach this to our rpc server with rpc:make-server in thread th1 . (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () - ((rpc:make-server rpc:listener) #t) ) + ;;(BB> "BEFORE rpc:make-server") + ((rpc:make-server rpc:listener) #t) + ;;(BB> "BEFORE rpc:make-server") + ) "rpc:server")) (hostname (if (string=? "-" hostn) (get-host-name) @@ -425,52 +404,64 @@ (hostname->ip hostn))) ".") )) (portnum (let ((res (rpc:default-server-port))) res)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) + (when (not (equal? ipaddrstr (server:get-best-guess-address (get-host-name)))) + + (debug:print 0 *default-log-port* "Error: This host "(ip->string (hostname->ip (get-host-name)))" ("(get-host-name)") is not the homehost "ipaddrstr" ("(ip->hostname (string->ip ipaddrstr))"; Cannot proceed.") + (server:dotserver-starting-remove) + (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port + (exit)) + (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= (thread-start! th1) - (set! db *inmemdb*) + (set! db *dbstruct-db*) (debug:print 0 *default-log-port* "Server started on " host:port) - - ;; (thread-sleep! 5) - + ;;(BB> "before SELF-TEST") (if (retry-thunk (lambda () (rpc-transport:self-test run-id ipaddrstr portnum)) - final-failure-returns-actual: #t + final-failure-returns-actual: #t ;; TODO: remove this line ) (debug:print 0 *default-log-port* "INFO: rpc self test passed!") (begin (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "dead") (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port (rpc-transport:server-shutdown server-id rpc:listener) + (server:dotserver-starting-remove) (exit))) - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) + + + ;;(on-exit (lambda () ;; (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers (begin ;; i am not the server, another server snuck in and beat this one to the punch (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port - (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision") + (server:dotserver-starting-remove)) (begin ;; i am the server ;; setup the in-memory db - (set! *inmemdb* (db:setup run-id)) - (db:get-db *inmemdb* run-id) + (set! *dbstruct-db* (db:setup run-id)) + (db:get-db *dbstruct-db* run-id) + ;; at this point, satisfied server has started ;; let's make it official + (server:write-dotserver *toppath* (conc ipaddrstr ":" portnum)) + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*) (set! *rpc:listener* rpc:listener) (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry @@ -477,31 +468,36 @@ ;; this let loop will hold open this thread until we want the server to shut down. ;; if no requests received within the last 20 seconds : ;; database hasnt changed in ?? ;; - ;; begin new loop - ;; keep-running loop: polls last-db-access to see if we have timed out. + + + ;; keep-running loop: polls last-db-access to see if we have timed out. keep running if not. (let loop ((count 0) (bad-sync-count 0)) - + (BB> "keep running: count = "count) ;; Use this opportunity to sync the inmemdb to db + (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) - ;; inmemdb is a dbstruct - (condition-case - (db:sync-touched *inmemdb* *run-id* force-sync: #t) - ((sync-failed)(cond - ((> bad-sync-count 10) ;; time to give up - (rpc-transport:server-shutdown server-id rpc:listener)) - (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop - (thread-sleep! 5) - (loop count (+ bad-sync-count 1))))) - ((exn) - (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") - (rpc-transport:server-shutdown server-id rpc:listener))) + + ;; following is now done in common:watchdog, commenting out. sync-time will now be 0; can live with that. + ;; ;; inmemddb is a dbstruct + ;; (condition-case + ;; (db:sync-touched *dbstruct-db* *run-id* force-sync: #t) + ;; ((sync-failed)(cond + ;; ((> bad-sync-count 10) ;; time to give up + ;; (rpc-transport:server-shutdown server-id rpc:listener)) + ;; (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + ;; (thread-sleep! 5) + ;; (loop count (+ bad-sync-count 1))))) + ;; ((exn) + ;; (debug:print-error 0 *default-log-port* "error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server ") + ;; (rpc-transport:server-shutdown server-id rpc:listener))) + (set! sync-time (- (current-milliseconds) start-time)) (set! rem-time (quotient (- 4000 sync-time) 1000)) (debug:print 4 *default-log-port* "SYNC: time= " sync-time ", rem-time=" rem-time) (if (and (<= rem-time 4) @@ -556,10 +552,11 @@ (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) ;; (loop 0 bad-sync-count)) (begin ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) + (rpc-transport:server-shutdown server-id rpc:listener))))) ;; end new loop )))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -52,11 +52,11 @@ (let ((transport-type (cond ((string? transport-type-raw) (string->symbol transport-type-raw)) (else transport-type-raw)))) - (BB> "server:launch fired for run-id="run-id" transport-type="transport-type) + ;;(BB> "server:launch fired for run-id="run-id" transport-type="transport-type) (case transport-type ((http)(http-transport:launch run-id)) ;;((nmsg)(nmsg-transport:launch run-id)) ((rpc) (rpc-transport:launch run-id)) @@ -193,10 +193,22 @@ dotfile (lambda () (read-line))) #f)))) + +(define (server:dotserver-starting) + (with-output-to-file + (conc *toppath* "/.starting-server") + (lambda () + (print (current-process-id) " on " (get-host-name))))) + +(define (server:dotserver-starting-remove) + (delete-file* (conc *toppath* "/.starting-server"))) + + + ;; write a .server file in *toppath* with hostport ;; return #t on success, #f otherwise ;; (define (server:write-dotserver areapath hostport) (let ((lock-file (conc areapath "/.server.lock")) @@ -212,15 +224,15 @@ (debug:print-info 0 *default-log-port* "server file " server-file " for " hostport " created") (common:simple-file-release-lock lock-file) res) #f))) -(define (server:remove-dotserver-file areapath hostport) +(define (server:remove-dotserver-file areapath hostport #!key (force #f)) (let ((dotserver (server:read-dotserver areapath)) (server-file (conc areapath "/.server")) (lock-file (conc areapath "/.server.lock"))) - (if (and dotserver (string-match (conc ".*:" hostport "$") dotserver)) ;; port matches, good enough info to decide to remove the file + (if (or force (and dotserver (string-match (conc ".*:" hostport "$") dotserver))) ;; port matches, good enough info to decide to remove the file (if (common:simple-file-lock lock-file) (begin (handle-exceptions exn #f