@@ -36,10 +36,11 @@
                                 state TEXT DEFAULT 'new',
                                 target TEXT DEFAULT '',
                                 name TEXT DEFAULT '',
                                 test TEXT DEFAULT '',
                                 item TEXT DEFAULT '',
+                                keylock TEXT,
                                 creation_time TIMESTAMP,
                                 execution_time TIMESTAMP);")
           (sqlite3:execute tdb "CREATE TABLE IF NOT EXISTS monitors (id INTEGER PRIMARY KEY,
                                 pid INTEGER,
                                 start_time TIMESTAMP,
@@ -113,32 +114,50 @@
     (tasks:add tdb action owner target runname testpatts itempatts)))
 
 ;; return one task from those who are 'new' OR 'waiting' AND more than 10sec old
 ;;
+;; One runnable row is tagged with this process's key (pid-hostname-username) in
+;; the keylock column, read back by that key, and then marked 'inprogress'.
+;; Returns the task row as a vector (id action owner state target name test item
+;; creation_time execution_time), or #f when no runnable task was found.
+;; NOTE(review): tagging and read-back are separate statements outside any
+;; transaction - confirm two processes cannot end up claiming the same row.
+;;
 (define (tasks:snag-a-task tdb)
-  (let ((res #f))
-    (with-transaction
-     tdb
-     (lambda ()
-       ;; execution time is updated with every snag, wait 10 secs before doing anything with the queue
-       (sqlite3:for-each-row
-        (lambda (id . rem)
-          (set! res (apply vector id rem)))
-        tdb
-        "SELECT id,action,owner,state,target,name,test,item,creation_time,execution_time
-           FROM tasks_queue
-             WHERE
-                state='new' OR
-                (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR
-                state='reset'
-           ORDER BY execution_time ASC LIMIT 1;")
-       (if res ;; yep, have work to be done
-           (begin
-             (sqlite3:execute tdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;"
-                              (tasks:task-get-id res))
-             res)
-           #f)))))
+  (let ((res    #f)
+        (keytxt (conc (current-process-id) "-" (get-host-name) "-" (car (user-information (current-user-id))))))
+
+    ;; first randomly tag one runnable task with our pid-hostname-username key
+    (sqlite3:execute
+     tdb
+     "UPDATE tasks_queue SET keylock=? WHERE id IN
+        (SELECT id FROM tasks_queue
+           WHERE state='new' OR
+                 (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR
+                 state='reset'
+           ORDER BY RANDOM() LIMIT 1);" keytxt)
+
+    ;; read back the row we tagged. keylock is never cleared in this procedure, so
+    ;; the runnable-state test is repeated to keep rows tagged by earlier calls
+    ;; (already 'inprogress' or finished) from being handed out again
+    (sqlite3:for-each-row
+     (lambda (id . rem)
+       (set! res (apply vector id rem)))
+     tdb
+     "SELECT id,action,owner,state,target,name,test,item,creation_time,execution_time
+        FROM tasks_queue
+        WHERE keylock=? AND
+              (state='new' OR
+               (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR
+               state='reset')
+        ORDER BY execution_time ASC LIMIT 1;" keytxt)
+    (if res ;; yep, have work to be done
+        (begin
+          (sqlite3:execute tdb "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;"
+                           (tasks:task-get-id res))
+          res)
+        #f)))
 
 (define (tasks:reset-stuck-tasks tdb)
   (let ((res '()))
     (sqlite3:for-each-row
      (lambda (id delta)
@@ -163,10 +182,29 @@
     ;;                                   state IN " statesstr " AND
     ;;                                   action IN " actionsstr
     " ORDER BY creation_time DESC;"))
     res))
 
+;; remove tasks given by a string of numbers comma separated
+;;
+;; Each token of task-ids is parsed as a number and only exact integers are put
+;; into the DELETE statement, so arbitrary text can never reach the SQL.
+;; Tokens that are not integers are ignored; nothing is executed if none remain.
+(define (tasks:remove-queue-entries tdb task-ids)
+  (let loop ((tokens (string-split (conc task-ids) ", "))
+             (ids    '()))
+    (if (null? tokens)
+        (if (not (null? ids))
+            (sqlite3:execute tdb (conc "DELETE FROM tasks_queue WHERE id IN ("
+                                       (string-intersperse (reverse ids) ",")
+                                       ");")))
+        (let ((num (string->number (car tokens))))
+          (loop (cdr tokens)
+                (if (and num (exact? num) (integer? num))
+                    (cons (number->string num) ids)
+                    ids))))))
+
 ;;
 (define (tasks:start-monitor db tdb)
   (if (> (tasks:get-num-alive-monitors tdb) 2) ;; have two running, no need for more
       (debug:print 1 "INFO: Not starting monitor, already have more than two running")
       (let* ((megatestdb     (conc *toppath* "/megatest.db"))