Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -7,11 +7,11 @@ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ http-transport.scm nmsg-transport.scm filedb.scm \ client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \ tree.scm ezsteps.scm lock-queue.scm sdb.scm \ - rmt.scm api.scm tdb.scm rpc-transport.scm \ + rmt.scm api.scm tdb.scm \ portlogger.scm archive.scm # Eggs to install (straightforward ones) EGGS=matchable readline apropos base64 regex-literals format regex-case test coops trace csv \ dot-locking posix-utils posix-extras directory-utils hostinfo tcp-server rpc csv-xml fmt \ @@ -60,11 +60,11 @@ tests.o runs.o dashboard.o dashboard-tests.o dashboard-main.o : run_records.scm db.o ezsteps.o keys.o launch.o megatest.o monitor.o runs-for-ref.o runs.o tests.o : key_records.scm tests.o tasks.o dashboard-tasks.o : task_records.scm runs.o : test_records.scm megatest.o : megatest-fossil-hash.scm -client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm rpc-transport.scm +client.scm common.scm configf.scm dashboard-guimonitor.scm dashboard-tests.scm dashboard.scm db.scm dcommon.scm ezsteps.scm fs-transport.scm http-transport.scm index-tree.scm items.scm keys.scm launch.scm megatest.scm monitor.scm mt.scm newdashboard.scm runconfig.scm runs.scm server.scm tdb.scm tests.scm tree.scm zmq-transport.scm : common_records.scm # Temporary while transitioning to new routine # runs.o : run-tests-queue-classic.scm run-tests-queue-new.scm megatest-fossil-hash.scm : $(SRCFILES) megatest.scm *_records.scm Index: client.scm 
================================================================== --- client.scm +++ client.scm @@ -43,20 +43,24 @@ (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) (define (client:connect iface port) (case (server:get-transport) - ((rpc) (rpc:client-connect iface port)) + ((nmsg) (nmsg-transport:client-connect iface port)) ((http) (http:client-connect iface port)) - ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) + ;; ((zmq) (zmq:client-connect iface port)) + (else + (debug:print 0 "ERROR: Unknown transport " (server:get-transport)) + (exit 1)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) - ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) + ((nmsg) (client:setup-nmsg run-id remaining-tries: remaining-tries failed-connects: failed-connects)) - ((http)(client:setup-http run-id)) + ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects)) - (else (rpc-transport:client-setup run-id)))) ;; (client:setup-rpc run-id)))) + (else + (debug:print 0 "ERROR: Unknown transport " (server:get-transport)) + (exit 1)))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) @@ -156,10 +160,66 @@ ;; (define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) (let* ((tdbdat (tasks:open-db))) (if (<= remaining-tries 0) + (begin + (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) + (exit 1)) + (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) + (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) + (if server-dat + (let* ((iface (tasks:hostinfo-get-interface server-dat)) + (hostname (tasks:hostinfo-get-hostname server-dat)) + (port (tasks:hostinfo-get-port server-dat)) + (start-res (case *transport-type* +
((http)(http-transport:client-connect iface port)) + ((nmsg)(nmsg-transport:client-connect hostname port)))) + (ping-res (case *transport-type* + ((http)(rmt:login-no-auto-client-setup start-res run-id)) + ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) + (if logininfo + (car (vector-ref logininfo 1)) + #f)))))) + (if (and start-res + ping-res) + (begin + (hash-table-set! *runremote* run-id start-res) + (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res)) + start-res) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) + (case *transport-type* + ((http)(http-transport:close-connections run-id))) + (hash-table-delete! *runremote* run-id) + (tasks:kill-server-run-id run-id) + (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) + run-id + (tasks:hostinfo-get-interface server-dat) + (tasks:hostinfo-get-port server-dat) + " client:setup (server-dat = #t)") + (if (> remaining-tries 8) + (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little + (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time + (server:try-running run-id) + (thread-sleep! 5) ;; give server a little time to start up + (client:setup run-id remaining-tries: (- remaining-tries 1)) + ))) + (begin ;; no server registered + (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) + (debug:print-info 0 "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) + (if (< num-available 2) + (server:try-running run-id)) + (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. 
+ (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) + +;; for nanomsg +;; +(define (client:setup-nmsg run-id #!key (remaining-tries 10) (failed-connects 0)) + (debug:print-info 2 "client:setup remaining-tries=" remaining-tries) + (let* ((tdbdat (tasks:open-db))) + (if (<= remaining-tries 0) (begin (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id) (exit 1)) (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -145,11 +145,11 @@ -update-meta : update the tests metadata for all tests -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -transport http|zmq : use http or zmq for transport (default is http) + -transport http|nmsg : use http or nanomsg for transport (default is http) -daemonize : fork into background and disconnect from stdin/out -log logfile : send stdout and stderr to logfile -list-servers : list the servers -stop-server id : stop server specified by id (see output of -list-servers), use 0 to kill all Index: nmsg-transport.scm ================================================================== --- nmsg-transport.scm +++ nmsg-transport.scm @@ -11,22 +11,41 @@ (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) -;; (use nanomsg) +(use nanomsg) (declare (unit nmsg-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
(declare (uses server)) +(declare (uses portlogger)) (include "common_records.scm") (include "db_records.scm") + +(define (make-http-transport:server-dat)(make-vector 7 #f)) +(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) +(define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) +(define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) +(define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) +(define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) +(define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) + +(define (http-transport:server-dat-make-url vec) + (if (and (http-transport:server-dat-get-iface vec) + (http-transport:server-dat-get-port vec)) + (conc "http://" + (http-transport:server-dat-get-iface vec) + ":" + (http-transport:server-dat-get-port vec)) + #f)) ;; Transition to pub --> sub with pull <-- push ;; ;; 1. client sends request to server via push to the pull port ;; 2. server puts request in queue or processes immediately as appropriate @@ -56,10 +75,11 @@ #f (conc "tcp://" (if bindall "*" (car hostport)) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) +(define *nmsg-mutex* (make-mutex)) ;;====================================================================== ;; S E R V E R ;;====================================================================== @@ -280,11 +300,11 @@ (* 60 60 (string->number tmo)) ;; (* 3 24 60 60) ;; default to three days (* 60 1) ;; default to one minute ;; (* 60 60 25) ;; default to 25 hours )))) - (print "Keep-running got server pid " server-id ", using iface " iface " and port " port) + (print "Keep-running got server pid " server-id ", using iface " iface " and port " port " and transport of " *transport-type*) (let loop ((count 0)) (thread-sleep!
4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) ;; (print "Server running, count is " count) @@ -321,17 +341,17 @@ ;; returns result, there is no sucess/fail flag - handled via excpections ;; (define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5)) ;; NB// In the html version of this routine there is a call to ;; tasks:kill-server-run-id when there is an exception - (mutex-lock! *http-mutex*) + (mutex-lock! *nmsg-mutex*) (let* ((packet (vector cmd param)) (reqsoc (http-transport:server-dat-get-socket connection-info)) (res (nmsg-transport:client-api-send-receive-raw reqsoc packet))) ;; (status (vector-ref rawres 0)) ;; (result (vector-ref rawres 1))) - (mutex-unlock! *http-mutex*) + (mutex-unlock! *nmsg-mutex*) res)) ;; (vector status (if status (db:string->obj result transport: 'nmsg) result)))) ;;====================================================================== ;; J U N K ;;====================================================================== Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -19,12 +19,12 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
(declare (uses synchash)) -(declare (uses http-transport)) -(declare (uses rpc-transport)) +;; (declare (uses http-transport)) +;; (declare (uses rpc-transport)) (declare (uses nmsg-transport)) (declare (uses launch)) (declare (uses daemon)) (include "common_records.scm") @@ -51,11 +51,11 @@ ;; (define (server:launch run-id) (case *transport-type* ((http)(http-transport:launch run-id)) ((nmsg)(nmsg-transport:launch run-id)) - ((rpc) (rpc-transport:launch run-id)) + ;; ((rpc) (rpc-transport:launch run-id)) (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) ;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") ;; (rpc-transport:launch run-id))))) ;;====================================================================== @@ -67,11 +67,11 @@ (if *transport-type* *transport-type* (let ((ttype (string->symbol (or (args:get-arg "-transport") (configf:lookup *configdat* "server" "transport") - "rpc")))) + "http")))) (set! *transport-type* ttype) ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) @@ -87,11 +87,11 @@ (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case (server:get-transport) - ((rpc) (db:obj->string (vector success/fail query-sig result))) + ;; ((rpc) (db:obj->string (vector success/fail query-sig result))) ((http) (db:obj->string (vector success/fail query-sig result))) ((zmq) (let ((pub-socket (vector-ref *runremote* 1))) (send-message pub-socket return-addr send-more: #t) (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) Index: tests/fullrun/megatest.config ================================================================== --- tests/fullrun/megatest.config +++ tests/fullrun/megatest.config @@ -145,16 +145,16 @@ # RUNDEAD [system exit 56] [server] # force use of 
server always -# required yes +required yes # Use http instead of direct filesystem access -transport http +# transport http # transport fs -# transport nmsg +transport nmsg synchronous 0 # If the server can't be started on this port it will try the next port until # it succeeds @@ -164,12 +164,12 @@ # Three minutes is 0.05 hours # timeout 0.025 timeout 0.01 # faststart; unless no, start server but proceed with writes until server started -# faststart no -faststart yes +faststart no +# faststart yes # Start server when average query takes longer than this # server-query-threshold 55500 server-query-threshold 1000