Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1296,77 +1296,84 @@ (define (db:process-cached-writes db) (let ((queries (make-hash-table)) (data #f)) (mutex-lock! *incoming-mutex*) + ;; data is a list of query packets (length data) 0) - (debug:print-info 4 "Writing cached data " data)) - - ;; Prepare the needed sql statements - ;; - (for-each (lambda (request-item) - (let ((stmt-key (vector-ref request-item 0))) - (if (not (hash-table-ref/default queries stmt-key #f)) - (let ((stmt (alist-ref stmt-key db:queries))) - (if stmt - (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) - (debug:print 0 "ERROR: Missing query spec for " stmt-key "!")))))) - data) - - ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue - ;; and then are executed. - (sqlite3:with-transaction - db - (lambda () - (debug:print-info 11 "flushing " data " to db") - (for-each - (lambda (hed) - (let ((params (vector-ref hed 2)) - (stmt-key (vector-ref hed 0))) - (debug:print-info 11 "Executing " stmt-key " for " params) - (apply sqlite3:execute (hash-table-ref queries stmt-key) params))) - data))) - - ;; let all the waiting calls know all is done - (mutex-lock! *completed-mutex*) - (for-each (lambda (item) - (let ((qry-sig (cdb:packet-get-client-sig item))) - (hash-table-set! *completed-writes* qry-sig #t))) - data) - (mutex-unlock! *completed-mutex*) - - ;; Finalize the statements. Should this be done inside the mutex above? - ;; I think sqlite3 mutexes will keep the data safe - (for-each (lambda (stmt-key) - (sqlite3:finalize! (hash-table-ref queries stmt-key))) - (hash-table-keys queries)) - - ;; Do a little record keeping - (let ((cache-size (length data))) - (if (> cache-size *max-cache-size*) - (set! *max-cache-size* cache-size))) - #f)) + ;; Process if we have data + (begin + (debug:print-info 7 "Writing cached data " data) + + ;; Prepare the needed sql statements + ;; + (for-each (lambda (request-item) + (let ((stmt-key (vector-ref request-item 0)) + (query (vector-ref request-item 1))) + (hash-table-set! queries stmt-key (sqlite3:prepare db query)))) + data) + + ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue + ;; and then are executed. + (sqlite3:with-transaction + db + (lambda () + (for-each + (lambda (hed) + (let* ((params (vector-ref hed 2)) + (stmt-key (vector-ref hed 0)) + (stmt (hash-table-ref/default queries stmt-key #f))) + (if stmt + (apply sqlite3:execute stmt params) + (debug:print 0 "ERROR: Problem Executing " stmt-key " for " params)))) + data))) + + ;; let all the waiting calls know all is done + (mutex-lock! *completed-mutex*) + (for-each (lambda (item) + (let ((qry-sig (cdb:packet-get-client-sig item))) + (debug:print-info 7 "Registering query " qry-sig " as done") + (hash-table-set! *completed-writes* qry-sig #t))) + data) + (mutex-unlock! *completed-mutex*) + + ;; Finalize the statements. Should this be done inside the mutex above? + ;; I think sqlite3 mutexes will keep the data safe + (for-each (lambda (stmt-key) + (sqlite3:finalize! (hash-table-ref queries stmt-key))) + (hash-table-keys queries)) + + ;; Do a little record keeping + (let ((cache-size (length data))) + (if (> cache-size *max-cache-size*) + (set! *max-cache-size* cache-size))) + #t) + #f))) (define *db:process-queue-mutex* (make-mutex)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; -(define (db:queue-write-and-wait db item) - (let ((res #f) +(define (db:queue-write-and-wait db qry-sig query params) + (let ((queue-len 0) + (res #f) (got-it #f) - (qry-sig (cdb:packet-get-query-sig item)) + (qry-pkt (vector qry-sig query params)) (timeout (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future ;; Put the item in the queue *incoming-writes* (mutex-lock! *incoming-mutex*) - (set! *incoming-writes (cons item *incoming-writes*)) + (set! *incoming-writes* (cons qry-pkt *incoming-writes*)) + (set! queue-len (length *incoming-writes*)) (mutex-unlock! *incoming-mutex*) + + (debug:print-info 7 "Current write queue length is " queue-len) ;; poll for the write to complete, timeout after 10 seconds ;; periodic flushing of the queue is taken care of by ;; db:flush-queue (let loop () @@ -1390,16 +1397,19 @@ (query (let ((q (alist-ref stmt-key db:queries))) (if q (car q) #f)))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params) (if query ;; hand queries off to the write queue - (begin - (case *transport-type* - ((http)(db:queue-write-and-wait db item)) - (else - (apply sqlite3:execute db query params))) - (server:reply return-address qry-sig #t #t)) + (let ((response (case *transport-type* + ((http) + (debug:print-info 7 "Queuing item " item " for wrapped write") + (db:queue-write-and-wait db qry-sig query params)) + (else + (apply sqlite3:execute db query params) + #t)))) + (debug:print-info 7 "Received " response " from wrapped write") + (server:reply return-address qry-sig response response)) ;; otherwise if appropriate flush the queue (this is a read or complex query) (begin (case *transport-type* ((http)(db:process-cached-writes db))) (cond