Index: Makefile
==================================================================
--- Makefile
+++ Makefile
@@ -36,11 +36,11 @@
          subrun.scm archive.scm env.scm \
          diff-report.scm cgisetup/models/pgdb.scm
 
 # module source files
 MSRCFILES = dbfile.scm debugprint.scm mtargs.scm commonmod.scm dbmod.scm \
-            tcp-transportmod.scm rmtmod.scm portlogger.scm
+            tcp-transportmod.scm rmtmod.scm portlogger.scm apimod.scm
 
 transport-mode.scm : transport-mode.scm.template
 	cp transport-mode.scm.template transport-mode.scm
 
 dashboard-transport-mode.scm : dashboard-transport-mode.scm.template
@@ -52,13 +52,13 @@
 
 # dbmod.import.o is just a hack here
 mofiles/portlogger.o : mofiles/dbmod.o
 mofiles/dbfile.o : \
      mofiles/debugprint.o mofiles/commonmod.o
-
+mofiles/apimod.o : mofiles/commonmod.o mofiles/tcp-transportmod.o mofiles/dbmod.o mofiles/dbfile.o mofiles/debugprint.o # one prerequisite per project module imported by apimod.scm
 mofiles/dbmod.o : mofiles/dbfile.o
-
+api.o : mofiles/apimod.o # api.scm is a unit, not a module (cf. db.o, configf.o)
 mofiles/commonmod.o : mofiles/debugprint.o
 configf.o : commonmod.import.o
 mofiles/dbfile.o : mofiles/debugprint.o
 mofiles/rmtmod.o mofiles/dbmod.o : mofiles/dbfile.o mofiles/commonmod.o mofiles/debugprint.o
 db.o : mofiles/dbmod.o mofiles/dbfile.o

Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -18,18 +18,21 @@
 ;;
 ;;======================================================================
 
 (declare (unit api))
 (declare (uses db))
+(declare (uses apimod))
+
 (declare (uses debugprint))
 (declare (uses commonmod))
 (declare (uses dbmod))
 (declare (uses dbfile))
 (declare (uses tasks))
 (declare (uses tcp-transportmod))
 
 (import commonmod)
+(import apimod)
 (import dbmod)
 (import dbfile)
 (import debugprint)
 (import tcp-transportmod)
 
@@ -38,279 +41,16 @@
         posix
         matchable
         s11n
         typed-records)
 
