Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -1254,104 +1254,77 @@
 
 ;; The queue is a list of vectors where the zeroth slot indicates the type of query to
 ;; apply and the second slot is the time of the query and the third entry is a list of
 ;; values to be applied
 ;;
-(define (db:process-queue pubsock indata)
-  (open-run-close
-   (lambda (db . junkparams)
-     (let* ((queries (make-hash-table))
-            (data (sort indata (lambda (a b)
-                                 (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
-       (for-each
-        (lambda (special-qry)
-          (let* ((stmt-key (cdb:packet-get-qtype special-qry))
-                 (qry-sig (cdb:packet-get-query-sig special-qry))
-                 (return-address (cdb:packet-get-client-sig special-qry))
-                 (qry (hash-table-ref/default queries stmt-key #f))
-                 (params (cdb:packet-get-params special-qry)))
-            (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qry=" qry ", params=" params)
-            (cond
-             ;; Special queries
-             ((string? qry)
-              (apply sqlite3:execute db qry params)
-              (server:reply pubsock return-address qry-sig #t #t))
-             ;; ((and (not (null? params))
-             ;; (procedure? (car params)))
-             ;; (let ((proc (car params))
-             ;; (remparams (cdr params)))
-             ;; ;; we are being handed a procedure so call it
-             ;; (debug:print-info 11 "Running (apply " proc " " db " " remparams ")")
-             ;; (server:reply pubsock return-address (apply proc db remparams))))
-
-             (else
-              (case stmt-key
-                ((immediate)
-                 (let ((proc (car params))
-                       (remparams (cdr params)))
-                   ;; we are being handed a procedure so call it
-                   (debug:print-info 11 "Running (apply " proc " " remparams ")")
-                   (server:reply pubsock return-address qry-sig #t (apply proc remparams))))
-                ((login)
-                 (if (< (length params) 3) ;; should get toppath, version and signature
-                     '(#f "login failed due to missing params") ;; missing params
-                     (let ((calling-path (car params))
-                           (calling-vers (cadr params))
-                           (client-key (caddr params)))
-                       (if (and (equal? calling-path *toppath*)
-                                (equal? megatest-version calling-vers))
-                           (begin
-                             (hash-table-set! *logged-in-clients* client-key (current-seconds))
-                             (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ...
-                           (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))
-                ((flush sync)
-                 (server:reply pubsock return-address qry-sig #t (length data)))
-                ((set-verbosity)
-                 (set! *verbosity* (car params))
-                 (server:reply pubsock return-address qry-sig #t '(#t *verbosity*)))
-                ((killserver)
-                 (debug:print 0 "WARNING: Server going down in 15 seconds by user request!")
-                 (open-run-close tasks:server-deregister tasks:open-db
-                                 (cadr *server-info*)
-                                 pullport: (caddr *server-info*))
-                 (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit))))
-                 (server:reply pubsock return-address qry-sig #t '(#t "exit process started")))
-                (let ((params (cdb:packet-get-params hed))
-                      (return-address (cdb:packet-get-client-sig hed))
-                      (qry-sig (cdb:packet-get-query-sig hed))
-                      (stmt-key (cdb:packet-get-qtype hed)))
-                  (if (or (not (hash-table-ref/default queries stmt-key #f))
-                          (member stmt-key db:special-queries))
-                      (begin
-                        (debug:print-info 11 "Handling special statement " stmt-key)
-                        (cons hed tal))
-                      (begin
-                        (debug:print-info 11 "Executing " stmt-key " for " params)
-                        (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
-                        (server:reply pubsock return-address qry-sig #t #t)
-                        (if (not (null? tal))
-                            (innerloop (car tal)(cdr tal))
-                            '()))
-                      ))))))))
-
-             (else
-              (debug:print 0 "ERROR: Unrecognised queued call " qry " " params)
-              (server:reply pubsock return-address qry-sig #f #t))
-
-
-
-
-             (if (not (null? rem))
-                 (outerloop (car rem)(cdr rem))))))
-       (for-each (lambda (stmt-key)
-                   (sqlite3:finalize! (hash-table-ref queries stmt-key)))
-                 (hash-table-keys queries))
-       (let ((cache-size (length data)))
-         (if (> cache-size *max-cache-size*)
-             (set! *max-cache-size* cache-size)))
-       ))
-   #f))
+(define (db:process-queue db pubsock indata)
+  ;; apply the packets in order of their query time, oldest first
+  (let* ((data (sort indata (lambda (a b)
+                              (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
+    (for-each
+     (lambda (item)
+       (db:process-queue-item db pubsock item))
+     data)))
+
+;; Process a single queued packet against db and answer the client.
+;;   - if the packet's qtype has an entry in db:queries, run that query with the packet params
+;;   - if the qtype is listed in db:special-queries, dispatch the matching command
+;;   - anything else is reported back to the client as a failure
+;; Every branch answers through server:reply so the client is never left waiting.
+;;
+(define (db:process-queue-item db pubsock item)
+  (let* ((stmt-key (cdb:packet-get-qtype item))
+         (qry-sig (cdb:packet-get-query-sig item))
+         (return-address (cdb:packet-get-client-sig item))
+         (params (cdb:packet-get-params item))
+         (query (let ((q (alist-ref stmt-key db:queries)))
+                  (if q (car q) #f))))
+    (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qrery=" query ", params=" params)
+    (cond
+     (query
+      (apply sqlite3:execute db query params)
+      (server:reply pubsock return-address qry-sig #t #t))
+     ((member stmt-key db:special-queries)
+      (debug:print-info 11 "Handling special statement " stmt-key)
+      (case stmt-key
+        ((immediate)
+         (let ((proc (car params))
+               (remparams (cdr params)))
+           ;; we are being handed a procedure so call it
+           (debug:print-info 11 "Running (apply " proc " " remparams ")")
+           (server:reply pubsock return-address qry-sig #t (apply proc remparams))))
+        ((login)
+         (if (< (length params) 3) ;; should get toppath, version and signature
+             (server:reply pubsock return-address qry-sig #f '(#f "login failed due to missing params")) ;; missing params
+             (let ((calling-path (car params))
+                   (calling-vers (cadr params))
+                   (client-key (caddr params)))
+               (if (and (equal? calling-path *toppath*)
+                        (equal? megatest-version calling-vers))
+                   (begin
+                     (hash-table-set! *logged-in-clients* client-key (current-seconds))
+                     (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ...
+                   (server:reply pubsock return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))))
+        ((flush sync)
+         (server:reply pubsock return-address qry-sig #t 1)) ;; (length data)))
+        ((set-verbosity)
+         (set! *verbosity* (car params))
+         (server:reply pubsock return-address qry-sig #t (list #t *verbosity*))) ;; send the value, not the quoted symbol
+        ((killserver)
+         (debug:print 0 "WARNING: Server going down in 15 seconds by user request!")
+         (open-run-close tasks:server-deregister tasks:open-db
+                         (cadr *server-info*)
+                         pullport: (caddr *server-info*))
+         (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit))))
+         (server:reply pubsock return-address qry-sig #t '(#t "exit process started")))
+        (else ;; not a command, i.e. is a query
+         (debug:print 0 "ERROR: Unrecognised query/command " stmt-key)
+         (server:reply pubsock return-address qry-sig #f 'failed))))
+     (else
+      ;; stmt-key is neither in db:queries nor in db:special-queries, so there is nothing to execute
+      (debug:print 0 "ERROR: Unrecognised query/command " stmt-key)
+      (server:reply pubsock return-address qry-sig #f 'failed)))))
 
 (define (db:test-get-records-for-index-file db run-id test-name)
   (let ((res '()))
     (sqlite3:for-each-row
      (lambda (id itempath state status run_duration logf comment)

Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -116,23 +116,23 @@
 
     (set! *cache-on* #t)
 
     ;; what to do when we quit
     ;;
-    (on-exit (lambda ()
-               (if (and *toppath* *server-info*)
-                   (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
-                   (let loop ()
-                     (let ((queue-len 0))
-                       (thread-sleep! (random 5))
-                       (mutex-lock! *incoming-mutex*)
-                       (set! queue-len (length *incoming-data*))
-                       (mutex-unlock! *incoming-mutex*)
-                       (if (> queue-len 0)
-                           (begin
-                             (debug:print-info 0 "Queue not flushed, waiting ...")
-                             (loop))))))))
+;;    (on-exit (lambda ()
+;;               (if (and *toppath* *server-info*)
+;;                   (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
+;;                   (let loop ()
+;;                     (let ((queue-len 0))
+;;                       (thread-sleep! (random 5))
+;;                       (mutex-lock! *incoming-mutex*)
+;;                       (set! queue-len (length *incoming-data*))
+;;                       (mutex-unlock! *incoming-mutex*)
+;;                       (if (> queue-len 0)
+;;                           (begin
+;;                             (debug:print-info 0 "Queue not flushed, waiting ...")
+;;                             (loop))))))))
 
     ;; The heavy lifting
     ;;
     ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
     ;;
@@ -140,11 +140,11 @@
       (let* ((rawmsg (receive-message* pull-socket))
              (packet (db:string->obj rawmsg)))
         (debug:print-info 12 "server=> received packet=" packet)
         (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
             (begin
-              (db:process-queue pub-socket (cons packet queue-lst))
+              (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
               (loop '()))
             (loop (cons packet queue-lst)))))))
 
 (define (server:reply pubsock target query-sig success/fail result)
   (debug:print-info 11 "server:reply target=" target ", result=" result)
@@ -240,10 +240,15 @@
   (message-digest-string (md5-primitive)
                          (with-output-to-string
                            (lambda ()
                              (write (list (current-directory)
                                           (argv)))))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
 ;;======================================================================
 ;; C L I E N T S
 ;;======================================================================
 
 
@@ -364,16 +369,16 @@
               ;; (server:self-ping server-info)
               ;; ))
               ;; "Self ping"))
               (th2 (make-thread (lambda ()
                                   (server:run (args:get-arg "-server"))) "Server run"))
-              (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
+              ;; (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
               )
           (set! *client-non-blocking-mode* #t)
           ;; (thread-start! th1)
           (thread-start! th2)
-          (thread-start! th3)
+          ;; (thread-start! th3)
           (set! *didsomething* #t)
           ;; (thread-join! th3)
           (thread-join! th2)
           )
         (debug:print 0 "ERROR: Failed to setup for megatest")))