Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -394,101 +394,113 @@
     '("jobgroup" #f)))))
 
 ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
 (define (db:sync-tables tbls fromdb todb . slave-dbs)
   (mutex-lock! *db-sync-mutex*)
-  (cond
-   ((not fromdb) (debug:print 3 "WARNING: db:sync-tables called with fromdb missing") -1)
-   ((not todb) (debug:print 3 "WARNING: db:sync-tables called with todb missing") -2)
-   ((not (sqlite3:database? fromdb))
-    (debug:print 0 "ERROR: db:sync-tables called with fromdb not a database " fromdb) -3)
-   ((not (sqlite3:database? todb))
-    (debug:print 0 "ERROR: db:sync-tables called with todb not a database " todb) -4)
-   (else
-    (let ((stmts (make-hash-table)) ;; table-field => stmt
-          (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 ))
-          (numrecs (make-hash-table))
-          (start-time (current-milliseconds))
-          (tot-count 0))
-      (for-each ;; table
-       (lambda (tabledat)
-         (let* ((tablename (car tabledat))
-                (fields (cdr tabledat))
-                (num-fields (length fields))
-                (field->num (make-hash-table))
-                (num->field (apply vector (map car fields)))
-                (full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
-                                " FROM " tablename ";"))
-                (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
-                                " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
-                (fromdat '())
-                (todat (make-hash-table))
-                (count 0))
-
-           ;; set up the field->num table
-           (for-each
-            (lambda (field)
-              (hash-table-set! field->num field count)
-              (set! count (+ count 1)))
-            fields)
-
-           ;; read the source table
-           (sqlite3:for-each-row
-            (lambda (a . b)
-              (set! fromdat (cons (apply vector a b) fromdat)))
-            fromdb
-            full-sel)
-
-           (debug:print-info 2 "found " (length fromdat) " records to sync")
-
-           ;; read the target table
-           (sqlite3:for-each-row
-            (lambda (a . b)
-              (hash-table-set! todat a (apply vector a b)))
-            todb
-            full-sel)
-
-           ;; first pass implementation, just insert all changed rows
-           (for-each
-            (lambda (targdb)
-              (let ((stmth (sqlite3:prepare targdb full-ins)))
-                (sqlite3:with-transaction
-                 targdb
-                 (lambda ()
-                   (for-each ;;
-                    (lambda (fromrow)
-                      (let* ((a (vector-ref fromrow 0))
-                             (curr (hash-table-ref/default todat a #f))
-                             (same #t))
-                        (let loop ((i 0))
-                          (if (or (not curr)
-                                  (not (equal? (vector-ref fromrow i)(vector-ref curr i))))
-                              (set! same #f))
-                          (if (and same
-                                   (< i (- num-fields 1)))
-                              (loop (+ i 1))))
-                        (if (not same)
-                            (begin
-                              (apply sqlite3:execute stmth (vector->list fromrow))
-                              (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
-                    fromdat)))
-                (sqlite3:finalize! stmth)))
-            (append (list todb) slave-dbs))))
-       tbls)
-      (let* ((runtime (- (current-milliseconds) start-time))
-             (should-print (common:low-noise-print 30 "db sync" (> runtime 500)))) ;; low and high sync times treated as separate.
-        (if should-print (debug:print 0 "INFO: db sync, total run time " runtime " ms"))
-        (for-each
-         (lambda (dat)
-           (let ((tblname (car dat))
-                 (count (cdr dat)))
-             (set! tot-count (+ tot-count count))
-             (if (> count 0)
-                 (if should-print (debug:print 0 (format #f " ~10a ~5a" tblname count))))))
-         (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
-      tot-count)))
-  (mutex-unlock! *db-sync-mutex*))
+  ;; Trap any error raised while syncing so it is logged instead of propagating to the caller.
+  ;; The mutex is released after this form, on both the normal and the error path.
+  (handle-exceptions
+   exn
+   (begin
+     ;; The #f defaults keep the accessors from raising on conditions that lack the property.
+     (debug:print 0 "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.")
+     (debug:print 0 " message: " ((condition-property-accessor 'exn 'message #f) exn))
+     (print "exn=" (condition->list exn))
+     (debug:print 0 " status: " ((condition-property-accessor 'sqlite3 'status #f) exn))
+     (print-call-chain))
+   (cond
+    ((not fromdb) (debug:print 3 "WARNING: db:sync-tables called with fromdb missing") -1)
+    ((not todb) (debug:print 3 "WARNING: db:sync-tables called with todb missing") -2)
+    ((not (sqlite3:database? fromdb))
+     (debug:print 0 "ERROR: db:sync-tables called with fromdb not a database " fromdb) -3)
+    ((not (sqlite3:database? todb))
+     (debug:print 0 "ERROR: db:sync-tables called with todb not a database " todb) -4)
+    (else
+     (let ((stmts (make-hash-table)) ;; table-field => stmt
+           (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 ))
+           (numrecs (make-hash-table))
+           (start-time (current-milliseconds))
+           (tot-count 0))
+       (for-each ;; table
+        (lambda (tabledat)
+          (let* ((tablename (car tabledat))
+                 (fields (cdr tabledat))
+                 (num-fields (length fields))
+                 (field->num (make-hash-table))
+                 (num->field (apply vector (map car fields)))
+                 (full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
+                                 " FROM " tablename ";"))
+                 (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
+                                 " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
+                 (fromdat '())
+                 (todat (make-hash-table))
+                 (count 0))
+
+            ;; set up the field->num table
+            (for-each
+             (lambda (field)
+               (hash-table-set! field->num field count)
+               (set! count (+ count 1)))
+             fields)
+
+            ;; read the source table
+            (sqlite3:for-each-row
+             (lambda (a . b)
+               (set! fromdat (cons (apply vector a b) fromdat)))
+             fromdb
+             full-sel)
+
+            (debug:print-info 2 "found " (length fromdat) " records to sync")
+
+            ;; read the target table
+            (sqlite3:for-each-row
+             (lambda (a . b)
+               (hash-table-set! todat a (apply vector a b)))
+             todb
+             full-sel)
+
+            ;; first pass implementation, just insert all changed rows
+            (for-each
+             (lambda (targdb)
+               (let ((stmth (sqlite3:prepare targdb full-ins)))
+                 (sqlite3:with-transaction
+                  targdb
+                  (lambda ()
+                    (for-each ;;
+                     (lambda (fromrow)
+                       (let* ((a (vector-ref fromrow 0))
+                              (curr (hash-table-ref/default todat a #f))
+                              (same #t))
+                         (let loop ((i 0))
+                           (if (or (not curr)
+                                   (not (equal? (vector-ref fromrow i)(vector-ref curr i))))
+                               (set! same #f))
+                           (if (and same
+                                    (< i (- num-fields 1)))
+                               (loop (+ i 1))))
+                         (if (not same)
+                             (begin
+                               (apply sqlite3:execute stmth (vector->list fromrow))
+                               (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))))))
+                     fromdat)))
+                 (sqlite3:finalize! stmth)))
+             (append (list todb) slave-dbs))))
+        tbls)
+       (let* ((runtime (- (current-milliseconds) start-time))
+              (should-print (common:low-noise-print 30 "db sync" (> runtime 500)))) ;; low and high sync times treated as separate.
+         (if should-print (debug:print 0 "INFO: db sync, total run time " runtime " ms"))
+         (for-each
+          (lambda (dat)
+            (let ((tblname (car dat))
+                  (count (cdr dat)))
+              (set! tot-count (+ tot-count count))
+              (if (> count 0)
+                  (if should-print (debug:print 0 (format #f " ~10a ~5a" tblname count))))))
+          (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
+       tot-count))))
+  ;; Kept outside handle-exceptions so the lock is dropped even when the sync failed.
+  (mutex-unlock! *db-sync-mutex*))
 
 ;; options:
 ;;
 ;; 'killservers - kills all servers
 ;; 'dejunk - removes junk records

Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -298,14 +298,20 @@
                  (lambda (run-id)
                    (let ((last-write (hash-table-ref/default *db-local-sync* run-id 0)))
                      (if ;; (and
                          (> (- start-time last-write) 5) ;; every five seconds
                          ;; (common:db-access-allowed?))
                          (begin
                            (db:multi-db-sync (list run-id) 'new2old)
-                           (if (common:low-noise-print 30 "sync new to old")
-                               (debug:print-info 0 "Sync of newdb to olddb for run-id " run-id " completed in " (- (current-seconds) start-time) " seconds"))
+                           (let ((sync-time (- (current-seconds) start-time))) ;; taken after the sync so it includes the sync itself
+                             (if (common:low-noise-print 30 "sync new to old")
+                                 (begin
+                                   (debug:print-info 0 "Sync of newdb to olddb for run-id " run-id " completed in " sync-time " seconds")
+                                   (if (> sync-time 10) ;; took more than ten seconds, start a server for this run
+                                       (begin ;; both the message and the server start belong to the slow case
+                                         (debug:print-info 0 "Sync is taking a long time, start up a server to assist for run " run-id)
+                                         (server:kind-run run-id))))))
                            (hash-table-delete! *db-local-sync* run-id)))))
                (hash-table-keys *db-local-sync*))
               (mutex-unlock! *db-multi-sync-mutex*))
 
            ;; keep going unless time to exit
@@ -811,11 +817,11 @@
                          (tdb:step-get-status step)
                          (tdb:step-get-event_time step)))
                       steps)))))
                 tests)))))
          runs)
-        (db:close-all dbstruct)
+        ;; (db:close-all dbstruct)
         (set! *didsomething* #t))))
 
 ;;======================================================================
 ;; full run
 ;;======================================================================