Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -165,12 +165,12 @@ ;; ;; ulex parameters ;; (work-method 'direct) ;; (return-method 'direct) ;; ulex parameters - (work-method 'mailbox) - (return-method 'mailbox) +;; (work-method 'mailbox) +;; (return-method 'mailbox) ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (define *didsomething* #f) (define *db* #f) ;; this is only for the repl, do not use in general!!!! Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -113,15 +113,15 @@ ;; info about me as a listener and my connections to db servers ;; stored (for now) in *db-serv-info* ;; (defstruct servdat - (host #f) + (host (get-host-name)) (port #f) (uuid #f) (dbfile #f) - (uconn #f) ;; this is the listener *FOR THIS PROCESS* + (uconn (make-udat host: (get-host-name))) ;; this is the ulex record *FOR THIS PROCESS* (mode #f) (status 'starting) (trynum 0) ;; count the number of ports we've tried (conns (make-hash-table)) ;; apath/dbname => conndat ) @@ -198,29 +198,12 @@ ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath ".db/main.db")) (conns (servdat-conns remdat)) (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this - (start-rmt:run (lambda () - (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) - (thread-start! th1) - (thread-sleep! 1) - (let loop ((count 0)) - (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") - (if (or (not *db-serv-info*) - (not (servdat-uconn *db-serv-info*))) - (begin - (thread-sleep! 1) - (loop (+ count 1))) - (begin - (servdat-mode-set! *db-serv-info* 'non-db) - (servdat-uconn *db-serv-info*))))))) - (myconn (servdat-uconn *db-serv-info*))) + (myconn (servdat-uconn remdat))) (cond - ((not myconn) - (start-rmt:run) - (rmt:open-main-connection remdat apath)) ((and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died ((and conn (>= (current-seconds)(conndat-expires conn))) @@ -248,11 +231,10 @@ (let* ((srv-addr (server-address the-srv)) ;; need serv (ipaddr (alist-ref 'ipaddr the-srv)) (port (alist-ref 'port the-srv)) (srvkey (alist-ref 'servkey the-srv)) (fullpath (db:dbname->path apath dbname)) - (new-the-srv (make-conndat apath: apath dbname: dbname fullname: fullpath hostport: srv-addr @@ -359,22 +341,16 @@ ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed ;; sometime in the future ;; (define (rmt:send-receive-real sinfo apath dbname cmd params) - (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.") (let* ((cdat (rmt:get-conn sinfo apath dbname))) (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened") (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex ;; then send-receive using the ulex layer to host-port stored in cdat - (res (send-receive uconn (conndat-hostport cdat) cmd params)) - #;(th1 (make-thread (lambda () - (set! res (send-receive uconn (conndat-hostport cdat) cmd params))) - "send-receive thread"))) - ;; (thread-start! th1) - ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead - ;; since we accessed the server we can bump the expires time up + (res (send-receive uconn (conndat-hostport cdat) cmd params))) + ;; since we accessed the server we can bump the expires time up (conndat-expires-set! cdat (+ (current-seconds) (server:expiration-timeout) -2)) ;; two second margin for network time misalignments etc. res))) @@ -1648,10 +1624,11 @@ (define *rmt:run-mutex* (make-mutex)) (define *rmt:run-flag* #f) ;; Main entry point to start a server. was start-server (define (rmt:run hostn) + (assert (args:get-arg "-server") "FATAL: rmt:run called on non-server process") (mutex-lock! *rmt:run-mutex*) (if *rmt:run-flag* (begin (debug:print-warn 0 *default-log-port* "rmt:run already running.") (mutex-unlock! *rmt:run-mutex*)) @@ -1665,11 +1642,11 @@ (if (and *db-serv-info* (servdat-uconn *db-serv-info*)) (let* ((uconn (servdat-uconn *db-serv-info*))) (wait-and-close uconn)) (let* ((port (portlogger:open-run-close portlogger:find-port)) - (handler-proc (lambda (rem-host-port qrykey cmd params) ;; + (handler-proc (lambda (rem-host-port cmd params) ;; (set! *db-last-access* (current-seconds)) (assert (list? params) "FATAL: handler called with non-list params") (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params) (debug:print 0 *default-log-port* "handler call: "cmd", params="params) (api:execute-requests *dbstruct-db* cmd params)))) @@ -2198,37 +2175,38 @@ (sexpr->string 'quit)))))))))) (define (rmt:get-reasonable-hostname) (let* ((inhost (or (args:get-arg "-server") "-"))) (if (equal? inhost "-") - (get-host-name) + (get-host-name) ;; (get-my-best-address) inhost))) ;; Call this to start the actual server ;; ;; all routes though here end in exit ... ;; ;; This is the point at which servers are started ;; (define (rmt:server-launch dbname) + (assert (args:get-arg "-server") "FATAL: rmt:server-launch called in non-server process.") (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (rmt:run (rmt:get-reasonable-hostname))) "Server run")) (th3 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server monitor thread started") (if (args:get-arg "-server") (rmt:keep-running dbname))) - "Keep running"))) + "Keep running"))) (thread-start! th2) (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) - (thread-join! th3)) - #f) + (thread-join! th3) + #f)) ;; Generate a unique signature for this process, used at both client and ;; server side (define (rmt:mk-signature) (message-digest-string (md5-primitive) Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -27,12 +27,12 @@ ulex ) (define test-work-dir (current-directory)) -(work-method 'mailbox) ;; threads, direct, mailbox -(return-method 'mailbox) ;; polling, mailbox, direct +;; (work-method 'mailbox) ;; threads, direct, mailbox +;; (return-method 'mailbox) ;; polling, mailbox, direct ;; given list of lists ;; ( ( msg expected param1 param2 ...) ;; ( ... ) ) ;; apply test to all Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -24,11 +24,12 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. ;; ;;====================================================================== (module ulex - ( + * + #;( ;; NOTE: looking for the handler proc - find the run-listener :) run-listener ;; (run-listener handler-proc [port]) => uconn @@ -60,10 +61,11 @@ chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print + ;; chicken.tcp address-info mailbox matchable ;; queues @@ -75,10 +77,12 @@ srfi-4 srfi-69 system-information tcp6 typed-records + ;; tcp-server + ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention ;; @@ -97,27 +101,12 @@ (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads (numthreads 50) (cmd-thread #f) (work-queue-thread #f) - ) - -;; ;; struct for keeping track of others we are talking to -;; ;; -;; (defstruct pdat -;; (host-port #f) -;; (conns '()) ;; list of pcon structs, pop one off when calling the peer -;; ) -;; -;; ;; struct for peer connections, keep track of expiration etc. -;; ;; -;; (defstruct pcon -;; (inp #f) -;; (oup #f) -;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) -;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes -;; ) + (num-threads-running 0) + ) ;;====================================================================== ;; listener ;;====================================================================== @@ -135,19 +124,22 @@ ;; (define (setup-listener uconn #!optional (port 4242)) (handle-exceptions exn (if (< port 65535) - (setup-listener uconn (+ port 1)) + (begin + (thread-sleep! 0.1) ;; I'm not sure this helps but give the OS some time to do it's thing + (print "ULEX INFO: skipping port already in use "port) + (setup-listener uconn (+ port 1))) #f) (connect-listener uconn port))) (define (connect-listener uconn port) ;; (tcp-listener-socket LISTENER)(socket-name so) ;; sockaddr-address, sockaddr-port, sockaddr->string (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) - (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (addr (get-host-name))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) (udat-port-set! uconn port) (udat-host-port-set! uconn (conc addr":"port)) (udat-socket-set! uconn tlsn) uconn)) @@ -155,26 +147,27 @@ ;; it then returns control ;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) + (tcp-buffer-size 2048) (if (setup-listener uconn port-suggestion) - (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) - #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor"))) - (tcp-buffer-size 2048) - ;; (max-connections 2048) + (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))) (thread-start! th1) - #;(thread-start! th2) (udat-cmd-thread-set! uconn th1) - #;(udat-work-queue-thread-set! uconn th2) - (print "cmd loop and process workers started") + (print "cmd loop started") uconn) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) + (let loop () + (if (not (udat-cmd-thread uconn)) + (begin + (thread-sleep! 1) + (loop)))) (thread-join! (udat-cmd-thread uconn)) - (tcp-close (udat-socket uconn))) + #;(tcp-close (udat-socket uconn))) ;;====================================================================== ;; peers and connections ;;====================================================================== @@ -189,48 +182,33 @@ ;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections ;; - I believe (without substantial evidence) that re-using connections will ;; be beneficial ... ;; -(define (send udata host-port qrykey cmd params) +(define (send-receive udata host-port cmd params) (mutex-lock! *send-mutex*) (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this - (isme #f #;(equal? host-port my-host-port)) ;; calling myself? + (isme (equal? host-port my-host-port)) ;; calling myself? ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params))) + (dat (list my-host-port cmd params)) + (parts (string-split host-port ":")) + (host (car parts)) + (port (string->number (cadr parts)))) (if isme (ulex-handler udata dat) ;; no transmission needed - (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - #f - (let-values (((inp oup)(tcp-connect host-port))) - (let ((res (if (and inp oup) - (begin - (serialize dat oup) - (deserialize inp)) ;; yes, we always want an ack - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) - (close-input-port inp) - (close-output-port oup) - (mutex-unlock! *send-mutex*) - res)))))) ;; res will always be 'ack - -;; send a request to the given host-port and register a mailbox in udata -;; wait for the mailbox data and return it -;; -(define (send-receive uconn host-port cmd data) - (cond - ((member cmd '(ping goodbye)) ;; these are immediate - (send uconn host-port 'ping cmd data)) - (else - (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? - (qrykey (car cmbox)) - (mbox (cdr cmbox)) - (mbox-time (current-milliseconds)) - (sres (send uconn host-port qrykey cmd data))) ;; short res - sres)))) + (let-values (((inp oup)(tcp-connect host-port))) + (let ((res (if (and inp oup) + (begin + (serialize dat oup) + (deserialize inp)) ;; yes, we always want an ack + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)))) + (close-input-port inp) + (close-output-port oup) + (mutex-unlock! *send-mutex*) + res))))) ;;====================================================================== ;; responder side ;;====================================================================== @@ -238,57 +216,54 @@ ;; ;; Reserved cmds; ack ping goodbye response ;; (define (ulex-handler uconn rdat) (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") - (match rdat ;; (string-split controldat) - ((rem-host-port qrykey cmd params) - ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) - (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) - (case cmd - ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) - ((ping) - ;; (print "Got Ping!") - ;; (add-to-work-queue uconn rdat) - 'ack) - (else - (do-work uconn rdat))))) + (match rdat + ((rem-host-port cmd params) + (do-work uconn rdat)) (else (print "BAD DATA? controldat=" rdat) - 'ack) ;; send ack anyway? + 'bad-data) )) + +;; given an already set up uconn start the cmd-loop +;; +#;(define (ulex-cmd-loop uconn) + (let* ((serv-listener (udat-socket uconn)) + (server (make-tcp-server + serv-listener + (lambda () + (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params) + (resp (ulex-handler uconn rdat))) + (if resp + (serialize resp) + (write resp))))))) + (server))) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) - (let* ((serv-listener (udat-socket uconn))) - (let loop ((state 'start)) - (let-values (((inp oup)(tcp-accept serv-listener))) - (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) - (resp (ulex-handler uconn rdat))) - (if resp (serialize resp oup)) - (close-input-port inp) - (close-output-port oup)) - (loop state))))) -;;(define (ulex-cmd-loop uconn) -;; (let* ((serv-listener (udat-socket uconn)) -;; ;; (old-listener (lambda () -;; ;; (let loop ((state 'start)) -;; ;; (let-values (((inp oup)(tcp-accept serv-listener))) -;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) -;; ;; (resp (ulex-handler uconn rdat))) -;; ;; (if resp (serialize resp oup)) -;; ;; (close-input-port inp) -;; ;; (close-output-port oup)) -;; ;; (loop state))))) -;; (server (make-tcp-server -;; serv-listener -;; (lambda () -;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params) -;; (resp (ulex-handler uconn rdat))) -;; (if resp (serialize resp) resp)))))) -;; (server))) + (let* ((serv-listener (udat-socket uconn)) + (listener (lambda () + (let loop ((state 'start)) + (let-values (((inp oup)(tcp-accept serv-listener))) + (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) + (resp (ulex-handler uconn rdat))) + (serialize resp oup) + (close-input-port inp) + (close-output-port oup)) + (loop state)))))) + ;; start N of them + (let loop ((thnum 0) + (threads '())) + (if (< thnum 100) + (let* ((th (make-thread listener (conc "listener" thnum)))) + (thread-start! th) + (loop (+ thnum 1) + (cons th threads))) + (map thread-join! threads))))) ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (set-work-handler uconn proc) @@ -306,72 +281,20 @@ (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat - ((rem-host-port qrykey cmd params) + ((rem-host-port cmd params) (let* ((start-time (current-milliseconds)) - (result (proc rem-host-port qrykey cmd params)) + (result (proc rem-host-port cmd params)) (end-time (current-milliseconds)) (run-time (- end-time start-time))) result)) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params") #f)))) - -(define (process-work-queue uconn) - (let ((wqueue (udat-work-queue uconn)) - (proc (udat-work-proc uconn)) - (numthr (udat-numthreads uconn))) - (let loop ((thnum 1) - (threads '())) - (let ((thlst (cons (make-thread (lambda () - (let work-loop () - (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) - (do-work uconn rdat)) - (work-loop))) - (conc "work thread " thnum)) - threads))) - (if (< thnum numthr) - (loop (+ thnum 1) - thlst) - (begin - (print "ULEX: Starting "(length thlst)" worker threads.") - (map thread-start! thlst) - (print "ULEX: Threads started. Joining all.") - (map thread-join! thlst))))))) - -;; below was to enable re-use of connections. This seems non-trivial so for -;; now lets open on each call -;; -;; ;; given host-port get or create peer struct -;; ;; -;; (define (udat-get-peer uconn host-port) -;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) -;; ;; no peer, so create pdat and init it -;; -;; ;; NEED stack of connections, pop and use; inp, oup, -;; ;; creation_time (remove and create new if over 24hrs old -;; ;; -;; (let ((pdat (make-pdat host-port: host-port))) -;; (hash-table-set! (udat-peers uconn) host-port pdat) -;; pdat))) -;; -;; ;; is pcon alive -;; -;; ;; given host-port and pdat get a pcon -;; ;; -;; (define (pdat-get-pcon pdat host-port) -;; (let loop ((conns (pdat-conns pdat))) -;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later -;; (init-pcon (make-pcon)) -;; (let* ((conn (pop conns))) -;; -;; ;; given host-port get a pcon struct -;; ;; -;; (define (udat-get-pcon - + ;;====================================================================== ;; misc utils ;;====================================================================== (define (make-cookie uconn) @@ -432,9 +355,9 @@ (sort (get-all-ips) ip-pref-less?)) (define (get-all-ips) (map address-info-host (filter (lambda (x) - (equal? (address-info-type x) "tcp")) + (equal? (address-info-type x) 'tcp)) (address-infos (get-host-name))))) ) Index: ulex.scm ================================================================== --- ulex.scm +++ ulex.scm @@ -18,7 +18,7 @@ ;;====================================================================== (declare (unit ulex)) -(include "ulex/ulex.scm") -;; (include "ulex-simple/ulex.scm") +;; (include "ulex/ulex.scm") +(include "ulex-simple/ulex.scm")