;;
;; This program is distributed WITHOUT ANY WARRANTY; without even the
;; implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;; PURPOSE.

(require-extension (srfi 18) extras tcp s11n rpc)
(import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

;; Call a procedure identified by name.
;;
;;   procstr - name of the procedure to be called, as a string
;;   params  - list of arguments the procedure is applied to
;;
;; Returns whatever the named procedure returns. If the call raises an
;; exception the failure is logged and the very same call is attempted
;; once more; an exception from that second attempt propagates.
;;
;; NOTE(review): rpc-transport:run publishes this as 'remote:run, so a
;; connected peer chooses the symbol handed to `eval`. Confirm that the
;; listener is only reachable from trusted hosts.
(define (rpc-transport:autoremote procstr params)
  (let ((call-it (lambda ()
                   (apply (eval (string->symbol procstr)) params))))
    (handle-exceptions
     exn
     (begin
       ;; The original logged `proc`, which is not bound anywhere in this
       ;; procedure, so the handler itself raised an error.
       (debug:print 1 "Remote failed for " procstr " " params)
       (call-it))
     ;; (if *runremote*
     ;;     (apply (eval (string->symbol (conc "remote:" procstr))) params)
     (call-it))))

;; Private helper: wait until the "SERVER" variable has been stored in db
;; (rpc-transport:run does this with db:set-var) and return its value.
;;
;; NOTE(review): assumes db:get-var returns #f while the variable is not
;; set, as rpc-transport:client-setup also assumes - confirm. A value left
;; over from an earlier server would be returned immediately.
(define (rpc-transport:wait-for-server-address db)
  (let loop ((host:port (db:get-var db "SERVER")))
    (or host:port
        (begin
          (thread-sleep! 1)
          (loop (db:get-var db "SERVER"))))))

;; Start the server for run-id.
;;
;; Exits with status 0 if a server for run-id is already running. Otherwise
;; tries up to five times, two seconds apart, to obtain a server slot via
;; tasks:server-lock-slot. When no slot is obtained the records for this
;; pid are deleted and the procedure returns. When a slot is obtained the
;; "Server run" and "Keep running" threads are started, and the process
;; exits once the "Server run" thread has finished.
;;
;; NOTE(review): rpc-transport:run returns right after starting the
;; listener thread, so unless (db:updater) blocks, the join below finishes
;; as soon as startup is done and the process exits - confirm whether the
;; join should be on the thread that rpc-transport:run returns.
;;
;; start_server?
;;
(define (rpc-transport:launch run-id)
  (set! *run-id* run-id)
  (if (args:get-arg "-daemonize")
      (daemon:ize))
  (if (server:check-if-running run-id)
      (begin
        (debug:print 0 "INFO: Server for run-id " run-id " already running")
        (exit 0)))
  (let loop ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id))
             (remtries  4))
    (cond
     ((and (not server-id) (> remtries 0))
      (thread-sleep! 2)
      (loop (open-run-close tasks:server-lock-slot tasks:open-db run-id)
            (- remtries 1)))
     ((not server-id)
      ;; since we didn't get the server lock we are going to clean up and bail out
      (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
      (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db " rpc-transport:launch"))
     (else
      (let* ((hostn (or (args:get-arg "-server") "-"))
             ;; The original passed (hostn run-id server-id), which does not
             ;; match rpc-transport:run's (db hostn run-id) parameter list.
             (th2   (make-thread
                     (lambda ()
                       (rpc-transport:run *inmemdb* hostn run-id))
                     "Server run"))
             ;; The original passed only server-id, but
             ;; rpc-transport:keep-running takes (db host:port).
             (th3   (make-thread
                     (lambda ()
                       (rpc-transport:keep-running
                        *inmemdb*
                        (rpc-transport:wait-for-server-address *inmemdb*)))
                     "Keep running")))
        ;; Database connection; set before the threads start so that both
        ;; thunks above see it.
        (set! *inmemdb* (db:setup run-id))
        (thread-start! th2)
        (thread-start! th3)
        (set! *didsomething* #t)
        (thread-join! th2)
        (exit))))))

;; Open the rpc listener, publish the remote entry point and start the
;; rpc server thread.
;;
;;   db     - ignored; *inmemdb* is always used (the original rebinds db
;;            before its first use)
;;   hostn  - host name to advertise, or "-" to advertise the IP address
;;            of the local host name
;;   run-id - not referenced in this procedure
;;
;; Stores "<host or ip>:<port>" as the "SERVER" variable in *inmemdb*,
;; registers an on-exit handler that deletes the matching metadat row and
;; waits for *incoming-data* to drain, calls (db:updater) and then starts
;; the rpc server thread. Returns that thread.
;;
;; NOTE(review): the original let* also computed an address with
;; server:get-best-guess-address, a start-port via
;; tasks:server-get-next-port and a link-tree-path, but all of these were
;; shadowed or never used; they are dropped here and the effective
;; (hostname->ip based) address is kept. Confirm which address was meant.
(define (rpc-transport:run db hostn run-id)
  (debug:print 2 "Attempting to start the server ...")
  (let* ((use-local-host? (string=? "-" hostn))
         (rpc:listener    (rpc-transport:find-free-port-and-open (rpc:default-server-port)))
         (th1             (make-thread
                           (cute (rpc:make-server rpc:listener) "rpc:server")
                           'rpc:server))
         ;; (th2 (make-thread (lambda ()(db:updater))))
         (hostname        (if use-local-host? (get-host-name) hostn))
         (ipaddrstr       (if use-local-host?
                              (string-intersperse
                               (map number->string (u8vector->list (hostname->ip hostname)))
                               ".")
                              #f))
         (host:port       (conc (or ipaddrstr hostname) ":" (rpc:default-server-port)))
         (db              *inmemdb*))
    (debug:print 0 "Server started on " host:port)
    (db:set-var db "SERVER" host:port)
    (set! *cache-on* #t)

    ;; can use this to run most anything at the remote
    (rpc:publish-procedure!
     'remote:run
     (lambda (procstr . params)
       (rpc-transport:autoremote procstr params)))

    ;; (rpc:publish-procedure!
    ;;  'server:login
    ;;  (lambda (toppath)
    ;;    (set! *last-db-access* (current-seconds))
    ;;    (if (equal? *toppath* toppath)
    ;;        (begin
    ;;          (debug:print-info 2 "login successful")
    ;;          #t)
    ;;        #f)))
    ;;
    ;; ;;======================================================================
    ;; ;; db specials here
    ;; ;;======================================================================
    ;; ;; remote call to open-run-close
    ;; (rpc:publish-procedure!
    ;;  'rdb:open-run-close
    ;;  (lambda (procname . remargs)
    ;;    (debug:print-info 12 "Remote call of rdb:open-run-close " procname " " remargs)
    ;;    (set! *last-db-access* (current-seconds))
    ;;    (apply open-run-close (eval procname) remargs)))
    ;;
    ;; (rpc:publish-procedure!
    ;;  'cdb:test-set-status-state
    ;;  (lambda (test-id status state msg)
    ;;    (debug:print-info 12 "Remote call of cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg)
    ;;    (cdb:test-set-status-state test-id status state msg)))
    ;;
    ;; (rpc:publish-procedure!
    ;;  'cdb:test-rollup-test_data-pass-fail
    ;;  (lambda (test-id)
    ;;    (debug:print-info 12 "Remote call of cdb:test-rollup-test_data-pass-fail " test-id)
    ;;    (cdb:test-rollup-test_data-pass-fail test-id)))
    ;;
    ;; (rpc:publish-procedure!
    ;;  'cdb:pass-fail-counts
    ;;  (lambda (test-id fail-count pass-count)
    ;;    (debug:print-info 12 "Remote call of cdb:pass-fail-counts " test-id " passes: " pass-count " fails: " fail-count)
    ;;    (cdb:pass-fail-counts test-id fail-count pass-count)))
    ;;
    ;; (rpc:publish-procedure!
    ;;  'cdb:tests-register-test
    ;;  (lambda (db run-id test-name item-path)
    ;;    (debug:print-info 12 "Remote call of cdb:tests-register-test " run-id " testname: " test-name " item-path: " item-path)
    ;;    (cdb:tests-register-test db run-id test-name item-path)))
    ;;
    ;; (rpc:publish-procedure!
    ;;  'cdb:flush-queue
    ;;  (lambda ()
    ;;    (debug:print-info 12 "Remote call of cdb:flush-queue")
    ;;    (cdb:flush-queue)))
    ;;
    ;;======================================================================
    ;; end of publish-procedure section
    ;;======================================================================

    (set! *rpc:listener* rpc:listener)
    (on-exit
     (lambda ()
       ;; remove this server's entry ...
       (open-run-close
        (lambda (db . params)
          (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' and val=?;" host:port))
        #f  ;; for db
        #f) ;; for a param
       ;; ... then poll until the incoming queue is empty
       (let loop ((n 0))
         (let ((queue-len 0))
           (thread-sleep! (random 5))
           (mutex-lock! *incoming-mutex*)
           (set! queue-len (length *incoming-data*))
           (mutex-unlock! *incoming-mutex*)
           (if (> queue-len 0)
               (begin
                 (debug:print-info 0 "Queue not flushed, waiting ...")
                 (loop (+ n 1))))))))
    (db:updater)
    (thread-start! th1)
    ;; (debug:print 0 "Server started on port " (rpc:default-server-port) "...")
    ;; (thread-start! th2)
    ;; (thread-join! th2)
    ;; return the server thread for the calling process to do a join with
    ;; (the original closed this form with two surplus parentheses)
    th1))

;; Monitor the server and log its shutdown once it is idle.
;;
;;   db        - database handle, used for db:get-count-tests-running and
;;               for the final DELETE
;;   host:port - value of this server's "SERVER" row in metadat
;;
;; Every 20 seconds: keep going while tests are running or the last db
;; access was less than 60 seconds ago. Otherwise delete the SERVER row,
;; wait 10 seconds, log and return.
;;
;; NOTE(review): the (exit) call is commented out, so this procedure only
;; returns after logging "Exiting" - confirm that this is intended.
(define (rpc-transport:keep-running db host:port)
  ;; if none running or if > 20 seconds since
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 20) ;; no need to do this very often
    (let ((numrunning (db:get-count-tests-running db)))
      (if (or (> numrunning 0)
              (> (+ *last-db-access* 60) (current-seconds)))
          (begin
            (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
            (loop (+ 1 count)))
          (begin
            (debug:print-info 0 "Starting to shutdown the server side")
            ;; need to delete only *my* server entry (future use)
            (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' AND val like ?;" host:port)
            (thread-sleep! 10)
            (debug:print-info 0 "Max cached queries was " *max-cache-size*)
            (debug:print-info 0 "Server shutdown complete. Exiting")
            ;; (exit)
            )))))

;; Open a tcp listener on the first usable port at or above `port`.
;;
;; Sets (rpc:default-server-port) to the port being tried and the tcp read
;; timeout to 240000 before listening. On any exception the next port is
;; tried. Returns the listener.
;;
;; Signals an error once the candidate exceeds 65535; the original kept
;; recursing without a bound.
(define (rpc-transport:find-free-port-and-open port)
  (if (> port 65535)
      (error 'rpc-transport:find-free-port-and-open
             "no free port found, tried up to 65535")
      (handle-exceptions
       exn
       (begin
         (print "Failed to bind to port " port ", trying next port")
         (rpc-transport:find-free-port-and-open (+ port 1)))
       (rpc:default-server-port port)
       (tcp-read-timeout 240000)
       (tcp-listen (rpc:default-server-port) 10000))))

;; Placeholder: always returns #f.
(define (rpc:ping run-id host-port)
  #f)

;; Connect this process to the server recorded in the "SERVER" variable.
;;
;; Returns #f (after logging) when *runremote* is already set. Otherwise
;; reads "host:port" from the db, and if it carries a numeric port, tries
;; to log in through the remote 'server:login procedure. On success
;; *runremote* becomes (vector host portn); on failure or on any exception
;; it is set to #f.
;;
;; NOTE(review): the 'server:login publication in rpc-transport:run is
;; commented out, so the login call presumably cannot succeed right now -
;; confirm.
(define (rpc-transport:client-setup)
  (if *runremote*
      (begin
        (debug:print 0 "ERROR: Attempt to connect to server but already connected")
        #f)
      (let* ((hostinfo (open-run-close db:get-var #f "SERVER"))
             (hostdat  (if hostinfo (string-split hostinfo ":") '()))
             ;; string-split yields '() for an empty string; the original
             ;; took car of it unconditionally
             (host     (if (pair? hostdat) (car hostdat) #f))
             (port     (if (> (length hostdat) 1) (cadr hostdat) #f))
             (portn    (and port (string->number port))))
        (if portn
            (begin
              (debug:print-info 2 "Setting up to connect to host " host ":" port)
              (handle-exceptions
               exn
               (begin
                 (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port)
                 (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
                 ;; (open-run-close
                 ;;  (lambda (db . param)
                 ;;    (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'"))
                 ;;  #f)
                 (set! *runremote* #f))
               (if (and (not (args:get-arg "-server")) ;; no point in the server using the server
                        ((rpc:procedure 'server:login host portn) *toppath*))
                   (begin
                     (debug:print-info 2 "Logged in and connected to " host ":" port)
                     (set! *runremote* (vector host portn)))
                   (begin
                     (debug:print-info 2 "Failed to login or connect to " host ":" port)
                     (set! *runremote* #f)))))
            (debug:print-info 2 "no server available")))))