Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -226,12 +226,12 @@
 ;;
 (define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
   (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
   (if (not *server-signature*)
       (set! *server-signature* (tt:mk-signature *toppath*)))
-  (lambda ()
-    (let* ((indat (deserialize))
+  (lambda (indat)
+    (let* (;; indat arrives already deserialized (see handler-proc in tt:start-tcp-server)
            (newcount (+ *api-process-request-count* 1))
            (delay-wait (if (> newcount 10)
                            (- newcount 10)
                            0))
            (normal-proc (lambda (cmd run-id params)
@@ -271,11 +271,12 @@
                   (meta (case cmd
                           ((ping) `((sstate . ,server-state)))
                           (else `((wait . ,delay-wait)))))
                   (payload (list status errmsg result meta)))
              (set! *api-process-request-count* (- *api-process-request-count* 1))
-             (serialize payload)))
+             ;; payload is returned as-is; handler-proc in tt:start-tcp-server serializes the reply
+             payload))
           (else
            (assert #f "FATAL: failed to deserialize indat "indat))))))
 
 
 (define (api:dispatch-request dbstruct cmd run-id params)
Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -2642,13 +2642,13 @@
    run-id
    #f
    (lambda (dbdat db)
      (db:first-result-default
       db
-      "SELECT attemptnum FROM tests WHERE id=?;"
+      "SELECT attemptnum FROM tests WHERE id=? AND run_id=?;"
       #f
-      test-id))))
+      test-id run-id))))
 
 (define db:test-record-fields '("id" "run_id" "testname" "state" "status" "event_time"
                                 "host" "cpuload" "diskfree" "uname" "rundir" "item_path"
                                 "run_duration" "final_logf" "comment" "shortdir" "attemptnum" "archived" "last_update"))
 
@@ -2762,13 +2762,15 @@
      (let ((res #f))
        (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test
         (lambda (id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)
           ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
           (set! res (vector id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)))
-        (db:get-cache-stmth dbdat db
-                            (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=?;"))
-        test-id)
+        db
+        ;; (db:get-cache-stmth dbdat db
+        ;; (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;"))
+        (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;")
+        test-id run-id)
        res))))
 
 ;; Get test state, status using test_id
 ;;
 (define (db:get-test-state-status-by-id dbstruct run-id test-id)
@@ -2781,12 +2783,12 @@
        ;; (stmth (db:get-cache-stmth dbdat db "SELECT state,status FROM tests WHERE id=?;")))
        (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test
         (lambda (state status)
           (cons state status))
         db
-        "SELECT state,status FROM tests WHERE id=?;" ;; stmth try not compiling this one - yes, this fixed the bind issue
+        "SELECT state,status FROM tests WHERE id=? AND run_id=?;" ;; stmth try not compiling this one - yes, this fixed the bind issue
-        test-id)
+        test-id run-id)
        res))))
 
 ;; Use db:test-get* to access
 ;; Get test data using test_ids. NB// Only works within a single run!!
 ;;
