Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1249,10 +1249,35 @@ set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) + + + + + + + + +UPDATE DB:PROCESS_QUEUE@@@@ + + + + + + + + + + + + + + + + ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -58,11 +58,27 @@ (if ipstr ipstr hostname))) (start-port (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! *cache-on* #t) - (server:try-start-server ipaddrstr portnum))) + (server:try-start-server ipaddrstr start-port))) + + +(define (server:main-loop) + (define-page (main-page-path) + (lambda () + (with-request-variables (dat) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))))))) + ;; This is recursively run by server:run until sucessful ;; (define (server:try-start-server ipaddrstr portnum) (handle-exceptions @@ -74,11 +90,17 @@ (print "WARNING: failed to start on portnum: " portnum ", trying next port") (sleep 1) (server:try-start-server ipaddrstr (+ portnum 1))) (print "ERROR: Tried and tried but could not start the server"))) (print "INFO: Trying to start server on portnum: " portnum) - (awful-start hello-world ip-address: ipaddrstr port: portnum))) + + (set! *runremote* (list ipaddrstr portnum)) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (awful-start server:main-loop ip-address: ipaddrstr port: portnum))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -97,40 +119,42 @@ (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) + ;; ;; ;; 1 Hello, world! Goodbye Dolly ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) - (let* ((res (with-input-from-request (conc serverdat "/?dat=" msg) #f read-string)) + (let* ((res (with-input-from-request (conc (server:make-server-url serverdat) "/?dat=" msg) #f read-string)) (match (string-search (regexp "(.*)<.body>") (caddr (string-split res "\n"))))) (cadr match))) (define (server:client-login serverdat) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! -(define (server:client-logout zmq-socket) - (let ((ok (and (socket? zmq-socket) - (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) - ;; (close-socket zmq-socket) +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) ok)) (define (server:client-connect iface port) - (let* ((login-res #f)) + (let* ((login-res #f) + (serverdat (list iface port))) (set! login-res (server:client-login serverdat)) (if (and (not (null? login-res)) (car login-res)) (begin - (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (debug:print-info 2 "Logged in and connected to " iface ":" port) (set! *runremote* serverdat) serverdat) (begin - (debug:print-info 2 "Failed to login or connect to " conurl) + (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) @@ -142,11 +166,11 @@ (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1))) (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface pullport pubport)) ;; ) + (server:client-connect iface port)) ;; ) (if (> numtries 0) (let ((exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -60,18 +60,17 @@ CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, - pullport INTEGER, - pubport INTEGER, + port INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, - CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));") + CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, @@ -85,58 +84,58 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface pullport pubport priority state) +(define (tasks:server-register mdb pid interface port priority state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface) - VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" - pid (get-host-name) pullport pubport priority (conc state) megatest-version interface) + "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) + VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" + pid (get-host-name) port priority (conc state) megatest-version interface) (list - (tasks:server-get-server-id mdb (get-host-name) pullport pid) + (tasks:server-get-server-id mdb (get-host-name) port pid) interface - pullport - pubport)) + port + )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead)) - (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) +(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) + (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) - (if pullport + (if port (case action - ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pullport=?;" hostname port)) - (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport))) + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) -(define (tasks:server-get-server-id mdb hostname pullport pid) +(define (tasks:server-get-server-id mdb hostname port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb (if (and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;" - "SELECT id FROM servers WHERE hostname=? AND pullport=?;") - hostname (if pid pid pullport)) + "SELECT id FROM servers WHERE hostname=? AND port=?;") + hostname (if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds -(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f)) +(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id - (tasks:server-get-server-id mdb hostname pullport pid))) + (tasks:server-get-server-id mdb hostname port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) @@ -171,47 +170,45 @@ ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row - (lambda (id hostname interface pullport pubport pid) - (set! res (cons (list hostname interface pullport pubport pid) res)) - (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db")) + (lambda (id hostname interface port pid) + (set! res (cons (list hostname interface port pid) res)) + (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb - "SELECT id,hostname,interface,pullport,pubport,pid FROM servers + "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) (if (null? res) #f (let loop ((hed (car res)) (tal (cdr res))) ;; (print "hed=" hed ", tal=" tal) (let* ((host (list-ref hed 0)) (iface (list-ref hed 1)) - (pullport (list-ref hed 2)) - (pubport (list-ref hed 3)) + (port (list-ref hed 2)) (pid (list-ref hed 4)) - (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport))) + (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) (if alive (begin - (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".") - (list host iface pullport pubport)) + (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") + (list host iface port)) (begin - (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.") - (if pullport - (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) + (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") + (if port + (open-run-close tasks:server-deregister tasks:open-db host port: port) (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) -(define (tasks:mark-server hostname pullport pid state) +(define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) -;; NOTE: NOT PORTED TO WORK WITH pullport/pubport (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) @@ -242,14 +239,14 @@ (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update) - (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res))) + (lambda (id pid hostname interface port start-time priority state mt-version last-update) + (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res))) mdb - "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors