Index: Makefile
==================================================================
--- Makefile
+++ Makefile
@@ -72,11 +72,11 @@
 tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm
 db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm
 tests.o tasks.o dashboard-tasks.o : task_records.scm
 runs.o : test_records.scm
 megatest.o : megatest-fossil-hash.scm
-client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm : common_records.scm rpc-transport.scm
+rmt.scm client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm : common_records.scm rpc-transport.scm
 common_records.scm : altdb.scm
 vg.o dashboard.o : vg_records.scm
 dcommon.o : run_records.scm
 # Temporary while transitioning to new routine
 # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm
Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -116,10 +116,13 @@
 (define *server-kind-run* (make-hash-table))
 (define *home-host* #f)
 (define *total-non-write-delay* 0)
 (define *heartbeat-mutex* (make-mutex))
 
+;; client
+(define *rmt-mutex* (make-mutex)) ;; remote access calls mutex; held by rmt:send-receive while it checks/updates *runremote*
+
 ;; RPC transport
 (define *rpc:listener* #f)
 
 ;; KEY info
 (define *target* (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
Index: common_records.scm
==================================================================
--- common_records.scm
+++ common_records.scm
@@ -43,10 +43,19 @@
    (lambda ()
      (print ((condition-property-accessor 'exn 'message) exn))
      (print "Callback error in " procname)
      (print "Full condition info:\n" (condition->list exn)))))
   (proc)))
+
+;; Mutex protected way to get and set record values (or use define-simple-syntax ??)
+;; e.g. (with-mutex mtx getter rec) => value, (with-mutex mtx setter! rec val)
+;; dynamic-wind releases mtx even when accessor raises, so a failure cannot leave it locked
+(define-inline (with-mutex mtx accessor record . val)
+  (dynamic-wind
+   (lambda () (mutex-lock! mtx))
+   (lambda () (apply accessor record val))
+   (lambda () (mutex-unlock! mtx))))
 
 ;; this was cached based on results from profiling but it turned out the profiling
 ;; somehow went wrong - perhaps too many processes writing to it. Leaving the caching
 ;; in for now but can probably take it out later.
 ;;
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -7,34 +7,34 @@
 ;; This program is distributed WITHOUT ANY WARRANTY; without even the
 ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 ;; PURPOSE.
 ;;======================================================================
 
-(use json format) ;; RADT => purpose of json format??
+(use format typed-records) ;; typed-records: defstruct for the remote record below; json is no longer used in this file
 
 (declare (unit rmt))
 (declare (uses api))
 (declare (uses tdb))
 (declare (uses http-transport))
 ;;(declare (uses nmsg-transport))
 
+(include "common_records.scm")
+
 ;;
 ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
 ;;
 
-;; ;; For debugging add the following to ~/.megatestrc
-;;
-;; (require-library trace)
-;; (import trace)
-;; (trace
-;; rmt:send-receive
-;; api:execute-requests
-;; )
-
 ;; generate entries for ~/.megatestrc with the following
 ;;
 ;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u
