@@ -49,105 +49,79 @@
 ;; [x] [ ] - after 60 seconds
 ;; [ ] [ ] i. check server alive, connect to new if necessary
 ;; [ ] [ ] ii. resend request
 ;; [ ] [ ] 7. Turn self ping back on
 
-(define (nmsg-transport:make-server-url hostport)
+;; Build the "tcp://host:port" url for hostport, a list of (host port).
+;; Returns #f if hostport is #f. With bindall: #t the host is replaced by "*".
+(define (nmsg-transport:make-server-url hostport #!key (bindall #f))
   (if (not hostport)
       #f
-      (conc "tcp://" (car hostport) ":" (cadr hostport))))
+      (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport))))
 
 (define *server-loop-heart-beat* (current-seconds))
 (define *heartbeat-mutex* (make-mutex))
 
 ;;======================================================================
 ;; S E R V E R
 ;;======================================================================
 
-(define-inline (nmsgsock:get-pub dat)(vector-ref dat 0))
-(define-inline (nmsgsock:get-pull dat)(vector-ref dat 1))
-(define-inline (nmsgsock:set-pub! dat s)(vector-set! dat s 0))
-(define-inline (nmsgsock:set-pull! dat s)(vector-set! dat s 0))
-
-(define (nmsg-transport:run hostn)
+;; Start the server thread on a port obtained from portlogger, then ping it.
+;; If the ping is answered: mark server-id "dbprep" then "running", start
+;; nmsg-transport:keep-running and block until the server thread finishes.
+;; If not: delete the server record, mark the port failed and try again.
+;; NOTE(review): the retry is unbounded and reuses server-id after its record
+;; was deleted - confirm that this is intended.
+;;
+(define (nmsg-transport:run dbstruct hostn run-id server-id)
   (debug:print 2 "Attempting to start the server ...")
-  (if (not *toppath*)
-      (if (not (launch:setup-for-run))
-          (begin
-            (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
-            (exit))))
-  (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db
-         (nmsg-sdat1 #f)
-         (nmsg-sdat2 #f)
-         (pull-socket #f)
-         (pub-socket #f)
-         (p1 #f)
-         (p2 #f)
-         (nmsg-sockets-dat #f)
-         (iface (if (string=? "-" hostn)
-                    "*" ;; (get-host-name)
-                    hostn))
-         (hostname (get-host-name))
-         (ipaddrstr (let ((ipstr (if (string=? "-" hostn)
-                                     (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
-                                     #f)))
-                      (if ipstr ipstr hostname)))
-         (last-run 0))
-    (set! nmsg-sockets-dat (nmsg-transport:setup-ports ipaddrstr (if (args:get-arg "-port")
-                                                                     (string->number (args:get-arg "-port"))
-                                                                     (+ 5000 (random 1001)))))
-
-    (set! nmsg-sdat1 (car nmsg-sockets-dat))
-    (set! pull-socket (cadr nmsg-sdat1)) ;; (iface s port)
-    (set! p1 (caddr nmsg-sdat1))
-
-    (set! nmsg-sdat2 (cadr nmsg-sockets-dat))
-    (set! pub-socket (cadr nmsg-sdat2))
-    (set! p2 (caddr nmsg-sdat2))
-
-    (set! *cache-on* #t)
-
-    (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!?
-
-    ;; what to do when we quit
-    ;;
-;;     (on-exit (lambda ()
-;;                (if (and *toppath* *server-info*)
-;;                    (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
-;;                    (let loop ()
-;;                      (let ((queue-len 0))
-;;                        (thread-sleep! (random 5))
-;;                        (mutex-lock! *incoming-mutex*)
-;;                        (set! queue-len (length *incoming-data*))
-;;                        (mutex-unlock! *incoming-mutex*)
-;;                        (if (> queue-len 0)
-;;                            (begin
-;;                              (debug:print-info 0 "Queue not flushed, waiting ...")
-;;                              (loop))))))))
-
-    ;; The heavy lifting
-    ;;
-    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
-    ;;
-    (debug:print-info 11 "Server setup complete, start listening for messages")
-    (let loop ((queue-lst '()))
-      (let* ((rawmsg (receive-message* pull-socket))
-             (packet (db:string->obj rawmsg))
-             (qtype (cdb:packet-get-qtype packet)))
-        (debug:print-info 12 "server=> received packet=" packet)
-        (if (not (member qtype '(sync ping)))
-            (begin
-              (mutex-lock! *heartbeat-mutex*)
-              (set! *last-db-access* (current-seconds))
-              (mutex-unlock! *heartbeat-mutex*)))
-        (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
-            (begin
-              (db:process-queue-item db packet)
-              ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
-
-              (loop '()))
-            (loop (cons packet queue-lst)))))))
+  (let* ((start-port (portlogger:open-run-close portlogger:find-port))
+         (server-thread (make-thread (lambda ()
+                                       (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
+                                     "server thread"))
+         (tdbdat (tasks:open-db)))
+    (thread-start! server-thread)
+    (if (nmsg-transport:ping hostn start-port return-socket: #f) ;; only the yes/no answer is needed, let ping close its socket
+        (begin
+          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
+          (set! *server-info* (list hostn start-port)) ;; probably not needed anymore?
+          (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
+          (set! *inmemdb* dbstruct)
+          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
+          (thread-start! nmsg-transport:keep-running)
+          (thread-join! server-thread))
+        (begin
+          (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
+          (portlogger:open-run-close portlogger:set-failed start-port)
+          (nmsg-transport:run dbstruct hostn run-id server-id)))))
+
+;; Bind a 'rep socket on all interfaces at portnum and answer requests until
+;; "quit" is received. A message starting with "ping" is answered with this
+;; process id; anything else is decoded as a vector of (cmd params), run
+;; through api:execute-requests and the encoded result is sent back.
+;; NOTE(review): nmsg-transport:ping sends an encoded vector, not the bare
+;; string "ping" - confirm which of the branches below a ping really takes.
+;;
+(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
+  (let ((repsoc (nn-socket 'rep)))
+    (nn-bind repsoc (conc "tcp://*:" portnum))
+    (let loop ((msg-in (nn-recv repsoc)))
+      (cond
+       ((equal? msg-in "quit")
+        (nn-send repsoc "Ok, quitting"))
+       ((and (>= (string-length msg-in) 4)
+             (equal? (substring msg-in 0 4) "ping"))
+        (nn-send repsoc (conc (current-process-id)))
+        (loop (nn-recv repsoc)))
+       (else
+        (let* ((dat (db:string->obj msg-in transport: 'nm))
+               (cmd (vector-ref dat 0))
+               (params (vector-ref dat 1))
+               (result (api:execute-requests dbstruct cmd params))
+               (newdat (db:obj->string result transport: 'nm)))
+          (nn-send repsoc newdat)
+          (loop (nn-recv repsoc))))))))
 
 ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
 ;; used and to shutdown after sometime if it is not.
 ;;
 (define (nmsg-transport:keep-running)
@@ -178,11 +152,11 @@
         ;; (print "Server running, count is " count)
         (if (< count 1) ;; 3x3 = 9 secs aprox
             (loop (+ count 1)))
 
         ;; NOTE: Get rid of this mechanism! It really is not needed...
-        (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
+        ;; (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
 
         ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
         (mutex-lock! *heartbeat-mutex*)
         (set! last-access *last-db-access*)
         (mutex-unlock! *heartbeat-mutex*)
@@ -204,101 +178,151 @@
               (thread-sleep! 1)
               (debug:print-info 0 "Max cached queries was " *max-cache-size*)
               (debug:print-info 0 "Server shutdown complete. Exiting")
               (exit)))))))
 
-(define (nmsg-transport:find-free-port-and-open iface s port stype #!key (trynum 50))
-  (let ((s (if s s (make-socket stype)))
-        (p (if (number? port) port 5555))
-        (old-handler (current-exception-handler)))
-    (handle-exceptions
-     exn
-     (begin
-       (debug:print 0 "Failed to bind to port " p ", trying next port")
-       (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
-       ;; (old-handler)
-       ;; (print-call-chain)
-       (if (> trynum 0)
-           (nmsg-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
-           (debug:print-info 0 "Tried ports up to " p
-                             " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
-       (exit)) ;; To exit or not? That is the question.
-     (let ((nmsg-url (conc "tcp://" iface ":" p)))
-       (debug:print 2 "Trying to start server on " nmsg-url)
-       (bind-socket s nmsg-url)
-       (list iface s port)))))
-
-(define (nmsg-transport:setup-ports ipaddrstr startport)
-  (let* ((s1 (nmsg-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
-         (p1 (caddr s1))
-         (s2 (nmsg-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
-         (p2 (caddr s2)))
-    (set! *runremote* #f)
-    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
-    (mutex-lock! *heartbeat-mutex*)
-    (set! *server-info* (open-run-close tasks:server-register
-                                        tasks:open-db
-                                        (current-process-id)
-                                        ipaddrstr p1
-                                        0
-                                        'live
-                                        'nmsg
-                                        pubport: p2))
-    (debug:print-info 11 "*server-info* set to " *server-info*)
-    (mutex-unlock! *heartbeat-mutex*)
-    (list s1 s2)))
+;; Entry point for starting a server for run-id.
+;; Exits at once if a server for run-id is already running. Otherwise tries to
+;; lock a server slot (up to 4 retries, 2 seconds apart) and hands over to
+;; nmsg-transport:run.
+;;
+;; all routes through here end in exit ...
+;;
+(define (nmsg-transport:launch run-id)
+  (let* ((tdbdat (tasks:open-db))
+         (dbstruct (db:setup run-id))
+         (hostn (or (args:get-arg "-server") "-")))
+    (set! *run-id* run-id)
+    ;; with nbfake daemonize isn't really needed
+    ;;
+    ;; (if (args:get-arg "-daemonize")
+    ;;     (begin
+    ;;       (daemon:ize)
+    ;;       (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
+    ;;           (begin
+    ;;             (current-error-port *alt-log-file*)
+    ;;             (current-output-port *alt-log-file*)))))
+    (if (server:check-if-running run-id)
+        (begin
+          (debug:print-info 0 "Server for run-id " run-id " already running")
+          (exit 0)))
+    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
+               (remtries 4))
+      (if (not server-id)
+          (if (> remtries 0)
+              (begin
+                (thread-sleep! 2)
+                (if (not (server:check-if-running run-id))
+                    (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
+                          (- remtries 1))
+                    (begin
+                      (debug:print-info 0 "Another server took the slot, exiting")
+                      (exit 0))))
+              (begin
+                ;; since we didn't get the server lock we are going to clean up and bail out
+                (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
+                (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " nmsg-transport:launch")
+                (exit)))) ;; without this exit the server would be started below with server-id of #f
+      (nmsg-transport:run dbstruct hostn run-id server-id)
+      (set! *didsomething* #t)
+      (exit))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
 
 (define (nmsg-transport:mk-signature)
   (message-digest-string (md5-primitive)
                          (with-output-to-string
                            (lambda ()
                              (write (list (current-directory)
                                           (argv)))))))
 
-;;======================================================================
-;; S E R V E R U T I L I T I E S
-;;======================================================================
-
 ;;======================================================================
 ;; C L I E N T S
 ;;======================================================================
 
-;;
-(define (nmsg-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
-  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
-  (let ((connect-ok #f)
-        (nmsg-socket (if context
-                         (make-socket type context)
-                         (make-socket type)))
-        (conurl (nmsg-transport:make-server-url (list iface port))))
-    (if (socket? nmsg-socket)
-        (begin
-          ;; first apply subscriptions
-          (for-each (lambda (subscription)
-                      (debug:print 2 "Subscribing to " subscription)
-                      (socket-option-set! nmsg-socket 'subscribe subscription))
-                    subscriptions)
-          (connect-socket nmsg-socket conurl)
-          nmsg-socket)
-        (begin
-          (debug:print 0 "ERROR: Failed to open socket to " conurl)
-          #f))))
-
-(define (nmsg-transport:client-connect iface pullport pubport)
-  (let* ((push-socket (nmsg-transport:client-socket-connect iface pullport type: 'push))
-         (sub-socket (nmsg-transport:client-socket-connect iface pubport
-                                                           type: 'sub
-                                                           subscriptions: (list (client:get-signature) "all")))
-         (nmsg-sockets (vector push-socket sub-socket))
-         (login-res #f))
+;; ping the server at hostn:port (hostn of #f or "-" means this host)
+;;   our-key       is sent as the payload (may be #f)
+;;   expected-key  if not #f, must equal element 1 of the decoded reply
+;;   return-socket #t: return the connected req socket on success, #f on failure
+;;                 #f: close the socket and return #t or #f
+;; gives up about 10 seconds after sending if no reply has arrived
+;;
+(define (nmsg-transport:ping hostn port #!key (return-socket #t)(expected-key #f)(our-key #f))
+  ;; the request runs in its own thread so that the timeout thread can terminate it
+  (let* ((req (nn-socket 'req))
+         (host (if (or (not hostn)
+                       (equal? hostn "-")) ;; use localhost
+                   (get-host-name)
+                   hostn))
+         (success #f)
+         (keepwaiting #t)
+         (dat (db:obj->string (vector "ping" our-key) transport: 'nm))
+         (ping (make-thread
+                (lambda ()
+                  (nn-send req dat)
+                  (let ((result (nn-recv req)))
+                    ;; decode the reply only when a key is expected
+                    (if (or (not expected-key) ;; just getting a reply is good enough then
+                            (equal? (vector-ref (db:string->obj result transport: 'nm) 1)
+                                    expected-key))
+                        (begin
+                          ;; (print "ping, success: received \"" result "\"")
+                          (set! success #t))
+                        (begin
+                          ;; (print "ping, failed: received key \"" result "\"")
+                          (set! keepwaiting #f)
+                          (set! success #f)))))
+                "ping"))
+         (timeout (make-thread (lambda ()
+                                 (let loop ((count 0))
+                                   (thread-sleep! 1)
+                                   (print "still waiting after " (+ count 1) " seconds...")
+                                   (if (and keepwaiting (< count 10))
+                                       (loop (+ count 1))))
+                                 (if keepwaiting
+                                     (begin
+                                       (print "timeout waiting for ping")
+                                       (thread-terminate! ping))))
+                               "timeout")))
+    (nn-connect req (conc "tcp://" host ":" port))
+    (handle-exceptions
+     exn
+     (begin
+       (print-call-chain)
+       (print " message: " ((condition-property-accessor 'exn 'message) exn))
+       (print "exn=" (condition->list exn))
+       (print "ping failed to connect to " host ":" port))
+     (thread-start! timeout)
+     (thread-start! ping)
+     (thread-join! ping)
+     (if success (thread-terminate! timeout)))
+    ;; hand the socket back only when asked for and usable, otherwise close it
+    (if (and success return-socket)
+        req
+        (begin
+          (nn-close req)
+          success))))
+
+;; ping the server at iface:portnum and log in over the socket the ping returns
+;;   returns the connected req socket (also stored in *nm-port*) or #f
+;;
+(define (nmsg-transport:client-connect iface portnum)
+  (let* ((reqsoc (nmsg-transport:ping iface portnum)) ;; already connected by ping, or #f
+         (login-res #f))
     (debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
-    (set! login-res (client:login serverdat nmsg-sockets))
-    (if (and (not (null? login-res))
+    ;; NOTE(review): serverdat is not bound anywhere in this function - confirm it is a global
+    (if reqsoc
+        (set! login-res (client:login serverdat reqsoc)))
+    (if (and login-res
+             (not (null? login-res))
              (car login-res))
         (begin
-          (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
-          (set! *runremote* nmsg-sockets)
-          nmsg-sockets)
+          (debug:print-info 2 "Logged in and connected to " iface ":" portnum ".")
+          (set! *nm-port* reqsoc)
+          reqsoc)
         (begin
-          (debug:print-info 2 "Failed to login or connect to " conurl)
+          (debug:print-info 2 "Failed to login or connect to " iface ":" portnum)
+          (if reqsoc (nn-close reqsoc))
           (set! *runremote* #f)
           #f))))
@@ -358,49 +382,10 @@
               (thread-sleep! 1)
               (debug:print-info 0 "Max cached queries was " *max-cache-size*)
               (debug:print-info 0 "Server shutdown complete. Exiting")
               (exit)))))))
 
-;; all routes though here end in exit ...
-(define (nmsg-transport:launch)
-  (if (not *toppath*)
-      (if (not (launch:setup-for-run))
-          (begin
-            (debug:print 0 "ERROR: cannot find megatest.config, exiting")
-            (exit))))
-  (debug:print-info 2 "Starting nmsg server")
-  (if *toppath*
-      (let* (;; (th1 (make-thread (lambda ()
-             ;;                     (let ((server-info #f))
-             ;;                       ;; wait for the server to be online and available
-             ;;                       (let loop ()
-             ;;                         (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
-             ;;                         (thread-sleep! 2)
-             ;;                         (mutex-lock! *heartbeat-mutex*)
-             ;;                         (set! server-info *server-info* )
-             ;;                         (mutex-unlock! *heartbeat-mutex*)
-             ;;                         (if (not server-info)(loop)))
-             ;;                       (debug:print 2 "Server alive, starting self-ping")
-             ;;                       (nmsg-transport:self-ping server-info)
-             ;;                       ))
-             ;;                   "Self ping"))
-             (th2 (make-thread (lambda ()
-                                 (nmsg-transport:run
-                                  (if (args:get-arg "-server")
-                                      (args:get-arg "-server")
-                                      "-"))) "Server run"))
-             ;; (th3 (make-thread (lambda ()(nmsg-transport:keep-running)) "Keep running"))
-             )
-        (set! *client-non-blocking-mode* #t)
-        ;; (thread-start! th1)
-        (thread-start! th2)
-        ;; (thread-start! th3)
-        (set! *didsomething* #t)
-        ;; (thread-join! th3)
-        (thread-join! th2)
-        )
-      (debug:print 0 "ERROR: Failed to setup for megatest")))
 
 (define (nmsg-transport:client-signal-handler signum)
   (handle-exceptions
    exn
    (debug:print " ... exiting ...")
@@ -416,78 +401,5 @@
                           "exit on ^C timer")))
      (thread-start! th2)
      (thread-start! th1)
      (thread-join! th2))))
 
-(define (nmsg-transport:client-launch)
-  (set-signal-handler! signal/int nmsg-transport:client-signal-handler)
-  (if (nmsg-transport:client-setup)
-      (debug:print-info 2 "connected as client")
-      (begin
-        (debug:print 0 "ERROR: Failed to connect as client")
-        (exit))))
-
-;;======================================================================
-;; Defunct functions
-;;======================================================================
-
-;; ping a server and return number of clients or #f (if no response)
-;; NOT IN USE!
-(define (nmsg-transport:ping host port #!key (secs 10)(return-socket #f))
-  (cdb:use-non-blocking-mode
-   (lambda ()
-     (let* ((res #f)
-            (th1 (make-thread
-                  (lambda ()
-                    (let* ((nmsg-context (make-context 1))
-                           (nmsg-socket (nmsg-transport:client-connect host port context: nmsg-context)))
-                      (if nmsg-socket
-                          (if (nmsg-transport:client-login nmsg-socket)
-                              (let ((numclients (cdb:num-clients nmsg-socket)))
-                                (if (not return-socket)
-                                    (begin
-                                      (nmsg-transport:client-logout nmsg-socket)
-                                      (close-socket nmsg-socket)))
-                                (set! res (list #t numclients (if return-socket nmsg-socket #f))))
-                              (begin
-                                ;; (close-socket nmsg-socket)
-                                (set! res (list #f "CAN'T LOGIN" #f))))
-                          (set! res (list #f "CAN'T CONNECT" #f)))))
-                  "Ping: th1"))
-            (th2 (make-thread
-                  (lambda ()
-                    (let loop ((count 1))
-                      (debug:print-info 1 "Ping " count " server on " host " at port " port)
-                      (thread-sleep! 2)
-                      (if (< count (/ secs 2))
-                          (loop (+ count 1))))
-                    ;; (thread-terminate! th1)
-                    (set! res (list #f "TIMED OUT" #f)))
-                  "Ping: th2")))
-       (thread-start! th2)
-       (thread-start! th1)
-       (handle-exceptions
-        exn
-        (set! res (list #f "TIMED OUT" #f))
-        (thread-join! th1 secs))
-       res))))
-
-;; (define (nmsg-transport:self-ping server-info)
-;;   ;; server-info: server-id interface pullport pubport
-;;   (let ((iface (list-ref server-info 1))
-;;         (pullport (list-ref server-info 2))
-;;         (pubport (list-ref server-info 3)))
-;;     (nmsg-transport:client-connect iface pullport pubport)
-;;     (let loop ()
-;;       (thread-sleep! 2)
-;;       (cdb:client-call *runremote* 'ping #t)
-;;       (debug:print 4 "nmsg-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
-;;       (mutex-lock! *heartbeat-mutex*)
-;;       (set! *server-loop-heart-beat* (current-seconds))
-;;       (mutex-unlock! *heartbeat-mutex*)
-;;       (loop))))
-
-(define (nmsg-transport:reply pubsock target query-sig success/fail result)
-  (debug:print-info 11 "nmsg-transport:reply target=" target ", result=" result)
-  (send-message pubsock target send-more: #t)
-  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))
-