Cached-write statistics and flush throttling for the server write queue.

db.scm:             count queued writes and accumulate, in milliseconds, how
                    long each call to db:queue-write-and-wait took.
http-transport.scm: print the write count and the average write time at
                    shutdown; the average is reported as 0 when no writes
                    were recorded so the division cannot fail.
server.scm:         only flush the queue when *server:last-write-flush* is
                    more than 400 ms old; when the flush is skipped, sleep
                    briefly instead of spinning on *incoming-mutex*.

NOTE(review): this copy of the patch had lost its line breaks. Whitespace on
context lines is reconstructed, and the body of the first db.scm hunk is
truncated (its added line is missing) -- regenerate the patch against the
tree before applying.

Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -1298,10 +1298,11 @@
   (let ((queries    (make-hash-table))
         (data       #f))
     (mutex-lock! *incoming-mutex*)
     ;; data is a list of query packets
 (length data) 0) ;; Process if we have data
         (begin
@@ -1351,10 +1352,13 @@
 		(set! *max-cache-size* cache-size)))
 	  #t)
 	#f)))
 
 (define *db:process-queue-mutex* (make-mutex))
+
+(define *number-of-writes*   0)
+(define *writes-total-delay* 0)
 
 ;; The queue is a list of vectors where the zeroth slot indicates the type of query to
 ;; apply and the second slot is the time of the query and the third entry is a list of
 ;; values to be applied
 ;;
@@ -1361,10 +1365,11 @@
 (define (db:queue-write-and-wait db qry-sig query params)
   (let ((queue-len  0)
 	(res        #f)
 	(got-it     #f)
 	(qry-pkt    (vector qry-sig query params))
+	(start-time (current-milliseconds))
 	(timeout    (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future
 
     ;; Put the item in the queue *incoming-writes*
     (mutex-lock! *incoming-mutex*)
     (set! *incoming-writes* (cons qry-pkt *incoming-writes*))
@@ -1385,10 +1390,12 @@
 	    (set! got-it #t)))
       (mutex-unlock! *completed-mutex*)
       (if (and (not got-it)
 	       (< (current-seconds) timeout))
 	  (loop)))
+    (set! *number-of-writes*   (+ *number-of-writes*   1))
+    (set! *writes-total-delay* (+ *writes-total-delay* (- (current-milliseconds) start-time))) ;; elapsed ms for this write
     got-it))
 
 (define (db:process-queue-item db item)
   (let* ((stmt-key       (cdb:packet-get-qtype item))
 	 (qry-sig        (cdb:packet-get-query-sig item))

Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ http-transport.scm
@@ -250,11 +250,13 @@
 		   (debug:print-info 0 "Starting to shutdown the server.")
 		   ;; need to delete only *my* server entry (future use)
 		   (set! *time-to-exit* #t)
 		   (tasks:server-deregister-self tdb (get-host-name))
 		   (thread-sleep! 1)
-		   (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+		   (debug:print-info 0 "Max cached queries was    " *max-cache-size*)
+		   (debug:print-info 0 "Number of cached writes   " *number-of-writes*)
+		   (debug:print-info 0 "Average cached write time " (if (> *number-of-writes* 0) (/ *writes-total-delay* *number-of-writes*) 0) " ms")
 		   (debug:print-info 0 "Server shutdown complete. Exiting")
 		   (exit)))))))
 
 ;; all routes though here end in exit ...
 (define (http-transport:launch)

Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -59,21 +59,31 @@
 	  (exit))))
 
 ;;======================================================================
 ;; Q U E U E   M A N A G E M E N T
 ;;======================================================================
+
+;; We don't want to flush the queue if it was just flushed
+(define *server:last-write-flush* (current-milliseconds))
 
 ;; Flush the queue every third of a second. Can we assume that setup-for-run
 ;; has already been done?
 (define (server:write-queue-handler)
   (if (setup-for-run)
       (let ((db (open-db)))
 	(let loop ()
-	  (mutex-lock! *db:process-queue-mutex*)
-	  (db:process-cached-writes db)
-	  (mutex-unlock! *db:process-queue-mutex*)
-	  (thread-sleep! 0.3)
+	  (let ((last-write-flush-time #f))
+	    (mutex-lock! *incoming-mutex*)
+	    (set! last-write-flush-time *server:last-write-flush*)
+	    (mutex-unlock! *incoming-mutex*)
+	    (if (> (- (current-milliseconds) last-write-flush-time) 400)
+		(begin
+		  (mutex-lock! *db:process-queue-mutex*)
+		  (db:process-cached-writes db)
+		  (mutex-unlock! *db:process-queue-mutex*)
+		  (thread-sleep! 0.5))
+		(thread-sleep! 0.1))) ;; flushed recently: back off briefly rather than busy-looping
 	  (loop)))
       (begin
 	(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
 	(exit 1))))
 