Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -18,21 +18,26 @@ ;; You should have received a copy of the GNU General Public License ;; along with Megatest. If not, see . ;; ;;====================================================================== -(use srfi-69 posix) - (declare (unit api)) (declare (uses rmt)) (declare (uses db)) (declare (uses dbmod)) (declare (uses dbfile)) (declare (uses tasks)) +(declare (uses tcp-transportmod)) (import dbmod) (import dbfile) +(import tcp-transportmod) + +(use srfi-69 + posix + matchable + s11n) ;; allow these queries through without starting a server ;; (define api:read-only-queries '(get-key-val-pairs @@ -225,32 +230,36 @@ payload: `((params . ,params) (ok-res . #f))) (vector #t res)))))))) ;; indat is (cmd run-id params meta) -(define (api:tcp-dispatch-request dbstruct indat) ;; cmd run-id params) - (set! *api-process-request-count* (+ *api-process-request-count* 1)) - (match (deserialize indat) - ((cmd run-id params meta) - (let* ((status (cond - ((> *api-process-request-count* 50) 'busy) - ((> *api-process-request-count* 25) 'loaded) - (else 'ok))) - (errmsg (case status - ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight")) - ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight")) - (else #f))) - (result (case status - ((busy) #f) - (else (api:dispatch-request dbstruct cmd run-id params)))) - (payload (list status errmsg result '())) - (pdat (serialize payload))) - (set! *api-process-request-count* (- *api-process-request-count* 1)) - pdat)) - (else - (let* ((msg (conc "(deserialize indat)="(deserialize indat)", indat="indat))) - (assert #f "FATAL: failed to deserialize indat "msg))))) +(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params) + (lambda () + (let* ((indat (deserialize))) + (set! *api-process-request-count* (+ *api-process-request-count* 1)) + (match indat + ((cmd run-id params meta) + (let* ((status (cond + ((> *api-process-request-count* 50) 'busy) + ((> *api-process-request-count* 25) 'loaded) + (else 'ok))) + (errmsg (case status + ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight")) + ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight")) + (else #f))) + (result (case status + ((busy) #f) + (else + (case cmd + ((ping) (tt:mk-signature *toppath*)) + (else + (api:dispatch-request dbstruct cmd run-id params)))))) + (payload (list status errmsg result '()))) + (set! *api-process-request-count* (- *api-process-request-count* 1)) + (serialize payload))) + (else + (assert #f "FATAL: failed to deserialize indat "indat)))))) (define (api:dispatch-request dbstruct cmd run-id params) (case cmd ;;=============================================== Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -941,11 +941,11 @@ (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id - (tt:start-server tl run-id dbfname api:tcp-dispatch-request) + (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler) (begin (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.") (exit 1)))) (else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode)))) (set! *didsomething* #t))) Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -98,39 +98,67 @@ (cmd-thread #f) (last-access (current-seconds)) ) (define (tt:make-remote areapath) - (make-tt area: areapath)) + (make-tt areapath: areapath)) ;; do all the busy work of finding and setting up conn for ;; connecting to a server ;; -(define (tt:client-connect-to-server ttdat dbfname run-id) - (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f))) +(define (tt:client-connect-to-server ttdat dbfname run-id ) + (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)) + (server-start-proc (lambda () + (tt:server-process-run + (tt-areapath ttdat) + (dbfile:testsuite-name) + (common:find-local-megatest) + run-id)))) (if conn conn ;; we are already connected to the server (let* ((sdat (tt:get-current-server-info ttdat dbfname run-id))) (match sdat - ((host port start-time server-id pid) - (let ((conn (make-tt-conn - host: host - port: port - dbfname: dbfname - server-id: server-id - server-start: start-time - pid: pid))) + ((host port start-time server-id pid dbfname2) + (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.") + (let* ((host-port (conc host":"port)) + (conn (make-tt-conn + host: host + port: port + host-port: host-port + dbfname: dbfname + server-id: server-id + server-start: start-time + pid: pid))) (hash-table-set! (tt-conns ttdat) dbfname conn) - conn)) + ;; verify we can talk to this server + (if (tt:ping host port server-id) + conn + (begin + ;; rm the (last server) would go here + (server-start-proc) + (thread-sleep! 1) + (tt:client-connect-to-server ttdat dbfname run-id))))) (else - (tt:server-process-run - (tt-areapath ttdat) - (dbfile:testsuite-name) - (common:find-local-megatest) - run-id) + (server-start-proc) (thread-sleep! 1) (tt:client-connect-to-server ttdat dbfname run-id))))))) + +(define (tt:ping host port server-id) + (let* ((res (tt:send-receive-direct host port `(ping #f #f #f)))) ;; please send me your server-id + ;; + ;; need two threads, one a 5 second timer + ;; + (match res + ((status errmsg result meta) + (if (equal? result server-id) + #t ;; then we are good + (begin + (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result) + #f))) + (else + (debug:print 0 *default-log-port* "res not in form (status errmsg resutl meta), got: "res) + #f)))) ;; client side handler ;; (define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe) ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now. @@ -149,11 +177,13 @@ ((loaded) (debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.") (thread-sleep! 1) (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)) (else - result))))) + result))) + (else + (assert #f "FATAL: tt:handler received bad data "res)))) (begin (thread-sleep! 1) ;; give it a rest and try again (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))))) ;; no conn yet, find and or start and find a server @@ -172,32 +202,43 @@ (define (tt:bid-for-servership run-id) #f) (define (tt:get-current-server-info ttdat dbfname run-id) - (let* ((sfiles (tt:find-server ttdat dbfname))) - (case (length sfiles) - ((0) #f) ;; no server around - ((1) (tt:server-get-info (car sfiles))) - (else #f) ;; we'll want to wait until extra servers have exited - ))) + (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.") + (let* ((areapath (tt-areapath ttdat)) + (sfiles (tt:find-server areapath dbfname)) + (sdats (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read + (sorted (sort sdats (lambda (a b) + (< (list-ref a 2)(list-ref b 2)))))) + (if (null? sorted) + #f ;; we'll want to wait until extra servers have exited + (car sorted)))) (define (tt:send-receive ttdat conn cmd run-id params) - (let* ((host-port (conc (tt-conn-host conn)":"(tt-conn-port conn))) - (dat (list cmd run-id params))) - (let-values (((inp oup)(tcp-connect host-port))) + (let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn))) + (host (tt-conn-host conn)) + (port (tt-conn-port conn)) + (dat (list cmd run-id params #f))) ;; no meta data yet + (tt:send-receive-direct host port dat))) + +(define (tt:send-receive-direct host port dat) + (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port) + (handle-exceptions + exn + #f ;; Add condition-case or better handling here + (let-values (((inp oup)(tcp-connect host port))) (let ((res (if (and inp oup) (begin (serialize dat oup) (close-output-port oup) (deserialize inp)) - (begin - (debug:print 0 *default-log-port* "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) + ))) (close-input-port inp) - ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP res)))) + + ;;====================================================================== ;; server ;;====================================================================== @@ -210,38 +251,39 @@ ;; to pull in more modules ;; ;; This is the routine called in megatest.scm to start a server ;; (define (tt:start-server areapath run-id dbfname handler) + (assert areapath "FATAL: areapath not provided for tt:start-server") ;; is there already a server for this dbfile? Then exit. (let* ((ttdat (make-tt areapath: areapath)) - (servers (tt:find-server ttdat dbfname))) - (tt-handler-set! ttdat handler) + (servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead (if (null? servers) - (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc))) - (tcp-thread (make-thread - (lambda () - (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data - "tcp-server-thread")) - (run-thread (make-thread - (lambda () - (tt:keep-running ttdat dbfname))))) - (thread-start! tcp-thread) - (thread-start! run-thread) - (thread-join! run-thread) ;; run thread will exit on timeout or other conditions - ;; - ;; set a flag here to tell tcp-thread to stop running - ;; - ;; (thread-join! tcp-thread) ;; can't wait - ;; - ;; remove the servinfo file - ;; - ;; close the database, remove lock in on-disk db - ;; - ;; close the listener ports - ;; - (exit)) + (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc)))) + (tt-handler-set! ttdat (handler dbstruct)) + (let* ((tcp-thread (make-thread + (lambda () + (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data + "tcp-server-thread")) + (run-thread (make-thread + (lambda () + (tt:keep-running ttdat dbfname))))) + (thread-start! tcp-thread) + (thread-start! run-thread) + (thread-join! run-thread) ;; run thread will exit on timeout or other conditions + ;; + ;; set a flag here to tell tcp-thread to stop running + ;; + ;; (thread-join! tcp-thread) ;; can't wait + ;; + ;; remove the servinfo file + ;; + ;; close the database, remove lock in on-disk db + ;; + ;; close the listener ports + ;; + (exit))) (begin (debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.") (exit))))) (define (tt:keep-running ttdat dbfname) @@ -260,11 +302,11 @@ (tt:create-server-registration-file ttdat dbfname) ;; now start watching the last-access, if it hasn't been touched ;; in over ten seconds we exit (let loop () - (if (< (- (current-seconds) (tt-last-access ttdat)) 10) + (if (< (- (current-seconds) (tt-last-access ttdat)) 60) (begin (thread-sleep! 2) (loop)))) (if (tt-cleanup-proc ttdat) ((tt-cleanup-proc ttdat))) @@ -336,25 +378,24 @@ ;; find valid server ;; get servers listed, last part of name must match : ;; if more than one, wait one second and look again ;; future: ping oldest, if alive remove other : files ;; -(define (tt:find-server ttdat dbfname) - (let* ((areapath (tt-areapath ttdat)) - (servdir (tt:get-servinfo-dir areapath)) +(define (tt:find-server areapath dbfname) + (let* ((servdir (tt:get-servinfo-dir areapath)) (sfiles (glob (conc servdir"/*:"dbfname)))) sfiles)) ;; given a path to a server info file return: host port startseconds server-id ;; example of what it's looking for in the log file: ;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 ;; (define (tt:server-get-info logf) - (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id + (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id (dbprep-rx (regexp "^SERVER: dbprep")) (dbprep-found 0) - (bad-dat (list #f #f #f #f #f))) + (bad-dat (list #f #f #f #f #f #f))) (handle-exceptions exn (begin ;; WARNING: this is potentially dangerous to blanket ignore the errors (if (file-exists? logf) @@ -374,16 +415,17 @@ (loop (read-line)(+ lnum 1)) (begin (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf ) bad-dat)) (match mlst - ((_ host port start server-id pid) + ((_ host port start server-id pid dbfname) (list host (string->number port) (string->number start) server-id - (string->number pid))) + (string->number pid) + dbfname)) (else (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst) bad-dat)))) (begin (if dbprep-found