Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -51,16 +51,19 @@ ;; ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) ;; (else (rpc:client-connect iface port)))) (define (client:setup run-id #!key (remaining-tries 10)) + (BB> "Entered client:setup with run-id="run-id" and remaining-tries="remaining-tries) + (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((server-dat (tasks:bb-get-server-info run-id)) - (transport (if server-dat (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) - + (transport (if (and server-dat (vector? server-dat)) (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) + (case transport ((noserver) ;; no server registered + (BB> "noserver") (if (<= remaining-tries 0) (begin (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) (exit 1)) (begin @@ -68,12 +71,12 @@ (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1)))))) - ((http)(client:setup-http run-id server-dat remaining-tries)) - ((rpc) (rpc-transport:client-setup run-id server-dat remtries: remaining-tries)) + ((http) (BB> "have http") (client:setup-http run-id server-dat remaining-tries)) + ((rpc) (BB> "have rpc") (rpc-transport:client-setup run-id server-dat remtries: remaining-tries)) (else (debug:print-error 0 *default-log-port* "(6) Transport [" transport "] specified for run-id [" run-id "] is not implemented in client:setup. Cannot proceed.") (exit 1))))) @@ -90,17 +93,17 @@ (start-res (http-transport:client-connect iface port)) (ping-res (rmt:login-no-auto-client-setup start-res run-id))) (if (and start-res ping-res) (begin - (hash-table-set! *runremote* run-id start-res) ;; side-effect - *runremote* cache init fpr rmt:* + (rmt:set-cinfo run-id start-res) ;; (hash-table-set! *runremote* run-id start-res) ;; side-effect - *runremote* cache init fpr rmt:* (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "client:setup-http, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (http-transport:close-connections run-id) - (hash-table-delete! *runremote* run-id) ;; BB: suspect there is nothing to delete ... + (rmt:del-cinfo run-id) ;; (hash-table-delete! *runremote* run-id) ;; BB: suspect there is nothing to delete ... (tasks:kill-server-run-id run-id) ;; -9 so the hung processes dont eat 100% when not responding to sigterm. (tasks:bb-server-force-clean-run-record run-id iface port " client:setup-http (server-dat = #t)") (if (> remaining-tries 8) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little Index: ezsteps.scm ================================================================== --- ezsteps.scm +++ ezsteps.scm @@ -164,11 +164,11 @@ new-state new-status (args:get-arg "-m") #f) ;; need to update the top test record if PASS or FAIL and this is a subtest (if (not (equal? item-path "")) - (cdb:roll-up-pass-fail-counts *runremote* run-id test-name item-path new-status)))) + (cdb:roll-up-pass-fail-counts *runremote* run-id test-name item-path new-status)))) ;; BB> I think this is dead code. cdb:roll-up-pass-fail-counts does not exist anywhere. ;; for automated creation of the rollup html file this is a good place... (if (not (equal? item-path "")) (tests:summarize-items #f run-id test-id test-name #f)) ;; don't force - just update if no ))) (pop-directory) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -265,11 +265,11 @@ exn (begin (set! success #f) (debug:print 0 *default-log-port* "WARNING: failure in with-input-from-request to " fullurl ".") (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) - (hash-table-delete! *runremote* run-id) + (rmt:del-cinfo run-id) ;;(hash-table-delete! *runremote* run-id) ;; Killing associated server to allow clean retry.") ;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine? (mutex-unlock! *http-mutex*) ;;; (signal (make-composite-condition ;;; (make-property-condition 'commfail 'message "failed to connect to server"))) @@ -312,11 +312,11 @@ 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))) ;; careful closing of connections stored in *runremote* ;; (define (http-transport:close-connections run-id) - (let* ((server-dat (hash-table-ref/default *runremote* run-id #f))) + (let* ((server-dat (rmt:get-connection-info run-id))) ;;(hash-table-ref/default *runremote* run-id #f))) (if (vector? server-dat) (let ((api-dat (http-transport:server-dat-get-api-uri server-dat))) (close-connection! api-dat) #t) #f))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -60,23 +60,50 @@ #f)))) ;; if a server is either running or in the process of starting call client:setup ;; else return #f to let the calling proc know that there is no server available ;; -(define (rmt:get-connection-info run-id) - (let ((cinfo (hash-table-ref/default *runremote* run-id #f))) + +(define *rrr-mutex* (make-mutex)) +(define (rmt:get-cinfo rid) + (mutex-lock! *rrr-mutex*) + (let* ((run-id (if rid rid 0)) + (cinfo (hash-table-ref/default *runremote* run-id #f))) + (mutex-unlock! *rrr-mutex*) + cinfo)) + +(define (rmt:set-cinfo rid server-dat) + (mutex-lock! *rrr-mutex*) + (let* ((run-id (if rid rid 0)) + (res (hash-table-set! *runremote* run-id server-dat))) + (mutex-unlock! *rrr-mutex*) + res)) + +(define (rmt:del-cinfo rid) + (mutex-lock! *rrr-mutex*) + (let* ((run-id (if rid rid 0)) + (res (hash-table-delete! *runremote* run-id))) + (mutex-unlock! *rrr-mutex*) + res)) + + + + +(define (rmt:get-connection-info-start-server-if-none run-id) + (let ((cinfo (rmt:get-cinfo run-id))) (if cinfo - cinfo + cinfo ;; NB// can cache the answer for server running for 10 seconds ... ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) -(define (rmt:run-id->transport-type rid) - (let* ((run-id (if rid rid 0)) - (connection-info (hash-table-ref/default *runremote* run-id #f))) + + +(define (rmt:run-id->transport-type run-id) + (let* ((connection-info (rmt:get-cinfo run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; if we already have a connection for this run-id, use that precendent ;; use the server if have connection info (let* ((transport-type (vector-ref connection-info 6))) ;; BB: assumes all transport-type'-servertdat vector's item 6 ids transport type @@ -91,20 +118,20 @@ (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; side-effect: clean out old connections (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) - (let ((connection (hash-table-ref/default *runremote* run-id #f))) + (let ((connection (rmt:get-cinfo run-id))) (if (and (vector? connection) (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific. (begin (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id))) + (connection-info (rmt:get-connection-info-start-server-if-none run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info (let* ((transport-type (rmt:run-id->transport-type run-id)) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -558,26 +558,29 @@ #f)) res)) (define (rpc-transport:client-setup run-id server-dat #!key (remtries 10)) + (BB> "entered rpc-transport:client-setup with run-id="run-id" and server-dat="server-dat" and retries="remtries) (tcp-buffer-size 0) (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remtries) (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) (runremote-server-dat (vector iface port #f #f #f (current-seconds) 'rpc)) ;; http version := (vector iface port api-uri api-url api-req (current-seconds) 'http ) (ping-res (retry-thunk (lambda () ;; make 3 attempts to ping. ((rpc:procedure 'server:login iface port) *toppath*)) + chatty: #t retries: 3))) ;; we got here from rmt:get-connection-info on the condition that *runremote* has no entry for run-id... (if ping-res (begin (debug:print-info 0 *default-log-port* "rpc-transport:client-setup CONNECTION ESTABLISHED run-id="run-id" server-dat=" server-dat) - (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* + (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* runremote-server-dat) (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) (tasks:kill-server-run-id run-id) (tasks:bb-server-force-clean-run-record run-id iface port " rpc-transport:client-setup (server-dat = #t)") (if (> remtries 2) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -96,14 +96,10 @@ ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case (server:get-transport) ((rpc) (db:obj->string (vector success/fail query-sig result))) ((http) (db:obj->string (vector success/fail query-sig result))) - ((zmq) - (let ((pub-socket (vector-ref *runremote* 1))) - (send-message pub-socket return-addr send-more: #t) - (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) ((fs) result) (else (debug:print-error 0 *default-log-port* "unrecognised transport type: " *transport-type*) result))) Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -151,11 +151,11 @@ # force use of server always # required yes # Use http instead of direct filesystem access -transport http +# transport http # transport fs # transport nmsg synchronous 0