Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -422,32 +422,22 @@ ;; create and fill the inmemory db ;; assemble into dbr:dbdat struct and return ;; (define (db:open-dbdat apath dbfile dbinit-proc) (let* ((db (db:open-run-db dbfile dbinit-proc)) - ;; (inmem (db:open-inmem-db dbinit-proc)) + (inmem (db:open-inmem-db dbinit-proc)) (dbdat (make-dbr:dbdat - db: #f ;; db - inmem: db ;; inmem + db: db + inmem: inmem ;; run-id: run-id ;; no can do, there are many run-id values that point to single db fname: dbfile))) + (assert (and (sqlite3:database? db)(sqlite3:database? inmem)) + "FATAL: should have both inmem and on-disk db at this time.") ;; now sync the disk file data into the inmemory db - ;; (db:sync-tables (db:sync-all-tables-list) '("last_update" . 0) db inmem) + (db:sync-tables (db:sync-all-tables-list) '("last_update" . 0) db inmem) ;; (sqlite3:finalize! db) ;; open and close every sync dbdat)) -;; (define (db:open-dbdat apath dbfile dbinit-proc) -;; (let* ((db (db:open-run-db dbfile dbinit-proc)) -;; (inmem (db:open-inmem-db dbinit-proc)) -;; (dbdat (make-dbr:dbdat -;; db: #f ;; db -;; inmem: inmem -;; ;; run-id: run-id ;; no can do, there are many run-id values that point to single db -;; fname: dbfile))) -;; ;; now sync the disk file data into the inmemory db -;; (db:sync-tables (db:sync-all-tables-list) '("last_update" . 0) db inmem) -;; (sqlite3:finalize! db) ;; open and close every sync -;; dbdat)) ;; open the disk database file ;; NOTE: May need to add locking to file create process here ;; returns an sqlite3 database handle ;; @@ -501,11 +491,20 @@ (define (db:setup db-file) ;; run-id) (assert *toppath* "FATAL: db:setup called before toppath is available.") (let* ((dbstruct (or *dbstruct-db* (make-dbr:dbstruct)))) (db:get-dbdat dbstruct *toppath* db-file) (if (not *dbstruct-db*)(set! *dbstruct-db* dbstruct)) + (assert (db:check-setup dbstruct *toppath* db-file) "FATAL: db:setup did NOT complete properly") dbstruct)) + +(define (db:check-setup dbstruct apath dbfile) + (let* ((dbdat (db:get-dbdat dbstruct apath dbfile)) + (dbfullname (conc apath "/" dbfile)) + (db (dbr:dbdat-db dbdat)) ;; (db:open-run-db dbfullname db:initialize-db)) ;; + (inmem (dbr:dbdat-inmem dbdat))) + (and (sqlite3:database? db) + (sqlite3:database? inmem)))) ;;====================================================================== ;; setting/getting a lock on the db for only one server per db ;; ;; NOTE: @@ -693,35 +692,36 @@ ;; NOTE: touched logic is disabled/not done ;; sync run to disk if touched ;; (define (db:sync-inmem->disk dbstruct apath dbfile #!key (force-sync #f)) - (if #f - (debug:print-info 0 *default-log-port* "syncing "*toppath*" "dbfile" at "(current-seconds)) - #f)) ;; disabled -;; (let* ((dbdat (db:get-dbdat dbstruct apath dbfile)) -;; (dbfullname (conc apath "/" dbfile)) -;; (db (db:open-run-db dbfullname db:initialize-db)) ;; (dbr:dbdat-db dbdat)) -;; (inmem (dbr:dbdat-inmem dbdat)) -;; (start-t (current-seconds)) -;; (last-update (dbr:dbdat-last-write dbdat)) -;; (last-sync (dbr:dbdat-last-sync dbdat))) -;; (debug:print-info 0 *default-log-port* "Syncing for dbfile: "dbfile", last-update: "last-update", last-sync: "last-sync) -;; (mutex-lock! *db-multi-sync-mutex*) -;; (let* ((update_info (cons "last_update" (if force-sync 0 last-update))) ;; "last_update")) -;; (need-sync (or force-sync (>= last-update last-sync)))) -;; (if need-sync -;; (begin -;; (db:sync-tables (db:sync-all-tables-list) update_info inmem db) -;; (dbr:dbdat-last-sync-set! dbdat start-t)) -;; (debug:print 0 *default-log-port* "Skipping sync as nothing touched."))) -;; (sqlite3:finalize! db) -;; (mutex-unlock! *db-multi-sync-mutex*))) - + (let* ((dbdat (db:get-dbdat dbstruct apath dbfile)) + (dbfullname (conc apath "/" dbfile)) + (db (dbr:dbdat-db dbdat)) ;; (db:open-run-db dbfullname db:initialize-db)) ;; + (inmem (dbr:dbdat-inmem dbdat)) + (start-t (current-seconds)) + (last-update (dbr:dbdat-last-write dbdat)) + (last-sync (dbr:dbdat-last-sync dbdat))) + (if (and (sqlite3:database? db) + (sqlite3:database? inmem)) + (begin + (debug:print-info 0 *default-log-port* "Syncing for dbfile: "dbfile", last-update: "last-update", last-sync: "last-sync) + (mutex-lock! *db-multi-sync-mutex*) + (let* ((update_info (cons "last_update" (if force-sync 0 last-update))) ;; "last_update")) + (need-sync (or force-sync (>= last-update last-sync)))) + (if need-sync + (begin + (db:sync-tables (db:sync-all-tables-list) update_info inmem db) + (dbr:dbdat-last-sync-set! dbdat start-t)) + (debug:print 0 *default-log-port* "Skipping sync as nothing touched."))) + ;; (sqlite3:finalize! db) + (mutex-unlock! *db-multi-sync-mutex*)) + (debug:print-info 0 *default-log-port* "Skipping sync due to databases not being open.")))) + ;; TODO: Add final sync to this ;; -#;(define (db:safely-close-sqlite3-db db stmt-cache #!key (try-num 3)) +(define (db:safely-close-sqlite3-db db stmt-cache #!key (try-num 3)) (if (<= try-num 0) #f (handle-exceptions exn (begin @@ -735,11 +735,11 @@ (sqlite3:finalize! db) #t) #f)))) ;; close all opened run-id dbs -#;(define (db:close-all dbstruct) +(define (db:close-all dbstruct) (assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with dbstruct not set up.") (handle-exceptions exn (begin (debug:print 0 *default-log-port* "WARNING: Finalizing failed, " ((condition-property-accessor 'exn 'message) exn) ", note - exn=" exn) @@ -1714,13 +1714,13 @@ state TEXT DEFAULT 'new', status TEXT DEFAULT 'n/a', archive_type TEXT DEFAULT 'bup', du INTEGER, archive_path TEXT);"))) - (print "creating triggers from init") - (db:create-triggers db) - db)) ;; ) + (debug:print 0 *default-log-port* "creating triggers from init") + (db:create-triggers db) + db)) ;; ) ;;====================================================================== ;; A R C H I V E S ;;====================================================================== @@ -3029,12 +3029,10 @@ (db:with-db dbstruct #f #f (lambda (db) (sqlite3:execute db "UPDATE runs SET status=?,state=? WHERE id=?;" status state run-id)))) - - (define (db:get-run-status dbstruct run-id) (let ((res "n/a")) (db:with-db dbstruct #f #f (lambda (db) @@ -4275,22 +4273,26 @@ (define (db:set-state-status-and-roll-up-run dbstruct run-id curr-state curr-status) ;; (mutex-lock! *db-transaction-mutex*) (db:with-db dbstruct #f #f (lambda (db) - (let ((tr-res +;; (let ((tr-res (sqlite3:with-transaction db (lambda () (let* ((state-status-counts (db:get-all-state-status-counts-for-run dbstruct run-id)) (state-statuses (db:roll-up-rules state-status-counts #f #f )) (newstate (car state-statuses)) (newstatus (cadr state-statuses))) (if (or (not (eq? newstate curr-state)) (not (eq? newstatus curr-status))) - (db:set-run-state-status dbstruct run-id newstate newstatus ))))))) + (begin + (db:set-run-state-status dbstruct run-id newstate newstatus) + #t) ;; changes made + #f) ;; no changes + )))))) ;; (mutex-unlock! *db-transaction-mutex*) - tr-res)))) +;; tr-res)))) (define (db:get-all-state-status-counts-for-run dbstruct run-id) (let* ((test-count-recs (db:with-db dbstruct #f #f Index: tests/simplerun/debug.scm ================================================================== --- tests/simplerun/debug.scm +++ tests/simplerun/debug.scm @@ -32,17 +32,18 @@ (i 1) (s 0)) ;; sum (let ((start-time (current-milliseconds)) (run-id (+ r (make-run-id)))) (rmt:register-test run-id "test1" (conc "item_" i)) + (thread-sleep! 0.01) (let* ((qry-time (- (current-milliseconds) start-time)) (tot-query-time (+ qry-time s)) (avg-query-time (* 1.0 (/ tot-query-time i)))) (if (> qry-time 500) (print "WARNING: rmt:register-test took more than 500ms, "qry-time"ms, i="i", avg-query-time="avg-query-time)) (if (eq? (modulo i 100) 0) - (print "For run-id="run-id", "(rmt:get-keys-write)" num tests registered="i)) + (print "For run-id="run-id", "(rmt:get-keys-write)" num tests registered="i" avg-query-time="avg-query-time)) (if (< i 500) (loop r (+ i 1) tot-query-time) (if (< r 100) (let* ((start-time (current-milliseconds))) (print "rmt:get-keys "(rmt:get-keys)" in "(- (current-milliseconds) start-time)) Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -63,10 +63,11 @@ chicken.base chicken.file chicken.io chicken.time chicken.condition + chicken.port chicken.string chicken.sort chicken.pretty-print chicken.tcp @@ -84,10 +85,15 @@ srfi-69 system-information ;; tcp6 tcp-server typed-records + + md5 + message-digest + (prefix base64 base64:) + z3 ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention ;; @@ -108,10 +114,57 @@ (numthreads 10) (cmd-thread #f) (work-queue-thread #f) (num-threads-running 0) ) + +;;====================================================================== +;; serialization +;; NOTE: I've had problems with read/write and s11n serialize, deserialize +;; thus the inefficient method here +;;====================================================================== + +(define serializing-method (make-parameter 'complex)) + +;; NOTE: Can remove the regex and base64 encoding for zmq +(define (obj->string obj) + (case (serializing-method) + ((complex) + (string-substitute + (regexp "=") "_" + (base64:base64-encode + (z3:encode-buffer + (with-output-to-string + (lambda ()(serialize obj))))) ;; BB: serialize - this is + ;; what causes problems + ;; between different builds of + ;; megatest communicating. + ;; serialize is sensitive to + ;; binary image of mtest. + #t)) + ((write)(with-output-to-string (lambda ()(write obj)))) + ((s11n) (with-output-to-string (lambda ()(serialize obj)))) + (else obj))) ;; rpc + +(define (string->obj msg #!key (transport 'http)) + (case (serializing-method) + ((complex) + (if (string? msg) + (with-input-from-string + (z3:decode-buffer + (base64:base64-decode + (string-substitute + (regexp "_") "=" msg #t))) + (lambda ()(deserialize))) + (begin + (print "ULEX ERROR: cannot translate received data \""msg"\"") + (print-call-chain (current-error-port)) + msg))) ;; crude reply for when things go awry + ((write)(with-input-from-string msg (lambda ()(read)))) + ((s11n)(with-input-from-string msg (lambda ()(deserialize)))) + (else msg))) ;; rpc + ;;====================================================================== ;; listener ;;====================================================================== @@ -153,13 +206,13 @@ (udat-work-proc-set! uconn handler-proc) (if (setup-listener uconn port-suggestion) ((make-tcp-server (udat-socket uconn) (lambda () - (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params) + (let* ((rdat (string->obj (read)) #;(deserialize)) ;; '(my-host-port qrykey cmd params) (resp (do-work uconn rdat))) - (serialize resp))))) + (write (obj->string resp)) #;(serialize resp))))) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) (thread-join! (udat-cmd-thread uconn)) (tcp-close (udat-socket uconn))) @@ -187,26 +240,29 @@ (dat (list my-host-port 'qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) (cond (isme (do-work udata dat)) ;; no transmission needed (else (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - (message exn) - (begin - ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP - (let-values (((inp oup)(tcp-connect host port))) - (let ((res (if (and inp oup) - (begin - (serialize dat oup) - (close-output-port oup) - (deserialize inp)) - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) - (close-input-port inp) - ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP - res)))))))) ;; res will always be 'ack unless return-method is direct + exn + (begin + (print "ULEX send-receive: exn="exn) + (message exn)) + (begin + ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP + (let-values (((inp oup)(tcp-connect host port))) + (let ((res (if (and inp oup) + (begin + (write (obj->string dat) oup) ;; (write dat oup);; (serialize dat oup) + (close-output-port oup) + (string->obj (read inp))) ;; (deserialize inp)) + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)))) + ;; (close-output-port oup) + (close-input-port inp) + ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP + res)))))))) ;; res will always be 'ack unless return-method is direct ;;====================================================================== ;; work queues - this is all happening on the listener side ;;======================================================================