Index: apimod.scm ================================================================== --- apimod.scm +++ apimod.scm @@ -175,12 +175,13 @@ (seconds->year-work-week/day-time-fname (current-seconds)) "-"cleandbname".log")) (logf2 (conc logd "/server-" (seconds->year-work-week/day-time-fname (current-seconds)) "-"cleandbname"-")) - (cmd (conc "nbfake megatest -server - -area "apath - " -db "dbname" -autolog "logf2))) + (cmd (conc "nbfake megatest -server - -area "apath" -db "dbname) + ;; " -autolog "logf2 ;; the side log did not help. Ended up with two logs and the pid in the name was not that useful. + )) (if (not (directory-exists? logd)) (create-directory logd #t)) (system (conc "NBFAKE_LOG="logf" "cmd)))) ;; special function to get server Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -3642,11 +3642,11 @@ tabdat (hash-table-ref/default (dboard:tabdat-searchpatts tabdat) "runname" "%") (dboard:tabdat-numruns tabdat) (hash-table-ref/default (dboard:tabdat-searchpatts tabdat) "test-name" "%/%") ;; generate key patterns from the target stored in tabdat - (let* ((dbkeys (dboard:tabdat-dbkeys tabdat))) + (let* ((dbkeys (dboard:tabdat-dbkeys tabdat))) (let ((fres (if (dboard:tabdat-target tabdat) (let ((ptparts (append (dboard:tabdat-target tabdat)(make-list (length dbkeys) "%")))) (map (lambda (k v)(list k v)) dbkeys ptparts)) (let ((res '())) (for-each (lambda (key) Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -693,11 +693,13 @@ ;; NOTE: touched logic is disabled/not done ;; sync run to disk if touched ;; (define (db:sync-inmem->disk dbstruct apath dbfile #!key (force-sync #f)) - #f) ;; disabled + (if #f + (debug:print-info 0 *default-log-port* "syncing "*toppath*" "dbfile" at "(current-seconds)) + #f)) ;; disabled ;; (let* ((dbdat (db:get-dbdat dbstruct apath dbfile)) ;; (dbfullname (conc apath "/" dbfile)) ;; (db (db:open-run-db dbfullname db:initialize-db)) ;; (dbr:dbdat-db dbdat)) ;; (inmem (dbr:dbdat-inmem dbdat)) ;; (start-t (current-seconds)) Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -2138,11 +2138,10 @@ (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) #;(loop (+ count 1) bad-sync-count start-time) )) - (debug:print-info 0 *default-log-port* "syncing "*toppath*" "dbname" at "(current-seconds)) (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) (mutex-unlock! *heartbeat-mutex*) ;; when things go wrong we don't want to be doing the various Index: ulex-trials/Makefile ================================================================== --- ulex-trials/Makefile +++ ulex-trials/Makefile @@ -1,8 +1,8 @@ -ulex-test : ulex-test.scm +ulex-test : ulex-test.scm ../ulex/ulex.scm csc ulex-test.scm test : ulex-test - ./ulex-test do-test + for x in $$(seq 9);do export NBFAKE_LOG=NBFAKE_$$x;sleep 1;nbfake ./ulex-test run 828$$x;echo $$cmd;$$cmd;done clean : rm -f .runners/* NBFAKE* Index: ulex-trials/ulex-test.scm ================================================================== --- ulex-trials/ulex-test.scm +++ ulex-trials/ulex-test.scm @@ -24,74 +24,16 @@ ulex ) (define help "Usage: ulex-test COMMAND where COMMAND is one of: - do-test : run the basic req/rep test - run tcp://host:port : start test server - start several in same dir + run host:port : start test server - start several in same dir ") -(define address-tcp-1 "tcp://localhost:5555") -(define address-tcp-2 "tcp://localhost:6666") - -(define address-inproc-1 "inproc://local1") -(define address-inproc-2 "inproc://local2") - -;;; -;;; Req-Rep -;;; -(define (make-listening-reply-socket address) - (let ((socket (make-rep-socket))) - (socket-set! socket 'nng/recvtimeo 2000) - (nng-listen socket address) - socket)) - -(define (make-dialed-request-socket address) - (let ((socket (make-req-socket))) - (socket-set! socket 'nng/recvtimeo 2000) - (nng-dial socket address) - socket)) - -(define (req-rep-test address) - (let ((rep (make-listening-reply-socket address)) - (req (make-dialed-request-socket address))) - (nng-send req "message 1") - (nng-recv rep) - (nng-send rep "message") - (begin0 - (nng-recv req) - (nng-close! rep)))) - -(define (do-test) - (test-group "nng" - (test "tcp req-rep" - "message" - (req-rep-test address-tcp-1)) - (test "inproc req-rep" - "message" - (req-rep-test address-inproc-1))) - (test-exit)) - -;; this should be run in a thread -(define (run-listener-responder socket myaddr) - (let loop ((status 'running)) - (let* ((msg (nng-recv socket)) - (response (process-message msg))) - (if (not (eq? response 'done)) - (begin - (nng-send socket response) - (loop status)))))) - -(define *channels* (make-hash-table)) - -(define (call channels msg addr) - (let* ((csocket (hash-table-ref/default channels addr #f)) - (socket (or csocket (make-dialed-request-socket addr)))) - (nng-send socket msg) - (print "Sent: "msg", received: "(nng-recv socket)) - (if (not (hash-table-exists? channels addr)) - (hash-table-set! channels addr socket)))) +(define (call uconn msg addr) + (print "Sent: "msg", received: " + (send-receive uconn addr 'hello msg))) ;; start => hello 0 ;; hello 0 => hello 1 ;; hello 1 => hello 2 ;; ... @@ -112,32 +54,29 @@ "hello 0")))) (define (main) (match (command-line-arguments) - (("do-test")(do-test)) - ((run myaddr) + ((run myport) ;; start listener ;; put myaddr into file by host-pid in .runners ;; for 1 minute ;; get all in .runners ;; call each with a message ;; - (let* ((endtimes (+ (current-seconds) 20)) ;; run for 20 seconds - (socket (make-listening-reply-socket myaddr)) - (rfile (conc ".runners/"(get-host-name)"-"(current-process-id))) - (th1 (make-thread (lambda () - (run-listener-responder socket myaddr) - ) - "responder"))) + (let* ((port (string->number myport)) + (endtimes (+ (current-seconds) 20)) ;; run for 20 seconds + (handler (lambda (rem-host-port qrykey cmd params) + (process-message params))) + (uconn (run-listener handler myport)) + (rfile (conc ".runners/"(get-host-name)"-"(current-process-id)))) (if (not (and (file-exists? ".runners") (directory? ".runners"))) (create-directory ".runners" #t)) (with-output-to-file rfile (lambda () - (print myaddr))) - (thread-start! th1) + (print myport))) (let loop ((entries '())) (if (> (current-seconds) endtimes) (begin (delete-file* rfile) (sleep 1) @@ -144,18 +83,18 @@ (exit)) (if (null? entries) (loop (glob ".runners/*")) (let* ((entry (car entries)) (destaddr (with-input-from-file entry read-line))) - (call *channels* (conc "hello-from-"destaddr) destaddr) + (call uconn (conc "hello-from-"myport"to-"destaddr) destaddr) ;; (thread-sleep! 0.025) (loop (cdr entries)))))))) ((cmd)(print "ERROR: command "cmd", not recognised.\n\n"help)) (else (print help)))) ) ;; end module -(import nng-test) +(import ulex-test) (main) Index: ulex.scm ================================================================== --- ulex.scm +++ ulex.scm @@ -18,7 +18,7 @@ ;;====================================================================== (declare (unit ulex)) -;; (include "ulex/ulex.scm") -(include "ulex-simple/ulex.scm") +(include "ulex/ulex.scm") +;; (include "ulex-simple/ulex.scm") Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -24,11 +24,12 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. ;; ;;====================================================================== (module ulex - ( + * + #;( ;; NOTE: looking for the handler proc - find the run-listener :) run-listener ;; (run-listener handler-proc [port]) => uconn @@ -107,18 +108,18 @@ ) ;; Parameters ;; work-method: -(define work-method (make-parameter 'direct)) +(define work-method (make-parameter 'mailbox)) ;; mailbox - all rdat goes through mailbox ;; threads - all rdat immediately executed in new thread ;; direct - no queuing ;; ;; return-method, return the result to waiting send-receive: -(define return-method (make-parameter 'direct)) +(define return-method (make-parameter 'mailbox)) ;; mailbox - create a mailbox and use it for passing returning results to send-receive ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result ;; direct - no queuing, result is passed back in single tcp connection ;; @@ -218,11 +219,11 @@ ;; (define (send udata host-port qrykey cmd params) (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this (isme #f #;(equal? host-port my-host-port)) ;; calling myself? ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params))) + (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) (cond (isme (ulex-handler udata dat)) ;; no transmission needed (else (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? exn @@ -239,74 +240,88 @@ #f)))) (close-input-port inp) (close-output-port oup) ;; (mutex-unlock! *send-mutex*) res)))))))) ;; res will always be 'ack unless return-method is direct + +(define (send-via-polling uconn host-port cmd data) + (let* ((qrykey (make-cookie uconn)) + (sres (send uconn host-port qrykey cmd data))) + (case sres + ((ack) + (let loop ((start-time (current-milliseconds))) + (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout + (begin + (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) + #f) + (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash + (if result ;; result is '(status . result-data) or #f for nothing yet + (begin + (hash-table-delete! (udat-mboxes uconn) qrykey) + (cdr result)) + (begin + (thread-sleep! 0.01) + (loop start-time))))))) + (else + (print "ULEX ERROR: Communication failed? sres="sres) + #f)))) + +(define (send-via-mailbox uconn host-port cmd data) + (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? + (qrykey (car cmbox)) + (mbox (cdr cmbox)) + (mbox-time (current-milliseconds)) + (sres (send uconn host-port qrykey cmd data))) ;; short res + (if (eq? sres 'ack) + (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) + #f + 120)) ;; timeout) + (mbox-timeout-result 'MBOX_TIMEOUT) + (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) + (mbox-receive-time (current-milliseconds))) + ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? + (hash-table-delete! (udat-mboxes uconn) qrykey) + (if (eq? res 'MBOX_TIMEOUT) + (begin + (print "WARNING: mbox timed out for query "cmd", with data "data + ", waiting for response from "host-port".") + + ;; here it might make sense to clean up connection records and force clean start? + ;; NO. The progam using ulex needs to do the reset. Right thing here is exception + + #f) ;; convert to raising exception? + res)) + (begin + (print "ERROR: Communication failed? Got "sres) + #f)))) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive uconn host-port cmd data) - (cond - ((member cmd '(ping goodbye)) ;; these are immediate - (send uconn host-port 'ping cmd data)) - ((eq? (work-method) 'direct) - ;; the result from send will be the actual result, not an 'ack - (send uconn host-port 'direct cmd data)) - (else - (case (return-method) - ((polling) - (let* ((qrykey (make-cookie uconn)) - (sres (send uconn host-port qrykey cmd data))) - (case sres - ((ack) - (let loop ((start-time (current-milliseconds))) - (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout - (begin - (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) - #f) - (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash - (if result ;; result is '(status . result-data) or #f for nothing yet - (begin - (hash-table-delete! (udat-mboxes uconn) qrykey) - (cdr result)) - (begin - (thread-sleep! 0.01) - (loop start-time))))))) - (else - (print "ULEX ERROR: Communication failed? sres="sres) - #f)))) - ((mailbox) - (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? - (qrykey (car cmbox)) - (mbox (cdr cmbox)) - (mbox-time (current-milliseconds)) - (sres (send uconn host-port qrykey cmd data))) ;; short res - (if (eq? sres 'ack) - (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) - #f - 120)) ;; timeout) - (mbox-timeout-result 'MBOX_TIMEOUT) - (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) - (mbox-receive-time (current-milliseconds))) - ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? - (hash-table-delete! (udat-mboxes uconn) qrykey) - (if (eq? res 'MBOX_TIMEOUT) - (begin - (print "WARNING: mbox timed out for query "cmd", with data "data", waiting for response from "host-port".") - - ;; here it might make sense to clean up connection records and force clean start? - ;; NO. The progam using ulex needs to do the reset. Right thing here is exception - - #f) ;; convert to raising exception? - res)) - (begin - (print "ERROR: Communication failed? Got "sres) - #f)))) - (else - (print "ULEX ERROR: unrecognised return-method "(return-method)".") - #f))))) + (let* ((start-time (current-milliseconds)) + (result (cond + ((member cmd '(ping goodbye)) ;; these are immediate + (send uconn host-port 'ping cmd data)) + ((eq? (work-method) 'direct) + ;; the result from send will be the actual result, not an 'ack + (send uconn host-port 'direct cmd data)) + (else + (case (return-method) + ((polling) + (send-via-polling uconn host-port cmd data)) + ((mailbox) + (send-via-mailbox uconn host-port cmd data)) + (else + (print "ULEX ERROR: unrecognised return-method "(return-method)".") + #f)))))) + ;; this is ONLY for development and debugging. It will be removed once Ulex is stable. + (if (< 5000 (- (current-milliseconds) start-time)) + (print "ULEX WARNING: round-trip took over 5 seconds; " + cmd", host-port="host-port", data="data)) + result)) + ;;====================================================================== ;; responder side ;;====================================================================== @@ -315,11 +330,11 @@ ;; Reserved cmds; ack ping goodbye response ;; (define (ulex-handler uconn rdat) (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") (match rdat ;; (string-split controldat) - ((rem-host-port qrykey cmd params) + ((rem-host-port qrykey cmd params);; timedata) ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) (case cmd ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) ((ping) ;; (print "Got Ping!")