Index: client.scm
==================================================================
--- client.scm
+++ client.scm
@@ -48,15 +48,76 @@
     ((rpc)  (rpc:client-connect  iface port))
     ((http) (http:client-connect iface port))
     ((zmq)  (zmq:client-connect  iface port))
     (else   (rpc:client-connect  iface port))))
 
-(define (client:setup  run-id #!key (remaining-tries 10) (failed-connects 0))
-  (case (server:get-transport)
-    ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id))
-    ((http)(client:setup-http run-id))
-    (else  (rpc-transport:client-setup run-id)))) ;; (client:setup-rpc run-id))))
+;; client:setup
+;;
+;; Look up the server record for run-id and connect using the transport stored
+;; in that record. When no record exists, ask for a server to be started (only
+;; if tasks:bb-num-in-available-state reports fewer than two), sleep a
+;; randomized interval and retry with one fewer remaining try. Exits with
+;; status 1 when remaining-tries reaches 0 or the transport is not http/rpc.
+;;
+(define (client:setup run-id #!key (remaining-tries 10))
+  (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
+  (let* ((server-dat (tasks:bb-get-server-info run-id))
+         (transport  (if server-dat (tasks:hostinfo-get-transport server-dat) 'noserver)))
+    (case transport
+      ((noserver) ;; no server registered
+       (if (<= remaining-tries 0)
+           (begin
+             (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
+             (exit 1))
+           (begin
+             (let ((num-available (tasks:bb-num-in-available-state run-id)))
+               (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available)
+               (if (< num-available 2)
+                   (server:try-running run-id))
+               (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
+               (client:setup run-id remaining-tries: (- remaining-tries 1))))))
+      ((http)(client:setup-http run-id server-dat remaining-tries))
+      ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id))
+      (else
+       (debug:print-error 0 *default-log-port* "Unknown transport ["
+                          transport "] specified used by server for run-id " run-id)
+       (exit 1)))))
+
+
+;; client:setup-http
+;;
+;; Connect and log in to the http server described by server-dat. On success
+;; the connection is stored in *runremote* under run-id and returned. On
+;; failure the server record is killed and cleaned out, a new server is
+;; requested and client:setup is called again with one fewer remaining try.
+;;
+(define (client:setup-http run-id server-dat remaining-tries)
+  (let* ((iface     (tasks:hostinfo-get-interface server-dat))
+         (hostname  (tasks:hostinfo-get-hostname  server-dat))
+         (port      (tasks:hostinfo-get-port      server-dat))
+         (start-res (http-transport:client-connect iface port))
+         (ping-res  (rmt:login-no-auto-client-setup start-res run-id)))
+    (if (and start-res ping-res)
+        (begin
+          (hash-table-set! *runremote* run-id start-res)
+          (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
+          start-res)
+        (begin    ;; login failed but have a server record, clean out the record and try again
+          (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat)
+          (http-transport:close-connections run-id)
+          (hash-table-delete! *runremote* run-id)
+          (tasks:kill-server-run-id run-id)
+          (tasks:bb-server-force-clean-run-record run-id iface port
+                                                  " client:setup (server-dat = #t)")
+          (if (> remaining-tries 8)
+              (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
+              (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
+          (server:try-running run-id)
+          (thread-sleep! 5) ;; give server a little time to start up
+          (client:setup run-id remaining-tries: (- remaining-tries 1))
+          ))))
+
 
 ;; (define (client:login-no-auto-setup server-info run-id)
 ;;   (case (server:get-transport)
 ;;     ((rpc)  (rpc:login-no-auto-client-setup server-info run-id))
 ;;     ((http) (rmt:login-no-auto-client-setup server-info run-id))
@@ -95,11 +156,11 @@
 ;;             (begin
 ;;               (debug:print 25 *default-log-port* "INFO: client:setup failed to connect, start-res=" start-res ", run-id=" run-id ", host-info=" host-info)
 ;;               (thread-sleep! 5)
 ;;               (client:setup run-id remaining-tries: (- remaining-tries 1))))))
 ;;     ;; YUK: rename server-dat here
-;;     (let* ((server-dat (open-run-close tasks:get-server tasks:open-db run-id)))
+;;     (let* ((server-dat (open-run-close tasks:get-server-info tasks:open-db run-id)))
 ;;       (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
 ;;       (if server-dat
 ;;           (let* ((iface     (tasks:hostinfo-get-interface server-dat))
 ;;                  (port      (tasks:hostinfo-get-port      server-dat))
 ;;                  (start-res (http-transport:client-connect iface port))
@@ -152,66 +213,11 @@
 ;;
 ;; client:setup
 ;;
 ;; lookup_server, need to remove *runremote* stuff
 ;;
-(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0))
-  (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries)
-  (let* ((tdbdat (tasks:open-db)))
-    (if (<= remaining-tries 0)
-        (begin
-          (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id)
-          (exit 1))
-        (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
-          (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
-          (if server-dat
-              (let* ((iface     (tasks:hostinfo-get-interface server-dat))
-                     (hostname  (tasks:hostinfo-get-hostname  server-dat))
-                     (port      (tasks:hostinfo-get-port      server-dat))
-                     (start-res (case *transport-type*
-                                  ((http)(http-transport:client-connect iface port))
-                                  ;;((nmsg)(nmsg-transport:client-connect hostname port))
-                                  ))
-                     (ping-res  (case *transport-type*
-                                  ((http)(rmt:login-no-auto-client-setup start-res run-id))
-                                  ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id)))
-                                  ;;          (if logininfo
-                                  ;;              (car (vector-ref logininfo 1))
-                                  ;;              #f)))
-
-                                  )))
-                (if (and start-res
-                         ping-res)
-                    (begin
-                      (hash-table-set! *runremote* run-id start-res)
-                      (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res))
-                      start-res)
-                    (begin    ;; login failed but have a server record, clean out the record and try again
-                      (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat)
-                      (case *transport-type*
-                        ((http)(http-transport:close-connections run-id)))
-                      (hash-table-delete! *runremote* run-id)
-                      (tasks:kill-server-run-id run-id)
-                      (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat)
-                                                           run-id
-                                                           (tasks:hostinfo-get-interface server-dat)
-                                                           (tasks:hostinfo-get-port      server-dat)
-                                                           " client:setup (server-dat = #t)")
-                      (if (> remaining-tries 8)
-                          (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little
-                          (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time
-                      (server:try-running run-id)
-                      (thread-sleep! 5) ;; give server a little time to start up
-                      (client:setup run-id remaining-tries: (- remaining-tries 1))
-                      )))
-              (begin    ;; no server registered
-                (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id)))
-                  (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available)
-                  (if (< num-available 2)
-                      (server:try-running run-id))
-                  (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms.
-                  (client:setup run-id remaining-tries: (- remaining-tries 1)))))))))
+
 
 ;; keep this as a function to ease future
 (define (client:start run-id server-info)
   (http-transport:client-connect (tasks:hostinfo-get-interface server-info)
                                  (tasks:hostinfo-get-port server-info)))
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -81,11 +81,11 @@
   (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
     (for-each
      (lambda (run-id)
        (let ((connection (hash-table-ref/default *runremote* run-id #f)))
          (if (and (vector? connection)
-                  (< (http-transport:server-dat-get-last-access connection) expire-time))
+                  (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific.
              (begin
                (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
                ;; bb- disabling nanomsg
                ;; SHOULD CLOSE THE CONNECTION HERE
                ;; (case *transport-type*
@@ -106,45 +106,47 @@
                               ((commfail)(vector #f "communications fail"))
                               ((exn)(vector #f "other fail"))))
                       ;; ((nmsg)(condition-case
                       ;;         (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
                       ;;         ((timeout)(vector #f "timeout talking to server"))))
+                      ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params))
                       (else (exit))))
                (success (if (vector? dat) (vector-ref dat 0) #f))
                (res     (if (vector? dat) (vector-ref dat 1) #f)))
           (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info))
           (if success
               (begin
                 ;; (mutex-unlock! *send-receive-mutex*)
                 (case *transport-type*
-                  ((http) res) ;; (db:string->obj res))
+                  ((http rpc) res) ;; (db:string->obj res))
                   ;; ((nmsg) res)
                   )) ;; (vector-ref res 1)))
               (begin ;; let ((new-connection-info (client:setup run-id)))
                 (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
-                ;; (case *transport-type*
-                ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
-                (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
-                ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
-                ;; (if (eq? (modulo attemptnum 5) 0)
-                ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
-                ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
-                (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
-                ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
-
-                ;; no longer killing the server in http-transport:client-api-send-receive
-                ;; may kill it here but what are the criteria?
-                ;; start with three calls then kill server
-                ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
-                ;; (thread-sleep! 2)
-                (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
+                (case *transport-type*
+                  ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
+                  ((http)
+                   (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
+                   ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
+                   ;; (if (eq? (modulo attemptnum 5) 0)
+                   ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
+                   ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
+                   (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
+                   ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
+
+                   ;; no longer killing the server in http-transport:client-api-send-receive
+                   ;; may kill it here but what are the criteria?
+                   ;; start with three calls then kill server
+                   ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
+                   ;; (thread-sleep! 2)
+                   (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))))
         ;; no connection info? try to start a server, or access locally if no
         ;; server and the query is read-only
         ;;
         ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
         ;;
-        (if (and (< attemptnum 15) 
+        (if (and (< attemptnum 15)
                  (member cmd api:write-queries))
             (let ((faststart (configf:lookup *configdat* "server" "faststart")))
               (hash-table-delete! *runremote* run-id)
               ;; (mutex-unlock! *send-receive-mutex*)
               (if (and faststart (equal? faststart "no"))
Index: rpc-transport.scm
==================================================================
--- rpc-transport.scm
+++ rpc-transport.scm
@@ -192,11 +192,11 @@
               server-dat)
             (begin
               (server:try-running run-id)
               (thread-sleep! 2)
               (rpc-transport:client-setup run-id (- remtries 1)))))
-      (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id)))
+      (let* ((server-db-info (open-run-close tasks:get-server-info tasks:open-db run-id)))
         (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
         (if server-db-info
             (let* ((iface     (tasks:hostinfo-get-interface server-db-info))
                    (port      (tasks:hostinfo-get-port      server-db-info))
                    (server-dat (list iface port #f #f #f))
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -72,11 +72,11 @@
                       "http"))))
     (set! *transport-type* ttype)
     ttype))
 
 ;; Get the transport
-(define (server:get-transport)
+(define (server:get-transport #!key (run-id #f)) ;; BB> BBTODO Shouldn't this be run-id sensitive and not a global?? (added run-id key to get this is we are supplied a run-id (added this in client:setup)
   (if *transport-type*
       *transport-type*
       (server:set-transport)))
 
 ;; Generate a unique signature for this server
@@ -185,11 +185,11 @@
       (server:run run-id)
       (rmt:start-server run-id)))
 
 (define (server:check-if-running run-id)
   (let ((tdbdat (tasks:open-db)))
-    (let loop ((server (tasks:get-server (db:delay-if-busy tdbdat) run-id))
+    (let loop ((server (tasks:get-server-info (db:delay-if-busy tdbdat) run-id))
                (trycount 0))
       (if server
           ;; note: client:start will set *runremote*. this needs to be changed
           ;;       also, client:start will login to the server, also need to change that.
           ;;
@@ -220,11 +220,11 @@
     (let* ((host-port (let ((slst (string-split host:port ":")))
                         (if (eq? (length slst) 2)
                             (list (car slst)(string->number (cadr slst)))
                             #f)))
            (toppath       (launch:setup))
-           (server-db-dat (if (not host-port)(tasks:get-server (db:delay-if-busy tdbdat) run-id) #f)))
+           (server-db-dat (if (not host-port)(tasks:get-server-info (db:delay-if-busy tdbdat) run-id) #f)))
       (if (not run-id)
           (begin
             (debug:print-error 0 *default-log-port* "must specify run-id when doing ping, -run-id n")
             (print "ERROR: No run-id")
             (exit 1))
Index: tasks.scm
==================================================================
--- tasks.scm
+++ tasks.scm
@@ -327,27 +327,47 @@
      mdb
      (conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;")
      run-id)
     (vector header res)))
 
-(define (tasks:get-server mdb run-id #!key (retries 10))
+
+;; BB> bb opinion - want to push responsibility into api (encapsulation), like waiting if db is busy and finding the db handle in the first place. why should the caller need to be concerned?? If my opinion carries, we'll remove the bb- and make other needful adjustments.
+;; Open the tasks db, pass it through db:delay-if-busy, then apply
+;; mdb-expecting-proc to that handle followed by the list mdbless-args.
+(define (bb-mdb-inserter mdb-expecting-proc mdbless-args)
+  (let ((mdb (db:delay-if-busy (tasks:open-db))))
+    (apply mdb-expecting-proc (cons mdb mdbless-args))))
+
+(define (tasks:bb-get-server-info . args)
+  (bb-mdb-inserter tasks:get-server-info args))
+
+(define (tasks:bb-num-in-available-state . args)
+  (bb-mdb-inserter tasks:num-in-available-state args))
+
+(define (tasks:bb-server-force-clean-run-record . args)
+  (bb-mdb-inserter tasks:server-force-clean-run-record args))
+
+
+
+;; BB: renaming tasks:get-server to get-server-info to make clear we aren't creating servers here
+(define (tasks:get-server-info mdb run-id #!key (retries 10))
   (let ((res  #f)
         (best #f))
     (handle-exceptions
      exn
      (begin
        (print-call-chain (current-error-port))
-       (debug:print 0 *default-log-port* "WARNING: tasks:get-server db access error.")
+       (debug:print 0 *default-log-port* "WARNING: tasks:get-server-info db access error.")
        (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
        (debug:print 0 *default-log-port* " for run " run-id)
        (print-call-chain (current-error-port))
        (if (> retries 0)
            (begin
-             (debug:print 0 *default-log-port* " trying call to tasks:get-server again in 10 seconds")
+             (debug:print 0 *default-log-port* " trying call to tasks:get-server-info again in 10 seconds")
              (thread-sleep! 10)
-             (tasks:get-server mdb run-id retries: (- retries 0)))
-           (debug:print 0 *default-log-port* "10 tries of tasks:get-server all crashed and burned. Giving up and returning \"no server found\"")))
+             (tasks:get-server-info mdb run-id retries: (- retries 1)))
+           (debug:print 0 *default-log-port* "10 tries of tasks:get-server-info all crashed and burned. Giving up and returning \"no server found\"")))
      (sqlite3:for-each-row
       (lambda (id interface port pubport transport pid hostname)
         (set! res (vector id interface port pubport transport pid hostname)))
       mdb
       ;; removed:
@@ -394,21 +414,21 @@
 
 ;; try to start a server and wait for it to be available
 ;;
 (define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries)
   ;; ensure a server is running for this run
-  (let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))
+  (let loop ((server-dat (tasks:get-server-info (db:delay-if-busy tdbdat) run-id))
              (delay-time 0))
     (if (and (not server-dat)
              (< delay-time delay-max-tries))
         (begin
           (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id)
               (debug:print 0 *default-log-port* "Try starting server for run-id " run-id))
           (thread-sleep! (/ (random 2000) 1000))
           (server:kind-run run-id)
           (thread-sleep! (min delay-time 1))
-          (loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1))))))
+          (loop (tasks:get-server-info (db:delay-if-busy tdbdat) run-id)(+ delay-time 1))))))
 
 (define (tasks:get-all-servers mdb)
   (let ((res '()))
     (sqlite3:for-each-row
      (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)
@@ -443,18 +463,18 @@
 
 ;; look up a server by run-id and send it a kill, also delete the record for that server
 ;;
 (define (tasks:kill-server-run-id run-id #!key (tag "default"))
   (let* ((tdbdat (tasks:open-db))
-         (sdat   (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
+         (sdat   (tasks:get-server-info (db:delay-if-busy tdbdat) run-id)))
     (if sdat
         (let ((hostname  (vector-ref sdat 6))
               (pid       (vector-ref sdat 5))
               (server-id (vector-ref sdat 0)))
           (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "killed")
           (debug:print-info 0 *default-log-port* "Killing server " server-id " for run-id " run-id " on host " hostname " with pid " pid)
-          (tasks:kill-server hostname pid)
+          (tasks:kill-server hostname pid kill-switch: "-9") ;; BB: added -9, let's not be kind here. we need it to die
           (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id tag)
           )
         (debug:print-info 0 *default-log-port* "No server found for run-id " run-id ", nothing to kill"))
     ;; (sqlite3:finalize! tdb)
     ))