Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -11,14 +11,14 @@ ;;====================================================================== ;; Database access ;;====================================================================== -(require-extension (srfi 18) extras tcp rpc) -(import (prefix rpc rpc:)) +(require-extension (srfi 18) extras tcp) ;; rpc) +;; (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) (declare (unit db)) @@ -1190,13 +1190,13 @@ (query-id (conc (server:get-client-signature) "-" (message-digest-string (md5-primitive) (conc params)))) (zdat (db:obj->string (vector query-id params))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (get-res (lambda () (db:string->obj (if *client-non-blocking-mode* - (receive-message* zmq-socket) - (receive-message zmq-socket)))))) - (send-message zmq-socket zdat) + (receive-message* sub-socket) + (receive-message sub-socket)))))) + (send-message push-socket zdat) (let loop ((res (get-res))) (if res res (begin (thread-sleep! 0.5) (get-res)))))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -6,12 +6,12 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. -(require-extension (srfi 18) extras tcp rpc s11n) -(import (prefix rpc rpc:)) +(require-extension (srfi 18) extras tcp s11n) +;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) @@ -47,25 +47,30 @@ ;; [ ] [ ] - return results ;; [ ] [ ] 6. Add timeout processing ;; [ ] [ ] - after 60 seconds ;; [ ] [ ] i. check server alive, connect to new if necessary ;; [ ] [ ] ii. resend request +;; [ ] [ ] 7. Turn self ping back on (define (server:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) -(define (server:self-ping iface port) - (let ((zsocket (server:client-connect iface port))) +(define (server:self-ping server-info) + ;; server-info: server-id interface pullport pubport + (let ((iface (list-ref server-info 1)) + (pullport (list-ref server-info 2)) + (pubport (list-ref server-info 3))) + (server:client-connect iface pullport pubport) (let loop () (thread-sleep! 2) - (cdb:client-call zsocket 'ping #t) - (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!") + (cdb:client-call *runremote* 'ping #t) + (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") (mutex-lock! *heartbeat-mutex*) (set! *server-loop-heart-beat* (current-seconds)) (mutex-unlock! *heartbeat-mutex*) (loop)))) @@ -81,12 +86,12 @@ (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) (let* ((zmq-sdat1 #f) (zmq-sdat2 #f) - (zmq-socket1 #f) - (zmq-socket2 #f) + (pull-socket #f) + (pub-socket #f) (p1 #f) (p2 #f) (zmq-sockets-dat #f) (iface (if (string=? "-" hostn) "*" ;; (get-host-name) @@ -98,16 +103,16 @@ (if ipstr ipstr hostname)))) (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) - (set! zmq-sdat1 (car zmq-socket-dat)) - (set! zmq-socket1 (car zmq-sdat1)) + (set! zmq-sdat1 (car zmq-sockets-dat)) + (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) (set! p1 (caddr zmq-sdat1)) - (set! zmq-sdat2 (cadr zmq-socket-dat)) - (set! zmq-socket2 (car zmq-sdat2)) + (set! zmq-sdat2 (cadr zmq-sockets-dat)) + (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) ;; (set! th1 (make-thread (lambda () @@ -132,16 +137,19 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - (let* ((rawmsg (receive-message* zmq-socket1)) + (let* ((rawmsg (receive-message* pull-socket)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) (debug:print-info 12 "server=> processed res=" res) + + ;; need address here + ;; (send-message zmq-socket (db:obj->string res)) (if (not *time-to-exit*) (loop) (begin (open-run-close tasks:server-deregister-self tasks:open-db #f) @@ -177,11 +185,11 @@ ;; we don't want to update our heartbeat (set! pulse (- (current-seconds) server-loop-heartbeat)) (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") (if (> pulse 15) ;; must stay less than 10 seconds (begin - (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) port: (caddr server-info)) + (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info)) (debug:print 0 "ERROR: Heartbeat failed, committing servercide") (exit)) (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* @@ -223,19 +231,19 @@ (let ((zmq-url (conc "tcp://" iface ":" p))) (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) (list iface s port))))) -(define (server:setup-ports ipadrstr startport) - (let* ((s1 (server:find-free-port-and-open ipadrstr #f startport 'pub)) +(define (server:setup-ports ipaddrstr startport) + (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub)) (p1 (caddr s1)) - (s2 (server:find-free-port-and-open ipadrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) + (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) (p2 (caddr s2))) (set! *runremote* #f) - (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and p2") + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live)) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live)) (mutex-unlock! *heartbeat-mutex*) (list s1 s2))) (define (server:mk-signature) (message-digest-string (md5-primitive) @@ -249,32 +257,53 @@ (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) ;; -(define (server:client-connect iface port #!key (context #f)(type 'req)) +(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) (debug:print-info 3 "client-connect " iface ":" port) (let ((connect-ok #f) (zmq-socket (if context (make-socket type context) (make-socket type))) (conurl (server:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin + ;; first apply subscriptions + (for-each (lambda (subscription) + (socket-options-set! zmq-socket 'subscribe subscription)) + subscriptions) (connect-socket zmq-socket conurl) zmq-socket) #f))) - (define (server:client-login zmq-sockets) (cdb:login zmq-sockets *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) (let ((ok (and (socket? zmq-socket) (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) ;; (close-socket zmq-socket) ok)) + +(define (server:client-connect iface pullport pubport) + (let* ((push-socket (server:client-socket-connect iface pullport 'push)) + (sub-socket (server:client-socket-connect iface pubport 'sub + subscriptions: (list (server:get-client-signature) "all"))) + (zmq-sockets (vector push-socket sub-socket)) + (login-res #f)) + (set! login-res (server:client-login zmq-sockets)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* zmq-socket) + #t) + (begin + (debug:print-info 2 "Failed to login or connect to " conurl) + (set! *runremote* #f) + #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) @@ -294,31 +323,14 @@ ;; something went wrong in connecting to the server. In this scenario it is ok ;; to try again (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") - (open-run-close tasks:server-deregister tasks:open-db host port: port) + (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) (server:client-setup (- numtries 1)) #f) - (let* ((push-socket (server:client-connect iface pullport 'push)) - (sub-socket (server:client-connect iface pubport 'sub)) - (zmq-sockets (vector push-socket sub-socket)) - (login-res #f) - ;; (connect-ok - (conurl (server:make-server-url (list iface port)))) - (socket-option-set! sub-socket 'subscribe (server:get-client-signature)) - (set! login-res (server:client-login zmq-sockets)) - (if (and (not (null? login-res)) - (car login-res)) - (begin - (debug:print-info 2 "Logged in and connected to " conurl) - (set! *runremote* zmq-socket) - #t) - (begin - (debug:print-info 2 "Failed to login or connect to " conurl) - (set! *runremote* #f) - #f))))) + (server:client-connect iface pullport pubport))) (if (> numtries 0) (let ((exe (car (argv)))) (debug:print-info 1 "No server available, attempting to start one...") (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) (sleep 5) ;; give server time to start @@ -348,11 +360,12 @@ (mutex-lock! *heartbeat-mutex*) (set! server-info *server-info* ) (mutex-unlock! *heartbeat-mutex*) (if (not server-info)(loop))) (debug:print 1 "Server alive, starting self-ping") - (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping")) + (server:self-ping server-info))) + "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda () (server:keep-running)) "Keep running"))) (set! *client-non-blocking-mode* #t) @@ -371,11 +384,11 @@ (let ((th1 (make-thread (lambda () (if (not *received-response*) (receive-message* *runremote*))) ;; flush out last call if applicable "eat response")) (th2 (make-thread (lambda () - (debug:print 0 "ERROR: Received ^C, attempting clean exit.") + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") (thread-sleep! 3) ;; give the flush three seconds to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -25,12 +25,16 @@ (define (tasks:open-db) (let* ((dbpath (conc *toppath* "/monitor.db")) (exists (if (file-exists? dbpath) ;; BUGGISHNESS: Remove this code in six months. Today is 11/13/2012 (if (< (file-change-time dbpath) 1352851396.0) - (begin (delete-file dbpath) #f) - #t) #t)) + (begin + (debug:print 0 "NOTE: removing old db file " dbpath) + (delete-file dbpath) + #f) + #t) + #f)) (mdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout 36000))) (sqlite3:set-busy-handler! mdb handler) (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;")) (if (not exists) @@ -63,11 +67,11 @@ start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, - CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") + CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, @@ -81,54 +85,56 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state) +(define (tasks:server-register mdb pid interface pullport pubport priority state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);" - pid (get-host-name) port priority (conc state) megatest-version interface) + "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface) + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" + pid (get-host-name) pullport pubport priority (conc state) megatest-version interface) (list - (tasks:server-get-server-id mdb (get-host-name) port pid) + (tasks:server-get-server-id mdb (get-host-name) pullport pid) interface - port)) + pullport + pubport)) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)) - (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) +(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)) + (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) (if pid ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid) (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid) - (if port + (if pullport ;; (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port) - (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port) + (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) -(define (tasks:server-get-server-id mdb hostname port pid) +(define (tasks:server-get-server-id mdb hostname pullport pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" - "SELECT id FROM servers WHERE hostname=? AND port=?;") - hostname (if pid pid port)) + "SELECT id FROM servers WHERE hostname=? AND pullport=?;") + hostname (if pid pid pullport)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds -(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) +(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f)) (let* ((server-id (if server-id server-id - (tasks:server-get-server-id mdb hostname port pid))) + (tasks:server-get-server-id mdb hostname pullport pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) @@ -163,13 +169,13 @@ ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row - (lambda (id hostname interface port pid) - (set! res (cons (list hostname interface port pid) res)) - (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) + (lambda (id hostname interface pullport pubport pid) + (set! res (cons (list hostname interface pullport pubport pid) res)) + (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db")) mdb "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) ;; (print "res=" res) (if (null? res) #f (let loop ((hed (car res)) @@ -231,14 +237,14 @@ (debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port start-time priority state mt-version) - (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res))) + (lambda (id pid hostname interface pullport pubport start-time priority state mt-version) + (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version) res))) mdb - "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors