@@ -61,11 +61,11 @@
 
 ;;======================================================================
 ;; S E R V E R
 ;;======================================================================
 
-(define (nmsg-transport:run dbstruct hostn run-id server-id)
+(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
   (debug:print 2 "Attempting to start the server ...")
   (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
          (server-thread   (make-thread (lambda ()
                                          (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
                                        "server thread"))
@@ -81,14 +81,19 @@
 
           (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
           (thread-start! (make-thread (lambda ()(nmsg-transport:keep-running server-id))
                                       "keep running"))
           (thread-join! server-thread))
-        (begin
-          (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
-          (portlogger:open-run-close portlogger:set-failed start-port)
-          (nmsg-transport:run dbstruct hostn run-id server-id)))))
+        (if (> retrynum 0)
+            (begin
+              (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
+              (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
+              (portlogger:open-run-close portlogger:set-failed start-port)
+              (nmsg-transport:run dbstruct hostn run-id server-id retrynum: (- retrynum 1))) ;; count down, otherwise every retry restarts at the default and we never give up
+            (begin
+              (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
+              (exit 1))))))
 
 (define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
   (let ((repsoc (nn-socket 'rep)))
     (nn-bind repsoc (conc "tcp://*:" portnum))
     (let loop ((msg-in (nn-recv repsoc)))
@@ -171,61 +176,71 @@
 ;; expect the key expected-key returned in payload
 ;; send our-key or #f as payload
 ;;
 (define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
   ;; send a random number along with pid and check that we get it back
-  (let* ((req  (or socket (nn-socket 'req)))
-         (host (if (or (not hostn)
+  (let* ((host (if (or (not hostn)
                        (equal? hostn "-")) ;; use localhost
                    (get-host-name)
                    hostn))
-         (success #f)
-         (keepwaiting #t)
+         (req  (or socket
+                   (let ((soc (nn-socket 'req)))
+                     (nn-connect soc (conc "tcp://" host ":" port))
+                     soc)))
          (dat  (db:obj->string (vector "ping" our-key) transport: 'nmsg))
-         (ping (make-thread
-                (lambda ()
-                  (nn-send req dat)
-                  (let* ((result  (nn-recv req))
-                         (key     (vector-ref (db:string->obj result transport: 'nmsg) 1)))
-                    (if (or (not expected-key) ;; just getting a reply is good enough then
-                            (equal? key expected-key))
-                        (begin
-                          ;; (print "ping, success: received \"" result "\"")
-                          (set! success #t))
-                        (begin
-                          ;; (print "ping, failed: received key \"" result "\"")
-                          (set! keepwaiting #f)
-                          (set! success #f)))))
-                "ping"))
-         (timeout (make-thread (lambda ()
-                                 (let loop ((count 0))
-                                   (thread-sleep! 1)
-                                   (print "still waiting after count seconds...")
-                                   (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
-                                       (loop (+ count 1))))
-                                 (if keepwaiting
-                                     (begin
-                                       (print "timeout waiting for ping")
-                                       (thread-terminate! ping))))
-                               "timeout")))
-    (if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
+         (result  (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout))
+         (success (vector-ref result 0))
+         (key     (if success
+                      (vector-ref (db:string->obj (vector-ref result 1) transport: 'nmsg) 1)
+                      #f)))
+    (debug:print 0 "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key))
+    (if (and success
+             (or (not expected-key) ;; just getting a reply is good enough then
+                 (equal? key expected-key)))
+        (if return-socket
+            req
+            (begin
+              (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it
+              #t))
+        (begin
+          (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect
+          #f))))
+
+;; send data to server, wait max of timeout seconds for a response.
+;; return #( success/fail result )
+;;
+(define (nmsg-transport:client-api-send-receive-raw socreq dat #!key (timeout 5))
+  (let* ((success     #f)
+         (result      #f)
+         (keepwaiting #t)
+         (send-recv   (make-thread
+                       (lambda ()
+                         (nn-send socreq dat)
+                         (let* ((res (nn-recv socreq)))
+                           (set! success #t)
+                           (set! result res)))
+                       "send-recv"))
+         (timeout     (make-thread
+                       (lambda ()
+                         (let loop ((count 0))
+                           (thread-sleep! 1)
+                           (debug:print-info 1 "send-receive-raw, still waiting after " count " seconds...")
+                           (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
+                               (loop (+ count 1))))
+                         (if keepwaiting
+                             (begin
+                               (print "timeout waiting for ping")
+                               (thread-terminate! send-recv))))
+                       "timeout")))
     (handle-exceptions
      exn
-     (begin
-       ;; (print-call-chain)
-       ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
-       ;; (print "exn=" (condition->list exn))
-       (debug:print-info 1 "ping failed to connect to " host ":" port))
+     (set! result "timeout")
      (thread-start! timeout)
-     (thread-start! ping)
-     (thread-join!  ping)
+     (thread-start! send-recv)
+     (thread-join!  send-recv)
      (if success (thread-terminate! timeout)))
-    (if return-socket
-        (if success req #f)
-        (begin
-          (nn-close req) ;; should it be closed if we were handed a socket?
-          success))))
+    (vector success result)))
 
 ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
 ;; used and to shutdown after sometime if it is not.
 ;;
 (define (nmsg-transport:keep-running server-id)
@@ -289,19 +304,22 @@
 
 (define (nmsg-transport:client-connect iface portnum)
   (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t)))
     (vector iface portnum #f #f #f (current-seconds) reqsoc)))
 
-(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
+;; return #( success result )
+;;
+(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5))
   (mutex-lock! *http-mutex*)
-  (let ((packet  (vector cmd param))
-        (reqsoc  (http-transport:server-dat-get-socket connection-info)))
-    (nn-send reqsoc (db:obj->string packet transport: 'nmsg))
-    (let ((res (db:string->obj (nn-recv reqsoc) transport: 'nmsg)))
-      (mutex-unlock! *http-mutex*)
-      res)))
-
+  (let* ((packet  (db:obj->string (vector cmd param) transport: 'nmsg))
+         (reqsoc  (http-transport:server-dat-get-socket connection-info))
+         (rawres  (nmsg-transport:client-api-send-receive-raw reqsoc packet))
+         (status  (vector-ref rawres 0))
+         (result  (vector-ref rawres 1)))
+    (mutex-unlock! *http-mutex*)
+    (vector status (if status (db:string->obj result transport: 'nmsg) result))))
+
 ;;======================================================================
 ;; J U N K
 ;;======================================================================
 
 ;; DO NOT USE