Index: common_records.scm ================================================================== --- common_records.scm +++ common_records.scm @@ -56,12 +56,12 @@ (define (debug:print-info n . params) (if (debug:debug-mode n) (with-output-to-port (current-error-port) (lambda () - (let ((res (format#format #f "INFO:~2d ~a" n (apply conc params)))) - (print res) + (let ((res #f));; (format#format #f "INFO:~2d ~a" n (apply conc params)))) + (apply print "INFO: (" n ") " params) ;; res) (if *logging* (db:log-event res))))))) ;; if a value is printable (i.e. string or number) return the value ;; else return an empty string (define-inline (printable val) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -14,11 +14,11 @@ ;;====================================================================== (require-extension (srfi 18) extras tcp rpc) (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n zmq) (import (prefix sqlite3 sqlite3:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) @@ -53,26 +53,27 @@ (begin (debug:print-info 11 "db:set-sync, setting pragma synchronous to " val) (sqlite3:execute db (conc "PRAGMA synchronous = '" val "';")))))) (define (open-db) ;; (conc *toppath* "/megatest.db") (car *configinfo*))) + (if (not *toppath*)(setup-for-run)) (let* ((dbpath (conc *toppath* "/megatest.db")) ;; fname) (dbexists (file-exists? dbpath)) (db (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath)) (handler (make-busy-timeout (if (args:get-arg "-override-timeout") (string->number (args:get-arg "-override-timeout")) 136000)))) ;; 136000))) ;; 136000 = 2.2 minutes - (debug:print-info 11 "open-db, dbpath=" dbpath) + (debug:print-info 11 "open-db, dbpath=" dbpath " argv=" (argv)) (sqlite3:set-busy-handler! 
db handler) (if (not dbexists) (db:initialize db)) (db:set-sync db) db)) ;; keeping it around for debugging purposes only (define (open-run-close-no-exception-handling proc idb . params) - (debug:print-info 11 "open-run-close-no-exception-handling START, idb=" idb ", params=" params) + (debug:print-info 11 "open-run-close-no-exception-handling START given a db=" (if idb "yes " "no ") ", params=" params) (let* ((db (if idb idb (open-db))) (res #f)) (set! res (apply proc db params)) (if (not idb)(sqlite3:finalize! db)) (debug:print-info 11 "open-run-close-no-exception-handling END" ) @@ -442,18 +443,22 @@ 2)) (if (> (abs (- *last-global-delta-printed* *global-delta*)) 0.08) ;; don't print all the time, only if it changes a bit (begin (debug:print-info 4 "launch throttle factor=" *global-delta*) (set! *last-global-delta-printed* *global-delta*))) - (debug:print-info 11 "db:get-var END " var) + (debug:print-info 11 "db:get-var END " var " val=" res) res)) (define (db:set-var db var val) (debug:print-info 11 "db:set-var START " var " " val) (sqlite3:execute db "INSERT OR REPLACE INTO metadat (var,val) VALUES (?,?);" var val) - (debug:print-info 11 "db:set-var END " var " " val) -) + (debug:print-info 11 "db:set-var END " var " " val)) + +(define (db:del-var db var) + (debug:print-info 11 "db:del-var START " var) + (sqlite3:execute db "DELETE FROM metadat WHERE var=?;" var) + (debug:print-info 11 "db:del-var END " var)) ;; use a global for some primitive caching, it is just silly to re-read the db ;; over and over again for the keys since they never change (define (db:get-keys db) @@ -1073,14 +1078,11 @@ t.comment t.event_time t.fail_count t.pass_count t.archived - - - - FROM tests AS t INNER JOIN runs AS r ON t.run_id=r.id WHERE " + FROM tests AS t INNER JOIN runs AS r ON t.run_id=r.id WHERE " keystr " AND r.runname LIKE '" runname "' AND item_path LIKE '" itempatt "' AND testname LIKE '" testpatt "' AND t.state LIKE '" statepatt "' AND t.status LIKE '" statuspatt 
"'ORDER BY t.event_time ASC;"))) (debug:print 3 "qrystr: " qrystr) (sqlite3:for-each-row @@ -1092,146 +1094,180 @@ ;;====================================================================== ;; QUEUE UP META, TEST STATUS AND STEPS ;;====================================================================== +;; db:updater is run in a thread to write out the cached data periodically (define (db:updater) (debug:print-info 4 "Starting cache processing") - (let loop ((start-time (current-time))) + (let loop () (thread-sleep! 10) ;; move save time around to minimize regular collisions? (db:write-cached-data) - (loop start-time))) - -(define (cdb:test-set-status-state test-id status state msg) - (debug:print-info 4 "cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) - (if msg - (set! *incoming-data* (cons (vector 'state-status-msg - (current-milliseconds) - (list state status msg test-id)) - *incoming-data*)) - (set! *incoming-data* (cons (vector 'state-status - (current-milliseconds) - (list state status test-id)) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) - *incoming-data*))) - (mutex-unlock! *incoming-mutex*) - (if *cache-on* - (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") - (db:write-cached-data))) - -(define (cdb:test-rollup-test_data-pass-fail test-id) - (debug:print-info 4 "Adding " test-id " for test_data rollup to the queue") - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) - (set! *incoming-data* (cons (vector 'test_data-pf-rollup - (current-milliseconds) - (list test-id test-id test-id test-id)) - *incoming-data*)) - (mutex-unlock! 
*incoming-mutex*) - (if *cache-on* - (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") - (db:write-cached-data))) - -(define (cdb:pass-fail-counts test-id fail-count pass-count) - (debug:print-info 4 "Adding " test-id " for setting pass/fail counts to the queue") - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) - (set! *incoming-data* (cons (vector 'pass-fail-counts - (current-milliseconds) - (list fail-count pass-count test-id)) - *incoming-data*)) - (mutex-unlock! *incoming-mutex*) - (if *cache-on* - (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") - (db:write-cached-data))) - -(define (cdb:tests-register-test db run-id test-name item-path #!key (force-write #f)) + (loop))) + +;; cdb:cached-access is called by the server loop to dispatch commands or queue up +;; db accesses +;; +;; params := qry-name cached? val1 val2 val3 ... +(define (cdb:cached-access params) + (debug:print-info 12 "cdb:cached-access params=" params) + (if (< (length params) 2) + "ERROR" + (let ((qry-name (car params)) + (cached? (cadr params)) + (remparam (list-tail params 2))) + (debug:print-info 12 "cdb:cached-access qry-name=" qry-name " params=" params) + ;; Any special calls are dispatched here. + ;; Remainder are put in the db queue + (case qry-name + ((login) ;; login checks that the megatest path matches + (if (null? remparam) + #f ;; no path - fail! + (let ((calling-path (car remparam))) + (if (equal? calling-path *toppath*) + #t ;; path matches - pass! Should vet the caller at this time ... + #f)))) ;; else fail to login + ((flush) + (db:write-cached-data) + #t) + (else + (mutex-lock! *incoming-mutex*) + (set! *last-db-access* (current-seconds)) + (set! *incoming-data* (cons + (vector qry-name + (current-milliseconds) + remparam) + *incoming-data*)) + (mutex-unlock! *incoming-mutex*) + ;; NOTE: if cached? 
is #f then this call must be run immediately + ;; but first all calls in the queue are run first in the order + ;; of their time stamp + (if (and cached? *cache-on*) + (begin + (debug:print-info 12 "*cache-on* is " *cache-on* ", skipping cache write") + "CACHED") + (begin + (db:write-cached-data) + "WRITTEN"))))))) + +(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) +(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) + +(define (cdb:client-call zmq-socket . params) + (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params) + (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params)))) + (res #f)) + (send-message zmq-socket zdat) + (set! res (db:string->obj (receive-message zmq-socket zdat))) + (debug:print-info 11 "zmq-socket " (car params) " res=" res) + res)) + +(define (cdb:test-set-status-state zmqsocket test-id status state msg) + (if msg + (cdb:client-call zmqsocket 'state-status-msg #t state status msg test-id) + (cdb:client-call zmqsocket 'state-status #t state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) + +(define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) + (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id test-id)) + +(define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) + (cdb:client-call zmqsocket 'pass-fail-counts #t fail-count pass-count test-id)) + +(define (cdb:tests-register-test zmqsocket run-id test-name item-path) (let ((item-paths (if (equal? item-path "") (list item-path) (list item-path "")))) - (debug:print-info 4 "Adding " run-id ", " test-name "/" item-path " for setting pass/fail counts to the queue") - (mutex-lock! *incoming-mutex*) - (set! *last-db-access* (current-seconds)) - (set! 
*incoming-data* (cons (vector 'register-test - (current-milliseconds) - (list run-id test-name item-path)) ;; fail-count pass-count test-id)) - *incoming-data*)) - (mutex-unlock! *incoming-mutex*) - (if (and (not force-write) *cache-on*) - (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write") - (db:write-cached-data)))) + (cdb:client-call zmqsocket 'register-test #t run-id test-name item-path))) + +(define (cdb:flush-queue zmqsocket) + (cdb:client-call zmqsocket 'flush #f)) + +(define db:queries + '((register-test "INSERT OR IGNORE INTO tests (run_id,testname,event_time,item_path,state,status) VALUES (?,?,strftime('%s','now'),?,'NOT_STARTED','n/a');") + (state-status "UPDATE tests SET state=?,status=? WHERE id=?;") + (state-status-msg "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;") + (pass-fail-counts "UPDATE tests SET fail_count=?,pass_count=? WHERE id=?;") + (test_data-pf-rollup "UPDATE tests + SET status=CASE WHEN (SELECT fail_count FROM tests WHERE id=?) > 0 + THEN 'FAIL' + WHEN (SELECT pass_count FROM tests WHERE id=?) > 0 AND + (SELECT status FROM tests WHERE id=?) NOT IN ('WARN','FAIL') + THEN 'PASS' + ELSE status + END WHERE id=?;") + (rollup-tests-pass-fail "UPDATE tests + SET fail_count=(SELECT count(id) FROM tests WHERE + run_id=? AND testname=? AND item_path != '' AND status='FAIL'), + pass_count=(SELECT count(id) FROM tests WHERE + run_id=? AND testname=? AND item_path != '' AND (status='PASS' OR status='WARN' OR status='WAIVED')) + WHERE run_id=? AND testname=? AND item_path='';"))) + +(define db:special-queries '(rollup-tests-pass-fail)) +(define db:run-local-queries '(rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; (define (db:write-cached-data) (open-run-close - (lambda (db . 
params) - (let ((register-test-stmt (sqlite3:prepare db "INSERT OR IGNORE INTO tests (run_id,testname,event_time,item_path,state,status) VALUES (?,?,strftime('%s','now'),?,'NOT_STARTED','n/a');")) - (state-status-stmt (sqlite3:prepare db "UPDATE tests SET state=?,status=? WHERE id=?;")) - (state-status-msg-stmt (sqlite3:prepare db "UPDATE tests SET state=?,status=?,comment=? WHERE id=?;")) - (pass-fail-counts-stmt (sqlite3:prepare db "UPDATE tests SET fail_count=?,pass_count=? WHERE id=?;")) - (test_data-rollup-stmt (sqlite3:prepare db "UPDATE tests - SET status=CASE WHEN (SELECT fail_count FROM tests WHERE id=?) > 0 - THEN 'FAIL' - WHEN (SELECT pass_count FROM tests WHERE id=?) > 0 AND - (SELECT status FROM tests WHERE id=?) NOT IN ('WARN','FAIL') - THEN 'PASS' - ELSE status - END WHERE id=?;")) - (data #f) - (rollups (make-hash-table))) + (lambda (db . junkparams) + (let ((queries (make-hash-table)) + (data #f)) (mutex-lock! *incoming-mutex*) (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1))))) (set! *incoming-data* '()) (mutex-unlock! *incoming-mutex*) (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) - (sqlite3:with-transaction - db - (lambda () - (debug:print-info 4 "flushing " data " to db") - (for-each (lambda (entry) - (let ((params (vector-ref entry 2))) - (debug:print-info 4 "Applying " entry " to params " params) - (case (vector-ref entry 0) - ((state-status) - (apply sqlite3:execute state-status-stmt params)) - ((state-status-msg) - (apply sqlite3:execute state-status-msg-stmt params)) - ((test_data-pf-rollup) - ;; (hash-table-set! 
rollups (car params) params)) - (apply sqlite3:execute test_data-rollup-stmt params)) - ((pass-fail-counts) - (apply sqlite3:execute pass-fail-counts-stmt params)) - ((register-test) - (apply sqlite3:execute register-test-stmt params)) - (else - (debug:print 0 "ERROR: Queued entry not recognised " entry))))) - data))) - ;; now do any rollups - ;; (for-each - ;; (lambda (test-id) - ;; (apply sqlite3:execute test_data-rollup-stmt (hash-table-ref rollups test-id))) - ;; (hash-table-keys rollups)) - (sqlite3:finalize! state-status-stmt) - (sqlite3:finalize! state-status-msg-stmt) - (sqlite3:finalize! test_data-rollup-stmt) - (sqlite3:finalize! pass-fail-counts-stmt) - (sqlite3:finalize! register-test-stmt) + ;; prepare the needed statements + (for-each (lambda (request-item) + (let ((stmt-key (vector-ref request-item 0))) + (if (not (hash-table-ref/default queries stmt-key #f)) + (let ((stmt (alist-ref stmt-key db:queries))) + (if stmt + (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) + (debug:print 0 "ERROR: Missing query spec for " stmt-key "!")))))) + data) + (let outerloop ((special-qry #f) + (stmts data)) + (if special-qry + ;; handle a query that cannot be part of the grouped queries + (let* ((stmt-key (vector-ref special-qry 0)) + (qry (hash-table-ref queries stmt-key)) + (params (vector-ref special-qry 2))) + (apply sqlite3:execute qry params) + (if (not (null? stmts)) + (outerloop #f stmts))) + ;; handle normal queries + (sqlite3:with-transaction + db + (lambda () + (debug:print-info 11 "flushing " stmts " to db") + (if (not (null? stmts)) + (let innerloop ((hed (car stmts)) + (tal (cdr stmts))) + (let ((params (vector-ref hed 2)) + (stmt-key (vector-ref hed 0))) + (if (not (member stmt-key db:special-queries)) + (begin + (debug:print-info 11 "Executing " stmt-key " for " params) + (apply sqlite3:execute (hash-table-ref queries stmt-key) params) + (if (not (null? 
tal)) + (innerloop (car tal)(cdr tal)))) + (outerloop hed tal))))))))) + (for-each (lambda (stmt-key) + (sqlite3:finalize! (hash-table-ref queries stmt-key))) + (hash-table-keys queries)) (let ((cache-size (length data))) (if (> cache-size *max-cache-size*) (set! *max-cache-size* cache-size))) )) #f)) -(define cdb:flush-queue db:write-cached-data) - (define (db:roll-up-pass-fail-counts db run-id test-name item-path status) - (rdb:flush-queue) + (cdb:flush-queue *runremote*) (if (and (not (equal? item-path "")) (or (equal? status "PASS") (equal? status "WARN") (equal? status "FAIL") (equal? status "WAIVED") @@ -1255,10 +1291,11 @@ ELSE 'COMPLETED' END, status=CASE WHEN fail_count > 0 THEN 'FAIL' WHEN pass_count > 0 AND fail_count=0 THEN 'PASS' ELSE 'UNKNOWN' END WHERE run_id=? AND testname=? AND item_path='';" run-id test-name run-id test-name)) #f) + #f)) ;;====================================================================== ;; Tests meta data ;;====================================================================== @@ -1390,18 +1427,18 @@ (SELECT count(id) FROM test_data WHERE test_id=? AND status like 'pass') AS pass_count;" test-id test-id) (sqlite3:finalize! tdb) ;; Now rollup the counts to the central megatest.db - (rdb:pass-fail-counts test-id fail-count pass-count) + (cdb:pass-fail-counts *runremote* test-id fail-count pass-count) ;; (sqlite3:execute db "UPDATE tests SET fail_count=?,pass_count=? WHERE id=?;" ;; fail-count pass-count test-id) (thread-sleep! 1) ;; play nice with the queue by ensuring the rollup is at least 10ms later than the set ;; if the test is not FAIL then set status based on the fail and pass counts. - (rdb:test-rollup-test_data-pass-fail test-id) + (cdb:test-rollup-test_data-pass-fail *runremote* test-id) ;; (sqlite3:execute ;; db ;;; NOTE: Should this be WARN,FAIL? A WARN is not a FAIL????? BUG FIXME ;; "UPDATE tests ;; SET status=CASE WHEN (SELECT fail_count FROM tests WHERE id=?) 
> 0 ;; THEN 'FAIL' @@ -1699,55 +1736,48 @@ ;;====================================================================== ;; REMOTE DB ACCESS VIA RPC ;;====================================================================== -(define (rdb:open-run-close procname . remargs) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - (apply (rpc:procedure 'rdb:open-run-close host port) procname remargs)) - (apply open-run-close (eval procname) remargs))) - -(define (rdb:test-set-status-state test-id status state msg) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - (handle-exceptions - exn - (begin - (debug:print 0 "EXCEPTION: rpc call failed?") - (debug:print 0 " " ((condition-property-accessor 'exn 'message) exn)) - (print-call-chain) - (cdb:test-set-status-state test-id status state msg)) - ((rpc:procedure 'cdb:test-set-status-state host port) test-id status state msg))) - (cdb:test-set-status-state test-id status state msg))) - -(define (rdb:test-rollup-test_data-pass-fail test-id) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:test-rollup-test_data-pass-fail host port) test-id)) - (cdb:test-rollup-test_data-pass-fail test-id))) - -(define (rdb:pass-fail-counts test-id fail-count pass-count) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:pass-fail-counts host port) test-id fail-count pass-count)) - (cdb:pass-fail-counts test-id fail-count pass-count))) - -;; currently forces a flush of the queue -(define (rdb:tests-register-test db run-id test-name item-path) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:tests-register-test host port) db run-id test-name item-path force-write: #t)) - (cdb:tests-register-test db run-id test-name item-path force-write: #t))) - -(define 
(rdb:flush-queue) - (if *runremote* - (let ((host (vector-ref *runremote* 0)) - (port (vector-ref *runremote* 1))) - ((rpc:procedure 'cdb:flush-queue host port))) - (cdb:flush-queue))) - +;; (define (rdb:test-set-status-state test-id status state msg) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; (handle-exceptions +;; exn +;; (begin +;; (debug:print 0 "EXCEPTION: rpc call failed?") +;; (debug:print 0 " " ((condition-property-accessor 'exn 'message) exn)) +;; (print-call-chain) +;; (cdb:test-set-status-state test-id status state msg)) +;; ((rpc:procedure 'cdb:test-set-status-state host port) test-id status state msg))) +;; (cdb:test-set-status-state test-id status state msg))) +;; +;; (define (rdb:test-rollup-test_data-pass-fail test-id) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:test-rollup-test_data-pass-fail host port) test-id)) +;; (cdb:test-rollup-test_data-pass-fail test-id))) +;; +;; (define (rdb:pass-fail-counts test-id fail-count pass-count) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:pass-fail-counts host port) test-id fail-count pass-count)) +;; (cdb:pass-fail-counts test-id fail-count pass-count))) +;; +;; ;; currently forces a flush of the queue +;; (define (rdb:tests-register-test db run-id test-name item-path) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:tests-register-test host port) db run-id test-name item-path force-write: #t)) +;; (cdb:tests-register-test db run-id test-name item-path force-write: #t))) +;; +;; (define (rdb:flush-queue) +;; (if *runremote* +;; (let ((host (vector-ref *runremote* 0)) +;; (port (vector-ref *runremote* 1))) +;; ((rpc:procedure 'cdb:flush-queue host port))) +;; (cdb:flush-queue))) +;; Index: launch.scm 
================================================================== --- launch.scm +++ launch.scm @@ -275,11 +275,11 @@ (if p-id (begin (debug:print 0 "Killing " (cadr parts) "; kill -9 " p-id) (system (conc "kill -9 " p-id)))))) (car processes)) - (system (conc "kill -9 " pid)))) + (system (conc "kill -9 -" pid)))) (begin (debug:print 0 "WARNING: Request received to kill job but problem with process, attempting to kill manager process") (tests:test-set-status! test-id "KILLED" "FAIL" (args:get-arg "-m") #f) (sqlite3:finalize! tdb) @@ -296,10 +296,11 @@ (thread-start! th2) (thread-join! th2) (mutex-lock! m) (let* ((item-path (item-list->path itemdat)) (testinfo (open-run-close db:get-test-info-by-id #f test-id))) ;; )) ;; run-id test-name item-path))) + ;; Am I completed? (if (not (equal? (db:test-get-state testinfo) "COMPLETED")) (begin (debug:print 2 "Test NOT logged as COMPLETED, (state=" (db:test-get-state testinfo) "), updating result, rollup-status is " rollup-status) (tests:test-set-status! test-id (if kill-job? "KILLED" "COMPLETED") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -8,11 +8,11 @@ ;; PURPOSE. ;; (include "common.scm") ;; (include "megatest-version.scm") -(use sqlite3 srfi-1 posix regex regex-case srfi-69 base64 format readline apropos) ;; (srfi 18) extras) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 base64 format readline apropos zmq) ;; (srfi 18) extras) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) (declare (uses common)) (declare (uses megatest-version)) @@ -252,10 +252,18 @@ (if (args:get-arg "-env2file") (begin (save-environment-as-files (args:get-arg "-env2file")) (set! *didsomething* #t))) + +;;====================================================================== +;; Start the server - can be done in conjunction with -runall or -runtests (one day...) 
+;; we start the server if not running else start the client thread +;;====================================================================== +(if (args:get-arg "-server") + (server:launch) + (server:client-launch)) ;;====================================================================== ;; Remove old run(s) ;;====================================================================== @@ -360,27 +368,10 @@ tests)))) runs) (set! *didsomething* #t) ))) -;;====================================================================== -;; Start the server - can be done in conjunction with -runall or -runtests (one day...) -;;====================================================================== -(if (args:get-arg "-server") - (let* ((toppath (setup-for-run)) - (db (if toppath (open-db) #f))) - (debug:print-info 0 "Starting the standalone server") - (if db - (let* ((host:port (db:get-var db "SERVER")) ;; this doen't support multiple servers BUG!!!! - (th2 (server:start db (args:get-arg "-server"))) - (th3 (make-thread (lambda () - (server:keep-running db host:port))))) - (thread-start! th3) - (thread-join! th3) - (set! *didsomething* #t)) - (debug:print 0 "ERROR: Failed to setup for megatest")))) - ;;====================================================================== ;; full run ;;====================================================================== ;; get lock in db for full run for this directory @@ -397,21 +388,34 @@ ;; if still ok to run tasks ;; process deferred tasks per above steps ;; run all tests are are Not COMPLETED and PASS or CHECK (if (args:get-arg "-runall") - (general-run-call - "-runall" - "run all tests" - (lambda (target runname keys keynames keyvallst) + (let ((server-thread #f)) + (if (args:get-arg "-server") + (let ((toppath (setup-for-run)) + (db (open-db))) + (if db + (let* ((host:port (db:get-var db "SERVER")) ;; this doen't support multiple servers BUG!!!! 
+ (th2 (server:start db (args:get-arg "-server"))) + (th3 (make-thread (lambda () + (server:keep-running db host:port))))) + (thread-start! th3) + (set! server-thread th3))))) + (general-run-call + "-runall" + "run all tests" + (lambda (target runname keys keynames keyvallst) (runs:run-tests target runname (if (args:get-arg "-testpatt") (args:get-arg "-testpatt") "%/%") user - args:arg-hash)))) ;; ) + args:arg-hash))) + (if server-thread + (thread-join! server-thread)))) ;;====================================================================== ;; run one test ;;====================================================================== @@ -828,10 +832,14 @@ (set! *didsomething* #t))) ;;====================================================================== ;; Exit and clean up ;;====================================================================== + +;; this is the socket if we are a client +(if (socket? *runremote*) + (close-socket *runremote*)) (if (not *didsomething*) (debug:print 0 help)) ;; (if *runremote* (rpc:close-all-connections!)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -6,14 +6,14 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. 
-(require-extension (srfi 18) extras tcp rpc) +(require-extension (srfi 18) extras tcp rpc s11n) (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo zmq) (import (prefix sqlite3 sqlite3:)) (declare (unit server)) (declare (uses common)) @@ -21,193 +21,158 @@ (declare (uses tests)) (include "common_records.scm") (include "db_records.scm") -;; procstr is the name of the procedure to be called as a string -(define (server:autoremote procstr params) - (handle-exceptions - exn - (begin - (debug:print 1 "Remote failed for " proc " " params) - (apply (eval (string->symbol procstr)) params)) - ;; (if *runremote* - ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) - (apply (eval (string->symbol procstr)) params))) - -(define (server:start db hostn) +(define (server:run hostn) (debug:print 0 "Attempting to start the server ...") - (let ((host:port (db:get-var db "SERVER"))) ;; do whe already have a server running? + (let ((host:port (open-run-close db:get-var #f "SERVER"))) ;; do whe already have a server running? (if host:port - (set! *runremote* (let* ((lst (string-split host:port ":")) - (port (if (> (length lst) 1) - (string->number (cadr lst)) - #f))) - (if port (vector (car lst) port) #f))) - (let* ((rpc:listener (server:find-free-port-and-open (rpc:default-server-port))) - (th1 (make-thread - (cute (rpc:make-server rpc:listener) "rpc:server") - 'rpc:server)) - ;; (th2 (make-thread (lambda ()(db:updater)))) + (begin + (debug:print 0 "NOTE: server already running.") + (if (server:client-setup) + (begin + (debug:print-info 0 "Server is alive, not starting another") + ;;(exit) + ) + (begin + (debug:print-info 0 "Server is dead, removing flag and trying again") + (open-run-close db:del-var #f "SERVER") + (server:run hostn)))) + (let* ((zmq-socket #f) (hostname (if (string=? "-" hostn) (get-host-name) hostn)) - (ipaddrstr (if (string=? 
"-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f)) - (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" (rpc:default-server-port)))) - (debug:print 0 "Server started on " host:port) - (db:set-var db "SERVER" host:port) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostname)))) + (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555)) (set! *cache-on* #t) - ;; can use this to run most anything at the remote - (rpc:publish-procedure! - 'remote:run - (lambda (procstr . params) - (server:autoremote procstr params))) - - (rpc:publish-procedure! - 'server:login - (lambda (toppath) - (set! *last-db-access* (current-seconds)) - (if (equal? *toppath* toppath) - (begin - (debug:print-info 2 "login successful") - #t) - #f))) - - ;;====================================================================== - ;; db specials here - ;;====================================================================== - ;; remote call to open-run-close - (rpc:publish-procedure! - 'rdb:open-run-close - (lambda (procname . remargs) - (debug:print-info 12 "Remote call of rdb:open-run-close " procname " " remargs) - (set! *last-db-access* (current-seconds)) - (apply open-run-close (eval procname) remargs))) - - (rpc:publish-procedure! - 'cdb:test-set-status-state - (lambda (test-id status state msg) - (debug:print-info 12 "Remote call of cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg) - (cdb:test-set-status-state test-id status state msg))) - - (rpc:publish-procedure! - 'cdb:test-rollup-test_data-pass-fail - (lambda (test-id) - (debug:print-info 12 "Remote call of cdb:test-rollup-test_data-pass-fail " test-id) - (cdb:test-rollup-test_data-pass-fail test-id))) - - (rpc:publish-procedure! 
- 'cdb:pass-fail-counts - (lambda (test-id fail-count pass-count) - (debug:print-info 12 "Remote call of cdb:pass-fail-counts " test-id " passes: " pass-count " fails: " fail-count) - (cdb:pass-fail-counts test-id fail-count pass-count))) - - (rpc:publish-procedure! - 'cdb:tests-register-test - (lambda (db run-id test-name item-path) - (debug:print-info 12 "Remote call of cdb:tests-register-test " run-id " testname: " test-name " item-path: " item-path) - (cdb:tests-register-test db run-id test-name item-path))) - - (rpc:publish-procedure! - 'cdb:flush-queue - (lambda () - (debug:print-info 12 "Remote call of cdb:flush-queue") - (cdb:flush-queue))) - - ;;====================================================================== - ;; end of publish-procedure section - ;;====================================================================== - - (set! *rpc:listener* rpc:listener) + ;; what to do when we quit + ;; (on-exit (lambda () - (open-run-close - (lambda (db . params) - (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' and val=?;" host:port)) - #f ;; for db - #f) ;; for a param - (let loop ((n 0)) + (open-run-close db:del-var #f "SERVER") + (let loop () (let ((queue-len 0)) (thread-sleep! (random 5)) (mutex-lock! *incoming-mutex*) (set! queue-len (length *incoming-data*)) (mutex-unlock! *incoming-mutex*) (if (> queue-len 0) (begin (debug:print-info 0 "Queue not flushed, waiting ...") - (loop (+ n 1))))) - ))) - (db:updater) - (thread-start! th1) - ;; (debug:print 0 "Server started on port " (rpc:default-server-port) "...") - ;; (thread-start! th2) - ;; (thread-join! 
th2) - ;; return th2 for the calling process to do a join with - th1 - )))) ;; rpc:server))) - -(define (server:keep-running db host:port) + (loop))))))) + + ;; The heavy lifting + ;; + (let loop () + (let* ((rawmsg (receive-message zmq-socket)) + (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) + (res #f)) + (debug:print-info 12 "server=> received params=" params) + (set! res (cdb:cached-access params)) + (debug:print-info 12 "server=> processed res=" res) + (send-message zmq-socket (db:obj->string res)) + (loop))))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) - (thread-sleep! 20) ;; no need to do this very often - (let ((numrunning (db:get-count-tests-running db))) - (if (or (> numrunning 0) - (> (+ *last-db-access* 60)(current-seconds))) - (begin - (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) - (loop (+ 1 count))) - (begin - (debug:print-info 0 "Starting to shutdown the server side") - ;; need to delete only *my* server entry (future use) - (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' AND val like ?;" host:port) - (thread-sleep! 10) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - ;; (exit))) - ))))) - -(define (server:find-free-port-and-open port) - (handle-exceptions - exn - (begin - (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") - (server:find-free-port-and-open (+ port 1))) - (rpc:default-server-port port) - (tcp-read-timeout 240000) - (tcp-listen (rpc:default-server-port) 10000))) + (thread-sleep! 
1) ;; no need to do this very often + (db:write-cached-data) + (if (< count 100) + (loop (+ count 1)) ;; advance the 1s tick count; (loop 0) pinned count at 0 so the idle check below never ran + (let ((numrunning (open-run-close db:get-count-tests-running #f))) + (if (or (> numrunning 0) + (> (+ *last-db-access* 60)(current-seconds))) + (begin + (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) + (loop 0)) ;; still in use: restart the tick count so the db is polled once per 100 passes + (begin + (debug:print-info 0 "Starting to shutdown the server side") + ;; need to delete only *my* server entry (future use) + (open-run-close db:del-var #f "SERVER") + (thread-sleep! 10) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + ;; (exit))) + )))))) + +(define (server:find-free-port-and-open host s port) + (let ((s (if s s (make-socket 'rep))) + (p (if (number? port) port 5555))) + (handle-exceptions + exn + (begin + (debug:print 0 "Failed to bind to port " p ", trying next port") + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + (server:find-free-port-and-open host s (+ p 1))) + (let ((zmq-url (conc "tcp://" host ":" p))) + (print "Trying to start server on " zmq-url) + (bind-socket s zmq-url) + (set! 
*runremote* #f) + (debug:print 0 "Server started on " zmq-url) + (open-run-close db:set-var #f "SERVER" zmq-url) + s)))) (define (server:client-setup) - (if *runremote* - (begin - (debug:print 0 "ERROR: Attempt to connect to server but already connected") - #f) - (let* ((hostinfo (open-run-close db:get-var #f "SERVER")) - (hostdat (if hostinfo (string-split hostinfo ":") #f)) - (host (if hostinfo (car hostdat) #f)) - (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) - (if (and port - (string->number port)) - (let ((portn (string->number port))) - (debug:print-info 2 "Setting up to connect to host " host ":" port) - (handle-exceptions - exn - (begin - (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port) - (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - ;; (open-run-close - ;; (lambda (db . param) - ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) - ;; #f) - (set! *runremote* #f)) - (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server - ((rpc:procedure 'server:login host portn) *toppath*)) - (begin - (debug:print-info 2 "Logged in and connected to " host ":" port) - (set! *runremote* (vector host portn))) - (begin - (debug:print-info 2 "Failed to login or connect to " host ":" port) - (set! *runremote* #f))))) - (debug:print-info 2 "no server available"))))) - + (let* ((hostinfo (open-run-close db:get-var #f "SERVER")) + (zmq-socket (make-socket 'req))) + (if hostinfo + (begin + (debug:print-info 2 "Setting up to connect to " hostinfo) + (handle-exceptions + exn + (begin + (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + (debug:print 0 " perhaps jobs killed with -9? 
Removing server records") + (open-run-close db:del-var #f "SERVER") + (exit) + #f) + (let ((connect-ok #f)) + (connect-socket zmq-socket hostinfo) + (set! connect-ok (cdb:client-call zmq-socket 'login #t *toppath*)) + (if connect-ok + (begin + (debug:print-info 2 "Logged in and connected to " hostinfo) + (set! *runremote* zmq-socket) + #t) + (begin + (debug:print-info 2 "Failed to login or connect to " hostinfo) + (set! *runremote* #f) + #f))))) + (begin + (debug:print-info 2 "No server available, attempting to start one...") + (system (conc "megatest -server - " (if (args:get-arg "-debug") + (conc "-debug " (args:get-arg "-debug")) + "") + " &")) + (sleep 5) + (server:client-setup))))) + +(define (server:launch) + (let* ((toppath (setup-for-run))) + (debug:print-info 0 "Starting the standalone server") + (if *toppath* + (let* ((th2 (make-thread (lambda () + (server:run (args:get-arg "-server"))))) + (th3 (make-thread (lambda () + (server:keep-running))))) + (thread-start! th3) + (thread-start! th2) + (thread-join! th3) + (set! *didsomething* #t)) + (debug:print 0 "ERROR: Failed to setup for megatest")))) + +(define (server:client-launch) + (if (server:client-setup) + (debug:print-info 0 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) Index: tests.scm ================================================================== --- tests.scm +++ tests.scm @@ -218,14 +218,14 @@ (if waived (set! real-status "WAIVED")) (debug:print 4 "real-status " real-status ", waived " waived ", status " status) ;; update the primary record IF state AND status are defined (if (and state status) - (rdb:test-set-status-state test-id real-status state #f)) + (cdb:test-set-status-state *runremote* test-id real-status state #f)) ;; if status is "AUTO" then call rollup (note, this one modifies data in test - ;; run area, do not rpc it (yet) + ;; run area, it does remote calls under the hood. (if (and test-id state status (equal? 
status "AUTO")) (db:test-data-rollup #f test-id status)) ;; add metadata (need to do this way to avoid SQL injection issues) Index: tests/Makefile ================================================================== --- tests/Makefile +++ tests/Makefile @@ -65,11 +65,12 @@ cd ..;make install rm -f fullrun/logging.db touch cleanprep fullprep : cleanprep - cd fullrun;$(MEGATEST) -remove-runs :runname $(RUNNAME)% -target %/%/% -testpatt % -itempatt % + cd fullrun;$(MEGATEST) -server - -debug $(DEBUG) & + sleep 5;cd fullrun;$(MEGATEST) -remove-runs :runname $(RUNNAME)% -target %/%/% -testpatt %/% cd fullrun;$(BINPATH)/dboard -rows 15 & dashboard : cleanprep cd fullrun && $(BINPATH)/dashboard -rows 25 & Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -112,11 +112,11 @@ (test "get all legal tests" (list "test1" "test2") (sort (get-all-legal-tests) string<=?)) (test "register-test, test info" "NOT_STARTED" (begin - (rdb:tests-register-test *db* 1 "nada" "") + (cdb:tests-register-test *runremote* 1 "nada" "") ;; (rdb:flush-queue) (vector-ref (db:get-test-info *db* 1 "nada" "") 3))) (test #f "NOT_STARTED" (begin @@ -143,10 +143,11 @@ (define keys (db:get-keys *db*)) ;;====================================================================== ;; D B ;;====================================================================== + (test #f "FOO LIKE 'abc%def'" (db:patt->like "FOO" "abc%def")) (test #f (vector '("SYSTEM" "RELEASE" "id" "runname" "state" "status" "owner" "event_time") '()) (runs:get-runs-by-patt db keys "%")) (test #f "SYSTEM,RELEASE,id,runname,state,status,owner,event_time" (car (runs:get-std-run-fields keys '("id" "runname" "state" "status" "owner" "event_time")))) (test #f #t (runs:operate-on 'print "%" "%" "%")) @@ -254,18 +255,22 @@ ;; R E M O T E C A L L S ;;====================================================================== ;; start a server process (set! 
*verbosity* 10) -(define server-pid (process-run "../../bin/megatest" (list "-server" "-" "-debug" (conc *verbosity*)))) -(sleep 2) +;; (define server-pid (process-run "../../bin/megatest" (list "-server" "-" "-debug" (conc *verbosity*)))) +;; (sleep 2) + +(define th1 (make-thread server:launch)) +(thread-start! th1) + (define start-wait (current-seconds)) (server:client-setup) (print "Starting intensive cache and rpc test") (for-each (lambda (params) ;;; (rdb:tests-register-test #f 1 (conc "test" (random 20)) "") - (apply rdb:test-set-status-state test-id params) + (apply cdb:test-set-status-state *runremote* test-id params) (rdb:pass-fail-counts test-id (random 100) (random 100)) (rdb:test-rollup-test_data-pass-fail test-id) (thread-sleep! 0.01)) ;; cache ordering granularity is at the second level. Should really be at the ms level '(("COMPLETED" "PASS" #f) ("NOT_STARTED" "FAIL" "Just testing") Index: testzmq/hwclient.scm ================================================================== --- testzmq/hwclient.scm +++ testzmq/hwclient.scm @@ -1,9 +1,9 @@ (use zmq posix) (define s (make-socket 'req)) -(connect-socket s "tcp://127.0.0.1:5563") +(connect-socket s "tcp://localhost:5563") (define myname (cadr (argv))) (print "Start client...") Index: testzmq/hwserver.scm ================================================================== --- testzmq/hwserver.scm +++ testzmq/hwserver.scm @@ -1,15 +1,15 @@ (use zmq srfi-18 posix) (define s (make-socket 'rep)) -(bind-socket s "tcp://127.0.0.1:5563") +(bind-socket s "tcp://*:5563") (print "Start server...") (let loop () (let* ((msg (receive-message s)) (name (caddr (string-split msg " "))) (resp (conc "World " name))) (print "Received request: [" msg "]") - (thread-sleep! 0.01) + (thread-sleep! 
0.0001) (print "Sending response \"" resp "\"") (send-message s resp) (loop))) Index: utils/installall.sh ================================================================== --- utils/installall.sh +++ utils/installall.sh @@ -234,10 +234,13 @@ --disable-schedutils \ --disable-libblkid \ --disable-wall make install +# --disable-makeinstall-chown \ +# --disable-makeinstall-setuid \ + # --disable-chsh-only-listed # --disable-pg-bell let pg not ring the bell on invalid keys # --disable-require-password # --disable-use-tty-group do not install wall and write setgid tty # --disable-makeinstall-chown