Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -50,30 +50,18 @@ ;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline ;; 2. We are a run tests, list runs or other interactive process and we must figure out ;; *transport-type* and *runremote* from the monitor.db ;; ;; client:setup +;; +;; lookup_server, need to remove *runremote* stuff +;; (define (client:setup run-id #!key (remaining-tries 3)) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: failed to find megatest.config, exiting") - (exit)))) - ;; (push-directory *toppath*) ;; This is probably NOT needed - ;; clients get the sdb:qry proc created here - ;; (if (not sdb:qry) - ;; (begin - ;; (set! sdb:qry (make-sdb:qry (conc *toppath* "/db/strings.db"))) ;; we open the normalization helpers here - ;; (sdb:qry 'setup #f))) (let ((hostinfo (and run-id (hash-table-ref/default *runremote* run-id #f)))) - (debug:print-info 11 "for run-id=" run-id ", *transport-type* is " *transport-type*) (if hostinfo hostinfo ;; have hostinfo - just return it - (let* ((hostinfo (open-run-close tasks:get-server tasks:open-db run-id)) - (transport (if hostinfo - (string->symbol (tasks:hostinfo-get-transport hostinfo)) - 'http))) + (let* ((hostinfo (open-run-close tasks:get-server tasks:open-db run-id))) (if (not hostinfo) (if (> remaining-tries 0) (begin (server:ensure-running run-id) (client:setup run-id remaining-tries: (- remaining-tries 1))) @@ -81,26 +69,17 @@ (debug:print 0 "ERROR: Expected to be able to connect to a server by now. No server available for run-id = " run-id) (exit 1))) (begin (hash-table-set! *runremote* run-id hostinfo) (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) - (debug:print-info 11 "Using transport type of " transport (if hostinfo (conc " to connect to " hostinfo) "")) - (case *transport-type* - ;; ((fs)(if (not *megatest-db*)(set! 
*megatest-db* (open-db)))) - ((http) - ;; this saves the hostinfo in the *runremote* hash and returns it - (http-transport:client-connect run-id - (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo))) - ((zmq) - (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo) - (tasks:hostinfo-get-pubport hostinfo))) - (else ;; default to fs - (debug:print 0 "ERROR: unrecognised transport type " *transport-type* " exiting now.") - (exit))))))))) - ;; (pop-directory))) + (client:start run-id hostinfo))))))) + +(define (client:start run-id server-info) + ;; this saves the server-info in the *runremote* hash and returns it + (http-transport:client-connect run-id + (tasks:hostinfo-get-interface server-info) + (tasks:hostinfo-get-port server-info))) ;; client:signal-handler (define (client:signal-handler signum) (handle-exceptions exn Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1812,17 +1812,21 @@ sync set-verbosity killserver )) -(define (db:login dbstruct calling-path calling-version client-signature) - (if (and (equal? calling-path *toppath*) - (equal? megatest-version calling-version)) - (begin - (hash-table-set! *logged-in-clients* client-signature (current-seconds)) - '(#t "successful login")) ;; path matches - pass! Should vet the caller at this time ... - (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))) +(define (db:login dbstruct calling-path calling-version run-id client-signature) + (cond + ((not (equal? calling-path *toppath*)) + (list #f "Login failed due to mismatch paths: " calling-path ", " *toppath*)) + ((not (equal? *run-id* run-id)) + (list #f "Login failed due to mismatch run-id: " run-id ", " *run-id*)) + ((not (equal? megatest-version calling-version)) + (list #f "Login failed due to mismatch megatest version: " calling-version ", " megatest-version)) + (else + (hash-table-set! 
*logged-in-clients* client-signature (current-seconds)) + '(#t "successful login")))) (define (db:general-call db stmtname params) (let ((query (let ((q (alist-ref (if (string? stmtname) (string->symbol stmtname) stmtname) Index: docs/manual/server.dot ================================================================== --- docs/manual/server.dot +++ docs/manual/server.dot @@ -46,12 +46,15 @@ remove_server_record -> set_available; set_available -> avail_delay [label="delay 3s"]; avail_delay -> "first_in_queue?"; "first_in_queue?" -> set_running [label=yes]; - set_running -> handle_requests; - "first_in_queue?" -> "server_running?" [label=no]; + set_running -> get_next_port -> handle_requests; + "first_in_queue?" -> "dead_entry_in_queue?" [label=no]; + "dead_entry_in_queue?" -> "server_running?" [label=no]; + "dead_entry_in_queue?" -> "remove_dead_entries" [label=yes]; + remove_dead_entries -> "server_running?"; handle_requests -> start_shutdown [label="no traffic"]; handle_requests -> shutdown_request; start_shutdown -> shutdown_delay; shutdown_request -> shutdown_delay; ADDED docs/results.pdf Index: docs/results.pdf ================================================================== --- /dev/null +++ docs/results.pdf cannot compute difference between binary files Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -60,33 +60,19 @@ (u8vector->list (if res res (hostname->ip hostname)))) "."))) (define (http-transport:run hostn run-id server-id) (debug:print 2 "Attempting to start the server ...") - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") - (exit)))) - (let* (;; (iface (if (string=? 
"-" hostn) - ;; #f ;; (get-host-name) - ;; hostn)) - (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) #f))) (if ipstr ipstr hostn))) ;; hostname))) - (start-port (if (and (args:get-arg "-port") - (string->number (args:get-arg "-port"))) - (string->number (args:get-arg "-port")) - (if (and (config-lookup *configdat* "server" "port") - (string->number (config-lookup *configdat* "server" "port"))) - (string->number (config-lookup *configdat* "server" "port")) - (+ 5000 (random 1001))))) - (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) (set! db *inmemdb*) (root-path (if link-tree-path link-tree-path (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! (handle-directory spiffy-directory-listing) @@ -154,10 +140,13 @@ (print-error-message exn) (if (< portnum 9000) (begin (debug:print 0 "WARNING: failed to start on portnum: " portnum ", trying next port") (thread-sleep! 0.1) + + ;; get_next_port goes here + (http-transport:try-start-server ipaddrstr (+ portnum 1) server-id)) (print "ERROR: Tried and tried but could not start the server"))) ;; any error in following steps will result in a retry (set! *server-info* (list ipaddrstr portnum)) (open-run-close tasks:server-set-interface-port @@ -244,13 +233,15 @@ (res #f)) (handle-exceptions exn (begin (print "ERROR IN http-transport:client-send-receive " ((condition-property-accessor 'exn 'message) exn)) - (thread-sleep! 
2) (if (> numretries 0) - (http-transport:client-send-receive serverdat msg numretries: (- numretries 1)))) + (begin + (thread-sleep! 2) + (http-transport:client-send-receive serverdat msg numretries: (- numretries 1))) + #f)) (begin (debug:print-info 11 "fullurl=" fullurl "\n") ;; set up the http-client here (max-retry-attempts 5) ;; consider all requests indempotent @@ -304,17 +295,19 @@ (res #f)) (handle-exceptions exn (begin ;; TODO: Send this output to a log file so it isn't lost when running as daemon - (print "ERROR IN http-transport:client-send-receive " ((condition-property-accessor 'exn 'message) exn)) (if (> numretries 0) + ;; on the zeroeth retry do not print the error message - this allows the call to be used as a ping (no junk on output). (begin + (print "ERROR IN http-transport:client-send-receive " ((condition-property-accessor 'exn 'message) exn)) (if (> (random 100) 80)(server:ensure-running run-id)) ;; every so often try starting a server - (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1))))) + (http-transport:client-api-send-receive run-id serverdat cmd params numretries: (- numretries 1))) + #f)) (begin - (debug:print-info 11 "fullurl=" fullurl "\n") + (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n") ;; set up the http-client here (max-retry-attempts 5) ;; consider all requests indempotent (retry-request? (lambda (request) #t)) ;; (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) @@ -371,26 +364,28 @@ (thread-join! th1) (thread-terminate! 
th2) (debug:print-info 11 "got res=" res) res))))) +;; +;; connect +;; (define (http-transport:client-connect run-id iface port) - (let* ((login-res #f) - (uri-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl")))) + (let* ((uri-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl")))) (uri-api-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/api")))) - (serverdat (list iface port uri-dat uri-api-dat))) + (serverdat (list iface port uri-dat uri-api-dat)) + (login-res (rmt:login-no-auto-client-setup serverdat run-id))) (hash-table-set! *runremote* run-id serverdat) ;; may or may not be good ... - (set! login-res (rmt:login run-id)) (if (and (list? login-res) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" port) (hash-table-set! *runremote* run-id serverdat) serverdat) (begin (debug:print-info 0 "ERROR: Failed to login or connect to " iface ":" port) - (exit 1))))) + #f)))) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (http-transport:keep-running server-id) @@ -421,10 +416,13 @@ (if (and (string? tmo) (string->number tmo)) (* 60 60 (string->number tmo)) ;; default to three days (* 3 24 60 60))))) + ;; + ;; set_running + ;; (tasks:server-set-state! tdb server-id "running") (let loop ((count 0)) ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) @@ -453,22 +451,19 @@ (begin (debug:print-info 0 "interface changed, refreshing iface and port info") (set! iface (car sdat)) (set! port (cadr sdat)))) - ;; NOTE: Get rid of this mechanism! It really is not needed... 
- ;; (open-run-close tasks:server-update-heartbeat tasks:open-db spid) - (tasks:server-update-heartbeat tdb server-id) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - ;; Transfer *last-db-access* to last-access to use in checking that we are still alive (mutex-lock! *heartbeat-mutex*) (set! last-access *last-db-access*) (mutex-unlock! *heartbeat-mutex*) ;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout) + ;; + ;; no_traffic + ;; (if (and *server-run* (> (+ last-access server-timeout) (current-seconds))) (begin (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) @@ -476,10 +471,13 @@ (begin (debug:print-info 0 "Starting to shutdown the server.") ;; need to delete only *my* server entry (future use) (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t)) + ;; + ;; start_shutdown + ;; ( tasks:server-set-state! tdb server-id "shutting-down") (thread-sleep! 5) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Number of cached writes " *number-of-writes*) (debug:print-info 0 "Average cached write time " @@ -498,45 +496,40 @@ (debug:print-info 0 "Server shutdown complete. Exiting") (tasks:server-delete-record! tdb server-id) (exit)))))) ;; all routes though here end in exit ... +;; +;; start_server? +;; (define (http-transport:launch run-id) (set! 
*run-id* run-id) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, exiting") - (exit)))) - (debug:print-info 2 "Starting the standalone server") (if (args:get-arg "-daemonize") (daemon:ize)) (let ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id))) (if (not server-id) (begin + ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db)) - (if *toppath* - (let* ((th2 (make-thread (lambda () - (http-transport:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-") - run-id - server-id)) "Server run")) - (th3 (make-thread (lambda () - (http-transport:keep-running server-id)) - "Keep running"))) - ;; Database connection - (set! *inmemdb* (db:setup run-id)) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2)) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - ;; (sdb:qry 'finalize) - (exit))) + (let* ((th2 (make-thread (lambda () + (http-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + run-id + server-id)) "Server run")) + (th3 (make-thread (lambda () + (http-transport:keep-running server-id)) + "Keep running"))) + ;; Database connection + (set! *inmemdb* (db:setup run-id)) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (exit))))) (define (http-transport:server-signal-handler signum) (handle-exceptions exn (debug:print " ... 
exiting ...") Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -121,11 +121,10 @@ -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -transport http|fs : use http or direct access for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all -repl : start a repl (useful for extending megatest) @@ -188,11 +187,10 @@ ":expected" ":tol" ":units" ;; misc "-server" - "-transport" "-stop-server" "-port" "-extract-ods" "-pathmod" "-env2file" @@ -290,16 +288,10 @@ (printf "Sending signal/term to ~A\n" pid) (process-signal pid signal/term)))))) (process:children #f)) (original-exit exit-code))))) -;; Force default transport to fs -;; (if ;; (and (or (args:get-arg "-list-targets") -;; ;; (args:get-arg "-list-db-targets")) -;; (not (args:get-arg "-transport")) -;; (hash-table-set! args:arg-hash "-transport" "fs")) - ;;====================================================================== ;; Misc setup stuff ;;====================================================================== (debug:setup) @@ -344,17 +336,14 @@ (if (args:get-arg "-server") ;; Server? Start up here. 
;; (let ((tl (setup-for-run)) - (transport (or (configf:lookup *configdat* "setup" "transport") - (args:get-arg "-transport" "http"))) (run-id (and (args:get-arg "-run-id") (string->number (args:get-arg "-run-id"))))) - (debug:print 2 "Launching server using transport " transport " for run-id=" run-id) (if run-id - (server:launch (string->symbol transport) run-id) + (server:launch run-id) (debug:print 0 "ERROR: server requires run-id be specified with -run-id"))) ;; Not a server? This section will decide how to communicate ;; ;; Setup client for all expect listed here @@ -371,43 +360,22 @@ ;; (set! *fdb* (filedb:open-db (conc *toppath* "/db/paths.db"))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") - ;; ok, so lets connect to the server - (let* ((transport-from-config (configf:lookup *configdat* "setup" "transport")) - (transport-from-cmdln (args:get-arg "-transport")) - (transport-from-cmdinfo (if (getenv "MT_CMDINFO") - (let ((res (assoc 'transport - (read - (open-input-string - (base64:base64-decode - (getenv "MT_CMDINFO"))))))) - (if res (cadr res) #f)) - #f)) - (chosen-transport (string->symbol (or transport-from-cmdln - transport-from-cmdinfo - transport-from-config - "fs")))) - (debug:print 2 "chosen-transport: " chosen-transport " have; config=" transport-from-config ", cmdln=" transport-from-cmdln ", cmdinfo=" transport-from-cmdinfo) - (case chosen-transport - ((http) - (set! *transport-type 'http) - ;; if we have a run-id (why would we?) start the server for that run. 
- ;; otherwise it is up to other calls to start the server(s) dynamically - (if run-id - (begin - (server:ensure-running run-id) - (client:launch run-id)) - (begin - ;; without run-id we'll start a server for "0" - (server:ensure-running 0) - (client:launch 0)))) - (else ;; (fs) - (debug:print 0 "ERROR: Should NOT be getting here! fs transport is no longer supported") - (set! *transport-type* 'fs) - (set! *megatest-db* (make-dbr:dbstruct path: *toppath* local: #t)))))))))) + (begin + (if run-id + (begin + (server:ensure-running run-id) + (client:launch run-id)) + (begin + ;; without run-id we'll start a server for "0" + (server:ensure-running 0) + (client:launch 0))))))))) + +;; MAY STILL NEED THIS +;; (set! *megatest-db* (make-dbr:dbstruct path: *toppath* local: #t)))))))))) (if (or (args:get-arg "-list-servers") (args:get-arg "-stop-server")) (let ((tl (setup-for-run))) (if tl @@ -761,11 +729,10 @@ ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) - (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -774,11 +741,10 @@ (status (args:get-arg ":status")) (target (args:get-arg "-target")) (toppath (assoc/default 'toppath cmdinfo))) (change-directory toppath) ;; (set! *runremote* runremote) - ;; (set! 
*transport-type* (string->symbol transport)) (if (not target) (begin (debug:print 0 "ERROR: -target is required.") (exit 1))) (if (not (setup-for-run)) @@ -812,11 +778,10 @@ ;; if we are in a test use the MT_CMDINFO data (if (getenv "MT_CMDINFO") (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) - (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -824,11 +789,10 @@ (state (args:get-arg ":state")) (status (args:get-arg ":status")) (target (args:get-arg "-target"))) (change-directory testpath) ;; (set! *runremote* runremote) - ;; (set! *transport-type* (string->symbol transport)) (if (not target) (begin (debug:print 0 "ERROR: -target is required.") (exit 1))) (if (not (setup-for-run)) @@ -894,11 +858,10 @@ (begin (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") (exit 5)) (let* ((cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) - (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -906,12 +869,10 @@ (itemdat (assoc/default 'itemdat cmdinfo)) (work-area (assoc/default 'work-area cmdinfo)) (db #f)) (change-directory testpath) ;; (set! *runremote* runremote) - ;; The transport is handled earlier in the loading process of megatest. - ;; (set! 
*transport-type* (string->symbol transport)) (if (not (setup-for-run)) (begin (debug:print 0 "Failed to setup, exiting") (exit 1))) (if (and state status) @@ -945,11 +906,10 @@ (debug:print 0 "ERROR: MT_CMDINFO env var not set, commands -test-status, -runstep and -setlog must be called *inside* a megatest environment!") (exit 5)) (let* ((startingdir (current-directory)) (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) ;; (runremote (assoc/default 'runremote cmdinfo)) - (transport (assoc/default 'transport cmdinfo)) (testpath (assoc/default 'testpath cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (db-host (assoc/default 'db-host cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) @@ -958,11 +918,10 @@ (work-area (assoc/default 'work-area cmdinfo)) (db #f) ;; (open-db)) (state (args:get-arg ":state")) (status (args:get-arg ":status"))) ;; (set! *runremote* runremote) - ;; (set! *transport-type* (string->symbol transport)) (if (not (setup-for-run)) (begin (debug:print 0 "Failed to setup, exiting") (exit 1))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -36,28 +36,27 @@ ;; cmd is a symbol ;; vars is a json string encoding the parameters for the call ;; (define (rmt:send-receive cmd run-id params) - (case *transport-type* - ((fs-aint-here) - (debug:print 0 "ERROR: Not yet (re)supported") - (exit 1)) - ((fs http) - ;; if run-id is #f send the request to run-id = 0 server. 
This will be for main.db - ;; - (let* ((connection-info (client:setup (if run-id run-id 0))) - (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) - (res (http-transport:client-api-send-receive run-id connection-info cmd jparams))) - (if res - (db:string->obj res) ;; (rmt:json-str->dat res) - (begin - (debug:print 0 "ERROR: Bad value from http-transport:client-api-send-receive " res) - #f)))) - (else - (debug:print 0 "ERROR: Transport " *transport-type* " not yet (re)supported") - (exit 1)))) + (let* ((connection-info (client:setup (if run-id run-id 0))) + (jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) + (res (http-transport:client-api-send-receive run-id connection-info cmd jparams))) + (if res + (db:string->obj res) ;; (rmt:json-str->dat res) + (begin + (debug:print 0 "ERROR: Bad value from http-transport:client-api-send-receive " res) + #f)))) + +(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) + (let* ((jparams (db:obj->string params)) ;; (rmt:dat->json-str params)) + (res (http-transport:client-api-send-receive run-id connection-info cmd jparams numretries: 0))) + (if res + (db:string->obj res) ;; (rmt:json-str->dat res) + (begin + (debug:print 0 "ERROR: Bad value from http-transport:client-api-send-receive " res) + #f)))) ;; Wrap json library for strings (why the ports crap in the first place?) (define (rmt:dat->json-str dat) (with-output-to-string (lambda () @@ -77,12 +76,17 @@ ;;====================================================================== ;; M I S C ;;====================================================================== (define (rmt:login run-id) - (rmt:send-receive 'login run-id (list *toppath* megatest-version *my-client-signature*))) + (rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) +;; This login does no retries under the hood - it acts a bit like a ping. 
+;; +(define (rmt:login-no-auto-client-setup connection-info run-id) + (rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*))) + (define (rmt:kill-server run-id) (rmt:send-receive 'kill-server run-id (list run-id))) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -42,25 +42,18 @@ ;; Call this to start the actual server ;; ;; all routes though here end in exit ... -(define (server:launch transport run-id) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: cannot find megatest.config, exiting") - (exit)))) - (debug:print-info 2 "Starting server using " transport " transport") - (set! *transport-type* transport) - (case transport - ;; ((fs) (exit)) ;; there is no "fs" server transport - ((fs http) (http-transport:launch run-id)) - ((zmq) (zmq-transport:launch run-id)) - (else - (debug:print "WARNING: unrecognised transport " transport) - (exit)))) +;; +;; start_server +;; +(define (server:launch run-id) + (if (server:check-if-running run-id) + ;; a server is already running + (exit) + (http-transport:launch run-id))) ;;====================================================================== ;; Q U E U E M A N A G E M E N T ;;====================================================================== @@ -103,37 +96,21 @@ ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) - (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (case *transport-type* - ((fs) result) - ((http)(db:obj->string (vector success/fail query-sig 
result))) - ((zmq) - (let ((pub-socket (vector-ref *runremote* 1))) - (send-message pub-socket return-addr send-more: #t) - (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) - (else - (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) - result))) + (db:obj->string (vector success/fail query-sig result))) (define (server:ensure-running run-id) - (let loop ((servers (open-run-close tasks:get-server tasks:open-db run-id)) + (let loop ((server (open-run-close tasks:get-server tasks:open-db run-id)) (trycount 0)) - (if (or (not servers) - (null? servers)) + (if (not server) (begin (if (even? trycount) ;; just do the server start every other time through this loop (every 8 seconds) (let ((cmdln (conc (if (getenv "MT_MEGATEST") (getenv "MT_MEGATEST") "megatest") " -server - -run-id " run-id " &> " *toppath* "/db/" run-id ".log &"))) (debug:print 0 "INFO: Starting server (" cmdln ") as none running ...") - ;; (server:launch (string->symbol (args:get-arg "-transport" "http")))) - ;; no need to use fork, no need to do the list-servers trick. Just start the damn server, it will exit on it's own - ;; if there is an existing server (push-directory *toppath*) (system cmdln) (pop-directory) (thread-sleep! 3) ;; (process-run (car (argv)) (list "-server" "-" "-daemonize" "-transport" (args:get-arg "-transport" "http"))) @@ -143,7 +120,24 @@ (thread-sleep! 4))) (if (< trycount 10) (loop (open-run-close tasks:get-server tasks:open-db run-id) (+ trycount 1)) (debug:print 0 "WARNING: Couldn't start or find a server."))) - (debug:print 2 "INFO: Server(s) running " servers) - ))) + (debug:print 2 "INFO: Server(s) running " server)))) + +(define (server:check-if-running run-id) + (let loop ((server (open-run-close tasks:get-server tasks:open-db run-id)) + (trycount 0)) + (if server + ;; note: client:start will set *runremote*. this needs to be changed + ;; also, client:start will login to the server, also need to change that. 
+ ;; + ;; client:start returns #t if login was successful. + ;; + (let ((res (client:start run-id server))) + ;; if the server didn't respond we must remove the record + (if res + res + (begin + (open-run-close tasks:server-force-clean-running-records-for-run-id tasks:open-db run-id) + res))) + #f))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -94,20 +94,21 @@ (define (tasks:server-lock-slot mdb run-id) (let ((res '()) (best #f)) (tasks:server-clean-out-old-records-for-run-id mdb run-id) - (tasks:server-set-available mdb run-id) + (if (tasks:less-than-two-available mdb run-id) + (tasks:server-set-available mdb run-id)) (thread-sleep! 2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? mdb run-id))) ;; register that this server may come online (first to register goes though with the process) (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) - VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?, ?, ?);" + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" (current-process-id) ;; pid (get-host-name) ;; hostname -1 ;; port -1 ;; pubport (random 1000) ;; priority (used a tiebreaker on get-available) @@ -115,16 +116,28 @@ (common:version-signature) ;; mt_version -1 ;; interface "http" ;; transport run-id )) + +(define (tasks:less-than-two-available mdb run-id) + (let ((res 0)) + (sqlite3:for-each-row + (lambda (num-in-queue) + (set! 
res num-in-queue))
+     mdb
+     "SELECT count(id) FROM servers WHERE run_id=?;"
+     run-id)
+    (< res 3)))
 
 (define (tasks:server-clean-out-old-records-for-run-id mdb run-id)
   (sqlite3:execute mdb "DELETE FROM servers WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 30 AND run_id=?;" run-id)
-  (sqlite3:execute mdb "DELETE FROM servers WHERE state='running' AND (strftime('%s','now') - heartbeat) > 10 AND run_id=?;" run-id)
-  )
-
+  (if (server:check-if-running run-id) ;; NOTE(review): deletes every record for run-id when the check succeeds - confirm this is not inverted
+      (sqlite3:execute mdb "DELETE FROM servers WHERE run_id=?;" run-id)))
+
+(define (tasks:server-force-clean-running-records-for-run-id mdb run-id)
+  (sqlite3:execute mdb "DELETE FROM servers WHERE state = 'running' AND run_id=?;" run-id))
 
 (define (tasks:server-set-state! mdb server-id state)
   (sqlite3:execute mdb "UPDATE servers SET state=? WHERE id=?;" state server-id))
 
 (define (tasks:server-delete-record! mdb server-id)
@@ -134,10 +147,37 @@
   (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pid=?;" (get-host-name) (current-process-id)))
 
 (define (tasks:server-set-interface-port mdb server-id interface port)
   (sqlite3:execute mdb "UPDATE servers SET interface=?,port=? WHERE id=?;" interface port server-id))
 
+;; Choose the port for a new server. Order of preference: the larger of
+;; (max recorded port + 1) and the -port argument; else the larger of
+;; (max recorded port + 1) and the [server] port config value; else
+;; (max recorded port + 1) when above 8080; else a random port in 5000..6000.
+(define (tasks:server-get-next-port mdb)
+  (let ((res #f)
+        (port-param (if (and (args:get-arg "-port")
+                             (string->number (args:get-arg "-port")))
+                        (string->number (args:get-arg "-port"))
+                        #f))
+        (config-port (if (and (config-lookup *configdat* "server" "port")
+                              (string->number (config-lookup *configdat* "server" "port")))
+                         (string->number (config-lookup *configdat* "server" "port"))
+                         #f)))
+    (sqlite3:for-each-row
+     (lambda (port)
+       (set! res (and (number? port) (+ port 1)))) ;; set to next; max(port) is NULL when the table is empty
+     mdb
+     "SELECT max(port) FROM servers;")
+    (cond
+     ((and port-param res) (if (> res port-param) res port-param))
+     (port-param port-param)
+     ((and config-port res) (if (> res config-port) res config-port))
+     (config-port config-port)
+     ((and res (> res 8080)) res)
+     (else (+ 5000 (random 1001))))))
+
 (define (tasks:server-am-i-the-server? mdb run-id)
   (let* ((all (tasks:server-get-servers-vying-for-run-id mdb run-id))
          (first (if (null? all)
                     (begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.")
                            (sqlite3:finalize! mdb)
@@ -168,67 +208,34 @@
      mdb
      (conc "SELECT " selstr " FROM servers WHERE run_id=? ORDER BY start_time DESC;")
      run-id)
     (vector header res)))
 
-(define (tasks:server-update-heartbeat mdb server-id)
-  (debug:print-info 1 "Heart beat update of server id=" server-id)
-  (handle-exceptions
-   exn
-   (begin
-     (debug:print 0 "WARNING: probable timeout on monitor.db access")
-     (thread-sleep! 1)
-     (tasks:server-update-heartbeat mdb server-id))
-   (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)))
-
-;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
-(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
-  (let* ((server-id (if server-id
-                        server-id
-                        (tasks:server-get-server-id mdb hostname iface port pid)))
-         (heartbeat-delta 99e9))
-    (sqlite3:for-each-row
-     (lambda (delta)
-       (set! heartbeat-delta delta))
-     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
-    (< heartbeat-delta 10)))
-
 (define (tasks:get-server mdb run-id)
   (let ((res #f)
         (best #f))
     (sqlite3:for-each-row
      (lambda (id interface port pubport transport pid hostname)
        (set! res (vector id interface port pubport transport pid hostname)))
      mdb
+     ;; removed:
+     ;; strftime('%s','now')-heartbeat < 10 AND mt_version = ?
      "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
-          WHERE strftime('%s','now')-heartbeat < 10
-          AND mt_version=? AND run_id=? AND state='running'
-          ORDER BY start_time DESC LIMIT 1;" (common:version-signature) run-id)
+          WHERE run_id=? AND state='running'
+          ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
     res))
 
-;; (define (tasks:get-all-servers mdb)
-;;   (let ((res '()))
-;;     (sqlite3:for-each-row
-;;      (lambda (id interface port pubport transport pid hostname)
-;;        (set! res (cons (vector id interface port pubport transport pid hostname) res)))
-;;      mdb
-;;      "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
-;;           WHERE strftime('%s','now')-heartbeat < 10
-;;           AND mt_version=?
-;;           ORDER BY start_time DESC;" (common:version-signature))
-;;     res))
-
 (define (tasks:get-all-servers mdb)
   (let ((res '()))
     (sqlite3:for-each-row
      (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport)
        (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res)))
      mdb
      "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
     res))
 
-(define (tasks:kill-server status hostname port pid transport)
+(define (tasks:kill-server status hostname port pid)
   (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
   (if port
       (open-run-close tasks:server-deregister tasks:open-db hostname port: port)
       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
   (if status ;; #t means alive
@@ -245,13 +252,11 @@
               ) ;; local machine, send sig term
             (begin
               ;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
               (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
               (let ((serverdat (list hostname port)))
-                (case (if (string? transport) (string->symbol transport) transport)
-                  ((http)(http-transport:client-connect hostname port))
-                  (else (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
+                (http-transport:client-connect hostname port)
                 (cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide
       (begin
         (if status
             (if (equal? hostname (get-host-name))
                 (begin

ADDED utils/plot-code.scm
Index: utils/plot-code.scm
==================================================================
--- /dev/null
+++ utils/plot-code.scm
@@ -0,0 +1,155 @@
+#!/mfs/pkgs/chicken/4.8.0.5/bin/csi -nbq
+
+(use regex srfi-69 srfi-13)
+
+(define targs #f)
+(define files (cddddr (argv)))
+
+(let ((targdat (cadddr (argv))))
+  (if (equal? targdat "-")
+      (set! targs files)
+      (set! targs (string-split targdat ","))))
+
+(define filedat-defns (make-hash-table))
+(define filedat-usages (make-hash-table))
+
+(define defn-rx (regexp "^\\s*\\(define\\s+\\(([^\\s\\)]+).*"))
+(define all-regexs (make-hash-table))
+
+(define all-fns '())
+
+(define (print-err . data)
+  (with-output-to-port (current-error-port)
+    (lambda ()
+      (apply print data))))
+
+(print-err "Making graph for files: " (string-intersperse targs ", "))
+(print-err "Looking at files: " (string-intersperse files ", "))
+
+;; Gather the functions
+;;
+(for-each
+ (lambda (fname)
+   (print-err "Processing file " fname)
+   (with-input-from-file fname
+     (lambda ()
+       (let loop ((inl (read-line)))
+         (if (not (eof-object? inl))
+             (let ((match (string-match defn-rx inl)))
+               (if match
+                   (let ((fnname (cadr match)))
+                     ;; (print " " fnname)
+                     (set! all-fns (cons fnname all-fns))
+                     (hash-table-set!
+                      filedat-defns
+                      fname
+                      (cons fnname (hash-table-ref/default filedat-defns fname '())))
+                     ))
+               (loop (read-line))))))))
+ files)
+
+;; fill up the regex hash
+(print-err "Make the huge regex hash")
+(for-each
+ (lambda (fnname)
+   ;; escape the name: Scheme identifiers often contain regex metacharacters (? * + .)
+   (hash-table-set! all-regexs fnname (regexp (conc "^(|.*[^a-zA-Z]+)" (regexp-escape fnname) "([^a-zA-Z]+|)$"))))
+ (cons "toplevel" all-fns))
+
+(define breadcrumbs (make-hash-table))
+
+(define (have-function inl)
+  ;; walk the whole list so that an empty all-fns yields #f, not (car '())
+  (let loop ((rem all-fns))
+    (cond
+     ((null? rem) #f)
+     ((string-contains inl (car rem)) #t)
+     (else
+      (loop (cdr rem))))))
+
+(define (look-for-all-calls inl fnname)
+  (if (have-function inl) ;; (string-search have-function-rx inl)
+      (let loop ((hed (car all-fns))
+                 (tal (cdr all-fns))
+                 (res '()))
+        (let ((match (string-match (hash-table-ref all-regexs hed) inl)))
+          (if match
+              (let ((newres (cons hed res)))
+                (if (null? tal)
+                    newres
+                    (loop (car tal)
+                          (cdr tal)
+                          newres)))
+              (if (null? tal)
+                  res
+                  (loop (car tal)(cdr tal) res)))))
+      '()))
+
+;; Gather the usages
+(print "digraph G {")
+(define curr-cluster-num 0)
+(define function-calls '())
+
+;; Record the calls collected for one function and mark every callee as seen
+(define (record-calls! fnname allcalls)
+  (set! function-calls (cons (list fnname allcalls) function-calls))
+  (for-each
+   (lambda (call-name)
+     (hash-table-set! breadcrumbs call-name #t))
+   allcalls)
+  (print-err "function: " fnname " allcalls: " allcalls))
+
+(for-each
+ (lambda (fname)
+   (let ((last-func #f))
+     (print-err "Processing file " fname)
+     (print "subgraph cluster_" curr-cluster-num " {")
+     (set! curr-cluster-num (+ curr-cluster-num 1))
+     (with-input-from-file fname
+       (lambda ()
+         (with-output-to-port (current-error-port)
+           (lambda ()
+             (print "Analyzing file " fname)))
+         (print "label=\"" fname "\";")
+         (let loop ((inl (read-line))
+                    (fnname "toplevel")
+                    (allcalls '()))
+           (if (eof-object? inl)
+               (record-calls! fnname allcalls)
+               (let ((match (string-match defn-rx inl)))
+                 (if match
+                     (let ((func-name (cadr match)))
+                       ;; a new definition starts: file the calls seen so far
+                       ;; under the previous function and start a fresh list
+                       (record-calls! fnname allcalls)
+                       (if last-func
+                           (print "\"" func-name "\" -> \"" last-func "\";")
+                           (print "\"" func-name "\";"))
+                       (set! last-func func-name)
+                       (hash-table-set! breadcrumbs func-name #t)
+                       (loop (read-line)
+                             func-name
+                             '()))
+                     (let ((calls (look-for-all-calls inl fnname)))
+                       (loop (read-line) fnname (append allcalls calls)))))))))
+     (print "}")))
+ targs)
+
+(print-err "breadcrumbs: " (hash-table-keys breadcrumbs))
+(print-err "function-calls: " function-calls)
+
+(for-each
+ (lambda (function-call)
+   (print-err "function-call: " function-call)
+   (let ((fnname (car function-call))
+         (calls (cadr function-call)))
+     (for-each
+      (lambda (callname)
+        (print (if (hash-table-ref/default breadcrumbs callname #f) "" "// ")
+               "\"" fnname "\" -> \"" callname "\";"))
+      calls)))
+ function-calls)
+
+(print "}")
+
+(exit)