Index: nng-trial/nng-test.scm ================================================================== --- nng-trial/nng-test.scm +++ nng-trial/nng-test.scm @@ -1,15 +1,21 @@ (import (chicken io) + (chicken file) + (chicken file posix) (chicken string) (chicken process-context) (chicken process-context posix) miscmacros nng srfi-18 + srfi-69 test matchable - typed-records) + typed-records + system-information + directory-utils + ) (define help "Usage: nng-test COMMAND where COMMAND is one of: dotest : run the basic req/rep test ") @@ -53,45 +59,95 @@ (test "inproc req-rep" "message" (req-rep-test address-inproc-1))) (test-exit)) -(defstruct srv - myaddr - remaddr - req - rep - name) - -(define (server-setup myname myaddr remoteaddr) - (let* ((srvdat (make-srv))) - (srv-myaddr-set! srvdat myaddr) - (srv-remaddr-set! srvdat remoteaddr) - (srv-rep-set! srvdat (make-listening-reply-socket myaddr)) - (srv-req-set! srvdat (make-dialed-request-socket myaddr)) - (srv-name-set! srvdat myname) - srvdat)) - +;; talking to self here... +;; (define (send-n-messages n srvdat) (let* ((name (srv-name srvdat))) (let loop ((i 0)) (if (< i n) (begin - (print "send: "(nng-send (srv-req srvdat) (conc name "-" i))) - (print "receive: "(nng-recv (srv-rep srvdat))) + (nng-send (srv-req srvdat) (conc name "-" i)) + (print "received: "(nng-recv (srv-rep srvdat))) (loop (+ i 1))))))) + +;; this should be run in a thread +(define (run-listener-responder socket myaddr) + (let loop ((status 'running)) + (let* ((msg (nng-recv socket)) + (response (process-message msg))) + (if (not (eq? response 'done)) + (begin + (nng-send socket response) + (loop status)))))) + +(define *channels* (make-hash-table)) + +(define (call channels msg addr) + (let* ((csocket (hash-table-ref/default channels addr #f)) + (socket (or csocket (make-dialed-request-socket addr)))) + (nng-send socket msg) + (print "Sent: "msg", received: "(nng-recv socket)) + (if (not (hash-table-exists? channels addr)) + (hash-table-set! 
channels addr socket)))) + +;; start => hello 0 +;; hello 0 => hello 1 +;; hello 1 => hello 2 +;; ... +;; hello 11 => 'done +;; +(define (process-message mesg) + (let ((parts (string-split mesg))) + (match + parts + ((msg c) + (let ((count (string->number c))) + (if (and count (> count 10)) + 'done + (conc msg " " (if count count 0))))) + ((msg) + (conc msg " 0")) + (else + "hello 0")))) (define (close-srv srvdat) (nng-close! (srv-rep srvdat))) (match (command-line-arguments) (("do-test")(do-test)) - (("send-n" n myaddr toaddr) - (let ((n-num (string->number n)) - (sdat (server-setup "just testing" myaddr toaddr))) - (send-n-messages n-num sdat) - (close-srv sdat))) + (("run" myaddr) + ;; start listener + ;; put myaddr into file by host-pid in .runners + ;; for 1 minute + ;; get all in .runners + ;; call each with a message + ;; + (let* ((socket (make-listening-reply-socket myaddr)) + (rfile (conc ".runners/"(get-host-name)"-"(current-process-id))) + (th1 (make-thread (lambda () + (run-listener-responder socket myaddr) + (delete-file* rfile) + (exit)) + "responder"))) + (if (not (and (file-exists? ".runners") + (directory? ".runners"))) + (create-directory ".runners" #t)) + (with-output-to-file rfile + (lambda () + (print myaddr))) + (thread-start! th1) + (let loop ((entries '())) + (if (null? entries) + (loop (glob ".runners/*")) + (let* ((entry (car entries)) + (destaddr (with-input-from-file entry read-line))) + (call *channels* (conc "hello-from-"destaddr) destaddr) + (thread-sleep! 0.25) + (loop (cdr entries))))))) ((cmd)(print "ERROR: command "cmd", not recognised.\n\n"help)) (else (print help)))