Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -177,11 +177,11 @@ (car (vector-ref logininfo 1)) #f)))))) (if (and start-res ping-res) (begin - (hash-table-set! *runremote* run-id start-res) + ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (case *transport-type* @@ -199,16 +199,17 @@ (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up (client:setup run-id remaining-tries: (- remaining-tries 1)) ))) (begin ;; no server registered - (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) - (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) - (if (< num-available 2) - (server:try-running run-id)) - (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. - (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) + ;; (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) + ;; (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) + ;; (if (< num-available 2) + ;; (server:try-running run-id)) + (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries) + ;; (thread-sleep! (+ 2 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. + (client:setup run-id remaining-tries: (- remaining-tries 1)))))))) ;; (let ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (if host-info ;; this is a bit circular. the host-info *is* the start-res FIXME ;; (let* ((iface (http-transport:server-dat-get-iface host-info)) ;; (port (http-transport:server-dat-get-port host-info)) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -63,19 +63,21 @@ (begin (debug:print-info 1 "db write rate too high, starting a server, count=" count " start=" start " run-id=" run-id " queries-per-second=" queries-per-second) #t) #f)))) -(define (rmt:get-connection-info run-id) - (let ((cinfo (hash-table-ref/default *runremote* run-id #f))) - (if cinfo - cinfo - ;; NB// can cache the answer for server running for 10 seconds ... - ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) - (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) - (client:setup run-id) - #f)))) +;; (define (rmt:get-connection-info run-id) +;; (let ((cinfo (hash-table-ref/default *runremote* run-id #f))) +;; (if cinfo +;; cinfo +;; ;; NB// can cache the answer for server running for 10 seconds ... +;; ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) +;; ;; (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) +;; ;; (begin +;; ;; (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id) +;; (client:setup run-id)))) +;; ;; #f)))) (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) @@ -92,21 +94,26 @@ ((nmsg)(nn-close (http-transport:server-dat-get-socket (hash-table-ref *runremote* run-id))))) (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id))) + (connection-info (hash-table-ref/default *runremote* run-id #f))) ;; (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info (let* ((dat (case *transport-type* ((http)(condition-case (http-transport:client-api-send-receive run-id connection-info cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail")))) + ((commfail) + (tasks:kill-server-run-id run-id) + (vector #f "communications fail")) + ((exn) + (tasks:kill-server-run-id run-id) + (vector #f "other fail")))) ((nmsg)(condition-case (nmsg-transport:client-api-send-receive run-id connection-info cmd params) ((timeout)(vector #f "timeout talking to server")))) (else (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) @@ -113,45 +120,31 @@ (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) (if success (begin ;; (mutex-unlock! *send-receive-mutex*) + ;; all is well, return the result! (case *transport-type* ((http) res) ;; (db:string->obj res)) ((nmsg) res))) ;; (vector-ref res 1))) + ;; we had a connection but it is borked. clean up and reconnect (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.") ;; (case *transport-type* ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) ;; no connection info? try to start a server (if (and (< attemptnum 15) (member cmd api:write-queries)) (begin (hash-table-delete! *runremote* run-id) ;; (mutex-unlock! *send-receive-mutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - ;; (client:setup run-id) ;; client setup happens in rmt:get-connection-info - (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? + (hash-table-set! *runremote* run-id (client:setup run-id)) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) (begin - ;; (debug:print 0 "ERROR: Communication failed!") - ;; (mutex-unlock! *send-receive-mutex*) - ;; (exit) (rmt:open-qry-close-locally cmd run-id params) ))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -380,21 +380,21 @@ ;; try to start a server and wait for it to be available ;; (define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries) ;; ensure a server is running for this run - (let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-running (tasks:server-running? (db:delay-if-busy tdbdat) run-id)) (delay-time 0)) - (if (and (not server-dat) + (if (and (not server-running) (< delay-time delay-max-tries)) (begin (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id) (debug:print 0 "Try starting server for run-id " run-id)) (thread-sleep! (/ (random 2000) 1000)) (server:kind-run run-id) (thread-sleep! (min delay-time 1)) - (loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) + (loop (tasks:server-running? (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)