Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -184,19 +184,118 @@
           (rmt:print-db-stats)
           (set! *api:last-stats-print* (current-seconds))))
     (mutex-unlock! *api-print-db-stats-mutex*)
     (thread-sleep! 5)
     (loop)))
+
+;; State for the queue-based request dispatcher sketched below; the queue
+;; procedures in this change are still stubs.
+(define *api:queue-mutex* (make-mutex))
+(define *api:in-queue* '())
+(define *api:out-queue* '())
+
+;; Stub: the planned loop is outlined in the comments below; returns #t for now.
+(define (api:start-queue-processor)
+  ;; look for work in in-queue
+  ;; process it
+  ;; put result in out-queue
+  ;; sleep 20ms
+
+  #t)
+
+;; Record for one queued request.
+(defstruct api:queue-item
+  (proc #f)
+  (cmd #f)
+  (params #f)
+  (start-time (current-seconds))
+  (end-time #f)
+  (id #f))
+
+;; Stub: enqueueing is not implemented yet; always returns #f.
+(define (api:add-queue-item proc cmd params)
+  #f)
 
 
 
 (define (api:tcp-dispatch-request-make-handler-new dbstruct) ;; cmd run-id params)
   ;; put proc into in-queue
   ;; poll out-queue for result evey 25ms
   ;; time out with big message
-  #f
-  )
+  ;; The queue-based flow sketched above is not implemented yet. For now this
+  ;; returns a handler that takes indat = (cmd run-id params meta), dispatches
+  ;; it directly and returns the payload (status errmsg result meta).
+  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
+  (if (not *server-signature*)
+      (set! *server-signature* (tt:mk-signature *toppath*)))
+  (lambda (indat)
+    (let* ((result
+            (let* ((numthreads (api:get-count-threads-alive))
+                   (delay-wait (if (> numthreads 10)
+                                   (- numthreads 10)
+                                   0))
+                   (normal-proc (lambda (cmd run-id params)
+                                  (case cmd
+                                    ((ping) *server-signature*)
+                                    (else
+                                     (api:dispatch-request dbstruct cmd run-id params))))))
+              (set! *api-process-request-count* numthreads)
+              (set! *db-last-access* (current-seconds))
+;; (if (not (eq? numthreads numthreads))
+;; (begin
+;; (api:remove-dead-or-terminated)
+;; (let ((threads-now (api:get-count-threads-alive)))
+;; (debug:print 0 *default-log-port* "WARNING: numthreads="numthreads", numthreads="numthreads", remaining="threads-now)
+;; (set! numthreads threads-now))))
+              (match indat
+                ((cmd run-id params meta)
+                 (let* ((start-t (current-milliseconds))
+                        ;; db-ok is never read; it is bound so the alignment assert runs before the request is dispatched.
+                        (db-ok (let* ((dbfname (dbmod:run-id->dbfname run-id))
+                                      (ok (equal? dbfname (dbr:dbstruct-dbfname dbstruct))))
+                                 (case cmd
+                                   ((ping) #t) ;; we are fine
+                                   (else
+                                    (assert ok "FATAL: database file and run-id not aligned.")))))
+                        (ttdat *server-info*)
+                        (server-state (tt-state ttdat))
+                        (maxthreads 20) ;; make this a parameter?
+                        (status (cond
+                                 ;; NOTE(review): assuming (random 100) yields 0..99, the test below holds for 71..99,
+                                 ;; i.e. ~29% of over-limit requests get 'busy and ~71% pass - confirm against the remark below.
+                                 ((and (> numthreads maxthreads)
+                                       (> (random 100) 70)) ;; allow a 30% probability to go through so we can figure out what is going wrong in main.db server.
+                                  'busy)
+                                 ;; ((> numthreads 5) 'loaded) ;; this gets transmitted to the client which calls tt:backoff-incr to slow stuff down.
+                                 (else 'ok)))
+                        (errmsg (case status
+                                  ((busy) (conc "Server overloaded, "numthreads" threads in flight"))
+                                  ((loaded) (conc "Server loaded, "numthreads" threads in flight"))
+                                  (else #f)))
+                        (result (case status
+                                  ((busy)
+                                   (if (eq? cmd 'ping)
+                                       (normal-proc cmd run-id params)
+                                       ;; numthreads must be greater than maxthreads for busy
+                                       (* 0.1 (- numthreads maxthreads)) ;; was 15 - return a number for the remote to delay
+                                       )) ;; (- numthreads 29)) ;; call back in as many seconds
+                                  ((loaded)
+                                   ;; (if (eq? (rmt:transport-mode) 'tcp)
+                                   ;;     (thread-sleep! 0.5))
+                                   (normal-proc cmd run-id params))
+                                  (else
+                                   (normal-proc cmd run-id params))))
+                        (meta (case cmd
+                                ((ping) `((sstate . ,server-state)))
+                                (else `((wait . ,delay-wait)))))
+                        (payload (list status errmsg result meta)))
+                   ;; (cmd run-id params meta)
+                   (db:add-stats cmd run-id params (- (current-milliseconds) start-t))
+                   payload))
+                (else
+                 (assert #f "FATAL: failed to deserialize indat "indat))))))
+      result)))
 
 (define api:tcp-dispatch-request-make-handler api:tcp-dispatch-request-make-handler-old)
 
 ;; indat is (cmd run-id params meta)
 ;;