@@ -13,10 +13,11 @@ (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses nmsg-transport)) ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! ;; @@ -32,11 +33,18 @@ ;;====================================================================== ;; S U P P O R T F U N C T I O N S ;;====================================================================== -;; #t means - please start a server! +(define (rmt:call-transport run-id connection-info cmd jparams) + (case (server:get-transport) + ((rpc) ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((fs) ( fs-transport:client-api-send-receive run-id connection-info cmd jparams)) + ((zmq) (zmq-transport:client-api-send-receive run-id connection-info cmd jparams)) + (else ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)))) + ;; (define (rmt:write-frequency-over-limit? cmd run-id) (and (not (member cmd api:read-only-queries)) (let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f)) (record (if tmprec tmprec @@ -63,63 +71,86 @@ ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) -;; cmd is a symbol -;; vars is a json string encoding the parameters for the call -;; -(define (rmt:send-receive cmd rid params #!key (attemptnum 0)) +(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id +(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) - (let ((expire-time (- (current-seconds) 60))) + (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) (let ((connection (hash-table-ref/default *runremote* run-id #f))) - (if (and connection - (< (http-transport:server-dat-get-last-access connection) expire-time)) - (begin - (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") - (hash-table-delete! *runremote* run-id))))) + (if (and connection + (< (http-transport:server-dat-get-last-access connection) expire-time)) + (begin + (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") + ;; SHOULD CLOSE THE CONNECTION HERE + (case *transport-type* + ((nmsg)(nn-close (http-transport:server-dat-get-socket + (hash-table-ref *runremote* run-id))))) + (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id)) - (jparams (db:obj->string params))) + (connection-info (rmt:get-connection-info run-id))) + ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info - (let* ((dat (http-transport:client-api-send-receive run-id connection-info cmd jparams)) - (res (if (and dat (vector? dat)) (vector-ref dat 1) #f)) - (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))) + (let* ((dat (case *transport-type* + ((http)(condition-case + (http-transport:client-api-send-receive run-id connection-info cmd params) + ((commfail)(vector #f "communications fail")))) + ((nmsg)(condition-case + (nmsg-transport:client-api-send-receive run-id connection-info cmd params) + ((timeout)(vector #f "timeout talking to server")))) + (else (exit)))) + (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)) + (res (if (and dat (vector? dat)) (vector-ref dat 1) #f))) (http-transport:server-dat-update-last-access connection-info) (if success - (db:string->obj res) - ;; (if (< attemptnum 100) - ;; (begin - ;; (hash-table-delete! *runremote* run-id) - ;; (thread-sleep! 0.5) - ;; (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1))) - ;; (begin - ;; (print-call-chain (current-error-port)) - ;; (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over") - ;; (exit 1))))) + (begin + ;; (mutex-unlock! *send-receive-mutex*) + (case *transport-type* + ((http) res) ;; (db:string->obj res)) + ((nmsg) res))) ;; (vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) - (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.") + (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.") + ;; (case *transport-type* + ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection + ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. + ;; (if (eq? (modulo attemptnum 5) 0) + ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) + ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications + (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) + ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? ;; start with three calls then kill server - (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - (thread-sleep! 2) + ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) + ;; (thread-sleep! 2) (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) - (if (and (< attemptnum 10) - (tasks:need-server run-id)) + ;; no connection info? try to start a server + (if (and (< attemptnum 15) + (member cmd api:write-queries)) (begin + (hash-table-delete! *runremote* run-id) + ;; (mutex-unlock! *send-receive-mutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - (rmt:send-receive cmd rid params (+ attemptnum 1))) - (rmt:open-qry-close-locally cmd run-id params))))) + ;; (client:setup run-id) ;; client setup happens in rmt:get-connection-info + (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? + (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) + (begin + ;; (debug:print 0 "ERROR: Communication failed!") + ;; (mutex-unlock! *send-receive-mutex*) + ;; (exit) + (rmt:open-qry-close-locally cmd run-id params) + ))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn @@ -184,11 +215,12 @@ (set! *dbstruct-db* db) db))) (db-file-path (db:dbfile-path 0))) ;; (read-only (not (file-read-access? db-file-path))) (let* ((start (current-milliseconds)) - (res (api:execute-requests dbstruct-local (symbol->string cmd) params)) + (resdat (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))) + (res (vector-ref resdat 1)) (duration (- (current-milliseconds) start))) (rmt:update-db-stats run-id cmd params duration) ;; mark this run as dirty if this was a write (if (not (member cmd api:read-only-queries)) (let ((start-time (current-seconds))) @@ -199,17 +231,19 @@ (mutex-unlock! *db-multi-sync-mutex*))) res))) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) - (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) - (dat (http-transport:client-api-send-receive run-id connection-info cmd jparams))) - (if (and dat (vector-ref dat 0)) - (db:string->obj (vector-ref dat 1)) - (begin - (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat) - dat)))) + ;; (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) + (res (http-transport:client-api-send-receive run-id connection-info cmd params))) + (if (and res (vector-ref res 0)) + res + #f))) +;; (db:string->obj (vector-ref dat 1)) +;; (begin +;; (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat) +;; dat)))) ;; Wrap json library for strings (why the ports crap in the first place?) (define (rmt:dat->json-str dat) (with-output-to-string (lambda () @@ -242,14 +276,17 @@ (define (rmt:login run-id) (rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) ;; This login does no retries under the hood - it acts a bit like a ping. +;; Deprecated for nmsg-transport. ;; (define (rmt:login-no-auto-client-setup connection-info run-id) - (rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) - + (case *transport-type* + ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) + ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))))) + ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible ;; (define (rmt:general-call stmtname run-id . params) (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params))) @@ -325,33 +362,31 @@ run-ids (rmt:get-all-run-ids))) (result '())) (if (null? run-id-list) '() - (for-each - (lambda (th) - - (thread-join! th)) ;; I assume that joining completed threads just moves on - (let loop ((hed (car run-id-list)) - (tal (cdr run-id-list)) - (threads '())) - (let* ((newthread (make-thread - (lambda () - (let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in)))) - (if (list? res) - (begin - (mutex-lock! multi-run-mutex) - (set! result (append result res)) - (mutex-unlock! multi-run-mutex)) - (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in)))) - (conc "multi-run-thread for run-id " hed))) - (newthreads (cons newthread threads))) - (thread-start! newthread) - (thread-sleep! 0.5) ;; give that thread some time to start - (if (null? tal) - newthreads - (loop (car tal)(cdr tal) newthreads)))))) + (let loop ((hed (car run-id-list)) + (tal (cdr run-id-list)) + (threads '())) + (if (> (length threads) 5) + (loop hed tal (filter (lambda (th)(not (member (thread-state th) '(terminated dead)))) threads)) + (let* ((newthread (make-thread + (lambda () + (let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in)))) + (if (list? res) + (begin + (mutex-lock! multi-run-mutex) + (set! result (append result res)) + (mutex-unlock! multi-run-mutex)) + (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in)))) + (conc "multi-run-thread for run-id " hed))) + (newthreads (cons newthread threads))) + (thread-start! newthread) + (thread-sleep! 0.5) ;; give that thread some time to start + (if (null? tal) + newthreads + (loop (car tal)(cdr tal) newthreads)))))) result)) ;; ;; IDEA: Threadify these - they spend a lot of time waiting ... ;; ;; ;; (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in) @@ -482,11 +517,12 @@ (define (rmt:get-runs-by-patt keys runnamepatt targpatt offset limit) (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit))) (define (rmt:find-and-mark-incomplete run-id ovr-deadtime) - (rmt:send-receive 'find-and-mark-incomplete run-id (list run-id ovr-deadtime))) + (if (rmt:send-receive 'have-incompletes? run-id (list run-id ovr-deadtime)) + (rmt:send-receive 'mark-incomplete run-id (list run-id ovr-deadtime)))) ;;====================================================================== ;; M U L T I R U N Q U E R I E S ;;====================================================================== @@ -573,5 +609,18 @@ (define (rmt:test-data-rollup run-id test-id status) (rmt:send-receive 'test-data-rollup run-id (list run-id test-id status))) (define (rmt:csv->test-data run-id test-id csvdata) (rmt:send-receive 'csv->test-data run-id (list run-id test-id csvdata))) + +;;====================================================================== +;; T A S K S +;;====================================================================== + +(define (rmt:tasks-find-task-queue-records target run-name test-patt state-patt action-patt) + (rmt:send-receive 'find-task-queue-records #f (list target run-name test-patt state-patt action-patt))) + +(define (rmt:tasks-add action owner target runname testpatt params) + (rmt:send-receive 'tasks-add #f (list action owner target runname testpatt params))) + +(define (rmt:tasks-set-state-given-param-key param-key new-state) + (rmt:send-receive 'tasks-set-state-given-param-key #f (list param-key new-state)))