Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1181,30 +1181,33 @@ (let ((res (proc))) (set! *client-non-blocking-mode* #f) res)) ;; params = 'target cached remparams -(define (cdb:client-call zmq-socket . params) - (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) - (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) - (res #f)) - ;; (signal-mask! signal/int) - (set! *received-response* #f) +(define (cdb:client-call zmq-sockets . params) + (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params) + (let* ((push-socket (vector-ref zmq-sockets 0)) + (sub-socket (vector-ref zmq-sockets 1)) + (query-id (conc (server:get-client-signature) "-" (message-digest-string (md5-primitive) (conc params)))) + (zdat (db:obj->string (vector query-id params))) ;; (with-output-to-string (lambda ()(serialize params)))) + (res #f) + (get-res (lambda () + (db:string->obj (if *client-non-blocking-mode* + (receive-message* zmq-socket) + (receive-message zmq-socket)))))) (send-message zmq-socket zdat) - ;; (signal-unmask! signal/int) - (set! res (db:string->obj (if *client-non-blocking-mode* - (receive-message* zmq-socket) - (receive-message zmq-socket)))) - (set! *received-response* #t) - (debug:print-info 11 "zmq-socket " (car params) " res=" res) - res)) + (let loop ((res (get-res))) + (if res res + (begin + (thread-sleep! 0.5) + (get-res)))))) (define (cdb:set-verbosity zmq-socket val) (cdb:client-call zmq-socket 'set-verbosity #f val)) -(define (cdb:login zmq-socket keyval signature) - (cdb:client-call zmq-socket 'login #t keyval megatest-version signature)) +(define (cdb:login zmq-sockets keyval signature) + (cdb:client-call zmq-sockets 'login #t keyval megatest-version signature)) (define (cdb:logout zmq-socket keyval signature) (cdb:client-call zmq-socket 'logout #t keyval signature)) (define (cdb:num-clients zmq-socket) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -33,12 +33,12 @@ ;; 3. server puts responses from completed requests into pub port ;; ;; TODO ;; ;; Done Tested -;; [ ] [ ] 1. Add columns pullport pubport to servers table -;; [ ] [ ] 2. Add rm of monitor.db if older than 11/12/2012 +;; [x] [ ] 1. Add columns pullport pubport to servers table +;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports ;; [ ] [ ] 4. Add client compose of request ;; [ ] [ ] - name of client: testname/itempath-test_id-hostname ;; [ ] [ ] - name of request: callname, params ;; [ ] [ ] - request key: f(clientname, callname, params) @@ -249,26 +249,26 @@ (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; -(define (server:client-connect iface port #!key (context #f)) +(define (server:client-connect iface port #!key (context #f)(type 'req)) (debug:print-info 3 "client-connect " iface ":" port) (let ((connect-ok #f) (zmq-socket (if context - (make-socket 'req context) - (make-socket 'req))) + (make-socket type context) + (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin (connect-socket zmq-socket conurl) zmq-socket) #f))) -(define (server:client-login zmq-socket) - (cdb:login zmq-socket *toppath* (server:get-client-signature))) +(define (server:client-login zmq-sockets) + (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) @@ -281,13 +281,14 @@ (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo - (let ((host (car hostinfo)) - (iface (cadr hostinfo)) - (port (caddr hostinfo))) + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (pullport (list-ref hostinfo 2)) + (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin ;; something went wrong in connecting to the server. In this scenario it is ok @@ -296,15 +297,20 @@ (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host port: port) (server:client-setup (- numtries 1)) #f) - (let* ((zmq-socket (server:client-connect iface port)) - (login-res (server:client-login zmq-socket)) - (connect-ok (if (null? login-res) #f (car login-res))) + (let* ((push-socket (server:client-connect iface pullport 'push)) + (sub-socket (server:client-connect iface pubport 'sub)) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f) + ;; (connect-ok (conurl (server:make-server-url (list iface port)))) - (if connect-ok + (socket-option-set! sub-socket 'subscribe (server:get-client-signature)) + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " conurl) (set! *runremote* zmq-socket) #t) (begin Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -22,11 +22,15 @@ ;; Tasks db ;;====================================================================== (define (tasks:open-db) (let* ((dbpath (conc *toppath* "/monitor.db")) - (exists (file-exists? dbpath)) + (exists (if (file-exists? dbpath) + ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012 + (if (< (file-change-time dbpath) 1352851396.0) + (begin (delete-file dbpath) #f) + #t) #t)) (mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout 36000))) (sqlite3:set-busy-handler! mdb handler) (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;")) (if (not exists) @@ -52,11 +56,12 @@ CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, - port INTEGER, + pullport INTEGER, + pubport INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, @@ -162,32 +167,42 @@ (sqlite3:for-each-row (lambda (id hostname interface port pid) (set! res (cons (list hostname interface port pid) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb - "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) + "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) ;; (print "res=" res) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) - (let* ((host (car hed)) - (iface (cadr hed)) - (port (caddr hed)) - (pid (cadddr hed)) - (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) + (let* ((host (list-ref hed 0)) + (iface (list-ref hed 1)) + (pullport (list-ref hed 2)) + (pubport (list-ref hed 3)) + (pid (list-ref hed 4)) + (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport))) (if alive (begin - (debug:print-info 2 "Found an existing, alive, server " host ":" port ".") - (list host iface port)) + (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".") + (list host iface pullport pubport)) (begin - (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead") - (tasks:kill-server #f host port pid) + (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.") + (if port + (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) + (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) +(define (tasks:mark-server hostname pullport pid state) + (if port + (open-run-close tasks:server-deregister tasks:open-db hostname port: port) + (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) + + +;; NOTE: NOT PORTED TO WORK WITH pullport/pubport (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))