Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -51,11 +51,11 @@ ;; ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) ;; (else (rpc:client-connect iface port)))) (define (client:setup run-id #!key (remaining-tries 10)) - (BB> "Entered client:setup with run-id="run-id" and remaining-tries="remaining-tries) + ;;(BB> "Entered client:setup with run-id="run-id" and remaining-tries="remaining-tries) (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((server-dat (tasks:bb-get-server-info run-id)) (transport (if (and server-dat (vector? server-dat)) (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) Index: common_records.scm ================================================================== --- common_records.scm +++ common_records.scm @@ -117,10 +117,15 @@ (if (equal? this-func "BB>") (set! location this-loc)))) stack) (let ((dp-args (append (list 0 *default-log-port* location" " ) in-args))) (apply debug:print dp-args)))) + +(define (BB> . in-args) + (apply print "BB> " in-args) + "shouldn't do anything") + (define (debug:print-error n e . params) ;; normal print (if (debug:debug-mode n) (with-output-to-port (or e (current-error-port)) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -339,10 +339,11 @@ ":" (http-transport:server-dat-get-port vec)) #f)) (define (http-transport:server-dat-update-last-access vec) + (BB> "entered http-transport:server-dat-update-last-access vec="vec) (if (vector? vec) (vector-set! vec 5 (current-seconds)) (begin (print-call-chain (current-error-port)) (debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!")))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -113,12 +113,16 @@ (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; + +(define *rmt:srmutex* (make-mutex)) + (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; side-effect: clean out old connections + (mutex-lock! *rmt:srmutex*) (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) (let ((connection (rmt:get-cinfo run-id))) (if (and (vector? connection) @@ -136,28 +140,31 @@ (let* ((transport-type (rmt:run-id->transport-type run-id)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Here, we make request to remote server ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (dat (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type - ((http)(condition-case - (http-transport:client-api-send-receive run-id connection-info cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail")))) - ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now - (else - (debug:print-error 0 *default-log-port* "(1) Transport [" transport-type - "] specified for run-id [" run-id - "] is not implemented in rmt:send-receive. Cannot proceed." (symbol? transport-type)) - (vector #f (conc "transport ["transport-type"] unimplemented"))))) + (dat (begin + + (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type + ((http)(condition-case + (http-transport:client-api-send-receive run-id connection-info cmd params) + ((commfail)(vector #f "communications fail")) + ((exn)(vector #f "other fail")))) + ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now + (else + (debug:print-error 0 *default-log-port* "(1) Transport [" transport-type + "] specified for run-id [" run-id + "] is not implemented in rmt:send-receive. Cannot proceed." (symbol? transport-type)) + (vector #f (conc "transport ["transport-type"] unimplemented")))))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific. (if success (begin + (mutex-unlock! *rmt:srmutex*) ;; (mutex-unlock! *send-receive-mutex*) (case transport-type ((http rpc) res) ;; (db:string->obj res)) (else (debug:print-error 0 *default-log-port* "(2) Transport [" transport-type @@ -167,10 +174,11 @@ )) ;; (vector-ref res 1))) ;; no success... (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") + (mutex-unlock! *rmt:srmutex*) (case transport-type ((http rpc) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. @@ -188,17 +196,18 @@ (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))) (else (debug:print-error 0 *default-log-port* "(3) Transport [" transport-type "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed.") - #f))))) + (exit 1)))))) ;; no connection info; try to start a server ;; ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call ;; (begin + (mutex-unlock! *rmt:srmutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))))) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -255,10 +255,11 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; this client-side procedure makes rpc call to server and returns result ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) + (BB> "entered nsport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) (if (not (vector? serverdat)) (begin (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) (print-call-chain) (exit 1))) @@ -273,18 +274,19 @@ (condition-case ;;(vector #t (run-remote cmd params)) (vector 'success (api-exec cmd params)) [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)] [x () (vector 'other-fail "other fail ["(->string x)"]" x)])) - chatty: #f + chatty: #t accept-result?: (lambda(x) (and (vector? x) (vector-ref x 0))) retries: 4 back-off-factor: 1.5 random-wait: 0.2 retry-delay: 0.1 final-failure-returns-actual: #t)) + (BB> "HEY res="res) res )) (th1 (make-thread send-receive "send-receive")) (time-out-reached #f) (time-out (lambda () @@ -296,10 +298,11 @@ (th2 (make-thread time-out "time out"))) (thread-start! th1) (thread-start! th2) (thread-join! th1) (thread-terminate! th2) + (BB> "alt got res="res) (debug:print-info 11 *default-log-port* "got res=" res) (if (vector? res) (case (vector-ref res 0) ((success) (vector #t (vector-ref res 1))) ((comms-fail) @@ -585,7 +588,6 @@ (if (> remtries 2) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up - (client:setup run-id remaining-tries: (sub1 remtries)) - " rpc-transport:client-setup (server-dat = #t)")))) + (client:setup run-id remaining-tries: (sub1 remtries))))))