Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -43,35 +43,49 @@ (mutex-lock! *heartbeat-mutex*) (set! *server-loop-heart-beat* (current-seconds)) (mutex-unlock! *heartbeat-mutex*) (loop)))) +(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) +(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) +(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) +(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) + (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* ((zmq-socket #f) - (zmq-socket-dat #f) - (iface (if (string=? "-" hostn) - "*" ;; (get-host-name) - hostn)) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostname))) - (actual-port #f)) - ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0)) - (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001))) - 0)) - (set! zmq-socket (cadr zmq-socket-dat)) - (set! actual-port (caddr zmq-socket-dat)) + (let* ((zmq-sdat1 #f) + (zmq-sdat2 #f) + (zmq-socket1 #f) + (zmq-socket2 #f) + (p1 #f) + (p2 #f) + (zmq-sockets-dat #f) + (iface (if (string=? "-" hostn) + "*" ;; (get-host-name) + hostn)) + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname)))) + (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001))))) + + (set! zmq-sdat1 (car zmq-socket-dat)) + (set! zmq-socket1 (car zmq-sdat1)) + (set! p1 (caddr zmq-sdat1)) + + (set! zmq-sdat2 (cadr zmq-socket-dat)) + (set! zmq-socket2 (car zmq-sdat2)) + (set! p2 (caddr zmq-sdat2)) + (set! *cache-on* #t) ;; (set! th1 (make-thread (lambda () ;; (server:self-ping ipaddrstr actual-port)))) ;; (thread-start! th1) @@ -79,11 +93,11 @@ ;; what to do when we quit ;; (on-exit (lambda () (if (and *toppath* *server-info*) (begin - (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr)) + (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2)) (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) @@ -94,21 +108,13 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - ;; ;; Ugly yuk. - ;; (mutex-lock! *incoming-mutex*) - ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds))) - ;; (mutex-unlock! *incoming-mutex*) - (let* ((rawmsg (receive-message* zmq-socket)) + (let* ((rawmsg (receive-message* zmq-socket1)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) - ;;; Ugly yuk. - ;; (mutex-lock! *incoming-mutex*) - ;; (set! *server-loop-heart-beat* (list 'working (current-seconds))) - ;; (mutex-unlock! *incoming-mutex*) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed res=" res) (send-message zmq-socket (db:obj->string res)) (if (not *time-to-exit*) @@ -172,12 +178,12 @@ (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) -(define (server:find-free-port-and-open iface s port #!key (trynum 50)) - (let ((s (if s s (make-socket 'rep))) +(define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) + (let ((s (if s s (make-socket stype))) (p (if (number? port) port 5555)) (old-handler (current-exception-handler))) (handle-exceptions exn (begin @@ -186,20 +192,28 @@ ;; (old-handler) ;; (print-call-chain) (if (> trynum 0) (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports up to " p - " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))) + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) + (exit)) ;; To exit or not? That is the question. (let ((zmq-url (conc "tcp://" iface ":" p))) - (print "Trying to start server on " zmq-url) + (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) - (set! *runremote* #f) - (debug:print 0 "Server started on " zmq-url) - (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) - (mutex-unlock! *heartbeat-mutex*) (list iface s port))))) + +(define (server:setup-ports ipadrstr startport) + (let* ((s1 (server:find-free-port-and-open ipadrstr #f startport 'pub)) + (p1 (caddr s1)) + (s2 (server:find-free-port-and-open ipadrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and p2") + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) + (mutex-unlock! *heartbeat-mutex*) + (list s1 s2))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda ()