Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -4,11 +4,11 @@ INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ process.scm runs.scm tasks.scm tests.scm genexample.scm \ - fs-transport.scm + fs-transport.scm zmq-transport.scm http-transport.scm GUISRCF = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm OFILES = $(SRCFILES:%.scm=%.o) GOFILES = $(GUISRCF:%.scm=%.o) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -15,21 +15,22 @@ (use spiffy uri-common intarweb http-client spiffy-request-vars) (tcp-buffer-size 2048) -(declare (unit server)) +(declare (unit http-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) (include "common_records.scm") (include "db_records.scm") -(define (server:make-server-url hostport) +(define (http-transport:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) @@ -42,11 +43,11 @@ ;; Call this to start the actual server ;; (define *db:process-queue-mutex* (make-mutex)) -(define (server:run hostn) +(define (http-transport:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") @@ -102,17 +103,17 @@ (send-response body: (conc "ctrl data\n" res "") headers: '((content-type text/plain))))) (else (continue)))))))) - (server:try-start-server ipaddrstr start-port) + (http-transport:try-start-server ipaddrstr start-port) ;; lite3:finalize! db))) )) -;; (define (server:main-loop) +;; (define (http-transport:main-loop) ;; (print "INFO: Exectuing main server loop") ;; (access-log "megatest-http.log") ;; (server-bind-address #f) ;; (define-page (main-page-path) ;; (lambda () @@ -142,36 +143,36 @@ ;;; headers: '((content-type text/plain))) ;;; (continue)))))) ;;; ;;; (start-server port: 12345) -;; This is recursively run by server:run until sucessful +;; This is recursively run by http-transport:run until sucessful ;; -(define (server:try-start-server ipaddrstr portnum) +(define (http-transport:try-start-server ipaddrstr portnum) (handle-exceptions exn (begin (print-error-message exn) (if (< portnum 9000) (begin (print "WARNING: failed to start on portnum: " portnum ", trying next port") (thread-sleep! 0.1) (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) + (http-transport:try-start-server ipaddrstr (+ portnum 1))) (print "ERROR: Tried and tried but could not start the server"))) (set! *runremote* (list ipaddrstr portnum)) (open-run-close tasks:remove-server-records tasks:open-db) (open-run-close tasks:server-register tasks:open-db (current-process-id) - ipaddrstr portnum 0 'live) + ipaddrstr portnum 0 'live 'http) (print "INFO: Trying to start server on " ipaddrstr ":" portnum) ;; This starts the spiffy server (start-server port: portnum) (print "INFO: server has been stopped"))) -(define (server:mk-signature) +(define (http-transport:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) @@ -178,41 +179,26 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== -;; When using zmq this would send the message back (two step process) -;; with spiffy or rpc this simply returns the return data to be returned -;; -(define (server:reply return-addr query-sig success/fail result) - (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (db:obj->string (vector success/fail query-sig result))) - ;;====================================================================== ;; C L I E N T S ;;====================================================================== -(define (server:get-client-signature) - (if *my-client-signature* *my-client-signature* - (let ((sig (server:mk-signature))) - (set! *my-client-signature* sig) - *my-client-signature*))) - ;; ;; ;; 1 Hello, world! Goodbye Dolly ;; Send msg to serverdat and receive result -(define (server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) +(define (http-transport:client-send-receive serverdat msg) + (let* ((url (http-transport:make-server-url serverdat)) (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) (numretries 0)) (handle-exceptions exn (if (< numretries 200) - (server:client-send-receive serverdat msg)) + (http-transport:client-send-receive serverdat msg)) (begin (debug:print-info 11 "fullurl=" fullurl "\n") ;; set up the http-client here (max-retry-attempts 100) (retry-request? (lambda (request) @@ -232,22 +218,11 @@ (debug:print-info 11 "match=" match) (let ((final (cadr match))) (debug:print-info 11 "final=" final) final))))))) -(define (server:client-login serverdat) - (max-retry-attempts 100) - (cdb:login serverdat *toppath* (server:get-client-signature))) - -;; Not currently used! But, I think it *should* be used!!! -(define (server:client-logout serverdat) - (let ((ok (and (socket? serverdat) - (cdb:logout serverdat *toppath* (server:get-client-signature))))) - ;; (close-socket serverdat) - ok)) - -(define (server:client-connect iface port) +(define (http-transport:client-connect iface port) (let* ((login-res #f) (serverdat (list iface port))) (set! login-res (server:client-login serverdat)) (if (and (not (null? login-res)) (car login-res)) @@ -258,54 +233,15 @@ (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) -;; Do all the connection work, start a server if not already running -(define (server:client-setup #!key (numtries 50)) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: failed to find megatest.config, exiting") - (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (if (> numtries 0) - (let ((exe (car (argv))) - (pid #f)) - (debug:print-info 0 "No server available, attempting to start one...") - (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - (string-intersperse *verbosity* ",") - (conc *verbosity*))))) - ;; (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - ;; (server:launch)))) - (let loop ((count 0)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if (not hostinfo) - (begin - (debug:print-info 0 "Waiting for server pid=" pid " to start") - (sleep 2) ;; give server time to start - (if (< count 5) - (loop (+ count 1))))))) - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being + +;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; -(define (server:keep-running) +(define (http-transport:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) @@ -356,57 +292,32 @@ (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) ;; all routes though here end in exit ... -(define (server:launch) +(define (http-transport:launch) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 2 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) + (debug:print 11 "http-transport:launch hostinfo=" hostinfo) (if hostinfo (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* (let* ((th2 (make-thread (lambda () - (server:run + (http-transport:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running")) ) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) -(define (server:client-signal-handler signum) - (handle-exceptions - exn - (debug:print " ... exiting ...") - (let ((th1 (make-thread (lambda () - "") ;; do nothing for now (was flush out last call if applicable) - "eat response")) - (th2 (make-thread (lambda () - (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") - (thread-sleep! 1) ;; give the flush one second to do it's stuff - (debug:print 0 " Done.") - (exit 4)) - "exit on ^C timer"))) - (thread-start! th2) - (thread-start! th1) - (thread-join! th2)))) - -(define (server:client-launch) - (set-signal-handler! signal/int server:client-signal-handler) - (if (server:client-setup) - (debug:print-info 2 "connected as client") - (begin - (debug:print 0 "ERROR: Failed to connect as client") - (exit)))) - Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -97,11 +97,12 @@ -env2file fname : write the environment to fname.csh and fname.sh -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are overwritten by values set in config files. -server -|hostname : start the server (reduces contention on megatest.db), use - to automatically figure out hostname - -list-servers : list the servers + -transport http|zmq : use http or zmq for transport (default is http) + -list-servers : list the servers -repl : start a repl (useful for extending megatest) Spreadsheet generation -extract-ods fname.ods : extract an open document spreadsheet from the database -pathmod path : insert path, i.e. path/runame/itempath/logfile.html @@ -157,10 +158,11 @@ ":expected" ":tol" ":units" ;; misc "-server" + "-transport" "-kill-server" "-port" "-extract-ods" "-pathmod" "-env2file" @@ -283,23 +285,23 @@ ;; Start the server - can be done in conjunction with -runall or -runtests (one day...) ;; we start the server if not running else start the client thread ;;====================================================================== (if (args:get-arg "-server") - (begin - (debug:print 2 "Launching server...") - (server:launch))) + (let ((transport (args:get-arg "-transport" 'http))) + (debug:print 2 "Launching server using transport " transport) + (server:launch transport))) (if (args:get-arg "-list-servers") ;; (args:get-arg "-kill-server")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) - (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n") + (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a~10a\n") (servers-to-kill '())) - (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State") - (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====") + (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State" "Transport") + (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====" "=========") (for-each (lambda (server) (let* (;; (killinfo (args:get-arg "-kill-server")) ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) ;; (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) @@ -312,10 +314,11 @@ (start-time (vector-ref server 6)) (priority (vector-ref server 7)) (state (vector-ref server 8)) (mt-ver (vector-ref server 9)) (last-update (vector-ref server 10)) ;; (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port)) + (transport (vector-ref server 11)) (killed #f) (status (< last-update 20))) ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server @@ -324,11 +327,11 @@ (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete)) (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid))) (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update - (if status "alive" "dead")))) + (if status "alive" "dead") transport))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below ) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -13,18 +13,18 @@ (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) -(tcp-buffer-size 2048) - (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses http-transport)) +(declare (uses zmq-transport)) (include "common_records.scm") (include "db_records.scm") (define (server:make-server-url hostport) @@ -107,70 +107,10 @@ (server:try-start-server ipaddrstr start-port) ;; lite3:finalize! db))) )) - -;; (define (server:main-loop) -;; (print "INFO: Exectuing main server loop") -;; (access-log "megatest-http.log") -;; (server-bind-address #f) -;; (define-page (main-page-path) -;; (lambda () -;; (let ((dat ($ "dat"))) -;; ;; (with-request-variables (dat) -;; (debug:print-info 12 "Got dat=" dat) -;; (let* ((packet (db:string->obj dat)) -;; (qtype (cdb:packet-get-qtype packet))) -;; (debug:print-info 12 "server=> received packet=" packet) -;; (if (not (member qtype '(sync ping))) -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *last-db-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*))) -;; (let ((res (open-run-close db:process-queue-item open-db packet))) -;; (debug:print-info 11 "Return value from db:process-queue-item is " res) -;; res)))))) - -;;; (use spiffy uri-common intarweb) -;;; -;;; (root-path "/var/www") -;;; -;;; (vhost-map `(((* any) . ,(lambda (continue) -;;; (if (equal? (uri-path (request-uri (current-request))) -;;; '(/ "hey")) -;;; (send-response body: "hey there!\n" -;;; headers: '((content-type text/plain))) -;;; (continue)))))) -;;; -;;; (start-server port: 12345) - -;; This is recursively run by server:run until sucessful -;; -(define (server:try-start-server ipaddrstr portnum) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 9000) - (begin - (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) - (print "ERROR: Tried and tried but could not start the server"))) - (set! *runremote* (list ipaddrstr portnum)) - (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'live) - (print "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - (start-server port: portnum) - (print "INFO: server has been stopped"))) - (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) @@ -187,11 +127,14 @@ (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case *transport-type* ((fs) result) - ((http)(db:obj->string (vector success/fail query-sig result))))) + ((http)(db:obj->string (vector success/fail query-sig result))) + ((zmq) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== @@ -199,54 +142,17 @@ (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) -;; -;; -;; 1 Hello, world! Goodbye Dolly -;; Send msg to serverdat and receive result -(define (server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) - (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) - (numretries 0)) - (handle-exceptions - exn - (if (< numretries 200) - (server:client-send-receive serverdat msg)) - (begin - (debug:print-info 11 "fullurl=" fullurl "\n") - ;; set up the http-client here - (max-retry-attempts 100) - (retry-request? (lambda (request) - (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) - (set! numretries (+ numretries 1)) - #t)) - ;; send the data and get the response - ;; extract the needed info from the http data and - ;; process and return it. - (let* ((res (with-input-from-request fullurl - ;; #f - ;; msg - (list (cons 'dat msg)) - read-string))) - (debug:print-info 11 "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") res))) - (debug:print-info 11 "match=" match) - (let ((final (cadr match))) - (debug:print-info 11 "final=" final) - final))))))) - (define (server:client-login serverdat) - (max-retry-attempts 100) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) - ;; (close-socket serverdat) ok)) (define (server:client-connect iface port) (let* ((login-res #f) (serverdat (list iface port))) @@ -269,134 +175,51 @@ (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (case *transport-type* + (let* ((hostinfo (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out + (open-run-close tasks:get-best-server tasks:open-db) + #f))) + ;; if have hostinfo then extract the transport type + ;; else fall back to fs + ;; + (set! *transport-type* (if hostinfo + (string->vector (tasks:hostinfo-get-transport hostinfo)) + 'fs)) + (debug:print-info 1 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) + (case *transport-type* ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) ((http) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (begin - (debug:print 0 "ERROR: No server found, exiting") - (exit))))))) - -;; (if (> numtries 0) -;; (let ((exe (car (argv))) -;; (pid #f)) -;; (debug:print-info 0 "No server available, attempting to start one...") -;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) -;; (string-intersperse *verbosity* ",") -;; (conc *verbosity*))))) -;; ;; (set! pid (process-fork (lambda () -;; ;; (current-input-port (open-input-file "/dev/null")) -;; ;; (current-output-port (open-output-file "/dev/null")) -;; ;; (current-error-port (open-output-file "/dev/null")) -;; ;; (server:launch)))) -;; (let loop ((count 0)) -;; (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) -;; (if (not hostinfo) -;; (begin -;; (debug:print-info 0 "Waiting for server pid=" pid " to start") -;; (sleep 2) ;; give server time to start -;; (if (< count 5) -;; (loop (+ count 1))))))) -;; ;; we are starting a server, do not try again! That can lead to -;; ;; recursively starting many processes!!! -;; (server:client-setup numtries: 0)) -;; (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. -;; -(define (server:keep-running) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (let* ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *runremote*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop)))))) - (iface (car server-info)) - (port (cadr server-info)) - (last-access 0) - (tdb (tasks:open-db)) - (spid (tasks:server-get-server-id tdb #f iface port #f))) - (print "Keep-running got server pid " spid ", using iface " iface " and port " port) - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - ;; NB// sync currently does NOT return queue-length - (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... - (tasks:server-update-heartbeat tdb spid) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (mutex-lock! *heartbeat-mutex*) - (set! last-access *last-db-access*) - (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (tasks:server-deregister-self tdb (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit))))))) + (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo))) + ((zmq) + (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo) + (tasks:hostinfo-get-pubport hostinfo))) + (else ;; default to fs + (set! *transport-type* 'fs) + (set! *megatest-db* (open-db)))))) + + ;; all routes though here end in exit ... -(define (server:launch) +(define (server:launch transport) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) - (debug:print-info 2 "Starting the standalone server") - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) - (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) - ) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - (exit))) + (debug:print-info 2 "Starting server using " transport " transport") + (set! *transport-type* transport) + (case transport + ((fs) (exit)) ;; there is no "fs" transport + ((http) (http-transport:launch)) + ((zmq) (zmq-transport:launch)) + (else + (debug:print "WARNING: unrecognised transport " transport) + (exit)))) (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -67,10 +67,11 @@ start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, + transport TEXT, CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, @@ -84,23 +85,32 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== +;; make-vector-record tasks hostinfo id interface port pubport transport +(define (make-tasks:hostinfo)(make-vector 5)) +(define (tasks:hostinfo-get-id vec) (vector-ref vec 0)) +(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1)) +(define (tasks:hostinfo-get-port vec) (vector-ref vec 2)) +(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) +(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) + ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state #!key (pubport -1)) +(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1)) (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface) - VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" - pid (get-host-name) port pubport priority (conc state) megatest-version interface) - (list + "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport) + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);" + pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport)) + (vector (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface port pubport + transport )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) @@ -255,14 +265,14 @@ (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update) - (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update) res))) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport) + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res))) mdb - "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors Index: zmq-transport.scm ================================================================== --- zmq-transport.scm +++ zmq-transport.scm @@ -13,16 +13,17 @@ (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use zmq) -(declare (unit server)) +(declare (unit zmq-transport)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses server)) (include "common_records.scm") (include "db_records.scm") ;; Transition to pub --> sub with pull <-- push @@ -48,11 +49,11 @@ ;; [x] [ ] - after 60 seconds ;; [ ] [ ] i. check server alive, connect to new if necessary ;; [ ] [ ] ii. resend request ;; [ ] [ ] 7. Turn self ping back on -(define (server:make-server-url hostport) +(define (zmq-transport:make-server-url hostport) (if (not hostport) #f (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) @@ -65,11 +66,11 @@ (define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) (define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) (define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) (define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) -(define (server:run hostn) +(define (zmq-transport:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") @@ -88,11 +89,11 @@ (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) (if ipstr ipstr hostname))) (last-run 0)) - (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") + (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! zmq-sdat1 (car zmq-sockets-dat)) (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) @@ -138,14 +139,14 @@ (begin (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) (loop '())) (loop (cons packet queue-lst))))))) -;; run server:keep-running in a parallel thread to monitor that the db is being +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; -(define (server:keep-running) +(define (zmq-transport:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) @@ -157,11 +158,11 @@ (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) - (zmq-sockets (server:client-connect iface pullport pubport)) + (zmq-sockets (zmq-transport:client-connect iface pullport pubport)) (last-access 0)) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) @@ -194,37 +195,44 @@ (thread-sleep! 1) (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) -(define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) +(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50)) (let ((s (if s s (make-socket stype))) - (p (if (number? port) port 5555)) - (old-handler (current-exception-handler))) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 9000) - (begin - (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) - (print "ERROR: Tried and tried but could not start the server"))) - (set! *runremote* (list ipaddrstr portnum)) - (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'live) - (print "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - (start-server port: portnum) - (print "INFO: server has been stopped"))) - -(define (server:mk-signature) + (p (if (number? port) port 5555)) + (old-handler (current-exception-handler))) + (handle-exceptions + exn + (begin + (debug:print 0 "Failed to bind to port " p ", trying next port") + (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) + ;; (old-handler) + ;; (print-call-chain) + (if (> trynum 0) + (zmq-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) + (debug:print-info 0 "Tried ports up to " p + " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) + (exit)) ;; To exit or not? That is the question. + (let ((zmq-url (conc "tcp://" iface ":" p))) + (debug:print 2 "Trying to start server on " zmq-url) + (bind-socket s zmq-url) + (list iface s port))))) + +(define (zmq-transport:setup-ports ipaddrstr startport) + (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull)) + (p1 (caddr s1)) + (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) + (p2 (caddr s2))) + (set! *runremote* #f) + (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) + (mutex-lock! *heartbeat-mutex*) + (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq)) + (mutex-unlock! *heartbeat-mutex*) + (list s1 s2))) + +(define (zmq-transport:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) @@ -231,37 +239,22 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== -;; When using zmq this would send the message back (two step process) -;; with spiffy or rpc this simply returns the return data to be returned -;; -(define (server:reply return-addr query-sig success/fail result) - (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) - ;; (send-message pubsock target send-more: #t) - ;; (send-message pubsock - (db:obj->string (vector success/fail query-sig result))) - ;;====================================================================== ;; C L I E N T S ;;====================================================================== -(define (server:get-client-signature) - (if *my-client-signature* *my-client-signature* - (let ((sig (server:mk-signature))) - (set! *my-client-signature* sig) - *my-client-signature*))) - ;; -(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) +(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) (let ((connect-ok #f) (zmq-socket (if context (make-socket type context) (make-socket type))) - (conurl (server:make-server-url (list iface port)))) + (conurl (zmq-transport:make-server-url (list iface port)))) (if (socket? zmq-socket) (begin ;; first apply subscriptions (for-each (lambda (subscription) (debug:print 2 "Subscribing to " subscription) @@ -271,29 +264,18 @@ zmq-socket) (begin (debug:print 0 "ERROR: Failed to open socket to " conurl) #f)))) -(define (server:client-login serverdat) - (max-retry-attempts 100) - (cdb:login serverdat *toppath* (server:get-client-signature))) - -;; Not currently used! But, I think it *should* be used!!! -(define (server:client-logout serverdat) - (let ((ok (and (socket? serverdat) - (cdb:logout serverdat *toppath* (server:get-client-signature))))) - ;; (close-socket serverdat) - ok)) - -(define (server:client-connect iface pullport pubport) - (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) - (sub-socket (server:client-socket-connect iface pubport +(define (zmq-transport:client-connect iface pullport pubport) + (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push)) + (sub-socket (zmq-transport:client-socket-connect iface pubport type: 'sub - subscriptions: (list (server:get-client-signature) "all"))) + subscriptions: (list (zmq-transport:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) - (set! login-res (server:client-login zmq-sockets)) + (set! login-res (zmq-transport:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") (set! *runremote* zmq-sockets) @@ -301,66 +283,14 @@ (begin (debug:print-info 2 "Failed to login or connect to " conurl) (set! *runremote* #f) #f)))) -;; Do all the connection work, start a server if not already running -(define (server:client-setup #!key (numtries 50)) - (if (not *toppath*) - (if (not (setup-for-run)) - (begin - (debug:print 0 "ERROR: failed to find megatest.config, exiting") - (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pubport (list-ref hostinfo 3)) - (pid (list-ref hostinfo 4))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - ;; (handle-exceptions - ;; exn - ;; (begin - ;; ;; something went wrong in connecting to the server. In this scenario it is ok - ;; ;; to try again - ;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) - ;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - ;; (debug:print 0 " perhaps jobs killed with -9? Removing server records") - ;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) - ;; (server:client-setup (- numtries 1)) - ;; #f) - (server:client-connect iface port pubport)) ;; ) - (if (> numtries 0) - (let ((exe (car (argv))) - (pid #f)) - (debug:print-info 0 "No server available, attempting to start one...") - ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - ;; (string-intersperse *verbosity* ",") - ;; (conc *verbosity*))))) - (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - (server:launch)))) ;; should never get here .... - (let loop ((count 0)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if (not hostinfo) - (begin - (debug:print-info 0 "Waiting for server pid=" pid " to start") - (sleep 2) ;; give server time to start - (if (< count 5) - (loop (+ count 1))))))) - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; -(define (server:keep-running) +(define (zmq-transport:keep-running) ;; if none running or if > 20 seconds since ;; server last used then start shutdown ;; This thread waits for the server to come alive (let* ((server-info (let loop () (let ((sdat #f)) @@ -411,55 +341,50 @@ (debug:print-info 0 "Max cached queries was " *max-cache-size*) (debug:print-info 0 "Server shutdown complete. Exiting") (exit))))))) ;; all routes though here end in exit ... -(define (server:launch) +(define (zmq-transport:launch) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) - (debug:print-info 2 "Starting the standalone server") - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) - (if *toppath* - (let* (;; (th1 (make-thread (lambda () - ;; (let ((server-info #f)) - ;; ;; wait for the server to be online and available - ;; (let loop () - ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") - ;; (thread-sleep! 2) - ;; (mutex-lock! *heartbeat-mutex*) - ;; (set! server-info *server-info* ) - ;; (mutex-unlock! *heartbeat-mutex*) - ;; (if (not server-info)(loop))) - ;; (debug:print 2 "Server alive, starting self-ping") - ;; (server:self-ping server-info) - ;; )) - ;; "Self ping")) - (th2 (make-thread (lambda () - (server:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) - ) - (set! *client-non-blocking-mode* #t) - ;; (thread-start! th1) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - ;; (thread-join! th3) - (thread-join! th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - (exit))) - -(define (server:client-signal-handler signum) + (debug:print-info 2 "Starting zmq server") + (if *toppath* + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (zmq-transport:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () + (zmq-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) + ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + ;; (thread-join! th3) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + +(define (zmq-transport:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () (if (not *received-response*) @@ -473,13 +398,13 @@ "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) (thread-join! th2)))) -(define (server:client-launch) - (set-signal-handler! signal/int server:client-signal-handler) - (if (server:client-setup) +(define (zmq-transport:client-launch) + (set-signal-handler! signal/int zmq-transport:client-signal-handler) + (if (zmq-transport:client-setup) (debug:print-info 2 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) @@ -487,24 +412,24 @@ ;; Defunct functions ;;====================================================================== ;; ping a server and return number of clients or #f (if no response) ;; NOT IN USE! -(define (server:ping host port #!key (secs 10)(return-socket #f)) +(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f)) (cdb:use-non-blocking-mode (lambda () (let* ((res #f) (th1 (make-thread (lambda () (let* ((zmq-context (make-context 1)) - (zmq-socket (server:client-connect host port context: zmq-context))) + (zmq-socket (zmq-transport:client-connect host port context: zmq-context))) (if zmq-socket - (if (server:client-login zmq-socket) + (if (zmq-transport:client-login zmq-socket) (let ((numclients (cdb:num-clients zmq-socket))) (if (not return-socket) (begin - (server:client-logout zmq-socket) + (zmq-transport:client-logout zmq-socket) (close-socket zmq-socket))) (set! res (list #t numclients (if return-socket zmq-socket #f)))) (begin ;; (close-socket zmq-socket) (set! res (list #f "CAN'T LOGIN" #f)))) @@ -526,25 +451,25 @@ exn (set! res (list #f "TIMED OUT" #f)) (thread-join! th1 secs)) res)))) -;; (define (server:self-ping server-info) +;; (define (zmq-transport:self-ping server-info) ;; ;; server-info: server-id interface pullport pubport ;; (let ((iface (list-ref server-info 1)) ;; (pullport (list-ref server-info 2)) ;; (pubport (list-ref server-info 3))) -;; (server:client-connect iface pullport pubport) +;; (zmq-transport:client-connect iface pullport pubport) ;; (let loop () ;; (thread-sleep! 2) ;; (cdb:client-call *runremote* 'ping #t) -;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") ;; (mutex-lock! *heartbeat-mutex*) ;; (set! *server-loop-heart-beat* (current-seconds)) ;; (mutex-unlock! *heartbeat-mutex*) ;; (loop)))) -(define (server:reply pubsock target query-sig success/fail result) - (debug:print-info 11 "server:reply target=" target ", result=" result) +(define (zmq-transport:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) (send-message pubsock (db:obj->string (vector success/fail query-sig result))))