Index: tasksmod.scm ================================================================== --- tasksmod.scm +++ tasksmod.scm @@ -28,18 +28,19 @@ (import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) (import commonmod) ;; (use (prefix ulex ulex:)) (include "common_records.scm") +(include "task_records.scm") ;;====================================================================== ;; Tasks db ;;====================================================================== ;; wait up to aprox n seconds for a journal to go away ;; -(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f)) +#;(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f)) (if (not (string? path)) (debug:print-error 0 *default-log-port* "Called tasks:wait-on-journal with path=" path " (not a string)") (let ((fullpath (conc path "-journal"))) (handle-exceptions exn @@ -65,11 +66,11 @@ (debug:print 0 *default-log-port* "ERROR: removing the journal file " fullpath ", this is not good. Look for disk full, write access and other issues.") (if remove (system (conc "rm -rf " fullpath))) #f))) #t)))))) -(define (tasks:get-task-db-path) +#;(define (tasks:get-task-db-path) (let ((dbdir (or (configf:lookup *configdat* "setup" "monitordir") (configf:lookup *configdat* "setup" "dbdir") (conc (common:get-linktree) "/.db")))) (handle-exceptions exn @@ -86,11 +87,12 @@ ;; file NOT readable ;; ==> open in-mem version ;; If file NOT exists ;; ==> open in-mem version ;; -(define (tasks:open-db #!key (numretries 4)) +#;(define (tasks:open-db alldat #!key (numretries 4)) + (if *task-db* *task-db* (handle-exceptions exn (if (> numretries 0) @@ -183,16 +185,16 @@ (define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) -(define (tasks:need-server run-id) +#;(define (tasks:need-server run-id) (equal? (configf:lookup *configdat* "server" "required") "yes")) ;; no elegance here ... ;; -(define (tasks:kill-server hostname pid #!key (kill-switch "")) +#;(define (tasks:kill-server hostname pid #!key (kill-switch "")) (debug:print-info 0 *default-log-port* "Attempting to kill server process " pid " on host " hostname) (setenv "TARGETHOST" hostname) (let* ((logdir (if (directory-exists? "logs") "logs/" "")) @@ -213,26 +215,26 @@ ;;====================================================================== ;; M O N I T O R S ;;====================================================================== -(define (tasks:remove-monitor-record mdb) +#;(define (tasks:remove-monitor-record mdb) (sqlite3:execute mdb "DELETE FROM monitors WHERE pid=? AND hostname=?;" (current-process-id) (get-host-name))) -(define (tasks:get-monitors mdb) +#;(define (tasks:get-monitors mdb) (let ((res '())) (sqlite3:for-each-row (lambda (a . rem) (set! res (cons (apply vector a rem) res))) mdb "SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;") (reverse res) )) -(define (tasks:monitors->text-table monitors) +#;(define (tasks:monitors->text-table monitors) (let ((fmtstr "~4a~8a~20a~20a~10a~10a")) (conc (format #f fmtstr "id" "pid" "start time" "last update" "hostname" "user") "\n" (string-intersperse (map (lambda (monitor) (format #f fmtstr @@ -245,11 +247,11 @@ monitors) "\n")))) ;; update the last_update field with the current time and ;; if any monitors appear dead, remove them -(define (tasks:monitors-update mdb) +#;(define (tasks:monitors-update mdb) (sqlite3:execute mdb "UPDATE monitors SET last_update=strftime('%s','now') WHERE pid=? AND hostname=?;" (current-process-id) (get-host-name)) (let ((deadlist '())) (sqlite3:for-each-row @@ -258,20 +260,20 @@ (set! deadlist (cons id deadlist))) mdb "SELECT id,pid,hostname,last_update,strftime('%s','now')-last_update AS delta FROM monitors WHERE delta > 700;") (sqlite3:execute mdb (conc "DELETE FROM monitors WHERE id IN ('" (string-intersperse (map conc deadlist) "','") "');"))) ) -(define (tasks:register-monitor db port) +#;(define (tasks:register-monitor db port) (let* ((pid (current-process-id)) (hostname (get-host-name)) (userinfo (user-information (current-user-id))) (username (car userinfo))) (print "Register monitor, pid: " pid ", hostname: " hostname ", port: " port ", username: " username) (sqlite3:execute db "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);" pid hostname username))) -(define (tasks:get-num-alive-monitors mdb) +#;(define (tasks:get-num-alive-monitors mdb) (let ((res 0)) (sqlite3:for-each-row (lambda (count) (set! res count)) mdb @@ -278,11 +280,11 @@ "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;" (car (user-information (current-user-id)))) res)) ;; -(define (tasks:start-monitor db mdb) +#;(define (tasks:start-monitor db mdb) (if (> (tasks:get-num-alive-monitors mdb) 2) ;; have two running, no need for more (debug:print-info 1 *default-log-port* "Not starting monitor, already have more than two running") (let* ((megatestdb (conc *toppath* "/megatest.db")) (monitordbf (conc (common:get-db-tmp-area *alldat*) "/monitor.db")) (last-db-update 0)) ;; (file-modification-time megatestdb))) @@ -323,11 +325,11 @@ ;; creation_time TIMESTAMP DEFAULT (strftime('%s','now')), ;; execution_time TIMESTAMP); ;; register a task -(define (tasks:add dbstruct action owner target runname testpatt params) +#;(define (tasks:add dbstruct action owner target runname testpatt params) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db "INSERT INTO tasks_queue (action,owner,state,target,name,testpatt,params,creation_time,execution_time) VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);" @@ -336,11 +338,11 @@ target runname testpatt (if params params ""))))) -(define (keys:key-vals-hash->target keys key-params) +#;(define (keys:key-vals-hash->target keys key-params) (let ((tmp (hash-table-ref/default key-params (vector-ref (car keys) 0) ""))) (if (> (length keys) 1) (for-each (lambda (key) (set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) "")))) (cdr keys))) @@ -356,11 +358,11 @@ ;; (params (hash-table-ref/default var-params "params" ""))) ;; (tasks:add mdb action owner target runname testpatts params))) ;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old ;; -(define (tasks:snag-a-task dbstruct) +#;(define (tasks:snag-a-task dbstruct) (let ((res #f) (keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id)))))) (db:with-db dbstruct #f #t (lambda (db) @@ -384,11 +386,11 @@ (sqlite3:execute db "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" (tasks:task-get-id res)) res) #f))))) -(define (tasks:reset-stuck-tasks dbstruct) +#;(define (tasks:reset-stuck-tasks dbstruct) (let ((res '())) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:for-each-row @@ -401,11 +403,11 @@ (conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');") ))))) ;; return all tasks in the tasks_queue table ;; -(define (tasks:get-tasks dbstruct types states) +#;(define (tasks:get-tasks dbstruct types states) (let ((res '())) (db:with-db dbstruct #f #f (lambda (db) (sqlite3:for-each-row @@ -418,11 +420,11 @@ ;; state IN " statesstr " AND ;; action IN " actionsstr " ORDER BY creation_time DESC;")) res)))) -(define (tasks:get-last dbstruct target runname) +#;(define (tasks:get-last dbstruct target runname) (let ((res #f)) (db:with-db dbstruct #f #f (lambda (db) (sqlite3:for-each-row @@ -436,17 +438,17 @@ ORDER BY creation_time DESC LIMIT 1;") target runname) res)))) ;; remove tasks given by a string of numbers comma separated -(define (tasks:remove-queue-entries dbstruct task-ids) +#;(define (tasks:remove-queue-entries dbstruct task-ids) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db (conc "DELETE FROM tasks_queue WHERE id IN (" task-ids ");"))))) -(define (tasks:process-queue dbstruct) +#;(define (tasks:process-queue dbstruct) (let* ((task (tasks:snag-a-task dbstruct)) (action (if task (tasks:task-get-action task) #f))) (if action (print "tasks:process-queue task: " task)) (if action (case (string->symbol action) @@ -456,11 +458,11 @@ ;; ((monitor) (tasks:start-monitor db task)) ((rollup) (tasks:rollup-runs dbstruct task)) ((updatemeta)(tasks:update-meta dbstruct task)) ((kill) (tasks:kill-monitors dbstruct task)))))) -(define (tasks:tasks->text tasks) +#;(define (tasks:tasks->text tasks) (let ((fmtstr "~10a~10a~10a~12a~20a~12a~12a~10a")) (conc (format #f fmtstr "id" "action" "owner" "state" "target" "runname" "testpatts" "params") "\n" (string-intersperse (map (lambda (task) (format #f fmtstr @@ -473,11 +475,11 @@ (tasks:task-get-test task) ;; (tasks:task-get-item task) (tasks:task-get-params task))) tasks) "\n")))) -(define (tasks:set-state dbstruct task-id state) +#;(define (tasks:set-state dbstruct task-id state) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db "UPDATE tasks_queue SET state=? WHERE id=?;" state @@ -485,27 +487,27 @@ ;;====================================================================== ;; Access using task key (stored in params; (hash-table->alist flags) hostname pid ;;====================================================================== -(define (tasks:param-key->id dbstruct task-params) +#;(define (tasks:param-key->id dbstruct task-params) (db:with-db dbstruct #f #f (lambda (db) (handle-exceptions exn #f (sqlite3:first-result db "SELECT id FROM tasks_queue WHERE params LIKE ?;" task-params))))) -(define (tasks:set-state-given-param-key dbstruct param-key new-state) +#;(define (tasks:set-state-given-param-key dbstruct param-key new-state) (db:with-db dbstruct #f #t (lambda (db) (sqlite3:execute db "UPDATE tasks_queue SET state=? WHERE params LIKE ?;" new-state param-key)))) -(define (tasks:get-records-given-param-key dbstruct param-key state-patt action-patt test-patt) +#;(define (tasks:get-records-given-param-key dbstruct param-key state-patt action-patt test-patt) (db:with-db dbstruct #f #f (lambda (db) (handle-exceptions exn @@ -512,11 +514,11 @@ '() (sqlite3:first-row db "SELECT id,action,owner,state,target,name,testpatt,keylock,params WHERE params LIKE ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" param-key state-patt action-patt test-patt))))) -(define (tasks:find-task-queue-records dbstruct target run-name test-patt state-patt action-patt) +#;(define (tasks:find-task-queue-records dbstruct target run-name test-patt state-patt action-patt) ;; (handle-exceptions ;; exn ;; '() ;; (sqlite3:first-row (let ((db (db:delay-if-busy (db:get-db dbstruct))) @@ -532,11 +534,11 @@ ;; kill any runner processes (i.e. processes handling -runtests) that match target/runname ;; ;; do a remote call to get the task queue info but do the killing as self here. ;; -(define (tasks:kill-runner target run-name testpatt) +#;(define (tasks:kill-runner target run-name testpatt) (let ((records (rmt:tasks-find-task-queue-records target run-name testpatt "running" "run-tests")) (hostpid-rx (regexp "\\s+(\\w+)\\s+(\\d+)$"))) ;; host pid is at end of param string (if (null? records) (debug:print 0 *default-log-port* "No run launching processes found for " target " / " run-name " with testpatt " (or testpatt "* no testpatt specified! *")) (debug:print 0 *default-log-port* "Found " (length records) " run(s) to kill.")) @@ -610,11 +612,11 @@ ;; sync to postgres here for now. ;; attempt to automatically set up an area. call only if get area by path ;; returns naught of interest ;; -(define (tasks:set-area dbh configdat #!key (toppath #f)) ;; could I safely put *toppath* in for the default for toppath? when would it be evaluated? +#;(define (tasks:set-area dbh configdat #!key (toppath #f)) ;; could I safely put *toppath* in for the default for toppath? when would it be evaluated? (let loop ((area-name (or (configf:lookup configdat "setup" "area-name") (common:get-area-name *alldat*))) (modifier 'none)) (let ((success (handle-exceptions exn @@ -627,20 +629,20 @@ ((none)(loop (conc (current-user-name) "_" area-name) 'user)) ((user)(loop (conc (substring (common:get-area-path-signature) 0 4) area-name) 'areasig)) (else #f)))))) ;; give up -(define (task:print-runtime run-times saperator) +#;(define (task:print-runtime run-times saperator) (for-each (lambda (run-time-info) (let* ((run-name (vector-ref run-time-info 0)) (run-time (vector-ref run-time-info 1)) (target (vector-ref run-time-info 2))) (print target saperator run-name saperator run-time ))) run-times)) -(define (task:print-runtime-as-json run-times) +#;(define (task:print-runtime-as-json run-times) (let loop ((run-time-info (car run-times)) (rema (cdr run-times)) (str "")) (let* ((run-name (vector-ref run-time-info 0)) (run-time (vector-ref run-time-info 1)) @@ -650,11 +652,11 @@ (set! str (conc str ","))) (if (null? rema) (print "[" str "{target:" target ",run-name:" run-name ", run-time:" run-time "}]") (loop (car rema) (cdr rema) (conc str "{target:" target ", run-name:" run-name ", run-time:" run-time "}")))))) -(define (task:get-run-times) +#;(define (task:get-run-times) (let* ( (run-patt (if (args:get-arg "-run-patt") (args:get-arg "-run-patt") "%")) (target-patt (if (args:get-arg "-target-patt") @@ -671,11 +673,11 @@ (if (equal? (args:get-arg "-dumpmode") "csv") (task:print-runtime run-times ",") (task:print-runtime run-times " "))))) -(define (task:print-testtime test-times saperator) +#;(define (task:print-testtime test-times saperator) (for-each (lambda (test-time-info) (let* ((test-name (vector-ref test-time-info 0)) (test-time (vector-ref test-time-info 2)) (test-item (if (eq? (string-length (vector-ref test-time-info 1)) 0) @@ -682,11 +684,11 @@ "N/A" (vector-ref test-time-info 1)))) (print test-name saperator test-item saperator test-time ))) test-times)) -(define (task:print-testtime-as-json test-times) +#;(define (task:print-testtime-as-json test-times) (let loop ((test-time-info (car test-times)) (rema (cdr test-times)) (str "")) (let* ((test-name (vector-ref test-time-info 0)) (test-time (vector-ref test-time-info 2)) @@ -697,11 +699,11 @@ (if (null? rema) (print "[" str "{test-name:" test-name ", item-path:" item ", test-time:" test-time "}]") (loop (car rema) (cdr rema) (conc str "{test-name:" test-name ", item-path:" item ", test-time:" test-time "}")))))) - (define (task:get-test-times) +#; (define (task:get-test-times) (let* ((runname (if (args:get-arg "-runname") (args:get-arg "-runname") #f)) (target (if (args:get-arg "-target") (args:get-arg "-target") @@ -737,11 +739,11 @@ ;; gets mtpg-run-id and syncs the record if different ;; -(define (tasks:run-id->mtpg-run-id dbh cached-info run-id area-info smallest-last-update-time) +#;(define (tasks:run-id->mtpg-run-id dbh cached-info run-id area-info smallest-last-update-time) (let* ((runs-ht (hash-table-ref cached-info 'runs)) (runinf (hash-table-ref/default runs-ht run-id #f)) (area-id (vector-ref area-info 0))) (if runinf runinf ;; already cached @@ -810,11 +812,11 @@ (if (or (not smallest-time) (< last-update smallest-time)) (hash-table-set! smallest-last-update-time "smallest-time" last-update)) (tasks:run-id->mtpg-run-id dbh cached-info run-id area-info smallest-last-update-time)) #f))))))) -(define (task:add-run-tag dbh run-id tag) +#;(define (task:add-run-tag dbh run-id tag) (let* ((tag-info (pgdb:get-tag-info-by-name dbh tag))) (if (not tag-info) (begin (if (handle-exceptions exn @@ -832,11 +834,11 @@ #f) (if (not (pgdb:is-run-taged-with-a-tag dbh (vector-ref tag-info 0) run-id)) (pgdb:insert-run-tag dbh (vector-ref tag-info 0) run-id))))) -(define (tasks:sync-test-steps dbh cached-info test-step-ids smallest-last-update-time) +#;(define (tasks:sync-test-steps dbh cached-info test-step-ids smallest-last-update-time) ; (print "Sync Steps " test-step-ids ) (let ((test-ht (hash-table-ref cached-info 'tests)) (step-ht (hash-table-ref cached-info 'steps))) (for-each (lambda (test-step-id) @@ -875,11 +877,11 @@ (hash-table-set! step-ht step-id pgdb-step-id )) (debug:print-info 1 *default-log-port* "Error: Test not cashed"))) (debug:print-info 1 *default-log-port* "Error: Could not get test step info for step id " test-step-id )))) ;; this is a wierd senario need to debug test-step-ids))) -(define (tasks:sync-test-gen-data dbh cached-info test-data-ids smallest-last-update-time) +#;(define (tasks:sync-test-gen-data dbh cached-info test-data-ids smallest-last-update-time) (let ((test-ht (hash-table-ref cached-info 'tests)) (data-ht (hash-table-ref cached-info 'data))) (for-each (lambda (test-data-id) (let* ((test-data-info (rmt:get-data-info-by-id test-data-id)) @@ -935,11 +937,11 @@ (debug:print-info 1 *default-log-port* "Error: Could not get test data info for data id " test-data-id )))) ;; this is a wierd senario need to debug test-data-ids))) -(define (tasks:sync-tests-data dbh cached-info test-ids area-info smallest-last-update-time) +#;(define (tasks:sync-tests-data dbh cached-info test-ids area-info smallest-last-update-time) (let ((test-ht (hash-table-ref cached-info 'tests))) (for-each (lambda (test-id) ; (print test-id) (let* ((test-info (rmt:get-test-info-by-id #f test-id)) @@ -988,11 +990,11 @@ (set! pgdb-test-id (pgdb:get-test-id dbh pgdb-run-id test-name item-path)))) (hash-table-set! test-ht test-id pgdb-test-id)) (debug:print-info 1 *default-log-port* "WARNING: Skipping run with run-id:" run-id ". This run was created after privious sync and removed before this sync.")))) test-ids))) -(define (task:add-area-tag dbh area-info tag) +#;(define (task:add-area-tag dbh area-info tag) (let* ((tag-info (pgdb:get-tag-info-by-name dbh tag))) (if (not tag-info) (begin (if (handle-exceptions exn @@ -1009,11 +1011,11 @@ (debug:print-info 1 *default-log-port* ((condition-property-accessor 'exn 'message) exn)) #f) (if (not (pgdb:is-area-taged-with-a-tag dbh (vector-ref tag-info 0) (vector-ref area-info 0))) (pgdb:insert-area-tag dbh (vector-ref tag-info 0) (vector-ref area-info 0)))))) -(define (tasks:sync-run-data dbh cached-info run-ids area-info smallest-last-update-time) +#;(define (tasks:sync-run-data dbh cached-info run-ids area-info smallest-last-update-time) (for-each (lambda (run-id) (debug:print-info 1 *default-log-port* "Check if run with " run-id " needs to be synced" ) (tasks:run-id->mtpg-run-id dbh cached-info run-id area-info smallest-last-update-time)) run-ids)) @@ -1021,11 +1023,11 @@ ;; get runs changed since last sync ;; (define (tasks:sync-test-data dbh cached-info area-info) ;; (let* (( -(define (tasks:sync-to-postgres configdat dest) +#;(define (tasks:sync-to-postgres configdat dest) (print "In sync") (let* ((dbh (pgdb:open configdat dbname: dest)) (area-info (pgdb:get-area-by-path dbh *toppath*)) (cached-info (make-hash-table)) (start (current-seconds))