Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -3,11 +3,12 @@ CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ - process.scm runs.scm tasks.scm tests.scm genexample.scm + process.scm runs.scm tasks.scm tests.scm genexample.scm \ + fs-transport.scm zmq-transport.scm http-transport.scm GUISRCF = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm OFILES = $(SRCFILES:%.scm=%.o) GOFILES = $(GUISRCF:%.scm=%.o) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -40,10 +40,12 @@ (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar ;; SERVER (define *my-client-signature* #f) +(define *transport-type* #f) +(define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -17,15 +17,19 @@ ;; (import (prefix rpc rpc:)) (use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64) (import (prefix sqlite3 sqlite3:)) (import (prefix base64 base64:)) + +;; Note, try to remove this dependency +(use zmq) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) +(declare (uses fs-transport)) (include "common_records.scm") (include "db_records.scm") (include "key_records.scm") (include "run_records.scm") @@ -1103,23 +1107,37 @@ ;; (debug:print-info 4 "Starting cache 
processing")
 ;; (let loop ()
 ;;   (thread-sleep! 10) ;; move save time around to minimize regular collisions?
 ;;   (db:write-cached-data)
 ;;   (loop)))
-
+;; db:obj->string / db:string->obj pick the wire encoding from *transport-type*:
+;;   http => s11n serialize, base64 encode, then swap "=" for "_" (reversed when decoding)
+;;   zmq  => s11n serialize only;  fs or any other value => object passed through untouched
+;;
+;; NOTE: Can remove the regex and base64 encoding for zmq
 (define (db:obj->string obj)
-  (string-substitute
-   (regexp "=") "_"
-   (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
-   #t))
+  (case *transport-type*
+    ((fs) obj)
+    ((http)
+     (string-substitute
+      (regexp "=") "_"
+      (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
+      #t))
+    ((zmq)(with-output-to-string (lambda ()(serialize obj))))
+    (else obj)))
 
 (define (db:string->obj msg)
-  (with-input-from-string
-   (base64:base64-decode
-    (string-substitute
-     (regexp "_") "=" msg #t))
-   (lambda ()(deserialize))))
+  (case *transport-type*
+    ((fs) msg)
+    ((http)
+     (with-input-from-string
+      (base64:base64-decode
+       (string-substitute
+        (regexp "_") "=" msg #t))
+      (lambda ()(deserialize))))
+    ((zmq)(with-input-from-string msg (lambda ()(deserialize))))
+    (else msg)))
 
 (define (cdb:use-non-blocking-mode proc)
   (set! *client-non-blocking-mode* #t)
   (let ((res (proc)))
     (set! *client-non-blocking-mode* #f)
@@ -1126,58 +1144,83 @@
     res))
 
 ;; params = 'target cached remparams
 ;;
 ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
+;;
+;; cdb:client-call is the unified interface to all the transports. It dispatches the
+;; query to a transport routine (e.g. http-transport:client-send-receive) that
+;; transports the data to the server where it is passed to db:process-queue-item
+;; which either returns the data to the calling server routine or
+;; directly calls the returning procedure (e.g. zmq).
 ;;
 (define (cdb:client-call serverdat qtype immediate numretries . params)
   (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
-  ;; (handle-exceptions
-  ;;  exn
-  ;;  (begin
-  ;;    (thread-sleep! 5)
-  ;;    (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
-  (let* ((client-sig (server:get-client-signature))
-         (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params)))
-         (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
-         )
-    (debug:print-info 11 "zdat=" zdat)
-    (let* (
-           (res #f)
-           (rawdat (server:client-send-receive serverdat zdat))
-           (tmp #f))
-      (debug:print-info 11 "Sent " zdat ", received " rawdat)
-      (set! tmp (db:string->obj rawdat))
-      ;; (if (equal? query-sig (vector-ref myres 1))
-      ;; (set! res
-      (vector-ref tmp 2)
-      ;; (loop (server:client-send-receive serverdat zdat)))))))
-      ;; (timeout (lambda ()
-      ;;   (let loop ((n numretries))
-      ;;     (thread-sleep! 15)
-      ;;     (if (not res)
-      ;;         (if (> numretries 0)
-      ;;             (begin
-      ;;               (debug:print 2 "WARNING: no reply to query " params ", trying resend")
-      ;;               (debug:print-info 11 "re-sending message")
-      ;;               (apply cdb:client-call serverdat qtype immediate numretries params)
-      ;;               (debug:print-info 11 "message re-sent")
-      ;;               (loop (- n 1)))
-      ;;             ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params))
-      ;;             (begin
-      ;;               (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
-      ;;               (exit 5))))))))
-      ;; (send-receive)
-      )))
-  ;; (debug:print-info 11 "Starting threads")
-  ;; (let ((th1 (make-thread send-receive "send receive"))
-  ;;       (th2 (make-thread timeout "timeout")))
-  ;;   (thread-start! th1)
-  ;;   (thread-start! th2)
-  ;;   (thread-join! th1)
-  ;;   (debug:print-info 11 "cdb:client-call returning res=" res)
-  ;;   res))))
+  (case *transport-type*
+    ((fs)
+     (let ((packet (vector "na" qtype immediate "na" params 0)))
+       (fs:process-queue-item packet)))
+    ((http)
+     (let* ((client-sig (server:get-client-signature))
+            (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params)))
+            (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))) ;; (with-output-to-string (lambda ()(serialize params))))
+       (debug:print-info 11 "zdat=" zdat)
+       (let* ((res #f)
+              (rawdat (http-transport:client-send-receive serverdat zdat))
+              (tmp #f))
+         (debug:print-info 11 "Sent " zdat ", received " rawdat)
+         (set! tmp (db:string->obj rawdat))
+         (vector-ref tmp 2))))
+    ((zmq)
+     (handle-exceptions
+      exn
+      (begin
+        (thread-sleep! 5)
+        (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))
+      (let* ((push-socket (vector-ref serverdat 0)) ;; for zmq, serverdat is the vector of sockets
+             (sub-socket (vector-ref serverdat 1))
+             (client-sig (server:get-client-signature))
+             (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params)))
+             (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
+             (res #f)
+             (send-receive (lambda ()
+                             (debug:print-info 11 "sending message")
+                             (send-message push-socket zdat)
+                             (debug:print-info 11 "message sent")
+                             (let loop ()
+                               ;; get the sender info
+                               ;; this should match (server:get-client-signature)
+                               ;; we will need to process "all" messages here some day
+                               (receive-message* sub-socket)
+                               ;; now get the actual message
+                               (let ((myres (db:string->obj (receive-message* sub-socket))))
+                                 (if (equal? query-sig (vector-ref myres 1))
+                                     (set! res (vector-ref myres 2))
+                                     (loop))))))
+             (timeout (lambda ()
+                        (let loop ((n numretries))
+                          (thread-sleep! 15)
+                          (if (not res)
+                              (if (> n 0) ;; count down the remaining resends, then give up
+                                  (begin
+                                    (debug:print 2 "WARNING: no reply to query " params ", trying resend")
+                                    (debug:print-info 11 "re-sending message")
+                                    (send-message push-socket zdat)
+                                    (debug:print-info 11 "message re-sent")
+                                    (loop (- n 1)))
+                                  ;; NOTE(review): a reply whose payload is #f is indistinguishable from "no reply yet" here
+                                  (begin
+                                    (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
+                                    (exit 5))))))))
+        (debug:print-info 11 "Starting threads")
+        (let ((th1 (make-thread send-receive "send receive"))
+              (th2 (make-thread timeout "timeout")))
+          (thread-start! th1)
+          (thread-start! th2)
+          (thread-join! th1)
+          (debug:print-info 11 "cdb:client-call returning res=" res)
+          res))))))
 
 (define (cdb:set-verbosity serverdat val)
   (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))
 
 (define (cdb:login serverdat keyval signature)
@@ -1270,10 +1313,67 @@
     set-verbosity
     killserver))
 
 ;; not used, intended to indicate to run in calling process
 (define db:run-local-queries '()) ;; rollup-tests-pass-fail))
+
+(define (db:write-cached-data)
+  (open-run-close
+   (lambda (db . junkparams)
+     (let ((queries (make-hash-table))
+           (data #f))
+       (mutex-lock! *incoming-mutex*)
+       (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
+       (set! *incoming-data* '())
+       (mutex-unlock! *incoming-mutex*)
+       (if (> (length data) 0)
+           (debug:print-info 4 "Writing cached data " data))
+       ;; prepare the needed statements
+       (for-each (lambda (request-item)
+                   (let ((stmt-key (vector-ref request-item 0)))
+                     (if (not (hash-table-ref/default queries stmt-key #f))
+                         (let ((stmt (alist-ref stmt-key db:queries)))
+                           (if stmt
+                               (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
+                               (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))
+                 data)
+       (let outerloop ((special-qry #f)
+                       (stmts data))
+         (if special-qry
+             ;; handle a query that cannot be part of the grouped queries
+             (let* ((stmt-key (vector-ref special-qry 0))
+                    (qry (hash-table-ref queries stmt-key))
+                    (params (vector-ref special-qry 2)))
+               (apply sqlite3:execute qry params) ;; qry is already a prepared statement
+               (if (not (null? stmts))
+                   (outerloop #f stmts)))
+             ;; handle normal queries
+             (sqlite3:with-transaction
+              db
+              (lambda ()
+                (debug:print-info 11 "flushing " stmts " to db")
+                (if (not (null? stmts))
+                    (let innerloop ((hed (car stmts))
+                                    (tal (cdr stmts)))
+                      (let ((params (vector-ref hed 2))
+                            (stmt-key (vector-ref hed 0)))
+                        (if (not (member stmt-key db:special-queries))
+                            (begin
+                              (debug:print-info 11 "Executing " stmt-key " for " params)
+                              (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
+                              (if (not (null? tal))
+                                  (innerloop (car tal)(cdr tal))))
+                            (outerloop hed tal)))))))))
+       (for-each (lambda (stmt-key)
+                   (sqlite3:finalize! (hash-table-ref queries stmt-key)))
+                 (hash-table-keys queries))
+       (let ((cache-size (length data)))
+         (if (> cache-size *max-cache-size*)
+             (set! *max-cache-size* cache-size)))
+       ))
+   #f))
+
 
 ;; The queue is a list of vectors where the zeroth slot indicates the type of query to
 ;; apply and the second slot is the time of the query and the third entry is a list of
 ;; values to be applied
 ;;
ADDED fs-transport.scm
Index: fs-transport.scm
==================================================================
--- /dev/null
+++ fs-transport.scm
@@ -0,0 +1,44 @@
+
+;; Copyright 2006-2012, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+(require-extension (srfi 18) extras tcp s11n)
+
+(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
+(import (prefix sqlite3 sqlite3:))
+
+(use spiffy uri-common intarweb http-client spiffy-request-vars)
+
+(tcp-buffer-size 2048)
+
+(declare (unit fs-transport))
+
+(declare (uses common))
+(declare (uses db))
+(declare (uses tests))
+(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
+
+(include "common_records.scm")
+(include "db_records.scm")
+
+
+;;======================================================================
+;; F S T R A N S P O R T S E R V E R
+;;======================================================================
+
+;; There is no "server" as such for this transport; this convenience routine keeps one
+;; db handle in *megatest-db* so the db is not reopened for every packet, and hands the
+;; packet straight to db:process-queue-item.
+
+(define (fs:process-queue-item packet)
+  (unless *megatest-db* ;; first use: open the db (we require that (setup-for-run) has already been called)
+    (set! *megatest-db* (open-db)))
+  (debug:print-info 11 "fs:process-queue-item called with packet=" packet)
+  (db:process-queue-item *megatest-db* packet))
+
ADDED http-transport.scm
Index: http-transport.scm
==================================================================
--- /dev/null
+++ http-transport.scm
@@ -0,0 +1,287 @@
+
+;; Copyright 2006-2012, Matthew Welland.
+;;
+;; This program is made available under the GNU GPL version 2.0 or
+;; greater. See the accompanying file COPYING for details.
+;;
+;; This program is distributed WITHOUT ANY WARRANTY; without even the
+;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+;; PURPOSE.
+
+(require-extension (srfi 18) extras tcp s11n)
+
+(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
+(import (prefix sqlite3 sqlite3:))
+
+(use spiffy uri-common intarweb http-client spiffy-request-vars)
+
+(tcp-buffer-size 2048)
+
+(declare (unit http-transport))
+
+(declare (uses common))
+(declare (uses db))
+(declare (uses tests))
+(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
+(declare (uses server))
+
+(include "common_records.scm")
+(include "db_records.scm")
+
+(define (http-transport:make-server-url hostport)
+  (if (not hostport)
+      #f
+      (conc "http://" (car hostport) ":" (cadr hostport))))
+
+(define *server-loop-heart-beat* (current-seconds))
+(define *heartbeat-mutex* (make-mutex))
+
+;;======================================================================
+;; S E R V E R
+;;======================================================================
+
+;; Call this to start the actual server
+;;
+
+(define *db:process-queue-mutex* (make-mutex))
+
+(define (http-transport:run hostn)
+  (debug:print 2 "Attempting to start the server ...")
+  (if (not *toppath*)
+      (if (not (setup-for-run))
+          (begin
+            (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
+            (exit))))
+  (let* (;; (iface (if (string=? "-" hostn)
+         ;;            #f ;; (get-host-name)
+         ;;            hostn))
+         (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecessarily
+         (hostname (get-host-name))
+         (ipaddrstr (let ((ipstr (if (string=? "-" hostn)
+                                     (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
+                                     #f)))
+                      (if ipstr ipstr hostn))) ;; hostname)))
+         (start-port (if (args:get-arg "-port")
+                         (string->number (args:get-arg "-port"))
+                         (+ 5000 (random 1001))))
+         (link-tree-path (config-lookup *configdat* "setup" "linktree")))
+    (set! *cache-on* #t)
+    (root-path (if link-tree-path
+                   link-tree-path
+                   (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
+
+    ;; Setup the web server and a /ctrl interface
+    ;;
+    (vhost-map `(((* any) . ,(lambda (continue)
+                               ;; open the db on the first call
+                               (if (not db)(set! db (open-db)))
+                               (let* (($ (request-vars source: 'both))
+                                      (dat ($ 'dat))
+                                      (res #f))
+                                 (cond
+                                  ((equal? (uri-path (request-uri (current-request)))
+                                           '(/ "hey"))
+                                   (send-response body: "hey there!\n"
+                                                  headers: '((content-type text/plain))))
+                                  ;; This is the /ctrl path where data is handed to the server and
+                                  ;; responses are wrapped in <body>...</body> for the client to extract
+                                  ((equal? (uri-path (request-uri (current-request)))
+                                           '(/ "ctrl"))
+                                   (let* ((packet (db:string->obj dat))
+                                          (qtype (cdb:packet-get-qtype packet)))
+                                     (debug:print-info 12 "server=> received packet=" packet)
+                                     (if (not (member qtype '(sync ping)))
+                                         (begin
+                                           (mutex-lock! *heartbeat-mutex*)
+                                           (set! *last-db-access* (current-seconds))
+                                           (mutex-unlock! *heartbeat-mutex*)))
+                                     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
+                                     ;; (set! res (open-run-close db:process-queue-item open-db packet))
+                                     (set! res (db:process-queue-item db packet))
+                                     ;; (mutex-unlock! *db:process-queue-mutex*)
+                                     (debug:print-info 11 "Return value from db:process-queue-item is " res)
+                                     (send-response body: (conc "<head>ctrl data</head>\n<body>"
+                                                                res
+                                                                "</body>")
+                                                    headers: '((content-type text/plain)))))
+                                  (else (continue))))))))
+    (http-transport:try-start-server ipaddrstr start-port)
+    ;; lite3:finalize! db)))
+    ))
+
+;; This is recursively run by http-transport:run until successful
+;;
+(define (http-transport:try-start-server ipaddrstr portnum)
+  (handle-exceptions
+   exn
+   (begin
+     (print-error-message exn)
+     (if (< portnum 9000)
+         (begin
+           (print "WARNING: failed to start on portnum: " portnum ", trying next port")
+           (thread-sleep! 0.1)
+           (open-run-close tasks:remove-server-records tasks:open-db)
+           (http-transport:try-start-server ipaddrstr (+ portnum 1)))
+         (print "ERROR: Tried and tried but could not start the server")))
+   (set! *runremote* (list ipaddrstr portnum))
+   (open-run-close tasks:remove-server-records tasks:open-db)
+   (open-run-close tasks:server-register
+                   tasks:open-db
+                   (current-process-id)
+                   ipaddrstr portnum 0 'live 'http)
+   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
+   ;; This starts the spiffy server
+   (start-server port: portnum)
+   (print "INFO: server has been stopped")))
+
+(define (http-transport:mk-signature)
+  (message-digest-string (md5-primitive)
+                         (with-output-to-string
+                           (lambda ()
+                             (write (list (current-directory)
+                                          (argv)))))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+;;======================================================================
+;; C L I E N T S
+;;======================================================================
+
+;; The /ctrl handler in http-transport:run answers with a body of the form
+;;   <head>ctrl data</head>
+;;   <body>ENCODED-RESULT</body>   -- only the text between the body tags is returned
+;; Send msg to serverdat and receive result. NOTE(review): numretries restarts at 0 on every call, so the handler's retry looks unbounded - confirm
+(define (http-transport:client-send-receive serverdat msg)
+  (let* ((url (http-transport:make-server-url serverdat))
+         (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
+         (numretries 0))
+    (handle-exceptions
+     exn
+     (if (< numretries 200)
+         (http-transport:client-send-receive serverdat msg))
+     (begin
+       (debug:print-info 11 "fullurl=" fullurl "\n")
+       ;; set up the http-client here
+       (max-retry-attempts 100)
+       (retry-request? (lambda (request)
+                         (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
+                         (set! numretries (+ numretries 1))
+                         #t))
+       ;; send the data and get the response
+       ;; extract the needed info from the http data and
+       ;; process and return it.
+       (let* ((res (with-input-from-request fullurl
+                                            ;; #f
+                                            ;; msg
+                                            (list (cons 'dat msg))
+                                            read-string)))
+         (debug:print-info 11 "got res=" res)
+         (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
+           (debug:print-info 11 "match=" match)
+           (let ((final (cadr match)))
+             (debug:print-info 11 "final=" final)
+             final)))))))
+
+(define (http-transport:client-connect iface port)
+  (let* ((login-res #f)
+         (serverdat (list iface port)))
+    (set! login-res (server:client-login serverdat))
+    (if (and (not (null? login-res))
+             (car login-res))
+        (begin
+          (debug:print-info 2 "Logged in and connected to " iface ":" port)
+          (set! *runremote* serverdat)
+          serverdat)
+        (begin
+          (debug:print-info 2 "Failed to login or connect to " iface ":" port)
+          (set! *runremote* #f)
+          #f))))
+
+
+;; run http-transport:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;;
+(define (http-transport:keep-running)
+  ;; if the db has not been accessed through the server for
+  ;; 45 minutes then start shutdown
+  ;; This thread waits for the server to come alive
+  (let* ((server-info (let loop ()
+                        (let ((sdat #f))
+                          (mutex-lock! *heartbeat-mutex*)
+                          (set! sdat *runremote*)
+                          (mutex-unlock! *heartbeat-mutex*)
+                          (if sdat sdat
+                              (begin
+                                (thread-sleep! 4) ;; yield to the server thread; posix sleep would stall every thread
+                                (loop))))))
+         (iface (car server-info))
+         (port (cadr server-info))
+         (last-access 0)
+         (tdb (tasks:open-db))
+         (spid (tasks:server-get-server-id tdb #f iface port #f)))
+    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
+    (let loop ((count 0))
+      (thread-sleep! 4) ;; no need to do this very often
+      ;; NB// sync currently does NOT return queue-length
+      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
+        ;; (print "Server running, count is " count)
+        (if (< count 1) ;; 3x3 = 9 secs approx
+            (loop (+ count 1)))
+
+        ;; NOTE: Get rid of this mechanism! It really is not needed...
+        (tasks:server-update-heartbeat tdb spid)
+
+        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
+        (mutex-lock! *heartbeat-mutex*)
+        (set! last-access *last-db-access*)
+        (mutex-unlock! *heartbeat-mutex*)
+        (if (> (+ last-access
+                  ;; (* 50 60 60) ;; 48 hrs
+                  ;; 60 ;; one minute
+                  ;; (* 60 60) ;; one hour
+                  (* 45 60) ;; 45 minutes, until the db deletion bug is fixed.
+                  )
+               (current-seconds))
+            (begin
+              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
+              (loop 0))
+            (begin
+              (debug:print-info 0 "Starting to shutdown the server.")
+              ;; need to delete only *my* server entry (future use)
+              (set! *time-to-exit* #t)
+              (tasks:server-deregister-self tdb (get-host-name))
+              (thread-sleep! 1)
+              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+              (debug:print-info 0 "Server shutdown complete. Exiting")
+              (exit)))))))
+
+;; all routes through here end in exit ...
+(define (http-transport:launch)
+  (if (not *toppath*)
+      (if (not (setup-for-run))
+          (begin
+            (debug:print 0 "ERROR: cannot find megatest.config, exiting")
+            (exit))))
+  (debug:print-info 2 "Starting the standalone server")
+  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
+    (debug:print 11 "http-transport:launch hostinfo=" hostinfo)
+    (if hostinfo
+        (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
+        (if *toppath*
+            (let* ((th2 (make-thread (lambda ()
+                                       (http-transport:run
+                                        (if (args:get-arg "-server")
+                                            (args:get-arg "-server")
+                                            "-"))) "Server run"))
+                   (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running"))
+                   )
+              (thread-start! th2)
+              (thread-start! th3)
+              (set! *didsomething* #t)
+              (thread-join! th2)
+              )
+            (debug:print 0 "ERROR: Failed to setup for megatest")))
+    (exit)))
+
Index: megatest.scm
==================================================================
--- megatest.scm
+++ megatest.scm
@@ -97,11 +97,12 @@
   -env2file fname         : write the environment to fname.csh and fname.sh
   -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are
                             overwritten by values set in config files.
   -server -|hostname      : start the server (reduces contention on megatest.db), use
                             - to automatically figure out hostname
-  -list-servers           : list the servers
+  -transport http|zmq     : use http or zmq for transport (default is http)
+  -list-servers           : list the servers
   -repl                   : start a repl (useful for extending megatest)
 
 Spreadsheet generation
   -extract-ods fname.ods  : extract an open document spreadsheet from the database
   -pathmod path           : insert path, i.e. path/runame/itempath/logfile.html
@@ -157,10 +158,11 @@
                         ":expected"
                         ":tol"
                         ":units"
                         ;; misc
                         "-server"
+                        "-transport"
                         "-kill-server"
                         "-port"
                         "-extract-ods"
                         "-pathmod"
                         "-env2file"
@@ -283,23 +285,23 @@
 ;; Start the server - can be done in conjunction with -runall or -runtests (one day...)
;; we start the server if not running else start the client thread ;;====================================================================== (if (args:get-arg "-server") - (begin - (debug:print 2 "Launching server...") - (server:launch))) + (let ((transport (args:get-arg "-transport" "http"))) + (debug:print 2 "Launching server using transport " transport) + (server:launch (string->symbol transport)))) (if (args:get-arg "-list-servers") ;; (args:get-arg "-kill-server")) (let ((tl (setup-for-run))) (if tl (let ((servers (open-run-close tasks:get-all-servers tasks:open-db)) - (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n") + (fmtstr "~5a~8a~8a~20a~20a~10a~10a~10a~10a~10a\n") (servers-to-kill '())) - (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State") - (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====") + (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State" "Transport") + (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====" "=========") (for-each (lambda (server) (let* (;; (killinfo (args:get-arg "-kill-server")) ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f)) ;; (kpid (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)) @@ -312,10 +314,11 @@ (start-time (vector-ref server 6)) (priority (vector-ref server 7)) (state (vector-ref server 8)) (mt-ver (vector-ref server 9)) (last-update (vector-ref server 10)) ;; (open-run-close tasks:server-alive? 
tasks:open-db #f hostname: hostname port: port)) + (transport (vector-ref server 11)) (killed #f) (status (< last-update 20))) ;; (zmq-sockets (if status (server:client-connect hostname port) #f))) ;; no need to login as status of #t indicates we are connecting to correct ;; server @@ -324,11 +327,11 @@ (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete)) (if (> last-update 20) ;; Mark as dead if not updated in last 20 seconds (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid))) (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update - (if status "alive" "dead")))) + (if status "alive" "dead") transport))) servers) (debug:print-info 1 "Done with listservers") (set! *didsomething* #t) (exit) ;; must do, would have to add checks to many/all calls below ) @@ -335,11 +338,11 @@ (exit))) ;; if not list or kill then start a client (if appropriate) (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test") (eq? (length (hash-table-keys args:arg-hash)) 0)) (debug:print-info 1 "Server connection not needed") - + ;; ok, so lets connect to the server (server:client-launch))) ;;====================================================================== ;; Weird special calls that need to run *after* the server has started? ;;====================================================================== ADDED rpc-transport.scm Index: rpc-transport.scm ================================================================== --- /dev/null +++ rpc-transport.scm @@ -0,0 +1,412 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. 
+ +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit server)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +(define (server:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +(define *db:process-queue-mutex* (make-mutex)) + +(define (server:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) + (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001)))) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (set! *cache-on* #t) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! 
+ + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + (if (not db)(set! db (open-db))) + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ;; This is the /ctrl path where data is handed to the server and + ;; responses + ((equal? (uri-path (request-uri (current-request))) + '(/ "ctrl")) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex + ;; (set! res (open-run-close db:process-queue-item open-db packet)) + (set! res (db:process-queue-item db packet)) + ;; (mutex-unlock! *db:process-queue-mutex*) + (debug:print-info 11 "Return value from db:process-queue-item is " res) + (send-response body: (conc "ctrl data\n" + res + "") + headers: '((content-type text/plain))))) + (else (continue)))))))) + (server:try-start-server ipaddrstr start-port) + ;; lite3:finalize! db))) + )) + + + +;; (define (server:main-loop) +;; (print "INFO: Exectuing main server loop") +;; (access-log "megatest-http.log") +;; (server-bind-address #f) +;; (define-page (main-page-path) +;; (lambda () +;; (let ((dat ($ "dat"))) +;; ;; (with-request-variables (dat) +;; (debug:print-info 12 "Got dat=" dat) +;; (let* ((packet (db:string->obj dat)) +;; (qtype (cdb:packet-get-qtype packet))) +;; (debug:print-info 12 "server=> received packet=" packet) +;; (if (not (member qtype '(sync ping))) +;; (begin +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *last-db-access* (current-seconds)) +;; (mutex-unlock! 
*heartbeat-mutex*))) +;; (let ((res (open-run-close db:process-queue-item open-db packet))) +;; (debug:print-info 11 "Return value from db:process-queue-item is " res) +;; res)))))) + +;;; (use spiffy uri-common intarweb) +;;; +;;; (root-path "/var/www") +;;; +;;; (vhost-map `(((* any) . ,(lambda (continue) +;;; (if (equal? (uri-path (request-uri (current-request))) +;;; '(/ "hey")) +;;; (send-response body: "hey there!\n" +;;; headers: '((content-type text/plain))) +;;; (continue)))))) +;;; +;;; (start-server port: 12345) + +;; This is recursively run by server:run until sucessful +;; +(define (server:try-start-server ipaddrstr portnum) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) + (server:try-start-server ipaddrstr (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (set! 
*runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + (start-server port: portnum) + (print "INFO: server has been stopped"))) + +(define (server:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) + +;; +;; +;; 1 Hello, world! 
Goodbye Dolly +;; Send msg to serverdat and receive result +(define (server:client-send-receive serverdat msg) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) + (numretries 0)) + (handle-exceptions + exn + (if (< numretries 200) + (server:client-send-receive serverdat msg)) + (begin + (debug:print-info 11 "fullurl=" fullurl "\n") + ;; set up the http-client here + (max-retry-attempts 100) + (retry-request? (lambda (request) + (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) + (set! numretries (+ numretries 1)) + #t)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((res (with-input-from-request fullurl + ;; #f + ;; msg + (list (cons 'dat msg)) + read-string))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))))) + +(define (server:client-login serverdat) + (max-retry-attempts 100) + (cdb:login serverdat *toppath* (server:get-client-signature))) + +;; Not currently used! But, I think it *should* be used!!! +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) + ok)) + +(define (server:client-connect iface port) + (let* ((login-res #f) + (serverdat (list iface port))) + (set! login-res (server:client-login serverdat)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" port) + (set! *runremote* serverdat) + serverdat) + (begin + (debug:print-info 2 "Failed to login or connect to " iface ":" port) + (set! 
*runremote* #f)
+          #f))))
+
+;; Do all the connection work, start a server if not already running
+;;
+;; numtries: when greater than zero and no server is registered, one server
+;;           process is started, registration is polled (at most six checks,
+;;           two seconds apart) and then a single final attempt is made with
+;;           numtries: 0.
+;;
+;; Returns the result of server:client-connect when a server is found.
+;;
+(define (server:client-setup #!key (numtries 50))
+  (if (not *toppath*)
+      (if (not (setup-for-run))
+          (begin
+            (debug:print 0 "ERROR: failed to find megatest.config, exiting")
+            (exit))))
+  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
+    (if hostinfo
+        ;; hostinfo is the vector record built by tasks:get-best-server
+        ;; (id interface port pubport transport pid hostname), so it has to be
+        ;; read with the tasks:hostinfo-get-* accessors; list-ref fails on a vector.
+        (let ((iface (tasks:hostinfo-get-interface hostinfo))
+              (port  (tasks:hostinfo-get-port hostinfo)))
+          (debug:print-info 2 "Setting up to connect to " hostinfo)
+          (server:client-connect iface port)) ;; )
+        (if (> numtries 0)
+            (let ((exe (car (argv)))
+                  (pid #f))
+              (debug:print-info 0 "No server available, attempting to start one...")
+              (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
+                                                                          (string-intersperse *verbosity* ",")
+                                                                          (conc *verbosity*)))))
+              ;; (set! pid (process-fork (lambda ()
+              ;;                           (current-input-port (open-input-file "/dev/null"))
+              ;;                           (current-output-port (open-output-file "/dev/null"))
+              ;;                           (current-error-port (open-output-file "/dev/null"))
+              ;;                           (server:launch))))
+              (let loop ((count 0))
+                (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
+                  (if (not hostinfo)
+                      (begin
+                        (debug:print-info 0 "Waiting for server pid=" pid " to start")
+                        (sleep 2) ;; give server time to start
+                        (if (< count 5)
+                            (loop (+ count 1)))))))
+              ;; we are starting a server, do not try again! That can lead to
+              ;; recursively starting many processes!!!
+              (server:client-setup numtries: 0))
+            (debug:print-info 1 "Too many attempts, giving up")))))
+
+;; run server:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;;
+(define (server:keep-running)
+  ;; if none running or if > 20 seconds since
+  ;; server last used then start shutdown
+  ;; This thread waits for the server to come alive
+  (let* ((server-info (let loop ()
+                        (let ((sdat #f))
+                          (mutex-lock! 
*heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... 
+;; Start the standalone server unless tasks:get-best-server already reports one.
+;; The server itself runs in thread "Server run" and the idle monitor in thread
+;; "Keep running"; only the server thread is joined. Every path ends in (exit).
+;;
+;; NOTE(review): server.scm in this same change dispatches to
+;; http-transport:launch, while this definition is still named server:launch -
+;; confirm which name is intended.
+;;
+(define (server:launch)
+  (if (not *toppath*)
+      (if (not (setup-for-run))
+          (begin
+            (debug:print 0 "ERROR: cannot find megatest.config, exiting")
+            (exit))))
+  (debug:print-info 2 "Starting the standalone server")
+  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
+    (debug:print 11 "server:launch hostinfo=" hostinfo)
+    (if hostinfo
+        ;; hostinfo is a vector record (see tasks:get-best-server), so it is read
+        ;; with the hostinfo accessors; car/cadr would raise on a vector.
+        (debug:print-info 2 "NOT starting new server, one is already running on "
+                          (tasks:hostinfo-get-interface hostinfo) ":" (tasks:hostinfo-get-port hostinfo))
+        (if *toppath*
+            (let* ((th2 (make-thread (lambda ()
+                                       (server:run
+                                        (if (args:get-arg "-server")
+                                            (args:get-arg "-server")
+                                            "-"))) "Server run"))
+                   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
+                   )
+              (thread-start! th2)
+              (thread-start! th3)
+              (set! *didsomething* #t)
+              (thread-join! th2)
+              )
+            (debug:print 0 "ERROR: Failed to setup for megatest")))
+    (exit)))
+
+;; Signal handler installed for signal/int by server:client-launch.
+;; Prints a notice, waits one second and exits with status 4. signum is unused.
+;;
+(define (server:client-signal-handler signum)
+  (handle-exceptions
+   exn
+   ;; debug:print takes the verbosity level as its first argument; without it
+   ;; the handler itself would fail while reporting the exception.
+   (debug:print 0 " ... exiting ...")
+   (let ((th1 (make-thread (lambda ()
+                             "") ;; do nothing for now (was flush out last call if applicable)
+                           "eat response"))
+         (th2 (make-thread (lambda ()
+                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
+                             (thread-sleep! 1) ;; give the flush one second to do its stuff
+                             (debug:print 0 " Done.")
+                             (exit 4))
+                           "exit on ^C timer")))
+     (thread-start! th2)
+     (thread-start! th1)
+     (thread-join! th2))))
+
+(define (server:client-launch)
+  (set-signal-handler! 
signal/int server:client-signal-handler) + (if (server:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -13,18 +13,18 @@ (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) -(tcp-buffer-size 2048) - (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses http-transport)) +(declare (uses zmq-transport)) (include "common_records.scm") (include "db_records.scm") (define (server:make-server-url hostport) @@ -107,70 +107,10 @@ (server:try-start-server ipaddrstr start-port) ;; lite3:finalize! db))) )) - -;; (define (server:main-loop) -;; (print "INFO: Exectuing main server loop") -;; (access-log "megatest-http.log") -;; (server-bind-address #f) -;; (define-page (main-page-path) -;; (lambda () -;; (let ((dat ($ "dat"))) -;; ;; (with-request-variables (dat) -;; (debug:print-info 12 "Got dat=" dat) -;; (let* ((packet (db:string->obj dat)) -;; (qtype (cdb:packet-get-qtype packet))) -;; (debug:print-info 12 "server=> received packet=" packet) -;; (if (not (member qtype '(sync ping))) -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *last-db-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*))) -;; (let ((res (open-run-close db:process-queue-item open-db packet))) -;; (debug:print-info 11 "Return value from db:process-queue-item is " res) -;; res)))))) - -;;; (use spiffy uri-common intarweb) -;;; -;;; (root-path "/var/www") -;;; -;;; (vhost-map `(((* any) . ,(lambda (continue) -;;; (if (equal? 
(uri-path (request-uri (current-request))) -;;; '(/ "hey")) -;;; (send-response body: "hey there!\n" -;;; headers: '((content-type text/plain))) -;;; (continue)))))) -;;; -;;; (start-server port: 12345) - -;; This is recursively run by server:run until sucessful -;; -(define (server:try-start-server ipaddrstr portnum) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 9000) - (begin - (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) - (print "ERROR: Tried and tried but could not start the server"))) - (set! *runremote* (list ipaddrstr portnum)) - (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'live) - (print "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - (start-server port: portnum) - (print "INFO: server has been stopped"))) - (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) @@ -185,11 +125,19 @@ ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock - (db:obj->string (vector success/fail query-sig result))) + (case *transport-type* + ((fs) result) + ((http)(db:obj->string (vector success/fail query-sig result))) + ((zmq) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) + (else + (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) + result))) ;;====================================================================== ;; C L I E N T S 
;;====================================================================== @@ -197,54 +145,17 @@ (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) -;; -;; -;; 1 Hello, world! Goodbye Dolly -;; Send msg to serverdat and receive result -(define (server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) - (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) - (numretries 0)) - (handle-exceptions - exn - (if (< numretries 200) - (server:client-send-receive serverdat msg)) - (begin - (debug:print-info 11 "fullurl=" fullurl "\n") - ;; set up the http-client here - (max-retry-attempts 100) - (retry-request? (lambda (request) - (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) - (set! numretries (+ numretries 1)) - #t)) - ;; send the data and get the response - ;; extract the needed info from the http data and - ;; process and return it. - (let* ((res (with-input-from-request fullurl - ;; #f - ;; msg - (list (cons 'dat msg)) - read-string))) - (debug:print-info 11 "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") res))) - (debug:print-info 11 "match=" match) - (let ((final (cadr match))) - (debug:print-info 11 "final=" final) - final))))))) - (define (server:client-login serverdat) - (max-retry-attempts 100) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) - ;; (close-socket serverdat) ok)) (define (server:client-connect iface port) (let* ((login-res #f) (serverdat (list iface port))) @@ -258,134 +169,61 @@ (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! 
*runremote* #f) #f)))) -;; Do all the connection work, start a server if not already running +;; Do all the connection work, look up the transport type and set up the +;; connection if required. +;; (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (if (> numtries 0) - (let ((exe (car (argv))) - (pid #f)) - (debug:print-info 0 "No server available, attempting to start one...") - (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - (string-intersperse *verbosity* ",") - (conc *verbosity*))))) - ;; (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - ;; (server:launch)))) - (let loop ((count 0)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if (not hostinfo) - (begin - (debug:print-info 0 "Waiting for server pid=" pid " to start") - (sleep 2) ;; give server time to start - (if (< count 5) - (loop (+ count 1))))))) - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. 
-;; -(define (server:keep-running) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (let* ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *runremote*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop)))))) - (iface (car server-info)) - (port (cadr server-info)) - (last-access 0) - (tdb (tasks:open-db)) - (spid (tasks:server-get-server-id tdb #f iface port #f))) - (print "Keep-running got server pid " spid ", using iface " iface " and port " port) - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - ;; NB// sync currently does NOT return queue-length - (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... - (tasks:server-update-heartbeat tdb spid) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (mutex-lock! *heartbeat-mutex*) - (set! last-access *last-db-access*) - (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (tasks:server-deregister-self tdb (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. 
Exiting") - (exit))))))) + (debug:print-info 11 "*transport-type* is " *transport-type*) + (let* ((hostinfo (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out + (open-run-close tasks:get-best-server tasks:open-db) + #f))) + ;; if have hostinfo then extract the transport type + ;; else fall back to fs + (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo) + (set! *transport-type* (if hostinfo + (string->symbol (tasks:hostinfo-get-transport hostinfo)) + 'fs)) + (debug:print-info 1 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) + (case *transport-type* + ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) + ((http) + (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo))) + ((zmq) + (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo) + (tasks:hostinfo-get-pubport hostinfo))) + (else ;; default to fs + (set! *transport-type* 'fs) + (set! *megatest-db* (open-db)))))) + + ;; all routes though here end in exit ... -(define (server:launch) +(define (server:launch transport) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) - (debug:print-info 2 "Starting the standalone server") - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) - (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) - ) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! 
th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - (exit))) + (debug:print-info 2 "Starting server using " transport " transport") + (set! *transport-type* transport) + (case transport + ((fs) (exit)) ;; there is no "fs" transport + ((http) (http-transport:launch)) + ((zmq) (zmq-transport:launch)) + (else + (debug:print "WARNING: unrecognised transport " transport) + (exit)))) (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -61,15 +61,17 @@ (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, port INTEGER, + pubport INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, + transport TEXT, CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, @@ -83,22 +85,33 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== +;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname +(define (tasks:hostinfo-get-id vec) (vector-ref vec 0)) +(define (tasks:hostinfo-get-interface vec) (vector-ref vec 1)) +(define (tasks:hostinfo-get-port vec) (vector-ref vec 2)) +(define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) +(define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) +(define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) +(define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) + ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface port priority state) +(define (tasks:server-register mdb pid interface port priority state transport #!key 
(pubport -1)) (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) - VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" - pid (get-host-name) port priority (conc state) megatest-version interface) - (list + "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport) + VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);" + pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport)) + (vector (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface port + pubport + transport )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! (define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) @@ -176,15 +189,15 @@ ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row - (lambda (id hostname interface port pid) - (set! res (cons (list hostname interface port pid id) res)) + (lambda (id interface port pubport transport pid hostname) + (set! res (cons (vector id interface port pubport transport pid hostname) res)) (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb - "SELECT id,hostname,interface,port,pid FROM servers + "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) ;; for now we are keeping only one server registered in the db, return #f or first server found (if (null? 
res) #f (car res)))) @@ -253,14 +266,14 @@ (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port start-time priority state mt-version last-update) - (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res))) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport) + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res))) mdb - "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ADDED zmq-transport.scm Index: zmq-transport.scm ================================================================== --- /dev/null +++ zmq-transport.scm @@ -0,0 +1,475 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use zmq) + +(declare (unit zmq-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
+(declare (uses server))
+
+(include "common_records.scm")
+(include "db_records.scm")
+
+;; Transition to pub --> sub with pull <-- push
+;;
+;; 1. client sends request to server via push to the pull port
+;; 2. server puts request in queue or processes immediately as appropriate
+;; 3. server puts responses from completed requests into pub port
+;;
+;; TODO
+;;
+;; Done Tested
+;; [x] [ ] 1. Add columns pullport pubport to servers table
+;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012
+;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports
+;; [x] [ ] 4. Add client compose of request
+;; [x] [ ] - name of client: testname/itempath-test_id-hostname
+;; [x] [ ] - name of request: callname, params
+;; [x] [ ] - request key: f(clientname, callname, params)
+;; [x] [ ] 5. Add processing of subscription hits
+;; [x] [ ] - done when get key
+;; [x] [ ] - return results
+;; [x] [ ] 6. Add timeout processing
+;; [x] [ ] - after 60 seconds
+;; [ ] [ ] i. check server alive, connect to new if necessary
+;; [ ] [ ] ii. resend request
+;; [ ] [ ] 7. Turn self ping back on
+
+;; Build "tcp://<host>:<port>" from hostport, a list (host port).
+;; Returns #f when hostport is #f.
+;;
+(define (zmq-transport:make-server-url hostport)
+  (if (not hostport)
+      #f
+      (conc "tcp://" (car hostport) ":" (cadr hostport))))
+
+(define *server-loop-heart-beat* (current-seconds))
+(define *heartbeat-mutex* (make-mutex))
+
+;;======================================================================
+;; S E R V E R
+;;======================================================================
+
+;; Accessors for a two slot socket vector: slot 0 is the pub socket and
+;; slot 1 is the pull socket. vector-set! takes (vector index value), so the
+;; setters pass the slot number before the socket and each setter writes the
+;; slot that its matching getter reads.
+(define-inline (zmqsock:get-pub dat)(vector-ref dat 0))
+(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
+(define-inline (zmqsock:set-pub! dat s)(vector-set! dat 0 s))
+(define-inline (zmqsock:set-pull! dat s)(vector-set! dat 1 s))
+
+;; Bind the pull and pub sockets (via zmq-transport:setup-ports) and then loop
+;; forever receiving packets on the pull socket and handing them to
+;; db:process-queue.
+;;
+;; hostn: the host name to serve on; "-" means use the ip address looked up for
+;;        (get-host-name).
+;;
+(define (zmq-transport:run hostn)
+  (debug:print 2 "Attempting to start the server ...")
+  (if (not *toppath*)
+      (if (not (setup-for-run))
+          (begin
+            (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
+            (exit))))
+  (let* ((zmq-sdat1 #f)
+         (zmq-sdat2 #f)
+         (pull-socket #f)
+         (pub-socket #f)
+         (p1 #f)
+         (p2 #f)
+         (zmq-sockets-dat #f)
+         (iface (if (string=? "-" hostn)
+                    "*" ;; (get-host-name)
+                    hostn))
+         (hostname (get-host-name))
+         (ipaddrstr (let ((ipstr (if (string=? "-" hostn)
+                                     (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
+                                     #f)))
+                      (if ipstr ipstr hostname)))
+         (last-run 0))
+    ;; start at the port given with -port, otherwise at a random port in 5000..6000
+    (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port")
+                                                                    (string->number (args:get-arg "-port"))
+                                                                    (+ 5000 (random 1001)))))
+
+    (set! zmq-sdat1 (car zmq-sockets-dat))
+    (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port)
+    (set! p1 (caddr zmq-sdat1))
+
+    (set! zmq-sdat2 (cadr zmq-sockets-dat))
+    (set! pub-socket (cadr zmq-sdat2))
+    (set! p2 (caddr zmq-sdat2))
+
+    (set! *cache-on* #t)
+
+    ;; what to do when we quit
+    ;;
+;;     (on-exit (lambda ()
+;;                (if (and *toppath* *server-info*)
+;;                    (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
+;;                    (let loop ()
+;;                      (let ((queue-len 0))
+;;                        (thread-sleep! (random 5))
+;;                        (mutex-lock! *incoming-mutex*)
+;;                        (set! queue-len (length *incoming-data*))
+;;                        (mutex-unlock! *incoming-mutex*)
+;;                        (if (> queue-len 0)
+;;                            (begin
+;;                              (debug:print-info 0 "Queue not flushed, waiting ...")
+;;                              (loop))))))))
+
+    ;; The heavy lifting
+    ;;
+    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
+    ;;
+    (let loop ((queue-lst '()))
+      (let* ((rawmsg (receive-message* pull-socket))
+             (packet (db:string->obj rawmsg))
+             (qtype (cdb:packet-get-qtype packet)))
+        (debug:print-info 12 "server=> received packet=" packet)
+        ;; sync and ping packets do not count as db activity
+        (if (not (member qtype '(sync ping)))
+            (begin
+              (mutex-lock! 
*heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue + (begin + (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) + (loop '())) + (loop (cons packet queue-lst))))))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (cadr server-info)) + (pullport (caddr server-info)) + (pubport (cadddr server-info)) ;; id interface pullport pubport) + (zmq-sockets (zmq-transport:client-connect iface pullport pubport)) + (last-access 0)) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. 
+                )
+             (current-seconds))
+          (begin
+            (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
+            (loop 0))
+          (begin
+            (debug:print-info 0 "Starting to shutdown the server.")
+            ;; need to delete only *my* server entry (future use)
+            (set! *time-to-exit* #t)
+            (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
+            (thread-sleep! 1)
+            (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+            (debug:print-info 0 "Server shutdown complete. Exiting")
+            (exit)))))))
+
+;; Bind a zmq socket to the first free port at or above the requested one.
+;;
+;; iface  : interface/address used to build the url "tcp://<iface>:<port>"
+;; s      : an existing socket to bind, or #f to create one with (make-socket stype)
+;; port   : first port to try; 5555 is used when this is not a number
+;; stype  : socket type handed to make-socket (e.g. 'pull or 'pub)
+;; trynum : number of further ports to try after a failed bind
+;;
+;; Returns (list iface s p) where p is the port that was actually bound.
+;; When trynum is exhausted an error is printed and the process exits.
+;;
+(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50))
+  (let ((s (if s s (make-socket stype)))
+        (p (if (number? port) port 5555)))
+    (handle-exceptions
+     exn
+     (begin
+       (debug:print 0 "Failed to bind to port " p ", trying next port")
+       (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
+       ;; (print-call-chain)
+       (if (> trynum 0)
+           ;; stype must be passed along: without it the trynum: keyword lands in
+           ;; the stype position and the retry budget is never decremented.
+           ;; The result of the retry is the result of this call.
+           (zmq-transport:find-free-port-and-open iface s (+ p 1) stype trynum: (- trynum 1))
+           (begin
+             (debug:print-info 0 "Tried ports up to " p
+                               " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")
+             ;; only give up once every port in the range has been tried
+             (exit))))
+     (let ((zmq-url (conc "tcp://" iface ":" p)))
+       (debug:print 2 "Trying to start server on " zmq-url)
+       (bind-socket s zmq-url)
+       ;; report p (the port really bound), not the originally requested port
+       (list iface s p)))))
+
+(define (zmq-transport:setup-ports ipaddrstr startport)
+  (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
+         (p1 (caddr s1))
+         (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
+         (p2 (caddr s2)))
+    (set! *runremote* #f)
+    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
+    (mutex-lock! *heartbeat-mutex*)
+    (set! 
*server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id)
+                                  ;; tasks:server-register takes (mdb pid interface port priority
+                                  ;; state transport) positionally and the pub port as the pubport:
+                                  ;; keyword. Passing p2 positionally put it in the priority slot
+                                  ;; and shifted state and transport.
+                                  ;; NOTE(review): assumes open-run-close forwards its extra
+                                  ;; arguments (keywords included) unchanged - confirm.
+                                  ipaddrstr p1 0 'live 'zmq pubport: p2))
+    (mutex-unlock! *heartbeat-mutex*)
+    (list s1 s2)))
+
+;; Return the md5 digest (as a string) of the written form of
+;; (list (current-directory) (argv)).
+;;
+(define (zmq-transport:mk-signature)
+  (message-digest-string (md5-primitive)
+                         (with-output-to-string
+                           (lambda ()
+                             (write (list (current-directory)
+                                          (argv)))))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+;;======================================================================
+;; C L I E N T S
+;;======================================================================
+
+;; Create a zmq socket of the given type (inside context when one is given),
+;; apply every entry of subscriptions with socket-option-set! 'subscribe and
+;; connect it to tcp://<iface>:<port>.
+;;
+;; Returns the connected socket, or #f when make-socket did not yield a socket.
+;;
+(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
+  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
+  (let ((connect-ok #f)
+        (zmq-socket (if context
+                        (make-socket type context)
+                        (make-socket type)))
+        (conurl (zmq-transport:make-server-url (list iface port))))
+    (if (socket? zmq-socket)
+        (begin
+          ;; first apply subscriptions
+          (for-each (lambda (subscription)
+                      (debug:print 2 "Subscribing to " subscription)
+                      (socket-option-set! zmq-socket 'subscribe subscription))
+                    subscriptions)
+          (connect-socket zmq-socket conurl)
+          zmq-socket)
+        (begin
+          (debug:print 0 "ERROR: Failed to open socket to " conurl)
+          #f))))
+
+(define (zmq-transport:client-connect iface pullport pubport)
+  (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
+         (sub-socket (zmq-transport:client-socket-connect iface pubport
+                                                          type: 'sub
+                                                          subscriptions: (list (zmq-transport:get-client-signature) "all")))
+         (zmq-sockets (vector push-socket sub-socket))
+         (login-res #f))
+    (set! login-res (zmq-transport:client-login zmq-sockets))
+    (if (and (not (null? 
login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") + (set! *runremote* zmq-sockets) + zmq-sockets) + (begin + (debug:print-info 2 "Failed to login or connect to " conurl) + (set! *runremote* #f) + #f)))) + +;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (zmq-transport:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. 
+ ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (zmq-transport:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting zmq server") + (if *toppath* + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 2 "Server alive, starting self-ping") + ;; (zmq-transport:self-ping server-info) + ;; )) + ;; "Self ping")) + (th2 (make-thread (lambda () + (zmq-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running")) + ) + (set! *client-non-blocking-mode* #t) + ;; (thread-start! th1) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + ;; (thread-join! th3) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + +(define (zmq-transport:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... 
exiting ...") + (let ((th1 (make-thread (lambda () + (if (not *received-response*) + (receive-message* *runremote*))) ;; flush out last call if applicable + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (zmq-transport:client-launch) + (set-signal-handler! signal/int zmq-transport:client-signal-handler) + (if (zmq-transport:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + +;;====================================================================== +;; Defunct functions +;;====================================================================== + +;; ping a server and return number of clients or #f (if no response) +;; NOT IN USE! +(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f)) + (cdb:use-non-blocking-mode + (lambda () + (let* ((res #f) + (th1 (make-thread + (lambda () + (let* ((zmq-context (make-context 1)) + (zmq-socket (zmq-transport:client-connect host port context: zmq-context))) + (if zmq-socket + (if (zmq-transport:client-login zmq-socket) + (let ((numclients (cdb:num-clients zmq-socket))) + (if (not return-socket) + (begin + (zmq-transport:client-logout zmq-socket) + (close-socket zmq-socket))) + (set! res (list #t numclients (if return-socket zmq-socket #f)))) + (begin + ;; (close-socket zmq-socket) + (set! res (list #f "CAN'T LOGIN" #f)))) + (set! res (list #f "CAN'T CONNECT" #f))))) + "Ping: th1")) + (th2 (make-thread + (lambda () + (let loop ((count 1)) + (debug:print-info 1 "Ping " count " server on " host " at port " port) + (thread-sleep! 
2) + (if (< count (/ secs 2)) + (loop (+ count 1)))) + ;; (thread-terminate! th1) + (set! res (list #f "TIMED OUT" #f))) + "Ping: th2"))) + (thread-start! th2) + (thread-start! th1) + (handle-exceptions + exn + (set! res (list #f "TIMED OUT" #f)) + (thread-join! th1 secs)) + res)))) + +;; (define (zmq-transport:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (zmq-transport:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) + +(define (zmq-transport:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result) + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) +