Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -224,11 +224,11 @@ (dbr:dbstruct-dbtmpname-set! dbstruct tmpdb) (dbr:dbstruct-dbfname-set! dbstruct dbfname) (dbr:dbstruct-sync-proc-set! dbstruct (lambda (last-update) (if *sync-in-progress* - (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk") + (debug:print 0 *default-log-port* "WARNING: overlapping calls to sync to disk") (let* ((syncer-logfile (conc areapath"/logs/"dbfname"-syncer.log")) (sync-cmd (if (eq? syncdir 'todisk) (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "tmpdb" -to "dbfullname" -period 5 -timeout 10 > /dev/null 2&>1)&") (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "dbfullname" -to "tmpdb" -period 5 -timeout 10 > /dev/null 2&>1)&"))) (synclock-file (conc dbfullname".lock")) @@ -242,11 +242,11 @@ (thethread (lambda () (thread-start! (make-thread (lambda () (set! *sync-in-progress* #t) - (debug:print-info "Running "sync-cmd) + (debug:print 0 *default-log-port* "Running "sync-cmd) (if (file-exists? syncer-running-file) (debug:print-info 0 *default-log-port* "Syncer still running, skipping syncer start.") (system sync-cmd)) (set! *sync-in-progress* #f))))))) (if ((if (eq? syncdir 'todisk) < >) ;; use less than for todisk, greater than for from disk @@ -474,47 +474,63 @@ (set! has-last #t))) dbh (conc "SELECT name FROM pragma_table_info('"tablename"') as tblInfo;")) has-last))
+;; Return a copy of STR with every "?" replaced by the decimal text of NUM.
+;;   str - string, e.g. an SQL template whose "?" placeholders all stand
+;;         for the same value
+;;   num - number; rendered with number->string
+;; NOTE: this is textual substitution, not SQL parameter binding, so it
+;; also rewrites any "?" that happens to sit inside a quoted SQL literal.
+(define (replace-question-marks-with-number str num)
+  ;; One library pass instead of a char-by-char loop that re-copied the
+  ;; accumulated result with string-append on every step (quadratic).
+  (string-translate* str (list (cons "?" (number->string num)))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; ;; direction = fromdest, todisk ;; mode = 'full, 'incr ;; ;; Idea: youngest in dest is last_update time -;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (define (dbmod:attach-sync tables dbh destdbfile direction #!key (mode 'full) (no-update '("keys")) ;; do ) - (let* ((num-changes 0) + (debug:print-info 2 *default-log-port* "dbmod:attach-sync") + (let* ((num-changes 0) (update-changed (lambda (num-changed table qryname) (if (> num-changed 0) (begin (debug:print-info 0 *default-log-port* "Changed "num-changed" rows for table "table", qry "qryname) (set! num-changes (+ num-changes num-changed))))))) - (debug:print 0 *default-log-port* "Doing sync "direction" "destdbfile) + (debug:print 2 *default-log-port* "Doing sync "direction" "destdbfile) (if (not (sqlite3:auto-committing? dbh)) (debug:print 0 *default-log-port* "Skipping sync due to transaction in flight.") (let* ((table-names (map car tables)) (dest-exists (file-exists? destdbfile))) (assert dest-exists "FATAL: sync called with non-existant file, "destdbfile) ;; attach the destdbfile ;; for each table ;; insert into dest. select * from src.
where last_update>last_update ;; done - (debug:print 0 *default-log-port* "Attaching "destdbfile" as auxdb") + (debug:print 2 *default-log-port* "Attaching "destdbfile" as auxdb") (handle-exceptions exn (begin (debug:print 0 "ATTACH failed, exiting. exn="(condition->list exn)) (exit 1)) (sqlite3:execute dbh (conc "ATTACH '"destdbfile"' AS auxdb;"))) (for-each (lambda (table) - (let* ((tbldat (alist-ref table tables equal?)) + (let* ((dummy (debug:print 2 *default-log-port* "Doing table " table)) + (tbldat (alist-ref table tables equal?)) (fields (map car tbldat)) (no-id-fields (filter (lambda (x)(not (equal? x "id"))) fields)) (fields-str (string-intersperse fields ",")) (no-id-fields-str (string-intersperse no-id-fields ",")) (dir (eq? direction 'todisk)) @@ -529,27 +545,38 @@ " SELECT * FROM "fromdb table";")) (stmt2 (conc "INSERT OR IGNORE INTO "todb table " SELECT * FROM "fromdb table" WHERE "fromdb table".id=?;")) (stmt8 (conc "UPDATE "todb table" SET ("no-id-fields-str") = (SELECT "no-id-fields-str" FROM "fromdb table" WHERE "todb table".id="fromdb table".id" (conc " AND "fromdb table".last_update > "todb table".last_update);") - ");")) - (stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = " + ");")) + (update-string (conc "UPDATE "todb table" SET ")) + (split-update + (let () + (for-each + (lambda (column) + (set! update-string (conc update-string column" = (SELECT "column" FROM "fromdb table" WHERE "fromdb table".id=?), ")) + ) + no-id-fields + ) + ;; drop the last ", " + (conc (substring update-string 0 (-(string-length update-string) 2)) " WHERE "todb table".id=? 
") + ) + ) + + + (stmt9 (conc "UPDATE "todb table" SET ("no-id-fields-str") = " "(SELECT "no-id-fields-str" FROM "fromdb table" WHERE "fromdb table".id=?)" " WHERE "todb table".id=?")) (newrec (conc "SELECT id FROM "fromdb table" WHERE id NOT IN (SELECT id FROM "todb table");")) - #;(changedrec (conc "SELECT id FROM "fromdb table" WHERE "fromdb table".last_update > "todb table".last_update AND " - fromdb table".id="todb table".id;")) ;; main = fromdb (changedrec (conc "SELECT "fromdb table".id FROM "fromdb table" join "todb table" on "fromdb table".id="todb table".id WHERE "fromdb table".last_update > "todb table".last_update;")) - ;; SELECT main.tests.id FROM main.tests join auxdb.tests on main.tests.id=auxdb.tests.id WHERE main.tests.last_update > auxdb.tests.last_update;" (start-ms (current-milliseconds)) (new-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh newrec))) - ;; (debug:print 0 *default-log-port* "Got "(length aux-ids)" in aux-ids and "(length main-ids)" in main-ids") (update-changed (length new-ids) table "new records") (mutex-lock! 
*db-transaction-mutex*) (handle-exceptions exn - (debug:print 0 *default-log-port* "Transaction update of "table" failed.") + (debug:print 0 *default-log-port* "Transaction update of id fields in "table" failed.") (sqlite3:with-transaction dbh (lambda () (for-each (lambda (id) (sqlite3:execute dbh stmt2 id)) @@ -556,23 +583,40 @@ new-ids)))) (if (member "last_update" fields) (handle-exceptions exn - (debug:print 0 *default-log-port* "Transaction update of "table" failed.") + (debug:print 0 *default-log-port* "Transaction update of non id fields in "table" failed.") (sqlite3:with-transaction dbh (lambda () - (let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec))) + (let* ((changed-ids (sqlite3:fold-row (lambda (res id)(cons id res)) '() dbh changedrec)) + (sql-query "") + ) (update-changed (length changed-ids) table "changed records") (for-each (lambda (id) - (sqlite3:execute dbh stmt9 id id)) - changed-ids)))))) - + (let* ((update-with-ids (replace-question-marks-with-number split-update id)) + ) + (debug:print 2 *default-log-port* "about to do sqlite3:execute " dbh " " update-with-ids ) + (handle-exceptions + exn + (debug:print 0 *default-log-port* "update from " fromdb table " to " todb table " failed: " ((condition-property-accessor 'exn 'message) exn)) + (sqlite3:execute dbh update-with-ids) + ) + (debug:print 2 *default-log-port* "after sqlite3:execute") + ) + ) + changed-ids + ) + ) + ) + ) + ) + ) (mutex-unlock! *db-transaction-mutex*) - (debug:print 0 *default-log-port* "Synced table "table + (debug:print 2 *default-log-port* "Synced table "table " in "(- (current-milliseconds) start-ms)"ms") )) table-names) (sqlite3:execute dbh "DETACH auxdb;"))) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -2633,10 +2633,11 @@ (if (args:get-arg "-sync-to") (let ((toppath (launch:setup))) (tasks:sync-to-postgres *configdat* (args:get-arg "-sync-to")) (set! 
*didsomething* #t))) + ;; use with -from and -to ;; (if (args:get-arg "-db2db") (let* ((duh (launch:setup)) @@ -2650,18 +2651,19 @@ (sync-timeout (if sync-timeout-in (string->number sync-timeout-in) #f)) (lockfile (conc dest-db".sync-lock")) (keys (db:get-keys #f)) (thesync (lambda (last-update) (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") + (debug:print-info 0 *default-log-port* "PID = " (current-process-id)) (if (not (file-exists? dest-db)) (begin (debug:print 0 *default-log-port* "Using copy to create "dest-db" from "src-db) (file-copy src-db dest-db) 1) (let ((res (dbmod:db-to-db-sync src-db dest-db last-update (dbfile:db-init-proc) keys))) (if res - (debug:print-info 0 *default-log-port* "Synced " res " records from "src-db" to "dest-db) + (debug:print-info 2 *default-log-port* "Synced " res " records from "src-db" to "dest-db) (debug:print-info 0 *default-log-port* "No sync due to permissions or other issue.")) res)))) (start-time (current-seconds)) (synclock-mod-time (if (file-exists? lockfile) (handle-exceptions @@ -2673,11 +2675,17 @@ ) (if (and src-db dest-db) (if (file-exists? src-db) (if (and (file-exists? lockfile) (< age 20)) (debug:print 0 *default-log-port* "Lock "lockfile" exists, skipping sync...") - (begin + (begin + (if (file-exists? lockfile) + (begin + (debug:print 0 *default-log-port* "Deleting old lock file " lockfile) + (delete-file lockfile) + ) + ) (dbfile:with-simple-file-lock lockfile (lambda () (let loop ((last-changed (current-seconds)) (last-update 0)) @@ -2694,11 +2702,11 @@ (> sync-timeout (- now-time last-changed))) (begin (if sync-period (thread-sleep! 
sync-period)) (loop (if (> changes 0) now-time last-changed) now-time)))))))) (debug:print 0 *default-log-port* "Releasing lock file " lockfile) - ) + ) ) (debug:print 0 *default-log-port* "No sync due to unreadble or non-existant source file"src-db)) (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")) (set! *didsomething* #t)))