@@ -33,12 +33,12 @@ ;; 3. server puts responses from completed requests into pub port ;; ;; TODO ;; ;; Done Tested -;; [ ] [ ] 1. Add columns pullport pubport to servers table -;; [ ] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports ;; [ ] [ ] 4. Add client compose of request ;; [ ] [ ] - name of client: testname/itempath-test_id-hostname ;; [ ] [ ] - name of request: callname, params ;; [ ] [ ] - request key: f(clientname, callname, params) @@ -249,26 +249,26 @@ (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; -(define (server:client-connect iface port #!key (context #f)) +(define (server:client-connect iface port #!key (context #f)(type 'req)) (debug:print-info 3 "client-connect " iface ":" port) (let ((connect-ok #f) (zmq-socket (if context - (make-socket 'req context) - (make-socket 'req))) + (make-socket type context) + (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin (connect-socket zmq-socket conurl) zmq-socket) #f))) -(define (server:client-login zmq-socket) - (cdb:login zmq-socket *toppath* (server:get-client-signature))) +(define (server:client-login zmq-sockets) + (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) @@ -281,13 +281,14 @@ (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo - (let ((host (car hostinfo)) - (iface (cadr hostinfo)) - (port (caddr hostinfo))) + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (pullport (list-ref hostinfo 2)) + (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin ;; something went wrong in connecting to the server. In this scenario it is ok @@ -296,15 +297,20 @@ (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host port: port) (server:client-setup (- numtries 1)) #f) - (let* ((zmq-socket (server:client-connect iface port)) - (login-res (server:client-login zmq-socket)) - (connect-ok (if (null? login-res) #f (car login-res))) + (let* ((push-socket (server:client-connect iface pullport 'push)) + (sub-socket (server:client-connect iface pubport 'sub)) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f) + ;; (connect-ok (conurl (server:make-server-url (list iface port)))) - (if connect-ok + (socket-option-set! sub-socket 'subscribe (server:get-client-signature)) + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " conurl) (set! *runremote* zmq-socket) #t) (begin