@@ -64,13 +64,14 @@
 ;;
 ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
      (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
          (client:setup run-id)
          #f))))
+(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id. NOTE(review): every lock/unlock of it in rmt:send-receive below is commented out, so it is currently unused
 (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
   ;; clean out old connections
-  (mutex-lock! *db-multi-sync-mutex*)
+  ;; (mutex-lock! *db-multi-sync-mutex*) ;; disabled together with its unlock; the cleanup code it guarded is itself commented out
 ;;   (let ((expire-time (- (current-seconds) 60)))
 ;;     (for-each
 ;;      (lambda (run-id)
 ;;        (let ((connection (hash-table-ref/default *runremote* run-id #f)))
 ;;          (if (and connection
@@ -78,11 +79,12 @@
 ;;                   (begin
 ;;                     (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
 ;;                     ;; SHOULD CLOSE THE CONNECTION HERE
 ;;                     (hash-table-delete! *runremote* run-id)))))
 ;;      (hash-table-keys *runremote*)))
-  (mutex-unlock! *db-multi-sync-mutex*)
+  ;; (mutex-unlock! *db-multi-sync-mutex*)
+  ;; (mutex-lock! *send-receive-mutex*) ;; NOTE(review): if re-enabled, unlock on every exit path (success, retry, (exit)); SRFI-18 mutexes are not recursive, so the recursive retry would otherwise block forever
   (let* ((run-id (if rid rid 0))
          (connection-info (rmt:get-connection-info run-id)))
     ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
     (if connection-info
         ;; use the server if have connection info
@@ -94,20 +96,23 @@
                   (else (exit))))
              (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
              (res (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
         (http-transport:server-dat-update-last-access connection-info)
         (if success
-            (case *transport-type*
-              ((http) res) ;; (db:string->obj res))
-              ((nmsg) res)) ;; (vector-ref res 1)))
+            (begin ;; exists only to hold the commented-out unlock; its value is that of the case below
+              ;; (mutex-unlock! *send-receive-mutex*)
+              (case *transport-type* ;; NOTE(review): no else clause, so any other transport type yields an unspecified value
+                ((http) res) ;; (db:string->obj res))
+                ((nmsg) res))) ;; (vector-ref res 1)))
             (begin ;; let ((new-connection-info (client:setup run-id)))
               (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
               ;; (case *transport-type*
               ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
               (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
               (if (eq? (modulo attemptnum 5) 0)
                   (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
+              ;; (mutex-unlock! *send-receive-mutex*) ;; release the mutex here so other threads can communicate while this one waits for a server
               (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
               ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
 
               ;; no longer killing the server in http-transport:client-api-send-receive
               ;; may kill it here but what are the criteria?
@@ -118,16 +123,18 @@
         ;; no connection info? try to start a server
         (if (and (< attemptnum 15)
                  (tasks:need-server run-id))
             (begin
               (hash-table-delete! *runremote* run-id)
+              ;; (mutex-unlock! *send-receive-mutex*)
               (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
               (client:setup run-id)
               (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
               (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
             (begin
               (debug:print 0 "ERROR: Communication failed!")
+              ;; (mutex-unlock! *send-receive-mutex*)
               (exit)
               ;; (rmt:open-qry-close-locally cmd run-id params))))
               ))))
 
 (define (rmt:update-db-stats run-id rawcmd params duration)
@@ -335,16 +342,40 @@
                '())))
 
 ;; IDEA: Threadify these - they spend a lot of time waiting ...
 ;;
 (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in)
-  (let ((run-id-list (if run-ids
+  (let* ((run-id-list (if run-ids
                           run-ids
-                         (rmt:get-all-run-ids))))
-    (apply append (map (lambda (run-id)
-                         (rmt:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in)))
-                       run-id-list))))
+                          (rmt:get-all-run-ids)))
+         ;; One result slot per run-id. The slots are concatenated in
+         ;; run-id-list order at the end, so the output order matches the
+         ;; old serial version no matter which thread finishes first.
+         ;; Each thread writes only its own slot, so no mutex is needed.
+         ;; A slot stays '() when its query does not return a list.
+         (results (make-vector (length run-id-list) '()))
+         ;; Start one query thread per run-id (this list is newest-first).
+         (threads
+          (let loop ((ids run-id-list) (idx 0) (started '()))
+            (if (null? ids)
+                started
+                (let* ((run-id (car ids))
+                       (th (make-thread
+                            (lambda ()
+                              (let ((res (rmt:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in))))
+                                (if (list? res)
+                                    (vector-set! results idx res)
+                                    (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " run-id ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in))))
+                            (conc "multi-run-thread for run-id " run-id))))
+                  (thread-start! th)
+                  (thread-sleep! 0.5) ;; give that thread some time to start
+                  (loop (cdr ids) (+ idx 1) (cons th started)))))))
+    ;; Wait for every query thread; joining one that already finished
+    ;; returns immediately. NOTE(review): per SRFI-18, thread-join! raises
+    ;; if a thread ended with an uncaught exception, skipping remaining joins.
+    (for-each thread-join! threads)
+    (apply append (vector->list results))))
 
 (define (rmt:delete-test-records run-id test-id)
   (rmt:send-receive 'delete-test-records run-id (list run-id test-id)))
 
 ;; This is not needed as test steps are deleted on test delete call