-;; allow these queries through without starting a server
-;;
-(define api:read-only-queries
-  '(get-key-val-pairs
-    get-var
-    get-keys
-    get-key-vals
-    test-toplevel-num-items
-    get-test-info-by-id
-    get-test-state-status-by-id
-    get-steps-info-by-id
-    get-data-info-by-id
-    
test-get-rundir-from-test-id - get-count-tests-running-for-testname - get-count-tests-running - get-count-tests-running-in-jobgroup - get-previous-test-run-record - get-matching-previous-test-run-records - test-get-logfile-info - test-get-records-for-index-file - get-testinfo-state-status - test-get-top-process-pid - test-get-paths-matching-keynames-target-new - get-prereqs-not-met - get-count-tests-running-for-run-id - get-run-info - get-run-status - get-run-state - get-run-stats - get-run-times - get-target - get-targets - ;; register-run - get-tests-tags - get-test-times - get-tests-for-run - get-tests-for-run-state-status - get-test-id - get-tests-for-runs-mindata - get-tests-for-run-mindata - get-run-name-from-id - get-runs - simple-get-runs - get-num-runs - get-runs-cnt-by-patt - get-all-run-ids - get-prev-run-ids - get-run-ids-matching-target - get-runs-by-patt - get-steps-data - get-steps-for-test - read-test-data - read-test-data-varpatt - login - tasks-get-last - testmeta-get-record - have-incompletes? - get-changed-record-ids - get-all-runids - get-changed-record-test-ids - get-changed-record-run-ids - get-run-record-ids - get-not-completed-cnt)) - -(define api:write-queries - '( - get-keys-write ;; dummy "write" query to force server start - - ;; SERVERS - ;; start-server - ;; kill-server - - ;; TESTS - test-set-state-status-by-id - delete-test-records - delete-old-deleted-test-records - test-set-state-status - test-set-top-process-pid - set-state-status-and-roll-up-items - - update-pass-fail-counts - top-test-set-per-pf-counts ;; (db:top-test-set-per-pf-counts (db:get-db *db* 5) 5 "runfirst") - - ;; RUNS - register-run - set-tests-state-status - delete-run - lock/unlock-run - update-run-event_time - mark-incomplete - set-state-status-and-roll-up-run - ;; STEPS - teststep-set-status! 
- delete-steps-for-test - ;; TEST DATA - test-data-rollup - csv->test-data - - ;; MISC - sync-cachedb->db - drop-all-triggers - create-all-triggers - update-tesdata-on-repilcate-db - - ;; TESTMETA - testmeta-add-record - testmeta-update-field - - ;; TASKS - tasks-add - tasks-set-state-given-param-key - )) - -(define *db-write-mutexes* (make-hash-table)) -(define *server-signature* #f) - -(define *api-threads* '()) -(define (api:register-thread th-in) - (set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*))) - -(define (api:unregister-thread th-in) - (set! *api-threads* (filter (lambda (thdat) - (not (eq? th-in (car thdat)))) - *api-threads*))) - -(define (api:remove-dead-or-terminated) - (set! *api-threads* (filter (lambda (thdat) - (not (member (thread-state (car thdat)) '(terminated dead)))) - *api-threads*))) - -(define (api:get-count-threads-alive) - (length *api-threads*)) - -(define *api:last-stats-print* 0) -(define *api-print-db-stats-mutex* (make-mutex)) -(define (api:print-db-stats) - (debug:print-info 0 *default-log-port* "Started periodic db stats printer") - (let loop () - (mutex-lock! *api-print-db-stats-mutex*) - (if (> (- (current-seconds) *api:last-stats-print*) 15) - (begin - (rmt:print-db-stats) - (set! *api:last-stats-print* (current-seconds)))) - (mutex-unlock! *api-print-db-stats-mutex*) - (thread-sleep! 5) - (loop))) - -(define *api:queue-mutex* (make-mutex)) -(define *api:queue-id* 0) - -(define *api:in-queue* '()) -(define *api:results* (make-hash-table)) ;; id->queue-item - -(define (api:start-queue-processor) - ;; look for work in in-queue - ;; process it - ;; put result in out-queue - ;; sleep 20ms - - #t) - -(defstuct api:queue-item - (proc #f) - (cmd #f) - (params #f) - (start-time (current-seconds)) - (end-time #f) - (id #f)) - -(define (api:add-queue-item proc cmd params) - (mutex-lock *api:queue-mutex*) - (set! *api:queue-id* (+ *api:queue-id* 1)) - (set! 
*api:in-queue* - (cons (make-api:queue-item - proc: proc - cmd: cmd - params: params - id: *api:queue-id* - ) - *api:in-queue*)) - (let ((id *api:queue-id*)) - (mutex-unlock *apt:queue-mutex*) - id)) ;; return id so calling proc can find the result in *api:results* - -(define (api:get-queue-item) - (mutex-lock *api:queue-mutex*) - (let* ((res (if (null? *api:in-queue*) - #f - (let* ((revlst (reverse *api:in-queue*))) - (set! *api:in-queue* (reverse (cdr revlist))) - (car revlist))))) - (mutex-unlock *api:queue-mutex*) - res)) + +;; QUEUE METHOD (define (api:tcp-dispatch-request-make-handler-new dbstruct) ;; cmd run-id params) - - ;; put proc into in-queue - ;; poll out-queue for result evey 25ms - ;; time out with big message - - (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.") - (if (not *server-signature*) - (set! *server-signature* (tt:mk-signature *toppath*))) - (lambda (indat) - (let* ((result - (let* ((numthreads (api:get-count-threads-alive)) - (delay-wait (if (> numthreads 10) - (- numthreads 10) - 0)) - (normal-proc (lambda (cmd run-id params) - (case cmd - ((ping) *server-signature*) - (else - (api:dispatch-request dbstruct cmd run-id params)))))) - (set! *api-process-request-count* numthreads) - (set! *db-last-access* (current-seconds)) -;; (if (not (eq? numthreads numthreads)) -;; (begin -;; (api:remove-dead-or-terminated) -;; (let ((threads-now (api:get-count-threads-alive))) -;; (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now) -;; (set! numthreads threads-now)))) - (match indat - ((cmd run-id params meta) - (let* ((start-t (current-milliseconds)) - (db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) - (ok (equal? 
dbfname (dbr:dbstruct-dbfname dbstruct)))) - (case cmd - ((ping) #t) ;; we are fine - (else - (assert ok "FATAL: database file and run-id not aligned."))))) - (ttdat *server-info*) - (server-state (tt-state ttdat)) - (maxthreads 20) ;; make this a parameter? - (status (cond - ((and (> numthreads maxthreads) - (> (random 100) 70)) ;; allow a 30% probability to go through so we can figure out what is going wrong in main.db server. - 'busy) - ;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down. - (else 'ok))) - (errmsg (case status - ((busy) (conc "Server overloaded, "numthreads" threads in flight")) - ((loaded) (conc "Server loaded, "numthreads" threads in flight")) - (else #f))) - (result (case status - ((busy) - (if (eq? cmd 'ping) - (normal-proc cmd run-id params) - ;; numthreads must be greater than 5 for busy - (* 0.1 (- numthreads maxthreads)) ;; was 15 - return a number for the remote to delay - )) ;; (- numthreads 29)) ;; call back in as many seconds - ((loaded) - ;; (if (eq? (rmt:transport-mode) 'tcp) - ;; (thread-sleep! 0.5)) - (normal-proc cmd run-id params)) - (else - (normal-proc cmd run-id params)))) - (meta (case cmd - ((ping) `((sstate . ,server-state))) - (else `((wait . 
,delay-wait))))) - (payload (list status errmsg result meta))) - ;; (cmd run-id params meta) - (db:add-stats cmd run-id params (- (current-milliseconds) start-t)) - payload)) - (else - (assert #f "FATAL: failed to deserialize indat "indat)))))) - result))) - -(define api:tcp-dispatch-request-make-handler api:tcp-dispatch-request-make-handler-old) + (api:tcp-dispatch-request-make-handler-core dbstruct api:dispatch-request)) + ;; indat is (cmd run-id params meta) ;; ;; WARNING: Do not print anything in the lambda of this function as it ;; reads/writes to current in/out port @@ -387,11 +127,11 @@ ;; (serialize payload) (api:unregister-thread (current-thread)) result))) - +(define api:tcp-dispatch-request-make-handler api:tcp-dispatch-request-make-handler-new) (define *api-halt-writes* #f) (define (api:dispatch-request dbstruct cmd run-id params) (if (not *no-sync-db*) Index: apimod.scm ================================================================== --- apimod.scm +++ apimod.scm @@ -18,15 +18,315 @@ ;;====================================================================== (declare (unit apimod)) (declare (uses commonmod)) +(declare (uses debugprint)) +(declare (uses dbmod)) +(declare (uses dbfile)) +(declare (uses tcp-transportmod)) (module apimod * (import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18) +(import (prefix sqlite3 sqlite3:) posix matchable typed-records srfi-1 srfi-18 srfi-69 ) (import commonmod) +(import debugprint) +(import dbmod) +(import dbfile) +(import tcp-transportmod) + +;; allow these queries through without starting a server +;; +(define api:read-only-queries + '(get-key-val-pairs + get-var + get-keys + get-key-vals + test-toplevel-num-items + get-test-info-by-id + get-test-state-status-by-id + get-steps-info-by-id + get-data-info-by-id + test-get-rundir-from-test-id + get-count-tests-running-for-testname + get-count-tests-running + get-count-tests-running-in-jobgroup + 
get-previous-test-run-record + get-matching-previous-test-run-records + test-get-logfile-info + test-get-records-for-index-file + get-testinfo-state-status + test-get-top-process-pid + test-get-paths-matching-keynames-target-new + get-prereqs-not-met + get-count-tests-running-for-run-id + get-run-info + get-run-status + get-run-state + get-run-stats + get-run-times + get-target + get-targets + ;; register-run + get-tests-tags + get-test-times + get-tests-for-run + get-tests-for-run-state-status + get-test-id + get-tests-for-runs-mindata + get-tests-for-run-mindata + get-run-name-from-id + get-runs + simple-get-runs + get-num-runs + get-runs-cnt-by-patt + get-all-run-ids + get-prev-run-ids + get-run-ids-matching-target + get-runs-by-patt + get-steps-data + get-steps-for-test + read-test-data + read-test-data-varpatt + login + tasks-get-last + testmeta-get-record + have-incompletes? + get-changed-record-ids + get-all-runids + get-changed-record-test-ids + get-changed-record-run-ids + get-run-record-ids + get-not-completed-cnt)) + +(define api:write-queries + '( + get-keys-write ;; dummy "write" query to force server start + + ;; SERVERS + ;; start-server + ;; kill-server + + ;; TESTS + test-set-state-status-by-id + delete-test-records + delete-old-deleted-test-records + test-set-state-status + test-set-top-process-pid + set-state-status-and-roll-up-items + + update-pass-fail-counts + top-test-set-per-pf-counts ;; (db:top-test-set-per-pf-counts (db:get-db *db* 5) 5 "runfirst") + + ;; RUNS + register-run + set-tests-state-status + delete-run + lock/unlock-run + update-run-event_time + mark-incomplete + set-state-status-and-roll-up-run + ;; STEPS + teststep-set-status! 
+ delete-steps-for-test + ;; TEST DATA + test-data-rollup + csv->test-data + + ;; MISC + sync-cachedb->db + drop-all-triggers + create-all-triggers + update-tesdata-on-repilcate-db + + ;; TESTMETA + testmeta-add-record + testmeta-update-field + + ;; TASKS + tasks-add + tasks-set-state-given-param-key + )) + +(define *db-write-mutexes* (make-hash-table)) +(define *server-signature* #f) + +(define *api-threads* '()) +(define (api:register-thread th-in) + (set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*))) + +(define (api:unregister-thread th-in) + (set! *api-threads* (filter (lambda (thdat) + (not (eq? th-in (car thdat)))) + *api-threads*))) + +(define (api:remove-dead-or-terminated) + (set! *api-threads* (filter (lambda (thdat) + (not (member (thread-state (car thdat)) '(terminated dead)))) + *api-threads*))) + +(define (api:get-count-threads-alive) + (length *api-threads*)) + +(define *api:last-stats-print* 0) +(define *api-print-db-stats-mutex* (make-mutex)) +(define (api:print-db-stats) + (debug:print-info 0 *default-log-port* "Started periodic db stats printer") + (let loop () + (mutex-lock! *api-print-db-stats-mutex*) + (if (> (- (current-seconds) *api:last-stats-print*) 15) + (begin + (dbmod:print-db-stats) + (set! *api:last-stats-print* (current-seconds)))) + (mutex-unlock! *api-print-db-stats-mutex*) + (thread-sleep! 5) + (loop))) + +;; QUEUE METHOD + +(define *api:queue-mutex* (make-mutex)) +(define *api:queue-id* 0) + +(define *api:in-queue* '()) +(define *api:results* (make-hash-table)) ;; id->queue-item + +(defstruct api:queue-item + (proc #f) + (cmd #f) + (run-id #f) + (params #f) + (start-time (current-seconds)) + (end-time #f) + (id #f) + (results #f)) + +;; Add an item to the incoming queue. +;; +(define (api:add-queue-item proc cmd run-id params) + (mutex-lock! *api:queue-mutex*) + (set! *api:queue-id* (+ *api:queue-id* 1)) + (set! 
*api:in-queue*
+        (cons (make-api:queue-item
+               proc: proc
+               cmd: cmd
+               run-id: run-id
+               params: params
+               id: *api:queue-id*
+               )
+              *api:in-queue*))
+  (let ((id *api:queue-id*))
+    (mutex-unlock! *api:queue-mutex*)
+    id)) ;; return id so calling proc can find the result in *api:results*
+
+;; get a queue item from the end of the queue.
+;; return #f if there are no items to be processed.
+;;
+(define (api:get-queue-item)
+  (mutex-lock! *api:queue-mutex*)
+  (let* ((res (if (null? *api:in-queue*)
+                  #f
+                  (let* ((revlist (reverse *api:in-queue*)))
+                    (set! *api:in-queue* (reverse (cdr revlist)))
+                    (car revlist)))))
+    (mutex-unlock! *api:queue-mutex*)
+    res))
+
+;; Store a finished queue item in *api:results* under its id.
+;;
+(define (api:put-item-in-results id item)
+  (hash-table-set! *api:results* id item))
+
+;; Fetch and remove the finished item stored under id; #f if there is none yet.
+;;
+(define (api:retrieve-result-item id)
+  (let ((res (hash-table-ref/default *api:results* id #f)))
+    (if res (hash-table-delete! *api:results* id))
+    res))
+
+;; Poll *api:results* for the finished item registered under id, polling less
+;; often the longer we wait. timeout is in ms; returns the queue item, or #f once
+;; timeout has elapsed (a result posted after that is left in *api:results*).
+;; Yes, it would be better to do this with mailboxes. My last attempt to use
+;; mailboxes resulted in erratic behavior but that was likely due to something
+;; unrelated. Just to eliminate uncertainty we'll start with polling and switch
+;; to mailboxes later.
+;;
+(define (api:wait-for-result id #!key (timeout 30000))
+  (let loop ((start (current-milliseconds)))
+    (thread-sleep! (let ((delta (- (current-milliseconds) start)))
+                     (cond ((< delta 500)   0.01)
+                           ((< delta 5000)  0.1)
+                           ((< delta 10000) 0.25)
+                           (else            1.25))))
+    (or (api:retrieve-result-item id)
+        (and (< (- (current-milliseconds) start) timeout) ;; keep polling only while inside the timeout
+             (loop start)))))
+
+(define (api:queue-run-one)
+  (let* ((item (api:get-queue-item))) ;; this removes it from the in-queue
+    (if item
+        (let* ((id     (api:queue-item-id item))
+               (proc   (api:queue-item-proc item))
+               (result (proc)))
+          (api:queue-item-end-time-set! item (current-seconds))
+          (api:queue-item-results-set! 
item result)
+          (api:put-item-in-results id item)))))
+;; Start the worker threads (numbered 0 through 20, so 21 in total); each loops forever: run one queued item, sleep 0.1s.
+(define (api:queue-processor)
+  (let* ((thproc (lambda ()
+                   (let loop ()
+                     (api:queue-run-one)
+                     (thread-sleep! 0.1)
+                     (loop)))))
+    (let loop ((thnum 0))
+      (thread-start! (make-thread thproc (conc "queue-thread-" thnum)))
+      (thread-sleep! 0.05) ;; 0.05s pause between thread starts
+      (if (< thnum 20) (loop (+ thnum 1))))))
+;; Return a handler taking indat = (cmd run-id params meta) and returning (list status errmsg result meta).
+;; ping is answered with *server-signature*; any other cmd is queued and the api:dispatch-request result awaited.
+(define (api:tcp-dispatch-request-make-handler-core dbstruct api:dispatch-request)
+  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
+  (if (not *server-signature*) (set! *server-signature* (tt:mk-signature *toppath*)))
+  (lambda (indat)
+    (let* ((outer-proc (lambda (cmd run-id params)
+                         (case cmd
+                           ((ping) *server-signature*) ;; but ping in api:dispatch-request is (current-process-id)?
+                           (else
+                            (let* ((id   (api:add-queue-item
+                                          (lambda () (api:dispatch-request dbstruct cmd run-id params))
+                                          cmd run-id params))
+                                   (item (api:wait-for-result id)))
+                              (assert item "FATAL: no result received for queued request " cmd)
+                              (api:queue-item-results item))))))) ;; unwrap: the waiter hands back the queue item, the client needs its results
+      ;; (set! *api-process-request-count* numthreads)
+      (set! *db-last-access* (current-seconds))
+      (match indat
+        ((cmd run-id params meta)
+         (let* ((start-t      (current-milliseconds))
+                ;; factor this out and move before this let, it is just an assert if not ping and dbfname is not correct
+                (db-ok        (let* ((dbfname (dbmod:run-id->dbfname run-id))
+                                     (ok      (equal? dbfname (dbr:dbstruct-dbfname dbstruct))))
+                                (case cmd
+                                  ((ping) #t) ;; we are fine
+                                  (else (assert ok "FATAL: database file and run-id not aligned.")))))
+                (ttdat        *server-info*)
+                (server-state (tt-state ttdat))
+                (status       'ok) ;; anything legit we can do with status?
+                (delay-wait   0)
+                (result       (if (eq? cmd 'ping)
+                                  *server-signature* ;; (current-process-id) ;; process id or server-signature?
+                                  (outer-proc cmd run-id params)))
+                (meta         (case cmd
+                                ((ping) `((sstate . ,server-state)))
+                                (else `((wait . 
,delay-wait))))) + (errmsg "") + (payload (list status errmsg result meta))) + ;; (cmd run-id params meta) + (db:add-stats cmd run-id params (- (current-milliseconds) start-t)) + payload)) + (else + (assert #f "FATAL: failed to deserialize indat "indat)))))) ) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -140,11 +140,11 @@ (define *pkts-info* (make-hash-table)) ;; store stuff like the last parent here (define *configinfo* #f) ;; raw results from setup, includes toppath and table from megatest.config (define *runconfigdat* #f) ;; run configs data (define *configdat* #f) ;; megatest.config data (define *configstatus* #f) ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done -(define *toppath* #f) +;; (define *toppath* #f) ;; moved to commonmod (define *already-seen-runconfig-info* #f) (define *test-meta-updated* (make-hash-table)) (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar @@ -176,11 +176,11 @@ (define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg (define *runremote* #f) ;; if set up for server communication this will hold ;; (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *server-id* #f) -(define *server-info* #f) ;; good candidate for easily convert to non-global +;; (define *server-info* #f) ;; good candidate for easily convert to non-global (define *time-to-exit* #f) (define *run-id* #f) (define *server-kind-run* (make-hash-table)) (define *home-host* #f) ;; (define *total-non-write-delay* 0) Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -135,10 +135,13 @@ (define (client:get-signature) (if *my-client-signature* *my-client-signature* (let ((sig (conc (get-host-name) " " (current-process-id)))) 
(set! *my-client-signature* sig) *my-client-signature*))) + +(define *server-info* #f) +(define *toppath* #f) ;;====================================================================== ;; config file utils ;;====================================================================== Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -4630,11 +4630,11 @@ (set! *time-to-exit* #t) #t)))) (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.") (if (and no-hurry (debug:debug-mode 18)) - (rmt:print-db-stats)) + (dbmod:print-db-stats)) (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds (if *dbstruct-dbs* (db:close-all *dbstruct-dbs*)) ;; one second allocated (if (list? *on-exit-procs*) (for-each (lambda (proc) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -898,11 +898,11 @@ ;; (define *dbstruct-dbs* #f) ;; used to cache the dbstruct in db:setup. Goal is to remove this. 
;; db stats (define *db-stats* (make-hash-table)) ;; hash of vectors < count duration-total > (define *db-stats-mutex* (make-mutex)) -(define (rmt:print-db-stats) +(define (dbmod:print-db-stats) (let ((fmtstr "~40a~8-d~20-d~20,2-f")) ;; "~20,2-f" (debug:print 0 *default-log-port* "DB Stats\n========") (debug:print 0 *default-log-port* (format #f "~40a~8a~20a~10a" "Cmd" "Count" "TotTime" "Avg")) (for-each (lambda (cmd) (let* ((dat (hash-table-ref *db-stats* cmd)) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -55,10 +55,12 @@ (declare (uses dbmod.import)) (declare (uses portlogger)) (declare (uses portlogger.import)) (declare (uses tcp-transportmod)) (declare (uses tcp-transportmod.import)) +(declare (uses apimod)) +(declare (uses apimod.import)) (declare (uses rmtmod)) (declare (uses rmtmod.import)) ;; (declare (uses debugprint)) ;; (declare (uses debugprint.import)) @@ -72,10 +74,11 @@ commonmod dbfile portlogger tcp-transportmod rmtmod + apimod ) (define *db* #f) ;; this is only for the repl, do not use in general!!!! (include "common_records.scm") Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -136,11 +136,11 @@ ;; parameters ;; (define tt-server-timeout-param (make-parameter 600)) ;; make ttdat visible -(define *server-info* #f) +;; (define *server-info* #f) ;; get this from commonmod (define *server-run* #t) (define (tt:make-remote areapath) (make-tt areapath: areapath))