@@ -136,23 +136,24 @@ ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) - ;; (print "GOT HERE EH?") + (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (packet (db:string->obj rawmsg))) (debug:print-info 12 "server=> received packet=" packet) (if (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin - (db:process-queue pubsock (cons packet queue)) + (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) - (loop (cons packet queue))))))) + (loop (cons packet queue-lst))))))) (define (server:reply pubsock target result) + (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) - (send-message pubsock result)) + (send-message pubsock (db:obj->string result))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -169,15 +170,15 @@ (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) - ;; (zmq-sockets (server:client-connect iface pullport pubport))) + ;; (zmq-sockets (server:client-connect iface pullport pubport)) ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often - ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) + ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... @@ -223,13 +224,13 @@ (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) (list iface s port))))) (define (server:setup-ports ipaddrstr startport) - (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub)) + (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull)) (p1 (caddr s1)) - (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) + (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) (p2 (caddr s2))) (set! *runremote* #f) (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) (mutex-lock! *heartbeat-mutex*) (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))