Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -17,10 +17,12 @@ (use (srfi 18) extras tcp stack) ;; RADT => use of require-extension? (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64 format dot-locking z3 typed-records) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) ;; RADT => prefix?? +(include "/nfs/site/disks/icf_fdk_cw_gwa002/srehman/fossil/dbi/dbi.scm") +(import (prefix dbi dbi:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) @@ -46,10 +48,11 @@ (defstruct dbr:dbstruct ;; (tmpdb #f) (dbstack #f) ;; stack for tmp db handles, do not initialize with a stack (mtdb #f) (refndb #f) + (slave-dbs '()) (homehost #f) ;; not used yet (on-homehost #f) ;; not used yet ) ;; goal is to converge on one struct for an area but for now it is too confusing @@ -280,16 +283,49 @@ (dbfexists (file-exists? (conc dbpath "/megatest.db"))) (tmpdb (db:open-megatest-db path: dbpath)) ;; lock-create-open dbpath db:initialize-main-db)) (mtdb (db:open-megatest-db)) (refndb (db:open-megatest-db path: dbpath name: "megatest_ref.db")) (write-access (file-write-access? dbpath))) + + (if (args:get-arg "-server") + (if (configf:get-section *configdat* "ext-sync") + (let* ((dblist (configf:get-section *configdat* "ext-sync")) + (res '()) + (cfgdb #f)) + (for-each (lambda (dbitem) + (let* ((stringsplit (string-split (cadr dbitem))) + (dbtype (string->symbol (car stringsplit))) + (dbpath (cadr stringsplit))) + (case dbtype + ((sqlite3) + (set! cfgdb (dbi:open dbtype (cons (cons 'dbname dbpath) '()) )) + (db:initialize-main-db (dbi:db-conn cfgdb)) + (db:initialize-run-id-db (dbi:db-conn cfgdb)) + (set! res (cons (cons cfgdb dbpath) res))) + ((pg) + (let* ((dbinfo '())) + (for-each + (lambda (x) + (if (not (eqv? (string->symbol x) dbtype)) + (let* ((pair (string-split x ":"))) + (if (not (eqv? pair '())) + (set! dbinfo (cons (cons (string->symbol (car pair)) (cadr pair)) dbinfo)))))) + stringsplit) + (set! cfgdb (dbi:open dbtype dbinfo)) + (set! res (cons (cons cfgdb (alist-ref 'host dbinfo)) res)) + ))))) + dblist) + (dbr:dbstruct-slave-dbs-set! dbstruct res) + ))) + (if (and dbexists (not write-access)) (set! *db-write-access* #f)) (dbr:dbstruct-mtdb-set! dbstruct mtdb) (dbr:dbstruct-dbstack-set! dbstruct (make-stack)) (stack-push! (dbr:dbstruct-dbstack dbstruct) tmpdb) ;; olddb is already a (cons db path) (dbr:dbstruct-refndb-set! dbstruct refndb) + ;; (mutex-unlock! *rundb-mutex*) (if (and (not dbfexists) write-access) ;; *db-write-access*) ;; did not have a prior db and do have write access (begin (debug:print 0 *default-log-port* "filling db " (db:dbdat-get-path tmpdb) " with data from " (db:dbdat-get-path mtdb)) @@ -510,24 +546,24 @@ "megatest -import-megatest.db;megatest -cleanup-db") "\"\n") (exit) ;; we can not safely continue when a db was corrupted - even if fixed. ) ;; test read/write access to the database - (let ((db (sqlite3:open-database dbpath))) + (let ((db (dbi:open 'sqlite3 (cons (cons ('dbname dbpath) '()))))) (cond ((equal? fname "megatest.db") - (sqlite3:execute db "DELETE FROM tests WHERE state='DELETED';")) + (sqlite3:executeute db "DELETE FROM tests WHERE state='DELETED';")) ((equal? fname "main.db") - (sqlite3:execute db "DELETE FROM runs WHERE state='deleted';")) + (sqlite3:executeute db "DELETE FROM runs WHERE state='deleted';")) ((string-match "\\d.db" fname) - (sqlite3:execute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';")) + (sqlite3:executeute db "UPDATE tests SET state='DELETED' WHERE state='DELETED';")) ((equal? fname "monitor.db") - (sqlite3:execute "DELETE FROM servers WHERE state LIKE 'defunct%';")) + (sqlite3:executeute "DELETE FROM servers WHERE state LIKE 'defunct%';")) (else - (sqlite3:execute db "vacuum;"))) + (sqlite3:executeute db "vacuum;"))) - (finalize! db) + (dbi:close db) #t)))))) ;; tbls is ( ("tablename" ( "field1" [#f|proc1] ) ( "field2" [#f|proc2] ) .... ) ) ;; db's are dbdat's ;; @@ -534,10 +570,13 @@ ;; if last-update specified ("field-name" . time-in-seconds) ;; then sync only records where field-name >= time-in-seconds ;; IFF field-name exists ;; (define (db:sync-tables tbls last-update fromdb todb . slave-dbs) + (set! todb (cons (dbi:convert (db:dbdat-get-db todb)) (db:dbdat-get-path todb))) + (set! fromdb (cons (dbi:convert (db:dbdat-get-db fromdb)) (db:dbdat-get-path fromdb))) + (handle-exceptions exn (begin (debug:print 0 *default-log-port* "EXCEPTION: database probably overloaded or unreadable in db:sync-tables.") (print-call-chain (current-error-port)) @@ -557,13 +596,13 @@ 0) ;; this is the work to be done (cond ((not fromdb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with fromdb missing") -1) ((not todb) (debug:print 3 *default-log-port* "WARNING: db:sync-tables called with todb missing") -2) - ((not (sqlite3:database? (db:dbdat-get-db fromdb))) + ((not (dbi:database? (db:dbdat-get-db fromdb))) (debug:print-error 0 *default-log-port* "db:sync-tables called with fromdb not a database " fromdb) -3) - ((not (sqlite3:database? (db:dbdat-get-db todb))) + ((not (dbi:database? (db:dbdat-get-db todb))) (debug:print-error 0 *default-log-port* "db:sync-tables called with todb not a database " todb) -4) (else (let ((stmts (make-hash-table)) ;; table-field => stmt (all-stmts '()) ;; ( ( stmt1 value1 ) ( stml2 value2 )) (numrecs (make-hash-table)) @@ -591,10 +630,11 @@ ";")) (full-ins (conc "INSERT OR REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) " " VALUES ( " (string-intersperse (make-list num-fields "?") ",") " );")) (fromdat '()) (fromdats '()) + (tabletypes '()) (totrecords 0) (batch-len (string->number (or (configf:lookup *configdat* "sync" "batchsize") "10"))) (todat (make-hash-table)) (count 0)) @@ -604,13 +644,13 @@ (hash-table-set! field->num field count) (set! count (+ count 1))) fields) ;; read the source table - (sqlite3:for-each-row - (lambda (a . b) - (set! fromdat (cons (apply vector a b) fromdat)) + (dbi:for-each-row + (lambda (a) + (set! fromdat (cons a fromdat)) (if (> (length fromdat) batch-len) (begin (set! fromdats (cons fromdat fromdats)) (set! fromdat '()) (set! totrecords (+ totrecords 1))))) @@ -623,25 +663,41 @@ (if (common:low-noise-print 120 "sync-records") (debug:print-info 4 *default-log-port* "found " totrecords " records to sync")) ;; read the target table - (sqlite3:for-each-row - (lambda (a . b) - (hash-table-set! todat a (apply vector a b))) + (dbi:for-each-row + (lambda (a) + (hash-table-set! todat (vector-ref a 0) a)) (db:dbdat-get-db todb) full-sel) ;; first pass implementation, just insert all changed rows (for-each (lambda (targdb) - (let* ((db (db:dbdat-get-db targdb)) - (stmth (sqlite3:prepare db full-ins))) - (db:delay-if-busy targdb) ;; NO WAITING + (set! targdb (dbi:convert (db:dbdat-get-db targdb))) + (if (eqv? (dbi:db-dbtype targdb) 'pg) + (let* ((prep "")) + (for-each + (lambda (row) + (set! tabletypes (cons (cons (string->symbol (vector-ref row 1)) (vector-ref row 2)) tabletypes))) + (dbi:pull-metadata (db:dbdat-get-db fromdb) tablename)) + (set! prep (string-intersperse (map (lambda (x) (alist-ref (string->symbol (car x)) tabletypes)) fields) ",")) + (set! prep (conc "PREPARE fullins (" prep ") AS REPLACE INTO " tablename " ( " (string-intersperse (map car fields) ",") " ) VALUES ( ")) + (let loop ((i 1)) + (set! prep (conc prep "$" i ",")) + (if (< i (- num-fields 1)) + (loop (+ i 1)) + (set! prep (conc prep "$" (+ i 1) " );")))) + (set! full-ins prep))) + + (let* ((db (dbi:convert (db:dbdat-get-db targdb))) + (stmth (dbi:prepare db full-ins))) + ;; (db:delay-if-busy targdb) ;; NO WAITING (for-each (lambda (fromdat-lst) - (sqlite3:with-transaction + (dbi:with-transaction db (lambda () (for-each ;; (lambda (fromrow) (let* ((a (vector-ref fromrow 0)) @@ -653,17 +709,20 @@ (set! same #f)) (if (and same (< i (- num-fields 1))) (loop (+ i 1)))) (if (not same) - (begin - (apply sqlite3:execute stmth (vector->list fromrow)) - (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))))) + (begin + (apply dbi:prepare-exec stmth (vector->list fromrow)) + (hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))))) + ;;(begin + ;; (dbi:prepare-exec stmth (vector->list fromrow)) + ;;(hash-table-set! numrecs tablename (+ 1 (hash-table-ref/default numrecs tablename 0))))))) fromdat-lst)) )) fromdats) - (sqlite3:finalize! stmth))) + (dbi:close stmth))) (append (list todb) slave-dbs)))) tbls) (let* ((runtime (- (current-milliseconds) start-time)) (should-print (or (debug:debug-mode 12) (common:low-noise-print 120 "db sync" (> runtime 500))))) ;; low and high sync times treated as separate. @@ -822,12 +881,13 @@ ;; (define (db:multi-db-sync dbstruct . options) (if (not (launch:setup)) (debug:print 0 *default-log-port* "ERROR: not able to setup up for megatest.") (let* ((mtdb (dbr:dbstruct-mtdb dbstruct)) - (tmpdb (db:get-db dbstruct)) + (tmpdb (db:get-db dbstruct)) (refndb (dbr:dbstruct-refndb dbstruct)) + (slave-dbs (dbr:dbstruct-slave-dbs dbstruct)) (allow-cleanup #t) ;; (if run-ids #f #t)) (tdbdat (tasks:open-db)) (servers (tasks:get-all-servers (db:delay-if-busy tdbdat))) (data-synced 0)) ;; count of changed records (I hope) @@ -874,11 +934,11 @@ ;; now ensure all newdb data are synced to megatest.db ;; do not use the run-ids list passed in to the function ;; (if (member 'new2old options) (set! data-synced - (+ (db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb) + (+ (apply db:sync-tables (db:sync-all-tables-list dbstruct) #f tmpdb refndb mtdb slave-dbs) data-synced))) (if (member 'fixschema options) (begin @@ -1148,11 +1208,11 @@ fail_count INTEGER DEFAULT 0, pass_count INTEGER DEFAULT 0, archived INTEGER DEFAULT 0, -- 0=no, > 1=archive block id where test data can be found last_update INTEGER DEFAULT (strftime('%s','now')), CONSTRAINT testsconstraint UNIQUE (run_id, testname, item_path));") - (sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path, uname);") + (sqlite3:execute db "CREATE INDEX IF NOT EXISTS tests_index ON tests (run_id, testname, item_path);") (sqlite3:execute db "CREATE TRIGGER IF NOT EXISTS update_tests_trigger AFTER UPDATE ON tests FOR EACH ROW BEGIN UPDATE tests SET last_update=(strftime('%s','now')) WHERE id=old.id;