Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -165,12 +165,12 @@ ;; ;; ulex parameters ;; (work-method 'direct) ;; (return-method 'direct) ;; ulex parameters - (work-method 'mailbox) - (return-method 'mailbox) +;; (work-method 'mailbox) +;; (return-method 'mailbox) ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (define *didsomething* #f) (define *db* #f) ;; this is only for the repl, do not use in general!!!! Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -199,26 +199,16 @@ (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath ".db/main.db")) (conns (servdat-conns remdat)) (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this (start-rmt:run (lambda () - (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) - (thread-start! th1) - (thread-sleep! 1) - (let loop ((count 0)) - (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") - (if (or (not *db-serv-info*) - (not (servdat-uconn *db-serv-info*))) - (begin - (thread-sleep! 1) - (loop (+ count 1))) - (begin - (servdat-mode-set! *db-serv-info* 'non-db) - (servdat-uconn *db-serv-info*))))))) + (set! *db-serv-info* (make-servdat host: (get-host-name))) + (servdat-mode-set! *db-serv-info* 'non-db) + (servdat-uconn-set! *db-serv-info* (make-udat)))) (myconn (servdat-uconn *db-serv-info*))) (cond - ((not myconn) + ((not *db-serv-info*) ;; myconn) (start-rmt:run) (rmt:open-main-connection remdat apath)) ((and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died @@ -2199,37 +2189,38 @@ (sexpr->string 'quit)))))))))) (define (rmt:get-reasonable-hostname) (let* ((inhost (or (args:get-arg "-server") "-"))) (if (equal? inhost "-") - (get-host-name) + (get-host-name) ;; (get-my-best-address) inhost))) ;; Call this to start the actual server ;; ;; all routes though here end in exit ... ;; ;; This is the point at which servers are started ;; (define (rmt:server-launch dbname) + (assert (args:get-arg "-server") "FATAL: rmt:server-launch called in non-server process.") (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (rmt:run (rmt:get-reasonable-hostname))) "Server run")) (th3 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server monitor thread started") (if (args:get-arg "-server") (rmt:keep-running dbname))) - "Keep running"))) + "Keep running"))) (thread-start! th2) (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) - (thread-join! th3)) - #f) + (thread-join! th3) + #f)) ;; Generate a unique signature for this process, used at both client and ;; server side (define (rmt:mk-signature) (message-digest-string (md5-primitive) Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -27,12 +27,12 @@ ulex ) (define test-work-dir (current-directory)) -(work-method 'mailbox) ;; threads, direct, mailbox -(return-method 'mailbox) ;; polling, mailbox, direct +;; (work-method 'mailbox) ;; threads, direct, mailbox +;; (return-method 'mailbox) ;; polling, mailbox, direct ;; given list of lists ;; ( ( msg expected param1 param2 ...) ;; ( ... ) ) ;; apply test to all Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -24,11 +24,12 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. ;; ;;====================================================================== (module ulex - ( + * + #;( ;; NOTE: looking for the handler proc - find the run-listener :) run-listener ;; (run-listener handler-proc [port]) => uconn @@ -60,10 +61,11 @@ chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print + chicken.tcp address-info mailbox matchable ;; queues @@ -73,12 +75,14 @@ srfi-1 srfi-18 srfi-4 srfi-69 system-information - tcp6 + ;; tcp6 typed-records + tcp-server + ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention ;; @@ -135,19 +139,22 @@ ;; (define (setup-listener uconn #!optional (port 4242)) (handle-exceptions exn (if (< port 65535) - (setup-listener uconn (+ port 1)) + (begin + (thread-sleep! 0.1) ;; I'm not sure this helps but give the OS some time to do it's thing + (print "ULEX INFO: skipping port already in use "port) + (setup-listener uconn (+ port 1))) #f) (connect-listener uconn port))) (define (connect-listener uconn port) ;; (tcp-listener-socket LISTENER)(socket-name so) ;; sockaddr-address, sockaddr-port, sockaddr->string (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) - (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (addr (get-host-name))) ;; (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) (udat-port-set! uconn port) (udat-host-port-set! uconn (conc addr":"port)) (udat-socket-set! uconn tlsn) uconn)) @@ -155,20 +162,16 @@ ;; it then returns control ;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) (let* ((uconn (make-udat))) (udat-work-proc-set! uconn handler-proc) + (tcp-buffer-size 2048) (if (setup-listener uconn port-suggestion) - (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) - #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor"))) - (tcp-buffer-size 2048) - ;; (max-connections 2048) + (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))) (thread-start! th1) - #;(thread-start! th2) (udat-cmd-thread-set! uconn th1) - #;(udat-work-queue-thread-set! uconn th2) - (print "cmd loop and process workers started") + (print "cmd loop started") uconn) (assert #f "ERROR: run-listener called without proper setup.")))) (define (wait-and-close uconn) (thread-join! (udat-cmd-thread uconn)) @@ -192,19 +195,22 @@ ;; be beneficial ... ;; (define (send udata host-port qrykey cmd params) (mutex-lock! *send-mutex*) (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this - (isme #f #;(equal? host-port my-host-port)) ;; calling myself? + (isme (equal? host-port my-host-port)) ;; calling myself? ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params))) + (dat (list my-host-port qrykey cmd params)) + (parts (string-split host-port ":")) + (host (car parts)) + (port (string->number (cadr parts)))) (if isme (ulex-handler udata dat) ;; no transmission needed - (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - #f - (let-values (((inp oup)(tcp-connect host-port))) + ;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? + ;; exn + ;; #f + (let-values (((inp oup)(tcp-connect host port))) (let ((res (if (and inp oup) (begin (serialize dat oup) (deserialize inp)) ;; yes, we always want an ack (begin @@ -211,17 +217,18 @@ (print "ERROR: send called but no receiver has been setup. Please call setup first!") #f)))) (close-input-port inp) (close-output-port oup) (mutex-unlock! *send-mutex*) - res)))))) ;; res will always be 'ack + res))))) ;; res will always be 'ack ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive uconn host-port cmd data) - (cond + (send uconn host-port 'qrykey cmd data) + #;(cond ((member cmd '(ping goodbye)) ;; these are immediate (send uconn host-port 'ping cmd data)) (else (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? (qrykey (car cmbox)) @@ -258,37 +265,24 @@ )) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) - (let* ((serv-listener (udat-socket uconn))) - (let loop ((state 'start)) - (let-values (((inp oup)(tcp-accept serv-listener))) - (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) - (resp (ulex-handler uconn rdat))) - (if resp (serialize resp oup)) - (close-input-port inp) - (close-output-port oup)) - (loop state))))) -;;(define (ulex-cmd-loop uconn) -;; (let* ((serv-listener (udat-socket uconn)) -;; ;; (old-listener (lambda () -;; ;; (let loop ((state 'start)) -;; ;; (let-values (((inp oup)(tcp-accept serv-listener))) -;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) -;; ;; (resp (ulex-handler uconn rdat))) -;; ;; (if resp (serialize resp oup)) -;; ;; (close-input-port inp) -;; ;; (close-output-port oup)) -;; ;; (loop state))))) -;; (server (make-tcp-server -;; serv-listener -;; (lambda () -;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params) -;; (resp (ulex-handler uconn rdat))) -;; (if resp (serialize resp) resp)))))) -;; (server))) + (let* ((serv-listener (udat-socket uconn)) + (server (make-tcp-server + serv-listener + (lambda () + (let* ((rdat (read)#;(deserialize)) ;; '(my-host-port qrykey cmd params) + (resp #;(ulex-handler uconn rdat) + (do-work uconn rdat))) + (if resp + #;(serialize resp) + (write resp) + (begin + (print "ULEX ERROR: communication error in ulex-cmd-loop.") + resp))))))) + (server))) ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; (define (set-work-handler uconn proc) @@ -316,32 +310,10 @@ result)) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params") #f)))) -(define (process-work-queue uconn) - (let ((wqueue (udat-work-queue uconn)) - (proc (udat-work-proc uconn)) - (numthr (udat-numthreads uconn))) - (let loop ((thnum 1) - (threads '())) - (let ((thlst (cons (make-thread (lambda () - (let work-loop () - (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) - (do-work uconn rdat)) - (work-loop))) - (conc "work thread " thnum)) - threads))) - (if (< thnum numthr) - (loop (+ thnum 1) - thlst) - (begin - (print "ULEX: Starting "(length thlst)" worker threads.") - (map thread-start! thlst) - (print "ULEX: Threads started. Joining all.") - (map thread-join! thlst))))))) - ;; below was to enable re-use of connections. This seems non-trivial so for ;; now lets open on each call ;; ;; ;; given host-port get or create peer struct ;; ;; @@ -432,9 +404,9 @@ (sort (get-all-ips) ip-pref-less?)) (define (get-all-ips) (map address-info-host (filter (lambda (x) - (equal? (address-info-type x) "tcp")) + (equal? (address-info-type x) 'tcp)) (address-infos (get-host-name))))) )