Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -67,31 +67,55 @@ (Z (alist-ref 'Z captn))) (udat-captain-address-set! udata ipaddr) (udat-captain-host-set! udata host) (udat-captain-port-set! udata port) (udat-captain-pid-set! udata pid) - (if (ping udata (conc ipaddr ":" port)) - udata - (begin - (print "Found unreachable captain at " ipaddr ":" port ", removing pkt") - (remove-captain-pkt udata captn) - (setup)))) + (let-values (((success pingtime)(ping udata (conc ipaddr ":" port)))) + (if success + udata + (begin + (print "Found unreachable captain at " ipaddr ":" port ", removing pkt") + (remove-captain-pkt udata captn) + (setup))))) (begin (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread - (setup))) - )) + (setup))))) ;; connect to a specific dbfile (define (connect udata dbfname dbtype) udata) +;; returns: success pingtime +;; +;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns +;; (define (ping udata host-port) - (let* ((cookie (make-cookie udata)) - (res (send udata host-port 'ping cookie (conc (current-seconds)) retval: #t))) - ;; (print "got res=" res) - (equal? res cookie) - )) + (let* ((start (current-milliseconds)) + (cookie (make-cookie udata)) + (dbs (udat-my-dbs udata)) + (msg (string-intersperse dbs " ")) + (res (send udata host-port 'ping cookie msg retval: #t)) + (delta (- (current-milliseconds) start))) + (values (equal? res cookie) delta))) + +;; returns: success pingtime +;; +;; NOTE: causes all references to this worker to be wiped out in the callee (ususally the captain) +;; +(define (goodbye-ping udata host-port) + (let* ((start (current-milliseconds)) + (cookie (make-cookie udata)) + (dbs (udat-my-dbs udata)) + (res (send udata host-port 'goodbye cookie "nomsg" retval: #t)) + (delta (- (current-milliseconds) start))) + (values (equal? res cookie) delta))) + +(define (goodbye-captain udata) + (let* ((host-port (udat-captain-host-port udata))) + (if host-port + (goodbye-ping udata host-port) + (values #f -1)))) ;;====================================================================== ;; network utilities ;;====================================================================== @@ -166,17 +190,19 @@ (my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain (my-address #f) (my-hostname #f) (my-port #f) (my-pid (current-process-id)) + (my-dbs '()) ;; server and handler thread (serv-listener #f) ;; this processes server info (handler-thread #f) (mboxes (make-hash-table)) ;; key => mbox ;; other servers (peers (make-hash-table)) ;; host-port => peer record - (handlers (make-hash-table)) ;; dbfile => peer record + (dbowners (make-hash-table)) ;; dbfile => host-port + (handlers (make-hash-table)) ;; dbfile => proc (outgoing-conns (make-hash-table)) ;; host:port -> conn (work-queue (make-queue)) ;; most stuff goes here ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately ;; app info @@ -184,19 +210,30 @@ (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) + +(define (udat-my-host-port udata) + (if (and (udat-my-address udata)(udat-my-port udata)) + (conc (udat-my-address udata) ":" (udat-my-port udata)) + #f)) + +(define (udat-captain-host-port udata) + (if (and (udat-captain-address udata)(udat-captain-port udata)) + (conc (udat-captain-address udata) ":" (udat-captain-port udata)) + #f)) + ;; struct for keeping track of others we are talking to (defstruct peer (addr-port #f) (hostname #f) (pid #f) (inp #f) (oup #f) - (owns '()) ;; list of databases this peer is currently handling + (dbs '()) ;; list of databases this peer is currently handling ) (defstruct work (peer-dat #f) (handlerkey #f) @@ -323,11 +360,11 @@ (define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections. ;; at the other end of the line I think the reciever has closed the ports - thus each message ;; requires new connection? - (let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f) + (let* ((pdat (or #f #;(hash-table-ref/default (udat-outgoing-conns udata) host-port #f) (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC exn #f (let ((npdat (make-peer addr-port: host-port))) (if hostname (peer-hostname-set! npdat hostname)) @@ -441,20 +478,33 @@ ((handlerkey host-port pid qrykey params ...) (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) ;; special case - return result immediately on the same connection - (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) - (val (if proc (proc) "gotping")) - (peer (make-peer addr-port: host-port pid: pid))) + (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) + (val (if proc (proc) "gotping")) + (peer (make-peer addr-port: host-port pid: pid)) + (dbshash (udat-dbowners udata))) + (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger + (for-each (lambda (dbfile) + (hash-table-set! dbshash dbfile host-port)) + params) ;; register each db in the dbshash (if (not (hash-table-exists? (udat-peers udata) host-port)) (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers (write-line qrykey oup) - #;(send udata host-port "version" qrykey val) - ) - (close-input-port inp) - (close-output-port oup)) + (close-input-port inp) + (close-output-port oup))) ;; End of ping + ((goodbye) + ;; remove all traces of the caller in db ownership etc. + (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f)) + (dbs (if peer (peer-dbs peer) '())) + (dbshash (udat-dbowners udata))) + (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) + (hash-table-delete! (udat-peers udata) host-port) + (write-line qrykey oup) + (close-input-port inp) + (close-output-port oup))) ((rucaptain) ;; remote is asking if I'm the captain (write-line (if (udat-my-cpkt-key udata) "yes" "no")) (close-input-port inp) (close-output-port oup)) ((whoowns) ;; given a db name who do I send my queries to