Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -35,11 +35,12 @@ (use srfi-69 srfi-18 posix matchable - s11n) + s11n + typed-records) ;; allow these queries through without starting a server ;; (define api:read-only-queries '(get-key-val-pairs @@ -152,78 +153,10 @@ tasks-set-state-given-param-key )) (define *db-write-mutexes* (make-hash-table)) (define *server-signature* #f) -;; ;; These are called by the server on recipt of /api calls -;; ;; - keep it simple, only return the actual result of the call, i.e. no meta info here -;; ;; -;; ;; - returns #( flag result ) -;; ;; -;; (define (api:execute-requests dbstruct dat) -;; (if (> *api-process-request-count* 50) -;; (begin -;; (if (common:low-noise-print 30 "too many threads") -;; (debug:print 0 *default-log-port* "WARNING: "*api-process-request-count*" threads, potential overload, adding 0.5 sec delay.")) -;; ;; (thread-sleep! 0.5) ;; take a nap - no, the napping is moved to the clients via tt:backoff-incr -;; )) -;; (cond -;; ((not (vector? dat)) ;; it is an error to not receive a vector -;; (vector #f (vector #f "remote must be called with a vector"))) -;; (else -;; (let* ((cmd-in (vector-ref dat 0)) -;; (cmd (if (symbol? cmd-in) -;; cmd-in -;; (string->symbol cmd-in))) -;; (params (vector-ref dat 1)) -;; (run-id (if (null? params) -;; 0 -;; (car params))) -;; (write-mutex (if (hash-table-exists? *db-write-mutexes* run-id) -;; (hash-table-ref *db-write-mutexes* run-id) -;; (let* ((newmutex (make-mutex))) -;; (hash-table-set! *db-write-mutexes* run-id newmutex) -;; newmutex))) -;; (start-t (current-milliseconds)) -;; (readonly-mode (dbr:dbstruct-read-only dbstruct)) -;; (readonly-command (member cmd api:read-only-queries)) -;; (writecmd-in-readonly-mode (and readonly-mode (not readonly-command)))) -;; (if (not readonly-command) -;; (mutex-lock! write-mutex)) -;; (let* ((tmppath (dbr:dbstruct-tmppath dbstruct)) -;; (clean-run-id (cond -;; ((number? 
run-id) run-id) -;; ((equal? run-id #f) "main") -;; (else "other"))) -;; (crumbfile (dbfile:wait-for-qif tmppath clean-run-id (cons cmd params))) -;; (res -;; (if writecmd-in-readonly-mode -;; (conc "attempt to run write command "cmd" on a read-only database") -;; (api:dispatch-request dbstruct cmd run-id params)))) -;; (delete-file* crumbfile) -;; (if (not readonly-command) -;; (mutex-unlock! write-mutex)) -;; -;; ;; save all stats -;; (let ((delta-t (- (current-milliseconds) -;; start-t)) -;; (modified-cmd (if (eq? cmd 'general-call) -;; (string->symbol (conc "general-call-" (car params))) -;; cmd))) -;; (hash-table-set! *db-api-call-time* modified-cmd -;; (cons delta-t (hash-table-ref/default *db-api-call-time* modified-cmd '())))) -;; (if writecmd-in-readonly-mode -;; (begin -;; #;(common:telemetry-log (conc "api-out:"(->string cmd)) -;; payload: `((params . ,params) -;; (ok-res . #t))) -;; (vector #f res)) -;; (begin -;; #;(common:telemetry-log (conc "api-out:"(->string cmd)) -;; payload: `((params . ,params) -;; (ok-res . #f))) -;; (vector #t res)))))))) (define *api-threads* '()) (define (api:register-thread th-in) (set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*))) @@ -237,11 +170,25 @@ (not (member (thread-state (car thdat)) '(terminated dead)))) *api-threads*))) (define (api:get-count-threads-alive) (length *api-threads*)) - + +(define *api:last-stats-print* 0) +(define *api-print-db-stats-mutex* (make-mutex)) +(define (api:print-db-stats) + (debug:print-info 0 *default-log-port* "Started periodic db stats printer") + (let loop () + (mutex-lock! *api-print-db-stats-mutex*) + (if (> (- (current-seconds) *api:last-stats-print*) 15) + (begin + (rmt:print-db-stats) + (set! *api:last-stats-print* (current-seconds)))) + (mutex-unlock! *api-print-db-stats-mutex*) + (thread-sleep! 
5) + (loop))) + ;; indat is (cmd run-id params meta) ;; ;; WARNING: Do not print anything in the lambda of this function as it ;; reads/writes to current in/out port @@ -250,86 +197,96 @@ (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.") (if (not *server-signature*) (set! *server-signature* (tt:mk-signature *toppath*))) (lambda (indat) (api:register-thread (current-thread)) - (let* (;; (indat (deserialize)) - (newcount (+ *api-process-request-count* 1)) - (numthreads (api:get-count-threads-alive)) - (delay-wait (if (> newcount 10) - (- newcount 10) - 0)) - (normal-proc (lambda (cmd run-id params) - (case cmd - ((ping) *server-signature*) - (else - (api:dispatch-request dbstruct cmd run-id params)))))) - (set! *api-process-request-count* newcount) - (set! *db-last-access* (current-seconds)) - (if (not (eq? newcount numthreads)) - (begin - (api:remove-dead-or-terminated) - (let ((threads-now (api:get-count-threads-alive))) - (debug:print 0 *default-log-port* "WARNING: newcount="newcount", numthreads="numthreads", remaining="threads-now) - (set! newcount threads-now)))) - (match indat - ((cmd run-id params meta) - (let* ((db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) - (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct)))) - (case cmd - ((ping) #t) ;; we are fine - (else - (if (not ok)(debug:print 0 *default-log-port* "ERROR: "cmd", run-id "run-id", not correct for dbfname "(dbr:dbstruct-dbfname dbstruct))) - (assert ok "FATAL: database file and run-id not aligned."))))) - (ttdat *server-info*) - (server-state (tt-state ttdat)) - (status (cond - ((> newcount 3) 'busy) - ;; ((> newcount 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down. - (else 'ok))) - (errmsg (case status - ((busy) (conc "Server overloaded, "newcount" threads in flight")) - ((loaded) (conc "Server loaded, "newcount" threads in flight")) - (else #f))) - (result (case status - ((busy) - (if (eq? 
cmd 'ping) - (normal-proc cmd run-id params) - ;; newcount must be greater than 5 for busy - (* 1 (- newcount 3)) ;; was 15 - )) ;; (- newcount 29)) ;; call back in as many seconds - ((loaded) -;; (if (eq? (rmt:transport-mode) 'tcp) -;; (thread-sleep! 0.5)) - (normal-proc cmd run-id params)) - (else - (normal-proc cmd run-id params)))) - (meta (case cmd - ((ping) `((sstate . ,server-state))) - (else `((wait . ,delay-wait))))) - (payload (list status errmsg result meta))) - (set! *api-process-request-count* (- *api-process-request-count* 1)) - ;; (serialize payload) - (api:unregister-thread (current-thread)) - payload)) - (else - (assert #f "FATAL: failed to deserialize indat "indat)))))) + (let* ((result + (let* ((numthreads (api:get-count-threads-alive)) + (delay-wait (if (> numthreads 10) + (- numthreads 10) + 0)) + (normal-proc (lambda (cmd run-id params) + (case cmd + ((ping) *server-signature*) + (else + (api:dispatch-request dbstruct cmd run-id params)))))) + (set! *api-process-request-count* numthreads) + (set! *db-last-access* (current-seconds)) +;; (if (not (eq? numthreads numthreads)) +;; (begin +;; (api:remove-dead-or-terminated) +;; (let ((threads-now (api:get-count-threads-alive))) +;; (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now) +;; (set! numthreads threads-now)))) + (match indat + ((cmd run-id params meta) + (let* ((start-t (current-milliseconds)) + (db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) + (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct)))) + (case cmd + ((ping) #t) ;; we are fine + (else + (assert ok "FATAL: database file and run-id not aligned."))))) + (ttdat *server-info*) + (server-state (tt-state ttdat)) + (maxthreads 10) ;; make this a parameter? + (status (cond + ((and (> numthreads maxthreads) + (> (random 100) 70)) ;; NOTE(review): as written, (random 100) > 70 holds for 71..99, so roughly 29% of over-limit requests are answered 'busy and the remaining ~71% go through so we can figure out what is going wrong in main.db server. 
+ 'busy) + ;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down. + (else 'ok))) + (errmsg (case status + ((busy) (conc "Server overloaded, "numthreads" threads in flight")) + ((loaded) (conc "Server loaded, "numthreads" threads in flight")) + (else #f))) + (result (case status + ((busy) + (if (eq? cmd 'ping) + (normal-proc cmd run-id params) + ;; numthreads must be greater than 5 for busy + (* 1 (- numthreads maxthreads)) ;; was 15 + )) ;; (- numthreads 29)) ;; call back in as many seconds + ((loaded) + ;; (if (eq? (rmt:transport-mode) 'tcp) + ;; (thread-sleep! 0.5)) + (normal-proc cmd run-id params)) + (else + (normal-proc cmd run-id params)))) + (meta (case cmd + ((ping) `((sstate . ,server-state))) + (else `((wait . ,delay-wait))))) + (payload (list status errmsg result meta))) + ;; (cmd run-id params meta) + (db:add-stats cmd run-id params (- (current-milliseconds) start-t)) + payload)) + (else + (assert #f "FATAL: failed to deserialize indat "indat)))))) + ;; (set! *api-process-request-count* (- *api-process-request-count* 1)) + ;; (serialize payload) + + (api:unregister-thread (current-thread)) + result))) + + (define *api-halt-writes* #f) (define (api:dispatch-request dbstruct cmd run-id params) (if (not *no-sync-db*) (db:open-no-sync-db)) - (if (member cmd api:write-queries) - (let loop ((start-time (current-milliseconds))) - (if *api-halt-writes* - (begin - (thread-sleep! 0.2) - (if (< (- (current-milliseconds) start-time) - 5000) ;; hope it don't take more than five seconds to sync - (loop start-time) - (debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long")))))) + (let* ((start-time (current-milliseconds))) + (if (member cmd api:write-queries) + (let loop () + (if *api-halt-writes* + (begin + (thread-sleep! 
0.2) + (if (< (- (current-milliseconds) start-time) + 5000) ;; hope it don't take more than five seconds to sync + (loop) ;; the named let `loop` takes no arguments; start-time is bound by the enclosing let* + #;(debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long")))))) + (db:add-stats 'api-write-blocking-for-sync run-id params (- (current-milliseconds) start-time))) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== @@ -523,43 +480,5 @@ ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)) (else (debug:print 0 *default-log-port* "ERROR: bad api call " cmd) (conc "ERROR: BAD api call " cmd)))) -;; http-server send-response -;; api:process-request -;; db:* -;; -;; NB// Runs on the server as part of the server loop -;; -(define (api:process-request dbstruct $) ;; the $ is the request vars proc - (debug:print 4 *default-log-port* "server-id:" *server-id*) - (let* ((cmd ($ 'cmd)) - (paramsj ($ 'params)) - (key ($ 'key)) - (params (db:string->obj paramsj transport: 'http))) ;; incoming data from the POST (or is it a GET?) - (debug:print 4 *default-log-port* "cmd:" cmd " with params " params "key " key) - (if (equal? key *server-id*) - (begin - (set! *api-process-request-count* (+ *api-process-request-count* 1)) - (let* ((resdat (api:execute-requests dbstruct (vector cmd params))) ;; process the request, resdat = #( flag result ) - (success (vector-ref resdat 0)) - (res (vector-ref resdat 1))) ;; (vector flag payload), get the payload, ignore the flag (why?) - (debug:print 4 *default-log-port* "res:" res) - (if (not success) - (debug:print 0 *default-log-port* "ERROR: success flag is #f for " cmd " with params " params)) - (if (> *api-process-request-count* *max-api-process-requests*) - (set! *max-api-process-requests* *api-process-request-count*)) - (set! 
*api-process-request-count* (- *api-process-request-count* 1)) - ;; This can be here but needs controls to ensure it doesn't run more than every 4 seconds - ;; (rmt:dat->json-str - ;; (if (or (string? res) - ;; (list? res) - ;; (number? res) - ;; (boolean? res)) - ;; res - ;; (list "ERROR, not string, list, number or boolean" 1 cmd params res))))) - (db:obj->string res transport: 'http))) - (begin - (debug:print 0 *default-log-port* "Server refused to process request. Server id mismatch. recived " key " expected: " *server-id* ".\nOther arguments recived: cmd=" cmd " params = " params) - (db:obj->string (conc "Server refused to process request server-id mismatch: " key ", " *server-id*) transport: 'http))))) - Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -154,14 +154,10 @@ (define *time-zero* (current-seconds)) ;; for the watchdog (define *on-exit-procs* '()) ;; add procs to this list to be executed on exit (define *default-area-tag* "local") ;; DATABASE -;; (define *dbstruct-dbs* #f) ;; used to cache the dbstruct in db:setup. Goal is to remove this. 
-;; db stats -(define *db-stats* (make-hash-table)) ;; hash of vectors < count duration-total > -(define *db-stats-mutex* (make-mutex)) ;; db access (define *db-last-access* (current-seconds)) ;; last db access, used in server ;; (define *db-write-access* #t) ;; db sync ;; (define *db-last-sync* 0) ;; last time the sync to megatest.db happened Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -3721,25 +3721,35 @@ tags))) db "SELECT testname,tags FROM test_meta") (hash-table->alist res))))) +;; testmeta rarely changes, so records are cached; a cached record is reused while the last cache update is less than 600 seconds old + +(define *db:testmeta-cache* (make-hash-table)) +(define *db:testmeta-last-update* 0) + ;; read the record given a testname (define (db:testmeta-get-record dbstruct testname) - (let ((res #f)) - (db:with-db - dbstruct - #f - #f - (lambda (dbdat db) - (sqlite3:for-each-row - (lambda (id testname author owner description reviewed iterated avg_runtime avg_disk tags jobgroup) - (set! res (vector id testname author owner description reviewed iterated avg_runtime avg_disk tags jobgroup))) - db - "SELECT id,testname,author,owner,description,reviewed,iterated,avg_runtime,avg_disk,tags,jobgroup FROM test_meta WHERE testname=?;" - testname) - res)))) + (if (and (< (- (current-seconds) *db:testmeta-last-update*) 600) + (hash-table-exists? *db:testmeta-cache* testname)) + (hash-table-ref *db:testmeta-cache* testname) + (let ((res #f)) + (db:with-db + dbstruct + #f + #f + (lambda (dbdat db) + (sqlite3:for-each-row + (lambda (id testname author owner description reviewed iterated avg_runtime avg_disk tags jobgroup) + (set! res (vector id testname author owner description reviewed iterated avg_runtime avg_disk tags jobgroup))) + db + "SELECT id,testname,author,owner,description,reviewed,iterated,avg_runtime,avg_disk,tags,jobgroup FROM test_meta WHERE testname=?;" + testname))) + (hash-table-set! *db:testmeta-cache* testname res) + (set! 
*db:testmeta-last-update* (current-seconds)) + res))) ;; create a new record for a given testname (define (db:testmeta-add-record dbstruct testname) (db:with-db dbstruct #f #t (lambda (dbdat db) @@ -4518,11 +4528,10 @@ ;; ;; time to exit, close the no-sync db here ;; (db:no-sync-close-db no-sync-db stmt-cache) (if (common:low-noise-print 30) (debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id) )))) )) - (define (std-exit-procedure) ;;(common:telemetry-log-close) (on-exit (lambda () 0)) ;; why is this here? ;;(debug:print-info 13 *default-log-port* "std-exit-procedure called; *time-to-exit*="*time-to-exit*) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -31,10 +31,11 @@ chicken data-structures extras files + format (prefix sqlite3 sqlite3:) matchable posix typed-records srfi-1 @@ -827,10 +828,54 @@ (res (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todisk keys))) (sqlite3:finalize! sdb) (sqlite3:finalize! ddb) res))) #f)) + +;; ====================================================================== +;; dbstats +;;====================================================================== + +;; (define *dbstruct-dbs* #f) ;; used to cache the dbstruct in db:setup. Goal is to remove this. 
+;; db stats +(define *db-stats* (make-hash-table)) ;; hash of vectors < count duration-total > +(define *db-stats-mutex* (make-mutex)) + +(define (rmt:print-db-stats) + (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f" + (debug:print 0 *default-log-port* "DB Stats\n========") + (debug:print 0 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg")) + (for-each (lambda (cmd) + (let* ((dat (hash-table-ref *db-stats* cmd)) + (count (dbstat-cnt dat)) + (tottime (dbstat-tottime dat))) + (debug:print 0 *default-log-port* + (format #f fmtstr cmd count tottime + (/ tottime count))))) + (sort (hash-table-keys *db-stats*) + (lambda (a b) + (> (dbstat-tottime (hash-table-ref *db-stats* a)) + (dbstat-tottime (hash-table-ref *db-stats* b)))))))) + +(defstruct dbstat + (cnt 0) + (tottime 0)) + +(define (db:add-stats cmd run-id params delta) + (let* ((modified-cmd (if (eq? cmd 'general-call) + (string->symbol (conc "general-call-" (car params))) + cmd)) + (rec (hash-table-ref/default *db-stats* modified-cmd #f))) + (if (not rec) + (let ((new-rec (make-dbstat))) + (hash-table-set! *db-stats* modified-cmd new-rec) + (set! rec new-rec))) + (dbstat-cnt-set! rec (+ (dbstat-cnt rec) 1)) + (dbstat-tottime-set! rec (+ (dbstat-tottime rec) delta)))) + + + ) ;; ATTIC Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -970,10 +970,11 @@ (case (rmt:transport-mode) ((tcp) (let* ((timeout (server:expiration-timeout))) (debug:print 0 *default-log-port* "INFO: Starting server for " dbfname " using tcp method with server timeout of "timeout) (tt-server-timeout-param timeout) + (thread-start! 
(make-thread api:print-db-stats "print-db-stats")) (if dbfname (tt:start-server tl #f dbfname api:tcp-dispatch-request-make-handler keys) (begin (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -db is required.") (exit 1))))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -114,22 +114,10 @@ ;; (define (nfs-transport-handler runremote cmd run-id params attemptnum area-dat areapath readonly-mode testsuite mtexe) ;; (let* ((keys (common:get-fields *configdat*)) ;; (dbstruct (dbmod:nfs-get-dbstruct run-id keys (dbfile:db-init-proc) areapath tmpadj: "/dashboard"))) ;; (api:dispatch-request dbstruct cmd run-id params))) -(define (rmt:print-db-stats) - (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f" - (debug:print 18 *default-log-port* "DB Stats\n========") - (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg")) - (for-each (lambda (cmd) - (let ((cmd-dat (hash-table-ref *db-stats* cmd))) - (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0)))))) - (sort (hash-table-keys *db-stats*) - (lambda (a b) - (> (vector-ref (hash-table-ref *db-stats* a) 0) - (vector-ref (hash-table-ref *db-stats* b) 0))))))) - (define (rmt:get-max-query-average run-id) (mutex-lock! *db-stats-mutex*) (let* ((runkey (conc "run-id=" run-id " ")) (cmds (filter (lambda (x) (substring-index runkey x)) Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -2064,11 +2064,11 @@ ;; ;; There is now a single call to runs:update-all-test_meta and this ;; per-test call is not needed. Given the delicacy of the move to ;; v1.55 this code is being left in place for the time being. ;; - (if (not (hash-table-ref/default *test-meta-updated* test-name #f)) + (if (not (hash-table-exists? *test-meta-updated* test-name)) (begin (hash-table-set! 
*test-meta-updated* test-name #t) (runs:update-test_meta test-name test-conf))) ;; itemdat => ((ripeness "overripe") (temperature "cool") (season "summer")) Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -255,12 +255,13 @@ (begin (debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds") (thread-sleep! delay-wait))))) (case status ((busy) ;; result will be how long the server wants you to delay - (let* ((dly (if (number? result) result 0.1))) - (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, will try again in "dly" seconds.") + (let* ((raw-dly (if (number? result) result 0.1)) + (dly (* raw-dly (/ attemptnum 2)))) + (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, cmd is "cmd", will try again in "dly" seconds. This is attempt "(- attemptnum 1)) (thread-sleep! dly) (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))) ((loaded) (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.") (tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))