Overview
Comment: | sync working? |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.7001-multi-db-rb01 |
Files: | files | file ages | folders |
SHA1: |
f2cf1492f87d00a6e58667244e636485 |
User & Date: | matt on 2022-04-07 06:38:39 |
Other Links: | branch diff | manifest | tags |
Context
2022-04-07
| ||
07:04 | wip check-in: 5209afd099 user: matt tags: v1.7001-multi-db-rb01 | |
06:38 | sync working? check-in: f2cf1492f8 user: matt tags: v1.7001-multi-db-rb01 | |
05:16 | wip check-in: ad6bc47730 user: matt tags: v1.7001-multi-db-rb01 | |
Changes
Modified common.scm from [4413045495] to [914a0f730c].
︙ | ︙ | |||
147 148 149 150 151 152 153 | ;; db sync (define *db-last-sync* 0) ;; last time the sync to megatest.db happened (define *db-sync-in-progress* #f) ;; if there is a sync in progress do not try to start another (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access | | | | 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | ;; db sync (define *db-last-sync* 0) ;; last time the sync to megatest.db happened (define *db-sync-in-progress* #f) ;; if there is a sync in progress do not try to start another (define *db-multi-sync-mutex* (make-mutex)) ;; protect access to *db-sync-in-progress*, *db-last-sync* ;; task db (define *task-db* #f) ;; (vector db path-to-db) (define *db-access-allowed* #t) ;; flag to allow access ;; (define *db-access-mutex* (make-mutex)) ;; moved to dbfile (define *db-transaction-mutex* (make-mutex)) (define *db-cache-path* #f) (define *db-with-db-mutex* (make-mutex)) (define *db-api-call-time* (make-hash-table)) ;; hash of command => (list of times) ;; no sync db ;; (define *no-sync-db* #f) ;; moved to dbfile ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg (define *runremote* #f) ;; if set up for server communication this will hold <host port> ;; (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) |
︙ | ︙ |
Modified db.scm from [0b86155e4d] to [40daf428a9].
︙ | ︙ | |||
2107 2108 2109 2110 2111 2112 2113 | ;;====================================================================== ;; no-sync.db - small bits of data to be shared between servers ;;====================================================================== (define (db:open-no-sync-db) (dbfile:open-no-sync-db (db:dbfile-path))) | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 | ;;====================================================================== ;; no-sync.db - small bits of data to be shared between servers ;;====================================================================== (define (db:open-no-sync-db) (dbfile:open-no-sync-db (db:dbfile-path))) (define (db:no-sync-close-db db stmt-cache) (db:safely-close-sqlite3-db db stmt-cache)) ;; use a global for some primitive caching, it is just silly to ;; re-read the db over and over again for the keys since they never ;; change ;; why get the keys from the db? why not get from the *configdat* |
︙ | ︙ | |||
5048 5049 5050 5051 5052 5053 5054 | (define (db:lock-and-sync no-sync-db from-db to-db) (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.") (let* ((lockdat (db:no-sync-get-lock no-sync-db from-db)) (gotlock (car lockdat)) (locktime (cdr lockdat))) (if gotlock (begin | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | < | < < < < < < < < < < < < < < < < < < < < < < < < < | 4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002 5003 5004 5005 5006 5007 5008 5009 5010 5011 5012 5013 5014 5015 5016 5017 5018 5019 5020 5021 5022 5023 5024 5025 5026 5027 5028 5029 5030 5031 5032 5033 5034 5035 5036 5037 5038 5039 5040 5041 5042 5043 5044 5045 5046 5047 5048 5049 5050 5051 5052 5053 5054 5055 5056 5057 5058 5059 5060 5061 5062 5063 5064 5065 5066 5067 5068 5069 5070 5071 5072 5073 | (define (db:lock-and-sync no-sync-db from-db to-db) (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.") (let* ((lockdat (db:no-sync-get-lock no-sync-db from-db)) (gotlock (car lockdat)) (locktime (cdr lockdat))) (if gotlock (begin (file-copy from-db to-db #t) (db:no-sync-del! no-sync-db from-db) #t) #f))) ;; sync for filesystem local db writes ;; (define (db:run-lock-and-sync no-sync-db) (let* ((tmp-area (common:get-db-tmp-area)) (dbfiles (glob (conc tmp-area"/.db/*.db"))) (sync-durations (make-hash-table))) ;; (debug:print-info 0 *default-log-port* "lock-and-sync, dbfiles: "dbfiles) (for-each (lambda (file) (let* ((fname (conc (pathname-file file) ".db")) (fulln (conc *toppath*"/.db/"fname)) (time1 (file-modification-time file)) (time2 (file-modification-time fulln)) (changed (> time1 time2)) (do-cp (cond ((not (file-exists? fulln)) ;; shouldn't happen, but this might recover (debug:print-info 0 *default-log-port* "File "fulln" not found! Copying "fname" to "fulln) #t) (changed ;; (and changed ;; (> (- (current-seconds) time1) 3)) ;; if file is changed and three seconds have passed. #t) ((and changed *time-to-exit*) ;; last copy #t) (else #f)))) (if do-cp (let* ((start-time (current-milliseconds))) (debug:print-info 0 *default-log-port* "sync file: "file", fname: "fname", time1: "time1", time2: "time2) (db:lock-and-sync no-sync-db file fulln) (hash-table-set! sync-durations (conc fname".db") (- (current-milliseconds) start-time))) #;(debug:print-info 0 *default-log-port* "skipping sync...")))) dbfiles) (hash-table->alist sync-durations))) ;; straight forward copy based sync ;; 1. for each .db fil ;; 2. next if file changed since last sync cycle ;; 2. next if time delta /tmp file to MTRA less than 3 seconds ;; 3. get a lock for the file in nosyncdb ;; 4. copy the file ;; 5. when copy is done release the lock ;; ;; DONE (define (server:writable-watchdog-copysync dbstruct) (thread-sleep! 0.05) ;; delay for startup (let ((legacy-sync (common:run-sync?)) (sync-stale-seconds (configf:lookup-number *configdat* "server" "sync-stale-seconds" default: 300)) (debug-mode (debug:debug-mode 1)) (last-time (current-seconds)) ;; last time through the sync loop (no-sync-db (db:open-no-sync-db)) (sync-duration 0) ;; run time of the sync in milliseconds (tmp-area (common:get-db-tmp-area))) ;; Sync moved to http-transport keep-running loop (set! *no-sync-db* no-sync-db) ;; make the no sync db available to api calls (debug:print-info 2 *default-log-port* "Periodic copy-based sync thread started. sync is "legacy-sync", tmp-area is "tmp-area) (debug:print-info 3 *default-log-port* "watchdog starting. legacy-sync is "legacy-sync" pid="(current-process-id));; " this-wd-num="this-wd-num) (if (and legacy-sync (not *time-to-exit*)) (begin (debug:print-info 0 *default-log-port* "Server running, periodic copy-based sync started.") (let loop () ;; run the sync and print out durations (debug:print-info 0 *default-log-port* "Sync durations: "(db:run-lock-and-sync no-sync-db)) ;; keep going unless time to exit ;; (if (not *time-to-exit*) (let delay-loop ((count 0)) ;;(debug:print-info 13 *default-log-port* "delay-loop top; count="count" pid="(current-process-id)" this-wd-num="this-wd-num" *time-to-exit*="*time-to-exit*) (if (and (not *time-to-exit*) |
︙ | ︙ |
Modified dbfile.scm from [4ab8a27c28] to [9671b9ee01].
︙ | ︙ | |||
71 72 73 74 75 76 77 78 79 80 81 82 83 84 | (defstruct dbr:dbdat (dbfile #f) (dbh #f) (stmt-cache (make-hash-table)) (read-only #f)) (define *dbstruct-dbs* #f) (define (dbfile:run-id->key run-id) (or run-id 'main)) (define (db:safely-close-sqlite3-db db stmt-cache #!key (try-num 3)) (if (<= try-num 0) #f | > > | 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | (defstruct dbr:dbdat (dbfile #f) (dbh #f) (stmt-cache (make-hash-table)) (read-only #f)) (define *dbstruct-dbs* #f) (define *db-access-mutex* (make-mutex)) (define *no-sync-db* #f) (define (dbfile:run-id->key run-id) (or run-id 'main)) (define (db:safely-close-sqlite3-db db stmt-cache #!key (try-num 3)) (if (<= try-num 0) #f |
︙ | ︙ | |||
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 | (dbfile:print-err " db, " (dbr:dbdat-dbfile tmpdb) " already exists or fresh enough, not propogating data from\n " (dbr:dbdat-dbfile mtdb) " mod time delta: " modtimedelta) ) ;; (db:multi-db-sync subdb 'old2new)) ;; migrate data from megatest.db automatically tmpdb)) ;;====================================================================== ;; no-sync.db - small bits of data to be shared between servers ;;====================================================================== (define (dbfile:open-no-sync-db dbpath) (let* (;; (dbpath (db:dbfile-path)) (dbname (conc dbpath "/no-sync.db")) (db-exists (file-exists? dbname)) (db (sqlite3:open-database dbname))) (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) (if (not db-exists) (begin (sqlite3:execute db "PRAGMA synchronous = 0;") (sqlite3:execute db "CREATE TABLE IF NOT EXISTS no_sync_metadat (var TEXT,val TEXT, CONSTRAINT no_sync_metadat_constraint UNIQUE (var));") (sqlite3:execute db "PRAGMA journal_mode=WAL;"))) db)) ;;====================================================================== ;; file utils ;;====================================================================== ;;====================================================================== ;; lazy-safe get file mod time. on any error (file not existing etc.) return 0 | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 | (dbfile:print-err " db, " (dbr:dbdat-dbfile tmpdb) " already exists or fresh enough, not propogating data from\n " (dbr:dbdat-dbfile mtdb) " mod time delta: " modtimedelta) ) ;; (db:multi-db-sync subdb 'old2new)) ;; migrate data from megatest.db automatically tmpdb)) ;;====================================================================== ;; no-sync.db - small bits of data to be shared between servers ;;====================================================================== ;; if we are not a server create a db handle. this is not finalized ;; so watch for problems. I'm still not clear if it is needed to manually ;; finalize sqlite3 dbs with the sqlite3 egg. ;; (define (db:no-sync-db db-in) (mutex-lock! *db-access-mutex*) (let ((res (if db-in db-in (let ((db (dbfile:open-no-sync-db))) (set! *no-sync-db* db) db)))) (mutex-unlock! *db-access-mutex*) res)) (define (dbfile:open-no-sync-db dbpath) (let* (;; (dbpath (db:dbfile-path)) (dbname (conc dbpath "/no-sync.db")) (db-exists (file-exists? dbname)) (db (sqlite3:open-database dbname))) (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) (if (not db-exists) (begin (sqlite3:execute db "PRAGMA synchronous = 0;") (sqlite3:execute db "CREATE TABLE IF NOT EXISTS no_sync_metadat (var TEXT,val TEXT, CONSTRAINT no_sync_metadat_constraint UNIQUE (var));") (sqlite3:execute db "PRAGMA journal_mode=WAL;"))) db)) (define (db:no-sync-set db var val) (sqlite3:execute (db:no-sync-db db) "INSERT OR REPLACE INTO no_sync_metadat (var,val) VALUES (?,?);" var val)) (define (db:no-sync-del! db var) (sqlite3:execute (db:no-sync-db db) "DELETE FROM no_sync_metadat WHERE var=?;" var)) (define (db:no-sync-get/default db var default) (let ((res default)) (sqlite3:for-each-row (lambda (val) (set! res val)) (db:no-sync-db db) "SELECT val FROM no_sync_metadat WHERE var=?;" var) (if res (let ((newres (if (string? res) (string->number res) #f))) (if newres newres res)) res))) ;; transaction protected lock aquisition ;; either: ;; fails returns (#f . lock-creation-time) ;; succeeds (returns (#t . lock-creation-time) ;; use (db:no-sync-del! db keyname) to release the lock ;; (define (db:no-sync-get-lock db-in keyname) (let ((db (db:no-sync-db db-in))) (sqlite3:with-transaction db (lambda () (handle-exceptions exn (let ((lock-time (current-seconds))) ;; (debug:print-info 2 *default-log-port* "db:no-sync-get-lock keyname=" keyname ", lock-time=" lock-time ", exn=" exn) (sqlite3:execute db "INSERT INTO no_sync_metadat (var,val) VALUES(?,?);" keyname lock-time) `(#t . ,lock-time)) `(#f . ,(sqlite3:first-result db "SELECT val FROM no_sync_metadat WHERE var=?;" keyname))))))) ;;====================================================================== ;; file utils ;;====================================================================== ;;====================================================================== ;; lazy-safe get file mod time. on any error (file not existing etc.) return 0 |
︙ | ︙ |
Modified http-transport.scm from [58ed70fa3e] to [df8bc9e37e].
︙ | ︙ | |||
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses server)) ;; (declare (uses daemon)) (declare (uses portlogger)) (declare (uses rmt)) (include "common_records.scm") (include "db_records.scm") (include "js-path.scm") (require-library stml) (define (http-transport:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) | > > > | 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses server)) ;; (declare (uses daemon)) (declare (uses portlogger)) (declare (uses rmt)) (declare (uses dbfile)) (include "common_records.scm") (include "db_records.scm") (include "js-path.scm") (import dbfile) (require-library stml) (define (http-transport:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) |
︙ | ︙ | |||
462 463 464 465 466 467 468 | ;; Use this opportunity to sync the tmp db to megatest.db (if (not server-going) ;; *dbstruct-dbs* (begin (debug:print 0 *default-log-port* "SERVER: dbprep") (set! *dbstruct-dbs* (db:setup #t)) ;; run-id)) FIXME!!! (set! server-going #t) (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. | | > > | 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 | ;; Use this opportunity to sync the tmp db to megatest.db (if (not server-going) ;; *dbstruct-dbs* (begin (debug:print 0 *default-log-port* "SERVER: dbprep") (set! *dbstruct-dbs* (db:setup #t)) ;; run-id)) FIXME!!! (set! server-going #t) (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. (thread-start! *watchdog*)) (if *no-sync-db* (db:run-lock-and-sync *no-sync-db*))) ;; when things go wrong we don't want to be doing the various queries too often ;; so we strive to run this stuff only every four seconds or so. (let* ((sync-time (- (current-milliseconds) start-time)) (rem-time (quotient (- 4000 sync-time) 1000))) (if (and (<= rem-time 4) (> rem-time 0)) |
︙ | ︙ | |||
488 489 490 491 492 493 494 | (let ((new-iface (car sdat)) (new-port (cadr sdat))) (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") (set! iface new-iface) (set! port new-port) (if (not *server-id*) (set! *server-id* (server:mk-signature))) | < < | 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 | (let ((new-iface (car sdat)) (new-port (cadr sdat))) (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") (set! iface new-iface) (set! port new-port) (if (not *server-id*) (set! *server-id* (server:mk-signature))) (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) (flush-output *default-log-port*))) ;; Transfer *db-last-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *db-last-access*) (mutex-unlock! *heartbeat-mutex*) (if (common:low-noise-print 120 (conc "server running on " iface ":" port)) (begin (if (not *server-id*) (set! *server-id* (server:mk-signature))) (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) (flush-output *default-log-port*))) (if (common:low-noise-print 60 "dbstats") (begin (debug:print 0 *default-log-port* "Server stats:") (db:print-current-query-stats))) (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) |
︙ | ︙ |