Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -259,13 +259,14 @@ (delete-file synclock-file)) (thethread)) (debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found.")) (thethread))))))) ;; (dbmod:sync-tables tables #f db cachedb) - ;; (if db + ;; + (thread-sleep! 1) ;; let things settle before syncing in needed data (dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb - (dbr:dbstruct-last-update-set! dbstruct (current-seconds)) ;; should this be offset back in time by one second? + (dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second? dbstruct)) ;; (if (eq? syncdir 'todisk) ;; sync to disk normally, sync from in dashboard ;; (dbmod:sync-tables tables last-update cachedb db) ;; (dbmod:sync-tables tables last-update db cachedb)))) @@ -468,97 +469,97 @@ (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")) has-last)) ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; -;; direction = fromdest, todest +;; direction = fromdest, todisk ;; mode = 'full, 'incr ;; ;; Idea: youngest in dest is last_update time ;; (define (dbmod:attach-sync tables dbh destdbfile direction #!key (mode 'full) (no-update '("keys")) ;; do ) - (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) - (if (not (sqlite3:auto-committing? dbh)) - (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") - (let* ((table-names (map car tables)) - (dest-exists (file-exists? destdbfile))) - (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) - ;; attach the destdbfile - ;; for each table - ;; insert into dest. select * from src.
where last_update>last_update - ;; done - (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb") - (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;")) - (for-each - (lambda (table) - (let* ((tbldat (alist-ref table tables equal?)) - (fields (map car tbldat)) - (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields)) - (fields-str (string-intersperse fields ",")) - (no-id-fields-str (string-intersperse no-id-fields ",")) - (dir (eq? direction 'todest)) - (fromdb (if dir "" "auxdb.")) - (todb (if dir "auxdb." "")) - (set-str (string-intersperse - (map (lambda (field) - (conc fromdb field"="todb field)) - fields) - ",")) - (stmt1 (conc "INSERT OR IGNORE INTO "todb table - " SELECT * FROM "fromdb table";")) - (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id" - (if (member "last_update" fields) - (conc " AND "fromdb table".last_update > "todb table".last_update);") - ");"))) - (start-ms (current-milliseconds))) - ;; (debug:print 0 *default-log-port* "stmt8="stmt8) - ;; (if (sqlite3:auto-committing? dbh) - ;; (begin - (mutex-lock! *db-transaction-mutex*) - (sqlite3:with-transaction - dbh - (lambda () - (debug:print-info 0 *default-log-port* "Sync from "fromdb table" to "todb table" using "stmt1) - (sqlite3:execute dbh stmt1) ;; get all new rows - - #;(if (member "last_update" fields) - (sqlite3:execute dbh stmt8)) ;; get all updated rows - ;; (sqlite3:execute dbh stmt5) - ;; (sqlite3:execute dbh stmt4) ;; if it worked this would be better for incremental up - ;; (sqlite3:execute dbh stmt6) - )) - (debug:print 0 *default-log-port* "Synced table "table - " in "(- (current-milliseconds) start-ms)"ms") ;; ) - (mutex-unlock! *db-transaction-mutex*))) - - ;; (debug:print 0 *default-log-port* "Skipping sync of table "table" due to transaction in flight.")))) - table-names) - (sqlite3:execute dbh "DETACH auxdb;")))) - -;; FAILED ATTEMPTS - - ;; (if (not (has-last-update dbh table)) - ;; (sqlite3:execute dbh (conc "ALTER TABLE "table" ADD COLUMN last_update INTEGER;"))) - ;; (if (not (has-last-update dbh (conc "auxdb."table))) - ;; (sqlite3:execute dbh (conc "ALTER TABLE auxdb."table" ADD COLUMN last_update INTEGER;"))) - - ;; (stmt2 (conc "INSERT OR REPLACE INTO "todb table - ;; " SELECT * FROM "fromdb table" WHERE " - ;; fromdb table".last_update > " - ;; todb table".last_update;")) - ;; (stmt3 (conc "INSERT OR REPLACE INTO "todb"."table - ;; " SELECT * FROM "fromdb table";")) - ;; (stmt4 (conc "DELETE FROM "todb table" WHERE "fromdb - ;; table ".last_update > "todb table".last_update;")) - ;; (stmt5 (conc "DELETE FROM "todb table";")) - ;; (stmt6 (conc "INSERT OR REPLACE INTO "todb table" ("fields-str") SELECT "fields-str" FROM "fromdb table";")) - ;; (stmt7 (conc "UPDATE "todb table" SET "set-str (if (member "last_update" fields) - ;; (conc " WHERE "fromdb table".last_update > "todb table".last_update;") - ;; ";"))) + (let* ((num-changes 0) + (update-changed (lambda (num-changed table qryname) + (if (> num-changed 0) + (begin + (debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname) + (set! num-changes (+ num-changes num-changed))))))) + (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) + (if (not (sqlite3:auto-committing? dbh)) + (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") + (let* ((table-names (map car tables)) + (dest-exists (file-exists? destdbfile))) + (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) + ;; attach the destdbfile + ;; for each table + ;; insert into dest.
select * from src.
where last_update>last_update + ;; done + (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb") + (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;")) + (for-each + (lambda (table) + (let* ((tbldat (alist-ref table tables equal?)) + (fields (map car tbldat)) + (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields)) + (fields-str (string-intersperse fields ",")) + (no-id-fields-str (string-intersperse no-id-fields ",")) + (dir (eq? direction 'todisk)) + (fromdb (if dir "main." "auxdb.")) + (todb (if dir "auxdb." "main.")) + (set-str (string-intersperse + (map (lambda (field) + (conc fromdb field"="todb field)) + fields) + ",")) + (stmt1 (conc "INSERT OR IGNORE INTO "todb table + " SELECT * FROM "fromdb table";")) + (stmt2 (conc "INSERT OR IGNORE INTO "todb table + " SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;")) + (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id" + (conc " AND "fromdb table".last_update > "todb table".last_update);") + ");")) + (stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = " + "(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)" + " WHERE "todb table".id=?")) + (newrec (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");")) + #;(changedrec (conc "SELECT id FROM "fromdb table" WHERE "fromdb table".last_update > "todb table".last_update AND " + fromdb table".id="todb table".id;")) ;; main = fromdb + (changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;")) + ;; SELECT main.tests.id FROM main.tests join auxdb.tests on main.tests.id=auxdb.tests.id WHERE main.tests.last_update > auxdb.tests.last_update;" + (start-ms (current-milliseconds)) + (new-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec))) + ;; (debug:print 0 *default-log-port* "Got "(length aux-ids)" in aux-ids and "(length main-ids)" in main-ids") + (update-changed (length new-ids) table "new records") + (mutex-lock! *db-transaction-mutex*) + (sqlite3:with-transaction + dbh + (lambda () + (for-each (lambda (id) + (sqlite3:execute dbh stmt2 id)) + new-ids))) + (if (member "last_update" fields) + (sqlite3:with-transaction + dbh + (lambda () + (let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec))) + (update-changed (length changed-ids) table "changed records") + (for-each (lambda (id) + (sqlite3:execute dbh stmt9 id id)) + changed-ids))))) + + (mutex-unlock! *db-transaction-mutex*) + + (debug:print 0 *default-log-port* "Synced table "table + " in "(- (current-milliseconds) start-ms)"ms") + + )) + table-names) + (sqlite3:execute dbh "DETACH auxdb;"))) + num-changes)) ;; prefix is "" or "auxdb." ;; ;; (define (dbmod:last-update-patch dbh prefix) ;; (let (( Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -23,12 +23,10 @@ (define (toplevel-command . a) #f) (declare (uses common)) ;; (declare (uses megatest-version)) ;; (declare (uses margs)) -;; (declare (uses mtargs)) -;; (declare (uses mtargs.import)) (declare (uses mtargs)) (declare (uses mtargs.import)) (declare (uses debugprint)) (declare (uses debugprint.import)) (declare (uses commonmod)) @@ -2571,14 +2569,16 @@ ;; (if (args:get-arg "-db2db") (let* ((duh (launch:setup)) (src-db (args:get-arg "-from")) (dest-db (args:get-arg "-to")) - (sync-period (args:get-arg-number "-period")) - (sync-timeout (args:get-arg-number "-timeout")) - ;; (sync-period (if sync-period-in (string->number sync-period-in) #f)) - ;; (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f)) + ;; (sync-period (args:get-arg-number "-period")) + ;; (sync-timeout (args:get-arg-number "-timeout")) + (sync-period-in (args:get-arg "-period")) + (sync-timeout-in (args:get-arg "-timeout")) + (sync-period (if sync-period-in (string->number sync-period-in) #f)) + (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f)) (lockfile (conc dest-db".sync-lock")) (keys (db:get-keys #f)) (thesync (lambda (last-update) (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") (if (not (file-exists? dest-db)) Index: transport-mode.scm.template ================================================================== --- transport-mode.scm.template +++ transport-mode.scm.template @@ -13,10 +13,10 @@ ;; (dbfile:sync-method 'none) ;; (dbfile:cache-method 'none) ;; (rmt:transport-mode 'nfs) ;; uncomment this block to test with tcp -(dbfile:sync-method 'original) ;; attach) ;; original +(dbfile:sync-method 'attach) ;; attach) ;; original (dbfile:cache-method 'tmp) (rmt:transport-mode 'tcp)