Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -74,10 +74,11 @@ ;; SERVER (define *my-client-signature* #f) (define *transport-type* 'http) (define *transport-type* 'http) ;; override with [server] transport http|rpc|nmsg +(define *transport-type* 'http) (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) (define *client-non-blocking-mode* #f) (define *server-id* #f) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -357,14 +357,15 @@ (define (http-transport:sync-inmemdb-to-db tdbdat server-state run-id server-id bad-sync-count) (if *inmemdb* (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f) - (sync-retry #f)) + (sync-retry #f) + (sync-touched (db:sync-touched *inmemdb* *run-id* force-sync: #t))) ;; inmemdb is a dbstruct - (condition-case - (db:sync-touched *inmemdb* *run-id* force-sync: #t) + (condition-case sync-touched + ((sync-failed)(cond ((> bad-sync-count 10) ;; time to give up (http-transport:server-shutdown server-id port)) (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop (thread-sleep! 5) @@ -372,41 +373,42 @@ ((exn) (debug:print 0 "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running crashed") (exit))) (if sync-retry - #t ; return true - retry + (begin + #t) ; return true - retry (begin (set! sync-time (- (current-milliseconds) start-time)) (set! rem-time (quotient (- 4000 sync-time) 1000)) (debug:print 4 "SYNC: time= " sync-time ", rem-time=" rem-time) (if (and (<= rem-time 4) (> rem-time 0)) (thread-sleep! rem-time) - (thread-sleep! 4)) ;; fallback for if the math is changed ... - - ;; - ;; no *inmemdb* yet, set running after our first pass through and start the db - ;; - (if (eq? server-state 'available) - (let ((new-server-id (tasks:server-am-i-the-server? (db:delay-if-busy tdbdat) run-id))) ;; try to ensure no double registering of servers - (if (equal? new-server-id server-id) - (begin - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") - (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access - (set! *inmemdb* (db:setup run-id)) - ;; force initialization - ;; (db:get-db *inmemdb* #t) - (db:get-db *inmemdb* run-id) - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")) - (begin ;; gotta exit nicely - (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") - (http-transport:server-shutdown server-id port))))) - #f))) ; return #f - don't retry - #f)) ; return #f - don't retry since there is no inmemdb - + (thread-sleep! 4)))) + #f) ;; fallback for if the math is changed ... + + ;; + ;; no *inmemdb* yet, set running after our first pass through and start the db + ;; + (begin + (if (eq? server-state 'available) + (let ((new-server-id (tasks:server-am-i-the-server? (db:delay-if-busy tdbdat) run-id))) ;; try to ensure no double registering of servers + (if (equal? new-server-id server-id) + (begin + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access + (set! *inmemdb* (db:setup run-id)) + ;; force initialization + ;; (db:get-db *inmemdb* #t) + (db:get-db *inmemdb* run-id) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")) + (begin ;; gotta exit nicely + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") + (http-transport:server-shutdown server-id port))))))) + #f) ;;; factored out of http-transport:keep-running (define (http-transport:get-server-info tdbdat server-start-time server-id run-id) (let loop ((start-time (current-seconds)) (changed #t) @@ -417,15 +419,15 @@ (mutex-lock! *heartbeat-mutex*) (set! sdat *server-info*) (mutex-unlock! *heartbeat-mutex*) (if (and sdat (not changed) - (> (- (current-seconds) start-time) 2)) + (> (- (current-seconds) start-time) (- (tasks:update-pause-seconds) 1) )) sdat (begin - (debug:print-info 0 "Still waiting, last-sdat=" last-sdat) - (sleep 4) + (debug:print-info 0 "Still waiting, sdat="sdat" last-sdat=" last-sdat) + (sleep (tasks:update-pause-seconds)) (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes (begin (debug:print 0 "ERROR: transport appears to have died, exiting server " server-id " for run " run-id) (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") (exit)) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -607,12 +607,16 @@ environ-patt: "env-override" given-toppath: (get-environment-variable "MT_RUN_AREA_HOME") pathenvvar: "MT_RUN_AREA_HOME")))) (set! *configdat* (if (car *configinfo*)(car *configinfo*) #f)) (set! *toppath* (if (car *configinfo*)(cadr *configinfo*) #f)) - (let* ((tmptransport (configf:lookup *configdat* "server" "transport")) - (transport (if tmptransport (string->symbol tmptransport) 'http))) + (let* ((cmdlinetransport (args:get-arg "-transport")) + (tmptransport (configf:lookup *configdat* "server" "transport")) + (transport + (if cmdlinetransport + (string->symbol cmdlinetransport) + (if tmptransport (string->symbol tmptransport) 'http)))) (if (member transport '(http rpc nmsg)) (set! *transport-type* transport) (begin (debug:print 0 "ERROR: Unrecognised transport " transport) (exit)))) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -145,11 +145,10 @@ -update-meta : update the tests metadata for all tests -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -transport http|zmq : use http or zmq for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -log logfile : send stdout and stderr to logfile -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all @@ -226,11 +225,11 @@ ":units" ;; misc "-start-dir" "-server" "-stop-server" - "-transport" + "-transport" ;; note: this is deprecated "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" @@ -707,10 +706,11 @@ #t )))))) ;; MAY STILL NEED THIS ;; (set! *megatest-db* (make-dbr:dbstruct path: *toppath* local: #t)))))))))) + (if (or (args:get-arg "-list-servers") (args:get-arg "-stop-server")) (let ((tl (launch:setup-for-run))) (if tl Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -6,221 +6,672 @@ ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. -(require-extension (srfi 18) extras tcp s11n rpc) -(import (prefix rpc rpc:)) +(require-extension (srfi 18) extras tcp s11n) + +(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) ;; sqlite3 +;; (import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) -(import (prefix sqlite3 sqlite3:)) +;; Configurations for server +(tcp-buffer-size 2048) +(max-connections 2048) (declare (unit rpc-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) +(declare (uses daemon)) +(declare (uses portlogger)) (include "common_records.scm") (include "db_records.scm") -;; procstr is the name of the procedure to be called as a string -(define (rpc-transport:autoremote procstr params) - (handle-exceptions - exn - (begin - (debug:print 1 "Remote failed for " proc " " params) - (apply (eval (string->symbol procstr)) params)) - ;; (if *runremote* - ;; (apply (eval (string->symbol (conc "remote:" procstr))) params) - (apply (eval (string->symbol procstr)) params))) - -;; all routes though here end in exit ... -;; -;; start_server? -;; -(define (rpc-transport:launch run-id) - (set! *run-id* run-id) - (if (args:get-arg "-daemonize") - (daemon:ize)) - (if (server:check-if-running run-id) - (begin - (debug:print 0 "INFO: Server for run-id " run-id " already running") - (exit 0))) - (let loop ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id)) - (remtries 4)) - (if (not server-id) - (if (> remtries 0) - (begin - (thread-sleep! 2) - (loop (open-run-close tasks:server-lock-slot tasks:open-db run-id) - (- remtries 1))) - (begin - ;; since we didn't get the server lock we are going to clean up and bail out - (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") - (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db " rpc-transport:launch"))) - (begin - (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id) - (exit))))) +(define (rpc-transport:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +(define *db:process-queue-mutex* (make-mutex)) (define (rpc-transport:run hostn run-id server-id) - (debug:print 2 "Attempting to start the rpc server ...") - ;; (trace rpc:publish-procedure!) - - (rpc:publish-procedure! 'server:login server:login) - (rpc:publish-procedure! 'testing (lambda () "Just testing")) - - (let* ((db #f) + (debug:print 2 "Attempting to start the server ...") + (let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) #f))) (if ipstr ipstr hostn))) ;; hostname))) - (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) - (link-tree-path (configf:lookup *configdat* "setup" "linktree")) - (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) - (th1 (make-thread - (lambda () - ((rpc:make-server rpc:listener) #t)) - "rpc:server")) - ;; (cute (rpc:make-server rpc:listener) "rpc:server") - ;; 'rpc:server)) - (hostname (if (string=? "-" hostn) - (get-host-name) - hostn)) - (ipaddrstr (if (string=? "-" hostn) - (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - #f)) - (portnum (rpc:default-server-port)) - (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) - (tdb (tasks:open-db))) - (thread-start! th1) - (set! db *inmemdb*) - (open-run-close tasks:server-set-interface-port - tasks:open-db - server-id - ipaddrstr portnum) - (debug:print 0 "Server started on " host:port) - - ;; (trace rpc:publish-procedure!) - ;; (rpc:publish-procedure! 'server:login server:login) - ;; (rpc:publish-procedure! 'testing (lambda () "Just testing")) - - ;;====================================================================== - ;; ;; end of publish-procedure section - ;;====================================================================== - ;; - (on-exit (lambda () - (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) - - (set! *rpc:listener* rpc:listener) - (tasks:server-set-state! tdb server-id "running") - (set! *inmemdb* (db:setup run-id)) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - (let loop ((count 0)) - (thread-sleep! 5) ;; no need to do this very often - (let ((numrunning -1)) ;; (db:get-count-tests-running db))) - (if (or (> numrunning 0) - (> (+ *last-db-access* 60)(current-seconds))) - (begin - (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) - (loop (+ 1 count))) - (begin - (debug:print-info 0 "Starting to shutdown the server side") - (open-run-close tasks:server-delete-record tasks:open-db server-id " rpc-transport:try-start-server stop") - (thread-sleep! 10) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - )))))) - -(define (rpc-transport:find-free-port-and-open port) - (handle-exceptions - exn - (begin - (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") - (rpc-transport:find-free-port-and-open (+ port 1))) - (rpc:default-server-port port) - (tcp-read-timeout 240000) - (tcp-listen (rpc:default-server-port) 10000))) - -(define (rpc-transport:ping run-id host port) - (handle-exceptions - exn - (begin - (print "SERVER_NOT_FOUND") - (exit 1)) - (let ((login-res ((rpc:procedure 'server:login host port) *toppath*))) - (if (and (list? login-res) - (car login-res)) - (begin - (print "LOGIN_OK") - (exit 0)) - (begin - (print "LOGIN_FAILED") - (exit 1)))))) - -(define (rpc-transport:client-setup run-id #!key (remtries 10)) - (if *runremote* - (begin - (debug:print 0 "ERROR: Attempt to connect to server but already connected") - #f) - (let* ((host-info (hash-table-ref/default *runremote* run-id #f))) ;; (open-run-close db:get-var #f "SERVER")) - (if host-info - (let ((iface (car host-info)) - (port (cadr host-info)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if ping-res - (let ((server-dat (list iface port #f #f #f))) - (hash-table-set! *runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id))) - (debug:print-info 0 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if server-db-info - (let* ((iface (tasks:hostinfo-get-interface server-db-info)) - (port (tasks:hostinfo-get-port server-db-info)) - (server-dat (list iface port #f #f #f)) - (ping-res ((rpc:procedure 'server:login host port) *toppath*))) - (if start-res - (begin - (hash-table-set! *runremote* run-id server-dat) - server-dat) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))) - (begin - (server:try-running run-id) - (thread-sleep! 2) - (rpc-transport:client-setup run-id (- remtries 1))))))))) -;; -;; (port (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f))) -;; (if (and port -;; (string->number port)) -;; (let ((portn (string->number port))) -;; (debug:print-info 2 "Setting up to connect to host " host ":" port) -;; (handle-exceptions -;; exn -;; (begin -;; (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port) -;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) -;; ;; (open-run-close -;; ;; (lambda (db . param) -;; ;; (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'")) -;; ;; #f) -;; (set! *runremote* #f)) -;; (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server -;; ((rpc:procedure 'server:login host portn) *toppath*)) -;; (begin -;; (debug:print-info 2 "Logged in and connected to " host ":" port) -;; (set! *runremote* (vector host portn))) -;; (begin -;; (debug:print-info 2 "Failed to login or connect to " host ":" port) -;; (set! *runremote* #f))))) -;; (debug:print-info 2 "no server available"))))) - + (start-port (portlogger:open-run-close portlogger:find-port)) + (link-tree-path (configf:lookup *configdat* "setup" "linktree"))) + ;; (set! db *inmemdb*) + (debug:print-info 0 "portlogger recommended port: " start-port) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + (handle-directory spiffy-directory-listing) + (handle-exception (lambda (exn chain) + (signal (make-composite-condition + (make-property-condition + 'server + 'message "server error"))))) + + ;; rpc-transport:handle-directory) ;; simple-directory-handler) + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + ;; This is were we set up the database connections + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "api")) + (send-response body: (api:process-request *inmemdb* $) ;; the $ is the request vars proc + headers: '((content-type text/plain))) + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*)) + ((equal? (uri-path (request-uri (current-request))) + '(/ "")) + (send-response body: (rpc-transport:main-page))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "runs")) + (send-response body: (rpc-transport:main-page))) + ((equal? (uri-path (request-uri (current-request))) + '(/ any)) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + (else (continue)))))))) + (rpc-transport:try-start-server run-id ipaddrstr start-port server-id))) + +;; This is recursively run by rpc-transport:run until sucessful +;; +(define (rpc-transport:try-start-server run-id ipaddrstr portnum server-id) + (let ((config-hostname (configf:lookup *configdat* "server" "hostname")) + (tdbdat (tasks:open-db))) + (debug:print-info 0 "rpc-transport:try-start-server run-id=" run-id " ipaddrsstr=" ipaddrstr " portnum=" portnum " server-id=" server-id " config-hostname=" config-hostname) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 64000) + (begin + (debug:print 0 "WARNING: attempt to start server failed. Trying again ...") + (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (debug:print 0 "exn=" (condition->list exn)) + (portlogger:open-run-close portlogger:set-failed portnum) + (debug:print 0 "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + + ;; get_next_port goes here + (rpc-transport:try-start-server run-id + ipaddrstr + (portlogger:open-run-close portlogger:find-port) + server-id)) + (begin + (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " rpc-transport:try-start-server") + (print "ERROR: Tried and tried but could not start the server")))) + ;; any error in following steps will result in a retry + (set! *server-info* (list ipaddrstr portnum)) + (tasks:server-set-interface-port + (db:delay-if-busy tdbdat) + server-id + ipaddrstr portnum) + (debug:print 0 "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + ;; NEED WAY TO SET IP TO #f TO BIND ALL + ;; (start-server bind-address: ipaddrstr port: portnum) + (if config-hostname ;; this is a hint to bind directly + (start-server port: portnum bind-address: (if (equal? config-hostname "-") + ipaddrstr + config-hostname)) + (start-server port: portnum)) + ;; (portlogger:open-run-close portlogger:set-port portnum "released") + (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " rpc-transport:try-start-server") + (debug:print 1 "INFO: server has been stopped")))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define *rpc-mutex* (make-mutex)) + +;; NOTE: Large block of code from 32436b426188080f72fceb6894af541fbad9921e removed here +;; I'm pretty sure it is defunct. + +;; This next block all imported en-mass from the api branch +(define *rpc-requests-in-progress* 0) +(define *rpc-connections-next-cleanup* (current-seconds)) + +(define (rpc-transport:get-time-to-cleanup) + (let ((res #f)) + (mutex-lock! *rpc-mutex*) + (set! res (> (current-seconds) *rpc-connections-next-cleanup*)) + (mutex-unlock! *rpc-mutex*) + res)) + +(define (rpc-transport:inc-requests-count) + (mutex-lock! *rpc-mutex*) + (set! *rpc-requests-in-progress* (+ 1 *rpc-requests-in-progress*)) + ;; Use this opportunity to slow things down iff there are too many requests in flight + (if (> *rpc-requests-in-progress* 5) + (begin + (debug:print-info 0 "Whoa there buddy, ease up...") + (thread-sleep! 1))) + (mutex-unlock! *rpc-mutex*)) + +(define (rpc-transport:dec-requests-count proc) + (mutex-lock! *rpc-mutex*) + (proc) + (set! *rpc-requests-in-progress* (- *rpc-requests-in-progress* 1)) + (mutex-unlock! *rpc-mutex*)) + +(define (rpc-transport:dec-requests-count-and-close-all-connections) + (set! *rpc-requests-in-progress* (- *rpc-requests-in-progress* 1)) + (let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds + (if (> *rpc-requests-in-progress* 0) + (if (> etime (current-seconds)) + (begin + (thread-sleep! 0.05) + (loop etime)) + (debug:print 0 "ERROR: requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections")) + (close-all-connections!))) + (set! *rpc-connections-next-cleanup* (+ (current-seconds) 10)) + (mutex-unlock! *rpc-mutex*)) + +(define (rpc-transport:inc-requests-and-prep-to-close-all-connections) + (mutex-lock! *rpc-mutex*) + (set! *rpc-requests-in-progress* (+ 1 *rpc-requests-in-progress*))) + +;; Send "cmd" with json payload "params" to serverdat and receive result +;; +(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3)) + (let* ((fullurl (if (vector? serverdat) + (rpc-transport:server-dat-get-api-req serverdat) + (begin + (debug:print 0 "FATAL ERROR: rpc-transport:client-api-send-receive called with no server info") + (exit 1)))) + (res #f) + (success #t) + (sparams (db:obj->string params transport: 'http))) +;; (condition-case +;; handle-exceptions +;; exn +;; (if (> numretries 0) +;; (begin +;; (mutex-unlock! *rpc-mutex*) +;; (thread-sleep! 1) +;; (handle-exceptions +;; exn +;; (debug:print 0 "WARNING: closing connections failed. Server at " fullurl " almost certainly dead") +;; (close-all-connections!)) +;; (debug:print 0 "WARNING: Failed to communicate with server, trying again, numretries left: " numretries) +;; (rpc-transport:client-api-send-receive run-id serverdat cmd sparams numretries: (- numretries 1))) +;; (begin +;; (mutex-unlock! *rpc-mutex*) +;; (tasks:kill-server-run-id run-id) +;; #f)) +;; (begin + (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n") + ;; set up the rpc-client here + (max-retry-attempts 1) + ;; consider all requests indempotent + (retry-request? (lambda (request) + #f)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((send-recieve (lambda () + (mutex-lock! *rpc-mutex*) + ;; (condition-case (with-input-from-request "http://localhost"; #f read-lines) + ;; ((exn http client-error) e (print e))) + (set! res (vector + success + (db:string->obj + (handle-exceptions + exn + (begin + (set! success #f) + (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ".") + (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn)) + (hash-table-delete! *runremote* run-id) + ;; Killing associated server to allow clean retry.") + ;; (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine? + (mutex-unlock! *rpc-mutex*) + ;;; (signal (make-composite-condition + ;;; (make-property-condition 'commfail 'message "failed to connect to server"))) + ;;; "communications failed" + (db:obj->string #f)) + (with-input-from-request ;; was dat + fullurl + (list (cons 'key "thekey") + (cons 'cmd cmd) + (cons 'params sparams)) + read-string)) + transport: 'http))) + ;; Shouldn't this be a call to the managed call-all-connections stuff above? + (close-all-connections!) + (mutex-unlock! *rpc-mutex*) + )) + (time-out (lambda () + (thread-sleep! 45) + #f)) + (th1 (make-thread send-recieve "with-input-from-request")) + (th2 (make-thread time-out "time out"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + (thread-terminate! th2) + (debug:print-info 11 "got res=" res) + (if (vector? res) + (if (vector-ref res 0) + res + (begin ;; note: this code also called in nmsg-transport - consider consolidating it + (debug:print 0 "ERROR: error occured at server, info=" (vector-ref res 2)) + (debug:print 0 " client call chain:") + (print-call-chain (current-error-port)) + (debug:print 0 " server call chain:") + (pp (vector-ref res 1) (current-error-port)) + (signal (vector-ref result 0)))) + (signal (make-composite-condition + (make-property-condition + 'timeout + 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))) + +;; careful closing of connections stored in *runremote* +;; +(define (rpc-transport:close-connections run-id) + (let* ((server-dat (hash-table-ref/default *runremote* run-id #f))) + (if (vector? server-dat) + (let ((api-dat (rpc-transport:server-dat-get-api-uri server-dat))) + (close-connection! api-dat) + #t) + #f))) + + +(define (make-rpc-transport:server-dat)(make-vector 6)) +(define (rpc-transport:server-dat-get-iface vec) (vector-ref vec 0)) +(define (rpc-transport:server-dat-get-port vec) (vector-ref vec 1)) +(define (rpc-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) +(define (rpc-transport:server-dat-get-api-url vec) (vector-ref vec 3)) +(define (rpc-transport:server-dat-get-api-req vec) (vector-ref vec 4)) +(define (rpc-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +(define (rpc-transport:server-dat-get-socket vec) (vector-ref vec 6)) + +(define (rpc-transport:server-dat-make-url vec) + (if (and (rpc-transport:server-dat-get-iface vec) + (rpc-transport:server-dat-get-port vec)) + (conc "http://" + (rpc-transport:server-dat-get-iface vec) + ":" + (rpc-transport:server-dat-get-port vec)) + #f)) + +(define (rpc-transport:server-dat-update-last-access vec) + (if (vector? vec) + (vector-set! vec 5 (current-seconds)) + (begin + (print-call-chain (current-error-port)) + (debug:print 0 "ERROR: call to rpc-transport:server-dat-update-last-access with non-vector!!")))) + +;; +;; connect +;; +(define (rpc-transport:client-connect iface port) + (let* ((api-url (conc "http://" iface ":" port "/api")) + (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) + (api-req (make-request method: 'POST uri: api-uri)) + (server-dat (vector iface port api-uri api-url api-req (current-seconds)))) + server-dat)) + +;;; factored out of rpc-transport:keep-running +;; return #t if a bad sync occurred and a retry is warranted +;; return #f otherwise +;; side effect - cleans up and exits on exception. +(define (rpc-transport:sync-inmemdb-to-db tdbdat server-state run-id server-id bad-sync-count) + (if *inmemdb* + (let ((start-time (current-milliseconds)) + (sync-time #f) + (rem-time #f) + (sync-retry #f)) + ;; inmemdb is a dbstruct + (condition-case + (db:sync-touched *inmemdb* *run-id* force-sync: #t) + ((sync-failed)(cond + ((> bad-sync-count 10) ;; time to give up + (rpc-transport:server-shutdown server-id port)) + (else ;; (> bad-sync-count 0) ;; we've had a fail or two, delay and loop + (thread-sleep! 5) + (set! sync-retry #t)))) + ((exn) + (debug:print 0 "ERROR: error from sync code other than 'sync-failed. Attempting to gracefully shutdown the server") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " rpc-transport:keep-running crashed") + (exit))) + (if sync-retry + #t ; return true - retry + (begin + (set! sync-time (- (current-milliseconds) start-time)) + (set! rem-time (quotient (- 4000 sync-time) 1000)) + (debug:print 4 "SYNC: time= " sync-time ", rem-time=" rem-time) + + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time) + (thread-sleep! 4)) ;; fallback for if the math is changed ... + + ;; + ;; no *inmemdb* yet, set running after our first pass through and start the db + ;; + (if (eq? server-state 'available) + (let ((new-server-id (tasks:server-am-i-the-server? (db:delay-if-busy tdbdat) run-id))) ;; try to ensure no double registering of servers + (if (equal? new-server-id server-id) + (begin + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep") + (thread-sleep! 0.5) ;; give some margin for queries to complete before switching from file based access to server based access + (set! *inmemdb* (db:setup run-id)) + ;; force initialization + ;; (db:get-db *inmemdb* #t) + (db:get-db *inmemdb* run-id) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")) + (begin ;; gotta exit nicely + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "collision") + (rpc-transport:server-shutdown server-id port))))) + #f))) ; return #f - don't retry + #f)) ; return #f - don't retry since there is no inmemdb + + +;;; factored out of rpc-transport:keep-running +(define (rpc-transport:get-server-info tdbdat server-start-time server-id run-id) + (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) + (let ((sdat #f)) + (thread-sleep! 0.01) + (debug:print-info 0 "Waiting for server alive signature") + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if (and sdat + (not changed) + (> (- (current-seconds) start-time) 2)) + sdat + (begin + (debug:print-info 0 "Still waiting, last-sdat=" last-sdat) + (sleep 4) + (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes + (begin + (debug:print 0 "ERROR: transport appears to have died, exiting server " server-id " for run " run-id) + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature") + (exit)) + (loop start-time + (equal? sdat last-sdat) + sdat))))))) + +;; run rpc-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (rpc-transport:keep-running server-id run-id) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (debug:print-info 0 "Starting the sync-back, keep alive thread in server for run-id=" run-id) + (let* ((tdbdat (tasks:open-db)) + (server-start-time (current-seconds)) + (server-info (rpc-transport:get-server-info tdbdat server-start-time server-id run-id)) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (server-timeout (server:get-timeout))) + + (let loop ((count 0) + (server-state 'available) + (bad-sync-count 0)) + + ;; Use this opportunity to sync the inmemdb to db + (let ((sync-retry (rpc-transport:sync-inmemdb-to-db tdbdat server-state run-id server-id bad-sync-count))) + (if sync-retry + (loop count server-state (+ bad-sync-count 1)))) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) 'running bad-sync-count)) + + ;; Check that iface and port have not changed (can happen if server port collides) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + + (if (or (not (equal? sdat (list iface port))) + (not server-id)) + (begin + (debug:print-info 0 "interface changed, refreshing iface and port info") + (set! iface (car sdat)) + (set! port (cadr sdat)))) + + ;; Transfer *last-db-access* to last-access to use in checking that we are still alive + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + + ;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout) + ;; + ;; no_traffic, no running tests, if server 0, no running servers + ;; + ;; (let ((wait-on-running (configf:lookup *configdat* "server" b"wait-on-running"))) ;; wait on running tasks (if not true then exit on time out) + ;; + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)) + (adjusted-timeout (if (> hrs-since-start 1) + (- server-timeout (inexact->exact (round (* hrs-since-start 60)))) ;; subtract 60 seconds per hour + server-timeout))) + (if (common:low-noise-print 120 "server timeout") + (debug:print-info 0 "Adjusted server timeout: " adjusted-timeout)) + (if (and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (begin + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + ;; + ;; Consider implementing some smarts here to re-insert the record or kill self is + ;; the db indicates so + ;; + ;; (if (tasks:server-am-i-the-server? tdb run-id) + ;; (tasks:server-set-state! tdb server-id "running")) + ;; + (loop 0 server-state bad-sync-count)) + (rpc-transport:server-shutdown server-id port)))))) + +(define (rpc-transport:server-shutdown server-id port) + (let ((tdbdat (tasks:open-db))) + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) + ;; + ;; start_shutdown + ;; + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "shutting-down") + (portlogger:open-run-close portlogger:set-port port "released") + (thread-sleep! 5) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Number of cached writes " *number-of-writes*) + (debug:print-info 0 "Average cached write time " + (if (eq? *number-of-writes* 0) + "n/a (no writes)" + (/ *writes-total-delay* + *number-of-writes*)) + " ms") + (debug:print-info 0 "Number non-cached queries " *number-non-write-queries*) + (debug:print-info 0 "Average non-cached time " + (if (eq? *number-non-write-queries* 0) + "n/a (no queries)" + (/ *total-non-write-delay* + *number-non-write-queries*)) + " ms") + (debug:print-info 0 "Server shutdown complete. Exiting") + (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " rpc-transport:keep-running complete") + (exit))) + +;; all routes though here end in exit ... +;; +;; start_server? +;; +(define (rpc-transport:launch run-id) + (let* ((tdbdat (tasks:open-db))) + (set! *run-id* run-id) + (if (args:get-arg "-daemonize") + (begin + (daemon:ize) + (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it + (begin + (current-error-port *alt-log-file*) + (current-output-port *alt-log-file*))))) + (if (server:check-if-running run-id) + (begin + (debug:print 0 "INFO: Server for run-id " run-id " already running") + (exit 0))) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (remtries 4)) + (if (not server-id) + (if (> remtries 0) + (begin + (thread-sleep! 2) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (- remtries 1))) + (begin + ;; since we didn't get the server lock we are going to clean up and bail out + (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch") + )) + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 "Server run thread started") + (rpc-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + run-id + server-id)) "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 "Server monitor thread started") + (rpc-transport:keep-running server-id run-id)) + "Keep running"))) + (thread-start! th2) + (thread-sleep! 0.25) ;; give the server time to settle before starting the keep-running monitor. + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (exit)))))) + +(define (http:ping run-id host-port) + (let* ((server-dat (rpc-transport:client-connect (car host-port)(cadr host-port))) + (login-res (rmt:login-no-auto-client-setup server-dat run-id))) + (if (and (list? login-res) + (car login-res)) + (begin + (print "LOGIN_OK") + (exit 0)) + (begin + (print "LOGIN_FAILED") + (exit 1))))) + +(define (rpc-transport:server-signal-handler signum) + (signal-mask! signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + (thread-sleep! 1)) + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +;;====================================================================== +;; web pages +;;====================================================================== + +(define (rpc-transport:main-page) + (let ((linkpath (root-path))) + (conc "

" (pathname-strip-directory *toppath*) "

" + "" + "Run area: " *toppath* + "

Server Stats

" + (rpc-transport:stats-table) + "
" + (rpc-transport:runs linkpath) + "
" + (rpc-transport:run-stats) + "" + ))) + +(define (rpc-transport:stats-table) + (mutex-lock! *heartbeat-mutex*) + (let ((res + (conc "" + "" + "" + "" + "" + "" + "" + "
Max cached queries " *max-cache-size* "
Number of cached writes " *number-of-writes* "
Average cached write time " (if (eq? *number-of-writes* 0) + "n/a (no writes)" + (/ *writes-total-delay* + *number-of-writes*)) + " ms
Number non-cached queries " *number-non-write-queries* "
Average non-cached time " (if (eq? *number-non-write-queries* 0) + "n/a (no queries)" + (/ *total-non-write-delay* + *number-non-write-queries*)) + " ms
Last access" (seconds->time-string *last-db-access*) "
"))) + (mutex-unlock! *heartbeat-mutex*) + res)) + +(define (rpc-transport:runs linkpath) + (conc "

Runs

" + (string-intersperse + (let ((files (map pathname-strip-directory (glob (conc linkpath "/*"))))) + (map (lambda (p) + (conc "" p "
")) + files)) + " "))) + +(define (rpc-transport:run-stats) + (let ((stats (open-run-close db:get-running-stats #f))) + (conc "" + (string-intersperse + (map (lambda (stat) + (conc "")) + stats) + " ") + "
" (car stat) "" (cadr stat) "
"))) ADDED rpctest/Makefile Index: rpctest/Makefile ================================================================== --- /dev/null +++ rpctest/Makefile @@ -0,0 +1,11 @@ +client: rpctest-continuous-client test.db + rpctest-continuous-client client test.db + +server: rpctest-continuous-client test.db + rpctest-continuous-client server test.db + +rpctest-continuous-client: rpctest-continuous-client.scm + csc rpctest-continuous-client.scm + +test.db: + sqlite3 test.db "CREATE TABLE foo (id INTEGER PRIMARY KEY, var TEXT, val TEXT);" Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -48,11 +48,11 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) - (case *transport-type* + (case (server:get-transport) ((http)(http-transport:launch run-id)) ((nmsg)(nmsg-transport:launch run-id)) ((rpc) (rpc-transport:launch run-id)) (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -197,10 +197,12 @@ -1 ;; interface ;; (conc (server:get-transport)) ;; transport (conc *transport-type*) ;; transport run-id )) + +(define (tasks:update-pause-seconds) 4) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) (sqlite3:for-each-row (lambda (num-in-queue) Index: tests/unittests/server.scm ================================================================== --- tests/unittests/server.scm +++ tests/unittests/server.scm @@ -53,10 +53,11 @@ (apply print (intersperse (vector->list dat) ", "))) server-dats) (print "No server")) (test #f test-one-rec (rmt:get-test-info-by-id run-id test-one-id)) (thread-sleep! 1) + (debug:print-info 0 "server-state="server-state " test-state="test-state) (case test-state ((start) (print "Trying to start server") (server:kind-run run-id) (loop 'server-started))