Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -98,10 +98,12 @@
     (servkey . s) ;; server key - this needs to match at server end or reject the msg
     (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json
     (data . d) ;; base64 encoded slln data
     )))
 
+;; struct for keeping track of our world
+
 (defstruct udat
   (captain-address #f)
   (captain-host #f)
   (captain-port #f)
   (captain-pid #f)
@@ -111,10 +113,24 @@
   (my-address #f)
   (my-hostname #f)
   (my-port #f)
   (my-pid (current-process-id))
   (serv-listener #f)
+  (handler-thread #f)
+  (handlers '())
+  (outgoing-conns (make-hash-table)) ;; host:port -> peer struct
+  )
+
+;; struct for keeping track of others we are talking to
+
+(defstruct peer
+  (addr-port #f)
+  (hostname #f)
+  (pid #f)
+  (inp #f) ;; input port from the peer
+  (oup #f) ;; output port to the peer
+  (owns '()) ;; list of databases this peer is currently handling
   )
 
 ;;======================================================================
 ;; Captain pkt functions
 ;;======================================================================
@@ -151,17 +167,17 @@
 
 ;; create a tcp listener and return a populated udat struct with
 ;; my port, address, hostname, pid etc.
 ;; return #f if fail to find a port to allocate.
 ;;
-(define (start-server-find-port udata #!optional (port 9999))
+(define (start-server-find-port udata #!optional (port 4242))
   (handle-exceptions
       exn
-      (if (< port 65535)(start-server-find-port (+ port 1)) #f)
-      (start-server udata port)))
+      (if (< port 65535)(start-server-find-port udata (+ port 1)) #f)
+      (connect-server udata port)))
 
-(define (start-server udata port)
+(define (connect-server udata port)
   ;; (tcp-listener-socket LISTENER)(socket-name so)
   ;; sockaddr-address, sockaddr-port, sockaddr->string
   (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
          (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
     (udat-my-address-set! udata addr)
@@ -193,11 +209,87 @@
             pktdir
             pktdat
             pktspec: pktspec
             ptype: 'captain))
     (udat-my-cpkt-key udata))))
-
+
+;; NB// This needs to be started in a thread
+;;
+;; setup to be a captain
+;;   - start server
+;;   - create pkt
+;;   - start server port handler
+;; returns the handler thread once started, #f if the server or pkt setup failed
+(define (setup-as-captain udata)
+  (if (start-server-find-port udata) ;; puts the server in udata
+      (if (create-captain-pkt udata)
+          (let* ((th (make-thread (lambda ()
+                                    (ulex-handler udata)) "Captain handler")))
+            (udat-handler-thread-set! udata th)
+            (thread-start! th))
+          #f)
+      #f))
+
+;; find the peer struct for host-port in the outgoing-conns table of udata,
+;; or open a tcp connection to host-port, record its ports in a new peer
+;; struct, cache that struct and return it
+;;
+(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
+  (let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
+                   (let ((npdat (make-peer addr-port: host-port)))
+                     (if hostname (peer-hostname-set! npdat hostname))
+                     (if pid (peer-pid-set! npdat pid))
+                     (let-values (((ninp noup)(tcp-connect host-port)))
+                       (peer-inp-set! npdat ninp)
+                       (peer-oup-set! npdat noup))
+                     (hash-table-set! (udat-outgoing-conns udata) host-port npdat)
+                     npdat))))
+    pdat))
+
+;; return the input and output ports of the peer at host-port as two values
+;;
+(define (get-peer-ports udata host-port hostname pid)
+  (let ((pdat (get-peer-dat udata host-port hostname pid)))
+    (values (peer-inp pdat)(peer-oup pdat))))
+
+;; send back ack
+;;
+(define (send-ack udata qrykey oup)
+  (write-line (conc
+               "ack "
+               (udat-my-address udata) ":" (udat-my-port udata) " "
+               (udat-my-hostname udata) " "
+               (udat-my-pid udata) " "
+               qrykey)
+              oup)
+  (write-line qrykey oup)) ;; we must send a second line - for the ack let it be the qrykey
+
+;; accept one connection on the server listener, then read two-line requests
+;; from it, ack each one to the sender and queue it for processing
+(define (ulex-handler udata)
+  (let* ((serv-listener (udat-serv-listener udata)))
+    (let-values (((inp oup)(tcp-accept serv-listener)))
+      ;; data comes as two lines
+      ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
+      ;;   data
+      (let loop ((state 'start))
+        (let* ((controldat (read-line inp))
+               (data (read-line inp)))
+          ;; read-line yields the eof object once the peer closes the connection;
+          ;; stop looping then rather than pass a non-string to string-split
+          (if (not (or (eof-object? controldat)(eof-object? data)))
+              (begin
+                (match (string-split controldat)
+                  ((handlerkey host:port hostname pid qrykey params ...)
+                   (case (string->symbol handlerkey)
+                     (else
+                      (let-values (((pinp poup)(get-peer-ports udata host:port hostname pid)))
+                        (send-ack udata qrykey poup))
+                      (add-to-work-queue (get-peer-dat udata host:port) handlerkey data))))
+                  (else (print "BAD DATA? handler=" controldat " data=" data)))
+                (loop state))))))))
+
 ;;======================================================================
 ;; connection setup and management functions
 ;;======================================================================
 
 ;; find or become the captain, return a ulex object
@@ -213,18 +305,18 @@
                (pid (alist-ref 'pid captn)))
           (udat-captain-address-set! udata ipaddr)
           (udat-captain-host-set! udata host)
           (udat-captain-port-set! udata port)
           (udat-captain-pid-set! udata pid)
+          ;;(if (ping-captain udata)
+          ;;    udata
+          ;;    (begin
+          ;;      (remove-captain-pkt udata captn)
+          ;;      (setup)))
           udata)
-        ;;
-        ;; register captn here
-        ;;
-        ;; then run setup again
-        ;;
-        udata
-        )))
+        (setup-as-captain udata)) ;; saves the handler thread in udata and starts it; NOTE(review): returns that thread (or #f), not udata - confirm callers
+    ))
 
 (define (connect udata dbfname)
   udata)
 
 ) ;; END OF ULEX