Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -173,11 +173,10 @@ mofiles/mutils.o : mutils/mutils.scm mofiles/cookie.o : stml2/cookie.scm mofiles/stml2.o : stml2/stml2.scm # for the modularized stuff - mofiles/commonmod.o : megatest-fossil-hash.scm mofiles/stml2.o \ mofiles/mtargs.o mofiles/pkts.o mofiles/mtconfigf.o \ mofiles/processmod.o mofiles/pgdbmod.o : mofiles/commonmod.o mofiles/dbmod.o : mofiles/commonmod.o mofiles/keysmod.o \ @@ -191,10 +190,13 @@ mofiles/dbmod.o mofiles/pgdbmod.o mofiles/launchmod.o \ mofiles/subrunmod.o mofiles/servermod.o : mofiles/commonmod.o mofiles/dbmod.o mofiles/testsmod.o : mofiles/servermod.o mofiles/dbmod.o mofiles/launchmod.o : mofiles/subrunmod.o mofiles/testsmod.o + +# special cases where an upstream .import file is needed to compile a module +mofiles/rmtmod.o : ulex.import.o # Removed from megamod.o dep: mofiles/ftail.o mofiles/megamod.o : \ mofiles/rmtmod.o \ mofiles/commonmod.o \ Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -20,11 +20,10 @@ (declare (unit commonmod)) (declare (uses mtargs)) ;; (declare (uses stml2)) (declare (uses mtconfigf)) -(declare (uses ulex)) (declare (uses pkts)) (module commonmod * (import scheme chicken data-structures extras) @@ -41,11 +40,10 @@ z3 directory-utils sparse-vectors) (import pkts) -(import ulex) (import (prefix mtconfigf configf:)) (import (prefix mtargs args:)) (include "common_records.scm") (include "megatest-fossil-hash.scm") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -22,11 +22,11 @@ ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (use (prefix sqlite3 sqlite3:) srfi-1 posix regex regex-case srfi-69 (prefix base64 base64:) readline apropos json http-client directory-utils typed-records - http-client srfi-18 extras format) + http-client srfi-18 extras format tcp6) ;; Added for csv stuff - will be removed ;; (use sparse-vectors) Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -76,37 +76,39 @@ ;; - initializes the connection object if this is the first access ;; - finds the "captain" and asks who to talk to for the given dbfname ;; - establishes the connection to the current dbowner ;; (define (rmt:connect alldat dbfname) - (let* ((ulexdat (let ((uconn (alldat-ulexdat alldat))) - (if uconn - uconn - (let* ((new-ulexdat (ulex:setup))) ;; establish connection to ulex - (alldat-ulexdat-set! alldat new-ulexdat) - (rmt:setup-ulex alldat) - new-ulexdat))))) + (let* ((ulexdat (or (alldat-ulexdat alldat) + (rmt:setup-ulex alldat)))) (ulex:connect ulexdat dbfname))) + +;; setup the remote calls +(define (rmt:setup-ulex alldat) + (let* ((new-ulexdat (ulex:setup))) ;; establish connection to ulex + (alldat-ulexdat-set! alldat new-ulexdat) + (let ((udata (alldat-ulexdat alldat))) + ;; register all needed procs + (ulex:register-handler udata 'ping common:get-full-version) + (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection + new-ulexdat))) ;; set up a connection to the current owner of the dbfile associated with rid ;; then send the query to that dbfile owner and wait for a response. ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected (let* ((alldat *alldat*) (areapath (alldat-areapath alldat)) - (dbfname (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" + (dbtype (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db" + 'main 'runs)) + (dbfname (if (eq? dbtype 'main) "main.db" (conc rid ".db"))) (dbfile (conc areapath "/.db/" dbfname)) - (ulexconn (rmt:connect alldat dbfname))) + (ulexconn (rmt:connect alldat dbfname dbtype))) (rmt:open-qry-close-locally cmd 0 params))) -;; setup the remote calls -(define (rmt:setup-ulex alldat) - (let ((udata (alldat-ulexdat alldat))) - (ulex:register-handler udata 'ping common:get-full-version) - )) ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) ;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected ;; ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -37,10 +37,54 @@ (prefix sqlite3 sqlite3:) foreign tcp6 ;; ulex-netutil hostinfo) + +;;====================================================================== +;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED +;;====================================================================== + +;; connection setup and management functions + +;; This is the basic setup command. Must always be +;; called before connecting to a db using connect. +;; +;; find or become the captain +;; setup and return a ulex object +;; +(define (setup) + (let* ((udata (make-udat)) + (cpkts (get-all-captain-pkts udata)) ;; read captain pkts + (captn (get-winning-pkt cpkts))) + (if captn + (let* ((port (alist-ref 'port captn)) + (host (alist-ref 'host captn)) + (ipaddr (alist-ref 'ipaddr captn)) + (pid (alist-ref 'pid captn)) + (Z (alist-ref 'Z captn))) + (udat-captain-address-set! udata ipaddr) + (udat-captain-host-set! udata host) + (udat-captain-port-set! udata port) + (udat-captain-pid-set! udata pid) + (if (ping udata ipaddr port) + udata + (begin + (remove-captain-pkt udata captn) + (setup)))) + (setup-as-captain udata)) ;; this saves the thread to captain-thread and starts the thread + )) + +;; connect to a specific dbfile +(define (connect udata dbfname) + udata) + +(define (ping udata host-port) + (let ((cookie (make-cookie udata))) + (send udata host-port 'ping "just pinging" (current-seconds)) + ;; (mailbox-rec + )) ;;====================================================================== ;; network utilities ;;====================================================================== @@ -123,10 +167,12 @@ (handlers (make-hash-table)) (outgoing-conns (make-hash-table)) ;; host:port -> conn ;; app info (appname #f) (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] + ;; cookies + (cnum 0) ;; cookie num ) ;; struct for keeping track of others we are talking to (defstruct peer @@ -169,10 +215,30 @@ (if (eq? a b) (let ((az (alist-ref 'Z a)) (bz (alist-ref 'Z b))) (string>=? az bz)) (> ad bd)))))))) + +;; remove pkt associated with captn (the Z key .pkt) +;; +(define (remove-captain-pkt udata captn) + (let ((Z (alist-ref 'Z captn)) + (cpktdir (udat-cpkts-dir udata))) + (delete-file* (conc cpktdir "/" Z ".pkt")))) + + +;;====================================================================== +;; server primitives +;;====================================================================== + +(define (make-cookie udata) + (let ((newcnum (+ (udat-cnum udata)))) + (udat-cnum-set! udata newcnum) + (conc (udat-my-address udata) ":" + (udat-my-port udata) "-" + (udat-my-pid udata) "-" + newcnum))) ;; create a tcp listener and return a populated udat struct with ;; my port, address, hostname, pid etc. ;; return #f if fail to find a port to allocate. ;; @@ -250,31 +316,42 @@ (define (get-peer-ports udata host-port #!optional (hostname #f)(pid #f)) (let ((pdat (get-peer-dat udata host-port hostname pid))) (values (peer-inp pdat)(peer-oup pdat)))) -;; send back ack, amusing I suppose that this looks almost like what -;; tcp itself does ... +;; send structured data to recipient +;; +;; NOTE: qrykey is what was called the "cookie" previously ;; -(define (reply udata host-port handler qrykey data #!optional (hostname #f)(pid #f)) +(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())) (let-values (((inp oup)(get-peer-ports udata host-port hostname pid))) + ;; CONTROL LINE: (note: removed the hostname - I don't think it adds much value + ;; + ;; handlerkey host:port pid qrykey params ... + ;; (write-line (conc handler " " (udat-my-address udata) ":" (udat-my-port udata) " " - (udat-my-hostname udata) " " - (udat-my-pid udata) " " - qrykey) + ;; (udat-my-hostname udata) " " + (udat-my-pid udata) " " + qrykey + (if (null? params) "" (conc " " (string-intersperse params " ")))) oup) - (write-line data oup))) ;; we must send a second line - for the ack let it be the qrykey + (write-line data oup) + ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! + ;; (there is a listener for handling that) + )) (define (add-to-work-queue udata . blah) #f) -;; send back ack +;; send back ack - this is tcp we are talking about, do we really need an ack? +;; +;; NOTE: No need to send back host:port of self - that is locked in by qrykey ;; -(define (send-ack udata host-port qrykey #!optional (hostname #f)(pid #f)) - (reply udata "ack" qrykey qrykey hostname pid)) ;; we must send a second line - for the ack let it be the qrykey +(define (send-ack udata host-port qrykey) ;; #!optional (hostname #f)(pid #f)) + (send udata host-port "ack" qrykey qrykey)) ;; we must send a second line - for the ack let it be the qrykey ;; ;; (define (ulex-handler udata) (let* ((serv-listener (udat-serv-listener udata))) @@ -284,23 +361,23 @@ ;; data (let loop ((state 'start)) (let* ((controldat (read-line inp)) (data (read-line inp))) (match (string-split controldat) - ((handlerkey host:port hostname pid qrykey params ...) + ((handlerkey host:port pid qrykey cookie params ...) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping"))) - (reply udata host:port "version" qrykey val))) + (send udata host:port "version" qrykey cookie val))) ((rucaptain) - (reply udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata) + (send udata host:port "iamcaptain" qrykey (if (udat-my-cpkt-key udata) "yes" "no"))) (else - (send-ack udata host:port qrykey hostname pid) + (send-ack udata host:port qrykey) (add-to-work-queue udata (get-peer-dat udata host:port) handlerkey data))) (else (print "BAD DATA? handler=" handlerkey " data=" data))))) (loop state))))) ;; add a proc to the handler list @@ -308,10 +385,16 @@ (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== ;; Generic db handling +;; setup a inmem db instance +;; open connection to on-disk db +;; sync on-disk db to inmem +;; get lock in on-disk db for dbowner of this db +;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct +;; return the stuct ;;====================================================================== (defstruct dbconn (inmem #f) (conn #f) @@ -390,41 +473,10 @@ ;; open databases, do initial sync (define (ulexdb-sync dbconndat udata) #f) -;;====================================================================== -;; connection setup and management functions -;;====================================================================== - -;; find or become the captain, return a ulex object -;; -(define (setup) - (let* ((udata (make-udat)) - (cpkts (get-all-captain-pkts udata)) ;; read captain pkts - (captn (get-winning-pkt cpkts))) - (if captn - (let* ((port (alist-ref 'port captn)) - (host (alist-ref 'host captn)) - (ipaddr (alist-ref 'ipaddr captn)) - (pid (alist-ref 'pid captn))) - (udat-captain-address-set! udata ipaddr) - (udat-captain-host-set! udata host) - (udat-captain-port-set! udata port) - (udat-captain-pid-set! udata pid) - ;;(if (ping-captain udata) - ;; udata - ;; (begin - ;; (remove-captain-pkt udata captn) - ;; (setup))) - udata) - (setup-as-captain udata)) ;; this saves the thread to captain-thread and starts the thread - )) - -(define (connect udata dbfname) - udata) - ) ;; END OF ULEX ;;; ;;====================================================================== ;;; ;; D E B U G H E L P E R S