Patch: wait for the local listener in rmt:general-open-connection; add ulex-mode (short / mailb / thread) to ulex.
NOTE: indentation was reconstructed after whitespace loss; apply with whitespace-insensitive matching (e.g. patch -l).

Index: rmtmod.scm
==================================================================
--- rmtmod.scm
+++ rmtmod.scm
@@ -271,10 +271,17 @@

 ;; NB// sinfo is a servdat struct
 ;;
 (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
   (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+  (let loop () ;; poll once per second until *db-serv-info* and its uconn are both set
+    (if (not (and *db-serv-info*
+                  (servdat-uconn *db-serv-info*)))
+        (begin
+          (debug:print-info 0 *default-log-port* "Waiting for my listener to be available...")
+          (thread-sleep! 1)
+          (loop))))
   (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
          (fullname (db:dbname->path apath dbname))
          (conns (servdat-conns sinfo))
          (mconn (rmt:get-conn sinfo apath ".db/main.db"))
          (dconn (rmt:get-conn sinfo apath dbname)))

Index: ulex/ulex.scm
==================================================================
--- ulex/ulex.scm
+++ ulex/ulex.scm
@@ -60,10 +60,11 @@
      chicken.time
      chicken.condition
      chicken.string
      chicken.sort
      chicken.pretty-print
+     chicken.process-context.posix

      address-info
      mailbox
      matchable
      ;; queues
@@ -99,25 +100,18 @@
   (numthreads 50)
   (cmd-thread #f)
   (work-queue-thread #f)
   )

-;; ;; struct for keeping track of others we are talking to
-;; ;;
-;; (defstruct pdat
-;;   (host-port #f)
-;;   (conns '()) ;; list of pcon structs, pop one off when calling the peer
-;;   )
-;;
-;; ;; struct for peer connections, keep track of expiration etc.
-;; ;;
-;; (defstruct pcon
-;;   (inp #f)
-;;   (oup #f)
-;;   (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59)
-;;   (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes
-;;   )
+;; short: immediately run the task and return the result
+;; mailb: use mailbox to queue the tasks on the server side
+;;        send tasks to mailbox, send results to caller
+;; thread: use immediately created threads to do the task, send
+;;        is called directly
+;;
+(define ulex-mode (make-parameter 'short))
+

 ;;======================================================================
 ;; listener
 ;;======================================================================

@@ -210,40 +204,49 @@
                  #f))))
         (close-input-port inp)
         (close-output-port oup)
         res)))))) ;; res will always be 'ack

+(define (delta-print start-ms threshold . params) ;; print params when (now - start-ms) >= threshold, both in ms
+  (let* ((delta (- (current-milliseconds) start-ms)))
+    (if (>= delta threshold)
+        (apply print "ULEX: long wait "delta"ms. " params))))
+
 ;; send a request to the given host-port and register a mailbox in udata
 ;; wait for the mailbox data and return it
 ;;
 (define (send-receive uconn host-port cmd data)
   (cond
    ((member cmd '(ping goodbye)) ;; these are immediate
     (send uconn host-port 'ping cmd data))
-   (else
+   ((member (ulex-mode) '(short)) ;; short mode: the result comes straight back from send
+    (send uconn host-port 'nokey cmd data))
+   (else ;; mailb or thread: the result arrives later via a registered mailbox
     (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
            (qrykey (car cmbox))
            (mbox (cdr cmbox))
            (mbox-time (current-milliseconds))
            (sres (send uconn host-port qrykey cmd data))) ;; short res
-      (if (eq? sres 'ack)
-          (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
-                                              #f
-                                              120)) ;; timeout)
-                 (mbox-timeout-result 'MBOX_TIMEOUT)
-                 (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
-                 (mbox-receive-time (current-milliseconds)))
-            ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
-            (hash-table-delete! (udat-mboxes uconn) qrykey)
-            (if (eq? res 'MBOX_TIMEOUT)
-                (begin
-                  (print "WARNING: mbox timed out for query "cmd", with data "data)
-                  #f) ;; convert to raising exception?
-                res))
-          (begin
-            (print "ERROR: Communication failed? Got "sres)
-            #f)))))) ;; #f means failed to communicate
+      (cond
+       ((eq? sres 'ack)
+        (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread)))
+                                            #f
+                                            120)) ;; timeout)
+               (mbox-timeout-result 'MBOX_TIMEOUT)
+               (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result))
+               (mbox-receive-time (current-milliseconds)))
+          (delta-print mbox-time 6000 ", pid="(current-process-id)". send-receive mailbox received "res)
+          ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+          (hash-table-delete! (udat-mboxes uconn) qrykey)
+          (if (eq? res 'MBOX_TIMEOUT)
+              (begin
+                (print "WARNING: mbox timed out for query "cmd", with data "data)
+                #f) ;; convert to raising exception?
+              res)))
+       (else
+        (print "ERROR: Communication failed? Got "sres)
+        #f)))))) ;; #f means failed to communicate

 ;;======================================================================
 ;; responder side
 ;;======================================================================

@@ -254,37 +257,42 @@
 (define (ulex-handler uconn rdat)
   (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
   (match rdat ;; (string-split controldat)
     ((rem-host-port qrykey cmd params)
      ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
-     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
-       (case cmd
-         ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
-         ((ping)
-          ;; (print "Got Ping!")
-          ;; (add-to-work-queue uconn rdat)
-          'ack)
-         ((goodbye)
-          ;; just clear out references to the caller
-          (add-to-work-queue uconn rdat)
-          'ack)
-         ((response) ;; this is a result from remote processing, send it as mail ...
-          (if mbox
-              (begin
-                (mailbox-send! mbox params) ;; params here is our result
-                'ack)
-              (begin
-                (print "ERROR: received result but no associated mbox for cookie "qrykey)
-                #f)))
-         (else
-          ;; (print "Got generic request: "cmd)
-          (add-to-work-queue uconn rdat)
-          'ack))))
-    (else
-     (print "BAD DATA? controldat=" rdat)
-     'ack) ;; send ack anyway?
-    ))
+     (case (ulex-mode)
+       ((short)(do-work uconn rdat)) ;; run now; the work result is this handler's return value
+       ((mailb thread) ;; same protocol for both; add-to-work-queue picks mailbox vs. new thread
+        (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+          (case cmd
+            ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+            ((ping)
+             ;; (print "Got Ping!")
+             ;; (add-to-work-queue uconn rdat)
+             'ack)
+            ((goodbye)
+             ;; just clear out references to the caller
+             (add-to-work-queue uconn rdat)
+             'ack)
+            ((response) ;; this is a result from remote processing, send it as mail ...
+             (if mbox
+                 (begin
+                   (mailbox-send! mbox params) ;; params here is our result
+                   'ack)
+                 (begin
+                   (print "ERROR: received result but no associated mbox for cookie "qrykey)
+                   #f)))
+            (else
+             (add-to-work-queue uconn rdat)
+             'ack))))
+       (else ;; ulex-mode holds something other than short, mailb or thread
+        (print "ULEX: error - ulex-mode unrecognised "(ulex-mode))
+        #f)))
+    (else ;; rdat did not have the (rem-host-port qrykey cmd params) shape
+     (print "BAD DATA? controldat=" rdat)
+     'ack) ;; send ack anyway?
+    ))

 ;; given an already set up uconn start the cmd-loop
 ;;
 (define (ulex-cmd-loop uconn)
   (let* ((serv-listener (udat-socket uconn)))
@@ -309,11 +317,18 @@

 ;; rdat is (rem-host-port qrykey cmd params)

 (define (add-to-work-queue uconn rdat)
   #;(queue-add! (udat-work-queue uconn) rdat)
-  (mailbox-send! (udat-work-queue uconn) rdat))
+  (case (ulex-mode)
+    ((short) (assert #f "FATAL: Should never get here.")) ;; short mode calls do-work directly from ulex-handler
+    ((mailb) ;; queue the request on the work-queue mailbox
+     (mailbox-send! (udat-work-queue uconn) rdat))
+    ((thread) ;; run the request on a freshly started thread
+     (thread-start!
+      (make-thread
+       (lambda ()(do-work uconn rdat)) "do-work")))))

 (define (do-work uconn rdat)
   (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
     ;; put this following into a do-work procedure
     (match rdat
@@ -322,25 +337,33 @@
               (result (proc rem-host-port qrykey cmd params))
               (end-time (current-milliseconds))
               (run-time (- end-time start-time)))
          (print "ULEX: work "cmd", "params" done in "run-time" ms")
          ;; send 'response as cmd and result as params
-         (send uconn rem-host-port qrykey 'response result) ;; could check for ack
-         (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))
+         (case (ulex-mode)
+           ((mailb thread)
+            (send uconn rem-host-port qrykey 'response result)) ;; could check for ack
+           ((short) result)
+           (else
+            (print "ULEX: error - ulex-mode unrecognised "(ulex-mode))))))
+         ;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))
       (MBOX_TIMEOUT #f)
       (else
-       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")
+       #f))))

 (define (process-work-queue uconn)
   (let ((wqueue (udat-work-queue uconn))
         (proc (udat-work-proc uconn))
         (numthr (udat-numthreads uconn)))
     (let loop ((thnum 1)
                (threads '()))
       (let ((thlst (cons (make-thread (lambda ()
                                         (let work-loop ()
-                                          (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+                                          (let* ((start-time (current-milliseconds)) ;; let*: take the timestamp before blocking
+                                                 (rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+                                            (delta-print start-time 60000 ", pid="(current-process-id)" process-work-queue mailbox received "rdat)
                                             (do-work uconn rdat))
                                           (work-loop)))
                                       (conc "work thread " thnum))
                          threads)))
         (if (< thnum numthr)