Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -210,11 +210,10 @@ (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] ;; cookies (cnum 0) ;; cookie num ) - (define (udat-my-host-port udata) (if (and (udat-my-address udata)(udat-my-port udata)) (conc (udat-my-address udata) ":" (udat-my-port udata)) #f)) @@ -325,10 +324,25 @@ ;; (define (remove-captain-pkt udata captn) (let ((Z (alist-ref 'Z captn)) (cpktdir (udat-cpkts-dir udata))) (delete-file* (conc cpktdir "/" Z ".pkt")))) + +;; call all known peers and tell them to delete their info on the captain +;; thus forcing them to re-read pkts and connect to a new captain +;; call this when the captain needs to exit and if an older captain is +;; detected. Due to delays in sending file meta data in NFS multiple +;; captains can be initiated in a "Storm of Captains", book soon to be +;; on Amazon +;; +(define (drop-captain udata) + (let* ((peers (hash-table-keys (udat-peers udata))) + (cookie (make-cookie udata))) + (for-each + (lambda (host-port) + (send udata host-port 'dropcaptain cookie "nomsg" retval: #t)) + peers))) ;;====================================================================== ;; server primitives ;;====================================================================== @@ -360,13 +374,10 @@ (udat-my-hostname-set! udata (get-host-name)) (udat-serv-listener-set! udata tlsn) udata)) (define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) - ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections. - ;; at the other end of the line I think the reciever has closed the ports - thus each message - ;; requires new connection? (let* ((pdat (or (udat-get-peer udata host-port) (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC exn #f (let ((npdat (make-peer addr-port: host-port))) @@ -438,30 +449,11 @@ (if (eq? res 'MBOX_TIMEOUT) #f res)) #f))) ;; #f means failed to communicate -(define (add-to-work-queue udata peer-dat handlerkey qrykey data) - (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) - (if (udat-busy udata) - (queue-add! (udat-work-queue udata) wdat) - (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat - )) - -(define (do-work udata wdat) - #f) - -(define (process-work udata #!optional wdat) - (if wdat (do-work udata wdat)) ;; process wdat - (let ((wqueue (udat-work-queue udata))) - (if (not (queue-empty? wqueue)) - (let loop ((wd (queue-remove! wqueue))) - (do-work udata wd) - (if (not (queue-empty? wqueue)) - (loop (queue-remove! wqueue))))))) - -;; +;; handles the incoming messages and dispatches to queues ;; (define (ulex-handler udata) (let* ((serv-listener (udat-serv-listener udata))) (print "serv-listner: " serv-listener) ;; data comes as two lines @@ -500,10 +492,19 @@ (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) (hash-table-delete! (udat-peers udata) host-port) (write-line qrykey oup) (close-input-port inp) (close-output-port oup))) + ((dropcaptain) + ;; remove all traces of the captain + (udat-captain-address-set! udata #f) + (udat-captain-host-set! udata #f) + (udat-captain-port-set! udata #f) + (udat-captain-pid-set! udata #f) + (write-line qrykey oup) + (close-input-port inp) + (close-output-port oup)) ((rucaptain) ;; remote is asking if I'm the captain (write-line (if (udat-my-cpkt-key udata) "yes" "no")) (close-input-port inp) (close-output-port oup)) ((whoowns) ;; given a db name who do I send my queries to @@ -519,10 +520,33 @@ ;; add a proc to the handler list (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) + +;;====================================================================== +;; work queues +;;====================================================================== + +(define (add-to-work-queue udata peer-dat handlerkey qrykey data) + (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) + (if (udat-busy udata) + (queue-add! (udat-work-queue udata) wdat) + (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat + )) + +(define (do-work udata wdat) + #f) + +(define (process-work udata #!optional wdat) + (if wdat (do-work udata wdat)) ;; process wdat + (let ((wqueue (udat-work-queue udata))) + (if (not (queue-empty? wqueue)) + (let loop ((wd (queue-remove! wqueue))) + (do-work udata wd) + (if (not (queue-empty? wqueue)) + (loop (queue-remove! wqueue))))))) ;;====================================================================== ;; Generic db handling ;; setup a inmem db instance ;; open connection to on-disk db