Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -54,11 +54,11 @@ (define (client:setup run-id #!key (remaining-tries 10)) (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) (let* ((server-dat (tasks:bb-get-server-info run-id)) (transport (if server-dat (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) - ;;(BB> "transport >"transport"< string? transport >"(string? transport)"< symbol? transport >"(symbol? transport)"<") + (case transport ((noserver) ;; no server registered (if (<= remaining-tries 0) (begin (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) Index: fs-transport.scm ================================================================== --- fs-transport.scm +++ fs-transport.scm @@ -14,11 +14,10 @@ (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) ;;(tcp-buffer-size 2048) -(BB> "HEY TURNING OFF tcp-buffer-size TO TEST FOR RPC SIDE EFFECT> TURN BACK ON BEFORE PRODUCTION") (declare (unit fs-transport)) (declare (uses common)) (declare (uses db)) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -68,14 +68,11 @@ (csvr (db:logpro-dat->csv dat stepname)) (csvt (let-values (( (fmt-cell fmt-record fmt-csv) (make-format ","))) (fmt-csv (map list->csv-record csvr)))) (status (configf:lookup dat "final" "exit-status")) (msg (configf:lookup dat "final" "message"))) - ;;(if csvt ;; this if blocked stack dump caused by .dat file from logpro being 0-byte. fixed by upgrading logpro - (rmt:csv->test-data run-id test-id csvt) - ;; (BB> "Error: run-id/test-id/stepname="run-id"/"test-id"/"stepname" => bad csvr="csvr) - ;; ) + (rmt:csv->test-data run-id test-id csvt) (cond ((equal? status "PASS") "PASS") ;; skip the message part if status is pass (status (conc (configf:lookup dat "final" "exit-status") ": " (if msg msg "no message"))) (else #f))) #f))) @@ -828,11 +825,10 @@ (directory-exists? *toppath*)) (setenv "MT_RUN_AREA_HOME" *toppath*) (begin (debug:print-error 0 *default-log-port* "failed to find the top path to your Megatest area."))) (server:set-transport) - ;;(BB> "Transport is >"*transport-type*"<") *toppath*)) (define (get-best-disk confdat testconfig) (let* ((disks (or (and testconfig (hash-table-ref/default testconfig "disks" #f)) (hash-table-ref/default confdat "disks" #f))) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -27,30 +27,32 @@ (define *heartbeat-mutex* (make-mutex)) (define *server-loop-heart-beat* (current-seconds)) ;; procstr is the name of the procedure to be called as a string - -(define (rpc-transport:autoremote procstr params) - (print "BB> rpc-transport:autoremote entered with procstr="procstr" params="params" string?"(string? procstr)" symbol?"(symbol? procstr)" list?"(list? params) ) +(define (rpc-transport:autoremote procstr params) ;; may be unused, I think api-exec deprecates this one. (let* ((procsym (if (symbol? procstr) procstr (string->symbol (->string procstr)))) (res - (begin (print "BB>before apply") (apply (eval procsym) params)))) - (print "BB> after apply; rpc-transport res="res) - res - )) + (begin + (apply (eval procsym) params)))) + res)) ;; rpc receiver (define (rpc-transport:api-exec cmd params) - (BB> "rpc-transport:api-exec cmd="cmd" params="params" inmemdb="*inmemdb*) (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result ) (flag (vector-ref resdat 0)) (res (vector-ref resdat 1))) - (BB> "rpc-transport:api-exec flag="flag" res="res) + + (mutex-lock! *heartbeat-mutex*) + + (set! *last-db-access* (current-seconds)) ;; bump *last-db-access*; this will renew keep-running thread's lease on life for another (server:get-timeout) seconds + (BB> "in api-exec; last-db-access updated to "*last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + res)) ;; (handle-exceptions ;; exn @@ -147,24 +149,20 @@ (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<")) failure-value)))))))) (define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) - (BB> "rpc-transport:server-shutdown entered.") (on-exit (lambda () #t)) ;; turn off on-exit stuff ;;(tcp-close rpc:listener) ;; gotta exit nicely ;;(tasks:bb-server-set-state! server-id "stopped") ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! - ;;(BB> "before plog rel") ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) - (BB> "before db:sync-touched") (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) - (BB> "before bb-server-delete-record") (tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete") (BB> "Before (exit) (from-on-exit="from-on-exit")") (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. (BB> "After") ;; strace reveals endless: @@ -189,11 +187,10 @@ ;; all routes though here end in exit ... ;; ;; start_server? ;; (define (rpc-transport:launch run-id) - (BB> "rpc-transport:launch fired for run-id="run-id) (set! *run-id* run-id) ;; send to background if requested (when (args:get-arg "-daemonize") (daemon:ize) @@ -208,11 +205,11 @@ ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk (lambda () (tasks:bb-server-lock-slot run-id 'rpc)) - chatty: #t + chatty: #f retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:bb-server-delete-records-for-this-pid " rpc-transport:launch") @@ -254,27 +251,25 @@ (res #f) (run-remote (rpc:procedure 'rpc-transport:autoremote iface port)) (api-exec (rpc:procedure 'api-exec iface port)) (send-receive (lambda () (tcp-buffer-size 0) - (BB> "Entered SR run-id="run-id" cmd="cmd" params="params" iface="iface" port="port) (set! res (retry-thunk (lambda () (condition-case ;;(vector #t (run-remote cmd params)) (vector 'success (api-exec cmd params)) [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)] [x () (vector 'other-fail "other fail ["(->string x)"]" x)])) - chatty: #t + chatty: #f accept-result?: (lambda(x) (and (vector? x) (vector-ref x 0))) retries: 4 back-off-factor: 1.5 random-wait: 0.2 retry-delay: 0.1 final-failure-returns-actual: #t)) - (BB> "Leaving SR w/ "res) res )) (th1 (make-thread send-receive "send-receive")) (time-out-reached #f) (time-out (lambda () @@ -295,11 +290,10 @@ ((comms-fail) (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request") ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) (vector #f (vector-ref res 1))) (else - (BB> "res="res) (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1)) (debug:print 0 *default-log-port* " client call chain:") (print-call-chain (current-error-port)) (debug:print 0 *default-log-port* " server call chain:") (pp (vector-ref res 1) (current-error-port)) @@ -310,24 +304,20 @@ 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server")))))) (define (rpc-transport:run hostn run-id server-id) - (BB> "rpc-transport:run fired for hostn="hostn" run-id="run-id" server-id="server-id) (debug:print 2 *default-log-port* "Attempting to start the rpc server ...") ;; (trace rpc:publish-procedure!) ;;====================================================================== ;; start of publish-procedure section ;;====================================================================== (rpc:publish-procedure! 'server:login server:login) ;; this allows client to validate it is the same megatest instance as the server. No security here, just making sure we're in the right room. - (BB> "published 'testing") (rpc:publish-procedure! 'testing (lambda () - (BB> "Current-peer=["(rpc:current-peer)"]") - (BB> "published rpc proc 'testing was invoked") "Just testing")) ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote) ;; can use this to run most anything at the remote @@ -338,46 +328,45 @@ ;; end of publish-procedure section ;;====================================================================== (let* ((db #f) - (hostname (let ((res (get-host-name))) (BB> "hostname="res) res)) + (hostname (let ((res (get-host-name))) res)) (server-start-time (current-seconds)) (server-timeout (server:get-timeout)) (ipaddrstr (let* ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) #f)) (res (if ipstr ipstr hostn))) - (BB> "ipaddrstr="res) res)) ;; hostname))) - (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) (BB> "start-port="res) res)) + (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) ;; BB> TODO: remove portlogger! + res)) (link-tree-path (configf:lookup *configdat* "setup" "linktree")) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; rpc:listener is the tcp-listen result from inside the find-free-port-and-open complex. ;; It is our handle on the listening tcp port ;; We will attach this to our rpc server with rpc:make-server in thread th1 . (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () - (BB> "+++ before rpc:make-server "rpc:listener) - ;;(cute (rpc:make-server rpc:listener) "rpc:server") - ((rpc:make-server rpc:listener) #t) - (BB> "--- after rpc:make-server")) + ((rpc:make-server rpc:listener) #t) ) "rpc:server")) (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (if (string=? "-" hostn) (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f)) - (portnum (let ((res (rpc:default-server-port))) (BB> "rpc:default-server-port="res" rpc-listener-port="*rpc-listener-port*) res)) + (portnum (let ((res (rpc:default-server-port))) res)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum))) + + ;; BB> TODO: remove portlogger! ;; if rpc found it needed a different port than portlogger provided, keep portlogger in the loop. ;; (when (not (equal? start-port portnum)) ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) ;; (portlogger:open-run-close portlogger:set-port start-port "released") ;; (portlogger:open-run-close portlogger:take-port portnum)) @@ -385,28 +374,22 @@ (tasks:bb-server-set-interface-port server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= - (BB> "Got here before thread start of rpc listener") (thread-start! th1) - (BB> "started rpc server thread th1="th1) - (set! db *inmemdb*) (debug:print 0 *default-log-port* "Server started on " host:port) - (thread-sleep! 8) - (BB> "before self test") + (thread-sleep! 2) (if (rpc-transport:self-test run-id ipaddrstr portnum) - (BB> "Pass self-test.") + (debug:print 0 *default-log-port* "INFO: rpc self test passed!") (begin - (print "Error: rpc listener did not pass self test. Shutting down.") + (debug:print 0 *default-log-port* "Error: rpc listener did not pass self test. Shutting down. On: " host:port) (exit))) - (BB> "after self test") - (on-exit (lambda () (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch @@ -430,10 +413,11 @@ ;; if no requests received within the last 20 seconds : ;; database hasnt changed in ?? ;; ;; begin new loop + ;; keep-running loop: polls last-db-access to see if we have timed out. (let loop ((count 0) (bad-sync-count 0)) ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) @@ -477,10 +461,11 @@ ;; (set! port (cadr sdat)))) ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) + (BB> "in rpc-transport:run ; last-access="last-access) (mutex-unlock! *heartbeat-mutex*) ;; (debug:print 11 *default-log-port* "last-access=" last-access ", server-timeout=" server-timeout) ;; ;; no_traffic, no running tests, if server 0, no running servers @@ -505,33 +490,14 @@ ;; ;; (if (tasks:server-am-i-the-server? tdb run-id) ;; (tasks:server-set-state! tdb server-id "running")) ;; (loop 0 bad-sync-count)) - (rpc-transport:server-shutdown server-id rpc:listener)))) + (begin + (BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) + (rpc-transport:server-shutdown server-id rpc:listener))))) ;; end new loop - - ;; ;; begin old loop - ;; (let loop ((count 0)) - ;; (BB> "Found top of rpc-transport:run stay-alive loop.") - ;; (thread-sleep! 5) ;; no need to do this very often - ;; (let ((numrunning -1)) ;; (db:get-count-tests-running db))) - ;; (if (or (> numrunning 0) - ;; (> (+ *last-db-access* 60)(current-seconds))) - ;; (begin - ;; (debug:print-info 0 *default-log-port* "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) - ;; (loop (+ 1 count))) - ;; (begin - ;; (debug:print-info 0 *default-log-port* "Starting to shutdown the server side") - ;; (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop") - ;; (thread-sleep! 10) - ;; (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) - ;; (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") - ;; )))) - ;; ;; end old loop - - )))) (define (rpc-transport:find-free-port-and-open port #!key ) (handle-exceptions @@ -542,13 +508,11 @@ (rpc:default-server-port port) (set! *rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do). keeping this global in my back pocket in case this causes problems (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened... (tcp-read-timeout 240000) (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. - (BB> "rpc-transport> attempting to bind tcp port "port) (tcp-listen (rpc:default-server-port) 10000) - ;;(tcp-listen (rpc:default-server-port) ) )) (define (rpc-transport:ping run-id host port) (handle-exceptions exn @@ -563,33 +527,24 @@ (begin (print "LOGIN_FAILED") (exit 1)))))) (define (rpc-transport:self-test run-id host port) - (BB> "SELF TEST RPC ... *toppath*="*toppath*) - (BB> "local: [" (server:login *toppath*) "]") - ;(handle-exceptions - ;exn - ;(begin - ; (BB> "SERVER_NOT_FOUND") - ; #f) (tcp-buffer-size 0) ;; gotta do this because http-transport undoes it. (let* ((testing-res ((rpc:procedure 'testing host port))) (login-res ((rpc:procedure 'server:login host port) *toppath*)) (res (and login-res (equal? testing-res "Just testing")))) - - (BB> "testing-res = >"testing-res"<") - (BB> "login-res = >"testing-res"<") - (if login-res - (begin - (BB> "LOGIN_OK") - #t) - (begin - (BB> "LOGIN_FAILED") - #f)) - (BB> "self test res="res) - res));) + + (if login-res + (begin + (BB> "Self test PASS. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + #t) + (begin + (BB> "Self test fail. login-res="login-res" testing-res="testing-res" *toppath*="*toppath*) + + #f)) + res)) (define (rpc-transport:client-setup run-id server-dat #!key (remtries 10)) (tcp-buffer-size 0) (debug:print-info 0 *default-log-port* "rpc-transport:client-setup run-id="run-id" server-dat=" server-dat ", remaining-tries=" remtries) (let* ((iface (tasks:hostinfo-get-interface server-dat)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -48,11 +48,10 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id transport-type) - (BB> "server:launch fired for run-id="run-id" transport-type="transport-type) (let ((ttype (if (symbol? transport-type) transport-type (string->symbol (->string transport-type))))) (case ttype ((http)(http-transport:launch run-id)) ;;((nmsg)(nmsg-transport:launch run-id)) ((rpc) (rpc-transport:launch run-id)) @@ -69,11 +68,10 @@ (define (server:set-transport) (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") "rpc")))) - (BB> "TRANSPORT IS "ttype" string?"(string? ttype)" symbol?"(symbol? ttype)) (set! *transport-type* ttype) ttype)) ;; Get the transport -- DO NOT call this from client code. In client code, this is run-id sensitive and not a global ;; For code communicating with existing run-id with a server, use: (rmt:run-id->transport-type run-id) @@ -201,10 +199,12 @@ (let* ((transport-type (rmt:run-id->transport-type run-id)) (res (case transport-type ((http)(server:ping-server run-id (tasks:hostinfo-get-interface server) (tasks:hostinfo-get-port server))) + ((rpc) ((rpc:procedure 'server:login (tasks:hostinfo-get-interface server) (tasks:hostinfo-get-port server)) *toppath*)) + (else (debug:print-error 0 *default-log-port* "(5) Transport [" transport-type "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed.") (exit 1))))) @@ -269,11 +269,10 @@ ;; Client will call this procedure on the server via the low-level transport (http/rpc/etc) to verify its toppath matches the server's toppath. ;; A true result means client and server are associated with same megatest instance, share the same megatest.config, etc...) A false result means the client should not talk to this server. (define (server:login toppath) (set! *last-db-access* (current-seconds)) - (BB> "server:login ours="*toppath*" theirs="toppath) (if (equal? *toppath* toppath) (begin ;; (debug:print-info 2 *default-log-port* "login successful") #t) (begin