@@ -35,11 +35,12 @@ (use srfi-69 srfi-18 posix matchable - s11n) + s11n + typed-records) ;; allow these queries through without starting a server ;; (define api:read-only-queries '(get-key-val-pairs @@ -152,78 +153,10 @@ tasks-set-state-given-param-key )) (define *db-write-mutexes* (make-hash-table)) (define *server-signature* #f) -;; ;; These are called by the server on recipt of /api calls -;; ;; - keep it simple, only return the actual result of the call, i.e. no meta info here -;; ;; -;; ;; - returns #( flag result ) -;; ;; -;; (define (api:execute-requests dbstruct dat) -;; (if (> *api-process-request-count* 50) -;; (begin -;; (if (common:low-noise-print 30 "too many threads") -;; (debug:print 0 *default-log-port* "WARNING: "*api-process-request-count*" threads, potential overload, adding 0.5 sec delay.")) -;; ;; (thread-sleep! 0.5) ;; take a nap - no, the napping is moved to the clients via tt:backoff-incr -;; )) -;; (cond -;; ((not (vector? dat)) ;; it is an error to not receive a vector -;; (vector #f (vector #f "remote must be called with a vector"))) -;; (else -;; (let* ((cmd-in (vector-ref dat 0)) -;; (cmd (if (symbol? cmd-in) -;; cmd-in -;; (string->symbol cmd-in))) -;; (params (vector-ref dat 1)) -;; (run-id (if (null? params) -;; 0 -;; (car params))) -;; (write-mutex (if (hash-table-exists? *db-write-mutexes* run-id) -;; (hash-table-ref *db-write-mutexes* run-id) -;; (let* ((newmutex (make-mutex))) -;; (hash-table-set! *db-write-mutexes* run-id newmutex) -;; newmutex))) -;; (start-t (current-milliseconds)) -;; (readonly-mode (dbr:dbstruct-read-only dbstruct)) -;; (readonly-command (member cmd api:read-only-queries)) -;; (writecmd-in-readonly-mode (and readonly-mode (not readonly-command)))) -;; (if (not readonly-command) -;; (mutex-lock! write-mutex)) -;; (let* ((tmppath (dbr:dbstruct-tmppath dbstruct)) -;; (clean-run-id (cond -;; ((number? run-id) run-id) -;; ((equal? 
run-id #f) "main") -;; (else "other"))) -;; (crumbfile (dbfile:wait-for-qif tmppath clean-run-id (cons cmd params))) -;; (res -;; (if writecmd-in-readonly-mode -;; (conc "attempt to run write command "cmd" on a read-only database") -;; (api:dispatch-request dbstruct cmd run-id params)))) -;; (delete-file* crumbfile) -;; (if (not readonly-command) -;; (mutex-unlock! write-mutex)) -;; -;; ;; save all stats -;; (let ((delta-t (- (current-milliseconds) -;; start-t)) -;; (modified-cmd (if (eq? cmd 'general-call) -;; (string->symbol (conc "general-call-" (car params))) -;; cmd))) -;; (hash-table-set! *db-api-call-time* modified-cmd -;; (cons delta-t (hash-table-ref/default *db-api-call-time* modified-cmd '())))) -;; (if writecmd-in-readonly-mode -;; (begin -;; #;(common:telemetry-log (conc "api-out:"(->string cmd)) -;; payload: `((params . ,params) -;; (ok-res . #t))) -;; (vector #f res)) -;; (begin -;; #;(common:telemetry-log (conc "api-out:"(->string cmd)) -;; payload: `((params . ,params) -;; (ok-res . #f))) -;; (vector #t res)))))))) (define *api-threads* '()) (define (api:register-thread th-in) (set! *api-threads* (cons (cons th-in (current-seconds)) *api-threads*))) @@ -237,11 +170,25 @@ (not (member (thread-state (car thdat)) '(terminated dead)))) *api-threads*))) (define (api:get-count-threads-alive) (length *api-threads*)) - + +(define *api:last-stats-print* 0) +(define *api-print-db-stats-mutex* (make-mutex)) +(define (api:print-db-stats) + (debug:print-info 0 *default-log-port* "Started periodic db stats printer") + (let loop () + (mutex-lock! *api-print-db-stats-mutex*) + (if (> (- (current-seconds) *api:last-stats-print*) 15) + (begin + (rmt:print-db-stats) + (set! *api:last-stats-print* (current-seconds)))) + (mutex-unlock! *api-print-db-stats-mutex*) + (thread-sleep! 
5) + (loop))) + ;; indat is (cmd run-id params meta) ;; ;; WARNING: Do not print anything in the lambda of this function as it ;; reads/writes to current in/out port @@ -250,86 +197,96 @@ (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.") (if (not *server-signature*) (set! *server-signature* (tt:mk-signature *toppath*))) (lambda (indat) (api:register-thread (current-thread)) - (let* (;; (indat (deserialize)) - (newcount (+ *api-process-request-count* 1)) - (numthreads (api:get-count-threads-alive)) - (delay-wait (if (> newcount 10) - (- newcount 10) - 0)) - (normal-proc (lambda (cmd run-id params) - (case cmd - ((ping) *server-signature*) - (else - (api:dispatch-request dbstruct cmd run-id params)))))) - (set! *api-process-request-count* newcount) - (set! *db-last-access* (current-seconds)) - (if (not (eq? newcount numthreads)) - (begin - (api:remove-dead-or-terminated) - (let ((threads-now (api:get-count-threads-alive))) - (debug:print 0 *default-log-port* "WARNING: newcount="newcount", numthreads="numthreads", remaining="threads-now) - (set! newcount threads-now)))) - (match indat - ((cmd run-id params meta) - (let* ((db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) - (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct)))) - (case cmd - ((ping) #t) ;; we are fine - (else - (if (not ok)(debug:print 0 *default-log-port* "ERROR: "cmd", run-id "run-id", not correct for dbfname "(dbr:dbstruct-dbfname dbstruct))) - (assert ok "FATAL: database file and run-id not aligned."))))) - (ttdat *server-info*) - (server-state (tt-state ttdat)) - (status (cond - ((> newcount 3) 'busy) - ;; ((> newcount 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down. - (else 'ok))) - (errmsg (case status - ((busy) (conc "Server overloaded, "newcount" threads in flight")) - ((loaded) (conc "Server loaded, "newcount" threads in flight")) - (else #f))) - (result (case status - ((busy) - (if (eq? 
cmd 'ping) - (normal-proc cmd run-id params) - ;; newcount must be greater than 5 for busy - (* 1 (- newcount 3)) ;; was 15 - )) ;; (- newcount 29)) ;; call back in as many seconds - ((loaded) -;; (if (eq? (rmt:transport-mode) 'tcp) -;; (thread-sleep! 0.5)) - (normal-proc cmd run-id params)) - (else - (normal-proc cmd run-id params)))) - (meta (case cmd - ((ping) `((sstate . ,server-state))) - (else `((wait . ,delay-wait))))) - (payload (list status errmsg result meta))) - (set! *api-process-request-count* (- *api-process-request-count* 1)) - ;; (serialize payload) - (api:unregister-thread (current-thread)) - payload)) - (else - (assert #f "FATAL: failed to deserialize indat "indat)))))) + (let* ((result + (let* ((numthreads (api:get-count-threads-alive)) + (delay-wait (if (> numthreads 10) + (- numthreads 10) + 0)) + (normal-proc (lambda (cmd run-id params) + (case cmd + ((ping) *server-signature*) + (else + (api:dispatch-request dbstruct cmd run-id params)))))) + (set! *api-process-request-count* numthreads) + (set! *db-last-access* (current-seconds)) +;; (if (not (eq? numthreads numthreads)) +;; (begin +;; (api:remove-dead-or-terminated) +;; (let ((threads-now (api:get-count-threads-alive))) +;; (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now) +;; (set! numthreads threads-now)))) + (match indat + ((cmd run-id params meta) + (let* ((start-t (current-milliseconds)) + (db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id)) + (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct)))) + (case cmd + ((ping) #t) ;; we are fine + (else + (assert ok "FATAL: database file and run-id not aligned."))))) + (ttdat *server-info*) + (server-state (tt-state ttdat)) + (maxthreads 10) ;; make this a parameter? + (status (cond + ((and (> numthreads maxthreads) + (> (random 100) 70)) ;; allow a 30% probability to go through so we can figure out what is going wrong in main.db server. NOTE(review): as coded, an overloaded server answers busy only when (random 100) exceeds 70, i.e. for roughly 29% of requests, and lets the remaining ~71% through; that is the reverse of the 30% pass-through described here. Confirm which ratio is intended.
+ 'busy) + ;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down. + (else 'ok))) + (errmsg (case status + ((busy) (conc "Server overloaded, "numthreads" threads in flight")) + ((loaded) (conc "Server loaded, "numthreads" threads in flight")) + (else #f))) + (result (case status + ((busy) + (if (eq? cmd 'ping) + (normal-proc cmd run-id params) + ;; numthreads must be greater than 5 for busy + (* 1 (- numthreads maxthreads)) ;; was 15 + )) ;; (- numthreads 29)) ;; call back in as many seconds + ((loaded) + ;; (if (eq? (rmt:transport-mode) 'tcp) + ;; (thread-sleep! 0.5)) + (normal-proc cmd run-id params)) + (else + (normal-proc cmd run-id params)))) + (meta (case cmd + ((ping) `((sstate . ,server-state))) + (else `((wait . ,delay-wait))))) + (payload (list status errmsg result meta))) + ;; (cmd run-id params meta) + (db:add-stats cmd run-id params (- (current-milliseconds) start-t)) + payload)) + (else + (assert #f "FATAL: failed to deserialize indat "indat)))))) + ;; (set! *api-process-request-count* (- *api-process-request-count* 1)) + ;; (serialize payload) + + (api:unregister-thread (current-thread)) + result))) + + (define *api-halt-writes* #f) (define (api:dispatch-request dbstruct cmd run-id params) (if (not *no-sync-db*) (db:open-no-sync-db)) - (if (member cmd api:write-queries) - (let loop ((start-time (current-milliseconds))) - (if *api-halt-writes* - (begin - (thread-sleep! 0.2) - (if (< (- (current-milliseconds) start-time) - 5000) ;; hope it don't take more than five seconds to sync - (loop start-time) - (debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long")))))) + (let* ((start-time (current-milliseconds))) + (if (member cmd api:write-queries) + (let loop () + (if *api-halt-writes* + (begin + (thread-sleep! 
0.2) + (if (< (- (current-milliseconds) start-time) + 5000) ;; hope it don't take more than five seconds to sync + (loop) ;; re-test *api-halt-writes*; the named let takes no arguments, start-time comes from the enclosing let* + #;(debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long")))))) + (db:add-stats 'api-write-blocking-for-sync run-id params (- (current-milliseconds) start-time))) (case cmd ;;=============================================== ;; READ/WRITE QUERIES ;;=============================================== @@ -523,43 +480,5 @@ ((find-task-queue-records) (apply tasks:find-task-queue-records dbstruct params)) (else (debug:print 0 *default-log-port* "ERROR: bad api call " cmd) (conc "ERROR: BAD api call " cmd)))) -;; http-server send-response -;; api:process-request -;; db:* -;; -;; NB// Runs on the server as part of the server loop -;; -(define (api:process-request dbstruct $) ;; the $ is the request vars proc - (debug:print 4 *default-log-port* "server-id:" *server-id*) - (let* ((cmd ($ 'cmd)) - (paramsj ($ 'params)) - (key ($ 'key)) - (params (db:string->obj paramsj transport: 'http))) ;; incoming data from the POST (or is it a GET?) - (debug:print 4 *default-log-port* "cmd:" cmd " with params " params "key " key) - (if (equal? key *server-id*) - (begin - (set! *api-process-request-count* (+ *api-process-request-count* 1)) - (let* ((resdat (api:execute-requests dbstruct (vector cmd params))) ;; process the request, resdat = #( flag result ) - (success (vector-ref resdat 0)) - (res (vector-ref resdat 1))) ;; (vector flag payload), get the payload, ignore the flag (why?) - (debug:print 4 *default-log-port* "res:" res) - (if (not success) - (debug:print 0 *default-log-port* "ERROR: success flag is #f for " cmd " with params " params)) - (if (> *api-process-request-count* *max-api-process-requests*) - (set! *max-api-process-requests* *api-process-request-count*)) - (set! 
*api-process-request-count* (- *api-process-request-count* 1)) - ;; This can be here but needs controls to ensure it doesn't run more than every 4 seconds - ;; (rmt:dat->json-str - ;; (if (or (string? res) - ;; (list? res) - ;; (number? res) - ;; (boolean? res)) - ;; res - ;; (list "ERROR, not string, list, number or boolean" 1 cmd params res))))) - (db:obj->string res transport: 'http))) - (begin - (debug:print 0 *default-log-port* "Server refused to process request. Server id mismatch. recived " key " expected: " *server-id* ".\nOther arguments recived: cmd=" cmd " params = " params) - (db:obj->string (conc "Server refused to process request server-id mismatch: " key ", " *server-id*) transport: 'http))))) -