Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -34,13 +34,15 @@ (include "key_records.scm") (include "run_records.scm") ;; timestamp type (val1 val2 ...) ;; type: meta-info, step -(define *incoming-data* '()) +(define *incoming-writes* '()) +(define *completed-writes* '()) (define *incoming-last-time* (current-seconds)) (define *incoming-mutex* (make-mutex)) +(define *completed-mutex* (make-mutex)) (define *cache-on* #f) (define (db:set-sync db) (let* ((syncval (config-lookup *configdat* "setup" "synchronous")) (val (cond ;; 0 | OFF | 1 | NORMAL | 2 | FULL; @@ -1320,12 +1322,12 @@ (open-run-close (lambda (db . junkparams) (let ((queries (make-hash-table)) (data #f)) (mutex-lock! *incoming-mutex*) - (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) - (set! *incoming-data* '()) + (set! data (reverse *incoming-writes*)) ;; (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) + (set! *incoming-writes* '()) (mutex-unlock! *incoming-mutex*) (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) ;; prepare the needed statements (for-each (lambda (request-item) @@ -1334,40 +1336,32 @@ (let ((stmt (alist-ref stmt-key db:queries))) (if stmt (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (debug:print 0 "ERROR: Missing query spec for " stmt-key "!")))))) data) - (let outerloop ((special-qry #f) - (stmts data)) - (if special-qry - ;; handle a query that cannot be part of the grouped queries - (let* ((stmt-key (vector-ref special-qry 0)) - (qry (hash-table-ref queries stmt-key)) - (params (vector-ref speical-qry 2))) - (apply sqlite3:execute db qry params) - (if (not (null? stmts)) - (outerloop #f stmts))) - ;; handle normal queries - (sqlite3:with-transaction - db - (lambda () - (debug:print-info 11 "flushing " stmts " to db") - (if (not (null? stmts)) - (let innerloop ((hed (car stmts)) - (tal (cdr stmts))) - (let ((params (vector-ref hed 2)) - (stmt-key (vector-ref hed 0))) - (if (not (member stmt-key db:special-queries)) - (begin - (debug:print-info 11 "Executing " stmt-key " for " params) - (apply sqlite3:execute (hash-table-ref queries stmt-key) params) - (if (not (null? tal)) - (innerloop (car tal)(cdr tal)))) - (outerloop hed tal))))))))) + ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue + ;; and then are executed. + (sqlite3:with-transaction + db + (lambda () + (debug:print-info 11 "flushing " data " to db") + (for-each + (lambda (hed) + (let ((params (vector-ref hed 2)) + (stmt-key (vector-ref hed 0))) + (debug:print-info 11 "Executing " stmt-key " for " params) + (apply sqlite3:execute (hash-table-ref queries stmt-key) params))) + data))) + ;; let all the waiting calls know all is done + (mutex-lock! *completed-mutex*) + (set! *completed-writes* (append *completed-writes* data)) + (mutex-unlock! *completed-mutex*) + ;; finalize the statements (for-each (lambda (stmt-key) (sqlite3:finalize! (hash-table-ref queries stmt-key))) (hash-table-keys queries)) + ;; keep a little chest thumping data around (let ((cache-size (length data))) (if (> cache-size *max-cache-size*) (set! *max-cache-size* cache-size))) )) #f)) @@ -1383,10 +1377,49 @@ ;; (for-each ;; (lambda (item) ;; (db:process-queue-item db pubsock item)) ;; data))) +(define (db:queue-write-and-wait db item) + (let ((res #f) + (got-it #f) + (qry-sig (cdb:packet-get-query-sig item))) + (mutex-lock! *incoming-mutex*) + (set! *incoming-writes (cons item *incoming-writes*)) + (mutex-unlock! *incoming-mutex*) + ;; let the queue build three times, look for processed + ;; item. + (let loop ((count 0)) + (debug:print-info 11 "db:queue-write-and-wait count=" count ", item=" item) + (thread-sleep! 0.1) + (mutex-lock! *completed-mutex*) + (for-each (lambda (result) + (if (equal? (cdb:packet-get-query-sig result) qry-sig) + (set! got-it #t))) + *completed-writes*) + (mutex-unlock! *completed-mutex*) + (if (not got-it) + (if (< count 4) ;; give it 3/10 of a second of queue up time + (loop (+ count 1)) + (db:write-cached-data)))) + ;; at the point db:write-cached-data was called either by this call + ;; or by another. Now every 1/100 sec check to see if this query is + ;; at the "head" of the completed queue and pop it off + (let loop () + (thread-sleep! 0.001) + ;; there must always be at least one item in the completed-writes at this point, right? + (mutex-lock! *completed-mutex*) + (set! res (car *completed-writes*)) + (mutex-unlock! *completed-mutex*) + (if (equal? (cdb:packet-get-query-sig res) qry-sig) ;; yay! we are done! + (begin + (mutex-lock! *completed-mutex*) + (set! *completed-writes* (cdr *completed-writes*)) + (mutex-unlock! *completed-mutex*) + res) + (loop))))) + (define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) @@ -1394,12 +1427,13 @@ (if q (car q) #f)))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params) (cond (query ;; transactionize needed here. - - (apply sqlite3:execute db query params) + (case *transport-type* + ((http)(db:queue-write-and-wait db item)) + (else (apply sqlite3:execute db query params))) (server:reply return-address qry-sig #t #t)) ((member stmt-key db:special-queries) (debug:print-info 11 "Handling special statement " stmt-key) (case stmt-key ((immediate)