# dbfile.scm: db:sync-main-list and db:sync-all-tables-list drop their
# dbstruct parameter and now take only keys. The db:sync-tables call in the
# first hunk is switched to the one-argument (db:sync-all-tables-list keys).
Index: dbfile.scm ================================================================== --- dbfile.scm +++ dbfile.scm @@ -578,11 +578,11 @@ (tmpdb (db:open-db dbstruct run-id dbinit)) ;; sqlite3-db tmpdbfile #f)) (start-t (current-seconds))) (mutex-lock! *db-multi-sync-mutex*) (let ((update_info (cons "last_update" (if force-sync 0 *db-last-sync*) ))) (mutex-unlock! *db-multi-sync-mutex*) - (db:sync-tables (db:sync-all-tables-list dbstruct keys) update_info tmpdb mtdb)) + (db:sync-tables (db:sync-all-tables-list keys) update_info tmpdb mtdb)) (mutex-lock! *db-multi-sync-mutex*) (set! *db-last-sync* start-t) (set! *db-last-access* start-t) (mutex-unlock! *db-multi-sync-mutex*) (dbfile:add-dbdat dbstruct run-id tmpdb) @@ -641,12 +641,12 @@ '("type" #f) '("last_update" #f)))) ;; needs db to get keys, this is for syncing all tables ;; -(define (db:sync-main-list dbstruct keys) - (let ((keys keys)) ;; (db:get-keys dbstruct))) +(define (db:sync-main-list keys) + (let ((keys keys)) (list (list "keys" '("id" #f) '("fieldname" #f) '("fieldtype" #f)) @@ -697,12 +697,12 @@ '("params" #f) '("creation_time" #f) '("execution_time" #f)) ))) -(define (db:sync-all-tables-list dbstruct keys) - (append (db:sync-main-list dbstruct keys) +(define (db:sync-all-tables-list keys) + (append (db:sync-main-list keys) db:sync-tests-only)) ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... 
) )
 ;; db's are dbdat's
 ;;
Index: dbmod.scm
==================================================================
--- dbmod.scm
+++ dbmod.scm
@@ -32,10 +32,11 @@
 
 	extras
 	(prefix sqlite3 sqlite3:)
 	posix
 	typed-records
+	srfi-1
 	srfi-18
 	srfi-69
 
 	commonmod
 	dbfile
@@ -87,11 +88,11 @@
 ;; Returns dbstruct
 ;;
 ;;   * This routine creates the db if not found
 ;;   * Probably can get rid of the dbstruct-in
 ;;
-(define (dbmod:open-dbmoddb areapath run-id init-proc #!key (dbstruct-in #f)) ;; (conc *toppath* "/megatest.db") (car *configinfo*)))
+(define (dbmod:open-dbmoddb areapath run-id init-proc keys #!key (dbstruct-in #f)) ;; (conc *toppath* "/megatest.db") (car *configinfo*)))
   (let* ((dbstruct (or dbstruct-in (make-dbr:dbstruct areapath: areapath)))
 	 (dbfname (dbmod:run-id->dbfname run-id))
 	 (dbpath (dbmod:get-dbdir dbstruct run-id)) ;; directory where all the .db files are kept
 	 (dbfullname (dbmod:run-id->full-dbfname dbstruct run-id))
 	 (dbexists (file-exists? dbfullname))
@@ -103,22 +104,194 @@
 		 (let* ((db (sqlite3:open-database dbfullname))
 			(handler (sqlite3:make-busy-timeout 136000)))
 		   (sqlite3:set-busy-handler! db handler)
 		   (if write-access
 		       (init-proc db))
-		   db)))))
+		   db))))
+	 (tables (db:sync-all-tables-list keys)))
     (dbr:dbstruct-inmem-set! dbstruct inmem)
     (dbr:dbstruct-ondiskdb-set! dbstruct db)
     (dbr:dbstruct-dbfile-set! dbstruct dbfullname)
+    (dbmod:sync-tables tables #f db inmem)
     dbstruct))
 
 (define (dbmod:close-db dbstruct)
   ;; do final sync to disk file
   ;; (do-sync ...)
   (sqlite3:finalize! (dbr:dbstruct-ondiskdb dbstruct)))
