Index: build-assist/ck5-eggs.list
==================================================================
--- build-assist/ck5-eggs.list
+++ build-assist/ck5-eggs.list
@@ -12,10 +12,11 @@
 format
 http-client
 itemsmod
 json
 linenoise
+mailbox
 md5
 message-digest
 nanomsg
 postgresql
 queues
@@ -38,9 +39,10 @@
 srfi-19
 sxml-modifications
 sxml-serializer
 sxml-transforms
 system-information
+tcp6
 test
 typed-records
 uri-common
 z3

Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -23,50 +23,53 @@
 ;; NOTES:
 ;;   Why sql-de-lite and not say, dbi? - performance mostly, then simplicity.
 ;;
 ;;======================================================================
 
-(use mailbox)
-
 (module ulex
     *
 (import scheme
-
-        chicken
-        data-structures
-        files
-        foreign
-        hostinfo
+        chicken.base
+        chicken.file
+        chicken.time
+        chicken.condition
+        chicken.string
+        chicken.sort
+
+        address-info
         mailbox
         matchable
-        ports
         extras
-        posix
+        queues
         regex
         regex-case
         srfi-1
         srfi-18
         srfi-4
         srfi-69
+        system-information
         tcp6
         typed-records
         )
 
 ;; udat struct, used by both caller and callee
 ;; instantiated as uconn by convention
 ;;
 (defstruct udat
   ;; the listener side
+  (port #f)
   (host-port #f)
   (socket #f)
   ;; the peers
   (peers (make-hash-table)) ;; host:port->peer
   ;; work handling
   (work-queue (make-queue))
+  (work-proc #f) ;; set by user
   (cnum 0) ;; cookie number
   (mboxes (make-hash-table))
-  )
+  (avail-cmboxes '()) ;; list of (cookie . mbox) pairs for re-use
+  )
 
 ;; struct for keeping track of others we are talking to
 ;;
 (defstruct pdat
   (host-port #f)
@@ -80,65 +83,10 @@
   (oup #f)
   (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
   (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
   )
 
-;; send structured data to recipient
-;;
-;; NOTE: qrykey is what was called the "cookie" previously
-;;
-;; retval tells send to expect and wait for return data (one line) and return it or time out
-;; this is for ping where we don't want to necessarily have set up our own server yet.
-;;
-(define (send udata host-port cmd qrykey data
-              #!key (hostname #f)(pid #f)(params '())(retval #f))
-  (let* ((my-host-port (udat-my-host-port udata))
-         (isme (equal? host-port my-host-port)) ;; am I calling
-         ;; myself?
-         (dat (list
-               cmd ;; " "
-               my-host-port ;; " "
-               (udat-my-pid udata) ;; " "
-               qrykey
-               params ;;(if (null? params) "" (conc " "
-               ;;(string-intersperse params " ")))
-               )))
-    ;; (print "send isme is " (if isme "true!" "false!") ",
-    ;; my-host-port: " my-host-port ", host-port: " host-port)
-    (if isme
-        (ulex-handler udata dat data)
-        (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE
-            ;; SPECIFIC
-            exn
-            #f
-          (let-values (((inp oup)(tcp-connect host-port)))
-            ;;
-            ;; CONTROL LINE:
-            ;; cmdkey host:port pid qrykey params ...
-            ;;
-            (let ((res
-                   (if (and inp oup)
-                       (let* ()
-                         (if my-host-port
-                             (begin
-                               (write dat oup)
-                               (write data oup) ;; send as sexpr
-                               ;; (print "Sent dat: " dat " data: " data)
-                               (if retval
-                                   (read inp)
-                                   #t))
-                             (begin
-                               (print "ERROR: send called but no receiver has been setup. Please call setup first!")
-                               #f))
-                         ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
-                         ;; (there is a listener for handling that)
-                         )
-                       #f))) ;; #f means failed to connect and send
-              (close-input-port inp)
-              (close-output-port oup)
-              res))))))
-
 ;;======================================================================
 ;; listener
 ;;======================================================================
 ;; create a tcp listener and return a populated udat struct with
 ;; my port, address, hostname, pid etc.
@@ -158,21 +106,15 @@
 (define (connect-listener uconn port)
   ;; (tcp-listener-socket LISTENER)(socket-name so)
   ;; sockaddr-address, sockaddr-port, sockaddr->string
   (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
          (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname)))
-    (udat-my-address-set! uconn addr)
-    (udat-my-port-set! uconn port)
-    (udat-my-hostname-set! uconn (get-host-name))
-    (udat-serv-listener-set! uconn tlsn)
+    (udat-port-set! uconn port)
+    (udat-host-port-set! uconn (conc addr":"port))
+    (udat-socket-set! uconn tlsn)
     uconn))
 
-(define (udat-my-host-port uconn)
-  (if (and (udat-my-address uconn)(udat-my-port uconn))
-      (conc (udat-my-address uconn) ":" (udat-my-port uconn))
-      #f))
-
 ;;======================================================================
 ;; peers and connections
 ;;======================================================================
 
 ;; send structured data to recipient
@@ -184,167 +126,147 @@
 ;;
 ;; NOTE: see below for beginnings of code to allow re-use of tcp connections
 ;; - I believe (without substantial evidence) that re-using connections will
 ;; be beneficial ...
 ;;
-(define (send udata host-port cmd qrykey data
-              #!key (hostname #f)(pid #f)(params '())(retval #f))
-  (let* ((my-host-port (udat-host-port udata))
-         (isme (equal? host-port my-host-port)) ;; am I calling
-         ;; myself?
-         (dat (list
-               cmd ;; " "
-               my-host-port ;; " "
-               qrykey
-               params
-               )))
+(define (send udata host-port qrykey cmd params)
+  (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this
+         (isme (equal? host-port my-host-port)) ;; calling myself?
+         ;; dat is a self-contained work block that can be sent or handled locally
+         (dat (list my-host-port qrykey cmd params))
+         )
     (if isme
-        (ulex-handler udata dat data)
-        (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC
+        (ulex-handler udata dat) ;; no transmission needed
+        (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
             exn
-            #f
+          #f
           (let-values (((inp oup)(tcp-connect host-port)))
-            ;;
-            ;; CONTROL LINE:
-            ;; cmdkey host:port pid qrykey params ...
-            ;;
             (let ((res (if (and inp oup)
-                           (if my-host-port
-                               (begin
-                                 (write dat oup)
-                                 (write data oup) ;; send as sexpr
-                                 ;; (print "Sent dat: " dat " data: " data)
-                                 (if retval
-                                     (read inp)
-                                     #t))
-                               (begin
-                                 (print "ERROR: send called but no receiver has been setup. Please call setup first!")
-                                 #f))
-                           ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE!
-                           ;; (there is a listener for handling that)
-                           #f))) ;; #f means failed to connect and send
+                           (begin
+                             (write dat oup)
+                             (read inp)) ;; yes, we always want an ack
+                           (begin
+                             (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+                             #f))))
               (close-input-port inp)
               (close-output-port oup)
-              res))))))
+              res)))))) ;; res will always be 'ack
 
 ;; send a request to the given host-port and register a mailbox in udata
 ;; wait for the mailbox data and return it
 ;;
-(define (send-receive udata host-port cmd qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
-  (let ((mbox (make-mailbox))
-        (mbox-time (current-milliseconds))
-        (mboxes (udat-mboxes udata)))
-    (hash-table-set! mboxes qrykey mbox)
-    (if (send udata host-port cmd qrykey data hostname: hostname pid: pid params: params)
-        (let* ((mbox-timeout-secs timeout)
+(define (send-receive uconn host-port cmd data)
+  (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+         (qrykey (car cmbox))
+         (mbox (cdr cmbox))
+         (mbox-time (current-milliseconds)))
+    (if (eq? (send uconn host-port qrykey cmd data) 'ack)
+        (let* ((mbox-timeout-secs 120) ;; timeout)
                (mbox-timeout-result 'MBOX_TIMEOUT)
                (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
                (mbox-receive-time (current-milliseconds)))
-          (hash-table-delete! mboxes qrykey)
           (if (eq? res 'MBOX_TIMEOUT)
-              #f
+              #f ;; convert to raising exception? (pair is NOT recycled here: a late reply may still arrive)
-              res))
+              (begin (put-cmbox uconn cmbox) res))) ;; answered: recycle the (cookie . mbox) pair
         #f))) ;; #f means failed to communicate
 
-;;
-(define (ulex-handler udata controldat data)
-  (print "controldat: " controldat " data: " data)
-  (match controldat ;; (string-split controldat)
-    ((cmdkey host-port pid qrykey params ...)
-     ;; (print "cmdkey: " cmdkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
-     (case cmdkey ;; (string->symbol cmdkey)
-       ((ack)(print "Got ack!"))
-       ((ping) ;; special case - return result immediately on the same connection
-        (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
-               (val (if proc (proc) "gotping"))
-               (peer (make-peer addr-port: host-port pid: pid))
-               (dbshash (udat-dbowners udata)))
-          (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
-          (for-each (lambda (dbfile)
-                      (hash-table-set! dbshash dbfile host-port)) ;; WRONG?
-                    params) ;; register each db in the dbshash
-          (if (not (hash-table-exists? (udat-peers udata) host-port))
-              (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
-          qrykey)) ;; End of ping
+;;======================================================================
+;; responder side
+;;======================================================================
+
+;; take a request, rdata, and if not immediate put it in the work queue
+;;
+;; Reserved cmds; ack ping goodbye response
+;;
+(define (ulex-handler uconn rdata)
+  (print "ulex-handler received data: "rdata)
+  (match rdata ;; (string-split controldat)
+    ((rem-host-port qrykey cmd params) ;; cmdkey host-port pid qrykey params ...)
+     (case cmd
+       ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+       ((ping) 'ack) ;; special case - return result immediately on the same connection
        ((goodbye)
-        ;; remove all traces of the caller in db ownership etc.
-        (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f))
-               (dbs (if peer (peer-dbs peer) '()))
-               (dbshash (udat-dbowners udata)))
-          (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
-          (hash-table-delete! (udat-peers udata) host-port)
-          qrykey))
-       ((immediate read-only normal low-priority) ;; do this work immediately
-        ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
-        ;; data => a single line encoded however you want, or should I build json into it?
-        (print "cmdkey=" cmdkey)
-        (let* ((pdat (get-peer-dat udata host-port)))
-          (match params ;; dbfile prockey procparam
-            ((dbfile prockey procparam)
-             (case cmdkey
-               ((immediate read-only)
-                (process-request udata pdat dbfile qrykey prockey procparam data))
-               ((normal low-priority) ;; split off later and add logic to support low priority
-                (add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
-               (else
-                #f)))
-            (else
-             (print "INFO: params=" params " cmdkey=" cmdkey " controldat=" controldat)
-             #f))))
-       (else
-        (add-to-work-queue udata (get-peer-dat udata host-port) cmdkey qrykey data)
-        #f)))
-    (else
-     (print "BAD DATA? controldat=" controldat " data=" data)
-     #f)));; handles the incoming messages and dispatches to queues
-
-;;
-(define (ulex-cmd-loop udata)
-  (let* ((serv-listener (udat-serv-listener udata)))
-    ;; data comes as two lines
-    ;; cmdkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
-    ;; data
+        ;; just clear out references to the caller
+        'ack)
+       ((response) ;; this is a result from remote processing, send it as mail ...
+        (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+          (if mbox
+              (mailbox-send! mbox params) ;; params here is our result
+              (begin
+                (print "ERROR: received result but no associated mbox for cookie "qrykey)
+                #f))))
+       (else ;; any non-reserved cmd is user work: queue it for process-work-queue
+        (add-to-work-queue uconn rdata)
+        'ack)))
+    (else
+     (print "BAD DATA? controldat=" rdata)
+     'ack) ;; send ack anyway?
+    ))
+
+;; given an already set up uconn start the cmd-loop
+;;
+(define (ulex-cmd-loop uconn)
+  (let* ((serv-listener (udat-socket uconn)))
     (let loop ((state 'start))
       (let-values (((inp oup)(tcp-accept serv-listener)))
-        (let* ((controldat (read inp))
-               (data (read inp))
-               (resp (ulex-handler udata controldat data)))
+        (let* ((rdat (read inp))
+               (resp (ulex-handler uconn rdat)))
           (if resp (write resp oup))
           (close-input-port inp)
           (close-output-port oup))
         (loop state)))))
 
 ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances)
 ;; so that the proc can be dereferenced remotely
 ;;
-(define (register-cmd udata key proc)
-  (hash-table-set! (udat-cmds udata) key proc))
-
-;;======================================================================
-;; work queues
-;;======================================================================
-
-(define (add-to-work-queue udata peer-dat cmdkey qrykey data)
-  (let ((wdat (make-work peer-dat: peer-dat cmdkey: cmdkey qrykey: qrykey data: data)))
-    (if (udat-busy udata)
-        (queue-add! (udat-work-queue udata) wdat)
-        (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat
-    ))
-
-(define (do-work udata wdat)
-  #f)
-
-(define (process-work udata #!optional wdat)
-  (if wdat (do-work udata wdat)) ;; process wdat
-  (let ((wqueue (udat-work-queue udata)))
-    (if (not (queue-empty? wqueue))
-        (let loop ((wd (queue-remove! wqueue)))
-          (do-work udata wd)
-          (if (not (queue-empty? wqueue))
-              (loop (queue-remove! wqueue)))))))
-
-
+(define (set-work-handler uconn proc)
+  (udat-work-proc-set! uconn proc))
+
+;; run-listener does all the work of starting a listener in a thread
+;; it then returns control
+;;
+(define (run-listener handler-proc)
+  (let* ((uconn (make-udat work-proc: handler-proc))) ;; install the handler that do-work applies
+    (if (setup-listener uconn)
+        (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
+               (th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor")))
+          (thread-start! th1)
+          (thread-start! th2)
+          uconn) ;; return the live uconn so the caller can send-receive on it
+        (begin
+          (print "ERROR: run-listener called without proper setup.")
+          (exit)))))
+
+;;======================================================================
+;; work queues - this is all happening on the listener side
+;;======================================================================
+
+;; rdata is (rem-host-port qrykey cmd params)
+
+(define (add-to-work-queue uconn rdata)
+  (queue-add! (udat-work-queue uconn) rdata))
+
+(define (do-work uconn rdata)
+  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
+    ;; put this following into a do-work procedure
+    (match rdata
+      ((rem-host-port qrykey cmd params)
+       (let* ((result (proc rem-host-port qrykey cmd params)))
+         (send uconn rem-host-port qrykey 'response result))) ;; reply goes back as a 'response; could check for ack
+      (else
+       (print "ERROR: rdata "rdata", did not match rem-host-port qrykey cmd params")))))
+
+
+(define (process-work-queue uconn)
+  (let ((wqueue (udat-work-queue uconn))
+        (proc (udat-work-proc uconn)))
+    (let loop ()
+      (if (queue-empty? wqueue)
+          (thread-sleep! 0.1)
+          (let ((rdata (queue-remove! wqueue)))
+            (do-work uconn rdata)))
+      (loop))))
 
 ;; below was to enable re-use of connections. This seems non-trivial so for
 ;; now lets open on each call
 ;;
 ;; ;; given host-port get or create peer struct
@@ -379,14 +301,33 @@
 ;;======================================================================
 
 (define (make-cookie uconn)
   (let ((newcnum (+ (udat-cnum uconn) 1)))
     (udat-cnum-set! uconn newcnum)
-    (conc (udat-my-address uconn) ":"
-          (udat-my-port uconn) "-"
+    (conc (udat-host-port uconn) ":"
           newcnum)))
 
+;; cookie/mboxes
+
+;; we store each mbox with a cookie (cookie . mbox)
+;;
+(define (get-cmbox uconn)
+  (if (null? (udat-avail-cmboxes uconn))
+      (let ((cookie (make-cookie uconn)) ;; make-cookie requires the connection
+            (mbox (make-mailbox)))
+        (hash-table-set! (udat-mboxes uconn) cookie mbox)
+        (cons cookie mbox)) ;; real pair of the values, not the quoted symbols
+      (let ((cmbox (car (udat-avail-cmboxes uconn))))
+        (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
+        cmbox)))
+
+(define (put-cmbox uconn cmbox)
+  (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
+
+;; peers
+
+
 ;;======================================================================
 ;; network utilities
 ;;======================================================================
 
 ;; NOTE: Look at address-info egg as alternative to some of this
@@ -414,10 +355,17 @@
 
 (define (get-all-ips-sorted)
   (sort (get-all-ips) ip-pref-less?))
 
 (define (get-all-ips)
-  (map ip->string (vector->list
-                   (hostinfo-addresses
-                    (host-information (current-hostname))))))
+  (map address-info-host
+       (filter (lambda (x)
+                 (member (address-info-type x) '(tcp "tcp"))) ;; accept symbol or string form of the type
+               (address-infos (get-host-name)))))
+
+;; (map ip->string (vector->list
+;; (hostinfo-addresses
+;; (host-information (current-hostname))))))
 
 )
+
+(import ulex)