Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1136,15 +1136,15 @@ ;; (print "zdat=" zdat) (let* ( (res #f) (rawdat (server:client-send-receive serverdat zdat)) (tmp #f)) - (print "Sent " zdat ", received " rawdat) - (set! tmp (db:string->obj newres)) + (debug:print-info 11 "Sent " zdat ", received " rawdat) + (set! tmp (db:string->obj rawdat)) ;; (if (equal? query-sig (vector-ref myres 1)) ;; (set! res - (vector-ref myres 2) + (vector-ref tmp 2) ;; (loop (server:client-send-receive serverdat zdat))))))) ;; (timeout (lambda () ;; (let loop ((n numretries)) ;; (thread-sleep! 15) ;; (if (not res) @@ -1276,11 +1276,11 @@ (for-each (lambda (item) (db:process-queue-item db pubsock item)) data))) -(define (db:process-queue-item db pubsock item) +(define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) (query (let ((q (alist-ref stmt-key db:queries))) @@ -1296,42 +1296,42 @@ ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") - (server:reply pubsock return-address qry-sig #t (apply proc remparams)))) + (server:reply return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature - '(#f "login failed due to missing params") ;; missing params + (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) - (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... - (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) + (server:reply return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... + (server:reply return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))) ((flush sync) - (server:reply pubsock return-address qry-sig #t 1)) ;; (length data))) + (server:reply return-address qry-sig #t 1)) ;; (length data))) ((set-verbosity) (set! *verbosity* (car params)) - (server:reply pubsock return-address qry-sig #t '(#t *verbosity*))) + (server:reply return-address qry-sig #t '(#t *verbosity*))) ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db (cadr *server-info*) pullport: (caddr *server-info*)) (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit)))) - (server:reply pubsock return-address qry-sig #t '(#t "exit process started"))) + (server:reply return-address qry-sig #t '(#t "exit process started"))) (else ;; not a command, i.e. is a query (debug:print 0 "ERROR: Unrecognised query/command " stmt-key) (server:reply pubsock return-address qry-sig #f 'failed)))) (else (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) - (server:reply pubsock return-address qry-sig #t #t))))) + (server:reply return-address qry-sig #t #t))))) (define (db:test-get-records-for-index-file db run-id test-name) (let ((res '())) (sqlite3:for-each-row (lambda (id itempath state status run_duration logf comment) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -77,11 +77,11 @@ (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) - (open-run-close db:process-queue-item packet)))))) + (open-run-close db:process-queue-item open-db packet)))))) ;; This is recursively run by server:run until sucessful ;; (define (server:try-start-server ipaddrstr portnum) @@ -115,18 +115,18 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== -(define (server:reply pubsock target query-sig success/fail result) - (debug:print-info 11 "server:reply target=" target ", result=" result) +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (db:obj->string (vector success/fail query-sig result))) ;;====================================================================== -;; C L I E N T S +;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) @@ -138,15 +138,18 @@ ;; 1 Hello, world! Goodbye Dolly ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) (let* ((url (server:make-server-url serverdat)) (fullurl (conc url "/?dat=" msg))) - (print "url=" url ", fullurl=" fullurl) + (debug:print-info 11 "fullurl=" fullurl) (let* ((res (with-input-from-request fullurl #f read-string))) - (print "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") (caddr (string-split res "\n"))))) - (cadr match))))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))) (define (server:client-login serverdat) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! @@ -179,25 +182,27 @@ (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1))) + (iface (list-ref hostinfo 1)) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) (server:client-connect iface port)) ;; ) (if (> numtries 0) - (let ((exe (car (argv))) + (let (;; (exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) ;; (string-intersperse *verbosity* ",") ;; (conc *verbosity*))))) (set! pid (process-fork (lambda () ;; (current-input-port (open-input-file "/dev/null")) ;; (current-output-port (open-output-file "/dev/null")) ;; (current-error-port (open-output-file "/dev/null")) - (server:launch)))) ;; should never get here .... + (server:launch)))) (let loop ((count 0)) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if (not hostinfo) (begin (debug:print-info 0 "Waiting for server pid=" pid " to start") @@ -225,22 +230,23 @@ (begin (sleep 4) (loop)))))) (iface (car server-info)) (port (cadr server-info)) - (last-access 0)) - ;; (print "Keep-running got server-info " server-info) + (last-access 0) + (spid (open-run-close tasks:server-get-server-id tasks:open-db #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length - (let ((queue-len (cdb:client-call server-info 'sync #t 1))) + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + (open-run-close tasks:server-update-heartbeat tasks:open-db spid) ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) @@ -281,18 +287,15 @@ (let* ((th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) - ;; (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) - (set! *client-non-blocking-mode* #t) - ;; (thread-start! th1) (thread-start! th2) - ;; (thread-start! th3) + (thread-start! th3) (set! *didsomething* #t) - ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) @@ -299,16 +302,15 @@ (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () - (if (not *received-response*) - (receive-message* *runremote*))) ;; flush out last call if applicable + "") ;; do nothing for now (was flush out last call if applicable) "eat response")) (th2 (make-thread (lambda () (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") - (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (thread-sleep! 1) ;; give the flush one second to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -91,11 +91,11 @@ mdb "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" pid (get-host-name) port priority (conc state) megatest-version interface) (list - (tasks:server-get-server-id mdb (get-host-name) port pid) + (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface port )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! @@ -112,30 +112,35 @@ (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) -(define (tasks:server-get-server-id mdb hostname port pid) +(define (tasks:server-get-server-id mdb hostname iface port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb - (if (and hostname pid) - "SELECT id FROM servers WHERE hostname=? AND pid=?;" - "SELECT id FROM servers WHERE hostname=? AND port=?;") - hostname (if pid pid port)) + (cond + ((and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;") + ((and iface port) "SELECT id FROM servers WHERE interface=? AND port=?;") + ((and hostname port) "SELECT id FROM servers WHERE hostname=? AND port=?;") + (else + (begin + (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)") + "SELECT id FROM servers WHERE pid=-999;"))) + (if hostname hostname iface)(if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds -(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f)) +(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id - (tasks:server-get-server-id mdb hostname port pid))) + (tasks:server-get-server-id mdb hostname iface port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) @@ -143,11 +148,11 @@ (define (tasks:client-register mdb pid hostname cmdline) (sqlite3:execute mdb "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") - (tasks:server-get-server-id mdb hostname #f pid) + (tasks:server-get-server-id mdb hostname #f #f pid) pid hostname cmdline) (define (tasks:client-logout mdb pid hostname cmdline) (sqlite3:execute mdb @@ -177,34 +182,39 @@ (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) - (if (null? res) #f - (let loop ((hed (car res)) - (tal (cdr res))) - ;; (print "hed=" hed ", tal=" tal) - (let* ((host (list-ref hed 0)) - (iface (list-ref hed 1)) - (port (list-ref hed 2)) - (pid (list-ref hed 4)) - (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) - (if alive - (begin - (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") - (list host iface port)) - (begin - (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") - (if port - (open-run-close tasks:server-deregister tasks:open-db host port: port) - (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) - (if (null? tal) - #f - (loop (car tal)(cdr tal)))))))))) + ;; for now we are keeping only one server registered in the db, return #f or first server found + (if (null? res) #f (car res)))) + +;; BUG: This logic is probably needed unless methodology changes completely... +;; +;; (if (null? res) #f +;; (let loop ((hed (car res)) +;; (tal (cdr res))) +;; ;; (print "hed=" hed ", tal=" tal) +;; (let* ((host (list-ref hed 0)) +;; (iface (list-ref hed 1)) +;; (port (list-ref hed 2)) +;; (pid (list-ref hed 4)) +;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) +;; (if alive +;; (begin +;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") +;; (list host iface port)) +;; (begin +;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") +;; (if port +;; (open-run-close tasks:server-deregister tasks:open-db host port: port) +;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) +;; (if (null? tal) +;; #f +;; (loop (car tal)(cdr tal)))))))))) (define (tasks:remove-server-records mdb) - (sqlite3:exec mdb "DELETE FROM servers;")) + (sqlite3:execute mdb "DELETE FROM servers;")) (define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)))