Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -80,11 +80,11 @@
 ;; SERVER
 (define *my-client-signature* #f)
 
 ;; default preference for transport-type is set here
 ;;
-(define *transport-type* 'http) ;; override with [server] transport http|rpc
+(define *transport-type* 'rpc) ;; override with [server] transport http|rpc
 
 (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold
 (define *max-cache-size* 0)
 (define *logged-in-clients* (make-hash-table))
 (define *client-non-blocking-mode* #f)
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -79,11 +79,11 @@
     (if connection-info
         ;; if we already have a connection for this run-id, use that precendent
         ;; use the server if have connection info
         (let* ((transport-type (vector-ref connection-info 6))) ;; BB: assumes all transport-type'-servertdat vector's item 6 ids transport type
           transport-type)
-        ;; otherwise pick the global default as preference.
+        ;; otherwise pick the global default as preference.  (set in common.scm)
         *transport-type*)))
 
 (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
 
 ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
@@ -117,15 +117,15 @@
              (dat            (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type
                                ((http)(condition-case
                                        (http-transport:client-api-send-receive run-id connection-info cmd params)
                                        ((commfail)(vector #f "communications fail"))
                                        ((exn)(vector #f "other fail"))))
-                               ;;((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now
+                               ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: rpc transport is now enabled
                                (else
                                 (debug:print-error 0 *default-log-port* "(1) Transport [" transport-type
                                                    "] specified for run-id [" run-id
                                                    "] is not implemented in rmt:send-receive. Cannot proceed.")
                                 (vector #f (conc "transport ["transport-type"] unimplemented")))))
              (success        (if (vector? dat) (vector-ref dat 0) #f))
              (res            (if (vector? dat) (vector-ref dat 1) #f)))
 
 
@@ -138,18 +138,17 @@
           (else
            (debug:print-error 0 *default-log-port* "(2) Transport [" transport-type
                               "] specified for run-id [" run-id
                               "] is not implemented in rmt:send-receive. Cannot proceed. Also unexpected since this branch follows success which would follow a suported transport...")
            #f)
-          ;; ((nmsg) res)
           )) ;; (vector-ref res 1)))
-          
+
         ;; no success...
         (begin ;; let ((new-connection-info (client:setup run-id)))
           (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
           (case transport-type
-            ((http)
+            ((http rpc)
              (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
              ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
              ;; (if (eq? (modulo attemptnum 5) 0)
              ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
              ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
Index: rpc-transport.scm
==================================================================
--- rpc-transport.scm
+++ rpc-transport.scm
@@ -27,29 +27,127 @@
 
 (define *heartbeat-mutex* (make-mutex))
 (define *server-loop-heart-beat* (current-seconds))
 
 ;; procstr is the name of the procedure to be called as a string
+
 (define (rpc-transport:autoremote procstr params)
-  (handle-exceptions
-   exn
-   (begin
-     (debug:print 1 *default-log-port* "Remote failed for " proc " " params)
-     (apply (eval (string->symbol procstr)) params))
-   ;; (if *runremote*
-   ;;    (apply (eval (string->symbol (conc "remote:" procstr))) params)
-   (apply (eval (string->symbol procstr)) params)))
+  (BB> "rpc-transport:autoremote entered with procstr="procstr" params="params" string?"(string? procstr)" symbol?"(symbol? procstr)" list?"(list? params) )
+  (let* ((procsym (if (symbol? procstr) ;; NOTE(review): evals a client-supplied name, so any globally bound procedure can be run remotely
+                      procstr
+                      (string->symbol (->string procstr))))
+         (res
+          (begin (BB> "before apply") (apply (eval procsym) params))))
+    (BB> "after apply; rpc-transport res="res)
+    res
+    ))
+
+
+;; rpc receiver: runs cmd/params through api:execute-requests on *inmemdb*; returns only the result part (the flag is just logged)
+(define (rpc-transport:api-exec cmd params)
+  (BB> "rpc-transport:api-exec cmd="cmd" params="params" inmemdb="*inmemdb*)
+  (let* ( (resdat (api:execute-requests *inmemdb* (vector cmd params))) ;; #( flag result )
+          (flag (vector-ref resdat 0))
+          (res (vector-ref resdat 1)))
+    (BB> "rpc-transport:api-exec flag="flag" res="res)
+    res))
+
+
+  ;; (handle-exceptions
+  ;;  exn
+  ;;  (begin
+  ;;    (debug:print 0 *default-log-port* "Remote failed for " proc " " params " exn="exn)
+  ;;    (apply (eval (string->symbol procstr)) params))
+  ;;  ;; (if *runremote*
+  ;;  ;;    (apply (eval (string->symbol (conc "remote:" procstr))) params)
+  ;;  (apply (eval (string->symbol procstr)) params)))
 
 ;; retry an operation (depends on srfi-18)
-(define (retry-thunk the-thunk #!key (accept-result? (lambda (x) x)) (retries 4) (wait-seconds-between-tries 0.2) (failure-value #f))
-  (let loop ((res (the-thunk)) (retries-left retries))
-    (cond
-     ((accept-result? res) res)
-     ((> retries-left 0)
-      (thread-sleep! wait-seconds-between-tries)
-      (loop (the-thunk) (sub1 retries-left)))
-     (else failure-value))))
+;; ==================
+;; idea here is to avoid spending time on coding retrying something.  Trying to be generic here.
+;;
+;; Exception handling:
+;; -------------------
+;; if evaluating the thunk results in exception, it will be retried.
+;; on last try, if final-failure-returns-actual is true, the exception will be re-thrown to caller.
+;;
+;; look at options below #!key to see how to configure behavior
+;;
+;; sleeps between tries are retry-delay, retry-delay*back-off-factor, ... each plus 0..random-delay of jitter
+(define (retry-thunk
+         the-thunk
+         #!key ;;;; options below
+         (accept-result?   (lambda (x) x)) ;; retry if predicate applied to thunk's result is false
+         (retries          4)              ;; how many retries after the first try (up to retries+1 calls of the-thunk)
+         (failure-value    #f)             ;; return this on final failure, unless following option is enabled:
+         (final-failure-returns-actual #f) ;; on failure, on the last try, just return the result, not failure-value
+
+         (retry-delay      0.1)            ;; delay before the first retry
+         (back-off-factor  1)              ;; multiply the delay by this factor after each retry
+         (random-delay     0.1)            ;; add a random portion of this value to each wait
+
+         (chatty           #f)             ;; print status as we go, for debugging.
+         )
+
+  (when chatty (print) (print "Entered retry-thunk") (print "-=-=-=-=-=-"))
+  (let* ((guarded-thunk ;; we are guarding the thunk against exceptions.  We will record whether result of evaluation is an exception or a regular result.
+          (lambda ()
+            (let* ((EXCEPTION (gensym)) ;; using gensym to avoid potential collision
+                   (res
+                    (condition-case
+                     (the-thunk) ;; this is what we are guarding the execution of
+                     [x () (cons EXCEPTION x)]
+                     )))
+              (cond
+               ((and (pair? res) (eq? (car res) EXCEPTION))
+                (if chatty
+                    (print " - the-thunk threw exception >"(cdr res)"<"))
+                (cons 'exception (cdr res)))
+               (else
+                (if chatty
+                    (print " - the-thunk returned result >"res"<"))
+                (cons 'regular-result res)))))))
+
+    (let loop ((guarded-res (guarded-thunk))
+               (retries-left retries)
+               (fail-wait retry-delay))
+      (if chatty (print " =========="))
+      (let* ((wait-time (+ fail-wait ;; back-off applies to the next delay only; jitter does not accumulate
+                           (* random-delay
+                              (/ (random 1024) 1024))))
+             (res-type (car guarded-res))
+             (res-value (cdr guarded-res)))
+        (cond
+         ((and (eq? res-type 'regular-result) (accept-result? res-value))
+          (if chatty (print " + return result that satisfied accept-result? >"res-value"<"))
+          res-value)
+
+         ((> retries-left 0)
+          (if chatty (print " - sleep "wait-time))
+          (thread-sleep! wait-time)
+          (if chatty (print " + retry ["retries-left" tries left]"))
+          (loop (guarded-thunk)
+                (sub1 retries-left)
+                (* fail-wait back-off-factor)))
+
+         ((eq? res-type 'regular-result)
+          (if final-failure-returns-actual
+              (begin
+                (if chatty (print " + last try failed- return the result >"res-value"<"))
+                res-value)
+              (begin
+                (if chatty (print " + last try failed- return canned failure value >"failure-value"<"))
+                failure-value)))
+
+         (else ;; no retries left; result was not accepted and res-type can only be 'exception
+          (if final-failure-returns-actual
+              (begin
+                (if chatty (print " + last try failed with exception- re-throw it >"res-value"<"))
+                (abort res-value)); re-raise the exception. TODO: find a way for call-history to show as though from entry to this function
+              (begin
+                (if chatty (print " + last try failed with exception- return canned failure value >"failure-value"<"))
+                failure-value))))))))
 
 
 (define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f))
   (BB> "rpc-transport:server-shutdown entered.")
   (on-exit (lambda () #t)) ;; turn off on-exit stuff
@@ -64,12 +162,29 @@
     (set! *time-to-exit* #t)
     (BB> "before db:sync-touched")
     (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
     (BB> "before bb-server-delete-record")
     (tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete")
-    (BB> "Before (exit)")
-    (unless from-on-exit (exit))
+    (BB> "Before (exit) (from-on-exit="from-on-exit")")
+    (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu.
+    (BB> "After")
+    ;; strace reveals endless:
+    ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 9874}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 13874}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 105880}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 109880}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 201886}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 205886}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 297892}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 301892}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 393898}, ru_stime={0, 60003}, ...}) = 0
+    ;; getrusage(RUSAGE_SELF, {ru_utime={414, 397898}, ru_stime={0, 60003}, ...}) = 0
+    ;; make a post to chicken-users w/ http://paste.call-cc.org/paste?id=60a4b66a29ccf7d11359ea866db642c970735978
+    (if from-on-exit
+        ;; avoid above condition!  End current process externally since 1 in 20 (exit)'s result in hung, 100% cpu zombies.  (see above)
+        (system (conc "kill -9 "(current-process-id))))
     )
 
 ;; all routes though here end in exit ...
 ;;
 
@@ -93,10 +208,11 @@
 
   ;; let's get a server-id for this server
   ;;   if at first we do not suceed, try 3 more times.
   (let ((server-id (retry-thunk
                     (lambda () (tasks:bb-server-lock-slot run-id 'rpc))
+                    chatty: #t
                     retries: 4)))
     (when (not server-id) ;; dang we couldn't get a server-id.
       ;; since we didn't get the server lock we are going to clean up and bail out
       (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
       (tasks:bb-server-delete-records-for-this-pid " rpc-transport:launch")
@@ -114,10 +230,89 @@
 (define *rpc-listener-port* #f)
 (define *rpc-listener-port-bind-timestamp* #f)
 
 (define *on-exit-flag #f)
 
+
+(define (rpc-transport:server-dat-get-iface vec)        (vector-ref vec 0))
+(define (rpc-transport:server-dat-get-port vec)         (vector-ref vec 1))
+(define (rpc-transport:server-dat-get-last-access vec)  (vector-ref vec 5))
+(define (rpc-transport:server-dat-get-transport vec)    (vector-ref vec 6))
+(define (rpc-transport:server-dat-update-last-access vec)
+  (if (vector? vec)
+      (vector-set! vec 5 (current-seconds))
+      (begin
+        (print-call-chain (current-error-port))
+        (debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; this client-side procedure makes rpc call to server and returns result
+;; returns #(#t result) or #(#f message); signals on a server-side error or after a 45s timeout (numretries is currently unused)
+(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
+  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
+         (port  (rpc-transport:server-dat-get-port serverdat))
+         (res   #f)
+         (run-remote (rpc:procedure 'rpc-transport:autoremote iface port)) ;; currently unused; requests go through api-exec
+         (api-exec (rpc:procedure 'api-exec iface port))
+         (send-receive (lambda ()
+                         (tcp-buffer-size 0)
+                         (BB> "Entered SR run-id="run-id" cmd="cmd" params="params" iface="iface" port="port)
+                         (set! res (retry-thunk
+                                    (lambda ()
+                                      (condition-case
+                                       ;;(vector #t (run-remote cmd params))
+                                       (vector 'success (api-exec cmd params))
+                                       [x (exn i/o net) (vector 'comms-fail (conc "communications fail ["(->string x)"]") x)]
+                                       [x () (vector 'other-fail (conc "other fail ["(->string x)"]") x)]))
+                                    chatty: #f
+                                    accept-result?: (lambda(x)
+                                                      (and (vector? x) (eq? (vector-ref x 0) 'success)))
+                                    retries: 4
+                                    back-off-factor: 1.5
+                                    random-delay: 0.2
+                                    retry-delay: 0.1
+                                    final-failure-returns-actual: #t))
+                         (BB> "Leaving SR w/ "res)
+                         res
+                         ))
+         (th1 (make-thread send-receive "send-receive"))
+         (time-out-reached #f)
+         (time-out     (lambda ()
+                         (thread-sleep! 45)
+                         (set! time-out-reached #t)
+                         (thread-terminate! th1)
+                         #f))
+
+         (th2 (make-thread time-out     "time out")))
+    (thread-start! th1)
+    (thread-start! th2)
+    (handle-exceptions exn (if time-out-reached #f (abort exn)) (thread-join! th1)) ;; joining a th1 killed by the timeout thread raises
+    (thread-terminate! th2)
+    (debug:print-info 11 *default-log-port* "got res=" res)
+    (if (vector? res)
+        (case (vector-ref res 0)
+          ((success) (vector #t (vector-ref res 1)))
+          ((comms-fail)
+           (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request")
+           ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
+           (vector #f (vector-ref res 1)))
+          (else
+           (BB> "res="res)
+           (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
+           (debug:print 0 *default-log-port* " client call chain:")
+           (print-call-chain (current-error-port))
+           (debug:print 0 *default-log-port* " server call chain:")
+           (pp (vector-ref res 1) (current-error-port))
+           (signal (vector-ref res 2))))
+        (signal (make-composite-condition
+                 (make-property-condition
+                  'timeout
+                  'message "rpc-transport:client-api-send-receive timed out talking to server"))))))
+
+
+
 (define (rpc-transport:run hostn run-id server-id)
   (BB> "rpc-transport:run fired for hostn="hostn" run-id="run-id" server-id="server-id)
   (debug:print 2 *default-log-port* "Attempting to start the rpc server ...")
   ;; (trace rpc:publish-procedure!)
 
@@ -134,23 +329,18 @@ "Just testing")) ;; procedure to receive arbitrary API request from client's rpc:send-receive/rpc-transport:client-api-send-receive (rpc:publish-procedure! 'rpc-transport:autoremote rpc-transport:autoremote) ;; can use this to run most anything at the remote - (rpc:publish-procedure! - 'remote:run - (lambda (procstr . params) - (server:autoremote procstr params))) + (rpc:publish-procedure! 'api-exec rpc-transport:api-exec) ;;====================================================================== ;; end of publish-procedure section ;;====================================================================== - - (BB> "flag1") (let* ((db #f) (hostname (let ((res (get-host-name))) (BB> "hostname="res) res)) (server-start-time (current-seconds)) (server-timeout (server:get-timeout)) (ipaddrstr (let* ((ipstr (if (string=? "-" hostn)