+(defstruct remote
+  (hh-dat            (common:get-homehost)) ;; homehost record ( addr . hhflag )
+  (server-url        (if *toppath* (server:check-if-running *toppath*) #f))
+  (last-server-check 0)  ;; last time we checked to see if the server was alive
+  (conndat           #f)
+  (transport         *transport-type*)
+  (server-timeout    (or (server:get-timeout) 100))) ;; default to 100 seconds
 
 ;;======================================================================
 ;; S U P P O R T F U N C T I O N S
 ;;======================================================================
 
@@ -41,10 +41,15 @@
 ;; if a server is either running or in the process of starting call client:setup
 ;; else return #f to let the calling proc know that there is no server available
 ;;
 (define (rmt:get-connection-info run-id)
   (let ((cinfo *runremote*)) ;; (hash-table-ref/default *runremote* run-id #f)))
+;; how about if rrr is a defstruct and we use a wrapper to access it (even better would be a macro)
+;; look in common_records for with-mutex
+;;
+;; (with-mutex *rrr-mutex* rmt:dat-host *runremote*) => returns value
+;; (with-mutex *rrr-mutex* rmt:
     (if cinfo
         cinfo
         (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
             (client:setup run-id)
             #f))))
@@ -52,128 +57,190 @@
 (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
 
 ;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
 ;;
 (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
-  ;; clean out old connections
-  ;; (mutex-lock! *db-multi-sync-mutex*)
+
+  ;; cmd is the api command symbol and params its argument list; rid is the
+  ;; run-id (#f is treated as 0). attemptnum counts failed server round trips;
+  ;; once it exceeds 15 the process exits. Returns the result of the query.
+  ;;
+  ;; do all the prep locked under the rmt-mutex
+  (mutex-lock! *rmt-mutex*)
 
   ;; 1. check if server is started IFF cmd is a write OR if we are not on the homehost, store in *runremote*
   ;; 2. check the age of the connections. refresh the connection if it is older than timeout-20 seconds.
   ;; 3. do the query, if on homehost use local access
   ;;
-  (if (and ;; #f ;; FORCE NO GO FOR RIGHT NOW
-       (not *runremote*) ;; we trust *runremote* to reflect that a server was found previously
-       (not (member cmd api:read-only-queries))) ;; we don't trust so much the list of write queries
+  (let* ((start-time (current-seconds))) ;; snapshot time so all use cases get same value (seconds, like last-server-check)
+    (cond
+     ;; give up if more than 15 attempts
+     ((> attemptnum 15)
+      (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.")
+      (mutex-unlock! *rmt-mutex*) ;; do not exit while still holding the lock
+      (exit 1))
+     ;; ensure we have a record for our connection for given area
+     ((not *runremote*)
+      (set! *runremote* (make-remote))
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; no server contact made and this is a write, try starting a server
+     ((and (not (remote-server-url *runremote*))
+           (not (member cmd api:read-only-queries)))
       (let ((serverconn (server:check-if-running *toppath*)))
         (if serverconn
-            (set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed
+            (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed
             (if (not (server:start-attempted? *toppath*))
-                (server:kind-run *toppath*)))))
-
-  (rmt:open-qry-close-locally cmd (if rid rid 0) params))
-
-;; (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
-;;   (for-each
-;;    (lambda (run-id)
-;;      (let ((connection (hash-table-ref/default *runremote* run-id #f)))
-;;        (if (and (vector? connection)
-;;                 (< (http-transport:server-dat-get-last-access connection) expire-time))
-;;            (begin
-;;              (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
-;;              ;; bb- disabling nanomsg
-;;              ;; SHOULD CLOSE THE CONNECTION HERE
-;;              ;; (case *transport-type*
-;;              ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket
-;;              ;; (hash-table-ref *runremote* run-id)))))
-;;              (hash-table-delete! *runremote* run-id)))))
-;;    (hash-table-keys *runremote*)))
-;; ;; (mutex-unlock! *db-multi-sync-mutex*)
-;; ;; (mutex-lock! *send-receive-mutex*)
-;; (let* ((run-id (if rid rid 0))
-;;        (home-host (common:get-homehost))
-;;        (connection-info (if (cdr home-host) ;; we are on the home-host
-;;                             #f
-;;                             (rmt:get-connection-info run-id))))
-;;   (cond
-;;    (home-host (rmt:open-qry-close-locally cmd run-id params))
-;;    (connection-info
-;;     ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
-;;     ;; use the server if have connection info
-;;     (let* ((dat (case *transport-type*
-;;                   ((http)(condition-case
-;;                           (http-transport:client-api-send-receive run-id connection-info cmd params)
-;;                           ((commfail)(vector #f "communications fail"))
-;;                           ((exn)(vector #f "other fail"))))
-;;                   ;; ((nmsg)(condition-case
-;;                   ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
-;;                   ;; ((timeout)(vector #f "timeout talking to server"))))
-;;                   (else (exit))))
-;;            (success (if (vector? dat) (vector-ref dat 0) #f))
-;;            (res (if (vector? dat) (vector-ref dat 1) #f)))
-;;       (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info))
-;;       (if success
-;;           (begin
-;;             ;; (mutex-unlock! *send-receive-mutex*)
-;;             (case *transport-type*
-;;               ((http) res) ;; (db:string->obj res))
-;;               ;; ((nmsg) res)
-;;               )) ;; (vector-ref res 1)))
-;;           (begin ;; let ((new-connection-info (client:setup run-id)))
-;;             (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
-;;             ;; (case *transport-type*
-;;             ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
-;;             (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
-;;             ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
-;;             ;; (if (eq? (modulo attemptnum 5) 0)
-;;             ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
-;;             ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
-;;             (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
-;;             ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
-;;
-;;             ;; no longer killing the server in http-transport:client-api-send-receive
-;;             ;; may kill it here but what are the criteria?
-;;             ;; start with three calls then kill server
-;;             ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
-;;             ;; (thread-sleep! 2)
-;;             (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))
-;;    (else
-;;     ;; no connection info? try to start a server, or access locally if no
-;;     ;; server and the query is read-only
-;;     ;;
-;;     ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
-;;     ;;
-;;     (if (and (< attemptnum 15)
-;;              (member cmd api:write-queries))
-;;         (let ((homehost (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart")))
-;;           (hash-table-delete! *runremote* run-id)
-;;           ;; (mutex-unlock! *send-receive-mutex*)
-;;           (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no"))
-;;               (begin
-;;                 (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
-;;                 (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
-;;                 (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
-;;               ;; NB - probably can remove the query time stuff but need to discuss it ....
-;;               (let ((start-time (current-milliseconds))
-;;                     (max-query (string->number (or (configf:lookup *configdat* "server" "server-query-threshold")
-;;                                                    "300")))
-;;                     (newres (rmt:open-qry-close-locally cmd run-id params)))
-;;                 (let ((delta (- (current-milliseconds) start-time)))
-;;                   (if (> delta max-query)
-;;                       (begin
-;;                         (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query)
-;;                         ;; (server:kind-run run-id)))
-;;                         ))
-;;                   ;; return the result!
-;;                   newres)
-;;                 )))
-;;         (begin
-;;           ;; (debug:print-error 0 *default-log-port* "Communication failed!")
-;;           ;; (mutex-unlock! *send-receive-mutex*)
-;;           ;; (exit)
-;;           (rmt:open-qry-close-locally cmd run-id params)
-;;           ))))))
+                (server:kind-run *toppath*))))
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; if not on homehost ensure we have a connection to a live server
+     ((or (not (pair? (remote-hh-dat *runremote*)))         ;; have a homehost record?
+          (and (not (cdr (remote-hh-dat *runremote*)))      ;; have record, are we off the homehost ...
+               (not (remote-conndat *runremote*))))         ;; ... and still without a connection?
+      (remote-hh-dat-set! *runremote* (common:get-homehost))
+      (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) ;; NOTE(review): rmt:get-connection-info returns *runremote* itself when it is set - confirm that is the intended conndat
+      (mutex-unlock! *rmt-mutex*)
+      (server:kind-run *toppath*) ;; we need a server
+      (rmt:send-receive cmd rid params attemptnum: attemptnum))
+     ;; all set up if get this far, dispatch the query
+     ((cdr (remote-hh-dat *runremote*)) ;; we are on homehost
+      (mutex-unlock! *rmt-mutex*)
+      (rmt:open-qry-close-locally cmd (if rid rid 0) params) )
+     ;; reset the connection if it has been unused too long
+     ;(> (- start-time (remote-last-server-check *runremote*))
+     ;(remote-server-timeout *runremote*))) ;; we have timed out for this connection
+     ;; not on homehost, do server query
+     (else
+      (mutex-unlock! *rmt-mutex*)
+      (let* ((run-id          (if rid rid 0))                ;; same default as the local query branch
+             (connection-info (remote-conndat *runremote*)) ;; connection stored by the branch above
+             (dat (case (remote-transport *runremote*)
+                    ((http)(condition-case
+                            (http-transport:client-api-send-receive run-id connection-info cmd params)
+                            ((commfail)(vector #f "communications fail"))
+                            ((exn)(vector #f "other fail"))))
+                    (else
+                     (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported")
+                     (exit 1))))
+             (success (if (vector? dat) (vector-ref dat 0) #f))
+             (res (if (vector? dat) (vector-ref dat 1) #f)))
+        (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; refresh access time
+        (if (and success res)
+            (case (remote-transport *runremote*)
+              ((http) res)
+              (else
+               (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown")
+               (exit 1)))
+            (begin
+              (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum)
+              (remote-conndat-set! *runremote* #f)
+              (remote-server-url-set! *runremote* #f)
+              (tasks:start-and-wait-for-server (tasks:open-db) 0 15)
+              (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))))))))
+
+(define (junk-delete-me) ;; NOTE(review): parked copy of the old body; rid, cmd, params and attemptnum are not bound here
+  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
+    (for-each
+     (lambda (run-id)
+       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
+         (if (and (vector? connection)
+                  (< (http-transport:server-dat-get-last-access connection) expire-time))
+             (begin
+               (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
+               ;; bb- disabling nanomsg
+               ;; SHOULD CLOSE THE CONNECTION HERE
+               ;; (case *transport-type*
+               ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket
+               ;; (hash-table-ref *runremote* run-id)))))
+               (hash-table-delete! *runremote* run-id)))))
+     (hash-table-keys *runremote*)))
+  ;; (mutex-unlock! *db-multi-sync-mutex*)
+  ;; (mutex-lock! *send-receive-mutex*)
+  (let* ((run-id (if rid rid 0))
+         (home-host (common:get-homehost))
+         (connection-info (if (cdr home-host) ;; we are on the home-host
+                              #f
+                              (rmt:get-connection-info run-id))))
+    (cond
+     (home-host (rmt:open-qry-close-locally cmd run-id params))
+     (connection-info
+      ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
+      ;; use the server if have connection info
+      (let* ((dat (case *transport-type*
+                    ((http)(condition-case
+                            (http-transport:client-api-send-receive run-id connection-info cmd params)
+                            ((commfail)(vector #f "communications fail"))
+                            ((exn)(vector #f "other fail"))))
+                    ;; ((nmsg)(condition-case
+                    ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
+                    ;; ((timeout)(vector #f "timeout talking to server"))))
+                    (else (exit))))
+             (success (if (vector? dat) (vector-ref dat 0) #f))
+             (res (if (vector? dat) (vector-ref dat 1) #f)))
+        (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info))
+        (if success
+            (begin
+              ;; (mutex-unlock! *send-receive-mutex*)
+              (case *transport-type*
+                ((http) res) ;; (db:string->obj res))
+                ;; ((nmsg) res)
+                )) ;; (vector-ref res 1)))
+            (begin ;; let ((new-connection-info (client:setup run-id)))
+              (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
+              ;; (case *transport-type*
+              ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
+              (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
+              ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2.
+              ;; (if (eq? (modulo attemptnum 5) 0)
+              ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
+              ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
+              (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
+              ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
+
+              ;; no longer killing the server in http-transport:client-api-send-receive
+              ;; may kill it here but what are the criteria?
+              ;; start with three calls then kill server
+              ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
+              ;; (thread-sleep! 2)
+              (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))
+     (else
+      ;; no connection info? try to start a server, or access locally if no
+      ;; server and the query is read-only
+      ;;
+      ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
+      ;;
+      (if (and (< attemptnum 15)
+               (member cmd api:write-queries))
+          (let ((homehost (common:get-homehost))) ;; faststart (configf:lookup *configdat* "server" "faststart")))
+            (hash-table-delete! *runremote* run-id)
+            ;; (mutex-unlock! *send-receive-mutex*)
+            (if (not (cdr homehost)) ;; we always require a server if not on homehost ;; (and faststart (equal? faststart "no"))
+                (begin
+                  (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
+                  (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
+                  (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
+                ;; NB - probably can remove the query time stuff but need to discuss it ....
+                (let ((start-time (current-milliseconds))
+                      (max-query (string->number (or (configf:lookup *configdat* "server" "server-query-threshold")
+                                                     "300")))
+                      (newres (rmt:open-qry-close-locally cmd run-id params)))
+                  (let ((delta (- (current-milliseconds) start-time)))
+                    (if (> delta max-query)
+                        (begin
+                          (debug:print-info 0 *default-log-port* "WARNING: long query times, you may have an overloaded homehost.") ;; Starting server as query time " delta " is over the limit of " max-query)
+                          ;; (server:kind-run run-id)))
+                          ))
+                    ;; return the result!
+                    newres)
+                  )))
+          (begin
+            ;; (debug:print-error 0 *default-log-port* "Communication failed!")
+            ;; (mutex-unlock! *send-receive-mutex*)
+            ;; (exit)
+            (rmt:open-qry-close-locally cmd run-id params)
+            ))))))
 
 (define (rmt:update-db-stats run-id rawcmd params duration)
   (mutex-lock! *db-stats-mutex*)
   (handle-exceptions
    exn
@@ -277,20 +344,20 @@
 ;; (db:string->obj (vector-ref dat 1))
 ;; (begin
 ;; (debug:print-error 0 *default-log-port* "rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
 ;; dat))))
 
-;; Wrap json library for strings (why the ports crap in the first place?)
-(define (rmt:dat->json-str dat)
-  (with-output-to-string
-    (lambda ()
-      (json-write dat))))
-
-(define (rmt:json-str->dat json-str)
-  (with-input-from-string json-str
-    (lambda ()
-      (json-read))))
+;; ;; Wrap json library for strings (why the ports crap in the first place?)
+;; (define (rmt:dat->json-str dat)
+;;   (with-output-to-string
+;;     (lambda ()
+;;       (json-write dat))))
+;;
+;; (define (rmt:json-str->dat json-str)
+;;   (with-input-from-string json-str
+;;     (lambda ()
+;;       (json-read))))
 
 ;;======================================================================
 ;;
 ;; A C T U A L A P I C A L L S
 ;;
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -166,24 +166,30 @@
 ;; (rmt:start-server run-id)))
 
 (define server:try-running server:run) ;; there is no more per-run servers ;; REMOVE ME. BUG.
 (define (server:start-attempted? areapath)
   (let ((flagfile (conc areapath "/.starting-server")))
-    (and (file-exists? flagfile)
-         (< (- (current-seconds)
-               (file-modification-time flagfile))
-            15)))) ;; exists and less than 15 seconds old
+    (handle-exceptions
+     exn
+     #f ;; if things go wrong pretend we can't see the file
+     (and (file-exists? flagfile)
+          (< (- (current-seconds)
+                (file-modification-time flagfile))
+             15))))) ;; exists and less than 15 seconds old
 
 (define (server:read-dotserver areapath)
   (let ((dotfile (conc areapath "/.server")))
-    (if (and (file-exists? dotfile)
-             (file-read-access? dotfile))
-        (with-input-from-file
-            dotfile
-          (lambda ()
-            (read-line)))
-        #f)))
+    (handle-exceptions
+     exn
+     #f ;; if things go wrong pretend we can't see the file
+     (if (and (file-exists? dotfile)
+              (file-read-access? dotfile))
+         (let ((line (with-input-from-file
+                         dotfile
+                       read-line)))      ;; first line of areapath/.server
+           (and (string? line) line))    ;; an empty file yields #!eof (truthy), report #f instead
+         #f))))
 
 ;; write a .server file in *toppath* with hostport
 ;; return #t on success, #f otherwise
 ;;
 (define (server:write-dotserver areapath hostport)