Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -38,23 +38,30 @@ (define *waiting-queue* (make-hash-table)) (define *test-meta-updated* (make-hash-table)) (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar +;; SERVER +(define *my-client-signature* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) +(define *logged-in-clients* (make-hash-table)) + + (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN (define *keys* (make-hash-table)) ;; cache the keys here (define *keyvals* (make-hash-table)) (define *toptest-paths* (make-hash-table)) ;; cache toptest path settings here (define *test-paths* (make-hash-table)) ;; cache test-id to test run paths here (define *test-ids* (make-hash-table)) ;; cache run-id, testname, and item-path => test-id (define *test-info* (make-hash-table)) ;; cache the test info records, update the state, status, run_duration etc. from testdat.db (define *run-info-cache* (make-hash-table)) ;; run info is stable, no need to reget + + ;; Debugging stuff (define *verbosity* 1) (define *logging* #f) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1105,18 +1105,19 @@ (if (not cached?)(db:write-cached-data)) ;; Any special calls are dispatched here. ;; Remainder are put in the db queue (case qry-name ((login) ;; login checks that the megatest path matches - (if (eq? (length remparam) 2) ;; should get toppath and signature - #f ;; no path - fail! - (let ((calling-path (car remparam))) + (if (< (length remparam) 2) ;; should get toppath and signature + '(#f "login failed due to missing params") ;; missing params + (let ((calling-path (car remparam)) + (client-key (cadr remparam))) (if (equal? calling-path *toppath*) (begin - (hash-table-set! *logged-in-clients* (cadr remparam) (current-seconds)) - #t) ;; path matches - pass! Should vet the caller at this time ... - #f)))) ;; else fail to login + (hash-table-set! *logged-in-clients* client-key (current-seconds)) + '(#t "successful login")) ;; path matches - pass! Should vet the caller at this time ... + (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) ((logout) (if (and (> (length remparam) 1) (eq? *toppath* (car remparam)) (hash-table-ref/default *logged-in-clients* (cadr remparam) #f)) #t @@ -1283,11 +1284,11 @@ (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (if (procedure? stmt-key) (hash-table-set! queries stmt-key #f) (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))) data) - + ;; outer loop to handle special queries that cannot be handled in the ;; transaction. (let outerloop ((special-qry #f) (stmts data)) (if special-qry Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -263,20 +263,20 @@ ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== (if (args:get-arg "-server") - (server:launch)) - -(define *logged-in-clients* (make-hash-table)) + (begin + (debug:print 1 "Launching server...") + (server:launch))) (if (or (args:get-arg "-listservers") (args:get-arg "-killserver")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) - (fmtstr "~5a~8a~20a~5a~20a~8a~10a\n")) + (fmtstr "~5a~8a~20a~5a~20a~9a~10a\n")) (format #t fmtstr "Id" "Pid" "Host" "Port" "Time" "Priority" "State") (format #t fmtstr "==" "===" "====" "====" "====" "========" "=====") (for-each (lambda (server) (let* ((id (vector-ref server 0)) @@ -284,33 +284,39 @@ (hostname (vector-ref server 2)) (port (vector-ref server 3)) (start-time (vector-ref server 4)) (priority (vector-ref server 5)) (state (vector-ref server 6)) - (accessible (handle-exceptions + (status (handle-exceptions exn - #f - (let ((zmq-socket (server:client-login hostname port))) + (conc "EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + (let ((zmq-socket (server:client-connect hostname port))) (if zmq-socket - (server:client-logout zmq-socket) - #f))))) + (if (server:client-login zmq-socket) + (begin + (server:client-logout zmq-socket) + (close-socket zmq-socket) + "ACCESSIBLE") ;; (server:client-logout zmq-socket) + (begin + (close-socket zmq-socket) + "CAN'T LOGIN")) + "CAN'T CONNECT"))))) (format #t fmtstr id pid hostname port start-time priority - (cond - (accessible "ACCESSIBLE") - (else "DEAD"))))) - servers))))) - -(if (or (let ((res #f)) - (for-each - (lambda (key) - (if (args:get-arg key)(set! res #t))) - (list "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")) - res) - (eq? (length (hash-table-keys args:arg-hash)) 0)) - (debug:print-info 1 "No server needed") - (server:client-launch)) - + status))) + servers) + (set! *didsomething* #t)))) + ;; if not list or kill then start a client (if appropriate) + (if (or (let ((res #f)) + (for-each + (lambda (key) + (if (args:get-arg key)(set! res #t))) + (list "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")) + res) + (eq? (length (hash-table-keys args:arg-hash)) 0)) + (debug:print-info 1 "Server connection not needed") + (server:client-launch))) + ;;====================================================================== ;; Remove old run(s) ;;====================================================================== ;; since several actions can be specified on the command line the removal Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -31,64 +31,48 @@ (define *time-to-exit* #f) (define (server:run hostn) (debug:print 0 "Attempting to start the server ...") (if (not *toppath*)(setup-for-run)) - (let* ((hostport (open-run-close tasks:get-best-server tasks:open-db)) ;; do whe already have a server running? - (host:port (server:make-server-url hostport))) - (if host:port - (begin - (debug:print 0 "NOTE: server already running.") - (if (server:client-setup) - (begin - (debug:print-info 0 "Server is alive, not starting another")) - (begin - (debug:print-info 0 "Server is dead, deregistering it, please try again") - (open-run-close tasks:server-deregister tasks:open-db (car hostport) port: (cadr port)) - ;; (server:run hostn) - (debug:print 0 "WOULD NORMALLY START ANOTHER SERVER HERE") - ) - ) - ) - (let* ((zmq-socket #f) - (hostname (if (string=? "-" hostn) - (get-host-name) - hostn)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f))) - (if ipstr ipstr hostname)))) - (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0)) - (set! *cache-on* #t) - - ;; what to do when we quit - ;; - (on-exit (lambda () - (open-run-close tasks:server-deregister-self tasks:open-db) - (let loop () - (let ((queue-len 0)) - (thread-sleep! (random 5)) - (mutex-lock! *incoming-mutex*) - (set! queue-len (length *incoming-data*)) - (mutex-unlock! *incoming-mutex*) - (if (> queue-len 0) - (begin - (debug:print-info 0 "Queue not flushed, waiting ...") - (loop))))))) - - ;; The heavy lifting - ;; - (let loop () - (let* ((rawmsg (receive-message zmq-socket)) - (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) - (res #f)) - (debug:print-info 12 "server=> received params=" params) - (set! res (cdb:cached-access params)) - (debug:print-info 12 "server=> processed res=" res) - (send-message zmq-socket (db:obj->string res)) - (if *time-to-exit* (exit)) - (loop))))))) + (let* ((zmq-socket #f) + (hostname (if (string=? "-" hostn) + (get-host-name) + hostn)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname)))) + (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0)) + (set! *cache-on* #t) + + ;; what to do when we quit + ;; + (on-exit (lambda () + (open-run-close tasks:server-deregister-self tasks:open-db) + (let loop () + (let ((queue-len 0)) + (thread-sleep! (random 5)) + (mutex-lock! *incoming-mutex*) + (set! queue-len (length *incoming-data*)) + (mutex-unlock! *incoming-mutex*) + (if (> queue-len 0) + (begin + (debug:print-info 0 "Queue not flushed, waiting ...") + (loop))))))) + + ;; The heavy lifting + ;; + (let loop () + (let* ((rawmsg (receive-message zmq-socket)) + (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) + (res #f)) + (debug:print-info 12 "server=> received params=" params) + (set! res (cdb:cached-access params)) + (debug:print-info 12 "server=> processed res=" res) + (send-message zmq-socket (db:obj->string res)) + (if *time-to-exit* (exit)) + (loop))))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -95,38 +79,38 @@ ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) (thread-sleep! 1) ;; no need to do this very often (db:write-cached-data) - (if (< count 100) - (loop 0) + (print "Server running, count is " count) + (if (< count 10) + (loop (+ count 1)) (let ((numrunning (open-run-close db:get-count-tests-running #f))) (if (or (> numrunning 0) (> (+ *last-db-access* 60)(current-seconds))) (begin (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) - (loop (+ count 1))) + (loop 0))) (begin (debug:print-info 0 "Starting to shutdown the server side") ;; need to delete only *my* server entry (future use) (open-run-close db:del-var #f "SERVER") (thread-sleep! 10) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") - ;; (exit))) - )))))) + ))))) -(define (server:find-free-port-and-open host s port trynum) +(define (server:find-free-port-and-open host s port #!key (trynum 50)) (let ((s (if s s (make-socket 'rep))) (p (if (number? port) port 5555))) (handle-exceptions exn (begin (debug:print 0 "Failed to bind to port " p ", trying next port") (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - (if (< trynum 100) - (server:find-free-port-and-open host s (+ p 1) (+ trynum 1)) + (if (> trynum 0) + (server:find-free-port-and-open host s (+ p 1) trynum: (- trynum 1)) (debug:print-info 0 "Tried ports from " (- p trynum) " to " p " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))) (let ((zmq-url (conc "tcp://" host ":" p))) (print "Trying to start server on " zmq-url) (bind-socket s zmq-url) @@ -140,91 +124,90 @@ (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) -;; MOVE ME TO COMMON -(define *my-client-signature* #f) +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) -(define (server:client-login host port) +;; +(define (server:client-connect host port) (let ((connect-ok #f) (zmq-socket (make-socket 'req)) - (mysig (if *my-client-signature* *my-client-signature* (server:mk-signature))) (conurl (server:make-server-url (list host port)))) - (set! *my-client-signature* mysig) (connect-socket zmq-socket conurl) - (if (cdb:login zmq-socket *toppath* mysig) - zmq-socket - (if (socket? *runremote*) - (begin - (close-socket *runremote*) - #f) - zmq-socket)))) + zmq-socket)) + + +(define (server:client-login zmq-socket) + (cdb:login zmq-socket *toppath* (server:get-client-signature))) (define (server:client-logout zmq-socket) - (and (socket? zmq-socket) - (cdb:logout zmq-socket *toppath* *my-client-signature*) - (close-socket zmq-socket))) + (let ((ok (and (socket? zmq-socket) + (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) + (close-socket zmq-socket) + ok)) -;;; IS THIS NEEDED? -(define (server:client-setup) +;; Do all the connection work, start a server if not already running +(define (server:client-setup #!key (numtries 10)) (if (not *toppath*)(setup-for-run)) - (let* ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)) - (zmq-socket (make-socket 'req))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo - (begin + (let* ((host (car hostinfo)) + (port (cadr hostinfo))) (debug:print-info 2 "Setting up to connect to " hostinfo) (handle-exceptions exn (begin (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") - (open-run-close tasks:server-deregister tasks:open-db (car hostinfo) port: (cadr hostinfo)) - ;; (exit) ;; why forced exit? + (open-run-close tasks:server-deregister tasks:open-db host port: port) #f) - ;; REPLACE WITH server:client-login - ;; - (let ((connect-ok #f) - (conurl (server:make-server-url hostinfo))) - (connect-socket zmq-socket conurl) - (set! connect-ok (cdb:login zmq-socket *toppath* *my-client-signature*)) + (let* ((zmq-socket (server:client-connect host port)) + (login-res (server:client-login zmq-socket)) + (connect-ok (if (null? login-res) #f (car login-res))) + (conurl (server:make-server-url hostinfo))) (if connect-ok (begin (debug:print-info 2 "Logged in and connected to " conurl) (set! *runremote* zmq-socket) #t) (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f))))) - (begin - (debug:print-info 0 "NO SERVER RUNNING! PLEASE START ONE! E.g. \"megatest -server - &\"") - ;; (debug:print-info 2 "No server available, attempting to start one...") - ;; (system (conc (car (argv)) " -server - " (if (args:get-arg "-debug") - ;; (conc "-debug " (args:get-arg "-debug")) - ;; "") - ;; " &")) - ;; (sleep 5) - ;; (server:client-setup) - )))) + (if (> numtries 0) + (let ((exe (car (argv)))) + (debug:print-info 1 "No server available, attempting to start one...") + (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) + ;; (system (conc " -server - " (if (args:get-arg "-debug") + ;; (conc "-debug " (args:get-arg "-debug")) + ;; "") + ;; " &")) + (sleep 10) + (server:client-setup numtries: (- numtries 1))) + (debug:print-info 1 "Too many retries, giving up"))))) (define (server:launch) (let* ((toppath (setup-for-run))) (debug:print-info 0 "Starting the standalone server") (if *toppath* (let* ((th2 (make-thread (lambda () - (server:run (args:get-arg "-server"))))) - (th3 (make-thread (lambda () - (server:keep-running))))) - (thread-start! th3) + (server:run (args:get-arg "-server")))))) + ;; (th3 (make-thread (lambda () + ;; (server:keep-running))))) (thread-start! th2) - (thread-join! th2) - (set! *didsomething* #t)) + ;; (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2)) (debug:print 0 "ERROR: Failed to setup for megatest")))) (define (server:client-launch) (if (server:client-setup) (debug:print-info 0 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -132,11 +132,11 @@ (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname port start-time priority state) (set! res (cons (vector id pid hostname port start-time priority state) res))) mdb - "SELECT id,pid,hostname,port,start_time,priority,state FROM servers ORDER BY start_time ASC;") + "SELECT id,pid,hostname,port,start_time,priority,state FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -82,38 +82,32 @@ (test "server-register, get-best-server" '("bob" 1234) (let ((res #f)) (open-run-close tasks:server-register tasks:open-db 1 "bob" 1234 100 'live) (set! res (open-run-close tasks:get-best-server tasks:open-db)) res)) -(test "de-register server" #f (let ((res #f)) +(test "de-register server" #t (let ((res #f)) (open-run-close tasks:server-deregister tasks:open-db "bob" port: 1234) - (open-run-close tasks:get-best-server tasks:open-db))) - -;; (exit) - -(set! *verbosity* 3) ;; enough to trigger turning off exception handling in db accesses -(define server-pid (process-run "../../bin/megatest" (list "-server" "-" "-debug" (conc *verbosity*)))) -(sleep 3) -(set! *verbosity* 1) + (list? (open-run-close tasks:get-best-server tasks:open-db)))) (define hostinfo #f) (test #f #t (let ((dat (open-run-close tasks:get-best-server tasks:open-db))) (set! hostinfo dat) (and (string? (car dat)) (number? (cadr dat))))) -(test #f #t (socket? (let ((s (server:client-login (car hostinfo)(cadr hostinfo)))) - (set! *runremote* s) - s))) - -(define th1 (make-thread (lambda ()(server:client-setup)))) -(thread-start! th1) - -(test #f #t (cdb:login *runremote* *toppath* *my-client-signature*)) +(test #f #t (let ((zmq-socket (apply server:client-connect hostinfo))) + (set! *runremote* zmq-socket) + (socket? *runremote*))) + +(test #f #t (let ((res (server:client-login *runremote*))) + (car res))) + (test #f #t (socket? *runremote*)) -(exit) +;; (test #f #t (server:client-setup)) + +(test #f #t (car (cdb:login *runremote* *toppath* *my-client-signature*))) ;;====================================================================== ;; C O N F I G F I L E S ;;======================================================================