;;======================================================================

(declare (unit apimod))
(declare (uses commonmod))
(declare (uses debugprint))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tcp-transportmod))

(module apimod
        *

(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix matchable typed-records srfi-1 srfi-18 srfi-69 )
(import commonmod)
(import debugprint)
(import dbmod)
(import dbfile)
(import tcp-transportmod)

;;======================================================================
;; QUERY CLASSIFICATION
;;======================================================================

;; allow these queries through without starting a server
;;
(define api:read-only-queries
  '(get-key-val-pairs
    get-var
    get-keys
    get-key-vals
    test-toplevel-num-items
    get-test-info-by-id
    get-test-state-status-by-id
    get-steps-info-by-id
    get-data-info-by-id
    test-get-rundir-from-test-id
    get-count-tests-running-for-testname
    get-count-tests-running
    get-count-tests-running-in-jobgroup
    get-previous-test-run-record
    get-matching-previous-test-run-records
    test-get-logfile-info
    test-get-records-for-index-file
    get-testinfo-state-status
    test-get-top-process-pid
    test-get-paths-matching-keynames-target-new
    get-prereqs-not-met
    get-count-tests-running-for-run-id
    get-run-info
    get-run-status
    get-run-state
    get-run-stats
    get-run-times
    get-target
    get-targets
    ;; register-run
    get-tests-tags
    get-test-times
    get-tests-for-run
    get-tests-for-run-state-status
    get-test-id
    get-tests-for-runs-mindata
    get-tests-for-run-mindata
    get-run-name-from-id
    get-runs
    simple-get-runs
    get-num-runs
    get-runs-cnt-by-patt
    get-all-run-ids
    get-prev-run-ids
    get-run-ids-matching-target
    get-runs-by-patt
    get-steps-data
    get-steps-for-test
    read-test-data
    read-test-data-varpatt
    login
    tasks-get-last
    testmeta-get-record
    have-incompletes?
    get-changed-record-ids
    get-all-runids
    get-changed-record-test-ids
    get-changed-record-run-ids
    get-run-record-ids
    get-not-completed-cnt))

;; queries that modify the database
;;
;; NOTE(review): the symbols below are runtime keys; spelling (including
;; update-tesdata-on-repilcate-db) is kept exactly as-is and presumably has to
;; match the dispatcher - confirm before renaming anything.
;;
(define api:write-queries
  '(
    get-keys-write ;; dummy "write" query to force server start

    ;; SERVERS
    ;; start-server
    ;; kill-server

    ;; TESTS
    test-set-state-status-by-id
    delete-test-records
    delete-old-deleted-test-records
    test-set-state-status
    test-set-top-process-pid
    set-state-status-and-roll-up-items

    update-pass-fail-counts
    top-test-set-per-pf-counts ;; (db:top-test-set-per-pf-counts (db:get-db *db* 5) 5 "runfirst")

    ;; RUNS
    register-run
    set-tests-state-status
    delete-run
    lock/unlock-run
    update-run-event_time
    mark-incomplete
    set-state-status-and-roll-up-run
    ;; STEPS
    teststep-set-status!
    delete-steps-for-test
    ;; TEST DATA
    test-data-rollup
    csv->test-data

    ;; MISC
    sync-cachedb->db
    drop-all-triggers
    create-all-triggers
    update-tesdata-on-repilcate-db

    ;; TESTMETA
    testmeta-add-record
    testmeta-update-field

    ;; TASKS
    tasks-add
    tasks-set-state-given-param-key
    ))

