@@ -207,22 +207,24 @@ )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) (dbi:for-each-row + (lambda (output) (lambda (num-in-queue) - (set! res num-in-queue)) + (set! res num-in-queue))) mdb "SELECT count(id) FROM servers WHERE run_id=? AND state = 'available' AND (strftime('%s','now') - start_time) < 30 ;" run-id) res)) (define (tasks:num-servers-non-zero-running mdb) (let ((res 0)) (dbi:for-each-row + (lambda (output) (lambda (num-running) - (set! res num-running)) + (set! res num-running))) mdb "SELECT count(id) FROM servers WHERE run_id != 0 AND state = 'running';") res)) (define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag) @@ -279,12 +281,13 @@ ;; (string->number (config-lookup *configdat* "server" "port"))) ;; (string->number (config-lookup *configdat* "server" "port")) ;; #f)) ) (dbi:for-each-row + (lambda (output) (lambda (port) - (set! used-ports (cons port used-ports))) + (set! used-ports (cons port used-ports)))) mdb "SELECT port FROM servers;") (cond ((and port-param res) (if (> res port-param) res port-param)) (port-param port-param) @@ -326,12 +329,13 @@ (define (tasks:server-get-servers-vying-for-run-id mdb run-id) (let* ((header (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time")) (selstr (string-intersperse header ",")) (res '())) (dbi:for-each-row + (lambda (output) (lambda (a . b) - (set! res (cons (apply vector a b) res))) + (set! res (cons (apply vector a b) res)))) mdb (conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;") run-id) (vector header res))) @@ -351,12 +355,13 @@ (debug:print 0 *default-log-port* " trying call to tasks:get-server again in 10 seconds") (thread-sleep! 10) (tasks:get-server mdb run-id retries: (- retries 0))) (debug:print 0 *default-log-port* "10 tries of tasks:get-server all crashed and burned. Giving up and returning \"no server found\""))) (dbi:for-each-row + (lambda (output) (lambda (id interface port pubport transport pid hostname) - (set! res (vector id interface port pubport transport pid hostname))) + (set! res (vector id interface port pubport transport pid hostname)))) mdb ;; removed: ;; strftime('%s','now')-heartbeat < 10 AND mt_version = ? "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers WHERE run_id=? AND state='running' @@ -364,21 +369,23 @@ res))) (define (tasks:server-running-or-starting? mdb run-id) (let ((res #f)) (dbi:for-each-row + (lambda (output) (lambda (id) - (set! res id)) + (set! res id))) mdb ;; NEEDS dbprep ADDED "SELECT id FROM servers WHERE run_id=? AND (state = 'running' OR (state = 'dbprep' AND (strftime('%s','now') - start_time) < 60));" run-id) res)) (define (tasks:server-running? mdb run-id) (let ((res #f)) (dbi:for-each-row + (lambda (output) (lambda (id) - (set! res id)) + (set! res id))) mdb ;; NEEDS dbprep ADDED "SELECT id FROM servers WHERE run_id=? AND state = 'running';" run-id) res)) (define (tasks:need-server run-id) @@ -419,36 +426,41 @@ #f))) (define (tasks:get-all-servers mdb) (let ((res '())) (dbi:for-each-row + (lambda (output) (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 - (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res))) + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res)))) mdb "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;") res)) (define (tasks:get-server-by-id mdb id) (let ((res #f)) (dbi:for-each-row + (lambda (output) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 - (set! res (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id))) + (set! res (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)))) mdb "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE id=?;" id) res)) (define (tasks:get-server-records mdb run-id) (let ((res '())) (dbi:for-each-row + (lambda (output) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 - (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res))) + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res)))) mdb "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE run_id=? AND state NOT LIKE 'defunct%' ORDER BY start_time DESC;" run-id) (reverse res))) @@ -490,12 +502,13 @@ (get-host-name))) (define (tasks:get-monitors mdb) (let ((res '())) (dbi:for-each-row + (lambda (output) (lambda (a . rem) - (set! res (cons (apply vector a rem) res))) + (set! res (cons (apply vector a rem) res)))) mdb "SELECT id,pid,strftime('%m/%d/%Y %H:%M',datetime(start_time,'unixepoch'),'localtime'),strftime('%m/%d/%Y %H:%M:%S',datetime(last_update,'unixepoch'),'localtime'),hostname,username FROM monitors ORDER BY last_update ASC;") (reverse res) )) @@ -520,13 +533,14 @@ (dbi:exec mdb "UPDATE monitors SET last_update=strftime('%s','now') WHERE pid=? AND hostname=?;" (current-process-id) (get-host-name)) (let ((deadlist '())) (dbi:for-each-row + (lambda (output) (lambda (id pid host last-update delta) (print "Going to delete stale record for monitor with pid " pid " on host " host " last updated " delta " seconds ago") - (set! deadlist (cons id deadlist))) + (set! deadlist (cons id deadlist)))) mdb "SELECT id,pid,hostname,last_update,strftime('%s','now')-last_update AS delta FROM monitors WHERE delta > 700;") (dbi:exec mdb (conc "DELETE FROM monitors WHERE id IN ('" (string-intersperse (map conc deadlist) "','") "');"))) ) (define (tasks:register-monitor db port) @@ -539,12 +553,13 @@ pid hostname username))) (define (tasks:get-num-alive-monitors mdb) (let ((res 0)) (dbi:for-each-row + (lambda (output) (lambda (count) - (set! res count)) + (set! res count))) mdb "SELECT count(id) FROM monitors WHERE last_update < (strftime('%s','now') - 300) AND username=?;" (car (user-information (current-user-id)))) res)) @@ -642,12 +657,13 @@ (state='waiting' AND (strftime('%s','now')-execution_time) > 10) OR state='reset' ORDER BY RANDOM() LIMIT 1);" keytxt) (dbi:for-each-row + (lambda (output) (lambda (id . rem) - (set! res (apply vector id rem))) + (set! res (apply vector id rem)))) db "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue WHERE keylock=? ORDER BY execution_time ASC LIMIT 1;" keytxt) (if res ;; yep, have work to be done (begin (dbi:exec db "UPDATE tasks_queue SET state='inprogress',execution_time=strftime('%s','now') WHERE id=?;" @@ -659,12 +675,13 @@ (let ((res '())) (db:with-db dbstruct #f #t (lambda (db) (dbi:for-each-row + (lambda (output) (lambda (id delta) - (set! res (cons id res))) + (set! res (cons id res)))) db "SELECT id,strftime('%s','now')-execution_time AS delta FROM tasks_queue WHERE state='inprogress' AND delta>700 ORDER BY delta DESC LIMIT 2;") (dbi:exec db (conc "UPDATE tasks_queue SET state='reset' WHERE id IN ('" (string-intersperse (map conc res) "','") "');") @@ -676,12 +693,13 @@ (let ((res '())) (db:with-db dbstruct #f #f (lambda (db) (dbi:for-each-row + (lambda (output) (lambda (id . rem) - (set! res (cons (apply vector id rem) res))) + (set! res (cons (apply vector id rem) res)))) db (conc "SELECT id,action,owner,state,target,name,test,item,params,creation_time,execution_time FROM tasks_queue " ;; WHERE ;; state IN " statesstr " AND @@ -693,12 +711,13 @@ (let ((res #f)) (db:with-db dbstruct #f #f (lambda (db) (dbi:for-each-row - (lambda (id . rem) - (set! res (apply vector id rem))) + (lambda (output) + (lambda (id . rem) + (set! res (apply vector id rem)))) db (conc "SELECT id,action,owner,state,target,name,testpatt,keylock,params,creation_time,execution_time FROM tasks_queue WHERE target = ? AND name =? @@ -788,13 +807,14 @@ ;; exn ;; '() ;; (sqlite3:first-row (let ((db (db:delay-if-busy (db:get-db dbstruct #f))) (res '())) - (dbi:for-each-row + (dbi:for-each-row + (lambda (output) (lambda (a . b) - (set! res (cons (cons a b) res))) + (set! res (cons (cons a b) res)))) db "SELECT id,action,owner,state,target,name,testpatt,keylock,params FROM tasks_queue WHERE target = ? AND name = ? AND state LIKE ? AND action LIKE ? AND testpatt LIKE ?;" target run-name state-patt action-patt test-patt) res)) ;; )