Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -65,11 +65,11 @@ ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in *runremote* ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds. ;; 3. do the query, if on homehost use local access ;; - (let* ((start-time (current-time))) ;; snapshot time so all use cases get same value + (let* ((start-time (current-seconds))) ;; snapshot time so all use cases get same value (cond ;; give up if more than 15 attempts ((> attemptnum 15) (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.") (exit 1)) @@ -100,26 +100,29 @@ ;; all set up if get this far, dispatch the query ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd (if rid rid 0) params) ) ;; reset the connection if it has been unused too long - ;(> (- start-time (remote-last-server-check *runremote*)) - ;(remote-server-timeout *runremote*))) ;; we have timed out for this connection + ((and (remote-conndat *runremote*) + (let ((expire-time (- start-time (remote-server-timeout *runremote*)))) + (< (http-transport:server-dat-get-last-access (remote-conndat *runremote*)) expire-time))) + (remote-conndat-set! *runremote* #f) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) ;; stale connection discarded: release the lock, then retry so a fresh connection is set up ;; not on homehost, do server query (else (mutex-unlock! *rmt-mutex*) - (let* ((dat (case (remote-transport *runremote*) + (let* ((conninfo (remote-conndat *runremote*)) + (dat (case (remote-transport *runremote*) ((http)(condition-case - (http-transport:client-api-send-receive run-id connection-info cmd params) + (http-transport:client-api-send-receive (if rid rid 0) conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail")))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") (exit)))) (success (if (vector? 
dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) - (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; refresh access time + (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (if (and success res) (case (remote-transport *runremote*) ((http) res) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown") @@ -129,111 +132,10 @@ (remote-conndat-set! *runremote* #f) (server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) -(define (junk-delete-me) - (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin - (for-each - (lambda (run-id) - (let ((connection (hash-table-ref/default *runremote* run-id #f))) - (if (and (vector? connection) - (< (http-transport:server-dat-get-last-access connection) expire-time)) - (begin - (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") - ;; bb- disabling nanomsg - ;; SHOULD CLOSE THE CONNECTION HERE - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket - ;; (hash-table-ref *runremote* run-id))))) - (hash-table-delete! *runremote* run-id))))) - (hash-table-keys *runremote*))) - ;; (mutex-unlock! *db-multi-sync-mutex*) - ;; (mutex-lock! 
*send-receive-mutex*) - (let* ((run-id (if rid rid 0)) - (home-host (common:get-homehost)) - (connection-info (if (cdr home-host) ;; we are on the home-host - #f - (rmt:get-connection-info run-id)))) - (cond - (home-host (rmt:open-qry-close-locally cmd run-id params)) - (connection-info - ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) - ;; use the server if have connection info - (let* ((dat (case *transport-type* - ((http)(condition-case - (http-transport:client-api-send-receive run-id connection-info cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail")))) - ;; ((nmsg)(condition-case - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params) - ;; ((timeout)(vector #f "timeout talking to server")))) - (else (exit)))) - (success (if (vector? dat) (vector-ref dat 0) #f)) - (res (if (vector? dat) (vector-ref dat 1) #f))) - (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) - (if success - (begin - ;; (mutex-unlock! *send-receive-mutex*) - (case *transport-type* - ((http) res) ;; (db:string->obj res)) - ;; ((nmsg) res) - )) ;; (vector-ref res 1))) - (begin ;; let ((new-connection-info (client:setup run-id))) - (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) - (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! 
*send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) - (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))) - (else - ;; no connection info? try to start a server, or access locally if no - ;; server and the query is read-only - ;; - ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call - ;; - (if (and (< attemptnum 15) - (member cmd api:write-queries)) - (let ((homehost (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart"))) - (hash-table-delete! *runremote* run-id) - ;; (mutex-unlock! *send-receive-mutex*) - (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no")) - (begin - (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? - (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) - ;; NB - probably can remove the query time stuff but need to discuss it .... 
- (let ((start-time (current-milliseconds)) - (max-query (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") - "300"))) - (newres (rmt:open-qry-close-locally cmd run-id params))) - (let ((delta (- (current-milliseconds) start-time))) - (if (> delta max-query) - (begin - (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query) - ;; (server:kind-run run-id))) - )) - ;; return the result! - newres) - ))) - (begin - ;; (debug:print-error 0 *default-log-port* "Communication failed!") - ;; (mutex-unlock! *send-receive-mutex*) - ;; (exit) - (rmt:open-qry-close-locally cmd run-id params) - )))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn