@@ -14,11 +14,11 @@ ;;====================================================================== (require-extension (srfi 18) extras tcp rpc) (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n zmq) (import (prefix sqlite3 sqlite3:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) @@ -53,10 +53,11 @@ (begin (debug:print-info 11 "db:set-sync, setting pragma synchronous to " val) (sqlite3:execute db (conc "PRAGMA synchronous = '" val "';")))))) (define (open-db) ;; (conc *toppath* "/megatest.db") (car *configinfo*))) + (if (not *toppath*)(setup-for-run)) (let* ((dbpath (conc *toppath* "/megatest.db")) ;; fname) (dbexists (file-exists? dbpath)) (db (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout (if (args:get-arg "-override-timeout") (string->number (args:get-arg "-override-timeout")) @@ -442,18 +443,22 @@ 2)) (if (> (abs (- *last-global-delta-printed* *global-delta*)) 0.08) ;; don't print all the time, only if it changes a bit (begin (debug:print-info 4 "launch throttle factor=" *global-delta*) (set! *last-global-delta-printed* *global-delta*))) - (debug:print-info 11 "db:get-var END " var) + (debug:print-info 11 "db:get-var END " var " val=" res) res)) (define (db:set-var db var val) (debug:print-info 11 "db:set-var START " var " " val) (sqlite3:execute db "INSERT OR REPLACE INTO metadat (var,val) VALUES (?,?);" var val) - (debug:print-info 11 "db:set-var END " var " " val) -) + (debug:print-info 11 "db:set-var END " var " " val)) + +(define (db:del-var db var) + (debug:print-info 11 "db:del-var START " var) + (sqlite3:execute db "DELETE FROM metadat WHERE var=?;" var) + (debug:print-info 11 "db:del-var END " var)) ;; use a global for some primitive caching, it is just silly to re-read the db ;; over and over again for the keys since they never change (define (db:get-keys db) @@ -1073,14 +1078,11 @@ t.comment t.event_time t.fail_count t.pass_count t.archived - - - - FROM tests AS t INNER JOIN runs AS r ON t.run_id=r.id WHERE " + FROM tests AS t INNER JOIN runs AS r ON t.run_id=r.id WHERE " keystr " AND r.runname LIKE '" runname "' AND item_path LIKE '" itempatt "' AND testname LIKE '" testpatt "' AND t.state LIKE '" statepatt "' AND t.status LIKE '" statuspatt "'ORDER BY t.event_time ASC;"))) (debug:print 3 "qrystr: " qrystr) (sqlite3:for-each-row @@ -1095,20 +1097,21 @@ ;;====================================================================== ;; db:updater is run in a thread to write out the cached data periodically (define (db:updater) (debug:print-info 4 "Starting cache processing") - (let loop ((start-time (current-time))) + (let loop () (thread-sleep! 10) ;; move save time around to minimize regular collisions? (db:write-cached-data) - (loop start-time))) + (loop))) ;; cdb:cached-access is called by the server loop to dispatch commands or queue up ;; db accesses ;; ;; params := qry-name cached? val1 val2 val3 ... (define (cdb:cached-access params) + (debug:print-info 12 "cdb:cached-access params=" params) (if (< (length params) 2) "ERROR" (let ((qry-name (car params)) (cached? (cadr params)) (remparam (list-tail params 2))) @@ -1122,11 +1125,12 @@ (let ((calling-path (car remparam))) (if (equal? calling-path *toppath*) #t ;; path matches - pass! Should vet the caller at this time ... #f)))) ;; else fail to login ((flush) - ( + (db:write-cached-data) + #t) (else (mutex-lock! *incoming-mutex*) (set! *last-db-access* (current-seconds)) (set! *incoming-data* (cons (vector qry-name @@ -1143,37 +1147,29 @@ "CACHED") (begin (db:write-cached-data) "WRITTEN"))))))) +(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) +(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) + (define (cdb:client-call zmq-socket . params) - (debug:print-info 11 "zmq-socket " params) - (let ((zdat (with-output-to-string (lambda ()(serialize params)))) + (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) + (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f)) + (print "cdb:client-call before send message") (send-message zmq-socket zdat) - (set! res (receive-message zdat)) + (print "cdb:client-call after send message") + (set! res (db:string->obj (receive-message zmq-socket zdat))) (debug:print-info 11 "zmq-socket " (car params) " res=" res) res)) -(define (cdb:test-set-status-state test-id status state msg) - (debug:print-info 4 "cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) +(define (cdb:test-set-status-state zmqsocket test-id status state msg) (if msg - (set! *incoming-data* (cons (vector 'state-status-msg - (current-milliseconds) - (list state status msg test-id)) - *incoming-data*)) - (set! *incoming-data* (cons (vector 'state-status - (current-milliseconds) - (list state status test-id)) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) - *incoming-data*))) - (mutex-unlock! *incoming-mutex*) - (if *cache-on* - (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") - (db:write-cached-data))) - + (cdb:client-call zmqsocket 'state-status-msg state status msg test-id) + (cdb:client-call zmqsocket 'state-status state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) + (define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id)) (define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) (cdb:client-call zmqsocket 'pass-fail-counts fail-count pass-count test-id)) @@ -1250,11 +1246,23 @@ #f)) (define cdb:flush-queue db:write-cached-data) (define (db:roll-up-pass-fail-counts db run-id test-name item-path status) - (rdb:flush-queue) + + + + + + ;; NEEDED!? + ;; (rdb:flush-queue) + + + + + + (if (and (not (equal? item-path "")) (or (equal? status "PASS") (equal? status "WARN") (equal? status "FAIL") (equal? status "WAIVED") @@ -1722,55 +1730,48 @@ ;;====================================================================== ;; REMOTE DB ACCESS VIA RPC ;;====================================================================== -(define (rdb:open-run-close procname . remargs) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - (apply (rpc:procedure 'rdb:open-run-close host port) procname remargs)) - (apply open-run-close (eval procname) remargs))) - -(define (rdb:test-set-status-state test-id status state msg) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - (handle-exceptions - exn - (begin - (debug:print 0 "EXCEPTION: rpc call failed?") - (debug:print 0 " " ((condition-property-accessor 'exn 'message) exn)) - (print-call-chain) - (cdb:test-set-status-state test-id status state msg)) - ((rpc:procedure 'cdb:test-set-status-state host port) test-id status state msg))) - (cdb:test-set-status-state test-id status state msg))) - -(define (rdb:test-rollup-test_data-pass-fail test-id) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:test-rollup-test_data-pass-fail host port) test-id)) - (cdb:test-rollup-test_data-pass-fail test-id))) - -(define (rdb:pass-fail-counts test-id fail-count pass-count) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:pass-fail-counts host port) test-id fail-count pass-count)) - (cdb:pass-fail-counts test-id fail-count pass-count))) - -;; currently forces a flush of the queue -(define (rdb:tests-register-test db run-id test-name item-path) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:tests-register-test host port) db run-id test-name item-path force-write: #t)) - (cdb:tests-register-test db run-id test-name item-path force-write: #t))) - -(define (rdb:flush-queue) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:flush-queue host port))) - (cdb:flush-queue))) - +;; (define (rdb:test-set-status-state test-id status state msg) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; (handle-exceptions +;; exn +;; (begin +;; (debug:print 0 "EXCEPTION: rpc call failed?") +;; (debug:print 0 " " ((condition-property-accessor 'exn 'message) exn)) +;; (print-call-chain) +;; (cdb:test-set-status-state test-id status state msg)) +;; ((rpc:procedure 'cdb:test-set-status-state host port) test-id status state msg))) +;; (cdb:test-set-status-state test-id status state msg))) +;; +;; (define (rdb:test-rollup-test_data-pass-fail test-id) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:test-rollup-test_data-pass-fail host port) test-id)) +;; (cdb:test-rollup-test_data-pass-fail test-id))) +;; +;; (define (rdb:pass-fail-counts test-id fail-count pass-count) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:pass-fail-counts host port) test-id fail-count pass-count)) +;; (cdb:pass-fail-counts test-id fail-count pass-count))) +;; +;; ;; currently forces a flush of the queue +;; (define (rdb:tests-register-test db run-id test-name item-path) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:tests-register-test host port) db run-id test-name item-path force-write: #t)) +;; (cdb:tests-register-test db run-id test-name item-path force-write: #t))) +;; +;; (define (rdb:flush-queue) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:flush-queue host port))) +;; (cdb:flush-queue))) +;;