Add inc-var/dec-var API calls and runs:parallel-runners-mgmt, a throttle for
several runners sharing one run database through the "num-runners" metadat var.
Move the MT_STEP_NAME setenv in ezsteps into the step loop where stepname is bound.

Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -212,10 +212,12 @@
                  ((delete-run) (apply db:delete-run dbstruct params))
                  ((lock/unlock-run) (apply db:lock/unlock-run dbstruct params))
                  ((update-run-event_time) (apply db:update-run-event_time dbstruct params))
                  ((update-run-stats) (apply db:update-run-stats dbstruct params))
                  ((set-var) (apply db:set-var dbstruct params))
+                 ((inc-var) (apply db:inc-var dbstruct params))
+                 ((dec-var) (apply db:dec-var dbstruct params))
                  ((del-var) (apply db:del-var dbstruct params))
 
                  ;; STEPS
                  ((teststep-set-status!) (apply db:teststep-set-status! dbstruct params))
                  ((delete-steps-for-test!) (apply db:delete-steps-for-test! dbstruct params))
Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -1954,10 +1954,22 @@
         (if (string? res)
             (let ((valnum (string->number res)))
               (if valnum (set! res valnum))))
         res))))
 
+;; Add 1 to metadat var in one UPDATE; does nothing if var has never been set.
+(define (db:inc-var dbstruct var)
+  (db:with-db dbstruct #f #t
+    (lambda (db)
+      (sqlite3:execute db "UPDATE metadat SET val=val+1 WHERE var=?;" var))))
+
+;; Subtract 1 from metadat var in one UPDATE; does nothing if var has never been set.
+(define (db:dec-var dbstruct var)
+  (db:with-db dbstruct #f #t
+    (lambda (db)
+      (sqlite3:execute db "UPDATE metadat SET val=val-1 WHERE var=?;" var))))
+
 ;; This was part of db:get-var. It was used to estimate the load on
 ;; the database files.
 ;;
 ;; scale by 10, average with current value.
 ;;     (set! *global-delta* (/ (+ *global-delta* (* (- (current-milliseconds) start-ms)
Index: ezsteps.scm
==================================================================
--- ezsteps.scm
+++ ezsteps.scm
@@ -35,12 +35,10 @@
 (include "run_records.scm")
 
 
 ;;(rmt:get-test-info-by-id run-id test-id) -> testdat
 
-  (setenv "MT_STEP_NAME" stepname)
-
 (define (ezsteps:run-from testdat start-step-name run-one)
   ;;# TODO - recapture item variables, debug repeated step eval; regen logpro from test
   (let* ((do-update-test-state-status #f)
          (test-run-dir ;; (filedb:get-path *fdb*
           (db:test-get-rundir testdat)) ;; )
@@ -94,10 +92,11 @@
                      (equal? stepname start-step-name)
                      (and saw-start-step-name (not run-one))
                      saw-start-step-name-next
                      (and start-step-name (equal? stepname start-step-name))))
                )
+          (setenv "MT_STEP_NAME" stepname)
           (set! do-update-test-state-status (and proceed-with-this-step (null? tal)))
           ;;(BB> "stepname="stepname" proceed-with-this-step="proceed-with-this-step " do-update-test-state-status="do-update-test-state-status " orig-test-state="orig-test-state" orig-test-status="orig-test-status)
           (cond
            ((and (not proceed-with-this-step) (null? tal))
             'done)
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -788,11 +788,10 @@
   (rmt:send-receive 'set-run-status #f (list run-id run-status msg)))
 
 (define (rmt:set-run-state-status run-id state status )
   (rmt:send-receive 'set-run-state-status #f (list run-id state status)))
 
-
 (define (rmt:update-run-event_time run-id)
   (rmt:send-receive 'update-run-event_time #f (list run-id)))
 
 (define (rmt:get-runs-by-patt keys runnamepatt targpatt offset limit fields last-runs-update #!key (sort-order "asc")) ;; fields of #f uses default
   (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit fields last-runs-update sort-order)))
@@ -811,10 +810,16 @@
   (rmt:send-receive 'del-var #f (list varname)))
 
 (define (rmt:set-var varname value)
   (rmt:send-receive 'set-var #f (list varname value)))
 
+(define (rmt:inc-var varname)
+  (rmt:send-receive 'inc-var #f (list varname)))
+
+(define (rmt:dec-var varname)
+  (rmt:send-receive 'dec-var #f (list varname)))
+
 ;;======================================================================
 ;;  M U L T I R U N   Q U E R I E S
 ;;======================================================================
 
 ;; Need to move this to multi-run section and make associated changes
Index: runs.scm
==================================================================
--- runs.scm
+++ runs.scm
@@ -46,16 +46,151 @@
                             reglen regfull
                             runname max-concurrent-jobs run-id
                             test-patts required-tests test-registry
                             registry-mutex flags keyvals run-info all-tests-registry
                             can-run-more-tests
-                            ((can-run-more-tests-count 0) : fixnum))
+                            ((can-run-more-tests-count 0) : fixnum)
+                            (last-runners-check 0)    ;; time when we last checked number of runners
+                            (last-runners-count #f)   ;; num-runners recorded when this runner registered, #f until then
+                            (runner-registered #f)    ;; have I registered myself?
+                            (run-skip-count 0)        ;; how many times have I skipped running sequentially
+                            )
 
 (defstruct runs:testdat
   hed tal reg reruns test-record
   test-name item-path jobgroup
   waitons testmode newtal itemmaps prereqs-not-met)
+
+(define (runs:print-parallel-runners-state state num-registered last-registered skip-count)
+  (debug:print-info 0 *default-log-port* "runs:parallel-runners-mgmt, state=" state
+                    ", num-registered=" num-registered ", last-registered=" last-registered
+                    ", skip-count=" skip-count))
+
+(define (runs:print-parallel-runners-state2 state num-registered last-runners-count skip-count)
+  (debug:print-info 0 *default-log-port* "runs:parallel-runners-mgmt, state=" state
+                    ", num-registered=" num-registered ", last-runners-count=" last-runners-count
+                    ", skip-count=" skip-count))
+
+
+;; Decrement the shared "num-runners" count, but only when this runner is
+;; currently counted in it, so that every decrement matches an earlier increment.
+(define (runs:parallel-runners-unregister rdat)
+  (if (runs:dat-runner-registered rdat)
+      (begin
+        (rmt:dec-var "num-runners")
+        (runs:dat-runner-registered-set! rdat #f))))
+
+;; Increment the shared "num-runners" count unless this runner is already counted.
+;; NOTE: rmt:inc-var only updates an existing "num-runners" row (db:inc-var is an UPDATE).
+(define (runs:parallel-runners-register rdat)
+  (if (not (runs:dat-runner-registered rdat))
+      (begin
+        (rmt:inc-var "num-runners")
+        (runs:dat-runner-registered-set! rdat #t))))
+
+;; Throttle several runners that share this run database. At most once every
+;; time-to-check seconds, compare the shared "num-runners" count with the count
+;; recorded when this runner registered; if it has grown, drop out of the count
+;; and sleep time-to-wait seconds. A runner that backed off re-registers on a
+;; later check once the count is no longer above the recorded value.
+;; NOTE(review): nothing in this change unregisters a runner when it finishes,
+;; so "num-runners" can stay inflated after a runner exits - confirm/handle.
+(define (runs:parallel-runners-mgmt rdat)
+  (let ((time-to-check 2.8) ;; 28
+        (time-to-wait  3.0))
+    (if (> (- (current-seconds) (runs:dat-last-runners-check rdat)) time-to-check) ;; time to check
+        (let* ((num-registered     (or (rmt:get-var "num-runners") 0))
+               (last-runners-count (runs:dat-last-runners-count rdat))
+               (skip-count         (runs:dat-run-skip-count rdat)))
+          ;; record the check time, otherwise time-to-check never throttles anything
+          (runs:dat-last-runners-check-set! rdat (current-seconds))
+          (cond
+           ;; first time in, register without clobbering other runners' registrations
+           ((not last-runners-count)
+            (runs:print-parallel-runners-state2 "A" num-registered last-runners-count skip-count)
+            (if (> num-registered 0)
+                (rmt:inc-var "num-runners")
+                (rmt:set-var "num-runners" 1)) ;; potential bug - not ACID
+            (runs:dat-runner-registered-set! rdat #t)
+            ;; the shared count now includes this runner, don't treat ourselves as "too many"
+            (runs:dat-last-runners-count-set! rdat (+ num-registered 1))
+            (runs:dat-run-skip-count-set! rdat 0))
+           ;; too many waits, decrement num-runners (if still counted) and continue on
+           ((> skip-count 3)
+            (runs:print-parallel-runners-state2 "B" num-registered last-runners-count skip-count)
+            (runs:parallel-runners-unregister rdat)
+            (runs:dat-run-skip-count-set! rdat 0))
+           ;; too many running, take a break
+           ((> num-registered last-runners-count)
+            (runs:print-parallel-runners-state2 "C" num-registered last-runners-count skip-count)
+            (runs:parallel-runners-unregister rdat)
+            (debug:print-info 0 *default-log-port*
+                              "Too many running (" num-registered
+                              "), last-count=" last-runners-count " waiting... ")
+            (thread-sleep! time-to-wait)
+            (runs:dat-run-skip-count-set! rdat (+ skip-count 1)))
+           ;; we have been waiting: reset the skip count, re-register (E) on a later check
+           ((> skip-count 0)
+            (runs:print-parallel-runners-state2 "D" num-registered last-runners-count skip-count)
+            (runs:dat-run-skip-count-set! rdat 0)
+            ;; (runs:dat-last-runners-count-set! rdat num-registered)
+            )
+           ;; skip count is zero, not too many running, (re)register if we backed off earlier
+           (else
+            (runs:print-parallel-runners-state2 "E" num-registered last-runners-count skip-count)
+            (runs:parallel-runners-register rdat)))))))
+
+
+;; (define (runs:parallel-runners-mgmt rdat)
+;;   (let ((time-to-check 2.8) ;; 28
+;;         (time-to-wait  3.0))
+;;     (if (> (- (current-seconds) (runs:dat-last-runners-check rdat)) time-to-check) ;; time to check
+;;         (let* ((num-registered (or (rmt:get-var "num-runners") 0))
+;;                (last-registered (or (rmt:get-var "runner-change-time") 0))
+;;                (skip-count (runs:dat-run-skip-count rdat)))
+;;           (cond
+;;            ;; consider this the beginning of time
+;;            ((eq? num-registered 0)
+;;             (runs:print-parallel-runners-state "A" num-registered last-registered skip-count)
+;;             (rmt:set-var "num-runners" 1) ;; potential bug - not ACID
+;;             (rmt:set-var "runner-change-time" (current-seconds))
+;;             (runs:dat-last-runners-check-set! rdat (current-seconds))
+;;             (runs:dat-runner-registered-set! rdat #t)
+;;             (runs:dat-run-skip-count-set! rdat 0))
+;;            ;; have headroom to run another
+;;            ((< num-registered 3)
+;;             (runs:print-parallel-runners-state "B" num-registered last-registered skip-count)
+;;             (rmt:inc-var "num-runners")
+;;             (rmt:set-var "runner-change-time" (current-seconds))
+;;             (runs:dat-last-runners-check-set! rdat (current-seconds))
+;;             (runs:dat-run-skip-count-set! rdat 0))
+;;            ;; we've waited too many rounds, gonna force a round
+;;            ((> (runs:dat-run-skip-count rdat) 3)
+;;             (runs:print-parallel-runners-state "C" num-registered last-registered skip-count)
+;;             (rmt:set-var "num-runners" 1)
+;;             ;; (rmt:set-var "runner-change-time" (current-seconds))
+;;             (runs:dat-last-runners-check-set! rdat (current-seconds))
+;;             (runs:dat-run-skip-count-set! rdat 0))
+;;            ;; have too many runners working, but this is the first time to wait since doing some work
+;;            ((eq? (runs:dat-run-skip-count rdat) 0) ;; and num-registered is >= 3
+;;             (runs:print-parallel-runners-state "D" num-registered last-registered skip-count)
+;;             (if (not (eq? (runs:dat-last-runners-check rdat) 0)) ;; do not decrement if we've never incremented
+;;                 (begin
+;;                   (rmt:dec-var "num-runners")
+;;                   #;(rmt:set-var "runner-change-time" (current-seconds))))
+;;             (runs:dat-last-runners-check-set! rdat (current-seconds))
+;;             (runs:dat-run-skip-count-set! rdat (+ (runs:dat-run-skip-count rdat) 1))
+;;             (debug:print-info 0 *default-log-port* "Too many runners working (" num-registered
+;;                               "). Resting for 30 seconds.")
+;;             (thread-sleep! time-to-wait)
+;;             (runs:parallel-runners-mgmt rdat))
+;;            ;; ok, keep waiting
+;;            (else
+;;             (runs:print-parallel-runners-state "E" num-registered last-registered skip-count)
+;;             (thread-sleep! time-to-wait)
+;;             (runs:dat-run-skip-count-set! rdat (+ (runs:dat-run-skip-count rdat) 1))
+;;             (runs:parallel-runners-mgmt rdat)))))))
 
 
 (define (runs:get-mt-env-alist run-id runname target testname itempath)
   ;;(bb-check-path msg: "runs:set-megatest-env-vars entry")
   `(("MT_TEST_NAME" . ,testname)
@@ -1546,10 +1681,12 @@
              "\n reruns: " reruns
              "\n regfull: " regfull
              "\n reglen: " reglen
              "\n length reg: " (length reg)
              )
+
+    (runs:parallel-runners-mgmt runsdat)
 
     ;; check for hed in waitons => this would be circular, remove it and issue an
     ;; error
     (if (member test-name waitons)
         (begin