Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -188,12 +188,12 @@
 ;; else setup a connection
 ;;
 ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
 ;;
 (define (rmt:get-conn remdat apath dbname)
-  (let* ((fullname (db:dbname->path apath dbname)) ;; we'll switch to full name later
-         (conn     (hash-table-ref/default (remotedat-conns remdat) dbname #f)))
+  (let* ((fullname (db:dbname->path apath dbname))
+         (conn     (hash-table-ref/default (remotedat-conns remdat) fullname #f)))
     (if (and conn
              (< (current-seconds) (conndat-expires conn)))
         conn
         #f)))
 
@@ -211,11 +211,12 @@
 ;;
 ;; TODO: This is unnecessarily re-creating the record in the hash table
 ;;
 (define (rmt:open-main-connection remdat apath)
   (let* ((fullpath (db:dbname->path apath "/.db/main.db"))
-         (conn     (hash-table-ref/default remdat fullpath))) ;; TODO - create call for this
+         (conns    (remotedat-conns remdat))
+         (conn     (hash-table-ref/default conns fullpath #f))) ;; TODO - create call for this
     (if (and conn ;; conn is NOT a socket, just saying ...
              (< (current-seconds) (conndat-expires conn)))
         conn ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
         ;; Below we will find or create and connect to main
         (let* ((dbname (db:run-id->dbname #f))
@@ -230,10 +231,11 @@
             (let* ((srv-addr (server-address the-srv)) ;; need serv
                    (ipaddr   (alist-ref 'ipaddr the-srv))
                    (port     (alist-ref 'port the-srv))
                    (srvkey   (alist-ref 'servkey the-srv))
                    (fullpath (db:dbname->path apath dbname))
+
                    (new-the-srv (make-conndat
                                  apath: apath
                                  dbname: dbname
                                  fullname: fullpath
                                  hostport: srv-addr
@@ -243,20 +245,20 @@
                                  srvpkt: the-srv
                                  srvkey: srvkey ;; generated by rmt:get-signature on the server side
                                  lastmsg: (current-seconds)
                                  expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
                                  )))
-              (hash-table-set! (remotedat-conns remdat)
-                               fullpath ;; dbname ;; fullpath ;; yes, I'd prefer it to be fullpath - FIXME later
-                               new-the-srv)))))))
+              (hash-table-set! conns fullpath new-the-srv)))))))
 
 ;; NB// remdat is a remotedat struct
 ;;
 (define (rmt:general-open-connection remdat apath dbname #!key (num-tries 5))
   (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
-  (let* ((mdbname (db:run-id->dbname #f))
-         (mconn   (rmt:get-conn remdat apath mdbname)))
+  (let* ((mdbname  (db:run-id->dbname #f))
+         (fullname (db:dbname->path apath dbname))
+         (conns    (remotedat-conns remdat))
+         (mconn    (rmt:get-conn remdat apath mdbname)))
     (if (and mconn
              (not (debug:print-logger)))
         (if (equal? dbname ".db/main.db")
             (debug:print-info 0 *default-log-port* "Not turning on logging to main, I am main!")
             (begin
@@ -287,12 +289,12 @@
           ;; ".db/1.db")
 
           (match res
             ((host port servkey pid ipaddr apath dbname)
              (debug:print-info 0 *default-log-port* "got "res)
-             (hash-table-set! (remotedat-conns remdat)
-                              dbname
+             (hash-table-set! conns
+                              fullname
                               (make-conndat
                                apath: apath
                                dbname: dbname
                                hostport: (conc host":"port)
                                ipaddr: ipaddr
@@ -318,25 +320,26 @@
 (define *dbstruct* (make-dbr:dbstruct))
 
 ;; Defaults to current area
 ;;
 (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
-  (if (not *remotedat*)(set! *remotedat* (make-remotedat)))
+  ;; (if (not *remotedat*)(set! *remotedat* (make-remotedat)))
   (let* ((apath *toppath*)
-         (conns *remotedat*)
+         (remdat *remotedat*)
+         (conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
          (dbname (db:run-id->dbname rid)))
     (if *localmode*
         (let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
                (indat `((cmd . ,cmd)(params . ,params))))
           (api:process-request *dbstruct* indat)
           ;; (api:process-request dbdat indat)
           )
         (begin
           (if rid
-              (rmt:general-open-connection conns apath dbname)
-              (rmt:open-main-connection conns apath))
-          (rmt:send-receive-real conns apath dbname cmd params)))))
+              (rmt:general-open-connection remdat apath dbname) ;; was conns
+              (rmt:open-main-connection remdat apath))
+          (rmt:send-receive-real remdat apath dbname cmd params)))))
 
 #;(define (rmt:send-receive-setup conn)
   (if (not (conndat-inport conn))
       (let-values (((i o) (tcp-connect (conndat-ipaddr conn)
                                        (conndat-port conn))))
@@ -427,11 +430,11 @@
 
 (define (rmt:start-server run-id)
   (rmt:send-receive 'start-server 0 (list run-id)))
 
 (define (rmt:get-server-info apath dbname)
-  (rmt:send-receive 'get-server-info #f (list #f apath dbname)))
+  (rmt:send-receive 'get-server-info #f (list apath dbname)))
 
 ;;======================================================================
 ;; M I S C
 ;;======================================================================
 
@@ -1499,10 +1502,11 @@
     (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
     (if dbfile
         (let* ((am-server (args:get-arg "-server"))
                (dbfile    (args:get-arg "-db"))
                (apath     *toppath*)
+               (remdat    *remotedat*) ;; foundation for future fix
                (dbdat     (db:get-dbdat *dbstruct-db* apath dbfile))
                (db        (dbr:dbdat-db dbdat))
                (inmem     (dbr:dbdat-db dbdat))
                )
           ;; do a final sync here
@@ -1531,16 +1535,16 @@
               (let* ((sdat *server-info*) ;; we have a run-id server
                      (host (servdat-host sdat))
                      (port (servdat-port sdat))
                      (uuid (servdat-uuid sdat)))
                 (if (not (string-match ".db/main.db" (args:get-arg "-db")))
-                    (let* ((res (rmt:deregister-server *remotedat* ;; TODO/BUG: why is this requiring *remotedat*?
+                    (let* ((res (rmt:deregister-server remdat
                                                        *toppath*
                                                        (servdat-host *server-info*) ;; iface
                                                        (servdat-port *server-info*)
                                                        (servdat-uuid *server-info*)
-                                                       (current-process-id)
+                                                       dbfile ;; (current-process-id)
                                                        )))
                       (debug:print-info 0 *default-log-port* "deregistered-server, res="res)))
                 (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
                 )))))))
 
@@ -1909,15 +1913,10 @@
 (define (server-address srv-pkt)
   (conc (alist-ref 'host srv-pkt) ":"
         (alist-ref 'port srv-pkt)))
 
 (define (server-ready? host port key) ;; server-address is host:port
-;; (let-values (((i o)(handle-exceptions
-;; exn
-;; (values #f #f)
-;; (tcp-connect host port))))
-;; (if (and i o)
   (let* ((data (sexpr->string `((cmd . ping)
                                 (key . ,key)
                                 (params . ()))))
          (res  (open-send-receive-nn (conc host ":" port) data)))
     (string->sexpr res)))
@@ -2032,11 +2031,11 @@
 ;; END NEW SERVER METHOD
 ;;======================================================================
 
 ;; if .db/main.db check the pkts
 ;;
-(define (http-transport:wait-for-server pkts-dir db-file server-key)
+(define (rmt:wait-for-server pkts-dir db-file server-key)
   (let* ((sdat *server-info*))
     (let loop ((start-time (current-seconds))
                (changed #t)
                (last-sdat "not this"))
       (begin ;; let ((sdat #f))
@@ -2081,21 +2080,21 @@
                 (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
                 ;; am I the best-srv, compare server-keys to know
                 (if i-am-srv
                     (if (get-lock-db sdat db-file (servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
                         (begin
-                          (debug:print 0 *default-log-port* "I'm the server!")
+                          (debug:print-info 0 *default-log-port* "I'm the server!")
                           (servdat-dbfile-set! sdat db-file)
                           (servdat-status-set! sdat 'db-locked))
                         (begin
-                          (debug:print 0 *default-log-port* "I'm not the server, exiting.")
+                          (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
                           (bdat-time-to-exit-set! *bdat* #t)
                           (delete-pkt)
                           (thread-sleep! 0.2)
                           (exit)))
                     (begin
-                      (debug:print 0 *default-log-port*
+                      (debug:print-info 0 *default-log-port*
                                    "Keys do not match "best-srv-key", "server-key", exiting.")
                       (bdat-time-to-exit-set! *bdat* #t)
                       (delete-pkt)
                       (thread-sleep! 0.2)
                       (exit)))
@@ -2110,10 +2109,11 @@
                       (loop start-time
                             (equal? sdat last-sdat)
                             sdat))))))))
 
 (define (rmt:register-server remdat apath iface port server-key dbname)
+  (remotedat-conns remdat) ;; just checking types
   (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
   (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
                          (db:run-id->dbname #f)
                          'register-server `(,iface
                                             ,port
@@ -2122,17 +2122,19 @@
                                             ,iface
                                             ,apath
                                             ,dbname)))
 
 (define (rmt:get-count-servers remdat apath)
+  (remotedat-conns remdat) ;; just checking types
   (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
   (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
                          (db:run-id->dbname #f)
                          'get-count-servers `(,apath
                                               )))
 
 (define (rmt:deregister-server remdat apath iface port server-key dbname)
+  (remotedat-conns remdat) ;; just checking types
   (rmt:open-main-connection remdat apath) ;; we need a channel to main.db
   (rmt:send-receive-real remdat apath ;; params: host port servkey pid ipaddr dbpath
                          (db:run-id->dbname #f)
                          'deregister-server `(,iface
                                               ,port
@@ -2188,11 +2190,12 @@
   ;; if none running or if > 20 seconds since
   ;; server last used then start shutdown
   ;; This thread waits for the server to come alive
   (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
 
-  (let* ((server-start-time (current-seconds))
+  (let* ((remdat *remotedat*)
+         (server-start-time (current-seconds))
          (pkts-dir (get-pkts-dir))
          (server-key (rmt:get-signature)) ;; This servers key
          (is-main (equal? (args:get-arg "-db") ".db/main.db"))
          (last-access 0)
          (server-timeout (server:expiration-timeout)))
@@ -2218,11 +2221,11 @@
                     (debug:print 0 *default-log-port* "SERVER: dbprep")
                     (db:setup dbname) ;; sets *dbstruct-db* as side effect
                     (servdat-status-set! *server-info* 'db-opened)
                     ;; IFF I'm not main, call into main and register self
                     (if (not is-main)
-                        (let ((res (rmt:register-server *remotedat*
+                        (let ((res (rmt:register-server remdat
                                                         *toppath* iface port
                                                         server-key dbname)))
                           (if res ;; we are the server
                               (servdat-status-set! *server-info* 'have-interface-and-db)
                               (let* ((serv-info (rmt:get-server-info *toppath* dbname)))
@@ -2229,11 +2232,11 @@
                                 (match serv-info
                                   ((host port servkey pid ipaddr apath dbpath)
                                    (if (not (server-ready? host port servkey))
                                        (begin
                                          (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
-                                         (rmt:deregister-server host port servkey pid ipaddr apath dbpath)
+                                         (rmt:deregister-server remdat apath host port servkey dbpath) ;; args: remdat apath iface port server-key dbname
                                          (loop (+ count 1) bad-sync-count start-time))))
                                   (else
                                    (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
                                    (exit)))))))
                     (debug:print 0 *default-log-port*
@@ -2283,11 +2286,11 @@
           (cond
            ((and *server-run*
                  (> (+ last-access server-timeout)
                     (current-seconds))
                  (if is-main
-                     (> (rmt:get-count-servers *remotedat* *toppath*) 1)
+                     (> (rmt:get-count-servers remdat *toppath*) 1)
                      #t))
             (if (common:low-noise-print 120 "server continuing")
                 (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
             (loop 0 bad-sync-count (current-milliseconds)))
            (else
@@ -2378,11 +2381,11 @@
 
         (nng-dial #;nn-bind rep (conc "tcp://*:" portnum)))
     rep))
 
 (define (open-nn-connection host-port)
-  (let ((req (make-req-socket 'req))
+  (let ((req (make-req-socket))
         (uri (conc "tcp://" host-port)))
     (socket-set! req 'nng/recvtimeo 2000)
     (nng-dial req uri)
     req))
 
@@ -2427,43 +2430,36 @@
 ;; (thread-start! th1)
 ;; (thread-start! th2)
 ;; (thread-join! th1)
 ;; res))))
 ;;
-;; (define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
-;; (let ((req (make-req-socket))
-;; (uri (conc "tcp://" host-port))
-;; (res #f)
-;; ;; (contacts (alist-ref 'contact attrib))
-;; ;; (mode (alist-ref 'mode attrib))
-;; )
-;; (handle-exceptions
-;; exn
-;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
-;; ;; Send notification
-;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
-;; #f)
-;; (nng-dial req uri)
-;; ;; (print "Connected to the server " )
-;; (nng-send req msg)
-;; ;; (print "Request Sent")
-;; ;; receive code here
-;; ;;(print (nn-recv req))
-;; (let* ((th1 (make-thread (lambda ()
-;; (let ((resp (nng-recv req)))
-;; (nng-close! req)
-;; ;; (print resp)
-;; (set! res resp)))
-;; "recv thread"))
-;; (th2 (make-thread (lambda ()
-;; (thread-sleep! timeout)
-;; (thread-terminate! th1))
-;; "timer thread")))
-;; (thread-start! th1)
-;; (thread-start! th2)
-;; (thread-join! th1)
-;; res))))
+(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+  (let ((req (make-req-socket))
+        (uri (conc "tcp://" host-port))
+        (res #f))
+    (handle-exceptions
+     exn
+     (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+       ;; Send notification
+       (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+       #f)
+     (nng-dial req uri)
+     (nng-send req msg)
+     (let* ((th1 (make-thread (lambda ()
+                                (let ((resp (nng-recv req)))
+                                  (nng-close! req)
+                                  ;; (print resp)
+                                  (set! res resp)))
+                              "recv thread"))
+            (th2 (make-thread (lambda ()
+                                (thread-sleep! timeout)
+                                (thread-terminate! th1))
+                              "timer thread")))
+       (thread-start! th1)
+       (thread-start! th2)
+       (thread-join! th1)
+       res))))
 
 ;;======================================================================
 ;; S E R V E R U T I L I T I E S
 ;;======================================================================
 