+
+;;======================================================================
+;; Sync db
+;;======================================================================
+
+;; Decide whether the last-update filter can be applied to one table.
+;;
+;;   has-last-update : true if the table has a "last_update" field
+;;   fields          : field specs of the table, ( ( "field1" [#f|proc1] ) ... )
+;;   last-update     : #f, a number, or a pair ("field-name" . time-in-seconds)
+;;
+;; Returns #t only when last-update is a number and the table has a
+;; "last_update" field, or when it is a pair naming a field of the table.
+;; Returns #f for everything else, including a last-update of #f.
+;;
+(define (dbmod:calc-use-last-update has-last-update fields last-update)
+  (cond
+   ((and (number? last-update)
+	 has-last-update)
+    #t) ;; given a number and the table has last_update, use it
+   ((number? last-update) #f) ;; number given but no last_update field, ignore last-update for this table
+   ((and (pair? last-update)
+	 (member (car last-update)    ;; last-update field name
+		 (map car fields)))
+    #t)
+   ((and last-update (not (pair? last-update)) (not (number? last-update)))
+    (debug:print 0 *default-log-port* "ERROR: parameter last-update for db:sync-tables must be a pair or a number, received: " last-update);; found in fields
+    #f)
+   (else
+    #f)))
+
+;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) )
+;; dbs are sqlite3 db handles
+;;
+;; if last-update specified ("field-name" . time-in-seconds)
+;;    then sync only records where field-name >= time-in-seconds
+;;    IFF field-name exists
+;; if last-update is a number it is compared against the "last_update"
+;;    field of every table that has one
+;;
+;; Use (db:sync-all-tables-list keys) to get the tbls input
+;;
+;; Returns the total number of rows inserted or replaced in todb
+;;
+(define (dbmod:sync-tables tbls last-update fromdb todb)
+  (let ((stmts (make-hash-table)) ;; table-field => stmt
+	(all-stmts '()) ;; ( ( stmt1 value1 ) ( stmt2 value2 ))
+	(numrecs (make-hash-table))
+	(start-time (current-milliseconds))
+	(tot-count 0))
+    (for-each ;; table
+     (lambda (tabledat)
+       (let* ((tablename (car tabledat))
+	      (fields (cdr tabledat))
+	      (has-last-update (member "last_update" (map car fields))) ;; fields holds ( "name" [#f|proc] ) specs, so match on the names
+	      (use-last-update (dbmod:calc-use-last-update has-last-update fields last-update))
+	      (last-update-value (if use-last-update ;; no need to check for has-last-update - it is already accounted for
+				     (if (number? last-update)
+					 last-update
+					 (cdr last-update))
+				     #f))
+	      (last-update-field (if use-last-update
+				     (if (number? last-update)
+					 "last_update"
+					 (car last-update))
+				     #f))
+	      (num-fields (length fields))
+	      (field->num (make-hash-table))
+	      (num->field (apply vector (map car fields))) ;; BBHERE
+	      (full-sel (conc "SELECT " (string-intersperse (map car fields) ",")
+			      " FROM " tablename (if use-last-update ;; apply last-update criteria
+						     (conc " WHERE " last-update-field " >= " last-update-value)
+						     "")
+			      ";"))
+	      (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) "
+			      " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );"))
+	      (fromdat '())
+	      (fromdats '())
+	      (totrecords 0)
+	      (batch-len 100) ;; (string->number (or (configf:lookup *configdat* "sync" "batchsize") "100")))
+	      (todat (make-hash-table))
+	      (count 0)
+	      (field-names (map car fields)))
+
+	 ;; set up the field->num table
+	 (for-each
+	  (lambda (field)
+	    (hash-table-set! field->num field count)
+	    (set! count (+ count 1)))
+	  field-names) ;; key by field name, not by the whole field spec
+
+	 ;; read the source table
+	 ;; store a list of all rows in the table in fromdat, up to batch-len.
+	 ;; Then add fromdat to the fromdats list, clear fromdat and repeat.
+	 (sqlite3:for-each-row
+	  (lambda (a . b)
+	    (set! fromdat (cons (apply vector a b) fromdat))
+	    (if (> (length fromdat) batch-len)
+		(begin
+		  (set! fromdats (cons fromdat fromdats))
+		  (set! fromdat '())
+		  (set! totrecords (+ totrecords 1)))))
+	  fromdb
+	  full-sel)
+
+	 ;; Count less than batch-len as a record
+	 (if (> (length fromdat) 0)
+	     (set! totrecords (+ totrecords 1)))
+
+	 ;; tack on remaining records in fromdat
+	 (if (not (null? fromdat))
+	     (set! fromdats (cons fromdat fromdats)))
+
+	 (sqlite3:for-each-row
+	  (lambda (a . b)
+	    (hash-table-set! todat a (apply vector a b)))
+	  todb
+	  full-sel)
+
+	 ;; first pass implementation, just insert all changed rows
+	 (let* ((db todb)
+		(drp-trigger (if (member "last_update" field-names)
+				 (db:drop-trigger db tablename)
+				 #f))
+		(has-last-update (member "last_update" field-names))
+		(is-trigger-dropped (if has-last-update
+					(db:is-trigger-dropped db tablename)
+					#f))
+		(stmth (sqlite3:prepare db full-ins))
+		(changed-rows 0))
+	   (for-each
+	    (lambda (fromdat-lst)
+	      (sqlite3:with-transaction
+	       db
+	       (lambda ()
+		 (for-each ;;
+		  (lambda (fromrow)
+		    (let* ((a (vector-ref fromrow 0))
+			   (curr (hash-table-ref/default todat a #f))
+			   (same #t))
+		      (let loop ((i 0))
+			(if (or (not curr)
+				(not (equal? (vector-ref fromrow i)(vector-ref curr i))))
+			    (set! same #f))
+			(if (and same
+				 (< i (- num-fields 1)))
+			    (loop (+ i 1))))
+		      (if (not same)
+			  (begin
+			    (apply sqlite3:execute stmth (vector->list fromrow))
+			    (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0)))
+			    (set! changed-rows (+ changed-rows 1))))))
+		  fromdat-lst))))
+	    fromdats)
+
+	   (sqlite3:finalize! stmth)
+	   (if (member "last_update" field-names)
+	       (db:create-trigger db tablename)))))
+     tbls)
+    (let* ((runtime (- (current-milliseconds) start-time))
+	   (should-print (or ;; (debug:debug-mode 12)
+			  (common:low-noise-print 120 "db sync")
+			  (> runtime 500)))) ;; low and high sync times treated as separate.
+      (for-each
+       (lambda (dat)
+	 (let ((tblname (car dat))
+	       (count (cdr dat)))
+	   (set! tot-count (+ tot-count count))))
+       (sort (hash-table->alist numrecs)(lambda (a b)(> (cdr a)(cdr b))))))
+    tot-count))
 
 ;;======================================================================
 ;; Moved from dbfile
 ;;======================================================================
 
 )
 
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -935,17 +935,18 @@
 ;; Server? Start up here.
;; (if (args:get-arg "-server") (let* ((run-id (args:get-arg "-run-id")) (dbfname (args:get-arg "-db")) - (tl (launch:setup))) + (tl (launch:setup)) + (keys (keys:config-get-fields *configdat*))) (case (rmt:transport-mode) ((http)(http-transport:launch)) ((tcp) (debug:print 0 *default-log-port* "INFO: Running using tcp method.") (if run-id - (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler) + (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler keys) (begin (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.") (exit 1)))) (else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode)))) (set! *didsomething* #t)))
# The megatest.scm hunk above reads keys with (keys:config-get-fields *configdat*)
# and passes them as a new fifth argument to tt:start-server.
# tcp-transportmod.scm: tt:start-server gains the positional keys parameter
# and forwards it as the fourth argument of dbmod:open-dbmoddb.
Index: tcp-transportmod.scm ================================================================== --- tcp-transportmod.scm +++ tcp-transportmod.scm @@ -257,17 +257,17 @@ ;; NOTE: organise by dbfname, not run-id so we don't need ;; to pull in more modules ;; ;; This is the routine called in megatest.scm to start a server ;; -(define (tt:start-server areapath run-id dbfname handler) +(define (tt:start-server areapath run-id dbfname handler keys) (assert areapath "FATAL: areapath not provided for tt:start-server") ;; is there already a server for this dbfile? Then exit. (let* ((ttdat (make-tt areapath: areapath)) (servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead (if (null? servers) - (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc)))) + (let* ((dbstruct (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc) keys))) (tt-handler-set! ttdat (handler dbstruct)) (let* ((tcp-thread (make-thread (lambda () (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data "tcp-server-thread"))