@@ -98,11 +98,12 @@ hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostname)))) + (if ipstr ipstr hostname))) + (last-run 0)) (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! zmq-sdat1 (car zmq-sockets-dat)) @@ -113,14 +114,10 @@ (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) - ;; (set! th1 (make-thread (lambda () - ;; (server:self-ping ipaddrstr actual-port)))) - ;; (thread-start! th1) - ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (begin @@ -136,55 +133,55 @@ (debug:print-info 0 "Queue not flushed, waiting ...") (loop)))))))) ;; The heavy lifting ;; - (let loop () + ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime + ;; + (let loop ((queue-lst '())) ;; (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) - (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) - (res #f)) - (debug:print-info 12 "server=> received params=" params) - (set! res (cdb:cached-access params)) - (debug:print-info 12 "server=> processed res=" res) - - ;; need address here - ;; - ;; (send-message zmq-socket (db:obj->string res)) - (if (not *time-to-exit*) - (loop) + (packet (db:string->obj rawmsg))) + (debug:print-info 12 "server=> received packet=" packet) + (if (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin - (open-run-close tasks:server-deregister-self tasks:open-db #f) - (db:write-cached-data) - (exit) - )))) - (thread-join! th1))) + (db:process-queue pubsock (cons packet queue)) + (loop '())) + (loop (cons packet queue))))))) +(define (server:reply pubsock target result) + (send-message pubsock target send-more: #t) + (send-message pubsock result)) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive - (let ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop))))))) + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + ;; (zmq-sockets (server:client-connect iface pullport pubport))) + ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often - (db:write-cached-data) + ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) - + ;; NOTE: Get rid of this mechanism! It really is not needed... (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* @@ -252,25 +249,28 @@ (set! *my-client-signature* sig) *my-client-signature*))) ;; (define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) - (debug:print-info 3 "client-connect " iface ":" port) + (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) (let ((connect-ok #f) (zmq-socket (if context (make-socket type context) (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin ;; first apply subscriptions (for-each (lambda (subscription) - (socket-options-set! zmq-socket 'subscribe subscription)) + (debug:print 2 "Subscribing to " subscription) + (socket-option-set! zmq-socket 'subscribe subscription)) subscriptions) (connect-socket zmq-socket conurl) zmq-socket) - #f))) + (begin + (debug:print 0 "ERROR: Failed to open socket to " conurl) + #f)))) (define (server:client-login zmq-sockets) (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) @@ -278,22 +278,23 @@ (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) (define (server:client-connect iface pullport pubport) - (let* ((push-socket (server:client-socket-connect iface pullport 'push)) - (sub-socket (server:client-socket-connect iface pubport 'sub + (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) + (sub-socket (server:client-socket-connect iface pubport + type: 'sub subscriptions: (list (server:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) (set! login-res (server:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") - (set! *runremote* zmq-socket) - #t) + (set! *runremote* zmq-sockets) + zmq-sockets) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f)))) @@ -359,18 +360,20 @@ ;; (server:self-ping server-info) ;; )) ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) - (th3 (make-thread (lambda () - (server:keep-running)) "Keep running"))) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) - (thread-join! th3)) + ;; (thread-join! th3) + (thread-join! th2) + ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) (define (server:client-signal-handler signum) (handle-exceptions