Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -782,18 +782,18 @@ (args:get-arg "-kill-server")) (let ((tl (launch:setup))) (if tl (let* ((tdbdat (tasks:open-db)) (servers (tasks:get-all-servers (db:delay-if-busy tdbdat))) - (fmtstr "~5a~12a~8a~20a~24a~10a~10a~10a~10a\n") + (fmtstr "~5a~6a~12a~8a~20a~24a~10a~10a~10a~10a\n") (servers-to-kill '()) (kill-switch (if (args:get-arg "-kill-server") "-9" "")) (killinfo (or (args:get-arg "-stop-server") (args:get-arg "-kill-server") )) (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) (sid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))) - (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface:OutPort" "InPort" "LastBeat" "State" "Transport") - (format #t fmtstr "==" "=====" "===" "====" "=================" "======" "========" "=====" "=========") + (format #t fmtstr "Id" "RunId" "MTver" "Pid" "Host" "Interface:OutPort" "InPort" "LastBeat" "State" "Transport") + (format #t fmtstr "==" "=====" "=====" "===" "====" "=================" "======" "========" "=====" "=========") (for-each (lambda (server) (let* ((id (vector-ref server 0)) (pid (vector-ref server 1)) (hostname (vector-ref server 2)) @@ -804,10 +804,11 @@ (priority (vector-ref server 7)) (state (vector-ref server 8)) (mt-ver (vector-ref server 9)) (last-update (vector-ref server 10)) (transport (vector-ref server 11)) + (run-id (vector-ref server 12)) (killed #f) (status (< last-update 20))) ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server @@ -814,12 +815,12 @@ (if (equal? state "dead") (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day. (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid action: 'delete)) (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid))) - (format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update - (if status "alive" "dead") transport) + (format #t fmtstr id run-id mt-ver pid hostname (conc interface ":" pullport) pubport last-update + (if status state "dead") transport) (if (or (equal? id sid) (equal? sid 0)) ;; kill all/any (begin (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid) (tasks:kill-server hostname pid kill-switch: kill-switch))))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -118,11 +118,18 @@ (define *rmt:srmutex* (make-mutex)) (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; side-effect: clean out old connections + + (when (eq? (modulo attemptnum 5) 0) + (debug:print-error 0 *default-log-port* "rmt:send-receive did not succeed after "(sub1 attemptnum)" tries. Aborting. (cmd="cmd" rid="rid" param="params) + (exit 1)) + (mutex-lock! *rmt:srmutex*) + + ;; expire connections (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) (let ((connection (rmt:get-cinfo run-id))) (if (and (vector? connection) @@ -133,14 +140,15 @@ (hash-table-keys *runremote*))) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info-start-server-if-none run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) + (BB> "in rmt:send-receive; run-id="run-id";;connection-info="connection-info) (if connection-info ;; use the server if have connection info (let* ((transport-type (rmt:run-id->transport-type run-id)) - + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Here, we make request to remote server ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (dat (begin @@ -157,11 +165,13 @@ (vector #f (conc "transport ["transport-type"] unimplemented")))))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) - (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific. + (BB> "in rmt:send-receive; transport-type="transport-type" success="success" connection-info="connection-info" res="res " dat="dat) + (if (and success (vector? connection-info)) + (http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific. (if success (begin (mutex-unlock! *rmt:srmutex*) ;; (mutex-unlock! *send-receive-mutex*) (case transport-type @@ -175,32 +185,40 @@ ;; no success... (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") (mutex-unlock! *rmt:srmutex*) - (case transport-type - - ((http rpc) - (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) - (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))) - (else - (debug:print-error 0 *default-log-port* "(3) Transport [" transport-type - "] specified for run-id [" run-id - "] is not implemented in rmt:send-receive. Cannot proceed.") - (exit 1)))))) + (rmt:del-cinfo run-id) ;; don't keep using the same connection + (rmt:send-receive cmd rid params attemptnum: attemptnum) + + + ;; (case transport-type + + ;; ((http rpc) + + ;; ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. + ;; ;; (if (eq? (modulo attemptnum 5) 0) + ;; ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) + ;; ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications + ;; (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) + ;; (thread-sleep! 5) + + ;; ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) + + ;; ;; no longer killing the server in http-transport:client-api-send-receive + ;; ;; may kill it here but what are the criteria? + ;; ;; start with three calls then kill server + ;; ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) + ;; ;; (thread-sleep! 2) + ;; (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))) + ;; (else + ;; (debug:print-error 0 *default-log-port* "(3) Transport [" transport-type + ;; "] specified for run-id [" run-id + ;; "] is not implemented in rmt:send-receive. Cannot proceed.") + ;; (exit 1))) + + ))) ;; no connection info; try to start a server ;; ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call ;; Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -244,31 +244,31 @@ (define *api-exec-ht* (make-hash-table)) ;; let's see if caching the rpc stub curbs thread-profusion on server side (define (rpc-transport:get-api-exec iface port) - (let* ((lu (hash-table-ref/default *api-exec-ht* '(iface . port) #f))) + (let* ((lu (hash-table-ref/default *api-exec-ht* (cons iface port) #f))) (if lu lu (let ((res (rpc:procedure 'api-exec iface port))) - (hash-table-set! *api-exec-ht* '(iface . port) res) + (hash-table-set! *api-exec-ht* (cons iface port) res) res)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; this client-side procedure makes rpc call to server and returns result ;; (define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) - (BB> "entered nsport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) + (BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries) (if (not (vector? serverdat)) (begin (BB> "WHAT?? for run-id="run-id", serverdat="serverdat) (print-call-chain) (exit 1))) (let* ((iface (rpc-transport:server-dat-get-iface serverdat)) (port (rpc-transport:server-dat-get-port serverdat)) (res #f) - (api-exec (rpc-transport:get-api-exec iface port)) + (api-exec (rpc-transport:get-api-exec iface port)) ;; chached by host/port. may need to clear... (send-receive (lambda () (tcp-buffer-size 0) (set! res (retry-thunk (lambda () (condition-case @@ -304,11 +304,11 @@ (debug:print-info 11 *default-log-port* "got res=" res) (if (vector? res) (case (vector-ref res 0) ((success) (vector #t (vector-ref res 1))) ((comms-fail) - (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request") + (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<") ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) (vector #f (vector-ref res 1))) (else (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1)) (debug:print 0 *default-log-port* " client call chain:")