Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -18,11 +18,11 @@
 ;;     You should have received a copy of the GNU General Public License
 ;;     along with Megatest.  If not, see .
 ;;
 ;;======================================================================
 
-(use srfi-69 posix)
+(use srfi-69 posix srfi-18)
 
 (declare (unit api))
 (declare (uses rmt))
 (declare (uses db))
 (declare (uses dbmod))
@@ -141,38 +141,47 @@
     tasks-add
     tasks-set-state-given-param-key
     ))
 
 (define *db-write-mutexes* (make-hash-table))
+(define *api-watchdog* #f)
+
+;; Start the background thread that trims not-used sqlite3 db handles.
+;;
+;;   dbstruct - handed to db:close-old on every pass
+;;   age      - seconds a handle may sit unused before db:close-old closes it (default 30)
+;;
+;; The thread sleeps 2x age between passes and is recorded in *api-watchdog*.
+;;
+(define (api:watchdog dbstruct #!key (age 30))
+  (let* ((th1 (make-thread (lambda ()
+			     (let loop ()
+			       (thread-sleep! (* 2 age)) ;; 2x the age we close at
+			       (db:close-old dbstruct age: age)
+			       (loop)))
+			   "api:watchdog thread")))
+    (set! *api-watchdog* th1) ;; record first: narrows the window in which another request could start a second watchdog
+    (thread-start! th1)))
+
 ;; These are called by the server on recipt of /api calls
 ;;    - keep it simple, only return the actual result of the call, i.e. no meta info here
 ;;
 ;;    - returns #( flag result )
 ;;
 (define (api:execute-requests dbstruct dat)
   (db:open-no-sync-db) ;; sets *no-sync-db*
-;;   (handle-exceptions
-;;    exn
-;;    (let ((call-chain (get-call-chain)))
-;;      (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an exception from peer, dat=" dat ", exn=" exn)
-;;      (print-call-chain (current-error-port))
-;;      (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
-    ;; (vector #f (vector exn call-chain dat))) ;; return some stuff for debug if an exception happens
+  (if (not *api-watchdog*)(api:watchdog dbstruct))
   (if (> *api-process-request-count* 200)
       (begin
 	(if (common:low-noise-print 30 "too many threads")
 	    (debug:print 0 *default-log-port* "WARNING: "*api-process-request-count*" threads, potential overload, adding 0.5 sec delay."))
 	(thread-sleep! 0.5) ;; take a nap
 	))
   (cond
    ((not (vector? 
dat)) ;; it is an error to not receive a vector (vector #f (vector #f "remote must be called with a vector"))) - #;((> *api-process-request-count* 200) ;; 20) - (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an overloaded message.") - (set! *server-overloaded* #t) - (vector #f (vector #f 'overloaded))) ;; the inner vector is what gets returned. nope, don't know why. please refactor! (else (let* ((cmd-in (vector-ref dat 0)) (cmd (if (symbol? cmd-in) cmd-in (string->symbol cmd-in))) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -551,18 +551,15 @@ ((and changed *time-to-exit*) ;; last sync #t) (else #f)))) (if (or dejunk do-cp) - (let* ( - (start-time (current-milliseconds)) + (let* ((start-time (current-milliseconds)) (subdb (or (dbfile:get-subdb dbstruct run-id) (dbfile:init-subdb dbstruct run-id dbfile:db-init-proc))) (mtdb (dbr:subdb-mtdbdat subdb)) - (tmpdb (dbfile:open-db dbstruct run-id dbfile:db-init-proc)) - - ) + (tmpdb (dbfile:open-db dbstruct run-id dbfile:db-init-proc))) (debug:print-info 2 *default-log-port* "delta syncing file: " srcfile ", time diff: " (- time1 time2) " seconds") (if old2new (begin (if dejunk (db:clean-up run-id mtdb)) @@ -4349,38 +4346,38 @@ ;;====================================================================== ;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp ;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server) ;; -(define (common:readonly-watchdog dbstruct) - (thread-sleep! 
0.05) ;; delay for startup - (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.") - ;; sync megatest.db to /tmp/.../megatst.db - (let* ((sync-cool-off-duration 3) - (golden-mtdb (dbr:dbstruct-mtdb dbstruct)) - (golden-mtpath (db:dbdat-get-path golden-mtdb)) - (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct)) - (tmp-mtpath (db:dbdat-get-path tmp-mtdb))) - (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.") - (let loop ((last-sync-time 0)) - (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath) - (let* ((duration-since-last-sync (- (current-seconds) last-sync-time))) - (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync) - (if (and (not *time-to-exit*) - (< duration-since-last-sync sync-cool-off-duration)) - (thread-sleep! (- sync-cool-off-duration duration-since-last-sync))) - (if (not *time-to-exit*) - (let ((golden-mtdb-mtime (file-modification-time golden-mtpath)) - (tmp-mtdb-mtime (file-modification-time tmp-mtpath))) - (if (> golden-mtdb-mtime tmp-mtdb-mtime) - (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back - (let ((res (db:multi-db-sync dbstruct 'old2new))) - (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred.")))) - (loop (current-seconds))) - #t))) - (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath))) - +;; (define (common:readonly-watchdog dbstruct) +;; (thread-sleep! 
0.05) ;; delay for startup +;; (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.") +;; ;; sync megatest.db to /tmp/.../megatst.db +;; (let* ((sync-cool-off-duration 3) +;; (golden-mtdb (dbr:dbstruct-mtdb dbstruct)) +;; (golden-mtpath (db:dbdat-get-path golden-mtdb)) +;; (tmp-mtdb (dbr:dbstruct-tmpdb dbstruct)) +;; (tmp-mtpath (db:dbdat-get-path tmp-mtdb))) +;; (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.") +;; (let loop ((last-sync-time 0)) +;; (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath) +;; (let* ((duration-since-last-sync (- (current-seconds) last-sync-time))) +;; (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync) +;; (if (and (not *time-to-exit*) +;; (< duration-since-last-sync sync-cool-off-duration)) +;; (thread-sleep! (- sync-cool-off-duration duration-since-last-sync))) +;; (if (not *time-to-exit*) +;; (let ((golden-mtdb-mtime (file-modification-time golden-mtpath)) +;; (tmp-mtdb-mtime (file-modification-time tmp-mtpath))) +;; (if (> golden-mtdb-mtime tmp-mtdb-mtime) +;; (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back +;; (let ((res (db:multi-db-sync dbstruct 'old2new))) +;; (debug:print-info 13 *default-log-port* "rosync called, " res " records transferred.")))) +;; (loop (current-seconds))) +;; #t))) +;; (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath))) +;; ;; Get a lock from the no-sync-db for the from-db, then copy the from-db to the to-db, otherwise return #f (define (db:lock-and-sync no-sync-db from-db to-db) (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.") Index: dbfile.scm 
==================================================================
--- dbfile.scm
+++ dbfile.scm
@@ -66,10 +66,11 @@
   (mtdbdat #f) ;; only need one of these for syncing
   ;; (dbdats (make-hash-table)) ;; id => dbdat
   (tmpdbfile #f) ;; /tmp/.../.megatest/1.db
   ;; (refndbfile #f) ;; /tmp/.../.megatest/1.db_ref
   (dbstack (make-stack)) ;; stack for tmp dbr:dbdat,
+  (stack-mutex (make-mutex)) ;; gate pop, push, peek and replace with this mutex (allows safe clean up of old handles)
   (homehost #f) ;; not used yet
   (on-homehost #f) ;; not used yet
   (read-only #f)
   (last-sync 0)
   (last-write (current-seconds))
@@ -79,11 +80,13 @@
 (defstruct dbr:dbdat
   (dbfile #f)
   (dbh #f)
   (stmt-cache (make-hash-table))
   (read-only #f)
-  (birth-sec (current-seconds)))
+  (birth-sec (current-seconds))
+  (last-used (current-seconds))
+  (in-use #f))
 
 (define *dbstruct-dbs* #f)
 (define *db-open-mutex* (make-mutex))
 (define *db-access-mutex* (make-mutex)) ;; used in common.scm
 (define *no-sync-db* #f)
@@ -128,38 +131,61 @@
 	  )
 	 ))))
 
 ;; close all opened run-id dbs
 (define (db:close-all dbstruct)
-  (if (dbr:dbstruct? dbstruct)
-;; (handle-exceptions
-;; exn
-;; (begin
-;; (debug:print 0 *default-log-port* "WARNING: Finalizing failed, " ((condition-property-accessor 'exn 'message) exn) ", note - exn=" exn)
-;; (print-call-chain *default-log-port*))
-      ;; (db:sync-touched dbstruct 0 force-sync: #t) ;; NO. Do not do this here. Instead we rely on a server to be started when there are writes, even if the server itself is not going to be used as a server.
-      (let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
-	(for-each
-	 (lambda (subdb)
-	   (let* ((tdbs (stack->list (dbr:subdb-dbstack subdb)))
-		  (mtdbdat (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb)))
-		  #;(rdb (dbr:dbdat-dbh (dbr:subdb-refndb subdb))))
-
-	     (map (lambda (dbdat)
-		    (let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
-			   (dbh (dbr:dbdat-dbh dbdat)))
-		      (db:safely-close-sqlite3-db dbh stmt-cache)))
-		  tdbs)
-	     (db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache (dbr:subdb-mtdbdat subdb)))
-	     ;; (if (sqlite3:database? mdb) (sqlite3:finalize! mdb))
-	     #;(db:safely-close-sqlite3-db rdb #f))) ;; stmt-cache))))) ;; (if (sqlite3:database? rdb) (sqlite3:finalize! rdb))))))
-	 subdbs)
-	#t
-	)
-      #f
-      )
-)
+  (assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with non-dbstruct "dbstruct)
+  (let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
+    (for-each
+     (lambda (subdb)
+       (mutex-lock! (dbr:subdb-stack-mutex subdb))
+       (let* ((tdbs (stack->list (dbr:subdb-dbstack subdb)))
+	      (mtdbdat (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb))))
+	 (map (lambda (dbdat)
+		(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
+		       (dbh (dbr:dbdat-dbh dbdat)))
+		  (db:safely-close-sqlite3-db dbh stmt-cache)))
+	      tdbs)
+	 (db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache (dbr:subdb-mtdbdat subdb))))
+       (mutex-unlock! (dbr:subdb-stack-mutex subdb)))
+     subdbs)))
+
+;; Close opened run-id dbs that haven't been used in age seconds.
+;;
+;;   dbstruct - must satisfy dbr:dbstruct? (asserted)
+;;   age      - idle time in seconds after which a handle is closed (default 30)
+;;
+;; Each subdb's stack is rebuilt while holding its stack-mutex: handles used within
+;; the last age seconds are pushed onto a fresh stack, all others are closed.
+;;
+(define (db:close-old dbstruct #!key (age 30)) ;; close dbs older than this age
+  (assert (dbr:dbstruct? dbstruct) "FATAL: db:close-old called with non-dbstruct "dbstruct)
+  (let* ((subdbs (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
+    (for-each
+     (lambda (subdb)
+       (mutex-lock! (dbr:subdb-stack-mutex subdb))
+       (let* ((tdbs (stack->list (dbr:subdb-dbstack subdb))))
+	 (dbr:subdb-dbstack-set! subdb (make-stack)) ;; replace the stack with a new one
+	 (for-each
+	  (lambda (dbdat)
+	    ;; handles sitting in the stack are idle (dbfile:add-dbdat clears in-use before pushing)
+	    (assert (not (dbr:dbdat-in-use dbdat)) "FATAL: dbdat in stack was in use "(dbr:dbdat-dbfile dbdat))
+	    (if (< (- (current-seconds)
+		      (dbr:dbdat-last-used dbdat))
+		   age)
+		(stack-push! (dbr:subdb-dbstack subdb) dbdat) ;; keep it
+		(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat)) ;; close and discard
+		       (dbh (dbr:dbdat-dbh dbdat)))
+		  (dbfile:print-err "INFO: closing unused dbdat for "(dbr:dbdat-dbfile dbdat))
+		  (db:safely-close-sqlite3-db dbh stmt-cache))))
+	  tdbs)
+	 (let* ((size (stack-count (dbr:subdb-dbstack subdb)))
+		(delta (- (length tdbs) size)))
+	   (if (> delta 0)
+	       (dbfile:print-err "INFO: removed "delta" and "size" dbs left."))))
+       (mutex-unlock! (dbr:subdb-stack-mutex subdb)))
+     subdbs)))
 
 ;; ;; set up a single db (e.g. main.db, 1.db ... etc.)
 ;; ;;
 ;; (define (db:setup-db dbstruct areapath run-id)
 ;; (let* ((dbname (db:run-id->dbname run-id))
@@ -234,18 +260,26 @@
 ;; if db not open, open inmem, rundb and sync then return inmem
 ;; inuse gets set automatically for rundb's
 ;;
 (define (dbfile:get-dbdat dbstruct run-id)
   (let* ((subdb (dbfile:get-subdb dbstruct run-id)))
-    (if (stack-empty? (dbr:subdb-dbstack subdb))
-	#f
-	(begin
-	  (stack-pop! (dbr:subdb-dbstack subdb))))))
+    (mutex-lock! (dbr:subdb-stack-mutex subdb))
+    (let* ((res (if (stack-empty? (dbr:subdb-dbstack subdb))
+		    #f
+		    (let ((dbdat (stack-pop! (dbr:subdb-dbstack subdb))))
+		      (dbr:dbdat-last-used-set! dbdat (current-seconds))
+		      (dbr:dbdat-in-use-set! dbdat #t)
+		      dbdat))))
+      (mutex-unlock! (dbr:subdb-stack-mutex subdb))
+      res)))
 
 ;; return a previously opened db handle to the stack of available handles
 (define (dbfile:add-dbdat dbstruct run-id dbdat)
   (let* ((subdb (dbfile:get-subdb dbstruct run-id)))
+    (mutex-lock! (dbr:subdb-stack-mutex subdb)) ;; gate the push so it cannot land on a stack db:close-old is replacing
+    (dbr:dbdat-in-use-set! dbdat #f)
     (stack-push! (dbr:subdb-dbstack subdb) dbdat)
+    (mutex-unlock! (dbr:subdb-stack-mutex subdb))
     dbdat))
 
 ;; set up a subdb
 ;;