Index: api.scm ================================================================== --- api.scm +++ api.scm @@ -215,10 +215,11 @@ ((update-run-stats) (apply db:update-run-stats dbstruct params)) ((set-var) (apply db:set-var dbstruct params)) ((inc-var) (apply db:inc-var dbstruct params)) ((dec-var) (apply db:dec-var dbstruct params)) ((del-var) (apply db:del-var dbstruct params)) + ((add-var) (apply db:add-var dbstruct params)) ;; STEPS ((teststep-set-status!) (apply db:teststep-set-status! dbstruct params)) ((delete-steps-for-test!) (apply db:delete-steps-for-test! dbstruct params)) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1981,10 +1981,15 @@ (define (db:set-var dbstruct var val) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db "INSERT OR REPLACE INTO metadat (var,val) VALUES (?,?);" var val)))) +(define (db:add-var dbstruct var val) + (db:with-db dbstruct #f #t + (lambda (db) + (sqlite3:execute db "UPDATE metadat SET val=val+? 
WHERE var=?;" val var)))) + (define (db:del-var dbstruct var) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db "DELETE FROM metadat WHERE var=?;" var)))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -816,10 +816,13 @@ (rmt:send-receive 'inc-var #f (list varname))) (define (rmt:dec-var varname) (rmt:send-receive 'dec-var #f (list varname))) +(define (rmt:add-var varname value) + (rmt:send-receive 'add-var #f (list varname value))) + ;;====================================================================== ;; M U L T I R U N Q U E R I E S ;;====================================================================== ;; Need to move this to multi-run section and make associated changes Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -50,11 +50,12 @@ can-run-more-tests ((can-run-more-tests-count 0) : fixnum) (last-runners-check 0) ;; time when we last checked number of runners (last-runners-count #f) ;; (runner-registered #f) ;; have I registered myself? 
- (run-skip-count 0) ;; how many times have I skipped running sequentially + (run-skip-count 0) ;; how many times have I skipped running sequentially + (runners-mgmt-mode 'rest-mode) ) (defstruct runs:testdat hed tal reg reruns test-record test-name item-path jobgroup @@ -68,12 +69,13 @@ (define (runs:print-parallel-runners-state2 state num-registered last-runners-count skip-count) (debug:print-info 0 *default-log-port* "runs:parallel-runners-mgmt, state=" state ", num-registered=" num-registered ", last-runners-count=" last-runners-count ", skip-count=" skip-count)) - -(define (runs:parallel-runners-mgmt rdat) +;; Second try +;; +(define (runs:parallel-runners-mgmt-2 rdat) (let ((time-to-check 2.8) ;; 28 (time-to-wait 3.0)) (if (> (- (current-seconds) (runs:dat-last-runners-check rdat)) time-to-check) ;; time to check (let* ((num-registered (or (rmt:get-var "num-runners") 0)) (last-runners-count (runs:dat-last-runners-count rdat)) @@ -80,39 +82,132 @@ (skip-count (runs:dat-run-skip-count rdat))) (cond ;; first time in ((not last-runners-count) (runs:print-parallel-runners-state2 "A" num-registered last-runners-count skip-count) - (rmt:set-var "num-runners" 1) + (if (eq? num-registered 0) + (rmt:set-var "num-runners" 1) + (rmt:inc-var "num-runners")) (runs:dat-last-runners-count-set! rdat num-registered) (runs:dat-run-skip-count-set! rdat 0)) - ;; too many waits, decrement num-runners and continue on + ;; too many waits, decrement num-runners, reset last-runners and continue on ((> (runs:dat-run-skip-count rdat) 3) (runs:print-parallel-runners-state2 "B" num-registered last-runners-count skip-count) (rmt:dec-var "num-runners") - (runs:dat-run-skip-count-set! rdat 0)) + (runs:dat-run-skip-count-set! rdat 0) + (runs:dat-last-runners-count-set! 
rdat num-registered)) ;; too many running, take a break - ((> num-registered last-runners-count) + ((> num-registered last-runners-count) ;; (+ last-runners-count 1)) (runs:print-parallel-runners-state2 "C" num-registered last-runners-count skip-count) (rmt:dec-var "num-runners") (debug:print-info 0 *default-log-port* "Too many running (" num-registered - "), last-count=" last-runners-count " waiting... ") + "), last-count=" last-runners-count " waiting " time-to-wait " seconds ... ") (thread-sleep! time-to-wait) - (runs:dat-run-skip-count-set! rdat (+ (runs:dat-run-skip-count rdat) 1))) + (runs:dat-run-skip-count-set! rdat (+ (runs:dat-run-skip-count rdat) 1)) + ;; adjust down last-runners-count from a fresh read; num-registered was read before our dec-var and the sleep, so it can never be below last-runners-count here + (let ((num-now (or (rmt:get-var "num-runners") 0))) (if (< num-now last-runners-count) + (runs:dat-last-runners-count-set! rdat num-now))) + (rmt:inc-var "num-runners") + ) ;; we have been in waiting mode, do not increment again as we already did that ((> skip-count 0) (runs:print-parallel-runners-state2 "D" num-registered last-runners-count skip-count) (runs:dat-run-skip-count-set! rdat 0) ;; (runs:dat-last-runners-count-set! rdat num-registered) ) ;; skip count is zero, not too many running, this is transition into running (else (runs:print-parallel-runners-state2 "E" num-registered last-runners-count skip-count) - (rmt:inc-var "num-runners") + ;; (rmt:inc-var "num-runners") #;(runs:dat-run-skip-count-set! 
rdat 0))))))) +;; Third try, use a running average +;; +;; ADD A COUNT OF TIMES CYCLED THROUGH REST MODE +;; +;; runners-mgmt-mode is 'rest-mode or 'work-mode; num-runners is incremented on entering work-mode and decremented on leaving it +;; +(define (runs:parallel-runners-mgmt-3 rdat) + (let ((time-to-check 2.8) ;; 28 + (time-to-wait 3.0)) + (if (> (- (current-seconds) (runs:dat-last-runners-check rdat)) time-to-check) ;; time to check + (let* ((skip-count (runs:dat-run-skip-count rdat)) + (mgmt-mode (runs:dat-runners-mgmt-mode rdat)) ;; + (num-registered (rmt:get-var "num-runners")) + (last-runners-count (if (runs:dat-last-runners-count rdat) + (runs:dat-last-runners-count rdat) + (or num-registered 1))) + (last-runners-ravg (/ (+ last-runners-count (or num-registered 1)) 2)#;(if (> num-registered last-runners-count) + (/ (+ last-runners-count num-registered) 2) + (/ (+ (* num-registered 4) last-runners-count) 5) ;; slow on down + )) ;; running average + ) + ;; initialize and sanitize values if needed, also resetting the local num-registered so the comparisons below never see #f + (cond + ((not num-registered) ;; first in, initialize to 1 + (debug:print-info 0 *default-log-port* " adjusting num-runners up to 1, currently it is not defined") + (rmt:set-var "num-runners" 1) (set! num-registered 1)) + ((< num-registered 1) ;; this should not be, reset to 1 to make it less confusing + (debug:print-info 0 *default-log-port* " adjusting num-runners up to 1, currently it is " num-registered) + (rmt:set-var "num-runners" 1) (set! num-registered 1))) + (if (not (member mgmt-mode '(rest-mode work-mode))) + (begin + (debug:print-info 0 *default-log-port* " setting mgmt-mode to work-mode, currently it is " mgmt-mode) + (rmt:inc-var "num-runners") + (set! last-runners-ravg (+ last-runners-ravg 1)) + (runs:dat-runners-mgmt-mode-set! rdat 'work-mode) (set! mgmt-mode 'work-mode))) ;; match the inc-var above so a following rest does the matching dec-var + + (runs:dat-last-runners-count-set! rdat last-runners-ravg) + ;; to rest or not rest? 
+ (if (and (< skip-count 5) + (> num-registered last-runners-count)) ;;(+ last-runners-ravg 0.5))) ;; there seem to be other runners out there + (begin ;; gonna rest + (debug:print-info 0 *default-log-port* "Too many running, num-registered=" num-registered ", ravg=" last-runners-ravg + ", real num runners=" (rmt:get-var "num-runners") ", skip-count=" skip-count) + (if (eq? mgmt-mode 'work-mode) + (rmt:dec-var "num-runners")) + (runs:dat-runners-mgmt-mode-set! rdat 'rest-mode) + (runs:dat-run-skip-count-set! rdat (+ (runs:dat-run-skip-count rdat) 1)) + (thread-sleep! time-to-wait) + (runs:parallel-runners-mgmt-3 rdat) + ) + (begin + (runs:dat-run-skip-count-set! rdat 0) + (if (eq? mgmt-mode 'rest-mode) + (rmt:inc-var "num-runners")) ;; going into work mode if not already in work mode + (runs:dat-runners-mgmt-mode-set! rdat 'work-mode) + (debug:print-info 0 *default-log-port* "All good, keep running, num-registered=" + num-registered ", ravg=" last-runners-ravg ", mode=" mgmt-mode + ", skip-count=" skip-count)) + ))))) + + +;; To test parallel-runners management start a repl: +;; megatest -repl +;; then run: +;; (runs:test-parallel-runners 60) +;; +(define (runs:test-parallel-runners duration #!optional (proc #f)) + (let* ((rdat (make-runs:dat)) + (rtime 0) + (startt (current-seconds)) + (endt (+ startt duration))) + ((or proc runs:parallel-runners-mgmt-3) rdat) + (let loop () + (let* ((wstart (current-seconds))) + (if (< wstart endt) + (let* ((work-time (random 10))) + #;(debug:print-info 0 *default-log-port* "working for " work-time + " seconds. Total work: " rtime ", elapsed time: " (- wstart startt)) + (thread-sleep! work-time) + (set! 
rtime (+ rtime work-time)) + ((or proc runs:parallel-runners-mgmt-3) rdat) + (loop))))) + (let* ((done-time (current-seconds))) + (debug:print-info 0 *default-log-port* "DONE: rtime=" rtime ", elapsed time=" (- done-time startt) + ", ratio=" (/ rtime (max 1 (- done-time startt)))))))) ;; (define (runs:parallel-runners-mgmt rdat) ;; (let ((time-to-check 2.8) ;; 28 ;; (time-to-wait 3.0)) ;; (if (> (- (current-seconds) (runs:dat-last-runners-check rdat)) time-to-check) ;; time to check