Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1125,13 +1125,13 @@ ((zmq) (handle-exceptions exn (begin (thread-sleep! 5) - (if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))) - (let* ((push-socket (vector-ref zmq-sockets 0)) - (sub-socket (vector-ref zmq-sockets 1)) + (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) + (let* ((push-socket (vector-ref serverdat 0)) + (sub-socket (vector-ref serverdat 1)) (client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () @@ -1157,11 +1157,11 @@ (debug:print 2 "WARNING: no reply to query " params ", trying resend") (debug:print-info 11 "re-sending message") (send-message push-socket zdat) (debug:print-info 11 "message re-sent") (loop (- n 1))) - ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) + ;; (apply cdb:client-call *runremote* qtype immediate (- numretries 1) params)) (begin (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") (exit 5)))))))) (debug:print-info 11 "Starting threads") (let ((th1 (make-thread send-receive "send receive")) Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -60,10 +60,11 @@ (work-area (assoc/default 'work-area cmdinfo)) (test-name (assoc/default 'test-name cmdinfo)) (runscript (assoc/default 'runscript cmdinfo)) (ezsteps (assoc/default 'ezsteps cmdinfo)) (runremote (assoc/default 'runremote cmdinfo)) + (transport (assoc/default 'transport cmdinfo)) (run-id (assoc/default 'run-id cmdinfo)) (test-id (assoc/default 'test-id cmdinfo)) (target (assoc/default 'target cmdinfo)) (itemdat (assoc/default 'itemdat cmdinfo)) (env-ovrd (assoc/default 'env-ovrd cmdinfo)) @@ -85,10 +86,11 @@ (rollup-status 0)) (debug:print 2 "Exectuing " test-name " (id: " test-id ") on " (get-host-name)) ;; Setup the *runremote* global var (if *runremote* (debug:print 2 "ERROR: I'm not expecting *runremote* to be set at this time")) (set! *runremote* runremote) + (set! *transport-type* transport) (set! keys (cdb:remote-run db:get-keys #f)) (set! keyvals (if run-id (cdb:remote-run db:get-key-vals #f run-id) #f)) ;; apply pre-overrides before other variables. The pre-override vars must not ;; clobbers things from the official sources such as megatest.config and runconfigs.config (if (string? set-vars) @@ -579,10 +581,11 @@ (set! cmdparms (base64:base64-encode (with-output-to-string (lambda () ;; (list 'hosts hosts) (write (list (list 'testpath test-path) (list 'runremote *runremote*) + (list 'transport *transport-type*) (list 'toppath *toppath*) (list 'work-area work-area) (list 'test-name test-name) (list 'runscript runscript) (list 'run-id run-id ) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -103,10 +103,11 @@ -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname -transport http|zmq : use http or zmq for transport (default is http) -list-servers : list the servers -repl : start a repl (useful for extending megatest) + -load file.scm : load and run file.scm Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html will clear the field if no rundir/testname/itempath/logfile @@ -172,10 +173,11 @@ "-set-state-status" "-debug" ;; for *verbosity* > 2 "-gen-megatest-test" "-override-timeout" "-test-files" ;; -test-paths is for listing all + "-load" ;; load and exectute a scheme file ) (list "-h" "-version" "-force" "-xterm" @@ -390,14 +392,14 @@ (let* ((db #f) (runpatt (args:get-arg "-list-runs")) (testpatt (if (args:get-arg "-testpatt") (args:get-arg "-testpatt") "%")) - (runsdat (open-run-close db:get-runs db runpatt #f #f '())) + (runsdat (cdb:remote-run db:get-runs #f runpatt #f #f '())) (runs (db:get-rows runsdat)) (header (db:get-header runsdat)) - (keys (open-run-close db:get-keys db)) + (keys (cdb:remote-run db:get-keys #f)) (keynames (map key:get-fieldname keys)) (db-targets (args:get-arg "-list-db-targets")) (seen (make-hash-table))) ;; Each run (for-each @@ -410,12 +412,12 @@ (begin (hash-table-set! seen targetstr #t) ;; (print "[" targetstr "]")))) (print targetstr)))) (if (not db-targets) - (let* ((run-id (open-run-close db:get-value-by-header run header "id")) - (tests (open-run-close db:get-tests-for-run db run-id testpatt '() '()))) + (let* ((run-id (db:get-value-by-header run header "id")) + (tests (cdb:remote-run db:get-tests-for-run #f run-id testpatt '() '()))) (debug:print 1 "Run: " targetstr " status: " (db:get-value-by-header run header "state") " run-id: " run-id ", number tests: " (length tests)) (for-each (lambda (test) (format #t @@ -437,11 +439,11 @@ "\n diskfree: " (db:test-get-diskfree test) "\n uname: " (db:test-get-uname test) "\n rundir: " (db:test-get-rundir test) ) ;; Each test - (let ((steps (open-run-close db:get-steps-for-test db (db:test-get-id test)))) + (let ((steps (cdb:remote-run db:get-steps-for-test #f (db:test-get-id test)))) (for-each (lambda (step) (format #t " Step: ~20a State: ~10a Status: ~10a Time ~22a\n" (db:step-get-stepname step) @@ -675,44 +677,49 @@ (set! *didsomething* #t))) ;;====================================================================== ;; Test commands (i.e. for use inside tests) ;;====================================================================== + +(define (megatest:step step state status logfile msg) + (if (not (getenv "MT_CMDINFO")) + (begin + (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") + (exit 5)) + (let* ((cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) + (runremote (assoc/default 'runremote cmdinfo)) + (testpath (assoc/default 'testpath cmdinfo)) + (test-name (assoc/default 'test-name cmdinfo)) + (runscript (assoc/default 'runscript cmdinfo)) + (db-host (assoc/default 'db-host cmdinfo)) + (run-id (assoc/default 'run-id cmdinfo)) + (test-id (assoc/default 'test-id cmdinfo)) + (itemdat (assoc/default 'itemdat cmdinfo)) + (db #f)) + (change-directory testpath) + (set! *runremote* runremote) + (if (not (setup-for-run)) + (begin + (debug:print 0 "Failed to setup, exiting") + (exit 1))) + (if (and state status) + (open-run-close db:teststep-set-status! db test-id step state status msg logfile) + (begin + (debug:print 0 "ERROR: You must specify :state and :status with every call to -step") + (exit 6)))))) (if (args:get-arg "-step") - (if (not (getenv "MT_CMDINFO")) - (begin - (debug:print 0 "ERROR: MT_CMDINFO env var not set, -step must be called *inside* a megatest invoked environment!") - (exit 5)) - (let* ((step (args:get-arg "-step")) - (cmdinfo (read (open-input-string (base64:base64-decode (getenv "MT_CMDINFO"))))) - (runremote (assoc/default 'runremote cmdinfo)) - (testpath (assoc/default 'testpath cmdinfo)) - (test-name (assoc/default 'test-name cmdinfo)) - (runscript (assoc/default 'runscript cmdinfo)) - (db-host (assoc/default 'db-host cmdinfo)) - (run-id (assoc/default 'run-id cmdinfo)) - (test-id (assoc/default 'test-id cmdinfo)) - (itemdat (assoc/default 'itemdat cmdinfo)) - (db #f) - (state (args:get-arg ":state")) - (status (args:get-arg ":status")) - (logfile (args:get-arg "-setlog"))) - (change-directory testpath) - (set! *runremote* runremote) - (if (not (setup-for-run)) - (begin - (debug:print 0 "Failed to setup, exiting") - (exit 1))) - (if (and state status) - (open-run-close db:teststep-set-status! db test-id step state status (args:get-arg "-m") logfile) - (begin - (debug:print 0 "ERROR: You must specify :state and :status with every call to -step") - (exit 6))) - (if db (sqlite3:finalize! db)) - (set! *didsomething* #t)))) - + (begin + (megatest:step + (args:get-arg "-step") + (args:get-arg ":state") + (args:get-arg ":status") + (args:get-arg "-setlog") + (args:get-arg "-m")) + ;; (if db (sqlite3:finalize! db)) + (set! *didsomething* #t))) + (if (or (args:get-arg "-setlog") ;; since setting up is so costly lets piggyback on -test-status (args:get-arg "-set-toplog") (args:get-arg "-test-status") (args:get-arg "-set-values") (args:get-arg "-load-test-data") @@ -887,11 +894,12 @@ ;;====================================================================== ;; Start a repl ;;====================================================================== -(if (args:get-arg "-repl") +(if (or (args:get-arg "-repl") + (args:get-arg "-load")) (let* ((toppath (setup-for-run)) (db (if toppath (open-db) #f))) (if db (begin (set! *db* db) @@ -901,11 +909,13 @@ (import apropos) (gnu-history-install-file-manager (string-append (or (get-environment-variable "HOME") ".") "/.megatest_history")) (current-input-port (make-gnu-readline-port "megatest> ")) - (repl)) + (if (args:get-arg "-repl") + (repl) + (load (args:get-arg "-load")))) (exit)) (set! *didsomething* #t))) ;;====================================================================== ;; Exit and clean up Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -8,11 +8,11 @@ ;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ;; PURPOSE. (require-extension (srfi 18) extras tcp s11n) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest zmq) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) (declare (unit server)) @@ -129,12 +129,13 @@ ;; (send-message pubsock (case *transport-type* ((fs) result) ((http)(db:obj->string (vector success/fail query-sig result))) ((zmq) - (send-message pubsock target send-more: #t) - (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) + (let ((pub-socket (vector-ref *runremote* 1))) + (send-message pub-socket return-addr send-more: #t) + (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) ;;====================================================================== @@ -154,58 +155,51 @@ (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) ok)) -(define (server:client-connect iface port) - (let* ((login-res #f) - (serverdat (list iface port))) - (set! login-res (server:client-login serverdat)) - (if (and (not (null? login-res)) - (car login-res)) - (begin - (debug:print-info 2 "Logged in and connected to " iface ":" port) - (set! *runremote* serverdat) - serverdat) - (begin - (debug:print-info 2 "Failed to login or connect to " iface ":" port) - (set! *runremote* #f) - #f)))) - ;; Do all the connection work, look up the transport type and set up the ;; connection if required. ;; +;; There are two scenarios. +;; 1. We are a test manager and we received *transport-type* and *runremote* via cmdline +;; 2. We are a run tests, list runs or other interactive process and we mush figure out +;; *transport-type* and *runremote* from the monitor.db +;; (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (debug:print-info 11 "*transport-type* is " *transport-type*) - (let* ((hostinfo (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out - (open-run-close tasks:get-best-server tasks:open-db) - #f))) + (debug:print-info 11 "*transport-type* is " *transport-type* ", *runremote* is " *runremote*) + (let* ((hostinfo (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out + (open-run-close tasks:get-best-server tasks:open-db) + #f))) ;; if have hostinfo then extract the transport type ;; else fall back to fs (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) (set! *transport-type* (if hostinfo - (string->symbol (tasks:hostinfo-get-transport hostinfo)) + (string->symbol (tasks:hostinfo-get-transport hostinfo)) 'fs)) + ;; DEBUG STUFF + (if (eq? *transport-type* 'fs)(begin (print "ERROR!!!!!!! refusing to run with transport " *transport-type*)(exit 99))) + (debug:print-info 11 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) (case *transport-type* - ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) - ((http) - (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo))) - ((zmq) - (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) - (tasks:hostinfo-get-port hostinfo) - (tasks:hostinfo-get-pubport hostinfo))) - (else ;; default to fs - (set! *transport-type* 'fs) - (set! *megatest-db* (open-db)))))) - + ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) + ((http) + (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo))) + ((zmq) + (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo) + (tasks:hostinfo-get-pubport hostinfo))) + (else ;; default to fs + (debug:print 0 "ERROR: unrecognised transport type " *transport-type* " attempting to continue with fs") + (set! *transport-type* 'fs) + (set! *megatest-db* (open-db)))))) ;; all routes though here end in exit ... (define (server:launch transport) (if (not *toppath*) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -194,13 +194,13 @@ (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) (set! res (cons (vector id interface port pubport transport pid hostname) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb + ;; strftime('%s','now')-heartbeat < 10 AND "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers - WHERE strftime('%s','now')-heartbeat < 10 - AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) + WHERE mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version) ;; for now we are keeping only one server registered in the db, return #f or first server found (if (null? res) #f (car res)))) ;; BUG: This logic is probably needed unless methodology changes completely... ;; Index: zmq-transport.scm ================================================================== --- zmq-transport.scm +++ zmq-transport.scm @@ -73,11 +73,12 @@ (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* ((zmq-sdat1 #f) + (let* ((db (open-db)) ;; here we *do not* want to be opening and closing the db + (zmq-sdat1 #f) (zmq-sdat2 #f) (pull-socket #f) (pub-socket #f) (p1 #f) (p2 #f) @@ -102,10 +103,12 @@ (set! zmq-sdat2 (cadr zmq-sockets-dat)) (set! pub-socket (cadr zmq-sdat2)) (set! p2 (caddr zmq-sdat2)) (set! *cache-on* #t) + + (set! *runremote* (vector pull-socket pub-socket)) ;; overloading the use of *runremote* BUG!? ;; what to do when we quit ;; ;; (on-exit (lambda () ;; (if (and *toppath* *server-info*) @@ -136,11 +139,13 @@ (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin - (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + (db:process-queue-item db packet) + ;; (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + (loop '())) (loop (cons packet queue-lst))))))) ;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. @@ -280,15 +285,15 @@ (define (zmq-transport:client-connect iface pullport pubport) (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) (sub-socket (zmq-transport:client-socket-connect iface pubport type: 'sub - subscriptions: (list (zmq-transport:get-client-signature) "all"))) + subscriptions: (list (server:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) (debug:print-info 11 "zmq-transport:client-connect started. Next is login") - (set! login-res (zmq-transport:client-login zmq-sockets)) + (set! login-res (server:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") (set! *runremote* zmq-sockets) @@ -381,16 +386,16 @@ (th2 (make-thread (lambda () (zmq-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) - (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) + ;; (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) - (thread-start! th3) + ;; (thread-start! th3) (set! *didsomething* #t) ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest")))