Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -256,11 +256,11 @@
 ;;
 (define (setup-as-captain udata)
   (if (start-server-find-port udata) ;; puts the server in udata
       (if (create-captain-pkt udata)
           (let* ((th (make-thread (lambda ()
-                                    (ulex-handler udata)) "Captain handler")))
+                                    (ulex-handler-loop udata)) "Captain handler")))
             (udat-handler-thread-set! udata th)
             (thread-start! th))
           #f)
       #f))
 
@@ -449,76 +449,86 @@
           (if (eq? res 'MBOX_TIMEOUT)
               #f
               res))
         #f))) ;; #f means failed to communicate
 
-;; handles the incoming messages and dispatches to queues
+;; Process one request and return the reply for the caller to send back
+;; on the same connection: a string, or #f when nothing should be sent.
+;;
+;;   controldat - control line, tokenized with string-split into
+;;                handlerkey host-port pid qrykey [params ...]
+;;   data       - second line of the request; only passed on to the work queue
+;;
+(define (ulex-handler udata controldat data)
+  (print "controldat: " controldat " data: " data)
+  ;; read-line hands back an eof object if the peer closed early; route
+  ;; anything that is not a string to the BAD DATA branch instead of raising
+  (match (if (string? controldat) (string-split controldat) '())
+    ((handlerkey host-port pid qrykey params ...)
+     (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
+     (case (string->symbol handlerkey)
+       ;; no reply for ack: return #f explicitly, not whatever print returns
+       ((ack)(print "Got ack!") #f)
+       ((ping) ;; special case - return result immediately on the same connection
+        (let* ((proc    (hash-table-ref/default (udat-handlers udata) 'ping #f))
+               (val     (if proc (proc) "gotping"))
+               (peer    (make-peer addr-port: host-port pid: pid))
+               (dbshash (udat-dbowners udata)))
+          (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
+          (for-each (lambda (dbfile)
+                      (hash-table-set! dbshash dbfile host-port))
+                    params) ;; register each db in the dbshash
+          (if (not (hash-table-exists? (udat-peers udata) host-port))
+              (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
+          qrykey)) ;; End of ping
+       ((goodbye)
+        ;; remove all traces of the caller in db ownership etc.
+        (let* ((peer    (hash-table-ref/default (udat-peers udata) host-port #f))
+               (dbs     (if peer (peer-dbs peer) '()))
+               (dbshash (udat-dbowners udata)))
+          (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
+          (hash-table-delete! (udat-peers udata) host-port)
+          qrykey))
+       ((dropcaptain)
+        ;; remove all traces of the captain
+        (udat-captain-address-set! udata #f)
+        (udat-captain-host-set!    udata #f)
+        (udat-captain-port-set!    udata #f)
+        (udat-captain-pid-set!     udata #f)
+        qrykey)
+       ((rucaptain) ;; remote is asking if I'm the captain
+        (if (udat-my-cpkt-key udata) "yes" "no"))
+       ((whoowns) ;; given a db name who do I send my queries to
+        ;; look up the file in handlers, if have an entry ping them to be sure
+        ;; they are still alive and then return that host:port.
+        ;; if no handler found or if the ping fails pick from peers the oldest that
+        ;; is managing the fewest dbs
+        #f)
+       (else
+        (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
+        #f)))
+    (else
+     (print "BAD DATA? controldat=" controldat " data=" data)
+     #f)))
+
+;; handles the incoming messages and dispatches to queues
 ;;
-(define (ulex-handler udata)
+(define (ulex-handler-loop udata)
   (let* ((serv-listener (udat-serv-listener udata)))
-    (print "serv-listner: " serv-listener)
     ;; data comes as two lines
     ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
     ;; data
     (let loop ((state 'start))
       (let-values (((inp oup)(tcp-accept serv-listener)))
-        (print "got here: inp=" inp " oup=" oup)
         (let* ((controldat (read-line inp))
-               (data       (read-line inp)))
-          (print "controldat: " controldat " data: " data)
-          (match (string-split controldat)
-            ((handlerkey host-port pid qrykey params ...)
-             (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
-             (case (string->symbol handlerkey)
-               ((ack)(print "Got ack!"))
-               ((ping) ;; special case - return result immediately on the same connection
-                (let* ((proc    (hash-table-ref/default (udat-handlers udata) 'ping #f))
-                       (val     (if proc (proc) "gotping"))
-                       (peer    (make-peer addr-port: host-port pid: pid))
-                       (dbshash (udat-dbowners udata)))
-                  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
-                  (for-each (lambda (dbfile)
-                              (hash-table-set! dbshash dbfile host-port))
-                            params) ;; register each db in the dbshash
-                  (if (not (hash-table-exists? (udat-peers udata) host-port))
-                      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
-                  (write-line qrykey oup)
-                  (close-input-port inp)
-                  (close-output-port oup))) ;; End of ping
-               ((goodbye)
-                ;; remove all traces of the caller in db ownership etc.
-                (let* ((peer    (hash-table-ref/default (udat-peers udata) host-port #f))
-                       (dbs     (if peer (peer-dbs peer) '()))
-                       (dbshash (udat-dbowners udata)))
-                  (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
-                  (hash-table-delete! (udat-peers udata) host-port)
-                  (write-line qrykey oup)
-                  (close-input-port inp)
-                  (close-output-port oup)))
-               ((dropcaptain)
-                ;; remove all traces of the captain
-                (udat-captain-address-set! udata #f)
-                (udat-captain-host-set!    udata #f)
-                (udat-captain-port-set!    udata #f)
-                (udat-captain-pid-set!     udata #f)
-                (write-line qrykey oup)
-                (close-input-port inp)
-                (close-output-port oup))
-               ((rucaptain) ;; remote is asking if I'm the captain
-                (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
-                (close-input-port inp)
-                (close-output-port oup))
-               ((whoowns) ;; given a db name who do I send my queries to
-                ;; look up the file in handlers, if have an entry ping them to be sure
-                ;; they are still alive and then return that host:port.
-                ;; if no handler found or if the ping fails pick from peers the oldest that
-                ;; is managing the fewest dbs
-                #f)
-               (else
-                (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data))))
-            (else (print "BAD DATA? controldat=" controldat " data=" data)))))
-      (loop state))))
+               (data       (read-line inp))
+               (resp       (ulex-handler udata controldat data)))
+          ;; only string replies are written; the connection is closed either way
+          (if (string? resp) (write-line resp oup))
+          (close-input-port inp)
+          (close-output-port oup))
+        (loop state)))))
 
 ;; add a proc to the handler list
 (define (register-handler udata key proc)
   (hash-table-set! (udat-handlers udata) key proc))
 