Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -32,10 +32,11 @@ data-structures extras files (prefix sqlite3 sqlite3:) + matchable posix typed-records srfi-1 srfi-18 srfi-69 @@ -245,15 +246,16 @@ ;; direction: 'fromdest 'todest ;; (define (dbmod:sync-gasket tables last-update inmem dbh dbfname direction) (assert (sqlite3:database? inmem) "FATAL: sync-gasket: inmem is not a db") (assert (sqlite3:database? dbh) "FATAL: sync-gasket: dbh is not a db") + (debug:print-info 0 *default-log-port* "Db sync using "(dbfile:sync-method)" method") (case (dbfile:sync-method) ((none) #f) ((attach) (dbmod:attach-sync tables inmem dbfname direction)) - ((newsync) + ((newsync) ;; DON'T USE THIS ONE. IT IS BORKED (dbmod:new-sync tables inmem dbh dbfname direction)) (else (case direction ((todisk) (dbmod:sync-tables tables last-update inmem dbh) @@ -296,140 +298,244 @@ ;; Use (db:sync-all-tables-list keys) to get the tbls input ;; (define (dbmod:sync-tables tbls last-update fromdb todb) (assert (sqlite3:database? fromdb) "FATAL: dbmod:sync-tables called with fromdb not a database" fromdb) (assert (sqlite3:database? todb) "FATAL: dbmod:sync-tables called with fromdb not a database" todb) - (let ((stmts (make-hash-table)) ;; table-field => stmt + (let ((specials '(("keys" . "fieldname") + ("meta" . "var"))) + (stmts (make-hash-table)) ;; table-field => stmt (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 )) (numrecs (make-hash-table)) (start-time (current-milliseconds)) (tot-count 0)) (for-each ;; table (lambda (tabledat) - (let* ((tablename (car tabledat)) - (fields (cdr tabledat)) - (has-last-update (member "last_update" fields)) - (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update)) - (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for - (if (number? last-update) - last-update - (cdr last-update)) - #f)) - (last-update-field (if use-last-update - (if (number? last-update) - "last_update" - (car last-update)) - #f)) - (num-fields (length fields)) - (field->num (make-hash-table)) - (num->field (apply vector (map car fields))) ;; BBHERE - (full-sel (conc "SELECT " (string-intersperse (map car fields) ",") - " FROM " tablename (if use-last-update ;; apply last-update criteria - (conc " WHERE " last-update-field " >= " last-update-value) - "") - ";")) - (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " - " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) - (fromdat '()) - (fromdats '()) - (totrecords 0) - (batch-len 100) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100"))) - (todat (make-hash-table)) - (count 0) - (field-names (map car fields))) - - ;; set up the field->num table - (for-each - (lambda (field) - (hash-table-set! field->num field count) - (set! count (+ count 1))) - fields) - - ;; read the source table - ;; store a list of all rows in the table in fromdat, up to batch-len. - ;; Then add fromdat to the fromdats list, clear fromdat and repeat. - (sqlite3:for-each-row - (lambda (a . b) - (set! fromdat (cons (apply vector a b) fromdat)) - (if (> (length fromdat) batch-len) - (begin - (set! fromdats (cons fromdat fromdats)) - (set! fromdat '()) - (set! totrecords (+ totrecords 1))))) - fromdb - full-sel) - - ;; Count less than batch-len as a record - (if (> (length fromdat) 0) - (set! totrecords (+ totrecords 1))) - - ;; tack on remaining records in fromdat - (if (not (null? fromdat)) - (set! fromdats (cons fromdat fromdats))) - - (sqlite3:for-each-row - (lambda (a . b) - (hash-table-set! todat a (apply vector a b))) - todb - full-sel) - - ;; first pass implementation, just insert all changed rows - - (let* ((db todb) - (drp-trigger (if (member "last_update" field-names) - (db:drop-trigger db tablename) - #f)) - (has-last-update (member "last_update" field-names)) - (is-trigger-dropped (if has-last-update - (db:is-trigger-dropped db tablename) - #f)) - (stmth (sqlite3:prepare db full-ins)) - (changed-rows 0)) - (for-each - (lambda (fromdat-lst) - (mutex-lock! *db-transaction-mutex*) - (sqlite3:with-transaction - db - (lambda () - (for-each ;; - (lambda (fromrow) - (let* ((a (vector-ref fromrow 0)) - (curr (hash-table-ref/default todat a #f)) - (same #t)) - (let loop ((i 0)) - (if (or (not curr) - (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) - (set! same #f)) - (if (and same - (< i (- num-fields 1))) - (loop (+ i 1)))) - (if (not same) - (begin - (apply sqlite3:execute stmth (vector->list fromrow)) - (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) - (set! changed-rows (+ changed-rows 1)))))) - fromdat-lst))) - (mutex-unlock! *db-transaction-mutex*)) - fromdats) - - (sqlite3:finalize! stmth) - (if (member "last_update" field-names) - (db:create-trigger db tablename))) - )) + (let* ((count (match tabledat + ((tablename . fields) + (debug:print-info 0 *default-log-port* "Syncing table "tablename) + (dbmod:sync-table tablename fields fromdb todb (alist-ref tablename specials equal?))) + (else + (debug:print-warn 0 *default-log-port* "Bad tabledat entry: "tabledat) + 0)))) + (set! tot-count (+ tot-count count)))) tbls) - (let* ((runtime (- (current-milliseconds) start-time)) - (should-print (or ;; (debug:debug-mode 12) - (common:low-noise-print 120 "db sync") - (> runtime 500)))) ;; low and high sync times treated as separate. - (for-each - (lambda (dat) - (let ((tblname (car dat)) - (count (cdr dat))) - (set! tot-count (+ tot-count count)))) - (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) + (debug:print-info 0 *default-log-port* "dbmod:sync-tables completed in "(- (current-milliseconds) start-time)"ms") tot-count)) +(define (dbmod:sync-table tablename fields from-db to-db keyfield) + (let* ((field-names (map car fields)) + (has-last-update (member "last_update" field-names)) + (fields-sans-lu (filter (lambda (x) + (not (member x '("id" "last_update")))) + field-names)) + (get-ids (lambda (db) + (sqlite3:fold-row (lambda (res id) + (cons id res)) + '() + db + (conc "SELECT id FROM "tablename";")))) + (get-val (lambda (db fieldname id) + (let* ((res #f) + (sql (conc "SELECT "fieldname" FROM "tablename" WHERE id=?;"))) + (sqlite3:for-each-row + (lambda (val) + (set! res val)) + db + sql + id) + ;; (debug:print-info 0 *default-log-port* "get-val "db" "fieldname" "id", sql="sql", res="res) + res))) + (get-row (lambda (db id) + (let* ((res #f)) + (sqlite3:for-each-row + (lambda tuple + (set! res tuple)) + db + (conc "SELECT " (string-intersperse fields-sans-lu ",") + " FROM "tablename" WHERE id=?;") + id) + res))) + (ins-row (lambda (db id row) + (let* ((qry (conc "INSERT INTO "tablename" (id," + (string-intersperse fields-sans-lu ",") + ") VALUES ("id"," + (string-intersperse + (make-list (length fields-sans-lu) "?") + ",") + ");"))) + ;; (debug:print-info 0 *default-log-port* "qry="qry) + (apply sqlite3:execute db + qry + row)))) + (num-inserts 0) + (num-updates 0) + ) + ;; (debug:print-info 0 *default-log-port* "field-names: "field-names", fields-sans-lu: "fields-sans-lu) + ;; (sqlite3:with-transaction + ;; from-db + ;; (lambda () + (let* ((from-ids (get-ids from-db))) + ;; (debug:print-info 0 *default-log-port* "Table "tablename", has "(length from-ids)" records.") + ;; (sqlite3:with-transaction + ;; to-db + ;; (lambda () + (let* ((to-ids (get-ids to-db))) + ;; (debug:print 0 *default-log-port* "to-ids="to-ids) + (for-each ;; from-id + (lambda (from-id) + (if (member from-id to-ids) + (for-each ;; case where record exists, do one by one the fields if different + (lambda (fieldname) + (let* ((from-val (get-val from-db fieldname from-id)) + (dest-val (get-val to-db fieldname from-id))) + #;(debug:print 0 *default-log-port* + "fieldname="fieldname + ", from-id="from-id + ", from-val="from-val + ", dest-val="dest-val + ) + (if (not (equal? from-val dest-val)) + (begin + (sqlite3:execute to-db (conc "UPDATE "tablename" SET "fieldname"=? WHERE id=?;") + from-val + from-id) + (set! num-updates (+ num-updates 1)))))) + fields-sans-lu) + (let ((row (get-row from-db from-id))) ;; need to insert the row + ;; (debug:print 0 *default-log-port* "row="row) + (set! num-inserts (+ num-inserts 1)) + (ins-row to-db from-id row)))) + from-ids)));; )))) + (+ num-inserts num-updates))) + +;; (for-each ;; table +;; (lambda (tabledat) +;; (let* ((tablename (car tabledat)) +;; (fields (cdr tabledat)) +;; (has-last-update (member "last_update" fields)) +;; (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update)) +;; (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for +;; (if (number? last-update) +;; last-update +;; (cdr last-update)) +;; #f)) +;; (last-update-field (if use-last-update +;; (if (number? last-update) +;; "last_update" +;; (car last-update)) +;; #f)) +;; (num-fields (length fields)) +;; (field->num (make-hash-table)) +;; (num->field (apply vector (map car fields))) ;; BBHERE +;; (full-sel (conc "SELECT " (string-intersperse (map car fields) ",") +;; " FROM " tablename (if use-last-update ;; apply last-update criteria +;; (conc " WHERE " last-update-field " >= " last-update-value) +;; "") +;; ";")) +;; (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " +;; " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) +;; (fromdat '()) +;; (fromdats '()) +;; (totrecords 0) +;; (batch-len 10000000) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100"))) +;; (todat (make-hash-table)) +;; (count 0) +;; (field-names (map car fields))) +;; +;; (debug:print-info 0 *default-log-port* "Syncing table "tablename) +;; +;; ;; set up the field->num table +;; (for-each +;; (lambda (field) +;; (hash-table-set! field->num field count) +;; (set! count (+ count 1))) +;; fields) +;; +;; ;; read the source table +;; ;; store a list of all rows in the table in fromdat, up to batch-len. +;; ;; Then add fromdat to the fromdats list, clear fromdat and repeat. +;; (sqlite3:for-each-row +;; (lambda (a . b) +;; (set! fromdat (cons (apply vector a b) fromdat)) +;; (if (> (length fromdat) batch-len) +;; (begin +;; (set! fromdats (cons fromdat fromdats)) +;; (set! fromdat '()) +;; (set! totrecords (+ totrecords 1))))) +;; fromdb +;; full-sel) +;; +;; (debug:print-info 0 *default-log-port* "Have "totrecords" records to update.") +;; ;; Count less than batch-len as a record +;; (if (> (length fromdat) 0) +;; (set! totrecords (+ totrecords 1))) +;; +;; ;; tack on remaining records in fromdat +;; (if (not (null? fromdat)) +;; (set! fromdats (cons fromdat fromdats))) +;; +;; (sqlite3:for-each-row +;; (lambda (a . b) +;; (hash-table-set! todat a (apply vector a b))) +;; todb +;; full-sel) +;; +;; ;; first pass implementation, just insert all changed rows +;; +;; (let* ((db todb) +;; (has-last-update (member "last_update" field-names)) +;; (drp-trigger (if has-last-update +;; (db:drop-trigger db tablename) +;; #f)) +;; (is-trigger-dropped (if has-last-update +;; (db:is-trigger-dropped db tablename) +;; #f)) +;; (stmth (sqlite3:prepare db full-ins)) +;; (changed-rows 0)) +;; (for-each +;; (lambda (fromdat-lst) +;; (mutex-lock! *db-transaction-mutex*) +;; (sqlite3:with-transaction +;; db +;; (lambda () +;; (for-each ;; +;; (lambda (fromrow) +;; (let* ((a (vector-ref fromrow 0)) +;; (curr (hash-table-ref/default todat a #f)) +;; (same #t)) +;; (let loop ((i 0)) +;; (if (or (not curr) +;; (not (equal? (vector-ref fromrow i)(vector-ref curr i)))) +;; (set! same #f)) +;; (if (and same +;; (< i (- num-fields 1))) +;; (loop (+ i 1)))) +;; (if (not same) +;; (begin +;; (apply sqlite3:execute stmth (vector->list fromrow)) +;; (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))) +;; (set! changed-rows (+ changed-rows 1)))))) +;; fromdat-lst))) +;; (mutex-unlock! *db-transaction-mutex*)) +;; fromdats) +;; +;; (sqlite3:finalize! stmth) +;; (if (member "last_update" field-names) +;; (db:create-trigger db tablename))) +;; )) +;; tbls) +;; (let* ((runtime (- (current-milliseconds) start-time)) +;; (should-print (or ;; (debug:debug-mode 12) +;; (common:low-noise-print 120 "db sync") +;; (> runtime 500)))) ;; low and high sync times treated as separate. +;; (for-each +;; (lambda (dat) +;; (let ((tblname (car dat)) +;; (count (cdr dat))) +;; (set! tot-count (+ tot-count count)))) +;; (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b)))))) + (define (has-last-update dbh tablename) (let* ((has-last #f)) (sqlite3:for-each-row (lambda (name) (if (equal? name "last_update") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -2571,27 +2571,26 @@ (src-db (args:get-arg "-from")) (dest-db (args:get-arg "-to")) (sync-period (args:get-arg "-period")) ;; NOT IMPLEMENTED YET (sync-timeout (args:get-arg "-timeout")) ;; NOT IMPLEMENTED YET (lockfile (conc dest-db".lock")) - ;; (locked (common:simple-file-lock lockfile)) (keys (db:get-keys #f)) - (res ;; (if locked - (dbmod:db-to-db-sync src-db dest-db 0 (dbfile:db-init-proc) keys) - ;; #f))) - )) + ) (if (and src-db dest-db) (begin (debug:print-info 0 *default-log-port* "Attempting to sync data from "src-db" to "dest-db"...") - (if res - (begin - (common:simple-file-release-lock lockfile) - (debug:print 0 *default-log-port* "Synced " res " records from "src-db" to "dest-db)) - (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) - (set! *didsomething* #t)) - (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")))) + ;; (if (common:simple-file-lock lockfile) + ;; (begin + (if (not (file-exists? dest-db)) ;; use copy to get going + (file-copy src-db dest-db)) + (let ((res (dbmod:db-to-db-sync src-db dest-db 0 (dbfile:db-init-proc) keys))) + ;; (common:simple-file-release-lock lockfile) + (debug:print 0 *default-log-port* "Synced " res " records from "src-db" to "dest-db))) + (debug:print 0 *default-log-port* "Skipping sync, there is a sync in progress.")) + (set! *didsomething* #t)) + (debug:print 0 *default-log-port* "Usage for -db2db; -to and -from must be specified")) (if (args:get-arg "-list-test-time") (let* ((toppath (launch:setup))) (task:get-test-times) (set! *didsomething* #t)))