@@ -15,10 +15,42 @@ (declare (unit tasks)) (declare (uses db)) (declare (uses common)) (include "task_records.scm") + +;;====================================================================== +;; Tasks db +;;====================================================================== + +(define (tasks:open-db) + (let* ((dbpath (conc *toppath* "/monitor.db")) + (exists (file-exists? dbpath)) + (tdb (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 36000))) + (sqlite3:set-busy-handler! tdb handler) + (if (not exists) + (begin + (sqlite3:execute tdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY, + action TEXT DEFAULT '', + owner TEXT, + state TEXT DEFAULT 'new', + target TEXT DEFAULT '', + name TEXT DEFAULT '', + test TEXT DEFAULT '', + item TEXT DEFAULT '', + creation_time TIMESTAMP, + execution_time TIMESTAMP);") + (sqlite3:execute tdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY, + pid INTEGER, + start_time TIMESTAMP, + last_update TIMESTAMP, + hostname TEXT, + username TEXT, + CONSTRAINT monitors_constraint UNIQUE (pid,hostname));"))) + tdb)) + ;;====================================================================== ;; Tasks and Task monitors ;;====================================================================== @@ -31,32 +63,32 @@ ;;====================================================================== ;; Task Monitors ;;====================================================================== -(define (tasks:register-monitor db) +(define (tasks:register-monitor db tdb) (let* ((pid (current-process-id)) (hostname (get-host-name)) (userinfo (user-information (current-user-id))) (username (car userinfo))) (print "Register monitor, pid: " pid ", hostname: " hostname ", username: " username) - (sqlite3:execute db "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);" + (sqlite3:execute tdb "INSERT INTO monitors (pid,start_time,last_update,hostname,username) VALUES (?,strftime('%s','now'),strftime('%s','now'),?,?);" pid hostname username))) -(define (tasks:get-num-alive-monitors db) +(define (tasks:get-num-alive-monitors tdb) (let ((res 0)) (sqlite3:for-each-row (lambda (count) (set! res count)) - db + tdb "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;" (car (user-information (current-user-id)))) res)) ;; register a task -(define (tasks:add db action owner target runname test item) - (sqlite3:execute db "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,creation_time,execution_time) +(define (tasks:add tdb action owner target runname test item) + (sqlite3:execute tdb "INSERT INTO tasks_queue (action,owner,state,target,name,test,item,creation_time,execution_time) VALUES (?,?,'new',?,?,?,?,strftime('%s','now'),0);" action owner target runname @@ -70,111 +102,113 @@ (set! tmp (conc tmp "/" (hash-table-ref/default key-params (vector-ref key 0) "")))) (cdr keys))) tmp)) ;; for use from the gui -(define (tasks:add-from-params db action keys key-params var-params) +(define (tasks:add-from-params tdb action keys key-params var-params) (let ((target (keys:key-vals-hash->target keys key-params)) (owner (car (user-information (current-user-id)))) (runname (hash-table-ref/default var-params "runname" #f)) (testpatts (hash-table-ref/default var-params "testpatts" "%")) (itempatts (hash-table-ref/default var-params "itempatts" "%"))) - (tasks:add db action owner target runname testpatts itempatts))) + (tasks:add tdb action owner target runname testpatts itempatts))) ;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old ;; -(define (tasks:snag-a-task db) +(define (tasks:snag-a-task tdb) (let ((res #f)) (with-transaction - db + tdb (lambda () ;; execution time is updated with every snag, wait 10 secs before doing anything with the queue (sqlite3:for-each-row (lambda (id . rem) (set! res (apply vector id rem))) - db + tdb "SELECT id,action,owner,state,target,name,test,item,creation_time,execution_time FROM tasks_queue WHERE state='new' OR (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR state='reset' ORDER BY execution_time ASC LIMIT 1;") (if res ;; yep, have work to be done (begin - (sqlite3:execute db "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" + (sqlite3:execute tdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" (tasks:task-get-id res)) res) #f))))) -(define (tasks:reset-stuck-tasks db) +(define (tasks:reset-stuck-tasks tdb) (let ((res '())) (sqlite3:for-each-row (lambda (id delta) (set! res (cons id res))) - db + tdb "SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;") (sqlite3:execute - db + tdb (conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');")))) ;; return all tasks in the tasks_queue table ;; -(define (tasks:get-tasks db types states) +(define (tasks:get-tasks tdb types states) (let ((res '())) (sqlite3:for-each-row (lambda (id . rem) (set! res (cons (apply vector id rem) res))) - db + tdb (conc "SELECT id,action,owner,state,target,name,test,item,creation_time,execution_time FROM tasks_queue " ;; WHERE ;; state IN " statesstr " AND ;; action IN " actionsstr " ORDER BY creation_time DESC;")) res)) -(define (tasks:start-monitor db) - (if (> (tasks:get-num-alive-monitors db) 2) ;; have two running, no need for more +;; +(define (tasks:start-monitor db tdb) + (if (> (tasks:get-num-alive-monitors tdb) 2) ;; have two running, no need for more (debug:print 1 "INFO: Not starting monitor, already have more than two running") (let* ((megatestdb (conc *toppath* "/megatest.db")) + (monitordbf (conc *toppath* "/monitor.db")) (last-db-update 0)) ;; (file-modification-time megatestdb))) - (task:register-monitor db) + (task:register-monitor tdb) (let loop ((count 0) (next-touch 0)) ;; next-touch is the time where we need to update last_update ;; if the db has been modified we'd best look at the task queue (let ((modtime (file-modification-time megatestdbpath ))) (if (> modtime last-db-update) - (tasks:process-queue db last-db-update megatestdb next-touch)) + (tasks:process-queue db tdb last-db-update megatestdb next-touch)) ;; WARNING: Possible race conditon here!! ;; should this update be immediately after the task-get-action call above? (if (> (current-seconds) next-touch) (begin - (tasks:monitors-update db) + (tasks:monitors-update tdb) (loop (+ count 1)(+ (current-seconds) 240))) (loop (+ count 1) next-touch))))))) -(define (tasks:process-queue db megatestdbpath) - (let* ((task (tasks:snag-a-task db)) +(define (tasks:process-queue db tdb) + (let* ((task (tasks:snag-a-task tdb)) (action (if task (tasks:task-get-action task) #f))) (print "tasks:process-queue task: " task) (if action (case (string->symbol action) - ((run) (tasks:start-run db task)) - ((remove) (tasks:remove-runs db task)) - ((lock) (tasks:lock-runs db task)) + ((run) (tasks:start-run db tdb task)) + ((remove) (tasks:remove-runs db tdb task)) + ((lock) (tasks:lock-runs db tdb task)) ;; ((monitor) (tasks:start-monitor db task)) - ((rollup) (tasks:rollup-runs db task)) - ((updatemeta)(tasks:update-meta db task)) - ((kill) (tasks:kill-monitors db task)))))) + ((rollup) (tasks:rollup-runs db tdb task)) + ((updatemeta)(tasks:update-meta db tdb task)) + ((kill) (tasks:kill-monitors db tdb task)))))) -(define (tasks:get-monitors db) +(define (tasks:get-monitors tdb) (let ((res '())) (sqlite3:for-each-row (lambda (a . rem) (set! res (cons (apply vector a rem) res))) - db + tdb "SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;") (reverse res) )) (define (tasks:tasks->text tasks) @@ -208,35 +242,35 @@ monitors) "\n")))) ;; update the last_update field with the current time and ;; if any monitors appear dead, remove them -(define (tasks:monitors-update db) - (sqlite3:execute db "UPDATE monitors SET last_update=strftime('%s','now') WHERE pid=? AND hostname=?;" +(define (tasks:monitors-update tdb) + (sqlite3:execute tdb "UPDATE monitors SET last_update=strftime('%s','now') WHERE pid=? AND hostname=?;" (current-process-id) (get-host-name)) (let ((deadlist '())) (sqlite3:for-each-row (lambda (id pid host last-update delta) (print "Going to delete stale record for monitor with pid " pid " on host " host " last updated " delta " seconds ago") (set! deadlist (cons id deadlist))) - db + tdb "SELECT id,pid,hostname,last_update,strftime('%s','now')-last_update AS delta FROM monitors WHERE delta > 700;") - (sqlite3:execute db (conc "DELETE FROM monitors WHERE id IN ('" (string-intersperse (map conc deadlist) "','") "');"))) + (sqlite3:execute tdb (conc "DELETE FROM monitors WHERE id IN ('" (string-intersperse (map conc deadlist) "','") "');"))) ) -(define (tasks:remove-monitor-record db) - (sqlite3:execute db "DELETE FROM monitors WHERE pid=? AND hostname=?;" +(define (tasks:remove-monitor-record tdb) + (sqlite3:execute tdb "DELETE FROM monitors WHERE pid=? AND hostname=?;" (current-process-id) (get-host-name))) -(define (tasks:set-state db task-id state) - (sqlite3:execute db "UPDATE tasks_queue SET state=? WHERE id=?;" +(define (tasks:set-state tdb task-id state) + (sqlite3:execute tdb "UPDATE tasks_queue SET state=? WHERE id=?;" state task-id)) -(define (tasks:start-run db task) +(define (tasks:start-run db tdb task) (let ((flags (make-hash-table))) (hash-table-set! flags "-rerun" "NOT_STARTED") (print "Starting run " task) ;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY (runs:run-tests db @@ -244,6 +278,6 @@ (tasks:task-get-name task) (tasks:task-get-test task) (tasks:task-get-item task) (tasks:task-get-owner task) flags) - (tasks:set-state db (tasks:task-get-id task) "waiting"))) + (tasks:set-state tdb (tasks:task-get-id task) "waiting")))