Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -18,21 +18,26 @@
;; You should have received a copy of the GNU General Public License
;; along with Megatest. If not, see .
;;
;;======================================================================
-(use srfi-69 posix)
-
(declare (unit api))
(declare (uses rmt))
(declare (uses db))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tasks))
+(declare (uses tcp-transportmod))
(import dbmod)
(import dbfile)
+(import tcp-transportmod)
+
+(use srfi-69
+ posix
+ matchable
+ s11n)
;; allow these queries through without starting a server
;;
(define api:read-only-queries
'(get-key-val-pairs
@@ -225,32 +230,36 @@
payload: `((params . ,params)
(ok-res . #f)))
(vector #t res))))))))
;; indat is (cmd run-id params meta)
-(define (api:tcp-dispatch-request dbstruct indat) ;; cmd run-id params)
- (set! *api-process-request-count* (+ *api-process-request-count* 1))
- (match (deserialize indat)
- ((cmd run-id params meta)
- (let* ((status (cond
- ((> *api-process-request-count* 50) 'busy)
- ((> *api-process-request-count* 25) 'loaded)
- (else 'ok)))
- (errmsg (case status
- ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight"))
- ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight"))
- (else #f)))
- (result (case status
- ((busy) #f)
- (else (api:dispatch-request dbstruct cmd run-id params))))
- (payload (list status errmsg result '()))
- (pdat (serialize payload)))
- (set! *api-process-request-count* (- *api-process-request-count* 1))
- pdat))
- (else
- (let* ((msg (conc "(deserialize indat)="(deserialize indat)", indat="indat)))
- (assert #f "FATAL: failed to deserialize indat "msg)))))
+(define (api:tcp-dispatch-request-make-handler dbstruct) ;; returns a thunk: reads one (cmd run-id params meta) request via (deserialize), replies via (serialize payload)
+ (lambda ()
+ (let* ((indat (deserialize)))
+ (dynamic-wind ;; keeps the in-flight count balanced even if dispatch raises or the request is malformed
+ (lambda () (set! *api-process-request-count* (+ *api-process-request-count* 1)))
+ (lambda ()
+ (match indat
+ ((cmd run-id params meta)
+ (let* ((status (cond
+ ((> *api-process-request-count* 50) 'busy)
+ ((> *api-process-request-count* 25) 'loaded)
+ (else 'ok)))
+ (errmsg (case status
+ ((busy) (conc "Server overloaded, "*api-process-request-count*" threads in flight"))
+ ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight"))
+ (else #f)))
+ (result (case status
+ ((busy) #f) ;; a busy server does no work for the request, ping included
+ (else (case cmd
+ ((ping) (tt:mk-signature *toppath*)) ;; answered here, never reaches api:dispatch-request
+ (else (api:dispatch-request dbstruct cmd run-id params))))))
+ (payload (list status errmsg result '()))) ;; reply is (status errmsg result meta); meta is always '() here
+ (serialize payload)))
+ (else
+ (assert #f "FATAL: failed to deserialize indat "indat))))
+ (lambda () (set! *api-process-request-count* (- *api-process-request-count* 1)))))))
(define (api:dispatch-request dbstruct cmd run-id params)
(case cmd
;;===============================================
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -941,11 +941,11 @@
(case (rmt:transport-mode)
((http)(http-transport:launch))
((tcp)
(debug:print 0 *default-log-port* "INFO: Running using tcp method.")
(if run-id
- (tt:start-server tl run-id dbfname api:tcp-dispatch-request)
+ (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler)
(begin
(debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.")
(exit 1))))
(else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode))))
(set! *didsomething* #t)))
Index: tcp-transportmod.scm
==================================================================
--- tcp-transportmod.scm
+++ tcp-transportmod.scm
@@ -98,39 +98,67 @@
(cmd-thread #f)
(last-access (current-seconds))
)
(define (tt:make-remote areapath)
- (make-tt area: areapath))
+ (make-tt areapath: areapath))
;; do all the busy work of finding and setting up conn for
;; connecting to a server
;;
-(define (tt:client-connect-to-server ttdat dbfname run-id)
- (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
+(define (tt:client-connect-to-server ttdat dbfname run-id ) ;; returns the cached conn for dbfname, or builds one and returns it once tt:ping confirms the server
+ (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f))
+ (server-start-proc (lambda () ;; shared by both retry paths below
+ (tt:server-process-run
+ (tt-areapath ttdat)
+ (dbfile:testsuite-name)
+ (common:find-local-megatest)
+ run-id))))
(if conn
conn ;; we are already connected to the server
(let* ((sdat (tt:get-current-server-info ttdat dbfname run-id)))
(match sdat
- ((host port start-time server-id pid)
- (let ((conn (make-tt-conn
- host: host
- port: port
- dbfname: dbfname
- server-id: server-id
- server-start: start-time
- pid: pid)))
+ ((host port start-time server-id pid dbfname2)
+ (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
+ (let* ((host-port (conc host":"port))
+ (conn (make-tt-conn
+ host: host
+ port: port
+ host-port: host-port
+ dbfname: dbfname
+ server-id: server-id
+ server-start: start-time
+ pid: pid)))
(hash-table-set! (tt-conns ttdat) dbfname conn)
- conn))
+ ;; verify we can talk to this server before handing the conn back
+ (if (tt:ping host port server-id)
+ conn
+ (begin
+ (hash-table-delete! (tt-conns ttdat) dbfname) ;; forget the unverified conn, else the retry below returns it from the cache; rm of the (last server) info would also go here
+ (server-start-proc)
+ (thread-sleep! 1)
+ (tt:client-connect-to-server ttdat dbfname run-id)))))
(else
- (tt:server-process-run
- (tt-areapath ttdat)
- (dbfile:testsuite-name)
- (common:find-local-megatest)
- run-id)
+ (server-start-proc)
(thread-sleep! 1)
(tt:client-connect-to-server ttdat dbfname run-id)))))))
+
+(define (tt:ping host port server-id)
+ (let* ((res (tt:send-receive-direct host port `(ping #f #f #f)))) ;; request is (cmd run-id params meta); please send me your server-id
+ ;; Returns #t only when the reply's result field equals server-id, otherwise #f.
+ ;; The status and errmsg fields of the reply are not examined here.
+ ;; TODO: need two threads, one a 5 second timer
+ (match res
+ ((status errmsg result meta)
+ (if (equal? result server-id)
+ #t ;; then we are good
+ (begin
+ (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)
+ #f)))
+ (else ;; any reply that is not a four-element list, e.g. the #f from tt:send-receive-direct's exception handler
+ (debug:print 0 *default-log-port* "res not in form (status errmsg resutl meta), got: "res)
+ #f))))
;; client side handler
;;
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
@@ -149,11 +177,13 @@
((loaded)
(debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.")
(thread-sleep! 1)
(tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
(else
- result)))))
+ result)))
+ (else
+ (assert #f "FATAL: tt:handler received bad data "res))))
(begin
(thread-sleep! 1) ;; give it a rest and try again
(tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)))))
;; no conn yet, find and or start and find a server
@@ -172,32 +202,43 @@
(define (tt:bid-for-servership run-id)
#f)
(define (tt:get-current-server-info ttdat dbfname run-id)
- (let* ((sfiles (tt:find-server ttdat dbfname)))
- (case (length sfiles)
- ((0) #f) ;; no server around
- ((1) (tt:server-get-info (car sfiles)))
- (else #f) ;; we'll want to wait until extra servers have exited
- )))
+ (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.")
+ (let* ((areapath (tt-areapath ttdat))
+ (sfiles (tt:find-server areapath dbfname))
+ (sdats (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read
+ (sorted (sort sdats (lambda (a b) ;; ascending on element 2, the start time parsed by tt:server-get-info
+ (< (list-ref a 2)(list-ref b 2))))))
+ (if (null? sorted)
+ #f ;; no server file found, or none of them yielded usable info
+ (car sorted)))) ;; the entry with the earliest start time
(define (tt:send-receive ttdat conn cmd run-id params)
- (let* ((host-port (conc (tt-conn-host conn)":"(tt-conn-port conn)))
- (dat (list cmd run-id params)))
- (let-values (((inp oup)(tcp-connect host-port)))
+ (let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn)))
+ (host (tt-conn-host conn))
+ (port (tt-conn-port conn))
+ (dat (list cmd run-id params #f))) ;; no meta data yet
+ (tt:send-receive-direct host port dat)))
+
+(define (tt:send-receive-direct host port dat)
+ (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
+ (handle-exceptions
+ exn
+ #f ;; Add condition-case or better handling here
+ (let-values (((inp oup)(tcp-connect host port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp))
- (begin
- (debug:print 0 *default-log-port* "ERROR: send called but no receiver has been setup. Please call setup first!")
- #f))))
+ )))
(close-input-port inp)
- ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
res))))
+
+
;;======================================================================
;; server
;;======================================================================
@@ -210,38 +251,39 @@
;; to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server
;;
(define (tt:start-server areapath run-id dbfname handler)
+ (assert areapath "FATAL: areapath not provided for tt:start-server")
;; is there already a server for this dbfile? Then exit.
(let* ((ttdat (make-tt areapath: areapath))
- (servers (tt:find-server ttdat dbfname)))
- (tt-handler-set! ttdat handler)
+ (servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead
(if (null? servers)
- (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc)))
- (tcp-thread (make-thread
- (lambda ()
- (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
- "tcp-server-thread"))
- (run-thread (make-thread
- (lambda ()
- (tt:keep-running ttdat dbfname)))))
- (thread-start! tcp-thread)
- (thread-start! run-thread)
- (thread-join! run-thread) ;; run thread will exit on timeout or other conditions
- ;;
- ;; set a flag here to tell tcp-thread to stop running
- ;;
- ;; (thread-join! tcp-thread) ;; can't wait
- ;;
- ;; remove the servinfo file
- ;;
- ;; close the database, remove lock in on-disk db
- ;;
- ;; close the listener ports
- ;;
- (exit))
+ (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc))))
+ (tt-handler-set! ttdat (handler dbstruct))
+ (let* ((tcp-thread (make-thread
+ (lambda ()
+ (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
+ "tcp-server-thread"))
+ (run-thread (make-thread
+ (lambda ()
+ (tt:keep-running ttdat dbfname)))))
+ (thread-start! tcp-thread)
+ (thread-start! run-thread)
+ (thread-join! run-thread) ;; run thread will exit on timeout or other conditions
+ ;;
+ ;; set a flag here to tell tcp-thread to stop running
+ ;;
+ ;; (thread-join! tcp-thread) ;; can't wait
+ ;;
+ ;; remove the servinfo file
+ ;;
+ ;; close the database, remove lock in on-disk db
+ ;;
+ ;; close the listener ports
+ ;;
+ (exit)))
(begin
(debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.")
(exit)))))
(define (tt:keep-running ttdat dbfname)
@@ -260,11 +302,11 @@
(tt:create-server-registration-file ttdat dbfname)
;; now start watching the last-access, if it hasn't been touched
;; in over ten seconds we exit
(let loop ()
- (if (< (- (current-seconds) (tt-last-access ttdat)) 10)
+ (if (< (- (current-seconds) (tt-last-access ttdat)) 60)
(begin
(thread-sleep! 2)
(loop))))
(if (tt-cleanup-proc ttdat)
((tt-cleanup-proc ttdat)))
@@ -336,25 +378,24 @@
;; find valid server
;; get servers listed, last part of name must match :
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other : files
;;
-(define (tt:find-server ttdat dbfname)
- (let* ((areapath (tt-areapath ttdat))
- (servdir (tt:get-servinfo-dir areapath))
+(define (tt:find-server areapath dbfname)
+ (let* ((servdir (tt:get-servinfo-dir areapath))
(sfiles (glob (conc servdir"/*:"dbfname))))
sfiles))
;; given a path to a server info file return: host port startseconds server-id
;; example of what it's looking for in the log file:
;; SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4
;;
(define (tt:server-get-info logf)
- (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id
+ (let ((server-rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
(dbprep-rx (regexp "^SERVER: dbprep"))
(dbprep-found 0)
- (bad-dat (list #f #f #f #f #f)))
+ (bad-dat (list #f #f #f #f #f #f)))
(handle-exceptions
exn
(begin
;; WARNING: this is potentially dangerous to blanket ignore the errors
(if (file-exists? logf)
@@ -374,16 +415,17 @@
(loop (read-line)(+ lnum 1))
(begin
(debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
bad-dat))
(match mlst
- ((_ host port start server-id pid)
+ ((_ host port start server-id pid dbfname)
(list host
(string->number port)
(string->number start)
server-id
- (string->number pid)))
+ (string->number pid)
+ dbfname))
(else
(debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
bad-dat))))
(begin
(if dbprep-found