(define *db-write-mutexes* (make-hash-table))
(define *server-signature* #f)

;;======================================================================
;; THREAD BOOKKEEPING
;;======================================================================

;; list of (thread . registration-time-in-seconds) pairs
;;
(define *api-threads* '())

;; record th-in together with the current time
;;
(define (api:register-thread th-in)
  (set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*)))

;; drop every entry whose thread is eq? to th-in
;;
(define (api:unregister-thread th-in)
  (set! *api-threads* (filter (lambda (thdat)
                                (not (eq? th-in (car thdat))))
                              *api-threads*)))

;; drop every entry whose thread-state is terminated or dead
;;
(define (api:remove-dead-or-terminated)
  (set! *api-threads* (filter (lambda (thdat)
                                (not (member (thread-state (car thdat)) '(terminated dead))))
                              *api-threads*)))

;; number of registered entries; this is only the number of *live* threads
;; if api:remove-dead-or-terminated has been called beforehand
;;
(define (api:get-count-threads-alive)
  (length *api-threads*))

;;======================================================================
;; PERIODIC DB STATS
;;======================================================================

(define *api:last-stats-print* 0)
(define *api-print-db-stats-mutex* (make-mutex))

;; Never returns. Wakes every 5 seconds and calls dbmod:print-db-stats if
;; more than 15 seconds have passed since the last print. The mutex keeps
;; the check-and-update of *api:last-stats-print* atomic between threads
;; running this loop.
;;
(define (api:print-db-stats)
  (debug:print-info 0 *default-log-port* "Started periodic db stats printer")
  (let loop ()
    (mutex-lock! *api-print-db-stats-mutex*)
    (if (> (- (current-seconds) *api:last-stats-print*) 15)
        (begin
          (dbmod:print-db-stats)
          (set! *api:last-stats-print* (current-seconds))))
    (mutex-unlock! *api-print-db-stats-mutex*)
    (thread-sleep! 5)
    (loop)))

;;======================================================================
;; QUEUE METHOD
;;======================================================================

(define *api:queue-mutex* (make-mutex))   ;; guards *api:queue-id* and *api:in-queue*
(define *api:queue-id* 0)

(define *api:in-queue* '())               ;; newest item first
(define *api:results* (make-hash-table))  ;; id->queue-item
(define *api:results-mutex* (make-mutex)) ;; guards *api:results*

;; One queued request.
;;   proc       - thunk that performs the request
;;   results    - value returned by proc (valid when exn is #f)
;;   exn        - condition raised by proc, #f if proc returned normally
;;   start-time - seconds; the default expression is (current-seconds)
;;   end-time   - seconds, set once proc has been run
;;
(defstruct api:queue-item
  (proc #f)
  (cmd #f)
  (run-id #f)
  (params #f)
  (start-time (current-seconds))
  (end-time #f)
  (id #f)
  (results #f)
  (exn #f))

;; Add an item to the incoming queue.
;;
;; Returns the id assigned to the item so the calling proc can find the
;; result in *api:results*.
;;
(define (api:add-queue-item proc cmd run-id params)
  (mutex-lock! *api:queue-mutex*)
  (set! *api:queue-id* (+ *api:queue-id* 1))
  (set! *api:in-queue*
        (cons (make-api:queue-item
               proc:   proc
               cmd:    cmd
               run-id: run-id
               params: params
               id:     *api:queue-id*
               )
              *api:in-queue*))
  ;; capture the id while still holding the lock; another thread may bump
  ;; *api:queue-id* as soon as we unlock
  (let ((id *api:queue-id*))
    (mutex-unlock! *api:queue-mutex*)
    id))

;; get a queue item from the end of the queue (i.e. the oldest item).
;; return #f if there are no items to be processed.
;;
(define (api:get-queue-item)
  (mutex-lock! *api:queue-mutex*)
  (let* ((res (if (null? *api:in-queue*)
                  #f
                  (let* ((revlist (reverse *api:in-queue*)))
                    (set! *api:in-queue* (reverse (cdr revlist)))
                    (car revlist)))))
    (mutex-unlock! *api:queue-mutex*)
    res))

;; Publish a finished item under id. *api:results* is written by the worker
;; threads and read by the waiting request threads, so access is serialized
;; the same way access to *api:in-queue* is.
;;
(define (api:put-item-in-results id item)
  (mutex-lock! *api:results-mutex*)
  (hash-table-set! *api:results* id item)
  (mutex-unlock! *api:results-mutex*))

;; Remove and return the item stored under id, or #f if there is none yet.
;;
(define (api:retrieve-result-item id)
  (mutex-lock! *api:results-mutex*)
  (let ((res (hash-table-ref/default *api:results* id #f)))
    (if res
        (hash-table-delete! *api:results* id))
    (mutex-unlock! *api:results-mutex*)
    res))

;; Poll *api:results* until the item for id shows up and return that
;; api:queue-item (not the bare result). Polls less frequently over time:
;; every 0.01s for the first 0.5s, 0.1s up to 5s, 0.25s up to 10s, then
;; every 1.25s.
;;
;; NOTE(review): the timeout key (in ms) is accepted but NOT enforced; this
;; procedure waits until a result arrives. The only caller in this module has
;; no way to report a timed-out request, so enforcing it needs a decision on
;; what the handler should send back.
;;
;; Yes, it would be better to do this with mailboxes. My last attempt to use
;; mailboxes resulted in erratic behavior but that was likely due to something
;; unrelated. Just to eliminate uncertainty we'll start with polling and switch
;; to mailboxes later.
;;
(define (api:wait-for-result id #!key (timeout 30000))
  (let loop ((start (current-milliseconds)))
    (thread-sleep! (let ((delta (- (current-milliseconds) start)))
                     (cond
                      ((< delta 500)    0.01)
                      ((< delta 5000)   0.1)
                      ((< delta 10000)  0.25)
                      (else             1.25))))
    (let ((res (api:retrieve-result-item id)))
      (if res
          res
          (loop start)))))

;; Take the oldest item off the in-queue (if any), run its proc and publish
;; the item in *api:results*.
;;
;; An exception raised by proc is captured in the item's exn field instead of
;; escaping: otherwise the item would never be published, the worker thread
;; would die and the thread waiting in api:wait-for-result would poll forever.
;;
(define (api:queue-run-one)
  (let* ((item (api:get-queue-item))) ;; this removes it from the in-queue
    (if item
        (let* ((id   (api:queue-item-id item))
               (proc (api:queue-item-proc item)))
          (handle-exceptions
              exn
              (api:queue-item-exn-set! item exn)
            (api:queue-item-results-set! item (proc)))
          (api:queue-item-end-time-set! item (current-seconds))
          (api:put-item-in-results id item)))))

;; Start the worker threads, named queue-thread-0 .. queue-thread-20 (21 in
;; total), 0.05s apart. Each worker loops forever: run at most one queued
;; item, then sleep 0.1s.
;;
(define (api:queue-processor)
  (let* ((thproc (lambda ()
                   (let loop ()
                     (api:queue-run-one)
                     (thread-sleep! 0.1)
                     (loop)))))
    (let loop ((thnum 0))
      (thread-start! (make-thread thproc (conc "queue-thread-" thnum)))
      (thread-sleep! 0.05)
      (if (< thnum 20)
          (loop (+ thnum 1))))))

;;======================================================================
;; TCP REQUEST HANDLER
;;======================================================================

;; Build the request handler for dbstruct.
;;
;;   dbstruct             - db handle passed through to api:dispatch-request
;;   api:dispatch-request - procedure (dbstruct cmd run-id params) -> result
;;
;; Asserts that *toppath* is set and initializes *server-signature* on first
;; use. Returns a procedure of one argument, indat, which must be a list
;; (cmd run-id params meta); it returns (status errmsg result meta).
;;
;; ping is answered directly with *server-signature*. Every other cmd is
;; placed on the queue and the handler blocks until a worker has run it.
;;
(define (api:tcp-dispatch-request-make-handler-core dbstruct api:dispatch-request)
  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
  (if (not *server-signature*)
      (set! *server-signature* (tt:mk-signature *toppath*)))
  (lambda (indat)
    (let* ((outer-proc (lambda (cmd run-id params)
                         (case cmd
                           ((ping) *server-signature*) ;; but ping in api:dispatch-request is (current-process-id)?
                           (else
                            (let* ((id   (api:add-queue-item
                                          (lambda ()
                                            (api:dispatch-request dbstruct cmd run-id params))
                                          cmd run-id params))
                                   (item (api:wait-for-result id))
                                   (exn  (api:queue-item-exn item)))
                              ;; Hand back the value produced by the dispatch,
                              ;; not the queue item wrapping it. A failure in
                              ;; the worker is re-raised here, in the thread
                              ;; that made the request.
                              (if exn
                                  (abort exn)
                                  (api:queue-item-results item))))))))
      ;; (set! *api-process-request-count* numthreads)
      (set! *db-last-access* (current-seconds))
      (match indat
        ((cmd run-id params meta)
         (let* ((start-t (current-milliseconds))
                ;; factor this out and move before this let, it is just
                ;; an assert if not ping and dbfname is not correct
                (db-ok  (let* ((dbfname (dbmod:run-id->dbfname run-id))
                               (ok      (equal? dbfname (dbr:dbstruct-dbfname dbstruct))))
                          (case cmd
                            ((ping) #t) ;; we are fine
                            (else
                             (assert ok "FATAL: database file and run-id not aligned.")))))
                (ttdat        *server-info*)
                (server-state (tt-state ttdat))
                (status       'ok) ;; anything legit we can do with status?
                (delay-wait   0)
                (result       (if (eq? cmd 'ping)
                                  *server-signature* ;; (current-process-id) ;; process id or server-signature?
                                  (outer-proc cmd run-id params)))
                ;; outgoing meta; shadows the meta received in indat
                (meta         (case cmd
                                ((ping) `((sstate . ,server-state)))
                                (else   `((wait . ,delay-wait)))))
                (errmsg       "")
                (payload      (list status errmsg result meta)))
           ;; (cmd run-id params meta)
           (db:add-stats cmd run-id params (- (current-milliseconds) start-t))
           payload))
        (else
         (assert #f "FATAL: failed to deserialize indat "indat))))))

)