@@ -7,34 +7,34 @@
 ;; This program is distributed WITHOUT ANY WARRANTY; without even the
 ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 ;; PURPOSE.
 
 ;;======================================================================
 
-(use json format) ;; RADT => purpose of json format??
+(use format typed-records) ;; typed-records supplies defstruct (used for the remote record); json is no longer used here
 
 (declare (unit rmt))
 (declare (uses api))
 (declare (uses tdb))
 (declare (uses http-transport))
 ;;(declare (uses nmsg-transport))
 
+(include "common_records.scm")
+
 ;;
 ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
 ;;
 
-;; ;; For debugging add the following to ~/.megatestrc
-;;
-;; (require-library trace)
-;; (import trace)
-;; (trace
-;;   rmt:send-receive
-;;   api:execute-requests
-;;  )
-
 ;; generate entries for ~/.megatestrc with the following
 ;;
 ;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u
 
+(defstruct remote
+  (hh-dat            (common:get-homehost))  ;; homehost record ( addr . hhflag )
+  (server-url        (if *toppath* (server:check-if-running *toppath*) #f)) ;; #f until a running server is found
+  (last-server-check 0)                      ;; last time we checked to see if the server was alive
+  (conndat           #f)                     ;; server connection info, #f when we have no connection
+  (transport         *transport-type*)
+  (server-timeout    (or (server:get-timeout) 100))) ;; default to 100 seconds
 ;;======================================================================
 ;; S U P P O R T F U N C T I O N S
 ;;======================================================================
@@ -41,10 +41,15 @@
 ;; if a server is either running or in the process of starting call client:setup
 ;; else return #f to let the calling proc know that there is no server available
 ;;
 (define (rmt:get-connection-info run-id)
-  (let ((cinfo *runremote*)) ;; (hash-table-ref/default *runremote* run-id #f)))
+  (let ((cinfo (if (remote? *runremote*) (remote-conndat *runremote*) #f))) ;; *runremote* is now a remote record, the connection lives in its conndat slot
+;; how about if rrr is a defstruct and we use a wrapper to access it (even better would be a macro)
+;; look in common_records for with-mutex
+;;
+;; (with-mutex *rrr-mutex* rmt:dat-host *runremote*) => returns value
+;; (with-mutex *rrr-mutex* rmt:
     (if cinfo
         cinfo
         (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
             (client:setup run-id)
             #f))))
@@ -52,128 +57,191 @@
 (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
 
 ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
 ;;
 (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
-  ;; clean out old connections
-  ;; (mutex-lock! *db-multi-sync-mutex*)
+
+  ;; do all the prep locked under the rmt-mutex
+  ;; NB: every branch of the cond below must release *rmt-mutex* before it recurses or dispatches the query
+  (mutex-lock! *rmt-mutex*)
 
   ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in *runremote*
   ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
   ;; 3. do the query, if on homehost use local access
   ;;
-  (if (and ;; #f ;; FORCE NO GO FOR RIGHT NOW
-       (not *runremote*) ;; we trust *runremote* to reflect that a server was found previously
-       (not (member cmd api:read-only-queries))) ;; we don't trust so much the list of write queries
+  (let* ((start-time (current-seconds))) ;; snapshot time in seconds (server-timeout is in seconds) so all use cases get same value
+    (cond
+     ;; give up if more than 15 attempts
+     ((> attemptnum 15)
+      (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.")
+      (exit 1))
+     ;; ensure we have a record for our connection for given area
+     ((not *runremote*)
+      (set! *runremote* (make-remote))
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; no server contact made and this is a write, try starting a server
+     ((and (not (remote-server-url *runremote*))
+           (not (member cmd api:read-only-queries)))
       (let ((serverconn (server:check-if-running *toppath*)))
         (if serverconn
-            (set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed
+            (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed
             (if (not (server:start-attempted? *toppath*))
-                (server:kind-run *toppath*)))))
-
-  (rmt:open-qry-close-locally cmd (if rid rid 0) params))
-
-;;   (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
-;;     (for-each
-;;      (lambda (run-id)
-;;        (let ((connection (hash-table-ref/default *runremote* run-id #f)))
-;;          (if (and (vector? connection)
-;;                   (< (http-transport:server-dat-get-last-access connection) expire-time))
-;;              (begin
-;;                (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
-;;                ;; bb- disabling nanomsg
-;;                ;; SHOULD CLOSE THE CONNECTION HERE
-;;                ;; (case *transport-type*
-;;                ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket
-;;                ;;                     (hash-table-ref *runremote* run-id)))))
-;;                (hash-table-delete! *runremote* run-id)))))
-;;      (hash-table-keys *runremote*)))
-;;   ;; (mutex-unlock! *db-multi-sync-mutex*)
-;;   ;; (mutex-lock! *send-receive-mutex*)
-;;   (let* ((run-id          (if rid rid 0))
-;;          (home-host       (common:get-homehost))
-;;          (connection-info (if (cdr home-host) ;; we are on the home-host
-;;                               #f
-;;                               (rmt:get-connection-info run-id))))
-;;     (cond
-;;      (home-host (rmt:open-qry-close-locally cmd run-id params))
-;;      (connection-info
-;;       ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
-;;       ;; use the server if have connection info
-;;       (let* ((dat     (case *transport-type*
-;;                         ((http)(condition-case
-;;                                 (http-transport:client-api-send-receive run-id connection-info cmd params)
-;;                                 ((commfail)(vector #f "communications fail"))
-;;                                 ((exn)(vector #f "other fail"))))
-;;                         ;; ((nmsg)(condition-case
-;;                         ;;         (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
-;;                         ;;         ((timeout)(vector #f "timeout talking to server"))))
-;;                         (else (exit))))
-;;              (success (if (vector? dat) (vector-ref dat 0) #f))
-;;              (res     (if (vector? dat) (vector-ref dat 1) #f)))
-;;         (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info))
-;;         (if success
-;;             (begin
-;;               ;; (mutex-unlock! *send-receive-mutex*)
-;;               (case *transport-type*
-;;                 ((http) res) ;; (db:string->obj res))
-;;                 ;; ((nmsg) res)
-;;                 )) ;; (vector-ref res 1)))
-;;             (begin ;; let ((new-connection-info (client:setup run-id)))
-;;               (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
-;;               ;; (case *transport-type*
-;;               ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
-;;               (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
-;;               ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
-;;               ;; (if (eq? (modulo attemptnum 5) 0)
-;;               ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
-;;               ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
-;;               (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
-;;               ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
-;;
-;;               ;; no longer killing the server in http-transport:client-api-send-receive
-;;               ;; may kill it here but what are the criteria?
-;;               ;; start with three calls then kill server
-;;               ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
-;;               ;; (thread-sleep! 2)
-;;               (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))
-;;      (else
-;;       ;; no connection info? try to start a server, or access locally if no
-;;       ;; server and the query is read-only
-;;       ;;
-;;       ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
-;;       ;;
-;;       (if (and (< attemptnum 15)
-;;                (member cmd api:write-queries))
-;;           (let ((homehost  (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart")))
-;;             (hash-table-delete! *runremote* run-id)
-;;             ;; (mutex-unlock! *send-receive-mutex*)
-;;             (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no"))
-;;                 (begin
-;;                   (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
-;;                   (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
-;;                   (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
-;;                 ;; NB - probably can remove the query time stuff but need to discuss it ....
-;;                 (let ((start-time (current-milliseconds))
-;;                       (max-query  (string->number (or (configf:lookup *configdat* "server" "server-query-threshold")
-;;                                                       "300")))
-;;                       (newres     (rmt:open-qry-close-locally cmd run-id params)))
-;;                   (let ((delta (- (current-milliseconds) start-time)))
-;;                     (if (> delta max-query)
-;;                         (begin
-;;                           (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query)
-;;                           ;; (server:kind-run run-id)))
-;;                           ))
-;;                   ;; return the result!
-;;                   newres)
-;;                 )))
-;;           (begin
-;;             ;; (debug:print-error 0 *default-log-port* "Communication failed!")
-;;             ;; (mutex-unlock! *send-receive-mutex*)
-;;             ;; (exit)
-;;             (rmt:open-qry-close-locally cmd run-id params)
-;;             ))))))
+                (server:kind-run *toppath*))))
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; make sure we have a homehost record and, when NOT on the homehost, a connection to a live server
+     ((or (not (pair? (remote-hh-dat *runremote*)))     ;; no homehost record yet?
+          (and (not (cdr (remote-hh-dat *runremote*)))  ;; not on the homehost ...
+               (not (remote-conndat *runremote*))))     ;; ... and no connection yet?
+      (remote-hh-dat-set! *runremote* (common:get-homehost))
+      (remote-conndat-set! *runremote* (rmt:get-connection-info 0))
+      (let* ((hh-dat      (remote-hh-dat *runremote*))
+             (need-server (not (or (and (pair? hh-dat) (cdr hh-dat)) ;; on the homehost the query is run locally
+                                   (remote-conndat *runremote*)))))  ;; or we already have a connection
+        (mutex-unlock! *rmt-mutex*)
+        (if (and need-server (not (server:start-attempted? *toppath*)))
+            (server:kind-run *toppath*))) ;; not on the homehost and no connection, we need a server
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; all set up if get this far, dispatch the query
+     ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:open-qry-close-locally cmd (if rid rid 0) params))
+     ;; reset the connection if it has been unused too long
+     ;(> (- start-time (remote-last-server-check *runremote*))
+     ;(remote-server-timeout *runremote*))) ;; we have timed out for this connection
+     ;; not on homehost, do server query
+     (else
+      ;; snapshot what we need from *runremote* while still holding the mutex
+      (let ((run-id          (if rid rid 0))
+            (connection-info (remote-conndat *runremote*))
+            (transport       (remote-transport *runremote*)))
+        (mutex-unlock! *rmt-mutex*)
+        (let* ((dat     (case transport
+                          ((http)(condition-case
+                                  (http-transport:client-api-send-receive run-id connection-info cmd params)
+                                  ((commfail)(vector #f "communications fail"))
+                                  ((exn)(vector #f "other fail"))))
+                          (else
+                           (debug:print 0 *default-log-port* "ERROR: transport " transport " not supported")
+                           (exit))))
+               (success (if (vector? dat) (vector-ref dat 0) #f))
+               (res     (if (vector? dat) (vector-ref dat 1) #f)))
+          (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; refresh access time
+          (if success ;; a query result of #f is not a communication failure, only success decides a retry
+              (case transport
+                ((http) res)
+                (else
+                 (debug:print 0 *default-log-port* "ERROR: transport " transport " is unknown")
+                 (exit 1)))
+              (begin
+                (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
+                (remote-conndat-set! *runremote* #f)
+                (remote-server-url-set! *runremote* #f)
+                (tasks:start-and-wait-for-server (tasks:open-db) 0 15)
+                (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))))
+
+(define (junk-delete-me)
+  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
+    (for-each
+     (lambda (run-id)
+       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
+         (if (and (vector? connection)
+                  (< (http-transport:server-dat-get-last-access connection) expire-time))
+             (begin
+               (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
+               ;; bb- disabling nanomsg
+               ;; SHOULD CLOSE THE CONNECTION HERE
+               ;; (case *transport-type*
+               ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket
+               ;;                     (hash-table-ref *runremote* run-id)))))
+               (hash-table-delete! *runremote* run-id)))))
+     (hash-table-keys *runremote*)))
+  ;; (mutex-unlock! *db-multi-sync-mutex*)
+  ;; (mutex-lock! *send-receive-mutex*)
+  (let* ((run-id          (if rid rid 0))
+         (home-host       (common:get-homehost))
+         (connection-info (if (cdr home-host) ;; we are on the home-host
+                              #f
+                              (rmt:get-connection-info run-id))))
+    (cond
+     (home-host (rmt:open-qry-close-locally cmd run-id params))
+     (connection-info
+      ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
+      ;; use the server if have connection info
+      (let* ((dat     (case *transport-type*
+                        ((http)(condition-case
+                                (http-transport:client-api-send-receive run-id connection-info cmd params)
+                                ((commfail)(vector #f "communications fail"))
+                                ((exn)(vector #f "other fail"))))
+                        ;; ((nmsg)(condition-case
+                        ;;         (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
+                        ;;         ((timeout)(vector #f "timeout talking to server"))))
+                        (else (exit))))
+             (success (if (vector? dat) (vector-ref dat 0) #f))
+             (res     (if (vector? dat) (vector-ref dat 1) #f)))
+        (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info))
+        (if success
+            (begin
+              ;; (mutex-unlock! *send-receive-mutex*)
+              (case *transport-type*
+                ((http) res) ;; (db:string->obj res))
+                ;; ((nmsg) res)
+                )) ;; (vector-ref res 1)))
+            (begin ;; let ((new-connection-info (client:setup run-id)))
+              (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
+              ;; (case *transport-type*
+              ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
+              (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
+              ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
+              ;; (if (eq? (modulo attemptnum 5) 0)
+              ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
+              ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
+              (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
+              ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
+
+              ;; no longer killing the server in http-transport:client-api-send-receive
+              ;; may kill it here but what are the criteria?
+              ;; start with three calls then kill server
+              ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
+              ;; (thread-sleep! 2)
+              (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))
+     (else
+      ;; no connection info? try to start a server, or access locally if no
+      ;; server and the query is read-only
+      ;;
+      ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
+      ;;
+      (if (and (< attemptnum 15)
+               (member cmd api:write-queries))
+          (let ((homehost  (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart")))
+            (hash-table-delete! *runremote* run-id)
+            ;; (mutex-unlock! *send-receive-mutex*)
+            (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no"))
+                (begin
+                  (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
+                  (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
+                  (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
+                ;; NB - probably can remove the query time stuff but need to discuss it ....
+                (let ((start-time (current-milliseconds))
+                      (max-query  (string->number (or (configf:lookup *configdat* "server" "server-query-threshold")
+                                                      "300")))
+                      (newres     (rmt:open-qry-close-locally cmd run-id params)))
+                  (let ((delta (- (current-milliseconds) start-time)))
+                    (if (> delta max-query)
+                        (begin
+                          (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query)
+                          ;; (server:kind-run run-id)))
+                          ))
+                  ;; return the result!
+                  newres)
+                )))
+          (begin
+            ;; (debug:print-error 0 *default-log-port* "Communication failed!")
+            ;; (mutex-unlock! *send-receive-mutex*)
+            ;; (exit)
+            (rmt:open-qry-close-locally cmd run-id params)
+            ))))))
 
 (define (rmt:update-db-stats run-id rawcmd params duration)
   (mutex-lock! *db-stats-mutex*)
   (handle-exceptions
    exn
@@ -277,20 +345,20 @@
 ;; 	(db:string->obj (vector-ref dat 1))
 ;; 	(begin
 ;; 	  (debug:print-error 0 *default-log-port* "rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
 ;; 	  dat))))
 
-;; Wrap json library for strings (why the ports crap in the first place?)
-(define (rmt:dat->json-str dat)
-  (with-output-to-string
-    (lambda ()
-      (json-write dat))))
-
-(define (rmt:json-str->dat json-str)
-  (with-input-from-string json-str
-    (lambda ()
-      (json-read))))
+;; ;; Wrap json library for strings (why the ports crap in the first place?)
+;; (define (rmt:dat->json-str dat)
+;;   (with-output-to-string
+;;     (lambda ()
+;;       (json-write dat))))
+;;
+;; (define (rmt:json-str->dat json-str)
+;;   (with-input-from-string json-str
+;;     (lambda ()
+;;       (json-read))))
 
 ;;======================================================================
 ;;
 ;; A C T U A L A P I C A L L S
 ;;