1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
|
(debug:print-info 7 "Current write queue length is " queue-len)
;; poll for the write to complete, timeout after 10 seconds
;; periodic flushing of the queue is taken care of by
;; db:flush-queue
(let loop ()
(thread-sleep! 0.1)
(mutex-lock! *completed-mutex*)
(if (hash-table-ref/default *completed-writes* qry-sig #f)
(begin
(hash-table-delete! *completed-writes* qry-sig)
(set! got-it #t)))
(mutex-unlock! *completed-mutex*)
(if (and (not got-it)
(< (current-seconds) timeout))
(loop)))
(set! *number-of-writes* (+ *number-of-writes* 1))
(set! *writes-total-delay* (+ *writes-total-delay* 1))
got-it))
(define (db:process-queue-item db item)
(let* ((stmt-key (cdb:packet-get-qtype item))
(qry-sig (cdb:packet-get-query-sig item))
|
|
>
>
|
|
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
|
(debug:print-info 7 "Current write queue length is " queue-len)
;; poll for the write to complete, timeout after 10 seconds
;; periodic flushing of the queue is taken care of by
;; db:flush-queue
(let loop ()
(thread-sleep! 0.002)
(mutex-lock! *completed-mutex*)
(if (hash-table-ref/default *completed-writes* qry-sig #f)
(begin
(hash-table-delete! *completed-writes* qry-sig)
(set! got-it #t)))
(mutex-unlock! *completed-mutex*)
(if (and (not got-it)
(< (current-seconds) timeout))
(begin
(thread-sleep! 0.01)
(loop))))
(set! *number-of-writes* (+ *number-of-writes* 1))
(set! *writes-total-delay* (+ *writes-total-delay* 1))
got-it))
(define (db:process-queue-item db item)
(let* ((stmt-key (cdb:packet-get-qtype item))
(qry-sig (cdb:packet-get-query-sig item))
|