;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

;; Build the base URL of a server from HOSTPORT, a list whose first element
;; is the host/interface and whose second element is the port.
;; Returns #f when HOSTPORT is #f.
;;
(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "http://" (car hostport) ":" (cadr hostport))))

(define *server-loop-heart-beat* (current-seconds))

;; Guards reads/writes of *last-db-access* (and the read of *runremote* in
;; server:keep-running).
(define *heartbeat-mutex* (make-mutex))

;; NOTE: a stray top-level `(debug:print 0 "Server started on " host:port)`
;; used to sit here. `host:port` is not bound anywhere in this file, so the
;; form could only fail with an unbound-variable error when the unit was
;; loaded. server:try-start-server already prints the interface and port.

;;======================================================================
;; S E R V E R
;;======================================================================

;; Not referenced by the active code below; only the commented-out locking
;; around db:process-queue-item in server:run mentions it.
(define *db:process-queue-mutex* (make-mutex))

;; Call this to start the actual server.
;;
;; HOSTN is the value of the -server argument: "-" means "work out the IP
;; address of this host", anything else is used verbatim as the interface
;; string that gets registered.
;;
;; Installs a spiffy vhost handler with two paths:
;;   /hey   replies with a fixed greeting
;;   /ctrl  decodes the `dat` request variable into a packet, hands it to
;;          db:process-queue-item and sends the result back as text/plain
;; and then calls server:try-start-server, which only returns once the web
;; server has stopped or no port could be used.
;;
(define (server:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
          (begin
            (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
            (exit))))
  (let* (;; (iface       (if (string=? "-" hostn)
         ;;                  #f ;; (get-host-name)
         ;;                  hostn))
         ;; The db is opened lazily by the first request so the server does
         ;; not open and close it unnecessarily.
         (db             #f)
         (hostname       (get-host-name))
         (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
                                          (string-intersperse
                                           (map number->string
                                                (u8vector->list (hostname->ip hostname)))
                                           ".")
                                          #f)))
                           (if ipstr ipstr hostn))) ;; hostname)))
         ;; An explicit -port wins; otherwise start from a random port in
         ;; 5000..6000 and let server:try-start-server walk upwards.
         (start-port     (if (args:get-arg "-port")
                             (string->number (args:get-arg "-port"))
                             (+ 5000 (random 1001))))
         (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path (if link-tree-path
                   link-tree-path
                   (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!

    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map
     `(((* any)
        . ,(lambda (continue)
             ;; open the db on the first call
             (if (not db) (set! db (open-db)))
             (let* (($   (request-vars source: 'both))
                    (dat ($ 'dat))
                    (res #f))
               (cond
                ((equal? (uri-path (request-uri (current-request)))
                         '(/ "hey"))
                 (send-response body:    "hey there!\n"
                                headers: '((content-type text/plain))))
                ;; This is the /ctrl path where data is handed to the server
                ;; and responses are returned.
                ((equal? (uri-path (request-uri (current-request)))
                         '(/ "ctrl"))
                 (let* ((packet (db:string->obj dat))
                        (qtype  (cdb:packet-get-qtype packet)))
                   (debug:print-info 12 "server=> received packet=" packet)
                   ;; sync and ping packets do not count as db activity for
                   ;; the idle-shutdown timer in server:keep-running.
                   (if (not (member qtype '(sync ping)))
                       (begin
                         (mutex-lock! *heartbeat-mutex*)
                         (set! *last-db-access* (current-seconds))
                         (mutex-unlock! *heartbeat-mutex*)))
                   ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
                   ;; (set! res (open-run-close db:process-queue-item open-db packet))
                   (set! res (db:process-queue-item db packet))
                   ;; (mutex-unlock! *db:process-queue-mutex*)
                   (debug:print-info 11 "Return value from db:process-queue-item is " res)
                   ;; NOTE(review): server:client-send-receive extracts the
                   ;; payload from this body with a regular expression; the
                   ;; two formats must be kept in step.
                   (send-response body:    (conc "ctrl data\n"
                                                 res
                                                 "")
                                  headers: '((content-type text/plain)))))
                (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (server:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	  (let* ((packet (db:string->obj dat))
;; 		 (qtype  (cdb:packet-get-qtype packet)))
;; 	    (debug:print-info 12 "server=> received packet=" packet)
;; 	    (if (not (member qtype '(sync ping)))
;; 		(begin
;; 		  (mutex-lock! *heartbeat-mutex*)
;; 		  (set! *last-db-access* (current-seconds))
;; 		  (mutex-unlock!
;;		                 *heartbeat-mutex*)))
;; 	    (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	      (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	      res))))))

;;; (use spiffy uri-common intarweb)
;;;
;;; (root-path "/var/www")
;;;
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request)))
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;;
;;; (start-server port: 12345)

;; Called by server:run; re-invokes itself with the next port number until
;; the web server starts or the port number reaches 9000.
;;
;; For each attempt the old server records are removed, this process is
;; registered as 'live on IPADDRSTR:PORTNUM, *runremote* is pointed at that
;; address and spiffy's start-server is called. Any exception raised along
;; the way is printed and treated as "this port did not work".
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   err
   (begin
     (print-error-message err)
     (cond
      ((< portnum 9000)
       (print "WARNING: failed to start on portnum: " portnum ", trying next port")
       (thread-sleep! 0.1)
       (open-run-close tasks:remove-server-records tasks:open-db)
       (server:try-start-server ipaddrstr (+ portnum 1)))
      (else
       (print "ERROR: Tried and tried but could not start the server"))))
   ;; --- guarded body ---
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register
                   tasks:open-db
                   (current-process-id)
                   ipaddrstr
                   portnum
                   0
                   'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

;; Produce an md5 digest string of the written form of
;; (list <current directory> <argv>).
;;
(define (server:mk-signature)
  (let ((ident (with-output-to-string
                 (lambda ()
                   (write (list (current-directory)
                                (argv)))))))
    (message-digest-string (md5-primitive) ident)))

;;======================================================================
;; S E R V E R   U T I L I T I E S
;;======================================================================

;; When using zmq this would send the message back (two step process);
;; with spiffy or rpc this simply returns the data to be returned.
;;
;; The result is the db:obj->string encoding of the vector
;; #(success/fail query-sig result). RETURN-ADDR is only logged.
;;
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock
  (let ((response (vector success/fail query-sig result)))
    (db:obj->string response)))

;;======================================================================
;; C L I E N T S
;;======================================================================

;; Return this process's client signature, computing it with
;; server:mk-signature and caching it in *my-client-signature* on first use.
;;
(define (server:get-client-signature)
  (if (not *my-client-signature*)
      (set! *my-client-signature* (server:mk-signature)))
  *my-client-signature*)

;;
;;
;; 1 Hello, world!
;; Goodbye Dolly

;; Send MSG to the server described by SERVERDAT (a list of host and port)
;; and return the payload string of the reply.
;;
;; MSG is sent as the `dat` form variable to <url>/ctrl. The payload is
;; pulled out of the reply body by, in order:
;;   1. the text between <body> and </body>, if the reply is wrapped so
;;   2. the original "(.*)<.body>" pattern
;;   3. everything after the "ctrl data\n" header that server:run emits
;; A reply matching none of these is treated as a failure.
;;
;; NUMRETRIES is an internal counter (callers normally omit it). It is
;; bumped by every http-level retry and every failed attempt, and is passed
;; on when the call re-invokes itself so the limit of 200 is really
;; enforced. Previously the counter restarted at 0 on every re-invocation,
;; so a persistent failure retried forever.
;;
;; Returns the payload string, or #f once the retry limit is reached.
;;
(define (server:client-send-receive serverdat msg #!key (numretries 0))
  (let* ((url     (server:make-server-url serverdat))
         (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
         (hdr     "ctrl data\n")      ;; header line written by server:run
         (hdrlen  (string-length hdr)))
    (handle-exceptions
     exn
     (if (< numretries 200)
         (server:client-send-receive serverdat msg numretries: (+ numretries 1))
         (begin
           (debug:print 0 "ERROR: no usable response from " fullurl " after " numretries " retries, giving up")
           #f))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       ;; Back off a little more on each retry, capped at 10 seconds.
       (retry-request? (lambda (request)
                         (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
                         (set! numretries (+ numretries 1))
                         #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and
       ;; process and return it.
       (let* ((res (with-input-from-request fullurl
                                            ;; #f
                                            ;; msg
                                            (list (cons 'dat msg))
                                            read-string)))
         (debug:print-info 11 "got res=" res)
         (let ((match (or (string-search (regexp "<body>(.*)<.body>") res)
                          (string-search (regexp "(.*)<.body>") res))))
           (debug:print-info 11 "match=" match)
           (let ((final (cond
                         (match (cadr match))
                         ((and (string? res)
                               (>= (string-length res) hdrlen)
                               (string=? hdr (substring res 0 hdrlen)))
                          (substring res hdrlen (string-length res)))
                         (else
                          ;; handled by the retry logic above
                          (error "server:client-send-receive: unrecognised response" res)))))
             (debug:print-info 11 "final=" final)
             final)))))))

;; Log in to the server described by SERVERDAT using *toppath* and this
;; process's client signature. Returns whatever cdb:login returns.
;;
(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
;;
;; NOTE(review): everywhere else in this file SERVERDAT is a list of
;; interface and port, and no socket library is loaded here, so the
;; `socket?` test looks like a leftover from an earlier transport. As
;; written cdb:logout is only reached when `socket?` accepts SERVERDAT;
;; confirm the intended check before putting this into use.
;;
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
                 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

;; Try to log in to the server at IFACE:PORT.
;; On success sets *runremote* to (list iface port) and returns that list;
;; on failure sets *runremote* to #f and returns #f.
;;
(define (server:client-connect iface port)
  (let* ((login-res #f)
         (serverdat (list iface port)))
    (set! login-res (server:client-login serverdat))
    ;; pair? rather than (not (null? ...)) so that a non-list result such
    ;; as #f counts as a failed login instead of crashing in car.
    (if (and (pair? login-res)
             (car login-res))
        (begin
          (debug:print-info 2 "Logged in and connected to " iface ":" port)
          (set! *runremote* serverdat)
          serverdat)
        (begin
          (debug:print-info 2 "Failed to login or connect to " iface ":" port)
          (set! *runremote* #f)
          #f))))

;; Do all the connection work, start a server if not already running.
;;
;; If tasks:get-best-server reports a server, connect to it and return the
;; result of server:client-connect. Otherwise, when NUMTRIES is positive,
;; launch `<this executable> -server - -debug <verbosity>`, poll for up to
;; six rounds (two seconds apart) for it to register, and make exactly one
;; more attempt with numtries: 0. Returns #f when no server could be found.
;;
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
          (begin
            (debug:print 0 "ERROR: failed to find megatest.config, exiting")
            (exit))))
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
        ;; hostinfo layout as used here: 0 host, 1 iface, 2 port, 3 pid
        (let ((iface (list-ref hostinfo 1))
              (port  (list-ref hostinfo 2)))
          (debug:print-info 2 "Setting up to connect to " hostinfo)
          (server:client-connect iface port))
        (if (> numtries 0)
            (let ((exe (car (argv)))
                  (pid #f))
              (debug:print-info 0 "No server available, attempting to start one...")
              (set! pid (process-run exe
                                     (list "-server" "-" "-debug"
                                           (if (list? *verbosity*)
                                               ;; conc each element so a list of
                                               ;; numbers is accepted as well
                                               (string-intersperse (map conc *verbosity*) ",")
                                               (conc *verbosity*)))))
              ;; (set! pid (process-fork (lambda ()
              ;;    			    (current-input-port  (open-input-file  "/dev/null"))
              ;;    			    (current-output-port (open-output-file "/dev/null"))
              ;;    			    (current-error-port  (open-output-file "/dev/null"))
              ;;    			    (server:launch))))
              (let loop ((count 0))
                (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
                  (if (not hostinfo)
                      (begin
                        (debug:print-info 0 "Waiting for server pid=" pid " to start")
                        (sleep 2) ;; give server time to start
                        (if (< count 5)
                            (loop (+ count 1)))))))
              ;; we are starting a server, do not try again! That can lead to
              ;; recursively starting many processes!!!
              (server:client-setup numtries: 0))
            (begin
              (debug:print-info 1 "Too many attempts, giving up")
              ;; explicit failure value; callers test the result for truth
              #f)))))

;; run server:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after sometime if it is not.
;;
;; Waits until *runremote* has been set, then every 4 seconds (every other
;; wake-up does the work) updates the server heartbeat record and compares
;; *last-db-access* with the current time. Once 45 minutes have passed
;; without db access it deregisters this server and exits the process.
;;
(define (server:keep-running)
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              sdat
                              (begin
                                ;; thread-sleep!, not posix sleep: this runs
                                ;; beside the server thread and must only
                                ;; suspend itself.
                                (thread-sleep! 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
         (tdb         (tasks:open-db))
         (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      ;; (queue-len (cdb:client-call server-info 'sync #t 1))
      ;; (print "Server running, count is " count)
      (if (< count 1)
          ;; Skip the work on the first wake-up of each cycle. This is a
          ;; tail call so no continuation is retained per cycle.
          (loop (+ count 1))
          (begin
            ;; NOTE: Get rid of this mechanism! It really is not needed...
            (tasks:server-update-heartbeat tdb spid)

            ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            (if (> (+ last-access
                      ;; (* 50 60 60)    ;; 48 hrs
                      ;; 60              ;; one minute
                      ;; (* 60 60)       ;; one hour
                      (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                      )
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (tasks:server-deregister-self tdb (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))

;; all routes through here end in exit ...
;; Entry point for running as a standalone server.
;;
;; If tasks:get-best-server already reports a server nothing is started.
;; Otherwise server:run and server:keep-running are each started in their
;; own thread and this procedure waits for the server:run thread. In every
;; case the process exits at the end.
;;
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
          (begin
            (debug:print 0 "ERROR: cannot find megatest.config, exiting")
            (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "server:launch hostinfo=" hostinfo)
    (if hostinfo
        ;; server:client-setup reads this record as 0 host, 1 iface, 2 port;
        ;; report iface:port to match (car/cadr printed host:iface).
        (debug:print-info 2 "NOT starting new server, one is already running on "
                          (list-ref hostinfo 1) ":" (list-ref hostinfo 2))
        (if *toppath*
            (let* ((th2 (make-thread (lambda ()
                                       (server:run
                                        (if (args:get-arg "-server")
                                            (args:get-arg "-server")
                                            "-")))
                                     "Server run"))
                   (th3 (make-thread (lambda () (server:keep-running))
                                     "Keep running")))
              (thread-start! th2)
              (thread-start! th3)
              (set! *didsomething* #t)
              (thread-join! th2))
            (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

;; Handler installed for signal/int by server:client-launch.
;; Prints a notice, waits one second and exits with status 4. SIGNUM is not
;; used. Any exception raised while doing so is reported at verbosity 0.
;;
(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   ;; debug:print takes the verbosity level first, as in every other call
   ;; in this file; it was missing here.
   (debug:print 0 " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
                             "") ;; do nothing for now (was flush out last call if applicable)
                           "eat response"))
         (th2 (make-thread (lambda ()
                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
                             (thread-sleep! 1) ;; give the flush one second to do its stuff
                             (debug:print 0 "       Done.")
                             (exit 4))
                           "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

;; Install the ^C handler and connect to (or start) a server as a client.
;; Exits the process if server:client-setup returns a false value.
;;
(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
  (if (server:client-setup)
      (debug:print-info 2 "connected as client")
      (begin
        (debug:print 0 "ERROR: Failed to connect as client")
        (exit))))