Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -53,11 +53,11 @@ ;; ;; client:setup ;; ;; lookup_server, need to remove *runremote* stuff ;; -(define (client:setup run-id #!key (remaining-tries 2) (failed-connects 0)) +(define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) @@ -86,10 +86,11 @@ (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (case *transport-type* ((http)(http-transport:close-connections run-id))) (hash-table-delete! *runremote* run-id) + (tasks:kill-server-run-id run-id) (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id (tasks:hostinfo-get-interface server-dat) (tasks:hostinfo-get-port server-dat) " client:setup (server-dat = #t)") Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -66,11 +66,11 @@ (define *db-access-mutex* (make-mutex)) ;; SERVER (define *my-client-signature* #f) ;; (define *transport-type* 'nmsg) -(define *transport-type* 'http) +(define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -617,10 +617,11 @@ ;; (define (db:multi-db-sync run-ids . 
options) (let* ((toppath (launch:setup-for-run)) (dbstruct (if toppath (make-dbr:dbstruct path: toppath) #f)) (mtdb (if toppath (db:open-megatest-db))) + (allow-cleanup (if run-ids #f #t)) (run-ids (if run-ids run-ids (if toppath (begin (db:delay-if-busy mtdb) (db:get-all-run-ids mtdb))))) @@ -663,21 +664,47 @@ (db:replace-test-records dbstruct run-id testrecs) (sqlite3:finalize! (db:dbdat-get-db (dbr:dbstruct-get-rundb dbstruct))))) run-ids))) ;; now ensure all newdb data are synced to megatest.db + ;; do not use the run-ids list passed in to the function + ;; (if (member 'new2old options) - (for-each - (lambda (run-id) - (let* ((fromdb (if toppath (make-dbr:dbstruct path: toppath local: #t) #f)) - (frundb (db:dbdat-get-db (db:get-db fromdb run-id)))) - ;; (db:delay-if-busy frundb) - ;; (db:delay-if-busy mtdb) - (if (eq? run-id 0) - (db:sync-tables (db:sync-main-list dbstruct) (db:get-db fromdb #f) mtdb) - (db:sync-tables db:sync-tests-only (db:get-db fromdb run-id) mtdb)))) - (cons 0 run-ids))) + (let* ((maindb (make-dbr:dbstruct path: toppath local: #t)) + (src-run-ids (db:get-all-run-ids (db:dbdat-get-db (db:get-db maindb 0)))) + (all-run-ids (sort (delete-duplicates (cons 0 src-run-ids)) <)) + (count 1) + (total (length all-run-ids)) + (dead-runs '())) + (for-each + (lambda (run-id) + (debug:print 0 "Processing run " (if (eq? run-id 0) " main.db " run-id) ", " count " of " total) + (set! count (+ count 1)) + (let* ((fromdb (if toppath (make-dbr:dbstruct path: toppath local: #t) #f)) + (frundb (db:dbdat-get-db (db:get-db fromdb run-id)))) + ;; (db:delay-if-busy frundb) + ;; (db:delay-if-busy mtdb) + ;; (db:clean-up frundb) + (if (eq? run-id 0) + (begin + (db:sync-tables (db:sync-main-list dbstruct) (db:get-db fromdb #f) mtdb) + (set! 
dead-runs (db:clean-up-maindb (db:get-db fromdb #f)))) + (begin + ;; NB// must sync first to ensure deleted tests get marked as such in megatest.db + (db:sync-tables db:sync-tests-only (db:get-db fromdb run-id) mtdb) + (db:clean-up-rundb (db:get-db fromdb run-id)) + )))) + all-run-ids) + ;; removed deleted runs + (let ((dbdir (tasks:get-task-db-path))) + (for-each (lambda (run-id) + (let ((fullname (conc dbdir "/" run-id ".db"))) + (if (file-exists? fullname) + (begin + (debug:print 0 "Removing database file for deleted run " fullname) + (delete-file fullname))))) + dead-runs)))) ;; (db:close-all dbstruct) ;; (sqlite3:finalize! mdb) )) ;; keeping it around for debugging purposes only @@ -993,11 +1020,11 @@ ;; 2. Look at run records ;; a. If have tests that are not deleted, set state='unknown' ;; b. .... ;; (define (db:clean-up dbdat) - (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") + ;; (debug:print 0 "WARNING: db clean up not fully ported to v1.60, cleanup action will be on megatest.db") (let* ((db (db:dbdat-get-db dbdat)) (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM tests)+(SELECT count(id) FROM runs);")) (statements (map (lambda (stmt) (sqlite3:prepare db stmt)) @@ -1027,10 +1054,99 @@ (map sqlite3:finalize! statements) (sqlite3:finalize! count-stmt) ;; (db:find-and-mark-incomplete db) (db:delay-if-busy dbdat) (sqlite3:execute db "VACUUM;"))) + +;; Clean out old junk and vacuum the database +;; +;; Ultimately do something like this: +;; +;; 1. Look at test records either deleted or part of deleted run: +;; a. If test dir exists, set the the test to state='UNKNOWN', Set the run to 'unknown' +;; b. If test dir gone, delete the test record +;; 2. Look at run records +;; a. If have tests that are not deleted, set state='unknown' +;; b. .... 
+;; NOTE: as implemented, db:clean-up-rundb only purges tests with state='DELETED' and then VACUUMs.
+(define (db:clean-up-rundb dbdat)
+  ;; dbdat wraps the sqlite3 handle (unwrapped below with db:dbdat-get-db).
+  (let* ((db         (db:dbdat-get-db dbdat))
+         (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM tests);"))
+         (statements
+          (map (lambda (stmt)
+                 (sqlite3:prepare db stmt))
+               (list
+                ;; not enabled yet: delete all tests that belong to runs that are 'deleted'
+                ;; (conc "DELETE FROM tests WHERE run_id NOT IN (" (string-intersperse (map conc valid-runs) ",") ");")
+                ;; delete all tests that are 'DELETED'
+                "DELETE FROM tests WHERE state='DELETED';"
+                ))))
+    (db:delay-if-busy dbdat)
+    (sqlite3:with-transaction
+     db
+     (lambda ()
+       (sqlite3:for-each-row (lambda (tot)
+                               (debug:print-info 0 "Records count before clean: " tot))
+                             count-stmt)
+       (for-each sqlite3:execute statements) ;; run for side effect only, in list order
+       (sqlite3:for-each-row (lambda (tot)
+                               (debug:print-info 0 "Records count after clean: " tot))
+                             count-stmt)))
+    (for-each sqlite3:finalize! statements)
+    (sqlite3:finalize! count-stmt)
+    ;; (db:find-and-mark-incomplete db)
+    (db:delay-if-busy dbdat)
+    (sqlite3:execute db "VACUUM;"))) ;; VACUUM is issued after the transaction above has ended
+
+;; Clean out old junk and vacuum the database
+;;
+;; Ultimately do something like this:
+;;
+;; 1. Look at test records either deleted or part of deleted run:
+;;    a. If test dir exists, set the test to state='UNKNOWN', Set the run to 'unknown'
+;;    b. If test dir gone, delete the test record
+;; 2. Look at run records
+;;    a. If have tests that are not deleted, set state='unknown'
+;;    b. ....
+;; NOTE: as implemented, db:clean-up-maindb removes runs with state='deleted', VACUUMs, and returns the removed run ids.
+(define (db:clean-up-maindb dbdat)
+  ;; dbdat wraps the sqlite3 handle (unwrapped below with db:dbdat-get-db).
+  (let* ((db         (db:dbdat-get-db dbdat))
+         (count-stmt (sqlite3:prepare db "SELECT (SELECT count(id) FROM runs);"))
+         (statements
+          (map (lambda (stmt)
+                 (sqlite3:prepare db stmt))
+               (list
+                ;; not enabled yet: delete all tests that belong to runs that are 'deleted'
+                ;; (conc "DELETE FROM tests WHERE run_id NOT IN (" (string-intersperse (map conc valid-runs) ",") ");")
+                ;; delete all runs that are 'deleted'
+                "DELETE FROM runs WHERE state='deleted';"
+                )))
+         (dead-runs '()))
+    (db:delay-if-busy dbdat)
+    (sqlite3:with-transaction
+     db
+     (lambda ()
+       (sqlite3:for-each-row (lambda (tot)
+                               (debug:print-info 0 "Records count before clean: " tot))
+                             count-stmt)
+       ;; collect the ids inside the same transaction as the DELETE so the returned list matches the removed rows
+       (sqlite3:for-each-row
+        (lambda (run-id) (set! dead-runs (cons run-id dead-runs)))
+        db
+        "SELECT id FROM runs WHERE state='deleted';")
+       (for-each sqlite3:execute statements) ;; run for side effect only, in list order
+       (sqlite3:for-each-row (lambda (tot)
+                               (debug:print-info 0 "Records count after clean: " tot))
+                             count-stmt)))
+    (for-each sqlite3:finalize! statements)
+    (sqlite3:finalize! count-stmt)
+    ;; (db:find-and-mark-incomplete db)
+    (db:delay-if-busy dbdat)
+    (sqlite3:execute db "VACUUM;")
+    dead-runs))
 ;;====================================================================== ;; M E T A G E T A N D S E T V A R S ;;====================================================================== @@ -2323,11 +2439,11 @@ (string-substitute (regexp "_") "=" msg #t))) (lambda ()(deserialize))) (begin (debug:print 0 "ERROR: reception failed. 
Received " msg " but cannot translate it.") - #f))) ;; crude reply for when things go awry + msg))) ;; crude reply for when things go awry ((zmq nmsg)(with-input-from-string msg (lambda ()(deserialize)))) (else msg))) (define (db:test-set-status-state dbstruct run-id test-id status state msg) (let ((dbdat (db:get-db dbstruct run-id))) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -283,13 +283,14 @@ (db:string->obj (handle-exceptions exn (begin (set! success #f) - (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ". Killing associated server to allow clean retry.") + (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ".") (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) (hash-table-delete! *runremote* run-id) + ;; Killing associated server to allow clean retry.") ;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine. #f) (with-input-from-request ;; was dat fullurl (list (cons 'key "thekey") @@ -313,15 +314,15 @@ (debug:print-info 11 "got res=" res) (if (vector? 
res) (if (vector-ref res 0) res (begin ;; note: this code also called in nmsg-transport - consider consolidating it - (debug:print 0 "ERROR: error occured at server, info=" (vector-ref result 2)) + (debug:print 0 "ERROR: error occured at server, info=" (vector-ref res 2)) (debug:print 0 " client call chain:") (print-call-chain (current-error-port)) (debug:print 0 " server call chain:") - (pp (vector-ref result 1) (current-error-port)) + (pp (vector-ref res 1) (current-error-port)) - (signal (vector-ref result 0)))) + (signal (vector-ref res 0)))) (signal (make-composite-condition (make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -458,10 +458,17 @@ environ-patt: "env-override" given-toppath: (get-environment-variable "MT_RUN_AREA_HOME") pathenvvar: "MT_RUN_AREA_HOME")) (set! *configdat* (if (car *configinfo*)(car *configinfo*) #f)) (set! *toppath* (if (car *configinfo*)(cadr *configinfo*) #f)) + (let* ((tmptransport (configf:lookup *configdat* "server" "transport")) + (transport (if tmptransport (string->symbol tmptransport) 'http))) + (if (member transport '(http rpc nmsg)) + (set! *transport-type* transport) + (begin + (debug:print 0 "ERROR: Unrecognised transport " transport) + (exit 1)))) (let ((linktree (configf:lookup *configdat* "setup" "linktree"))) ;; link tree is critical (if linktree (if (not (file-exists? linktree)) (begin (handle-exceptions Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -283,15 +283,18 @@ args:arg-hash 0)) ;; The watchdog is to keep an eye on things like db sync etc. ;; +(define *time-zero* (current-seconds)) (define *watchdog* (make-thread (lambda () (thread-sleep! 
0.05) ;; delay for startup - (let ((legacy-sync (configf:lookup *configdat* "setup" "megatest-db"))) + (let ((legacy-sync (configf:lookup *configdat* "setup" "megatest-db")) + (debug-mode (debug:debug-mode 1)) + (last-time (current-seconds))) (let loop () ;; sync for filesystem local db writes ;; (let ((start-time (current-seconds)) (servers-started (make-hash-table))) @@ -310,17 +313,22 @@ ;; (begin ;; (debug:print-info 0 "Sync is taking a long time, start up a server to assist for run " run-id) ;; (server:kind-run run-id))))) (hash-table-delete! *db-local-sync* run-id))) (mutex-unlock! *db-multi-sync-mutex*)) - (hash-table-keys *db-local-sync*))) - + (hash-table-keys *db-local-sync*)) + (if (and debug-mode + (> (- start-time last-time) 14)) + (begin + (set! last-time start-time) + (debug:print-info 0 "timestamp -> " (seconds->time-string (current-seconds)) ", time since start -> " (seconds->hr-min-sec (- (current-seconds) *time-zero*)))))) + ;; keep going unless time to exit ;; (if (not *time-to-exit*) (begin - (thread-sleep! 1) ;; wait one second before syncing again + (thread-sleep! 5) ;; wait five seconds before syncing again, we'll also sync on exit (loop))))) "Watchdog thread"))) (thread-start! *watchdog*) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -64,13 +64,14 @@ ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) (client:setup run-id) #f)))) +(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections - (mutex-lock! *db-multi-sync-mutex*) + ;; (mutex-lock! 
*db-multi-sync-mutex*) ;; (let ((expire-time (- (current-seconds) 60))) ;; (for-each ;; (lambda (run-id) ;; (let ((connection (hash-table-ref/default *runremote* run-id #f))) ;; (if (and connection @@ -78,11 +79,12 @@ ;; (begin ;; (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses") ;; ;; SHOULD CLOSE THE CONNECTION HERE ;; (hash-table-delete! *runremote* run-id))))) ;; (hash-table-keys *runremote*))) - (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info @@ -94,20 +96,23 @@ (else (exit)))) (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)) (res (if (and dat (vector? dat)) (vector-ref dat 1) #f))) (http-transport:server-dat-update-last-access connection-info) (if success - (case *transport-type* - ((http) res) ;; (db:string->obj res)) - ((nmsg) res)) ;; (vector-ref res 1))) + (begin + ;; (mutex-unlock! *send-receive-mutex*) + (case *transport-type* + ((http) res) ;; (db:string->obj res)) + ((nmsg) res))) ;; (vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.") ;; (case *transport-type* ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection (if (eq? (modulo attemptnum 5) 0) (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) + ;; (mutex-unlock! 
*send-receive-mutex*) ;; close the mutex here to allow other threads access to communications (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) ;; no longer killing the server in http-transport:client-api-send-receive ;; may kill it here but what are the criteria? @@ -118,16 +123,18 @@ ;; no connection info? try to start a server (if (and (< attemptnum 15) (tasks:need-server run-id)) (begin (hash-table-delete! *runremote* run-id) + ;; (mutex-unlock! *send-receive-mutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) (client:setup run-id) (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) (begin (debug:print 0 "ERROR: Communication failed!") + ;; (mutex-unlock! *send-receive-mutex*) (exit) ;; (rmt:open-qry-close-locally cmd run-id params)))) ))))) (define (rmt:update-db-stats run-id rawcmd params duration)
@@ -335,16 +342,34 @@
 	  '())))
 
 ;; IDEA: Threadify these - they spend a lot of time waiting ...
 ;;
 (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in)
   (let ((run-id-list (if run-ids
 			 run-ids
 			 (rmt:get-all-run-ids))))
-    (apply append (map (lambda (run-id)
-			 (rmt:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in)))
-		       run-id-list))))
+    ;; Launch one worker thread per run. Each worker returns its run's test
+    ;; list as the thread result, so no shared accumulator or mutex is needed.
+    (let loop ((remaining run-id-list)
+               (workers   '()))
+      (if (null? remaining)
+          ;; thread-join! returns the worker's result; joining in launch order
+          ;; keeps the tests ordered like run-id-list, as the serial version did.
+          (apply append (map thread-join! (reverse workers)))
+          (let* ((run-id (car remaining))
+                 (worker (make-thread
+                          (lambda ()
+                            (let ((res (rmt:send-receive 'get-tests-for-run-mindata run-id (list run-id testpatt states status not-in))))
+                              (if (list? res)
+                                  res
+                                  (begin
+                                    (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " run-id ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in)
+                                    '()))))
+                          (conc "multi-run-thread for run-id " run-id))))
+            (thread-start! worker)
+            (thread-sleep! 0.5) ;; give that thread some time to start
+            (loop (cdr remaining) (cons worker workers)))))))
 
 (define (rmt:delete-test-records run-id test-id)
   (rmt:send-receive 'delete-test-records run-id (list run-id test-id)))
 
 ;; This is not needed as test steps are deleted on test delete call
Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -361,11 +361,11 @@ (maxqry (cdr (rmt:get-max-query-average run-id))) (threshold (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10")))) (cond (forced (if (common:low-noise-print 60 run-id "server required is set") - (debug:print-info 0 "Server required is set, starting server.")) + (debug:print-info 0 "Server required is set, starting server for run-id " run-id ".")) #t) ((> maxqry threshold) (if (common:low-noise-print 60 run-id "Max query time execeeded") (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, starting server.")) #t) Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -31,13 +31,10 @@ # yes, anything else is no run-wait yes -# Use http instead of direct filesystem access -# transport http -# transport fs # If set to "default" the old code is used. Otherwise defaults to 200 or uses # numeric value given. 
# runqueue 20 @@ -124,10 +121,16 @@ MAX_ALLOWED_LOAD 200 # XTERM [system xterm] # RUNDEAD [system exit 56] [server] + +# Use http instead of direct filesystem access +transport http +# transport fs +# transport nmsg + synchronous 0 # If the server can't be started on this port it will try the next port until # it succeeds port 8080