lock-queue: pass a db record (a vector holding the sqlite handle and the
.lockdb path) through the lock-queue API so that most operations can call
tasks:wait-on-journal on the db path before touching the database.
tasks:wait-on-journal gains a waiting-msg: keyword and, when remove: is
set, now deletes the -journal file instead of the db path itself.

Review fixes applied to added (+) lines only; hunk line counts unchanged:
 * lock-queue:steal-lock: call lock-queue:get-lock with test-id. The
   unbound name test-it was carried over from the removed line.
 * lock-queue:get-lock: pass the message via the waiting-msg: keyword.
   The bare symbol waiting-msg was a variable reference, not a keyword.
 * lock-queue:steal-lock: pass its message via waiting-msg: as well
   (it was a stray positional string).
 * tasks:wait-on-journal: throttle the waiting message on the loop
   counter count, not the constant n. With n = 1200 the old test was
   always true and printed the message on every one-second poll.
NOTE(review): lock-queue:get-lock passes remove: #t, so a journal that
outlives the wait is removed with rm -rf. Deleting a hot SQLite rollback
journal can corrupt the database; confirm this is intended.
NOTE(review): several retry branches in unchanged context lines may leak
the open db or prepared statements when they give up, and get-lock's
handler returns #f even after a retry. Consider a follow-up.

