Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -177,11 +177,11 @@ (car (vector-ref logininfo 1)) #f)))))) (if (and start-res ping-res) (begin - (hash-table-set! *runremote* run-id start-res) + ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) start-res) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) (case *transport-type* @@ -199,48 +199,17 @@ (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up (client:setup run-id remaining-tries: (- remaining-tries 1)) ))) (begin ;; no server registered - (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) - (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) - (if (< num-available 2) - (server:try-running run-id)) - (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. - (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) - -;; (let ((host-info (hash-table-ref/default *runremote* run-id #f))) -;; (if host-info ;; this is a bit circular. 
the host-info *is* the start-res FIXME -;; (let* ((iface (http-transport:server-dat-get-iface host-info)) -;; (port (http-transport:server-dat-get-port host-info)) -;; (start-res (case *transport-type* -;; ((http)(http-transport:client-connect iface port)) -;; ((nmsg)(nmsg-transport:client-connect iface port)) ;; (http-transport:server-dat-get-socket host-info)) -;; (else #f))) -;; (ping-res (case *transport-type* -;; ((http)(rmt:login-no-auto-client-setup start-res run-id)) -;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) -;; (if logininfo -;; (vector-ref (vector-ref logininfo 1) 1) -;; #f))) -;; (else #f)))) -;; (if ping-res ;; sucessful login? -;; (begin -;; (debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries) -;; start-res) ;; return the server info -;; ;; have host info but no ping. shutdown the current connection and try again -;; (begin ;; login failed -;; (debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info) -;; (case *transport-type* -;; ((http)(http-transport:close-connections run-id))) -;; (hash-table-delete! *runremote* run-id) -;; (if (< remaining-tries 8) -;; (thread-sleep! 5) -;; (thread-sleep! 1)) -;; (client:setup run-id remaining-tries: (- remaining-tries 1))))) -;; ;; YUK: rename server-dat here -;; + ;; (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) + ;; (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) + ;; (if (< num-available 2) + ;; (server:try-running run-id)) + (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries) + ;; (thread-sleep! (+ 2 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. 
+ (client:setup run-id remaining-tries: (- remaining-tries 1)))))))) ;; keep this as a function to ease future (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -1473,13 +1473,14 @@ (file-modification-time *monitor-db-path*) -1)) (run-update-time (current-seconds)) (recalc (dashboard:recalc modtime *please-update-buttons* *last-db-update-time*))) (if (and (eq? *current-tab-number* 0) - (> monitor-modtime *last-monitor-update-time*)) + (or (> monitor-modtime *last-monitor-update-time*) + (> (- run-update-time *last-monitor-update-time*) 5))) ;; also refresh when more than 5 seconds have passed since the last update, just in case (begin - (set! *last-monitor-update-time* monitor-modtime) + (set! *last-monitor-update-time* run-update-time) ;; monitor-modtime) (if dashboard:update-servers-table (dashboard:update-servers-table)))) (if recalc (begin (case *current-tab-number* ((0) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -28,24 +28,28 @@ ;; (trace ;; rmt:send-receive ;; api:execute-requests ;; ) +;; generate entries for ~/.megatestrc with the following +;; +;; grep define ../rmt.scm | grep rmt: |perl -pi -e 's/\(define\s+\((\S+)\W.*$/\1/'|sort -u + ;;====================================================================== ;; S U P P O R T F U N C T I O N S ;;====================================================================== -;; NOT USED +;; NOT USED? 
;; -(define (rmt:call-transport run-id connection-info cmd jparams) - (case (server:get-transport) - ((rpc) ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)) - ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams)) - ((fs) ( fs-transport:client-api-send-receive run-id connection-info cmd jparams)) - ((zmq) (zmq-transport:client-api-send-receive run-id connection-info cmd jparams)) - (else ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)))) +;; (define (rmt:call-transport run-id connection-info cmd jparams) +;; (case (server:get-transport) +;; ((rpc) ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)) +;; ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams)) +;; ((fs) ( fs-transport:client-api-send-receive run-id connection-info cmd jparams)) +;; ((zmq) (zmq-transport:client-api-send-receive run-id connection-info cmd jparams)) +;; (else ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams)))) ;; (define (rmt:write-frequency-over-limit? cmd run-id) (and (not (member cmd api:read-only-queries)) (let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f)) @@ -63,19 +67,21 @@ (begin (debug:print-info 1 "db write rate too high, starting a server, count=" count " start=" start " run-id=" run-id " queries-per-second=" queries-per-second) #t) #f)))) -(define (rmt:get-connection-info run-id) - (let ((cinfo (hash-table-ref/default *runremote* run-id #f))) - (if cinfo - cinfo - ;; NB// can cache the answer for server running for 10 seconds ... - ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) - (if (tasks:server-running-or-starting? 
(db:delay-if-busy (tasks:open-db)) run-id) - (client:setup run-id) - #f)))) +;; (define (rmt:get-connection-info run-id) +;; (let ((cinfo (hash-table-ref/default *runremote* run-id #f))) +;; (if cinfo +;; cinfo +;; ;; NB// can cache the answer for server running for 10 seconds ... +;; ;; ;; (and (not (rmt:write-frequency-over-limit? cmd run-id)) +;; ;; (if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id) +;; ;; (begin +;; ;; (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id) +;; (client:setup run-id)))) +;; ;; #f)))) (define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id (define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected ;; clean out old connections (mutex-lock! *db-multi-sync-mutex*) @@ -92,21 +98,26 @@ ((nmsg)(nn-close (http-transport:server-dat-get-socket (hash-table-ref *runremote* run-id))))) (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) (mutex-unlock! *db-multi-sync-mutex*) + ;; (mutex-lock! 
*send-receive-mutex*) (let* ((run-id (if rid rid 0)) - (connection-info (rmt:get-connection-info run-id))) + (connection-info (hash-table-ref/default *runremote* run-id #f))) ;; (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info (let* ((dat (case *transport-type* ((http)(condition-case (http-transport:client-api-send-receive run-id connection-info cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail")))) + ((commfail) + (tasks:kill-server-run-id run-id) + (vector #f "communications fail")) + ((exn) + (tasks:kill-server-run-id run-id) + (vector #f "other fail")))) ((nmsg)(condition-case (nmsg-transport:client-api-send-receive run-id connection-info cmd params) ((timeout)(vector #f "timeout talking to server")))) (else (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) @@ -113,45 +124,31 @@ (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) (if success (begin ;; (mutex-unlock! *send-receive-mutex*) + ;; all is well, return the result! (case *transport-type* ((http) res) ;; (db:string->obj res)) ((nmsg) res))) ;; (vector-ref res 1))) + ;; we had a connection but it is borked. clean up and reconnect (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.") ;; (case *transport-type* ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! 
*send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) ;; no connection info? try to start a server (if (and (< attemptnum 15) (member cmd api:write-queries)) (begin (hash-table-delete! *runremote* run-id) ;; (mutex-unlock! *send-receive-mutex*) (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10) - ;; (client:setup run-id) ;; client setup happens in rmt:get-connection-info - (thread-sleep! (random 5)) ;; give some time to settle and minimize collison? + (hash-table-set! *runremote* run-id (client:setup run-id)) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))) (begin - ;; (debug:print 0 "ERROR: Communication failed!") - ;; (mutex-unlock! *send-receive-mutex*) - ;; (exit) (rmt:open-qry-close-locally cmd run-id params) ))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) @@ -208,33 +205,44 @@ (cons 'none 0)) (loop (car tal)(cdr tal) newmax-cmd currmax))))))) (mutex-unlock! *db-stats-mutex*) res)) -(define (rmt:open-qry-close-locally cmd run-id params) +(define (rmt:open-qry-close-locally cmd run-id params #!key (remretries 5)) (let* ((dbstruct-local (if *dbstruct-db* *dbstruct-db* (let* ((dbdir (conc (configf:lookup *configdat* "setup" "linktree") "/.db")) (db (make-dbr:dbstruct path: dbdir local: #t))) (set! *dbstruct-db* db) db))) - (db-file-path (db:dbfile-path 0))) - ;; (read-only (not (file-read-access? 
db-file-path))) - (let* ((start (current-milliseconds)) - (resdat (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))) - (res (vector-ref resdat 1)) - (duration (- (current-milliseconds) start))) - (rmt:update-db-stats run-id cmd params duration) - ;; mark this run as dirty if this was a write - (if (not (member cmd api:read-only-queries)) - (let ((start-time (current-seconds))) - (mutex-lock! *db-multi-sync-mutex*) - ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f)) - ;; just set it every time. Is a write more expensive than a read and does it matter? - (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write" - (mutex-unlock! *db-multi-sync-mutex*))) - res))) + (db-file-path (db:dbfile-path 0)) + ;; (read-only (not (file-read-access? db-file-path))) + (start (current-milliseconds)) + (resdat (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))) + (success (vector-ref resdat 0)) + (res (vector-ref resdat 1)) + (duration (- (current-milliseconds) start))) + (if (not success) + (if (> remretries 0) + (begin + (debug:print 0 "ERROR: local query failed. Trying again.") + (thread-sleep! (/ (random 5000) 1000)) ;; some random delay + (rmt:open-qry-close-locally cmd run-id params remretries: (- remretries 1))) + (begin + (debug:print 0 "ERROR: too many retries in rmt:open-qry-close-locally, giving up") + #f)) + (begin + (rmt:update-db-stats run-id cmd params duration) + ;; mark this run as dirty if this was a write + (if (not (member cmd api:read-only-queries)) + (let ((start-time (current-seconds))) + (mutex-lock! *db-multi-sync-mutex*) + ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f)) + ;; just set it every time. Is a write more expensive than a read and does it matter? + (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write" + (mutex-unlock! 
*db-multi-sync-mutex*))) + res)))) (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) ;; (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) (res (handle-exceptions Index: runs.scm ================================================================== --- runs.scm +++ runs.scm @@ -160,13 +160,13 @@ (hash-table-set! *runs:denoise* key currtime) #t) #f))) (define (runs:can-run-more-tests run-id jobgroup max-concurrent-jobs) - ;;(thread-sleep! (cond - ;; ((> *runs:can-run-more-tests-count* 20) 2);; obviously haven't had any work to do for a while - ;; (else 0))) + (thread-sleep! (cond + ((> *runs:can-run-more-tests-count* 20) 2);; obviously haven't had any work to do for a while + (else 0))) (let* ((num-running (rmt:get-count-tests-running run-id)) (num-running-in-jobgroup (rmt:get-count-tests-running-in-jobgroup run-id jobgroup)) (job-group-limit (let ((jobg-count (config-lookup *configdat* "jobgroups" jobgroup))) (if (string? jobg-count) (string->number jobg-count) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -361,40 +361,41 @@ mdb ;; NEEDS dbprep ADDED "SELECT id FROM servers WHERE run_id=? 
AND state = 'running';" run-id) res)) (define (tasks:need-server run-id) - (let ((forced (configf:lookup *configdat* "server" "required")) - (maxqry (cdr (rmt:get-max-query-average run-id))) - (threshold (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10")))) - (cond - (forced - (if (common:low-noise-print 60 run-id "server required is set") - (debug:print-info 0 "Server required is set, starting server for run-id " run-id ".")) - #t) - ((> maxqry threshold) - (if (common:low-noise-print 60 run-id "Max query time execeeded") - (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, server needed for run-id " run-id ".")) - #t) - (else - #f)))) + (configf:lookup *configdat* "server" "required")) + +;; (maxqry (cdr (rmt:get-max-query-average run-id))) +;; (threshold (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10")))) +;; (cond +;; (forced +;; (if (common:low-noise-print 60 run-id "server required is set") +;; (debug:print-info 0 "Server required is set, starting server for run-id " run-id ".")) +;; #t) +;; ((> maxqry threshold) +;; (if (common:low-noise-print 60 run-id "Max query time execeeded") +;; (debug:print-info 0 "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, server needed for run-id " run-id ".")) +;; #t) +;; (else +;; #f)))) ;; try to start a server and wait for it to be available ;; (define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries) ;; ensure a server is running for this run - (let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-running (tasks:server-running? (db:delay-if-busy tdbdat) run-id)) (delay-time 0)) - (if (and (not server-dat) + (if (and (not server-running) (< delay-time delay-max-tries)) (begin (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id) (debug:print 0 "Try starting server for run-id " run-id)) (thread-sleep! 
(/ (random 2000) 1000)) (server:kind-run run-id) (thread-sleep! (min delay-time 1)) - (loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) + (loop (tasks:server-running? (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id)