@@ -54,104 +54,112 @@ ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname)) ;; (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections ;; (mutex-lock! *db-multi-sync-mutex*) - (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin - (for-each - (lambda (run-id) - (let ((connection (hash-table-ref/default *runremote* run-id #f))) - (if (and (vector? connection) - (< (http-transport:server-dat-get-last-access connection) expire-time)) - (begin - (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") - ;; bb- disabling nanomsg - ;; SHOULD CLOSE THE CONNECTION HERE - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket - ;; (hash-table-ref *runremote* run-id))))) - (hash-table-delete! *runremote* run-id))))) - (hash-table-keys *runremote*))) - ;; (mutex-unlock! *db-multi-sync-mutex*) - ;; (mutex-lock! *send-receive-mutex*) - (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id))) - ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) - (if connection-info - ;; use the server if have connection info - (let* ((dat (case *transport-type* - ((http)(condition-case - (http-transport:client-api-send-receive run-id connection-info cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail")))) - ;; ((nmsg)(condition-case - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params) - ;; ((timeout)(vector #f "timeout talking to server")))) - (else (exit)))) - (success (if (vector? dat) (vector-ref dat 0) #f)) - (res (if (vector? dat) (vector-ref dat 1) #f))) - (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) - (if success - (begin - ;; (mutex-unlock! *send-receive-mutex*) - (case *transport-type* - ((http) res) ;; (db:string->obj res)) - ;; ((nmsg) res) - )) ;; (vector-ref res 1))) - (begin ;; let ((new-connection-info (client:setup run-id))) - (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) - (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) - (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) - ;; no connection info? try to start a server, or access locally if no - ;; server and the query is read-only - ;; - ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call - ;; - (if (and (< attemptnum 15) - (member cmd api:write-queries)) - (let ((homehost (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart"))) - (hash-table-delete! *runremote* run-id) - ;; (mutex-unlock! *send-receive-mutex*) - (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no")) - (begin - (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? - (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) - ;; NB - probably can remove the query time stuff but need to discuss it .... - (let ((start-time (current-milliseconds)) - (max-query (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") - "300"))) - (newres (rmt:open-qry-close-locally cmd run-id params))) - (let ((delta (- (current-milliseconds) start-time))) - (if (> delta max-query) - (begin - (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query) - ;; (server:kind-run run-id))) - )) - ;; return the result! - newres) - ))) - (begin - ;; (debug:print-error 0 *default-log-port* "Communication failed!") - ;; (mutex-unlock! *send-receive-mutex*) - ;; (exit) - (rmt:open-qry-close-locally cmd run-id params) - ))))) + (rmt:open-qry-close-locally cmd (if rid rid 0) params)) + +;; (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin +;; (for-each +;; (lambda (run-id) +;; (let ((connection (hash-table-ref/default *runremote* run-id #f))) +;; (if (and (vector? connection) +;; (< (http-transport:server-dat-get-last-access connection) expire-time)) +;; (begin +;; (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") +;; ;; bb- disabling nanomsg +;; ;; SHOULD CLOSE THE CONNECTION HERE +;; ;; (case *transport-type* +;; ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket +;; ;; (hash-table-ref *runremote* run-id))))) +;; (hash-table-delete! *runremote* run-id))))) +;; (hash-table-keys *runremote*))) +;; ;; (mutex-unlock! *db-multi-sync-mutex*) +;; ;; (mutex-lock! *send-receive-mutex*) +;; (let* ((run-id (if rid rid 0)) +;; (home-host (common:get-homehost)) +;; (connection-info (if (cdr home-host) ;; we are on the home-host +;; #f +;; (rmt:get-connection-info run-id)))) +;; (cond +;; (home-host (rmt:open-qry-close-locally cmd run-id params)) +;; (connection-info +;; ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) +;; ;; use the server if have connection info +;; (let* ((dat (case *transport-type* +;; ((http)(condition-case +;; (http-transport:client-api-send-receive run-id connection-info cmd params) +;; ((commfail)(vector #f "communications fail")) +;; ((exn)(vector #f "other fail")))) +;; ;; ((nmsg)(condition-case +;; ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params) +;; ;; ((timeout)(vector #f "timeout talking to server")))) +;; (else (exit)))) +;; (success (if (vector? dat) (vector-ref dat 0) #f)) +;; (res (if (vector? dat) (vector-ref dat 1) #f))) +;; (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) +;; (if success +;; (begin +;; ;; (mutex-unlock! *send-receive-mutex*) +;; (case *transport-type* +;; ((http) res) ;; (db:string->obj res)) +;; ;; ((nmsg) res) +;; )) ;; (vector-ref res 1))) +;; (begin ;; let ((new-connection-info (client:setup run-id))) +;; (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") +;; ;; (case *transport-type* +;; ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) +;; (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection +;; ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. +;; ;; (if (eq? (modulo attemptnum 5) 0) +;; ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) +;; ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications +;; (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) +;; ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) +;; +;; ;; no longer killing the server in http-transport:client-api-send-receive +;; ;; may kill it here but what are the criteria? +;; ;; start with three calls then kill server +;; ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) +;; ;; (thread-sleep! 2) +;; (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))) +;; (else +;; ;; no connection info? try to start a server, or access locally if no +;; ;; server and the query is read-only +;; ;; +;; ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call +;; ;; +;; (if (and (< attemptnum 15) +;; (member cmd api:write-queries)) +;; (let ((homehost (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart"))) +;; (hash-table-delete! *runremote* run-id) +;; ;; (mutex-unlock! *send-receive-mutex*) +;; (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no")) +;; (begin +;; (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) +;; (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? +;; (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) +;; ;; NB - probably can remove the query time stuff but need to discuss it .... +;; (let ((start-time (current-milliseconds)) +;; (max-query (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") +;; "300"))) +;; (newres (rmt:open-qry-close-locally cmd run-id params))) +;; (let ((delta (- (current-milliseconds) start-time))) +;; (if (> delta max-query) +;; (begin +;; (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query) +;; ;; (server:kind-run run-id))) +;; )) +;; ;; return the result! +;; newres) +;; ))) +;; (begin +;; ;; (debug:print-error 0 *default-log-port* "Communication failed!") +;; ;; (mutex-unlock! *send-receive-mutex*) +;; ;; (exit) +;; (rmt:open-qry-close-locally cmd run-id params) +;; )))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn