@@ -13,18 +13,18 @@ (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) (use spiffy uri-common intarweb http-client spiffy-request-vars) -(tcp-buffer-size 2048) - (declare (unit server)) (declare (uses common)) (declare (uses db)) (declare (uses tests)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. +(declare (uses http-transport)) +(declare (uses zmq-transport)) (include "common_records.scm") (include "db_records.scm") (define (server:make-server-url hostport) @@ -107,70 +107,10 @@ (server:try-start-server ipaddrstr start-port) ;; lite3:finalize! db))) )) - -;; (define (server:main-loop) -;; (print "INFO: Exectuing main server loop") -;; (access-log "megatest-http.log") -;; (server-bind-address #f) -;; (define-page (main-page-path) -;; (lambda () -;; (let ((dat ($ "dat"))) -;; ;; (with-request-variables (dat) -;; (debug:print-info 12 "Got dat=" dat) -;; (let* ((packet (db:string->obj dat)) -;; (qtype (cdb:packet-get-qtype packet))) -;; (debug:print-info 12 "server=> received packet=" packet) -;; (if (not (member qtype '(sync ping))) -;; (begin -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *last-db-access* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*))) -;; (let ((res (open-run-close db:process-queue-item open-db packet))) -;; (debug:print-info 11 "Return value from db:process-queue-item is " res) -;; res)))))) - -;;; (use spiffy uri-common intarweb) -;;; -;;; (root-path "/var/www") -;;; -;;; (vhost-map `(((* any) . ,(lambda (continue) -;;; (if (equal? 
(uri-path (request-uri (current-request))) -;;; '(/ "hey")) -;;; (send-response body: "hey there!\n" -;;; headers: '((content-type text/plain))) -;;; (continue)))))) -;;; -;;; (start-server port: 12345) - -;; This is recursively run by server:run until sucessful -;; -(define (server:try-start-server ipaddrstr portnum) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 9000) - (begin - (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - (open-run-close tasks:remove-server-records tasks:open-db) - (server:try-start-server ipaddrstr (+ portnum 1))) - (print "ERROR: Tried and tried but could not start the server"))) - (set! *runremote* (list ipaddrstr portnum)) - (open-run-close tasks:remove-server-records tasks:open-db) - (open-run-close tasks:server-register - tasks:open-db - (current-process-id) - ipaddrstr portnum 0 'live) - (print "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - (start-server port: portnum) - (print "INFO: server has been stopped"))) - (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) @@ -187,11 +127,14 @@ (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case *transport-type* ((fs) result) - ((http)(db:obj->string (vector success/fail query-sig result))))) + ((http)(db:obj->string (vector success/fail query-sig result))) + ((zmq) ;; send target with send-more, then the serialized reply + (send-message pubsock target send-more: #t) + (send-message pubsock (db:obj->string (vector success/fail query-sig result)))))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== @@ -199,54 +142,17 @@ (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! 
*my-client-signature* sig) *my-client-signature*))) -;; -;; -;; 1 Hello, world! Goodbye Dolly -;; Send msg to serverdat and receive result -(define (server:client-send-receive serverdat msg) - (let* ((url (server:make-server-url serverdat)) - (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) - (numretries 0)) - (handle-exceptions - exn - (if (< numretries 200) - (server:client-send-receive serverdat msg)) - (begin - (debug:print-info 11 "fullurl=" fullurl "\n") - ;; set up the http-client here - (max-retry-attempts 100) - (retry-request? (lambda (request) - (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) - (set! numretries (+ numretries 1)) - #t)) - ;; send the data and get the response - ;; extract the needed info from the http data and - ;; process and return it. - (let* ((res (with-input-from-request fullurl - ;; #f - ;; msg - (list (cons 'dat msg)) - read-string))) - (debug:print-info 11 "got res=" res) - (let ((match (string-search (regexp "(.*)<.body>") res))) - (debug:print-info 11 "match=" match) - (let ((final (cadr match))) - (debug:print-info 11 "final=" final) - final))))))) - (define (server:client-login serverdat) - (max-retry-attempts 100) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! (define (server:client-logout serverdat) (let ((ok (and (socket? 
serverdat) (cdb:logout serverdat *toppath* (server:get-client-signature))))) - ;; (close-socket serverdat) ok)) (define (server:client-connect iface port) (let* ((login-res #f) (serverdat (list iface port))) @@ -269,134 +175,51 @@ (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: failed to find megatest.config, exiting") (exit)))) - (case *transport-type* + (let* ((hostinfo (if (not *transport-type*) ;; If we don't already have transport type set then figure it out + (open-run-close tasks:get-best-server tasks:open-db) + #f))) + ;; if we have hostinfo then convert its transport name to a symbol + ;; (the case below dispatches on symbols), else fall back to fs + ;; NOTE(review): a transport type set before this call is overwritten with fs here - confirm intent + (set! *transport-type* (if hostinfo + (string->symbol (tasks:hostinfo-get-transport hostinfo)) + 'fs)) + (debug:print-info 1 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) "")) + (case *transport-type* ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db)))) ((http) - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (if hostinfo - (let ((host (list-ref hostinfo 0)) - (iface (list-ref hostinfo 1)) - (port (list-ref hostinfo 2)) - (pid (list-ref hostinfo 3))) - (debug:print-info 2 "Setting up to connect to " hostinfo) - (server:client-connect iface port)) ;; ) - (begin - (debug:print 0 "ERROR: No server found, exiting") - (exit))))))) - -;; (if (> numtries 0) -;; (let ((exe (car (argv))) -;; (pid #f)) -;; (debug:print-info 0 "No server available, attempting to start one...") -;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) -;; (string-intersperse *verbosity* ",") -;; (conc *verbosity*))))) -;; ;; (set! 
pid (process-fork (lambda () -;; ;; (current-input-port (open-input-file "/dev/null")) -;; ;; (current-output-port (open-output-file "/dev/null")) -;; ;; (current-error-port (open-output-file "/dev/null")) -;; ;; (server:launch)))) -;; (let loop ((count 0)) -;; (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) -;; (if (not hostinfo) -;; (begin -;; (debug:print-info 0 "Waiting for server pid=" pid " to start") -;; (sleep 2) ;; give server time to start -;; (if (< count 5) -;; (loop (+ count 1))))))) -;; ;; we are starting a server, do not try again! That can lead to -;; ;; recursively starting many processes!!! -;; (server:client-setup numtries: 0)) -;; (debug:print-info 1 "Too many attempts, giving up"))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. -;; -(define (server:keep-running) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (let* ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *runremote*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop)))))) - (iface (car server-info)) - (port (cadr server-info)) - (last-access 0) - (tdb (tasks:open-db)) - (spid (tasks:server-get-server-id tdb #f iface port #f))) - (print "Keep-running got server pid " spid ", using iface " iface " and port " port) - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - ;; NB// sync currently does NOT return queue-length - (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... 
- (tasks:server-update-heartbeat tdb spid) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (mutex-lock! *heartbeat-mutex*) - (set! last-access *last-db-access*) - (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (tasks:server-deregister-self tdb (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit))))))) + (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo))) + ((zmq) + (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo) + (tasks:hostinfo-get-port hostinfo) + (tasks:hostinfo-get-pubport hostinfo))) + (else ;; default to fs + (set! *transport-type* 'fs) + (set! *megatest-db* (open-db)))))) + + ;; all routes though here end in exit ... 
-(define (server:launch) +(define (server:launch transport) (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) - (debug:print-info 2 "Starting the standalone server") - (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) - (debug:print 11 "server:launch hostinfo=" hostinfo) - (if hostinfo - (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) - (if *toppath* - (let* ((th2 (make-thread (lambda () - (server:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) - ) - (thread-start! th2) - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2) - ) - (debug:print 0 "ERROR: Failed to setup for megatest"))) - (exit))) + (debug:print-info 2 "Starting server using " transport " transport") + (set! *transport-type* transport) + (case transport ;; transport is a symbol; http and zmq launch that transport, anything else exits + ((fs) (exit)) ;; there is no "fs" transport + ((http) (http-transport:launch)) + ((zmq) (zmq-transport:launch)) + (else + (debug:print 0 "WARNING: unrecognised transport " transport) + (exit)))) (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...")