@@ -2845,13 +2847,13 @@
    run-id
    #f
    (lambda (dbdat db)
      (db:first-result-default
       db
-      "SELECT rundir FROM tests WHERE id=?;"
+      "SELECT rundir FROM tests WHERE id=? AND run_id=?;"
       #f ;; default result
-      test-id))))
+      test-id run-id))))
 
 (define (db:get-test-times dbstruct run-name target)
   (let ((res `())
         (qry (conc "select testname, item_path, run_duration, "
                    (string-join (db:get-keys dbstruct) " || '/' || ")
Index: tcp-transportmod.scm
==================================================================
--- tcp-transportmod.scm
+++ tcp-transportmod.scm
@@ -323,18 +323,65 @@
          (host (tt-conn-host conn))
          (port (tt-conn-port conn))
          (dat (list cmd run-id params #f))) ;; no meta data yet
     (tt:send-receive-direct host port dat)))
 
+;; Per-server backoff record, used to slow requests down after tcp i/o errors.
+;;   last-ioerr - (current-seconds) when the most recent i/o error was recorded
+;;   last-adj-t - (current-seconds) when wait-delay was last decayed
+;;   wait-delay - seconds to sleep before the next request to that server
+(defstruct tt:backoff
+  (last-ioerr (current-seconds))
+  (last-adj-t (current-seconds))
+  (wait-delay 0.1))
+
+(define *tt:backoff-smoothing* (make-hash-table)) ;; "host:port" => tt:backoff record
+
+;; Record an i/o failure for host:port: add 0.1s to the wait-delay of an existing
+;; record, or create a fresh record (initial wait-delay 0.1s) if there is none.
+(define (tt:backoff-incr host port) ;; call if tcp fails i/o net
+  (let* ((host-port (conc host":"port))
+         (bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
+    (if bkoff
+        (begin
+          (tt:backoff-last-ioerr-set! bkoff (current-seconds))
+          (tt:backoff-wait-delay-set! bkoff (+ (tt:backoff-wait-delay bkoff) 0.1)))
+        (hash-table-set! *tt:backoff-smoothing* host-port (make-tt:backoff)))))
+
+;; If a backoff record exists for host:port, reduce its wait-delay by 0.01s per
+;; second elapsed since the last adjustment and sleep for what remains; once the
+;; delay reaches zero the record is deleted. Does nothing when there is no record.
+(define (tt:backoff-decr-and-wait host port)
+  (let* ((host-port (conc host":"port))
+         (bkoff (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
+    (if bkoff
+        (let* ((wait-delay (tt:backoff-wait-delay bkoff))
+               (last-adj-t (tt:backoff-last-adj-t bkoff))
+               (delta (- (current-seconds) last-adj-t))
+               (adj (* delta 0.01)) ;; it takes ten seconds to recover from hitting an io err (0.1s penalty / 0.01 per second)
+               (new-wait (if (> wait-delay 0)
+                             (if (> adj wait-delay)
+                                 0
+                                 (- wait-delay adj))
+                             0)))
+          (if (> new-wait 0)
+              (begin
+                (debug:print-info 0 *default-log-port* "Server loaded, DelayWait: "new-wait)
+                (tt:backoff-wait-delay-set! bkoff new-wait)
+                (tt:backoff-last-adj-t-set! bkoff (current-seconds))
+                (thread-sleep! new-wait))
+              (hash-table-delete! *tt:backoff-smoothing* host-port))))))
+
 (define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
   (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
+  (tt:backoff-decr-and-wait host port)
   (let* ((retry (lambda ()
                   (tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
-         (full-err-print (lambda (exn)
+         (full-err-print (lambda (exn msg)
                            (pp (condition->list exn) *default-log-port*)
                            (pp dat *default-log-port*)
-                           (debug:print 0 *default-log-port*
+                           (debug:print 0 *default-log-port* msg
                                         ", error: " ((condition-property-accessor 'exn 'message) exn)
                                         ", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
                                         ", location: " ((condition-property-accessor 'exn 'location) exn)
                                         ))))
     (condition-case
@@ -344,26 +391,39 @@
                       (serialize dat oup)
                       (close-output-port oup)
                       (deserialize inp))
                     )))
          (close-input-port inp)
-         res))
+         (match res
+           ((result exn-result stdout-result)
+            (if exn-result
+                (begin ;; exn-result is the server's condition->list data, not a condition object, so it cannot go through full-err-print
+                  (pp exn-result *default-log-port*)
+                  (pp dat *default-log-port*)
+                  (debug:print 0 *default-log-port* "ERROR: Server side exception detected")))
+            (if stdout-result
+                (debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
+            result)
+           (else
+            (debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
+            #f))))
      (exn (io-error)
-       (full-err-print exn)
-       (debug:print 0 *default-log-port* exn "ERROR: i/o error")
+       (full-err-print exn "ERROR: i/o error")
+       (tt:backoff-incr host port)
        #f)
      (exn (i/o net)
        (if ping-mode
            #f
            (if (>= tries-remaining 0)
                (let* ((backoff-delay (* (- 26 tries-remaining) 0.5)))
                  (debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
                  (thread-sleep! backoff-delay)
+                 (tt:backoff-incr host port)
                  (retry))
                (assert #f "FATAL: Too many retries in tt:send-receive-direct"))))
      (exn ()
-       (full-err-print exn)
+       (full-err-print exn "Unhandled exception from client side.")
        #f))))
 
 
 ;;======================================================================
 ;; server
@@ -685,12 +745,36 @@
 ;; for the entire server system
 ;;
 (define (tt:start-tcp-server ttdat)
   (setup-listener-portlogger ttdat)
   (let* ((socket (tt-socket ttdat))
-         (handler (tt-handler ttdat)))
-    ((make-tcp-server socket handler)
+         (handler (tt-handler ttdat))
+         ;; handler-proc adapts handler for make-tcp-server: it deserializes the request,
+         ;; calls handler while trapping exceptions and collecting anything written to the
+         ;; current output port, then serializes (list result exn-result stdout-result)
+         (handler-proc (lambda ()
+                         (let* ((indat (deserialize))
+                                (result #f)
+                                (exn-result #f)
+                                (stdout-result (with-output-to-string
+                                                 (lambda ()
+                                                   (let ((res (handle-exceptions
+                                                                  exn
+                                                                (begin
+                                                                  (set! exn-result (condition->list exn))
+                                                                  #f)
+                                                                (handler indat))))
+                                                     (set! result res)))))
+                                (full-result (list result exn-result (if (equal? stdout-result "") #f stdout-result))))
+                           (handle-exceptions
+                               exn
+                             (begin
+                               (debug:print 0 *default-log-port* "Serialization failure. full-result="full-result)
+                               ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
+                               )
+                             (serialize full-result))))))
+    ((make-tcp-server socket handler-proc)
      #f ;; yes, send error messages to std-err
      )))
 
 ;; create a tcp listener and return a populated udat struct with
 ;; my port, address, hostname, pid etc.