Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -39,14 +39,20 @@ ;;====================================================================== ;; R E C O R D S ;;====================================================================== ;; each db entry is a pair ( db . dbfilepath ) +;; I propose this record evolves into the area record +;; (defstruct dbr:dbstruct - (tmpdb #f) - (mtdb #f) - (refndb #f)) + (tmpdb #f) + (mtdb #f) + (refndb #f) + (homehost #f) ;; not used yet + (on-homehost #f) ;; not used yet + ) ;; goal is to converge on one struct for an area but for now it is too confusing + ;; record for keeping state,status and count for doing roll-ups in ;; iterated tests ;; (defstruct dbr:counts Index: remotediff-nmsg.scm ================================================================== --- remotediff-nmsg.scm +++ remotediff-nmsg.scm @@ -38,10 +38,15 @@ #f) (loop (read-line inp))))))) (define *max-running* 40) +(define my-mutex-lock! mutex-lock!) ;; real locking: mtx1 guards the threads table and the shared req socket +(define my-mutex-unlock! mutex-unlock!) +;; (define my-mutex-lock! conc) ;; debug only: conc never blocks, so this turns locking into a no-op +;; (define my-mutex-unlock! conc) + (define (gather-dir-info path) (let ((mtx1 (make-mutex)) (threads (make-hash-table)) (last-num 0) (req (nn-socket 'req))) @@ -58,47 +63,47 @@ ((directory? p) '(dir)) ((symbolic-link? p) (list 'symlink (read-symbolic-link p))) (else '(data))))) (if (eq? (car info) 'data) (let loop ((start-time (current-seconds))) - (mutex-lock! mtx1) + (my-mutex-lock! mtx1) (let* ((num-threads (hash-table-size threads)) (ok-to-run (> *max-running* num-threads))) ;; (if (> (abs (- num-threads last-num)) 2) ;; (begin ;; ;; (print "num-threads:" num-threads) ;; (set! last-num num-threads))) - (mutex-unlock! mtx1) + (my-mutex-unlock! mtx1) (if ok-to-run (let ((run-time-start (current-seconds))) ;; (print "num threads: " num-threads) (let ((th1 (make-thread (lambda () (let ((cksum (checksum mtx1 p cmd: "md5sum")) (run-time (- (current-seconds) run-time-start))) - (mutex-lock! 
mtx1) + (my-mutex-lock! mtx1) (client-send-receive req (conc p " " cksum)) - (mutex-unlock! mtx1)) + (my-mutex-unlock! mtx1)) (let loop2 () - (mutex-lock! mtx1) + (my-mutex-lock! mtx1) (let ((registered (hash-table-exists? threads p))) (if registered (begin ;; (print "deleting thread reference for " p) (hash-table-delete! threads p))) ;; delete myself - (mutex-unlock! mtx1) + (my-mutex-unlock! mtx1) (if (not registered) (begin (thread-sleep! 0.5) (loop2)))))) p))) (thread-start! th1) ;; (thread-sleep! 0.05) ;; give things a little time to get going ;; (thread-join! th1) ;; - (mutex-lock! mtx1) + (my-mutex-lock! mtx1) (hash-table-set! threads p th1) - (mutex-unlock! mtx1) + (my-mutex-unlock! mtx1) )) ;; thread is launched (let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet (cond ((< run-time 5)) ;; blast on through ((< run-time 30)(thread-sleep! 0.1)) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -76,20 +76,42 @@ ;; ensure we have a record for our connection for given area ((not *runremote*) (set! *runremote* (make-remote)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) + ;; ensure we have a homehost record + ((not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? + (thread-sleep! 0.1) ;; since we shouldn't get here, delay a little + (remote-hh-dat-set! *runremote* (common:get-homehost)) + (mutex-unlock! *rmt-mutex*) + (rmt:send-receive cmd rid params attemptnum: attemptnum)) + ;; on homehost and this is a read + ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost + (member cmd api:read-only-queries)) ;; this is a read + (mutex-unlock! 
*rmt-mutex*) + (rmt:open-qry-close-locally cmd 0 params)) + ;; on homehost and this is a write, we already have a server + ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost + (not (member cmd api:read-only-queries)) ;; this is a write + (remote-server-url *runremote*)) ;; have a server + (mutex-unlock! *rmt-mutex*) + (rmt:open-qry-close-locally cmd 0 params)) ;; no server contact made and this is a write, try starting a server ((and (not (remote-server-url *runremote*)) (not (member cmd api:read-only-queries))) (let ((serverconn (server:check-if-running *toppath*))) (if serverconn (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed (if (not (server:start-attempted? *toppath*)) (server:kind-run *toppath*)))) - (mutex-unlock! *rmt-mutex*) - (rmt:send-receive cmd rid params attemptnum: attemptnum)) + (if (cdr (remote-hh-dat *runremote*)) ;; we are on the homehost, just do the call + (begin + (mutex-unlock! *rmt-mutex*) + (rmt:open-qry-close-locally cmd 0 params)) + (begin + (mutex-unlock! *rmt-mutex*) + (rmt:send-receive cmd rid params attemptnum: attemptnum)))) ;; if not on homehost ensure we have a connection to a live server ((or (not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? (not (cdr (remote-hh-dat *runremote*))) ;; have record, are we on a homehost? (not (remote-conndat *runremote*))) ;; do we not have a connection? (remote-hh-dat-set! *runremote* (common:get-homehost)) @@ -98,11 +120,11 @@ (server:kind-run *toppath*) ;; we need a sever (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! 
*rmt-mutex*) - (rmt:open-qry-close-locally cmd (if rid rid 0) params) ) + (rmt:open-qry-close-locally cmd (if rid rid 0) params)) ;; reset the connection if it has been unused too long ((and (remote-conndat *runremote*) (let ((expire-time (- start-time (remote-server-timeout *runremote*)))) (< (http-transport:server-dat-get-last-access connection) expire-time))) - (remote-conndatr *runremote* #f)) + (remote-conndat-set! *runremote* #f)) ;; NOTE(review): unlike sibling clauses this one neither releases *rmt-mutex* nor retries; also confirm `connection` is bound in the enclosing scope @@ -131,11 +153,10 @@ (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) - (server-url-set! *runremote* #f) + (remote-server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) - (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn