Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -36,11 +36,11 @@
 (include "run_records.scm")
 ;; timestamp type (val1 val2 ...)
 ;; type: meta-info, step
 (define *incoming-writes* '())
-(define *completed-writes* '())
+(define *completed-writes* (make-hash-table))
 (define *incoming-last-time* (current-seconds))
 (define *incoming-mutex* (make-mutex))
 (define *completed-mutex* (make-mutex))
 (define *cache-on* #f)
@@ -1089,21 +1089,10 @@
 ;;======================================================================
 ;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS
 ;;======================================================================
-;; db:updater is run in a thread to write out the cached data periodically
-;; (define (db:updater)
-;; (debug:print-info 4 "Starting cache processing")
-;; (let loop ()
-;; (thread-sleep! 10) ;; move save time around to minimize regular collisions?
-;; (db:write-cached-data)
-;; (loop)))
-;; The queue is a list of vectors where the zeroth slot indicates the type of query to
-;; apply and the second slot is the time of the query and the third entry is a list of
-;; values to be applied
-;;
 ;; NOTE: Can remove the regex and base64 encoding for zmq
 (define (db:obj->string obj)
   (case *transport-type*
     ((fs) obj)
     ((http)
@@ -1303,167 +1292,159 @@
     killserver))
 ;; not used, intended to indicate to run in calling process
 (define db:run-local-queries '()) ;; rollup-tests-pass-fail))
-(define (db:write-cached-data)
-  (open-run-close
-   (lambda (db . junkparams)
-     (let ((queries (make-hash-table))
-           (data #f))
-       (mutex-lock! *incoming-mutex*)
-       (set! data (reverse *incoming-writes*)) ;; (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
-       (set! *incoming-writes* '())
-       (mutex-unlock! *incoming-mutex*)
-       (if (> (length data) 0)
-           (debug:print-info 4 "Writing cached data " data))
-       ;; prepare the needed statements
-       (for-each (lambda (request-item)
-                   (let ((stmt-key (vector-ref request-item 0)))
-                     (if (not (hash-table-ref/default queries stmt-key #f))
-                         (let ((stmt (alist-ref stmt-key db:queries)))
-                           (if stmt
-                               (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
-                               (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))
-                 data)
-       ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue
-       ;; and then are executed.
-       (sqlite3:with-transaction
-        db
-        (lambda ()
-          (debug:print-info 11 "flushing " data " to db")
-          (for-each
-           (lambda (hed)
-             (let ((params (vector-ref hed 2))
-                   (stmt-key (vector-ref hed 0)))
-               (debug:print-info 11 "Executing " stmt-key " for " params)
-               (apply sqlite3:execute (hash-table-ref queries stmt-key) params)))
-           data)))
-       ;; let all the waiting calls know all is done
-       (mutex-lock! *completed-mutex*)
-       (set! *completed-writes* (append *completed-writes* data))
-       (mutex-unlock! *completed-mutex*)
-       ;; finalize the statements
-       (for-each (lambda (stmt-key)
-                   (sqlite3:finalize! (hash-table-ref queries stmt-key)))
-                 (hash-table-keys queries))
-       ;; keep a little chest thumping data around
-       (let ((cache-size (length data)))
-         (if (> cache-size *max-cache-size*)
-             (set! *max-cache-size* cache-size)))
-       ))
-   #f))
-
+(define (db:process-cached-writes db)
+  (let ((queries (make-hash-table))
+        (data #f))
+    (mutex-lock! *incoming-mutex*)
+    (set! data (reverse *incoming-writes*)) ;; (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
+    (set! *incoming-writes* '())
+    (mutex-unlock! *incoming-mutex*)
+    (if (> (length data) 0)
+        (debug:print-info 4 "Writing cached data " data))
+
+    ;; Prepare the needed sql statements
+    ;;
+    (for-each (lambda (request-item)
+                (let ((stmt-key (vector-ref request-item 0)))
+                  (if (not (hash-table-ref/default queries stmt-key #f))
+                      (let ((stmt (alist-ref stmt-key db:queries)))
+                        (if stmt
+                            (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
+                            (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))
+              data)
+
+    ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue
+    ;; and then are executed.
+    (sqlite3:with-transaction
+     db
+     (lambda ()
+       (debug:print-info 11 "flushing " data " to db")
+       (for-each
+        (lambda (hed)
+          (let ((params (vector-ref hed 2))
+                (stmt-key (vector-ref hed 0)))
+            (debug:print-info 11 "Executing " stmt-key " for " params)
+            (apply sqlite3:execute (hash-table-ref queries stmt-key) params)))
+        data)))
+
+    ;; let all the waiting calls know all is done
+    (mutex-lock! *completed-mutex*)
+    (for-each (lambda (item)
+                (let ((qry-sig (cdb:packet-get-query-sig item)))
+                  (hash-table-set! *completed-writes* qry-sig #t)))
+              data)
+    (mutex-unlock! *completed-mutex*)
+
+    ;; Finalize the statements. Should this be done inside the mutex above?
+    ;; I think sqlite3 mutexes will keep the data safe
+    (for-each (lambda (stmt-key)
+                (sqlite3:finalize! (hash-table-ref queries stmt-key)))
+              (hash-table-keys queries))
+
+    ;; Do a little record keeping
+    (let ((cache-size (length data)))
+      (if (> cache-size *max-cache-size*)
+          (set! *max-cache-size* cache-size)))
+    #f))
+
+(define *db:process-queue-mutex* (make-mutex))
 ;; The queue is a list of vectors where the zeroth slot indicates the type of query to
 ;; apply and the second slot is the time of the query and the third entry is a list of
 ;; values to be applied
 ;;
-;; (define (db:process-queue db pubsock indata)
-;; (let* ((data (sort indata (lambda (a b)
-;; (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
-;; (for-each
-;; (lambda (item)
-;; (db:process-queue-item db pubsock item))
-;; data)))
-
 (define (db:queue-write-and-wait db item)
-  (let ((res #f)
-        (got-it #f)
-        (qry-sig (cdb:packet-get-query-sig item)))
+  (let ((res #f)
+        (got-it #f)
+        (qry-sig (cdb:packet-get-query-sig item))
+        (timeout (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future
+
+    ;; Put the item in the queue *incoming-writes*
     (mutex-lock! *incoming-mutex*)
-    (set! *incoming-writes (cons item *incoming-writes*))
+    (set! *incoming-writes* (cons item *incoming-writes*))
     (mutex-unlock! *incoming-mutex*)
-    ;; let the queue build three times, look for processed
-    ;; item.
-    (let loop ((count 0))
-      (debug:print-info 11 "db:queue-write-and-wait count=" count ", item=" item)
+
+    ;; poll for the write to complete, timeout after 10 seconds
+    ;; periodic flushing of the queue is taken care of by
+    ;; db:flush-queue
+    (let loop ()
       (thread-sleep! 0.1)
       (mutex-lock! *completed-mutex*)
-      (for-each (lambda (result)
-                  (if (equal? (cdb:packet-get-query-sig result) qry-sig)
-                      (set! got-it #t)))
-                *completed-writes*)
-      (mutex-unlock! *completed-mutex*)
-      (if (not got-it)
-          (if (< count 4) ;; give it 3/10 of a second of queue up time
-              (loop (+ count 1))
-              (db:write-cached-data))))
-    ;; at the point db:write-cached-data was called either by this call
-    ;; or by another. Now every 1/100 sec check to see if this query is
-    ;; at the "head" of the completed queue and pop it off
-    (let loop ()
-      (thread-sleep! 0.001)
-      ;; there must always be at least one item in the completed-writes at this point, right?
-      (mutex-lock! *completed-mutex*)
-      (set! res (car *completed-writes*))
-      (mutex-unlock! *completed-mutex*)
-      (if (equal? (cdb:packet-get-query-sig res) qry-sig) ;; yay! we are done!
-          (begin
-            (mutex-lock! *completed-mutex*)
-            (set! *completed-writes* (cdr *completed-writes*))
-            (mutex-unlock! *completed-mutex*)
-            res)
-          (loop)))))
+      (if (hash-table-ref/default *completed-writes* qry-sig #f)
+          (begin
+            (hash-table-delete! *completed-writes* qry-sig)
+            (set! got-it #t)))
+      (mutex-unlock! *completed-mutex*)
+      (if (and (not got-it)
+               (< (current-seconds) timeout))
+          (loop)))
+    got-it))
 (define (db:process-queue-item db item)
   (let* ((stmt-key (cdb:packet-get-qtype item))
          (qry-sig (cdb:packet-get-query-sig item))
          (return-address (cdb:packet-get-client-sig item))
          (params (cdb:packet-get-params item))
          (query (let ((q (alist-ref stmt-key db:queries)))
                   (if q (car q) #f))))
     (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params)
-    (cond
-     (query
-      ;; transactionize needed here.
-      ;; (case *transport-type*
-      ;; ((http)(db:queue-write-and-wait db item))
-      ;; (else
-      (apply sqlite3:execute db query params)
-      ;; ))
-      (server:reply return-address qry-sig #t #t))
-     ((member stmt-key db:special-queries)
-      (debug:print-info 11 "Handling special statement " stmt-key)
-      (case stmt-key
-        ((immediate)
-         (let ((proc (car params))
-               (remparams (cdr params)))
-           ;; we are being handed a procedure so call it
-           (debug:print-info 11 "Running (apply " proc " " remparams ")")
-           (server:reply return-address qry-sig #t (apply proc remparams))))
-        ((login)
-         (if (< (length params) 3) ;; should get toppath, version and signature
-             (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params
-             (let ((calling-path (car params))
-                   (calling-vers (cadr params))
-                   (client-key (caddr params)))
-               (if (and (equal? calling-path *toppath*)
-                        (equal? megatest-version calling-vers))
-                   (begin
-                     (hash-table-set! *logged-in-clients* client-key (current-seconds))
-                     (server:reply return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ...
-                   (server:reply return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))))
-        ((flush sync)
-         (server:reply return-address qry-sig #t 1)) ;; (length data)))
-        ((set-verbosity)
-         (set! *verbosity* (car params))
-         (server:reply return-address qry-sig #t '(#t *verbosity*)))
-        ((killserver)
-         (debug:print 0 "WARNING: Server going down in 15 seconds by user request!")
-         (open-run-close tasks:server-deregister tasks:open-db
-                         (car *runremote*)
-                         pullport: (cadr *runremote*))
-         (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit))))
-         (server:reply return-address qry-sig #t '(#t "exit process started")))
-        (else ;; not a command, i.e. is a query
-         (debug:print 0 "ERROR: Unrecognised query/command " stmt-key)
-         (server:reply pubsock return-address qry-sig #f 'failed))))
-     (else
-      (debug:print-info 11 "Executing " stmt-key " for " params)
-      (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
-      (server:reply return-address qry-sig #t #t)))))
+    (if query
+        ;; hand queries off to the write queue
+        (begin
+          (case *transport-type*
+            ((http)(db:queue-write-and-wait db item))
+            (else
+             (apply sqlite3:execute db query params)))
+          (server:reply return-address qry-sig #t #t))
+        ;; otherwise if appropriate flush the queue (this is a read or complex query)
+        (begin
+          (case *transport-type*
+            ((http)(db:process-cached-writes db)))
+          (cond
+           ((member stmt-key db:special-queries)
+            (debug:print-info 11 "Handling special statement " stmt-key)
+            (case stmt-key
+              ((immediate)
+               (let ((proc (car params))
+                     (remparams (cdr params)))
+                 ;; we are being handed a procedure so call it
+                 (debug:print-info 11 "Running (apply " proc " " remparams ")")
+                 (server:reply return-address qry-sig #t (apply proc remparams))))
+              ((login)
+               (if (< (length params) 3) ;; should get toppath, version and signature
+                   (server:reply return-address qry-sig #f '(#f "login failed due to missing params")) ;; missing params
+                   (let ((calling-path (car params))
+                         (calling-vers (cadr params))
+                         (client-key (caddr params)))
+                     (if (and (equal? calling-path *toppath*)
+                              (equal? megatest-version calling-vers))
+                         (begin
+                           (hash-table-set! *logged-in-clients* client-key (current-seconds))
+                           (server:reply return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ...
+                         (server:reply return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))))
+              ((flush sync)
+               (server:reply return-address qry-sig #t 1)) ;; (length data)))
+              ((set-verbosity)
+               (set! *verbosity* (car params))
+               (server:reply return-address qry-sig #t '(#t *verbosity*)))
+              ((killserver)
+               (debug:print 0 "WARNING: Server going down in 15 seconds by user request!")
+               (open-run-close tasks:server-deregister tasks:open-db
+                               (car *runremote*)
+                               pullport: (cadr *runremote*))
+               (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit))))
+               (server:reply return-address qry-sig #t '(#t "exit process started")))
+              (else ;; not a command, i.e. is a query
+               (debug:print 0 "ERROR: Unrecognised query/command " stmt-key)
+               (server:reply return-address qry-sig #f 'failed))))
+           (else
+            (debug:print-info 11 "Executing " stmt-key " for " params)
+            (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
+            (server:reply return-address qry-sig #t #t)))))))
 (define (db:test-get-records-for-index-file db run-id test-name)
   (let ((res '()))
     (sqlite3:for-each-row
      (lambda (id itempath state status run_duration logf comment)
Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ http-transport.scm
@@ -273,13 +273,15 @@
         (let* ((th2 (make-thread (lambda ()
                                    (http-transport:run
                                     (if (args:get-arg "-server")
                                         (args:get-arg "-server")
                                         "-"))) "Server run"))
-               (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running")))
+               (th3 (make-thread http-transport:keep-running "Keep running"))
+               (th1 (make-thread server:write-queue-handler "write queue")))
           (thread-start! th2)
           (thread-start! th3)
+          (thread-start! th1)
           (set! *didsomething* #t)
           (thread-join! th2))
         (debug:print 0 "ERROR: Failed to setup for megatest")))
   (exit)))
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -39,12 +39,10 @@
 ;;======================================================================
 ;; Call this to start the actual server
 ;;
-(define *db:process-queue-mutex* (make-mutex))
-
 ;; all routes though here end in exit ...
 (define (server:launch transport)
   (if (not *toppath*)
       (if (not (setup-for-run))
           (begin
@@ -58,20 +56,39 @@
     ((zmq) (zmq-transport:launch))
     (else
      (debug:print "WARNING: unrecognised transport " transport)
      (exit))))
+;;======================================================================
+;; Q U E U E M A N A G E M E N T
+;;======================================================================
+
+;; Flush the queue every third of a second. Can we assume that setup-for-run
+;; has already been done?
+(define (server:write-queue-handler)
+  (if (setup-for-run)
+      (let ((db (open-db)))
+        (let loop ()
+          (db:process-cached-writes db)
+          (thread-sleep! 0.3)
+          (loop)))
+      (begin
+        (debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
+        (exit 1))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+;; Generate a unique signature for this server
 (define (server:mk-signature)
   (message-digest-string (md5-primitive)
                          (with-output-to-string
                            (lambda ()
                              (write (list (current-directory)
                                           (argv)))))))
-;;======================================================================
-;; S E R V E R U T I L I T I E S
-;;======================================================================
 ;; When using zmq this would send the message back (two step process)
 ;; with spiffy or rpc this simply returns the return data to be returned
 ;;
 (define (server:reply return-addr query-sig success/fail result)
Index: synchash.scm
==================================================================
--- synchash.scm
+++ synchash.scm
@@ -81,10 +81,12 @@
      newdat)
     (for-each
      (lambda (id)
        (hash-table-delete! myhash id))
      removs)
+    ;; WHICH ONE!?
+    ;; data)) ;; return the changed and deleted list
     (list newdat removs))) ;; synchash))
 (define *synchashes* (make-hash-table))
 (define (synchash:server-get db proc synckey keynum . params)
Index: tests/Makefile
==================================================================
--- tests/Makefile
+++ tests/Makefile
@@ -19,11 +19,11 @@
 TARGET = "-target ubuntu/nfs/none"
 all : test1 test2 test3 test4 test5
 server :
-	(cd ..;make install) && \
+	(cd ..;make;make install) && \
 	(cd fullrun;../../bin/megatest -server - -debug 22) &
 test0 : cleanprep
 	cd simplerun ; $(MEGATEST) -server - -debug $(DEBUG)&
@@ -64,11 +64,11 @@
 	cd fullrun;$(MEGATEST) -rollup :runname newrun -target ubuntu/nfs/none -debug 10
 cleanprep : ../*.scm Makefile */*.config
 	mkdir -p /tmp/mt_runs /tmp/mt_links
-	cd ..;make install
+	cd ..;make;make install
 	rm -f */logging.db
 	touch cleanprep
 fullprep : cleanprep
 	cd fullrun;$(MEGATEST) -remove-runs :runname $(RUNNAME)% -target %/%/% -testpatt %/%