Index: nmsg-transport.scm ================================================================== --- nmsg-transport.scm +++ nmsg-transport.scm @@ -61,11 +61,11 @@ ;;====================================================================== ;; S E R V E R ;;====================================================================== -(define (nmsg-transport:run dbstruct hostn run-id server-id) +(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000)) (debug:print 2 "Attempting to start the server ...") (let* ((start-port (portlogger:open-run-close portlogger:find-port)) (server-thread (make-thread (lambda () (nmsg-transport:try-start-server dbstruct run-id start-port server-id)) "server thread")) @@ -81,14 +81,19 @@ (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (thread-start! (make-thread (lambda ()(nmsg-transport:keep-running server-id)) "keep running")) (thread-join! server-thread)) - (begin - (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") - (portlogger:open-run-close portlogger:set-failed start-port) - (nmsg-transport:run dbstruct hostn run-id server-id))))) + (if (> retrynum 0) + (begin + (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (portlogger:open-run-close portlogger:set-failed start-port) + (nmsg-transport:run dbstruct hostn run-id server-id retrynum: (- retrynum 1))) + (begin + (debug:print 0 "ERROR: could not find an open port to start server on. 
Giving up") + (exit 1)))))) (define (nmsg-transport:try-start-server dbstruct run-id portnum server-id) (let ((repsoc (nn-socket 'rep))) (nn-bind repsoc (conc "tcp://*:" portnum)) (let loop ((msg-in (nn-recv repsoc))) @@ -171,61 +176,71 @@ ;; expect the key expected-key returned in payload ;; send our-key or #f as payload ;; (define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f)) ;; send a random number along with pid and check that we get it back - (let* ((req (or socket (nn-socket 'req))) - (host (if (or (not hostn) + (let* ((host (if (or (not hostn) (equal? hostn "-")) ;; use localhost (get-host-name) hostn)) - (success #f) - (keepwaiting #t) + (req (or socket + (let ((soc (nn-socket 'req))) + (nn-connect soc (conc "tcp://" host ":" port)) + soc))) (dat (db:obj->string (vector "ping" our-key) transport: 'nmsg)) - (ping (make-thread - (lambda () - (nn-send req dat) - (let* ((result (nn-recv req)) - (key (vector-ref (db:string->obj result transport: 'nmsg) 1))) - (if (or (not expected-key) ;; just getting a reply is good enough then - (equal? key expected-key)) - (begin - ;; (print "ping, success: received \"" result "\"") - (set! success #t)) - (begin - ;; (print "ping, failed: received key \"" result "\"") - (set! keepwaiting #f) - (set! success #f))))) - "ping")) - (timeout (make-thread (lambda () - (let loop ((count 0)) - (thread-sleep! 1) - (print "still waiting after count seconds...") - (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate - (loop (+ count 1)))) - (if keepwaiting - (begin - (print "timeout waiting for ping") - (thread-terminate! 
ping)))) - "timeout"))) - (if (not socket)(nn-connect req (conc "tcp://" host ":" port))) + (result (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout)) + (success (vector-ref result 0)) + (key (if success + (vector-ref (db:string->obj (vector-ref result 1) transport: 'nmsg) 1) + #f))) + (debug:print 0 "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key)) + (if (and success + (or (not expected-key) ;; just getting a reply is good enough then + (equal? key expected-key))) + (if return-socket + req + (begin + (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it + #t)) + (begin + (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect + #f)))) + +;; send data to server, wait max of timeout seconds for a response. +;; return #( success/fail result ); result is "timeout" if the call timed out or raised +;; +(define (nmsg-transport:client-api-send-receive-raw socreq dat #!key (timeout 5)) + (let* ((success #f) + (result #f) + (keepwaiting #t) + (send-recv (make-thread + (lambda () + (nn-send socreq dat) + (let* ((res (nn-recv socreq))) + (set! success #t) + (set! result res))) + "send-recv")) + (timeout (make-thread + (lambda () + (let loop ((count 0)) + (thread-sleep! 1) + (debug:print-info 1 "send-receive-raw, still waiting after " count " seconds...") + (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate + (loop (+ count 1)))) + (if keepwaiting + (begin + (print "timeout waiting for response") + (thread-terminate! send-recv)))) + "timeout"))) (handle-exceptions exn - (begin - ;; (print-call-chain) - ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) - ;; (print "exn=" (condition->list exn)) - (debug:print-info 1 "ping failed to connect to " host ":" port)) + (begin (set! result "timeout") (thread-terminate! timeout)) (thread-start! timeout) - (thread-start! ping) - (thread-join! ping) + (thread-start! send-recv) + (thread-join! send-recv) (if success (thread-terminate! 
timeout))) - (if return-socket - (if success req #f) - (begin - (nn-close req) ;; should it be closed if we were handed a socket? - success)))) + (vector success result))) ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (nmsg-transport:keep-running server-id) @@ -289,19 +304,22 @@ (define (nmsg-transport:client-connect iface portnum) (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t))) (vector iface portnum #f #f #f (current-seconds) reqsoc))) -(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param) +;; return #( success result ); result is the decoded reply on success, else the failure value from client-api-send-receive-raw +;; +(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) (mutex-lock! *http-mutex*) - (let ((packet (vector cmd param)) - (reqsoc (http-transport:server-dat-get-socket connection-info))) - (nn-send reqsoc (db:obj->string packet transport: 'nmsg)) - (let ((res (db:string->obj (nn-recv reqsoc) transport: 'nmsg))) - (mutex-unlock! *http-mutex*) - res))) - + (let* ((packet (db:obj->string (vector cmd param) transport: 'nmsg)) + (reqsoc (http-transport:server-dat-get-socket connection-info)) + (rawres (nmsg-transport:client-api-send-receive-raw reqsoc packet)) + (status (vector-ref rawres 0)) + (result (vector-ref rawres 1))) + (mutex-unlock! 
*http-mutex*) + (vector status (if status (db:string->obj result transport: 'nmsg) result)))) + ;;====================================================================== ;; J U N K ;;====================================================================== ;; DO NOT USE Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -89,33 +89,38 @@ ;; use the server if have connection info (let* ((dat (case *transport-type* ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams)) ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params)) (else (exit)))) - (res (if (and dat (vector? dat)) (vector-ref dat 1) #f)) - (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))) + (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)) + (res (if (and dat (vector? dat)) (vector-ref dat 1) #f))) (http-transport:server-dat-update-last-access connection-info) (if success (case *transport-type* ((http)(db:string->obj res)) ((nmsg) res)) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") + (case *transport-type* + ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection + (tasks:kill-server-run-id run-id tag: "api-send-receive-failed") + (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) + ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? ;; start with three calls then kill server - (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - (thread-sleep! 
2) - (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) - (if (and (< attemptnum 10) - (tasks:need-server run-id)) - (begin - (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - (rmt:send-receive cmd rid params (+ attemptnum 1))) - (rmt:open-qry-close-locally cmd run-id params))))) + ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) + ;; (thread-sleep! 2) + (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))) + (if (and (< attemptnum 10) + (tasks:need-server run-id)) + (begin + (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) + (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) + (rmt:open-qry-close-locally cmd run-id params)))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn