Index: launchmod.scm ================================================================== --- launchmod.scm +++ launchmod.scm @@ -1888,11 +1888,11 @@ ;; 0 RUNNING ==> this is actually the first condition, should not get here (define (runs:end-of-run-check run-id ) (let* ((not-completed-cnt (rmt:get-not-completed-cnt run-id)) (running-cnt (rmt:get-count-tests-running-for-run-id run-id)) - (all-test-launched (rmt:get-var run-id (conc "lunch-complete-" run-id))) + (all-test-launched (rmt:get-var run-id (conc "launch-complete-" run-id))) (current-state (rmt:get-run-state run-id)) (current-status (rmt:get-run-status run-id))) ;;get-vars run-id to query metadata table to check if all completed. if all-test-launched = yes then only not-completed-cnt = 0 means everyting is completed if no entry found in the table do nothing (debug:print 0 *default-log-port* "Running test cnt :" running-cnt) (rmt:set-state-status-and-roll-up-run run-id current-state current-status) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -156,13 +156,22 @@ rmtmod runsmod servermod tasksmod testsmod - + + ulex ) - + +;; ;; ulex parameters +;; (work-method 'direct) +;; (return-method 'direct) + + ;; ulex parameters + (work-method 'mailbox) + (return-method 'mailbox) + ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (define *didsomething* #f) (define *db* #f) ;; this is only for the repl, do not use in general!!!! Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -1792,11 +1792,14 @@ "ERROR: get-pkts-dir called without *toppath* set. Exiting.") (let* ((pdir (conc effective-toppath "/.meta/srvpkts"))) (if (file-exists? pdir) pdir (begin - (create-directory pdir #t) + (handle-exceptions ;; this exception handler should NOT be needed but ... + exn + pdir + (create-directory pdir #t)) pdir))))) ;; given a pkts dir read ;; (define (get-all-server-pkts pktsdir-in pktspec) Index: runsmod.scm ================================================================== --- runsmod.scm +++ runsmod.scm @@ -499,11 +499,11 @@ ;; run the run prehook if there are no tests yet run for this run: ;; (runs:run-pre-hook run-id) ;; mark all test launched flag as false in the meta table - (rmt:set-var run-id (conc "lunch-complete-" run-id) "no") + (rmt:set-var run-id (conc "launch-complete-" run-id) "no") (debug:print-info 1 *default-log-port* "Setting end-of-run to no") (let* ((config-reruns (let ((x (configf:lookup *configdat* "setup" "reruns"))) (if x (string->number x) #f))) (config-rerun-cnt (if config-reruns config-reruns @@ -1709,11 +1709,11 @@ (else (debug:print-info 4 *default-log-port* "cond branch - " "rtq-9") (debug:print-info 4 *default-log-port* "Exiting loop with...\n hed=" hed "\n tal=" tal "\n reruns=" reruns)) ))) ;; end loop on sorted test names ;; this is the point where everything is launched and now you can mark the run in metadata table as all launched - (rmt:set-var run-id (conc "lunch-complete-" run-id) "yes") + (rmt:set-var run-id (conc "launch-complete-" run-id) "yes") ;; now *if* -run-wait we wait for all tests to be done ;; Now wait for any RUNNING tests to complete (if in run-wait mode) ;; (if (runs:dat-load-mgmt-function runsdat)((runs:dat-load-mgmt-function runsdat))) (thread-sleep! 10) ;; I think there is a race condition here. Let states/statuses settle Index: tests/simplerun/Makefile ================================================================== --- tests/simplerun/Makefile +++ tests/simplerun/Makefile @@ -1,5 +1,5 @@ cleanup : killall mtest dboard -v -9 || true - rm -rf *.log *.bak NB* logs/* .meta .db + rm -rf *.log *.bak NB* logs/* .meta .db ../simpleruns/* lt Index: tests/simplerun/debug.scm ================================================================== --- tests/simplerun/debug.scm +++ tests/simplerun/debug.scm @@ -22,27 +22,32 @@ )) (define (run) (let* ((th1 (make-thread (lambda () - (let loop ((r (make-run-id)) - (i 1)) - (let ((start-time (current-milliseconds))) - (rmt:register-test r "test1" (conc "item_" i)) - (let ((qry-time (- (current-milliseconds) start-time))) - (if (> qry-time 500) - (print "WARNING: rmt:register-test took more than 500ms, "qry-time"ms")))) - (if (eq? (modulo i 100) 0) - (print "For run-id="r", num tests registered="i)) - (if (< i 100000) - (loop r (+ i 1)) - (if (< r 100) - (begin - (print "get-tests-for-run "r) - (rmt:get-tests-for-run r "%" '() '() 0 #f #f #f #f #f 0 #f) - (loop (+ r 1) 0))))) - )))) + (let loop ((r 0) + (i 1) + (s 0)) ;; sum + (let ((start-time (current-milliseconds)) + (run-id (+ r (make-run-id)))) + (rmt:register-test run-id "test1" (conc "item_" i)) + (let* ((qry-time (- (current-milliseconds) start-time)) + (tot-query-time (+ qry-time s)) + (avg-query-time (* 1.0 (/ tot-query-time i)))) + (if (> qry-time 500) + (print "WARNING: rmt:register-test took more than 500ms, "qry-time"ms, i="i", avg-query-time="avg-query-time)) + (if (eq? (modulo i 100) 0) + (print "For run-id="run-id", "(rmt:get-keys-write)" num tests registered="i)) + (if (< i 500) + (loop r (+ i 1) tot-query-time) + (if (< r 100) + (let* ((start-time (current-milliseconds))) + (print "rmt:get-keys "(rmt:get-keys)" in "(- (current-milliseconds) start-time)) + (print "Got "(length (rmt:get-tests-for-run run-id "%" '() '() 0 #f #f #f #f #f 0 #f))" tests for run "run-id) + (print "Average query time: "avg-query-time) + (loop (+ r 1) 0 tot-query-time)))))))) + ))) (thread-start! th1) (thread-join! th1))) ) Index: tests/simplerun/megatest.config ================================================================== --- tests/simplerun/megatest.config +++ tests/simplerun/megatest.config @@ -38,11 +38,11 @@ [validvalues] state start end completed # Job tools are more advanced ways to control how your jobs are launched [jobtools] -useshell yes +# useshell yes launcher nbfake # You can override environment variables for all your tests here [env-override] EXAMPLE_VAR example value Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -22,13 +22,17 @@ chicken.string chicken.process-context chicken.file chicken.pretty-print commonmod + ulex ) (define test-work-dir (current-directory)) + +(work-method 'mailbox) ;; threads, direct, mailbox +(return-method 'mailbox) ;; polling, mailbox, direct ;; given list of lists ;; ( ( msg expected param1 param2 ...) ;; ( ... ) ) ;; apply test to all Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -50,11 +50,14 @@ udat-port udat-host-port ;; for testing only ;; pp-uconn - work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct + + ;; parameters + work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct + return-method ;; parameter; 'mailbox, 'polling, 'direct ) (import scheme chicken.base chicken.file @@ -104,17 +107,23 @@ ) ;; Parameters ;; work-method: +(define work-method (make-parameter 'direct)) ;; mailbox - all rdat goes through mailbox ;; threads - all rdat immediately executed in new thread -;; limited - run rdats in immediately executed threads until NthreadsMax -;; reached, then put in mailbox ;; direct - no queuing ;; -(define work-method (make-parameter 'threads)) + +;; return-method, return the result to waiting send-receive: +(define return-method (make-parameter 'direct)) +;; mailbox - create a mailbox and use it for passing returning results to send-receive +;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result +;; direct - no queuing, result is passed back in single tcp connection +;; + ;; ;; struct for keeping track of others we are talking to ;; ;; ;; (defstruct pdat ;; (host-port #f) ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer @@ -209,66 +218,95 @@ ;; (define (send udata host-port qrykey cmd params) (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this (isme #f #;(equal? host-port my-host-port)) ;; calling myself? ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params)) - ) - (if isme - (ulex-handler udata dat) ;; no transmission needed - (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - #f - (begin - ; (mutex-lock! *send-mutex*) + (dat (list my-host-port qrykey cmd params))) + (cond + (isme (ulex-handler udata dat)) ;; no transmission needed + (else + (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? + exn + #f + (begin + ;; (mutex-lock! *send-mutex*) (let-values (((inp oup)(tcp-connect host-port))) (let ((res (if (and inp oup) (begin (serialize dat oup) - (deserialize inp)) ;; yes, we always want an ack + (deserialize inp)) (begin (print "ERROR: send called but no receiver has been setup. Please call setup first!") #f)))) (close-input-port inp) (close-output-port oup) - ; (mutex-unlock! *send-mutex*) - res))))))) ;; res will always be 'ack - + ;; (mutex-unlock! *send-mutex*) + res)))))))) ;; res will always be 'ack unless return-method is direct + ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive uconn host-port cmd data) (cond ((member cmd '(ping goodbye)) ;; these are immediate (send uconn host-port 'ping cmd data)) + ((eq? (work-method) 'direct) + ;; the result from send will be the actual result, not an 'ack + (send uconn host-port 'direct cmd data)) (else - (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? - (qrykey (car cmbox)) - (mbox (cdr cmbox)) - (mbox-time (current-milliseconds)) - (sres (send uconn host-port qrykey cmd data))) ;; short res - (if (eq? sres 'ack) - (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) - #f - 120)) ;; timeout) - (mbox-timeout-result 'MBOX_TIMEOUT) - (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) - (mbox-receive-time (current-milliseconds))) - ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? - (hash-table-delete! (udat-mboxes uconn) qrykey) - (if (eq? res 'MBOX_TIMEOUT) - (begin - (print "WARNING: mbox timed out for query "cmd", with data "data", waiting for response from "host-port".") - - ;; here it might make sense to clean up connection records and force clean start? - ;; NO. The progam using ulex needs to do the reset. Right thing here is exception - - #f) ;; convert to raising exception? - res)) - (begin - (print "ERROR: Communication failed? Got "sres) - #f)))))) + (case (return-method) + ((polling) + (let* ((qrykey (make-cookie uconn)) + (sres (send uconn host-port qrykey cmd data))) + (case sres + ((ack) + (let loop ((start-time (current-milliseconds))) + (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout + (begin + (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) + #f) + (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash + (if result ;; result is '(status . result-data) or #f for nothing yet + (begin + (hash-table-delete! (udat-mboxes uconn) qrykey) + (cdr result)) + (begin + (thread-sleep! 0.01) + (loop start-time))))))) + (else + (print "ULEX ERROR: Communication failed? sres="sres) + #f)))) + ((mailbox) + (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? + (qrykey (car cmbox)) + (mbox (cdr cmbox)) + (mbox-time (current-milliseconds)) + (sres (send uconn host-port qrykey cmd data))) ;; short res + (if (eq? sres 'ack) + (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) + #f + 120)) ;; timeout) + (mbox-timeout-result 'MBOX_TIMEOUT) + (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) + (mbox-receive-time (current-milliseconds))) + ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? + (hash-table-delete! (udat-mboxes uconn) qrykey) + (if (eq? res 'MBOX_TIMEOUT) + (begin + (print "WARNING: mbox timed out for query "cmd", with data "data", waiting for response from "host-port".") + + ;; here it might make sense to clean up connection records and force clean start? + ;; NO. The progam using ulex needs to do the reset. Right thing here is exception + + #f) ;; convert to raising exception? + res)) + (begin + (print "ERROR: Communication failed? Got "sres) + #f)))) + (else + (print "ULEX ERROR: unrecognised return-method "(return-method)".") + #f))))) ;;====================================================================== ;; responder side ;;====================================================================== @@ -279,37 +317,42 @@ (define (ulex-handler uconn rdat) (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") (match rdat ;; (string-split controldat) ((rem-host-port qrykey cmd params) ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) - (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) - (case cmd - ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) - ((ping) - ;; (print "Got Ping!") - ;; (add-to-work-queue uconn rdat) - 'ack) - ((goodbye) - ;; just clear out references to the caller - (add-to-work-queue uconn rdat) - 'ack) - ((response) ;; this is a result from remote processing, send it as mail ... - (if mbox - (begin - (mailbox-send! mbox params) ;; params here is our result - 'ack) - (begin - (print "ERROR: received result but no associated mbox for cookie "qrykey) - #f))) - (else - ;; (print "Got generic request: "cmd) - (add-to-work-queue uconn rdat) - 'ack)))) - (else - (print "BAD DATA? controldat=" rdat) - 'ack) ;; send ack anyway? - )) + (case cmd + ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) + ((ping) + ;; (print "Got Ping!") + ;; (add-to-work-queue uconn rdat) + 'ack) + ((goodbye) + ;; just clear out references to the caller. NOT COMPLETE + (add-to-work-queue uconn rdat) + 'ack) + ((response) ;; this is a result from remote processing, send it as mail ... + (case (return-method) + ((polling) + (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params)) + 'ack) + ((mailbox) + (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) + (if mbox + (begin + (mailbox-send! mbox params) ;; params here is our result + 'ack) + (begin + (print "ERROR: received result but no associated mbox for cookie "qrykey) + 'no-mbox-found)))) + (else (print "ULEX ERROR: unrecognised return-method "(return-method)) + 'bad-return-method))) + (else ;; generic request - hand it to the work queue + (add-to-work-queue uconn rdat) + 'ack))) + (else + (print "ULEX ERROR: bad rdat "rdat) + 'bad-rdat))) ;; given an already set up uconn start the cmd-loop ;; (define (ulex-cmd-loop uconn) (let* ((serv-listener (udat-socket uconn)) @@ -357,24 +400,28 @@ (do-work uconn rdat)) (else (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") (mailbox-send! (udat-work-queue uconn) rdat)))) - +;; move the logic to return the result somewhere else? +;; (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat ((rem-host-port qrykey cmd params) (let* ((start-time (current-milliseconds)) (result (proc rem-host-port qrykey cmd params)) (end-time (current-milliseconds)) (run-time (- end-time start-time))) - (print "ULEX: work "cmd", "params" done in "run-time" ms") - ;; send 'response as cmd and result as params - (send uconn rem-host-port qrykey 'response result) ;; could check for ack - (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))) + (case (work-method) + ((direct) result) + (else + (print "ULEX: work "cmd", "params" done in "run-time" ms") + ;; send 'response as cmd and result as params + (send uconn rem-host-port qrykey 'response result) ;; could check for ack + (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))))) (MBOX_TIMEOUT #f) (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) ;; NEW APPROACH: