Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -50,14 +50,15 @@ ;; ((rpc) (rpc:client-connect iface port)) ;; ((http) (http:client-connect iface port)) ;; ((zmq) (zmq:client-connect iface port)) ;; (else (rpc:client-connect iface port)))) -(define (client:setup run-id #!key (remaining-tries 10)) +(define (client:setup run-id #!key (remaining-tries 10)) (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((server-dat (tasks:bb-get-server-info run-id)) - (transport (if server-dat (tasks:hostinfo-get-transport server-dat) 'noserver))) + (transport (if server-dat (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) + ;;(BB> "transport >"transport"< string? transport >"(string? transport)"< symbol? transport >"(symbol? transport)"<") (case transport ((noserver) ;; no server registered (if (<= remaining-tries 0) (begin (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) @@ -67,27 +68,32 @@ (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1)))))) - ((http)(client:setup-http server-dat run-id remaining-tries)) + ((http)(client:setup-http run-id server-dat remaining-tries)) ;; ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) rpc not implemented; want to see a failure here for now. (else - (debug:print-error 0 *default-log-port* "Unknown transport [" - transport "] specified used by server for run-id " run-id) + (debug:print-error 0 *default-log-port* "Transport [" + transport "] specified for run-id [" run-id "] is not implemented in client:setup. Cannot proceed.") (exit 1))))) - +;; client:setup-http +;; +;; For http transport, robustly ensure an advertised-running server is actually working and responding, and +;; establish tcp connection to server. For servers marked running but not responding, kill them and clear from mdb +;; (define (client:setup-http run-id server-dat remaining-tries) (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) + (start-res (http-transport:client-connect iface port)) (ping-res (rmt:login-no-auto-client-setup start-res run-id))) (if (and start-res ping-res) (begin - (hash-table-set! *runremote* run-id start-res) + (hash-table-set! *runremote* run-id start-res) ;; side-effect - *runremote* cache init fpr rmt:* (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (http-transport:close-connections run-id) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -323,11 +323,11 @@ (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) -(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) +(define (http-transport:server-dat-get-transport vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" @@ -348,11 +348,11 @@ ;; (define (http-transport:client-connect iface port) (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) - (server-dat (vector iface port api-uri api-url api-req (current-seconds)))) + (server-dat (vector iface port api-uri api-url api-req (current-seconds) 'http))) server-dat)) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -98,20 +98,21 @@ (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info - (let* ((dat (case *transport-type* + (let* ((transport-type (vector-ref connection-info 6)) + (dat (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type, item 6 in server-info vector which was populated by *-transport:client-connect with >> (vector iface port api-uri api-url api-req (current-seconds) 'http ) << ((http)(condition-case (http-transport:client-api-send-receive run-id connection-info cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail")))) - ;; ((nmsg)(condition-case - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params) - ;; ((timeout)(vector #f "timeout talking to server")))) - ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) - (else (exit)))) + ;;((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now + (else + (debug:print-error 0 *default-log-port* "Transport [" + transport "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed.") + (exit 1)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) (if success (begin Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -331,11 +331,11 @@ ;; BB> bb opinion - want to push responsibility into api (encapsulation), like waiting if db is busy and finding the db handle in the first place. why should the caller need to be concerned?? If my opinion carries, we'll remove the bb- and make other needful adjustments. (define (bb-mdb-inserter mdb-expecting-proc mdbless-args) (let ((mdb (db:delay-if-busy (tasks:open-db)))) - (apply mdb-expecting-proc (cons mdb args)))) + (apply mdb-expecting-proc (cons mdb mdbless-args)))) (define (tasks:bb-get-server-info . args) (bb-mdb-inserter tasks:get-server-info args)) (define (tasks:bb-num-in-available-state . args)