Index: ulex-simple/dbmgr.scm ================================================================== --- ulex-simple/dbmgr.scm +++ ulex-simple/dbmgr.scm @@ -158,12 +158,16 @@ ;; TODO: This is unnecessarily re-creating the record in the hash table ;; (define (rmt:open-main-connection remdat apath) (let* ((fullpath (db:dbname->path apath ".db/main.db")) (conns (servdat-conns remdat)) - (conn (rmt:get-conn remdat apath ".db/main.db"))) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this + (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this + (myconn (servdat-uconn remdat))) (cond + ((not (listener-running?)) + (servdat-uconn-set! remdat (make-udat)) + (rmt:open-main-connection remdat apath)) ((and conn ;; conn is NOT a socket, just saying ... (< (current-seconds) (conndat-expires conn))) #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died ((and conn (>= (current-seconds)(conndat-expires conn))) @@ -233,11 +237,11 @@ #t) ;; good to go ((not mconn) ;; no channel open to main? open it... (rmt:open-main-connection sinfo apath) (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) ((not dconn) ;; no channel open to dbname? - (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname)))) + (let* ((res (rmt:send-receive sinfo apath mdbname 'get-server `(,apath ,dbname)))) (case res ((server-started) (if (> num-tries 0) (begin (thread-sleep! 2) @@ -289,27 +293,23 @@ ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) (let* ((apath *toppath*) (sinfo *db-serv-info*) (dbname (db:run-id->dbname rid))) - (if *localmode* - (api:execute-requests *dbstruct* cmd params) - (begin - (rmt:open-main-connection sinfo apath) - (if rid (rmt:general-open-connection sinfo apath dbname)) - #;(if (not (member cmd '(log-to-main))) - (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params)) - (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.") - (let* ((cdat (rmt:get-conn sinfo apath dbname))) - (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened") - (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex - ;; then send-receive using the ulex layer to host-port stored in cdat - (res (send-receive uconn (conndat-hostport cdat) cmd params))) - (conndat-expires-set! cdat (+ (current-seconds) - (server:expiration-timeout) - -2)) ;; two second margin for network time misalignments etc. - res))) + (rmt:open-main-connection sinfo apath) + (if rid (rmt:general-open-connection sinfo apath dbname)) + ;; (if (not (member cmd '(log-to-main))) + ;; (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params)) + (let* ((cdat (rmt:get-conn sinfo apath dbname))) + (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened") + (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex + ;; then send-receive using the ulex layer to host-port stored in cdat + (res (send-receive uconn (conndat-hostport cdat) cmd params))) + (conndat-expires-set! cdat (+ (current-seconds) + (server:expiration-timeout) + -2)) ;; two second margin for network time misalignments etc. + res)))) ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed ;; sometime in the future. ;; ;; Purpose - call the main.db server and request a server be started @@ -408,10 +408,14 @@ (args:get-arg "-server")) (define *rmt:run-mutex* (make-mutex)) (define *rmt:run-flag* #f) +(define (listener-running?) + (and *db-serv-info* + (servdat-uconn *db-serv-info*))) + ;; Main entry point to start a server. was start-server (define (rmt:run hostn) (mutex-lock! *rmt:run-mutex*) (if *rmt:run-flag* (begin @@ -422,12 +426,11 @@ (mutex-unlock! *rmt:run-mutex*) ;; ;; Configurations for server ;; (tcp-buffer-size 2048) ;; (max-connections 2048) (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") - (if (and *db-serv-info* - (servdat-uconn *db-serv-info*)) + (if (listener-running?) (let* ((uconn (servdat-uconn *db-serv-info*))) (wait-and-close uconn)) (let* ((port (portlogger:open-run-close portlogger:find-port)) (handler-proc (lambda (rem-host-port qrykey cmd params) ;; (set! *db-last-access* (current-seconds)) @@ -709,11 +712,11 @@ sdat)))))))) (define (rmt:register-server sinfo apath iface port server-key dbname) (servdat-conns sinfo) ;; just checking types (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db - (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath + (rmt:send-receive sinfo apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'register-server `(,iface ,port ,server-key ,(current-process-id) @@ -722,20 +725,20 @@ ,dbname))) (define (rmt:get-count-servers sinfo apath) (servdat-conns sinfo) ;; just checking types (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db - (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath + (rmt:send-receive sinfo apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'get-count-servers `(,apath))) (define (rmt:get-servers-info apath) (rmt:send-receive 'get-servers-info #f `(,apath))) (define (rmt:deregister-server db-serv-info apath iface port server-key dbname) (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db - (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath + (rmt:send-receive db-serv-info apath ;; params: host port servkey pid ipaddr dbpath (db:run-id->dbname #f) 'deregister-server `(,iface ,port ,server-key ,(current-process-id) @@ -850,24 +853,10 @@ (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) (exit))))))) (debug:print 0 *default-log-port* "SERVER: running, db "dbname" opened, megatest version: " (common:get-full-version)) - ;; start the watchdog - - ;; is this really needed? - - #;(if watchdog - (if (not (member (thread-state watchdog) - '(ready running blocked - sleeping dead))) - (begin - (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") - (thread-start! watchdog)) - (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) - (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) - #;(loop (+ count 1) bad-sync-count start-time) )) (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) (mutex-unlock! *heartbeat-mutex*) @@ -934,11 +923,12 @@ (rmt:run (rmt:get-reasonable-hostname))) "Server run")) (th3 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server monitor thread started") (if (args:get-arg "-server") - (rmt:keep-running dbname))) + (rmt:keep-running dbname) + #;(rmt:wait-for-stable-interface))) "Keep running"))) (thread-start! th2) (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. (thread-start! th3) (set! *didsomething* #t) @@ -957,145 +947,7 @@ (rmt:send-receive 'start-server #f (list run-id))) (define (rmt:server-info apath dbname) (rmt:send-receive 'get-server-info #f (list apath dbname))) -;;====================================================================== -;; Nanomsg transport -;;====================================================================== - -#;(define (is-port-in-use port-num) - (let* ((ret #f)) - (let-values (((inp oup pid) - (process "netstat" (list "-tulpn" )))) - (let loop ((inl (read-line inp))) - (if (not (eof-object? inl)) - (begin - (if (string-search (regexp (conc ":" port-num)) inl) - (begin - ;(print "Output: " inl) - (set! ret #t)) - (loop (read-line inp))))))) - ret)) - -#;(define (open-nn-connection host-port) - (let ((req (make-req-socket)) - (uri (conc "tcp://" host-port))) - (nng-dial req uri) - (socket-set! req 'nng/recvtimeo 2000) - req)) - -#;(define (send-receive-nn req msg) - (nng-send req msg) - (nng-recv req)) - -#;(define (close-nn-connection req) - (nng-close! req)) - -;; ;; open connection to server, send message, close connection -;; ;; -;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds -;; (let ((req (make-req-socket 'req)) -;; (uri (conc "tcp://" host-port)) -;; (res #f) -;; ;; (contacts (alist-ref 'contact attrib)) -;; ;; (mode (alist-ref 'mode attrib)) -;; ) -;; (socket-set! req 'nng/recvtimeo 2000) -;; (handle-exceptions -;; exn -;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) -;; ;; Send notification -;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) -;; #f) -;; (nng-dial req uri) -;; ;; (print "Connected to the server " ) -;; (nng-send req msg) -;; ;; (print "Request Sent") -;; (let* ((th1 (make-thread (lambda () -;; (let ((resp (nng-recv req))) -;; (nng-close! req) -;; (set! res (if (equal? resp "ok") -;; #t -;; #f)))) -;; "recv thread")) -;; (th2 (make-thread (lambda () -;; (thread-sleep! timeout) -;; (thread-terminate! th1)) -;; "timer thread"))) -;; (thread-start! th1) -;; (thread-start! th2) -;; (thread-join! th1) -;; res)))) -;; -#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds - (let ((req (make-req-socket)) - (uri (conc "tcp://" host-port)) - (res #f)) - (handle-exceptions - exn - (let ((emsg ((condition-property-accessor 'exn 'message) exn))) - ;; Send notification - (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) - #f) - (nng-dial req uri) - (nng-send req msg) - (let* ((th1 (make-thread (lambda () - (let ((resp (nng-recv req))) - (nng-close! req) - ;; (print resp) - (set! res resp))) - "recv thread")) - (th2 (make-thread (lambda () - (thread-sleep! timeout) - (thread-terminate! th1)) - "timer thread"))) - (thread-start! th1) - (thread-start! th2) - (thread-join! th1) - res)))) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;; run ping in separate process, safest way in some cases -;; -#;(define (server:ping-server ifaceport) - (with-input-from-pipe - (conc (common:get-megatest-exe) " -ping " ifaceport) - (lambda () - (let loop ((inl (read-line)) - (res "NOREPLY")) - (if (eof-object? inl) - (case (string->symbol res) - ((NOREPLY) #f) - ((LOGIN_OK) #t) - (else #f)) - (loop (read-line) inl)))))) - -;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). -;; -#;(define (server:login toppath) - (lambda (toppath) - (set! *db-last-access* (current-seconds)) ;; might not be needed. - (if (equal? *toppath* toppath) - #t - #f))) - -;; (define server:sync-lock-token "SERVER_SYNC_LOCK") -;; (define (server:release-sync-lock) -;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) -;; (define (server:have-sync-lock?) -;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) -;; (have-lock? (car have-lock-pair)) -;; (lock-time (cdr have-lock-pair)) -;; (lock-age (- (current-seconds) lock-time))) -;; (cond -;; (have-lock? #t) -;; ((>lock-age -;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) -;; (server:release-sync-lock) -;; (server:have-sync-lock?)) -;; (else #f)))) ) Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -66,10 +66,11 @@ chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print + chicken.tcp address-info mailbox matchable ;; queues @@ -80,11 +81,12 @@ srfi-1 srfi-18 srfi-4 srfi-69 system-information - tcp6 + ;; tcp6 + tcp-server typed-records ) ;; udat struct, used by both caller and callee ;; instantiated as uconn by convention @@ -107,463 +109,161 @@ (cmd-thread #f) (work-queue-thread #f) (num-threads-running 0) ) -;; == << ;; Parameters -;; == << -;; == << ;; work-method: -;; == << (define work-method (make-parameter 'mailbox)) -;; == << ;; mailbox - all rdat goes through mailbox -;; == << ;; threads - all rdat immediately executed in new thread -;; == << ;; direct - no queuing -;; == << ;; -;; == << -;; == << ;; return-method, return the result to waiting send-receive: -;; == << (define return-method (make-parameter 'mailbox)) -;; == << ;; mailbox - create a mailbox and use it for passing returning results to send-receive -;; == << ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result -;; == << ;; direct - no queuing, result is passed back in single tcp connection -;; == << ;; -;; == << -;; == << ;; ;; struct for keeping track of others we are talking to -;; == << ;; ;; -;; == << ;; (defstruct pdat -;; == << ;; (host-port #f) -;; == << ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer -;; == << ;; ) -;; == << ;; -;; == << ;; ;; struct for peer connections, keep track of expiration etc. -;; == << ;; ;; -;; == << ;; (defstruct pcon -;; == << ;; (inp #f) -;; == << ;; (oup #f) -;; == << ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) -;; == << ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes -;; == << ;; ) -;; == << -;; == << ;;====================================================================== -;; == << ;; listener -;; == << ;;====================================================================== -;; == << -;; == << ;; is uconn a ulex connector (listener) -;; == << ;; -;; == << (define (ulex-listener? uconn) -;; == << (udat? uconn)) -;; == << -;; == << ;; create a tcp listener and return a populated udat struct with -;; == << ;; my port, address, hostname, pid etc. -;; == << ;; return #f if fail to find a port to allocate. -;; == << ;; -;; == << ;; if udata-in is #f create the record -;; == << ;; if there is already a serv-listener return the udata -;; == << ;; -;; == << (define (setup-listener uconn #!optional (port 4242)) -;; == << (handle-exceptions -;; == << exn -;; == << (if (< port 65535) -;; == << (setup-listener uconn (+ port 1)) -;; == << #f) -;; == << (connect-listener uconn port))) -;; == << -;; == << (define (connect-listener uconn port) -;; == << ;; (tcp-listener-socket LISTENER)(socket-name so) -;; == << ;; sockaddr-address, sockaddr-port, sockaddr->string -;; == << (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) -;; == << (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) -;; == << (udat-port-set! uconn port) -;; == << (udat-host-port-set! uconn (conc addr":"port)) -;; == << (udat-socket-set! uconn tlsn) -;; == << uconn)) -;; == << -;; == << ;; run-listener does all the work of starting a listener in a thread -;; == << ;; it then returns control -;; == << ;; -;; == << (define (run-listener handler-proc #!optional (port-suggestion 4242)) -;; == << (let* ((uconn (make-udat))) -;; == << (udat-work-proc-set! uconn handler-proc) -;; == << (if (setup-listener uconn port-suggestion) -;; == << (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) -;; == << (th2 (make-thread (lambda () -;; == << (case (work-method) -;; == << ((mailbox limited) -;; == << (process-work-queue uconn)))) -;; == << "Ulex work queue processor"))) -;; == << ;; (tcp-buffer-size 2048) -;; == << (thread-start! th1) -;; == << (thread-start! th2) -;; == << (udat-cmd-thread-set! uconn th1) -;; == << (udat-work-queue-thread-set! uconn th2) -;; == << (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".") -;; == << uconn) -;; == << (assert #f "ERROR: run-listener called without proper setup.")))) -;; == << -;; == << (define (wait-and-close uconn) -;; == << (thread-join! (udat-cmd-thread uconn)) -;; == << (tcp-close (udat-socket uconn))) -;; == << -;; == << ;;====================================================================== +;;====================================================================== +;; listener +;;====================================================================== + +;; is uconn a ulex connector (listener) +;; +(define (ulex-listener? uconn) + (udat? uconn)) + +;; create a tcp listener and return a populated udat struct with +;; my port, address, hostname, pid etc. +;; return #f if fail to find a port to allocate. +;; +;; if udata-in is #f create the record +;; if there is already a serv-listener return the udata +;; +(define (setup-listener uconn #!optional (port 4242)) + (handle-exceptions + exn + (if (< port 65535) + (setup-listener uconn (+ port 1)) + #f) + (connect-listener uconn port))) + +(define (connect-listener uconn port) + ;; (tcp-listener-socket LISTENER)(socket-name so) + ;; sockaddr-address, sockaddr-port, sockaddr->string + (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) + (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (udat-port-set! uconn port) + (udat-host-port-set! uconn (conc addr":"port)) + (udat-socket-set! uconn tlsn) + uconn)) + +;; run-listener does all the work of starting a listener in a thread +;; it then returns control +;; +(define (run-listener handler-proc #!optional (port-suggestion 4242)) + (let* ((uconn (make-udat))) + (udat-work-proc-set! uconn handler-proc) + (if (setup-listener uconn port-suggestion) + ((make-tcp-server + (udat-socket uconn) + (lambda () + (let* ((rdat (deserialize)) ;; '(my-host-port qrykey cmd params) + (resp (do-work uconn rdat))) + (serialize resp))))) + (assert #f "ERROR: run-listener called without proper setup.")))) + +(define (wait-and-close uconn) + (thread-join! (udat-cmd-thread uconn)) + (tcp-close (udat-socket uconn))) + +;;====================================================================== ;; == << ;; peers and connections ;; == << ;;====================================================================== -;; == << -;; == << (define *send-mutex* (make-mutex)) -;; == << -;; == << ;; send structured data to recipient -;; == << ;; -;; == << ;; NOTE: qrykey is what was called the "cookie" previously -;; == << ;; -;; == << ;; retval tells send to expect and wait for return data (one line) and return it or time out -;; == << ;; this is for ping where we don't want to necessarily have set up our own server yet. -;; == << ;; -;; == << ;; NOTE: see below for beginnings of code to allow re-use of tcp connections -;; == << ;; - I believe (without substantial evidence) that re-using connections will -;; == << ;; be beneficial ... -;; == << ;; -;; == << (define (send udata host-port qrykey cmd params) -;; == << (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this -;; == << (isme #f #;(equal? host-port my-host-port)) ;; calling myself? -;; == << ;; dat is a self-contained work block that can be sent or handled locally -;; == << (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) -;; == << (cond -;; == << (isme (ulex-handler udata dat)) ;; no transmission needed -;; == << (else -;; == << (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? -;; == << exn -;; == << (message exn) -;; == << (begin -;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP -;; == << (let-values (((inp oup)(tcp-connect host-port))) -;; == << (let ((res (if (and inp oup) -;; == << (begin -;; == << (serialize dat oup) -;; == << (close-output-port oup) -;; == << (deserialize inp) -;; == << ) -;; == << (begin -;; == << (print "ERROR: send called but no receiver has been setup. Please call setup first!") -;; == << #f)))) -;; == << (close-input-port inp) -;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP -;; == << res)))))))) ;; res will always be 'ack unless return-method is direct -;; == << -;; == << (define (send-via-polling uconn host-port cmd data) -;; == << (let* ((qrykey (make-cookie uconn)) -;; == << (sres (send uconn host-port qrykey cmd data))) -;; == << (case sres -;; == << ((ack) -;; == << (let loop ((start-time (current-milliseconds))) -;; == << (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout -;; == << (begin -;; == << (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) -;; == << #f) -;; == << (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash -;; == << (if result ;; result is '(status . result-data) or #f for nothing yet -;; == << (begin -;; == << (hash-table-delete! (udat-mboxes uconn) qrykey) -;; == << (cdr result)) -;; == << (begin -;; == << (thread-sleep! 0.01) -;; == << (loop start-time))))))) -;; == << (else -;; == << (print "ULEX ERROR: Communication failed? sres="sres) -;; == << #f)))) -;; == << -;; == << (define (send-via-mailbox uconn host-port cmd data) -;; == << (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? -;; == << (qrykey (car cmbox)) -;; == << (mbox (cdr cmbox)) -;; == << (mbox-time (current-milliseconds)) -;; == << (sres (send uconn host-port qrykey cmd data))) ;; short res -;; == << (if (eq? sres 'ack) -;; == << (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) -;; == << #f -;; == << 120)) ;; timeout) -;; == << (mbox-timeout-result 'MBOX_TIMEOUT) -;; == << (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) -;; == << (mbox-receive-time (current-milliseconds))) -;; == << ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? -;; == << (hash-table-delete! (udat-mboxes uconn) qrykey) -;; == << (if (eq? res 'MBOX_TIMEOUT) -;; == << (begin -;; == << (print "WARNING: mbox timed out for query "cmd", with data "data -;; == << ", waiting for response from "host-port".") -;; == << -;; == << ;; here it might make sense to clean up connection records and force clean start? -;; == << ;; NO. The progam using ulex needs to do the reset. Right thing here is exception -;; == << -;; == << #f) ;; convert to raising exception? -;; == << res)) -;; == << (begin -;; == << (print "ERROR: Communication failed? Got "sres) -;; == << #f)))) -;; == << -;; == << ;; send a request to the given host-port and register a mailbox in udata -;; == << ;; wait for the mailbox data and return it -;; == << ;; -;; == << (define (send-receive uconn host-port cmd data) -;; == << (let* ((start-time (current-milliseconds)) -;; == << (result (cond -;; == << ((member cmd '(ping goodbye)) ;; these are immediate -;; == << (send uconn host-port 'ping cmd data)) -;; == << ((eq? (work-method) 'direct) -;; == << ;; the result from send will be the actual result, not an 'ack -;; == << (send uconn host-port 'direct cmd data)) -;; == << (else -;; == << (case (return-method) -;; == << ((polling) -;; == << (send-via-polling uconn host-port cmd data)) -;; == << ((mailbox) -;; == << (send-via-mailbox uconn host-port cmd data)) -;; == << (else -;; == << (print "ULEX ERROR: unrecognised return-method "(return-method)".") -;; == << #f))))) -;; == << (duration (- (current-milliseconds) start-time))) -;; == << ;; this is ONLY for development and debugging. It will be removed once Ulex is stable. -;; == << (if (< 5000 duration) -;; == << (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000))) -;; == << " seconds; "cmd", host-port="host-port", data="data)) -;; == << result)) -;; == << -;; == << -;; == << ;;====================================================================== -;; == << ;; responder side -;; == << ;;====================================================================== -;; == << -;; == << ;; take a request, rdat, and if not immediate put it in the work queue -;; == << ;; -;; == << ;; Reserved cmds; ack ping goodbye response -;; == << ;; -;; == << (define (ulex-handler uconn rdat) -;; == << (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") -;; == << (match rdat ;; (string-split controldat) -;; == << ((rem-host-port qrykey cmd params);; timedata) -;; == << ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) -;; == << (case cmd -;; == << ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) -;; == << ((ping) -;; == << ;; (print "Got Ping!") -;; == << ;; (add-to-work-queue uconn rdat) -;; == << 'ack) -;; == << ((goodbye) -;; == << ;; just clear out references to the caller. NOT COMPLETE -;; == << (add-to-work-queue uconn rdat) -;; == << 'ack) -;; == << ((response) ;; this is a result from remote processing, send it as mail ... -;; == << (case (return-method) -;; == << ((polling) -;; == << (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params)) -;; == << 'ack) -;; == << ((mailbox) -;; == << (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) -;; == << (if mbox -;; == << (begin -;; == << (mailbox-send! mbox params) ;; params here is our result -;; == << 'ack) -;; == << (begin -;; == << (print "ERROR: received result but no associated mbox for cookie "qrykey) -;; == << 'no-mbox-found)))) -;; == << (else (print "ULEX ERROR: unrecognised return-method "(return-method)) -;; == << 'bad-return-method))) -;; == << (else ;; generic request - hand it to the work queue -;; == << (add-to-work-queue uconn rdat) -;; == << 'ack))) -;; == << (else -;; == << (print "ULEX ERROR: bad rdat "rdat) -;; == << 'bad-rdat))) -;; == << -;; == << ;; given an already set up uconn start the cmd-loop -;; == << ;; -;; == << (define (ulex-cmd-loop uconn) -;; == << (let* ((serv-listener (udat-socket uconn)) -;; == << (listener (lambda () -;; == << (let loop ((state 'start)) -;; == << (let-values (((inp oup)(tcp-accept serv-listener))) -;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP -;; == << (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) -;; == << (resp (ulex-handler uconn rdat))) -;; == << (serialize resp oup) -;; == << (close-input-port inp) -;; == << (close-output-port oup) -;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP -;; == << ) -;; == << (loop state)))))) -;; == << ;; start N of them -;; == << (let loop ((thnum 0) -;; == << (threads '())) -;; == << (if (< thnum 100) -;; == << (let* ((th (make-thread listener (conc "listener" thnum)))) -;; == << (thread-start! th) -;; == << (loop (+ thnum 1) -;; == << (cons th threads))) -;; == << (map thread-join! threads))))) -;; == << -;; == << ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) -;; == << ;; so that the proc can be dereferenced remotely -;; == << ;; -;; == << (define (set-work-handler uconn proc) -;; == << (udat-work-proc-set! uconn proc)) -;; == << -;; == << ;;====================================================================== -;; == << ;; work queues - this is all happening on the listener side -;; == << ;;====================================================================== -;; == << -;; == << ;; rdat is (rem-host-port qrykey cmd params) -;; == << -;; == << (define (add-to-work-queue uconn rdat) -;; == << #;(queue-add! (udat-work-queue uconn) rdat) -;; == << (case (work-method) -;; == << ((threads) -;; == << (thread-start! (make-thread (lambda () -;; == << (do-work uconn rdat)) -;; == << "worker thread"))) -;; == << ((mailbox) -;; == << (mailbox-send! (udat-work-queue uconn) rdat)) -;; == << ((direct) -;; == << (do-work uconn rdat)) -;; == << (else -;; == << (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") -;; == << (mailbox-send! (udat-work-queue uconn) rdat)))) -;; == << -;; == << ;; move the logic to return the result somewhere else? -;; == << ;; -;; == << (define (do-work uconn rdat) -;; == << (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change -;; == << ;; put this following into a do-work procedure -;; == << (match rdat -;; == << ((rem-host-port qrykey cmd params) -;; == << (let* ((start-time (current-milliseconds)) -;; == << (result (proc rem-host-port qrykey cmd params)) -;; == << (end-time (current-milliseconds)) -;; == << (run-time (- end-time start-time))) -;; == << (case (work-method) -;; == << ((direct) result) -;; == << (else -;; == << (print "ULEX: work "cmd", "params" done in "run-time" ms") -;; == << ;; send 'response as cmd and result as params -;; == << (send uconn rem-host-port qrykey 'response result) ;; could check for ack -;; == << (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))))) -;; == << (MBOX_TIMEOUT 'do-work-timeout) -;; == << (else -;; == << (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) -;; == << -;; == << ;; NEW APPROACH: -;; == << ;; -;; == << (define (process-work-queue uconn) -;; == << (let ((wqueue (udat-work-queue uconn)) -;; == << (proc (udat-work-proc uconn)) -;; == << (numthr (udat-numthreads uconn))) -;; == << (let loop ((thnum 1) -;; == << (threads '())) -;; == << (let ((thlst (cons (make-thread (lambda () -;; == << (let work-loop () -;; == << (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) -;; == << (do-work uconn rdat)) -;; == << (work-loop))) -;; == << (conc "work thread " thnum)) -;; == << threads))) -;; == << (if (< thnum numthr) -;; == << (loop (+ thnum 1) -;; == << thlst) -;; == << (begin -;; == << (print "ULEX: Starting "(length thlst)" worker threads.") -;; == << (map thread-start! thlst) -;; == << (print "ULEX: Threads started. Joining all.") -;; == << (map thread-join! thlst))))))) -;; == << -;; == << ;; below was to enable re-use of connections. This seems non-trivial so for -;; == << ;; now lets open on each call -;; == << ;; -;; == << ;; ;; given host-port get or create peer struct -;; == << ;; ;; -;; == << ;; (define (udat-get-peer uconn host-port) -;; == << ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) -;; == << ;; ;; no peer, so create pdat and init it -;; == << ;; -;; == << ;; ;; NEED stack of connections, pop and use; inp, oup, -;; == << ;; ;; creation_time (remove and create new if over 24hrs old -;; == << ;; ;; -;; == << ;; (let ((pdat (make-pdat host-port: host-port))) -;; == << ;; (hash-table-set! (udat-peers uconn) host-port pdat) -;; == << ;; pdat))) -;; == << ;; -;; == << ;; ;; is pcon alive -;; == << ;; -;; == << ;; ;; given host-port and pdat get a pcon -;; == << ;; ;; -;; == << ;; (define (pdat-get-pcon pdat host-port) -;; == << ;; (let loop ((conns (pdat-conns pdat))) -;; == << ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later -;; == << ;; (init-pcon (make-pcon)) -;; == << ;; (let* ((conn (pop conns))) -;; == << ;; -;; == << ;; ;; given host-port get a pcon struct -;; == << ;; ;; -;; == << ;; (define (udat-get-pcon -;; == << -;; == << ;;====================================================================== -;; == << ;; misc utils -;; == << ;;====================================================================== -;; == << -;; == << (define (make-cookie uconn) -;; == << (let ((newcnum (+ (udat-cnum uconn) 1))) -;; == << (udat-cnum-set! uconn newcnum) -;; == << (conc (udat-host-port uconn) ":" -;; == << newcnum))) -;; == << -;; == << ;; cookie/mboxes -;; == << -;; == << ;; we store each mbox with a cookie ( . ) -;; == << ;; -;; == << (define (get-cmbox uconn) -;; == << (if (null? (udat-avail-cmboxes uconn)) -;; == << (let ((cookie (make-cookie uconn)) -;; == << (mbox (make-mailbox))) -;; == << (hash-table-set! (udat-mboxes uconn) cookie mbox) -;; == << `(,cookie . ,mbox)) -;; == << (let ((cmbox (car (udat-avail-cmboxes uconn)))) -;; == << (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn))) -;; == << cmbox))) -;; == << -;; == << (define (put-cmbox uconn cmbox) -;; == << (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn)))) -;; == << -;; == << (define (pp-uconn uconn) -;; == << (pp (udat->alist uconn))) -;; == << -;; == << -;; == << ;;====================================================================== -;; == << ;; network utilities -;; == << ;;====================================================================== -;; == << -;; == << ;; NOTE: Look at address-info egg as alternative to some of this -;; == << -;; == << (define (rate-ip ipaddr) -;; == << (regex-case ipaddr -;; == << ( "^127\\..*" _ 0 ) -;; == << ( "^(10\\.0|192\\.168)\\..*" _ 1 ) -;; == << ( else 2 ) )) -;; == << -;; == << ;; Change this to bias for addresses with a reasonable broadcast value? -;; == << ;; -;; == << (define (ip-pref-less? a b) -;; == << (> (rate-ip a) (rate-ip b))) -;; == << -;; == << (define (get-my-best-address) -;; == << (let ((all-my-addresses (get-all-ips))) -;; == << (cond -;; == << ((null? all-my-addresses) -;; == << (get-host-name)) ;; no interfaces? -;; == << ((eq? (length all-my-addresses) 1) -;; == << (car all-my-addresses)) ;; only one to choose from, just go with it -;; == << (else -;; == << (car (sort all-my-addresses ip-pref-less?)))))) -;; == << -;; == << (define (get-all-ips-sorted) -;; == << (sort (get-all-ips) ip-pref-less?)) -;; == << -;; == << (define (get-all-ips) -;; == << (map address-info-host -;; == << (filter (lambda (x) -;; == << (equal? (address-info-type x) "tcp")) -;; == << (address-infos (get-host-name))))) -;; == << + +(define *send-mutex* (make-mutex)) + +;; send structured data to recipient +;; +;; NOTE: qrykey is what was called the "cookie" previously +;; +;; retval tells send to expect and wait for return data (one line) and return it or time out +;; this is for ping where we don't want to necessarily have set up our own server yet. +;; +(define (send-receive udata host-port cmd params) + (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this + (isme (equal? host-port my-host-port)) ;; calling myself? + ;; dat is a self-contained work block that can be sent or handled locally + (dat (list my-host-port 'qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) + (cond + (isme (do-work udata dat)) ;; no transmission needed + (else + (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? + exn + (message exn) + (begin + ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP + (let-values (((inp oup)(tcp-connect host-port))) + (let ((res (if (and inp oup) + (begin + (serialize dat oup) + (close-output-port oup) + (deserialize inp)) + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)))) + (close-input-port inp) + ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP + res)))))))) ;; res will always be 'ack unless return-method is direct + +;;====================================================================== +;; work queues - this is all happening on the listener side +;;====================================================================== + +;; move the logic to return the result somewhere else? +;; +(define (do-work uconn rdat) + (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change + ;; put this following into a do-work procedure + (match rdat + ((rem-host-port qrykey cmd params) + (let* ((start-time (current-milliseconds)) + (result (proc rem-host-port qrykey cmd params)) + (end-time (current-milliseconds)) + (run-time (- end-time start-time))) + result)) + (else + (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) + +;;====================================================================== +;; misc utils +;;====================================================================== + +(define (pp-uconn uconn) + (pp (udat->alist uconn))) + + +;;====================================================================== +;; network utilities +;;====================================================================== + +;; NOTE: Look at address-info egg as alternative to some of this + +(define (rate-ip ipaddr) + (regex-case ipaddr + ( "^127\\..*" _ 0 ) + ( "^(10\\.0|192\\.168)\\..*" _ 1 ) + ( else 2 ) )) + +;; Change this to bias for addresses with a reasonable broadcast value? +;; +(define (ip-pref-less? a b) + (> (rate-ip a) (rate-ip b))) + +(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips))) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (car all-my-addresses)) ;; only one to choose from, just go with it + (else + (car (sort all-my-addresses ip-pref-less?)))))) + +(define (get-all-ips-sorted) + (sort (get-all-ips) ip-pref-less?)) + +(define (get-all-ips) + (map address-info-host + (filter (lambda (x) + (equal? (address-info-type x) "tcp")) + (address-infos (get-host-name))))) + )