Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -3,11 +3,12 @@ CSCOPTS= INSTALL=install SRCFILES = common.scm items.scm launch.scm \ ods.scm runconfig.scm server.scm configf.scm \ db.scm keys.scm margs.scm megatest-version.scm \ - process.scm runs.scm tasks.scm tests.scm genexample.scm + process.scm runs.scm tasks.scm tests.scm genexample.scm \ + fs-transport.scm GUISRCF = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm OFILES = $(SRCFILES:%.scm=%.o) GOFILES = $(GUISRCF:%.scm=%.o) Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -40,10 +40,12 @@ (define *globalexitstatus* 0) ;; attempt to work around possible thread issues (define *passnum* 0) ;; when running track calls to run-tests or similar ;; SERVER (define *my-client-signature* #f) +(define *transport-type* 'fs) +(define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* #f) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) (define *logged-in-clients* (make-hash-table)) Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -22,10 +22,11 @@ (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) +(declare (uses fs-transport)) (include "common_records.scm") (include "db_records.scm") (include "key_records.scm") (include "run_records.scm") @@ -1126,58 +1127,35 @@ res)) ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime +;; +;; cdb:client-call is the unified interface to all the transports. It dispatches the +;; query to a server routine (e.g. server:client-send-recieve) that +;; transports the data to the server where it is passed to db:process-queue-item +;; which either returns the data to the calling server routine or +;; directly calls the returning procedure (e.g. zmq). ;; (define (cdb:client-call serverdat qtype immediate numretries . params) (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) - ;; (handle-exceptions - ;; exn - ;; (begin - ;; (thread-sleep! 5) - ;; (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) - (let* ((client-sig (server:get-client-signature)) - (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) - (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) - ) - (debug:print-info 11 "zdat=" zdat) - (let* ( - (res #f) - (rawdat (server:client-send-receive serverdat zdat)) - (tmp #f)) - (debug:print-info 11 "Sent " zdat ", received " rawdat) - (set! tmp (db:string->obj rawdat)) - ;; (if (equal? query-sig (vector-ref myres 1)) - ;; (set! res - (vector-ref tmp 2) - ;; (loop (server:client-send-receive serverdat zdat))))))) - ;; (timeout (lambda () - ;; (let loop ((n numretries)) - ;; (thread-sleep! 15) - ;; (if (not res) - ;; (if (> numretries 0) - ;; (begin - ;; (debug:print 2 "WARNING: no reply to query " params ", trying resend") - ;; (debug:print-info 11 "re-sending message") - ;; (apply cdb:client-call serverdat qtype immediate numretries params) - ;; (debug:print-info 11 "message re-sent") - ;; (loop (- n 1))) - ;; ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params)) - ;; (begin - ;; (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") - ;; (exit 5)))))))) - ;; (send-receive) - ))) - ;; (debug:print-info 11 "Starting threads") - ;; (let ((th1 (make-thread send-receive "send receive")) - ;; (th2 (make-thread timeout "timeout"))) - ;; (thread-start! th1) - ;; (thread-start! th2) - ;; (thread-join! th1) - ;; (debug:print-info 11 "cdb:client-call returning res=" res) - ;; res)))) + (case *transport-type* + ((fs) + (let ((packet (vector "na" qtype immediate "na" params 0))) + (fs:process-queue-item packet))) + ((http) + (let* ((client-sig (server:get-client-signature)) + (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) + (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))) ;; (with-output-to-string (lambda ()(serialize params)))) + (debug:print-info 11 "zdat=" zdat) + (let* ((res #f) + (rawdat (server:client-send-receive serverdat zdat)) + (tmp #f)) + (debug:print-info 11 "Sent " zdat ", received " rawdat) + (set! tmp (db:string->obj rawdat)) + (vector-ref tmp 2) + ))))) (define (cdb:set-verbosity serverdat val) (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val)) (define (cdb:login serverdat keyval signature) ADDED fs-transport.scm Index: fs-transport.scm ================================================================== --- /dev/null +++ fs-transport.scm @@ -0,0 +1,44 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit fs-transport)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + + +;;====================================================================== +;; F S T R A N S P O R T S E R V E R +;;====================================================================== + +;; There is no "server" per se but a convience routine to make it non +;; necessary to be reopening the db over and over again. +;; + +(define (fs:process-queue-item packet) + (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called + (set! *megatest-db* (open-db))) + (debug:print-info 11 "fs:process-queue-item called with packet=" packet) + (db:process-queue-item *megatest-db* packet)) + ADDED http-transport.scm Index: http-transport.scm ================================================================== --- /dev/null +++ http-transport.scm @@ -0,0 +1,412 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit server)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +(define (server:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +(define *db:process-queue-mutex* (make-mutex)) + +(define (server:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) + (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001)))) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (set! *cache-on* #t) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + (if (not db)(set! db (open-db))) + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ;; This is the /ctrl path where data is handed to the server and + ;; responses + ((equal? (uri-path (request-uri (current-request))) + '(/ "ctrl")) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex + ;; (set! res (open-run-close db:process-queue-item open-db packet)) + (set! res (db:process-queue-item db packet)) + ;; (mutex-unlock! *db:process-queue-mutex*) + (debug:print-info 11 "Return value from db:process-queue-item is " res) + (send-response body: (conc "ctrl data\n" + res + "") + headers: '((content-type text/plain))))) + (else (continue)))))))) + (server:try-start-server ipaddrstr start-port) + ;; lite3:finalize! db))) + )) + + + +;; (define (server:main-loop) +;; (print "INFO: Exectuing main server loop") +;; (access-log "megatest-http.log") +;; (server-bind-address #f) +;; (define-page (main-page-path) +;; (lambda () +;; (let ((dat ($ "dat"))) +;; ;; (with-request-variables (dat) +;; (debug:print-info 12 "Got dat=" dat) +;; (let* ((packet (db:string->obj dat)) +;; (qtype (cdb:packet-get-qtype packet))) +;; (debug:print-info 12 "server=> received packet=" packet) +;; (if (not (member qtype '(sync ping))) +;; (begin +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *last-db-access* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*))) +;; (let ((res (open-run-close db:process-queue-item open-db packet))) +;; (debug:print-info 11 "Return value from db:process-queue-item is " res) +;; res)))))) + +;;; (use spiffy uri-common intarweb) +;;; +;;; (root-path "/var/www") +;;; +;;; (vhost-map `(((* any) . ,(lambda (continue) +;;; (if (equal? (uri-path (request-uri (current-request))) +;;; '(/ "hey")) +;;; (send-response body: "hey there!\n" +;;; headers: '((content-type text/plain))) +;;; (continue)))))) +;;; +;;; (start-server port: 12345) + +;; This is recursively run by server:run until sucessful +;; +(define (server:try-start-server ipaddrstr portnum) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) + (server:try-start-server ipaddrstr (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (set! *runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + (start-server port: portnum) + (print "INFO: server has been stopped"))) + +(define (server:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) + +;; +;; +;; 1 Hello, world! Goodbye Dolly +;; Send msg to serverdat and receive result +(define (server:client-send-receive serverdat msg) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) + (numretries 0)) + (handle-exceptions + exn + (if (< numretries 200) + (server:client-send-receive serverdat msg)) + (begin + (debug:print-info 11 "fullurl=" fullurl "\n") + ;; set up the http-client here + (max-retry-attempts 100) + (retry-request? (lambda (request) + (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) + (set! numretries (+ numretries 1)) + #t)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((res (with-input-from-request fullurl + ;; #f + ;; msg + (list (cons 'dat msg)) + read-string))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))))) + +(define (server:client-login serverdat) + (max-retry-attempts 100) + (cdb:login serverdat *toppath* (server:get-client-signature))) + +;; Not currently used! But, I think it *should* be used!!! +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) + ok)) + +(define (server:client-connect iface port) + (let* ((login-res #f) + (serverdat (list iface port))) + (set! login-res (server:client-login serverdat)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" port) + (set! *runremote* serverdat) + serverdat) + (begin + (debug:print-info 2 "Failed to login or connect to " iface ":" port) + (set! *runremote* #f) + #f)))) + +;; Do all the connection work, start a server if not already running +(define (server:client-setup #!key (numtries 50)) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: failed to find megatest.config, exiting") + (exit)))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if hostinfo + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) + (debug:print-info 2 "Setting up to connect to " hostinfo) + (server:client-connect iface port)) ;; ) + (if (> numtries 0) + (let ((exe (car (argv))) + (pid #f)) + (debug:print-info 0 "No server available, attempting to start one...") + (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) + (string-intersperse *verbosity* ",") + (conc *verbosity*))))) + ;; (set! pid (process-fork (lambda () + ;; (current-input-port (open-input-file "/dev/null")) + ;; (current-output-port (open-output-file "/dev/null")) + ;; (current-error-port (open-output-file "/dev/null")) + ;; (server:launch)))) + (let loop ((count 0)) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if (not hostinfo) + (begin + (debug:print-info 0 "Waiting for server pid=" pid " to start") + (sleep 2) ;; give server time to start + (if (< count 5) + (loop (+ count 1))))))) + ;; we are starting a server, do not try again! That can lead to + ;; recursively starting many processes!!! + (server:client-setup numtries: 0)) + (debug:print-info 1 "Too many attempts, giving up"))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (server:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting the standalone server") + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (debug:print 11 "server:launch hostinfo=" hostinfo) + (if hostinfo + (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) + (if *toppath* + (let* ((th2 (make-thread (lambda () + (server:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + (exit))) + +(define (server:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + "") ;; do nothing for now (was flush out last call if applicable) + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 1) ;; give the flush one second to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (server:client-launch) + (set-signal-handler! signal/int server:client-signal-handler) + (if (server:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + ADDED rpc-transport.scm Index: rpc-transport.scm ================================================================== --- /dev/null +++ rpc-transport.scm @@ -0,0 +1,412 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit server)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +(define (server:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +(define *db:process-queue-mutex* (make-mutex)) + +(define (server:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) + (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001)))) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (set! *cache-on* #t) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + (if (not db)(set! db (open-db))) + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ;; This is the /ctrl path where data is handed to the server and + ;; responses + ((equal? (uri-path (request-uri (current-request))) + '(/ "ctrl")) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex + ;; (set! res (open-run-close db:process-queue-item open-db packet)) + (set! res (db:process-queue-item db packet)) + ;; (mutex-unlock! *db:process-queue-mutex*) + (debug:print-info 11 "Return value from db:process-queue-item is " res) + (send-response body: (conc "ctrl data\n" + res + "") + headers: '((content-type text/plain))))) + (else (continue)))))))) + (server:try-start-server ipaddrstr start-port) + ;; lite3:finalize! db))) + )) + + + +;; (define (server:main-loop) +;; (print "INFO: Exectuing main server loop") +;; (access-log "megatest-http.log") +;; (server-bind-address #f) +;; (define-page (main-page-path) +;; (lambda () +;; (let ((dat ($ "dat"))) +;; ;; (with-request-variables (dat) +;; (debug:print-info 12 "Got dat=" dat) +;; (let* ((packet (db:string->obj dat)) +;; (qtype (cdb:packet-get-qtype packet))) +;; (debug:print-info 12 "server=> received packet=" packet) +;; (if (not (member qtype '(sync ping))) +;; (begin +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *last-db-access* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*))) +;; (let ((res (open-run-close db:process-queue-item open-db packet))) +;; (debug:print-info 11 "Return value from db:process-queue-item is " res) +;; res)))))) + +;;; (use spiffy uri-common intarweb) +;;; +;;; (root-path "/var/www") +;;; +;;; (vhost-map `(((* any) . ,(lambda (continue) +;;; (if (equal? (uri-path (request-uri (current-request))) +;;; '(/ "hey")) +;;; (send-response body: "hey there!\n" +;;; headers: '((content-type text/plain))) +;;; (continue)))))) +;;; +;;; (start-server port: 12345) + +;; This is recursively run by server:run until sucessful +;; +(define (server:try-start-server ipaddrstr portnum) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) + (server:try-start-server ipaddrstr (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (set! *runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + (start-server port: portnum) + (print "INFO: server has been stopped"))) + +(define (server:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) + +;; +;; +;; 1 Hello, world! Goodbye Dolly +;; Send msg to serverdat and receive result +(define (server:client-send-receive serverdat msg) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) + (numretries 0)) + (handle-exceptions + exn + (if (< numretries 200) + (server:client-send-receive serverdat msg)) + (begin + (debug:print-info 11 "fullurl=" fullurl "\n") + ;; set up the http-client here + (max-retry-attempts 100) + (retry-request? (lambda (request) + (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) + (set! numretries (+ numretries 1)) + #t)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((res (with-input-from-request fullurl + ;; #f + ;; msg + (list (cons 'dat msg)) + read-string))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))))) + +(define (server:client-login serverdat) + (max-retry-attempts 100) + (cdb:login serverdat *toppath* (server:get-client-signature))) + +;; Not currently used! But, I think it *should* be used!!! +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) + ok)) + +(define (server:client-connect iface port) + (let* ((login-res #f) + (serverdat (list iface port))) + (set! login-res (server:client-login serverdat)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" port) + (set! *runremote* serverdat) + serverdat) + (begin + (debug:print-info 2 "Failed to login or connect to " iface ":" port) + (set! *runremote* #f) + #f)))) + +;; Do all the connection work, start a server if not already running +(define (server:client-setup #!key (numtries 50)) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: failed to find megatest.config, exiting") + (exit)))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if hostinfo + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) + (debug:print-info 2 "Setting up to connect to " hostinfo) + (server:client-connect iface port)) ;; ) + (if (> numtries 0) + (let ((exe (car (argv))) + (pid #f)) + (debug:print-info 0 "No server available, attempting to start one...") + (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) + (string-intersperse *verbosity* ",") + (conc *verbosity*))))) + ;; (set! pid (process-fork (lambda () + ;; (current-input-port (open-input-file "/dev/null")) + ;; (current-output-port (open-output-file "/dev/null")) + ;; (current-error-port (open-output-file "/dev/null")) + ;; (server:launch)))) + (let loop ((count 0)) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if (not hostinfo) + (begin + (debug:print-info 0 "Waiting for server pid=" pid " to start") + (sleep 2) ;; give server time to start + (if (< count 5) + (loop (+ count 1))))))) + ;; we are starting a server, do not try again! That can lead to + ;; recursively starting many processes!!! + (server:client-setup numtries: 0)) + (debug:print-info 1 "Too many attempts, giving up"))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (server:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting the standalone server") + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (debug:print 11 "server:launch hostinfo=" hostinfo) + (if hostinfo + (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) + (if *toppath* + (let* ((th2 (make-thread (lambda () + (server:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + (exit))) + +(define (server:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + "") ;; do nothing for now (was flush out last call if applicable) + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 1) ;; give the flush one second to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (server:client-launch) + (set-signal-handler! signal/int server:client-signal-handler) + (if (server:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) + Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -185,11 +185,13 @@ ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock - (db:obj->string (vector success/fail query-sig result))) + (case *transport-type* + ((fs) result) + ((http)(db:obj->string (vector success/fail query-sig result))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== @@ -258,49 +260,58 @@ (begin (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) -;; Do all the connection work, start a server if not already running +;; Do all the connection work, look up the transport type and set up the +;; connection if required. +;; (define (server:client-setup #!key (numtries 50)) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (if (> numtries 0) - (let ((exe (car (argv))) - (pid #f)) - (debug:print-info 0 "No server available, attempting to start one...") - (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - (string-intersperse *verbosity* ",") - (conc *verbosity*))))) - ;; (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - ;; (server:launch)))) - (let loop ((count 0)) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if (not hostinfo) - (begin - (debug:print-info 0 "Waiting for server pid=" pid " to start") - (sleep 2) ;; give server time to start - (if (< count 5) - (loop (+ count 1))))))) - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) + (case *transport-type* + ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) + ((http) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if hostinfo + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) + (debug:print-info 2 "Setting up to connect to " hostinfo) + (server:client-connect iface port)) ;; ) + (begin + (debug:print 0 "ERROR: No server found, exiting") + (exit))))))) + +;; (if (> numtries 0) +;; (let ((exe (car (argv))) +;; (pid #f)) +;; (debug:print-info 0 "No server available, attempting to start one...") +;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) +;; (string-intersperse *verbosity* ",") +;; (conc *verbosity*))))) +;; ;; (set! pid (process-fork (lambda () +;; ;; (current-input-port (open-input-file "/dev/null")) +;; ;; (current-output-port (open-output-file "/dev/null")) +;; ;; (current-error-port (open-output-file "/dev/null")) +;; ;; (server:launch)))) +;; (let loop ((count 0)) +;; (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) +;; (if (not hostinfo) +;; (begin +;; (debug:print-info 0 "Waiting for server pid=" pid " to start") +;; (sleep 2) ;; give server time to start +;; (if (< count 5) +;; (loop (+ count 1))))))) +;; ;; we are starting a server, do not try again! That can lead to +;; ;; recursively starting many processes!!! +;; (server:client-setup numtries: 0)) +;; (debug:print-info 1 "Too many attempts, giving up"))))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) ADDED zmq-transport.scm Index: zmq-transport.scm ================================================================== --- /dev/null +++ zmq-transport.scm @@ -0,0 +1,412 @@ + +;; Copyright 2006-2012, Matthew Welland. +;; +;; This program is made available under the GNU GPL version 2.0 or +;; greater. See the accompanying file COPYING for details. +;; +;; This program is distributed WITHOUT ANY WARRANTY; without even the +;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +;; PURPOSE. + +(require-extension (srfi 18) extras tcp s11n) + +(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) +(import (prefix sqlite3 sqlite3:)) + +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) + +(declare (unit server)) + +(declare (uses common)) +(declare (uses db)) +(declare (uses tests)) +(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. + +(include "common_records.scm") +(include "db_records.scm") + +(define (server:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +(define *server-loop-heart-beat* (current-seconds)) +(define *heartbeat-mutex* (make-mutex)) + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +(define *db:process-queue-mutex* (make-mutex)) + +(define (server:run hostn) + (debug:print 2 "Attempting to start the server ...") + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") + (exit)))) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) + (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001)))) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (set! *cache-on* #t) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + (if (not db)(set! db (open-db))) + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ;; This is the /ctrl path where data is handed to the server and + ;; responses + ((equal? (uri-path (request-uri (current-request))) + '(/ "ctrl")) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex + ;; (set! res (open-run-close db:process-queue-item open-db packet)) + (set! res (db:process-queue-item db packet)) + ;; (mutex-unlock! *db:process-queue-mutex*) + (debug:print-info 11 "Return value from db:process-queue-item is " res) + (send-response body: (conc "ctrl data\n" + res + "") + headers: '((content-type text/plain))))) + (else (continue)))))))) + (server:try-start-server ipaddrstr start-port) + ;; lite3:finalize! db))) + )) + + + +;; (define (server:main-loop) +;; (print "INFO: Exectuing main server loop") +;; (access-log "megatest-http.log") +;; (server-bind-address #f) +;; (define-page (main-page-path) +;; (lambda () +;; (let ((dat ($ "dat"))) +;; ;; (with-request-variables (dat) +;; (debug:print-info 12 "Got dat=" dat) +;; (let* ((packet (db:string->obj dat)) +;; (qtype (cdb:packet-get-qtype packet))) +;; (debug:print-info 12 "server=> received packet=" packet) +;; (if (not (member qtype '(sync ping))) +;; (begin +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *last-db-access* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*))) +;; (let ((res (open-run-close db:process-queue-item open-db packet))) +;; (debug:print-info 11 "Return value from db:process-queue-item is " res) +;; res)))))) + +;;; (use spiffy uri-common intarweb) +;;; +;;; (root-path "/var/www") +;;; +;;; (vhost-map `(((* any) . ,(lambda (continue) +;;; (if (equal? (uri-path (request-uri (current-request))) +;;; '(/ "hey")) +;;; (send-response body: "hey there!\n" +;;; headers: '((content-type text/plain))) +;;; (continue)))))) +;;; +;;; (start-server port: 12345) + +;; This is recursively run by server:run until sucessful +;; +(define (server:try-start-server ipaddrstr portnum) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) + (server:try-start-server ipaddrstr (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (set! *runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + (start-server port: portnum) + (print "INFO: server has been stopped"))) + +(define (server:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (argv))))))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) + +;; +;; +;; 1 Hello, world! Goodbye Dolly +;; Send msg to serverdat and receive result +(define (server:client-send-receive serverdat msg) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) + (numretries 0)) + (handle-exceptions + exn + (if (< numretries 200) + (server:client-send-receive serverdat msg)) + (begin + (debug:print-info 11 "fullurl=" fullurl "\n") + ;; set up the http-client here + (max-retry-attempts 100) + (retry-request? (lambda (request) + (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) + (set! numretries (+ numretries 1)) + #t)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((res (with-input-from-request fullurl + ;; #f + ;; msg + (list (cons 'dat msg)) + read-string))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))))) + +(define (server:client-login serverdat) + (max-retry-attempts 100) + (cdb:login serverdat *toppath* (server:get-client-signature))) + +;; Not currently used! But, I think it *should* be used!!! +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) + ok)) + +(define (server:client-connect iface port) + (let* ((login-res #f) + (serverdat (list iface port))) + (set! login-res (server:client-login serverdat)) + (if (and (not (null? login-res)) + (car login-res)) + (begin + (debug:print-info 2 "Logged in and connected to " iface ":" port) + (set! *runremote* serverdat) + serverdat) + (begin + (debug:print-info 2 "Failed to login or connect to " iface ":" port) + (set! *runremote* #f) + #f)))) + +;; Do all the connection work, start a server if not already running +(define (server:client-setup #!key (numtries 50)) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: failed to find megatest.config, exiting") + (exit)))) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if hostinfo + (let ((host (list-ref hostinfo 0)) + (iface (list-ref hostinfo 1)) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) + (debug:print-info 2 "Setting up to connect to " hostinfo) + (server:client-connect iface port)) ;; ) + (if (> numtries 0) + (let ((exe (car (argv))) + (pid #f)) + (debug:print-info 0 "No server available, attempting to start one...") + (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) + (string-intersperse *verbosity* ",") + (conc *verbosity*))))) + ;; (set! pid (process-fork (lambda () + ;; (current-input-port (open-input-file "/dev/null")) + ;; (current-output-port (open-output-file "/dev/null")) + ;; (current-error-port (open-output-file "/dev/null")) + ;; (server:launch)))) + (let loop ((count 0)) + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (if (not hostinfo) + (begin + (debug:print-info 0 "Waiting for server pid=" pid " to start") + (sleep 2) ;; give server time to start + (if (< count 5) + (loop (+ count 1))))))) + ;; we are starting a server, do not try again! That can lead to + ;; recursively starting many processes!!! + (server:client-setup numtries: 0)) + (debug:print-info 1 "Too many attempts, giving up"))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + +;; all routes though here end in exit ... +(define (server:launch) + (if (not *toppath*) + (if (not (setup-for-run)) + (begin + (debug:print 0 "ERROR: cannot find megatest.config, exiting") + (exit)))) + (debug:print-info 2 "Starting the standalone server") + (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (debug:print 11 "server:launch hostinfo=" hostinfo) + (if hostinfo + (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) + (if *toppath* + (let* ((th2 (make-thread (lambda () + (server:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-"))) "Server run")) + (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ) + (thread-start! th2) + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + ) + (debug:print 0 "ERROR: Failed to setup for megatest"))) + (exit))) + +(define (server:client-signal-handler signum) + (handle-exceptions + exn + (debug:print " ... exiting ...") + (let ((th1 (make-thread (lambda () + "") ;; do nothing for now (was flush out last call if applicable) + "eat response")) + (th2 (make-thread (lambda () + (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") + (thread-sleep! 1) ;; give the flush one second to do it's stuff + (debug:print 0 " Done.") + (exit 4)) + "exit on ^C timer"))) + (thread-start! th2) + (thread-start! th1) + (thread-join! th2)))) + +(define (server:client-launch) + (set-signal-handler! signal/int server:client-signal-handler) + (if (server:client-setup) + (debug:print-info 2 "connected as client") + (begin + (debug:print 0 "ERROR: Failed to connect as client") + (exit)))) +