@@ -7,11 +7,10 @@
 ;; This program is distributed WITHOUT ANY WARRANTY; without even the
 ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 ;; PURPOSE.
 
 (require-extension (srfi 18) extras tcp s11n)
-;; (import (prefix rpc rpc:))
 
 (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
 (import (prefix sqlite3 sqlite3:))
 
 (use zmq)
@@ -57,25 +56,11 @@
     (conc "tcp://" (car hostport) ":" (cadr hostport))))
 
 (define *server-loop-heart-beat* (current-seconds))
 (define *heartbeat-mutex* (make-mutex))
 
-;; (define (server:self-ping server-info)
-;;   ;; server-info: server-id interface pullport pubport
-;;   (let ((iface    (list-ref server-info 1))
-;;         (pullport (list-ref server-info 2))
-;;         (pubport  (list-ref server-info 3)))
-;;     (server:client-connect iface pullport pubport)
-;;     (let loop ()
-;;       (thread-sleep! 2)
-;;       (cdb:client-call *runremote* 'ping #t)
-;;       (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
-;;       (mutex-lock! *heartbeat-mutex*)
-;;       (set! *server-loop-heart-beat* (current-seconds))
-;;       (mutex-unlock! *heartbeat-mutex*)
-;;       (loop))))
-
+
 (define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
 (define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
-(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
-(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))
+(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))  ;; vector-set! is (vec index value); slot 0 is what zmqsock:get-pub reads
+(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s)) ;; slot 1 is what zmqsock:get-pull reads (was wrongly slot 0)
 
@@ -150,15 +135,10 @@
                (begin
                  (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
                  (loop '()))
                (loop (cons packet queue-lst)))))))
 
-(define (server:reply pubsock target query-sig success/fail result)
-  (debug:print-info 11 "server:reply target=" target ", result=" result)
-  (send-message pubsock target send-more: #t)
-  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))
-
 ;; run server:keep-running in a parallel thread to monitor that the db is being
 ;; used and to shutdown after sometime if it is not.
;;
 (define (server:keep-running)
   ;; if none running or if > 20 seconds since
@@ -290,10 +270,11 @@
         #f))))
 
 (define (server:client-login zmq-sockets)
   (cdb:login zmq-sockets *toppath* (server:get-client-signature)))
 
+;; Not currently used, although it probably should be.
 (define (server:client-logout zmq-socket)
   (let ((ok (and (socket? zmq-socket)
                  (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
     ;; (close-socket zmq-socket)
     ok))
@@ -342,18 +323,26 @@
               ;;   (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
               ;;   (server:client-setup (- numtries 1))
               ;;   #f)
               (server:client-connect iface pullport pubport)) ;; )
             (if (> numtries 0)
-                (let ((exe (car (argv))))
-                  (debug:print-info 2 "No server available, attempting to start one...")
-                  (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
-                  ;; (process-fork (lambda ()
-                  ;;                 (server:launch)
-                  ;;                 (exit) ;; should never get here ....
-                  ;;                 ))
-                  (sleep 5) ;; give server time to start
+                (let ((exe (car (argv)))
+                      (pid #f))
+                  (debug:print-info 0 "No server available, attempting to start one...")
+                  ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
+                  ;;                                                              (string-intersperse *verbosity* ",")
+                  ;;                                                              (conc *verbosity*)))))
+                  (set! pid (process-fork (lambda ()
+                                            (server:launch)))) ;; forked child runs the server; per the earlier note it should never return from here
+                  (let loop ((count 0)) ;; re-query tasks:get-best-server; stops once it returns non-#f or count reaches 5
+                    (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
+                      (if (not hostinfo)
+                          (begin
+                            (debug:print-info 0 "Waiting for server pid=" pid " to start")
+                            (sleep 2) ;; give server time to start
+                            (if (< count 5)
+                                (loop (+ count 1)))))))
                   ;; we are starting a server, do not try again! That can lead to
                   ;; recursively starting many processes!!!
(server:client-setup numtries: 0))
                 (debug:print-info 1 "Too many attempts, giving up")))))
 
@@ -371,22 +360,25 @@
   (if *toppath*
       (let* (;; (th1 (make-thread (lambda ()
              ;;                     (let ((server-info #f))
              ;;                       ;; wait for the server to be online and available
              ;;                       (let loop ()
-             ;;                        (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
+             ;;                         (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
              ;;                         (thread-sleep! 2)
              ;;                         (mutex-lock! *heartbeat-mutex*)
              ;;                         (set! server-info *server-info* )
              ;;                         (mutex-unlock! *heartbeat-mutex*)
              ;;                         (if (not server-info)(loop)))
-             ;;                      (debug:print 2 "Server alive, starting self-ping")
+             ;;                       (debug:print 2 "Server alive, starting self-ping")
              ;;                       (server:self-ping server-info)
              ;;                       ))
              ;;                   "Self ping"))
              (th2 (make-thread (lambda ()
-                                 (server:run (args:get-arg "-server"))) "Server run"))
+                                 (server:run
+                                  ;; fall back to "-" when no -server argument is given (single lookup)
+                                  (or (args:get-arg "-server")
+                                      "-"))) "Server run"))
              (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
              )
         (set! *client-non-blocking-mode* #t)
         ;; (thread-start! th1)
         (thread-start! th2)
@@ -421,10 +413,14 @@
     (if (server:client-setup)
         (debug:print-info 2 "connected as client")
         (begin
           (debug:print 0 "ERROR: Failed to connect as client")
           (exit))))
+
+;;======================================================================
+;; Defunct functions
+;;======================================================================
 
 ;; ping a server and return number of clients or #f (if no response)
 ;; NOT IN USE!
 (define (server:ping host port #!key (secs 10)(return-socket #f))
   (cdb:use-non-blocking-mode
@@ -462,5 +458,28 @@
          (handle-exceptions
           exn
           (set! res (list #f "TIMED OUT" #f))
           (thread-join! th1 secs))
          res))))
+
+;; (define (server:self-ping server-info)
+;;   ;; server-info: server-id interface pullport pubport
+;;   (let ((iface    (list-ref server-info 1))
+;;         (pullport (list-ref server-info 2))
+;;         (pubport  (list-ref server-info 3)))
+;;     (server:client-connect iface pullport pubport)
+;;     (let loop ()
+;;       (thread-sleep! 2)
+;;       (cdb:client-call *runremote* 'ping #t)
+;;       (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
+;;       (mutex-lock! *heartbeat-mutex*)
+;;       (set! *server-loop-heart-beat* (current-seconds))
+;;       (mutex-unlock! *heartbeat-mutex*)
+;;       (loop))))
+
+;; Send a two-part message on pubsock: first `target' (with send-more: #t),
+;; then the db:obj->string encoding of the vector #(success/fail query-sig result).
+(define (server:reply pubsock target query-sig success/fail result)
+  (debug:print-info 11 "server:reply target=" target ", result=" result)
+  (send-message pubsock target send-more: #t)
+  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))
+