Index: lock-queue.scm ================================================================== --- lock-queue.scm +++ lock-queue.scm @@ -5,15 +5,10 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. -;;====================================================================== -;; launch a task - this runs on the originating host, tests themselves -;; -;;====================================================================== - (use sqlite3 srfi-18) (import (prefix sqlite3 sqlite3:)) (declare (unit lock-queue)) (declare (uses common)) @@ -21,25 +16,35 @@ ;;====================================================================== ;; attempt to prevent overlapping updates of rollup files by queueing ;; update requests in an sqlite db ;;====================================================================== +;;====================================================================== +;; db record, +;;====================================================================== + +(define (make-lock-queue:db-dat)(make-vector 3)) +(define-inline (lock-queue:db-dat-get-db vec) (vector-ref vec 0)) +(define-inline (lock-queue:db-dat-get-path vec) (vector-ref vec 1)) +(define-inline (lock-queue:db-dat-set-db! vec val)(vector-set! vec 0 val)) +(define-inline (lock-queue:db-dat-set-path! vec val)(vector-set! vec 1 val)) + (define (lock-queue:open-db fname #!key (count 10)) (let* ((actualfname (conc fname ".lockdb")) (dbexists (file-exists? actualfname)) (db (sqlite3:open-database actualfname)) (handler (make-busy-timeout 136000))) (if dbexists - db + (vector db actualfname) (begin (handle-exceptions exn (begin (thread-sleep! 
10) (if (> count 0) (lock-queue:open-db fname count: (- count 1)) - db)) + (vector db actualfname))) (sqlite3:with-transaction db (lambda () (sqlite3:execute db @@ -55,63 +60,67 @@ id INTEGER PRIMARY KEY, test_id INTEGER, run_lock TEXT, CONSTRAINT runlock_constraint UNIQUE (run_lock));")))))) (sqlite3:set-busy-handler! db handler) - db)) + (vector db actualfname))) -(define (lock-queue:set-state db test-id newstate #!key (remtries 10)) +(define (lock-queue:set-state dbdat test-id newstate #!key (remtries 10)) + (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200) (handle-exceptions exn (if (> remtries 0) (begin (debug:print 0 "WARNING: exception on lock-queue:set-state. Trying again in 30 seconds.") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 30) - (lock-queue:set-state db test-id newstate remtries: (- remtries 1))) + (lock-queue:set-state dbdat test-id newstate remtries: (- remtries 1))) (begin (debug:print 0 "ERROR: Failed to set lock state for test with id " test-id ", error: " ((condition-property-accessor 'exn 'message) exn) ", giving up.") #f)) - (sqlite3:execute db "UPDATE queue SET state=? WHERE test_id=?;" + (sqlite3:execute (lock-queue:db-dat-get-db dbdat) "UPDATE queue SET state=? WHERE test_id=?;" newstate test-id))) -(define (lock-queue:any-younger? db mystart test-id #!key (remtries 10)) +(define (lock-queue:any-younger? dbdat mystart test-id #!key (remtries 10)) + (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200) (handle-exceptions exn (if (> remtries 0) (begin (debug:print 0 "WARNING: exception on lock-queue:any-younger. Trying again in 30 seconds.") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 30) - (lock-queue:any-younger? db mystart test-id remtries: (- remtries 1))) + (lock-queue:any-younger? 
dbdat mystart test-id remtries: (- remtries 1))) (begin (debug:print 0 "ERROR: Failed to find younger locks for test with id " test-id ", error: " ((condition-property-accessor 'exn 'message) exn) ", giving up.") #f)) (let ((res #f)) (sqlite3:for-each-row (lambda (tid) ;; Actually this should not be needed as mystart cannot be simultaneously less than and test-id same as (if (not (equal? tid test-id)) (set! res tid))) - db + (lock-queue:db-dat-get-db dbdat) "SELECT test_id FROM queue WHERE start_time > ?;" mystart) res))) -(define (lock-queue:get-lock db test-id #!key (count 10)) - (let ((res #f) - (lckqry (sqlite3:prepare db "SELECT test_id,run_lock FROM runlocks WHERE run_lock='locked';")) - (mklckqry (sqlite3:prepare db "INSERT INTO runlocks (test_id,run_lock) VALUES (?,'locked');"))) +(define (lock-queue:get-lock dbdat test-id #!key (count 10)) + (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 remove: #t waiting-msg: "lock-queue:get-lock, waiting on journal") + (let* ((res #f) + (db (lock-queue:db-dat-get-db dbdat)) + (lckqry (sqlite3:prepare db "SELECT test_id,run_lock FROM runlocks WHERE run_lock='locked';")) + (mklckqry (sqlite3:prepare db "INSERT INTO runlocks (test_id,run_lock) VALUES (?,'locked');"))) (let ((result (handle-exceptions exn (begin (debug:print 0 "WARNING: failed to get queue lock. Will try again in a few seconds") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 10) (if (> count 0) - (lock-queue:get-lock db test-id count: (- count 1))) + (lock-queue:get-lock dbdat test-id count: (- count 1))) #f) (sqlite3:with-transaction db (lambda () (sqlite3:for-each-row (lambda (tid lockstate) @@ -128,72 +137,80 @@ (sqlite3:finalize! lckqry) (sqlite3:finalize! 
mklckqry) result))) (define (lock-queue:release-lock fname test-id #!key (count 10)) - (let ((db (lock-queue:open-db fname))) + (let* ((dbdat (lock-queue:open-db fname))) (handle-exceptions exn (begin (debug:print 0 "WARNING: Failed to release queue lock. Will try again in few seconds") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 10) (if (> count 0) - (lock-queue:release-lock fname test-id count: (- count 1)) + (begin + (sqlite3:finalize! (lock-queue:db-dat-get-db dbdat)) + (lock-queue:release-lock fname test-id count: (- count 1))) #f)) - (sqlite3:execute db "DELETE FROM runlocks WHERE test_id=?;" test-id) - (sqlite3:finalize! db)))) + (sqlite3:execute (lock-queue:db-dat-get-db dbdat) "DELETE FROM runlocks WHERE test_id=?;" test-id) + (sqlite3:finalize! (lock-queue:db-dat-get-db dbdat))))) -(define (lock-queue:steal-lock db test-id #!key (count 10)) +(define (lock-queue:steal-lock dbdat test-id #!key (count 10)) + (debug:print-info 0 "Attempting to steal lock at " (lock-queue:db-dat-get-path dbdat)) + (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 waiting-msg: "lock-queue:steal-lock; waiting on journal") (handle-exceptions exn (begin (debug:print 0 "WARNING: Failed to steal queue lock. Will try again in few seconds") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 
10) (if (> count 0) - (lock-queue:steal-lock db test-id count: (- count 1)) + (lock-queue:steal-lock dbdat test-id count: (- count 1)) #f)) - (sqlite3:execute db "DELETE FROM runlocks WHERE run_lock='locked';")) - (lock-queue:get-lock db test-it)) + (sqlite3:execute (lock-queue:db-dat-get-db dbdat) "DELETE FROM runlocks WHERE run_lock='locked';")) + (lock-queue:get-lock dbdat test-id)) ;; returns #f if ok to skip the task ;; returns #t if ok to proceed with task ;; otherwise waits ;; (define (lock-queue:wait-turn fname test-id #!key (count 10)) - (let ((db (lock-queue:open-db fname)) - (mystart (current-seconds))) + (let* ((dbdat (lock-queue:open-db fname)) + (mystart (current-seconds)) + (db (lock-queue:db-dat-get-db dbdat))) (handle-exceptions exn (begin (debug:print 0 "WARNING: Failed to find out if it is ok to skip the wait queue. Will try again in few seconds") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (thread-sleep! 10) (if (> count 0) - (lock-queue:wait-turn fname test-id count: (- count 1)) + (begin + (sqlite3:finalize! db) + (lock-queue:wait-turn fname test-id count: (- count 1))) #f)) + (tasks:wait-on-journal (lock-queue:db-dat-get-path dbdat) 1200 waiting-msg: "lock-queue:wait-turn; waiting on journal file") (sqlite3:execute db "INSERT OR REPLACE INTO queue (test_id,start_time,state) VALUES (?,?,'waiting');" test-id mystart) (thread-sleep! 1) ;; give other tests a chance to register (let ((result - (let loop ((younger-waiting (lock-queue:any-younger? db mystart test-id))) + (let loop ((younger-waiting (lock-queue:any-younger? dbdat mystart test-id))) (if younger-waiting (begin ;; no need for us to wait. 
mark in the lock queue db as skipping - (lock-queue:set-state db test-id "skipping") + (lock-queue:set-state dbdat test-id "skipping") #f) ;; let the calling process know that nothing needs to be done - (if (lock-queue:get-lock db test-id) + (if (lock-queue:get-lock dbdat test-id) #t (if (> (- (current-seconds) mystart) 36000) ;; waited too long, steal the lock - (lock-queue:steal-lock db test-id) + (lock-queue:steal-lock dbdat test-id) (begin (thread-sleep! 1) - (loop (lock-queue:any-younger? db mystart test-id))))))))) + (loop (lock-queue:any-younger? dbdat mystart test-id))))))))) (sqlite3:finalize! db) result)))) ;; (use trace) ;; (trace lock-queue:get-lock lock-queue:release-lock lock-queue:wait-turn lock-queue:any-younger? lock-queue:set-state) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -22,26 +22,30 @@ ;; Tasks db ;;====================================================================== ;; wait up to aprox n seconds for a journal to go away ;; -(define (tasks:wait-on-journal path n #!key (remove #f)) +(define (tasks:wait-on-journal path n #!key (remove #f)(waiting-msg #f)) (let ((fullpath (conc path "-journal"))) (handle-exceptions exn #t ;; if stuff goes wrong just allow it to move on (let loop ((journal-exists (file-exists? fullpath)) (count n)) ;; wait ten times ... (if journal-exists - (if (> count 0) - (begin - (thread-sleep! 1) - (loop (file-exists? fullpath) - (- count 1))) - (begin - (if remove (system (conc "rm -rf " path))) - #f)) + (begin + (if (and waiting-msg + (eq? (modulo count 30) 0)) + (debug:print 0 waiting-msg)) + (if (> count 0) + (begin + (thread-sleep! 1) + (loop (file-exists? fullpath) + (- count 1))) + (begin + (if remove (system (conc "rm -rf " fullpath))) + #f))) #t))))) (define (tasks:get-task-db-path) (if *task-db* (vector-ref *task-db* 1)