Index: TODO ================================================================== --- TODO +++ TODO @@ -16,10 +16,13 @@ # along with Megatest. If not, see . TODO ==== +NextSteps +. Remove servermod.scm + WW15 . fill newview matrix with data, filter pipeline gui elements . improve [script], especially indent handling WW16 Index: http-transportmod.scm ================================================================== --- http-transportmod.scm +++ http-transportmod.scm @@ -65,17 +65,17 @@ srfi-69 stack system-information typed-records z3 - spiffy - uri-common - intarweb - http-client - spiffy-request-vars - intarweb - spiffy-directory-listing + spiffy + uri-common + intarweb + http-client + spiffy-request-vars + intarweb + spiffy-directory-listing (prefix mtargs args:) commonmod configfmod debugprint @@ -85,841 +85,7 @@ portloggermod pkts ) -;; (require-extension (srfi 18) extras tcp s11n) -;; -;; -;; (use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras) -;; -;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing) -;; -;; Configurations for server -(tcp-buffer-size 2048) -(max-connections 2048) - -(defstruct servdat - (host #f) - (port #f) - (uuid #f) - (dbfile #f) - (api-url #f) - (api-uri #f) - (api-req #f)) - -(define (servdat->url sdat) - (conc (servdat-host sdat)":"(servdat-port sdat))) - -;; this must be set up prior to making calls -(define api-proc (make-parameter conc)) - -(define (http-transport:make-server-url hostport) - (if (not hostport) - #f - (conc "http://" (car hostport) ":" (cadr hostport)))) - -;;====================================================================== -;; S E R V E R -;; ====================================================================== - -;; NOTE: http-transport:launch is the entry point -;; -> http-transport:run -;; -> http-transport:try-start-server -> http-transport:try-start-server (until success) - -(define (http-get-function fnkey) - (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) - -(define (http-handle-api dbstruct $) - (if (api-proc) - ((api-proc) dbstruct $) ;; ($) => alist - 'no-api-proc-set)) - -(define (http-transport:run hostn) - ;; Configurations for server - (tcp-buffer-size 2048) - (max-connections 2048) - (debug:print 2 *default-log-port* "Attempting to start the server ...") - (let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) - ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") - (server:get-best-guess-address hostname) - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (portlogger:open-run-close portlogger:find-port)) - (link-tree-path (common:get-linktree)) - (tmp-area (common:get-db-tmp-area)) - #;(start-file (conc tmp-area "/.server-start"))) - (debug:print-info 0 *default-log-port* "portlogger recommended port: " start-port) - ;; set some parameters for the server - (root-path (if link-tree-path - link-tree-path - (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! - (handle-directory spiffy-directory-listing) - #;(handle-exception (lambda (exn chain) - (signal (make-composite-condition - (make-property-condition - 'server - 'message "server error"))))) - - ;; Setup the web server and a /ctrl interface - ;; - (vhost-map `(((* any) . ,(lambda (continue) - ;; open the db on the first call - ;; This is were we set up the database connections - (let* (($ (request-vars source: 'both)) - ;; (dat ($ 'dat)) - (res #f)) - (cond - ((equal? (uri-path (request-uri (current-request))) - '(/ "api")) - (debug:print 0 *default-log-port* "In api request $=" $) - (send-response ;; the $ is the request vars proc - body: (http-handle-api *dbstruct-db* $) - headers: '((content-type text/plain))) - (set! *db-last-access* (current-seconds))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "ping")) - (send-response body: (conc *toppath*"/"(args:get-arg "-db")) - headers: '((content-type text/plain)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "loop-test")) - (send-response body: (alist-ref 'data ($)) - headers: '((content-type text/plain)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "")) - (send-response body: ((http-get-function 'http-transport:main-page)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "json_api")) - (send-response body: ((http-get-function 'http-transport:main-page)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "runs")) - (send-response body: ((http-get-function 'http-transport:main-page)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ any)) - (send-response body: "hey there!\n" - headers: '((content-type text/plain)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "hey")) - (send-response body: "hey there!\n" - headers: '((content-type text/plain)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "jquery3.1.0.js")) - (send-response body: ((http-get-function 'http-transport:show-jquery)) - headers: '((content-type application/javascript)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "test_log")) - (send-response body: ((http-get-function 'http-transport:html-test-log) $) - headers: '((content-type text/HTML)))) - ((equal? (uri-path (request-uri (current-request))) - '(/ "dashboard")) - (send-response body: ((http-get-function 'http-transport:html-dboard) $) - headers: '((content-type text/HTML)))) - (else (continue)))))))) - #;(handle-exceptions - exn - (debug:print 0 *default-log-port* "Failed to create file " start-file ", exn=" exn) - (with-output-to-file start-file (lambda ()(print (current-process-id))))) - (http-transport:try-start-server ipaddrstr start-port))) - -;; This is recursively run by http-transport:run until sucessful -;; -(define (http-transport:try-start-server ipaddrstr portnum) - (let ((config-hostname (configf:lookup *configdat* "server" "hostname")) - (config-use-proxy (equal? (configf:lookup *configdat* "client" "use-http_proxy") "yes"))) - (if (not config-use-proxy) - (determine-proxy (constantly #f))) - (debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" (seconds->time-string (current-seconds)) " ipaddrsstr=" ipaddrstr " portnum=" portnum " config-hostname=" config-hostname) - (handle-exceptions - exn - (begin - (print-error-message exn) - (if (< portnum 64000) - (begin - (debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...") - (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) - (debug:print 5 *default-log-port* "exn=" (condition->list exn)) - (portlogger:open-run-close portlogger:set-failed portnum) - (debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port") - (thread-sleep! 0.1) - - ;; get_next_port goes here - (http-transport:try-start-server ipaddrstr - (portlogger:open-run-close portlogger:find-port))) - (begin - (print "ERROR: Tried and tried but could not start the server")))) - ;; any error in following steps will result in a retry - (set! *server-info* (make-servdat host: ipaddrstr port: portnum)) - (debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum) - ;; This starts the spiffy server - ;; NEED WAY TO SET IP TO #f TO BIND ALL - ;; (start-server bind-address: ipaddrstr port: portnum) - (if config-hostname ;; this is a hint to bind directly - (start-server port: portnum bind-address: (if (equal? config-hostname "-") - ipaddrstr - config-hostname)) - (start-server port: portnum)) - (portlogger:open-run-close portlogger:set-port portnum "released") - (debug:print 1 *default-log-port* "INFO: server has been stopped")))) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;;====================================================================== -;; C L I E N T S -;;====================================================================== - - -(define (http-transport:get-time-to-cleanup) - (let ((res #f)) - (mutex-lock! *http-mutex*) - (set! res (> (current-seconds) *http-connections-next-cleanup*)) - (mutex-unlock! *http-mutex*) - res)) - -(define (http-transport:inc-requests-count) - (mutex-lock! *http-mutex*) - (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)) - ;; Use this opportunity to slow things down iff there are too many requests in flight - (if (> *http-requests-in-progress* 5) - (begin - (debug:print-info 0 *default-log-port* "Whoa there buddy, ease up...") - (thread-sleep! 1))) - (mutex-unlock! *http-mutex*)) - -(define (http-transport:dec-requests-count proc) - (mutex-lock! *http-mutex*) - (proc) - (set! *http-requests-in-progress* (- *http-requests-in-progress* 1)) - (mutex-unlock! *http-mutex*)) - -(define (http-transport:dec-requests-count-and-close-all-connections) - (set! *http-requests-in-progress* (- *http-requests-in-progress* 1)) - (let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds - (if (> *http-requests-in-progress* 0) - (if (> etime (current-seconds)) - (begin - (thread-sleep! 0.052) - (loop etime)) - (debug:print-error 0 *default-log-port* "requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections")) - (close-idle-connections!))) - (set! *http-connections-next-cleanup* (+ (current-seconds) 10)) - (mutex-unlock! *http-mutex*)) - -(define (http-transport:inc-requests-and-prep-to-close-all-connections) - (mutex-lock! *http-mutex*) - (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*))) - -;; serverdat contains uuid to be used for connection validation -;; -;; NOTE: serverdat must be initialized or created by servdat-init -;; -;; DO NOT USE. Moved to rmt:set-receive-real -;; -;; (define (http-transport:send-receive conn qry-key cmd params #!key (numretries 3)) -;; (let* ((res #f) -;; (success #t) -;; (sparams (with-output-to-string -;; (lambda ()(write params))))) -;; ;; send the data and get the response extract the needed info from -;; ;; the http data and process and return it. -;; (let* ((send-recieve (lambda () -;; (set! res -;; (with-input-from-request -;; (rmt:conn->uri conn "api") -;; (list (cons 'key qry-key) -;; ;; (cons 'srvid (servdat-uuid sdat)) -;; (cons 'cmd cmd) -;; (cons 'params sparams)) -;; read-string)))) -;; (time-out (lambda () -;; (thread-sleep! 45) -;; (debug:print 0 *default-log-port* "WARNING: send-receive took more than 45 seconds!!") -;; #f)) -;; (th1 (make-thread send-recieve "with-input-from-request")) -;; (th2 (make-thread time-out "time out"))) -;; (thread-start! th1) -;; (thread-start! th2) -;; (thread-join! th1) -;; (close-idle-connections!) -;; (thread-terminate! th2) -;; (if (string? res) -;; (with-input-from-string res -;; (lambda () read)) -;; res)))) - -;; careful closing of connections stored in *runremote* -;; -(define (http-transport:close-connections #!key (area-dat #f)) - (debug:print-info 0 *default-log-port* "http-transport:close-connections doesn't do anything now!")) -;; (let* ((runremote (or area-dat *runremote*)) -;; (server-dat (if runremote -;; (remote-conndat runremote) -;; #f))) ;; (hash-table-ref/default *runremote* run-id #f))) -;; (if (vector? server-dat) -;; (let ((api-dat (http-transport:server-dat-get-api-uri server-dat))) -;; (handle-exceptions -;; exn -;; (begin -;; (print-call-chain *default-log-port*) -;; (debug:print-error 0 *default-log-port* " closing connection failed with error: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn)) -;; (close-connection! api-dat) -;; ;;(close-idle-connections!) -;; #t)) -;; #f))) - - -(define (make-http-transport:server-dat)(make-vector 6)) -(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) -(define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) -(define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) -(define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) -(define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) -(define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) -;(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) -(define (http-transport:server-dat-get-server-id vec) (vector-ref vec 6)) - -(define (http-transport:server-dat-make-url vec) - (if (and (http-transport:server-dat-get-iface vec) - (http-transport:server-dat-get-port vec)) - (conc "http://" - (http-transport:server-dat-get-iface vec) - ":" - (http-transport:server-dat-get-port vec)) - #f)) - -(define (http-transport:server-dat-update-last-access vec) - (if (vector? vec) - (vector-set! vec 5 (current-seconds)) - (begin - (print-call-chain (current-error-port)) - (debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!")))) - -;; initialize servdat for client side, setup needed parameters -;; pass in #f as sdat-in to create sdat -;; -(define (servdat-init sdat-in iface port uuid) - (let* ((sdat (or sdat-in (make-servdat)))) - (if uuid (servdat-uuid-set! sdat uuid)) - (servdat-host-set! sdat iface) - (servdat-port-set! sdat port) - (servdat-api-url-set! sdat (conc "http://" iface ":" port "/api")) - (servdat-api-uri-set! sdat (uri-reference (servdat-api-url sdat))) - (servdat-api-req-set! sdat (make-request method: 'POST - uri: (servdat-api-uri sdat))) - ;; set up the http-client parameters - (max-retry-attempts 1) - ;; consider all requests indempotent - (retry-request? (lambda (request) - #f)) - (determine-proxy (constantly #f)) - sdat)) - -;;====================================================================== -;; NEW SERVER METHOD -;;====================================================================== - -;; only use for main.db - need to re-write some of this :( -;; -(define (get-lock-db sdat dbfile) - (let* ((dbh (db:open-run-db dbfile db:initialize-db)) - (res (db:get-iam-server-lock dbh dbfile))) - (sqlite3:finalize! dbh) - res)) - - -(define *srvpktspec* - `((server (host . h) - (port . p) - (servkey . k) - (pid . i) - (ipaddr . a) - (dbpath . d)))) - -(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath) - (let* ((pkt-dat `((host . ,host) - (port . ,port) - (servkey . ,servkey) - (pid . ,(current-process-id)) - (ipaddr . ,ipaddr) - (dbpath . ,dbpath))) - (uuid (write-alist->pkt - pkts-dir - pkt-dat - pktspec: pkt-spec - ptype: 'server))) - (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid) - uuid)) - -(define (get-pkts-dir #!optional (apath #f)) - (let* ((effective-toppath (or *toppath* apath))) - (assert effective-toppath - "ERROR: get-pkts-dir called without *toppath* set. Exiting.") - (let* ((pdir (conc effective-toppath "/.meta/srvpkts"))) - (if (file-exists? pdir) - pdir - (begin - (create-directory pdir #t) - pdir))))) - -;; given a pkts dir read -;; -(define (get-all-server-pkts pktsdir-in pktspec) - (let* ((pktsdir (if (file-exists? pktsdir-in) - pktsdir-in - (begin - (create-directory pktsdir-in #t) - pktsdir-in))) - (all-pkt-files (glob (conc pktsdir "/*.pkt")))) - (map (lambda (pkt-file) - (read-pkt->alist pkt-file pktspec: pktspec)) - all-pkt-files))) - -(define (server-address srv-pkt) - (conc (alist-ref 'host srv-pkt) ":" - (alist-ref 'port srv-pkt))) - -(define (server-ready? host port key) ;; server-address is host:port - ;; ping the server and ask it - ;; if it ready - ;; (let* ((sdat (servdat-init #f host port #f))) - ;; (http-transport:send-receive sdat "abc" 'ping '()))) - (let* ((res (with-input-from-request - (conc "http://"host":"port"/ping") ;; returns *toppath*/dbname - #f - read-string))) - (if (equal? res key) - #t - (begin - (debug:print-info 0 *default-log-port* "server-ready? key="key", received="res) - #f)))) - -(define (loop-test host port data) ;; server-address is host:port - ;; ping the server and ask it - ;; if it ready - ;; (let* ((sdat (servdat-init #f host port #f))) - ;; (http-transport:send-receive sdat "abc" 'ping '()))) - (let* ((payload (sexpr->string data)) - (res (with-input-from-request - (conc "http://"host":"port"/loop-test") - `((data . ,payload)) - read-string))) - (string->sexpr res))) - -; from the pkts return servers associated with dbpath -;; NOTE: Only one can be alive - have to check on each -;; in the list of pkts returned -;; -(define (get-viable-servers serv-pkts dbpath) - (let loop ((tail serv-pkts) - (res '())) - (if (null? tail) - res ;; NOTE: sort by age so oldest is considered first - (let* ((spkt (car tail))) - (loop (cdr tail) - (if (equal? dbpath (alist-ref 'dbpath spkt)) - (cons spkt res) - res)))))) - -;; from viable servers get one that is alive and ready -;; -(define (get-the-server serv-pkts) - (let loop ((tail serv-pkts)) - (if (null? tail) - #f - (let* ((spkt (car tail)) - (host (alist-ref 'ipaddr spkt)) - (port (alist-ref 'port spkt)) - (dbpth (alist-ref 'dbpath spkt)) - (addr (server-address spkt))) - (if (server-ready? host port dbpth) - spkt - (loop (cdr tail))))))) - -;; am I the "first" in line server? I.e. my D card is smallest -;; use Z card as tie breaker -;; -(define (get-best-candidate serv-pkts dbpath) - (if (null? serv-pkts) - #f - (let loop ((tail serv-pkts) - (best (car serv-pkts))) - (if (null? tail) - best - (let* ((candidate (car tail)) - (candidate-bd (string->number (alist-ref 'D candidate))) - (best-bd (string->number (alist-ref 'D best))) - ;; bigger number is younger - (candidate-z (alist-ref 'Z candidate)) - (best-z (alist-ref 'Z best)) - (new-best (cond - ((> best-bd candidate-bd) ;; best is younger than candidate - candidate) - ((< best-bd candidate-bd) ;; candidate is younger than best - best) - (else - (if (string>=? best-z candidate-z) - best - candidate))))) ;; use Z card as tie breaker - (if (null? tail) - new-best - (loop (cdr tail) new-best))))))) - - - -;;====================================================================== -;; END NEW SERVER METHOD -;;====================================================================== - -(define (http-transport:wait-for-server pkts-dir db-file server-key) - (let* ((sdat *server-info*)) - (let loop ((start-time (current-seconds)) - (changed #t) - (last-sdat "not this")) - (begin ;; let ((sdat #f)) - (thread-sleep! 0.01) - (debug:print-info 0 *default-log-port* "Waiting for server alive signature") - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - (if (and sdat - (not changed) - (> (- (current-seconds) start-time) 2)) - (begin - (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") - ;; create a server pkt in *toppath*/.meta/srvpkts - - ;; TODO: - ;; 1. change sdat to stuct - ;; 2. add uuid to struct - ;; 3. update uuid in sdat here - ;; - (servdat-uuid-set! sdat - (register-server - pkts-dir *srvpktspec* - (get-host-name) - (servdat-port sdat) server-key - (servdat-host sdat) db-file)) - - ;; now read pkts and see if we are a contender - (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) - (viables (get-viable-servers all-pkts db-file)) - (best-srv (get-best-candidate viables db-file)) - (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))) - (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key) - ;; am I the best-srv, compare server-keys to know - (if (equal? best-srv-key server-key) - (if (get-lock-db sdat db-file) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) - (begin - (debug:print 0 *default-log-port* "I'm the server!") - (servdat-dbfile-set! sdat db-file)) - (begin - (debug:print 0 *default-log-port* "I'm not the server, exiting.") - (bdat-time-to-exit-set! *bdat* #t) - (thread-sleep! 0.2) - (exit))) - (begin - (debug:print 0 *default-log-port* - "Keys do not match "best-srv-key", "server-key", exiting.") - (bdat-time-to-exit-set! *bdat* #t) - (thread-sleep! 0.2) - (exit))) - sdat)) - (begin ;; sdat not yet contains server info - (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) - (sleep 4) - (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes - (begin - (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") - (exit)) - (loop start-time - (equal? sdat last-sdat) - sdat)))))))) - -;; run http-transport:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. -;; -(define (http-transport:keep-running dbname) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") - (let* ((run-id (let ((rid (args:get-arg "-run-id"))) ;; consider getting rid of the -run-id mechanism - (if rid ;; replace with -db - (string->number rid) - #f))) - (db-file (if dbname - (db:dbname->path *toppath* dbname) - (db:run-id->path *toppath* run-id))) - (sdat #f) - ;; (tmp-area (common:get-db-tmp-area)) - (server-start-time (current-seconds)) - (pkts-dir (get-pkts-dir)) - (server-key (server:mk-signature)) - (server-info (http-transport:wait-for-server pkts-dir db-file server-key )) - (iface (servdat-host server-info)) - (port (servdat-port server-info)) - (last-access 0) - (server-timeout (server:expiration-timeout)) - (server-log-file (args:get-arg "-log"))) ;; always set when we are a server - - (let loop ((count 0) - (server-state 'available) - (bad-sync-count 0) - (start-time (current-milliseconds))) - ;; Use this opportunity to sync the tmp db to megatest.db NOTE: This conflicts with the watchdog syncing? - (if (not *dbstruct-db* ) - (let ((watchdog (bdat-watchdog *bdat*))) - (debug:print 0 *default-log-port* "SERVER: dbprep") - - (db:setup dbname) ;; sets *dbstruct-db* as side effect - ;; NOW REGISTER THE SERVER in main.db - - - - - - - - - - - - - - - - - - - - - (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. - (if watchdog - (if (not (member (thread-state watchdog) '(ready running blocked sleeping dead))) - (begin - (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") - (thread-start! watchdog))) - (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")))) - - ;; when things go wrong we don't want to be doing the various queries too often - ;; so we strive to run this stuff only every four seconds or so. - (let* ((sync-time (- (current-milliseconds) start-time)) - (rem-time (quotient (- 4000 sync-time) 1000))) - (if (and (<= rem-time 4) - (> rem-time 0)) - (thread-sleep! rem-time))) - - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1) 'running bad-sync-count (current-milliseconds))) - - ;; Check that iface and port have not changed (can happen if server port collides) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - - (if (or (not (equal? (servdat-host sdat) iface)) - (not (equal? (servdat-port sdat) port))) - (let ((new-iface (servdat-host sdat)) - (new-port (servdat-port sdat))) - (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") - (set! iface new-iface) - (set! port new-port) - (if (not *server-id*) - (set! *server-id* (server:mk-signature))) - ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) - (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) - (flush-output *default-log-port*))) - - ;; Transfer *db-last-access* to last-access to use in checking that we are still alive - (mutex-lock! *heartbeat-mutex*) - (set! last-access *db-last-access*) - (mutex-unlock! *heartbeat-mutex*) - - (if (common:low-noise-print 120 (conc "server running on " iface ":" port)) - (begin - (if (not *server-id*) - (set! *server-id* (server:mk-signature))) - ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) - (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) - (flush-output *default-log-port*))) - (if (common:low-noise-print 60 "dbstats") - (begin - (debug:print 0 *default-log-port* "Server stats:") - (db:print-current-query-stats))) - (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) - (cond - ((and *server-run* - (> (+ last-access server-timeout) - (current-seconds))) - (if (common:low-noise-print 120 "server continuing") - (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (let ((curr-time (current-seconds))) - (handle-exceptions - exn - (debug:print 0 *default-log-port* "ERROR: Failed to change timestamp on log file " server-log-file ". Are you out of space on that disk? exn=" exn) - (if (and server-log-file (not *server-overloaded*)) - (set-file-times! server-log-file curr-time curr-time))))) - (loop 0 server-state bad-sync-count (current-milliseconds))) - (else - (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) - (http-transport:server-shutdown port))))))) - -(define (http-transport:server-shutdown port) - (begin - ;;(BB> "http-transport:server-shutdown called") - (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id)) - ;; - ;; start_shutdown - ;; - (bdat-time-to-exit-set! *bdat* #t) ;; tell on-exit to be fast as we've already cleaned up - (portlogger:open-run-close portlogger:set-port port "released") - (thread-sleep! 1) - - ;; (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) - ;; (debug:print-info 0 *default-log-port* "Number of cached writes " *number-of-writes*) - ;; (debug:print-info 0 *default-log-port* "Average cached write time " - ;; (if (eq? *number-of-writes* 0) - ;; "n/a (no writes)" - ;; (/ *writes-total-delay* - ;; *number-of-writes*)) - ;; " ms") - ;; (debug:print-info 0 *default-log-port* "Number non-cached queries " *number-non-write-queries*) - ;; (debug:print-info 0 *default-log-port* "Average non-cached time " - ;; (if (eq? *number-non-write-queries* 0) - ;; "n/a (no queries)" - ;; (/ *total-non-write-delay* - ;; *number-non-write-queries*)) - ;; " ms") - - (db:print-current-query-stats) - (common:save-pkt `((action . exit) - (T . server) - (pid . ,(current-process-id))) - *configdat* #t) - (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") - (exit))) - -;; Call this to start the actual server -;; - -;; all routes though here end in exit ... -;; -;; start_server? -;; -(define (http-transport:launch dbname) - (let* (;; (tmp-area (common:get-db-tmp-area)) - ;; (server-start (conc tmp-area "/.server-start")) - ;; (server-started (conc tmp-area "/.server-started")) - ;; (start-time (common:lazy-modification-time server-start)) - ;; (started-time (common:lazy-modification-time server-started)) - ;; (server-starting (< start-time started-time)) ;; if start-time is less than started-time then a server is still starting - ;; (start-time-old (> (- (current-seconds) start-time) 5)) - (cleanup-proc (lambda (msg) - (let* ((serv-fname (conc "server-" (current-process-id) "-" (get-host-name) ".log")) - (full-serv-fname (conc *toppath* "/logs/" serv-fname)) - (new-serv-fname (conc *toppath* "/logs/" "defunct-" serv-fname))) - (debug:print 0 *default-log-port* msg) - (if (common:file-exists? full-serv-fname) - (system (conc "sleep 1;mv -f " full-serv-fname " " new-serv-fname)) - (debug:print 0 *default-log-port* "INFO: cannot move " full-serv-fname " to " new-serv-fname)) - (exit))))) - #;(common:save-pkt `((action . start) - (T . server) - (pid . ,(current-process-id))) - *configdat* #t) - (let* ((th2 (make-thread (lambda () - (debug:print-info 0 *default-log-port* "Server run thread started") - (http-transport:run - (if (args:get-arg "-server") - (args:get-arg "-server") - "-") - )) "Server run")) - (th3 (make-thread (lambda () - (debug:print-info 0 *default-log-port* "Server monitor thread started") - (http-transport:keep-running dbname) - "Keep running")))) - (thread-start! th2) - (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. - (thread-start! th3) - (set! *didsomething* #t) - (thread-join! th2) - (exit)))) - -;; Generate a unique signature for this server -(define (server:mk-signature) - (message-digest-string (md5-primitive) - (with-output-to-string - (lambda () - (write (list (current-directory) - (current-process-id) - (argv))))))) - -(define (server:get-client-signature) - (if *my-client-signature* *my-client-signature* - (let ((sig (server:mk-signature))) - (set! *my-client-signature* sig) - *my-client-signature*))) - -;; run ping in separate process, safest way in some cases -;; -(define (server:ping-server ifaceport) - (with-input-from-pipe - (conc (common:get-megatest-exe) " -ping " ifaceport) - (lambda () - (let loop ((inl (read-line)) - (res "NOREPLY")) - (if (eof-object? inl) - (case (string->symbol res) - ((NOREPLY) #f) - ((LOGIN_OK) #t) - (else #f)) - (loop (read-line) inl)))))) - - -;;====================================================================== -;; S E R V E R -;;====================================================================== - -;; Call this to start the actual server -;; - -;; all routes though here end in exit ... -;; -;; start_server -;; -(define (server:launch dbname) - (http-transport:launch dbname)) - -;;====================================================================== -;; S E R V E R U T I L I T I E S -;;====================================================================== - -;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). -;; -(define (server:login toppath) - (lambda (toppath) - (set! *db-last-access* (current-seconds)) ;; might not be needed. - (if (equal? *toppath* toppath) - #t - #f))) - -;; (define server:sync-lock-token "SERVER_SYNC_LOCK") -;; (define (server:release-sync-lock) -;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) -;; (define (server:have-sync-lock?) -;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) -;; (have-lock? (car have-lock-pair)) -;; (lock-time (cdr have-lock-pair)) -;; (lock-age (- (current-seconds) lock-time))) -;; (cond -;; (have-lock? #t) -;; ((>lock-age -;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) -;; (server:release-sync-lock) -;; (server:have-sync-lock?)) -;; (else #f)))) - ) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -1181,12 +1181,12 @@ (seconds->hr-min-sec mod) (if alv "alive" "dead")) (if (and alv (args:get-arg "-kill-servers")) (begin - (debug:print-info 0 *default-log-port* "Attempting to kill server with pid " pid) - (server:kill server))))) + (debug:print-info 0 *default-log-port* "Attempting to kill server with pid " pid " !!needs completion!!") + #;(server:kill server))))) (sort servers (lambda (a b) (let ((ma (or (any->number (car a)) 9e9)) (mb (or (any->number (car b)) 9e9))) (> ma mb))))) ;; (debug:print-info 1 *default-log-port* "Done with listservers") Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -34,48 +34,70 @@ (declare (uses clientmod)) (module rmtmod * -(import scheme - chicken.random - chicken.string +(import scheme + chicken.base chicken.condition - chicken.sort - chicken.time - chicken.base chicken.file + chicken.file.posix chicken.format + chicken.io + chicken.pathname + chicken.port + chicken.pretty-print chicken.process - chicken.file.posix - chicken.process-context.posix chicken.process-context - chicken.io + chicken.process-context.posix + chicken.sort + chicken.string + chicken.tcp chicken.random + chicken.time + chicken.time.posix + (prefix sqlite3 sqlite3:) + directory-utils + http-client + intarweb + matchable + md5 + message-digest + (prefix base64 base64:) (prefix sqlite3 sqlite3:) - typed-records + regex + s11n + spiffy + spiffy-directory-listing + spiffy-request-vars srfi-1 srfi-13 srfi-18 srfi-69 - commonmod + stack + system-information + typed-records + uri-common + z3 + apimod - itemsmod - http-client - debugprint - mtver - regex - tasksmod - pgdb - (prefix mtargs args:) + clientmod + commonmod + configfmod dbmod + debugprint http-transportmod + itemsmod + mtver + pgdb + pkts + portloggermod + (prefix mtargs args:) servermod - clientmod - configfmod - + stml2 + tasksmod ) (defstruct alldat (areapath #f) (ulexdat #f) @@ -174,10 +196,11 @@ ;; (define (rmt:conn->uri conn entrypoint) (conc "http://"(rmt:conn-ipaddr conn)":"(rmt:conn-port conn)"/"entrypoint)) ;; set up the api proc, seems like there should be a better place for this? +(define api-proc (make-parameter conc)) (api-proc api:process-request) ;; do we have a connection to apath dbname and ;; is it not expired? then return it ;; @@ -1577,11 +1600,11 @@ (let* ((sdat (servdat-init #f host port server-id))) (rmt:send-receive sdat 'ping '()))) ;; ping the given server ;; -(define (server:check-server server-record) +#;(define (server:check-server server-record) (let* ((server-url (server:record->url server-record)) (server-id (server:record->id server-record)) (res (server:ping server-url server-id))) (if res server-url @@ -1723,7 +1746,848 @@ ;; (server:kind-run areapath) (server:start-and-wait areapath) (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries) (thread-sleep! 1) ;; (+ 5 (pseudo-random-integer (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup-http areapath remaining-tries: (- remaining-tries 1))))))))) + + +;;====================================================================== +;; http-transportmod.scm contents moved here +;;====================================================================== + +;; (require-extension (srfi 18) extras tcp s11n) +;; +;; +;; (use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras) +;; +;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing) +;; +;; Configurations for server +(tcp-buffer-size 2048) +(max-connections 2048) + +(defstruct servdat + (host #f) + (port #f) + (uuid #f) + (dbfile #f) + (api-url #f) + (api-uri #f) + (api-req #f)) + +(define (servdat->url sdat) + (conc (servdat-host sdat)":"(servdat-port sdat))) + +(define (http-transport:make-server-url hostport) + (if (not hostport) + #f + (conc "http://" (car hostport) ":" (cadr hostport)))) + +;;====================================================================== +;; S E R V E R +;; ====================================================================== + +;; NOTE: http-transport:launch is the entry point +;; -> http-transport:run +;; -> http-transport:try-start-server -> http-transport:try-start-server (until success) + +(define (http-get-function fnkey) + (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) + +(define (http-handle-api dbstruct $) + (if (api-proc) + ((api-proc) dbstruct $) ;; ($) => alist + 'no-api-proc-set)) + +(define (http-transport:run hostn) + ;; Configurations for server + (tcp-buffer-size 2048) + (max-connections 2048) + (debug:print 2 *default-log-port* "Attempting to start the server ...") + (let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily + (hostname (get-host-name)) + (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + (server:get-best-guess-address hostname) + #f))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (portlogger:open-run-close portlogger:find-port)) + (link-tree-path (common:get-linktree)) + (tmp-area (common:get-db-tmp-area)) + #;(start-file (conc tmp-area "/.server-start"))) + (debug:print-info 0 *default-log-port* "portlogger recommended port: " start-port) + ;; set some parameters for the server + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + (handle-directory spiffy-directory-listing) + #;(handle-exception (lambda (exn chain) + (signal (make-composite-condition + (make-property-condition + 'server + 'message "server error"))))) + + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + ;; This is were we set up the database connections + (let* (($ (request-vars source: 'both)) + ;; (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "api")) + (debug:print 0 *default-log-port* "In api request $=" $) + (send-response ;; the $ is the request vars proc + body: (http-handle-api *dbstruct-db* $) + headers: '((content-type text/plain))) + (set! *db-last-access* (current-seconds))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "ping")) + (send-response body: (conc *toppath*"/"(args:get-arg "-db")) + headers: '((content-type text/plain)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "loop-test")) + (send-response body: (alist-ref 'data ($)) + headers: '((content-type text/plain)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "")) + (send-response body: ((http-get-function 'http-transport:main-page)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "json_api")) + (send-response body: ((http-get-function 'http-transport:main-page)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "runs")) + (send-response body: ((http-get-function 'http-transport:main-page)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ any)) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "jquery3.1.0.js")) + (send-response body: ((http-get-function 'http-transport:show-jquery)) + headers: '((content-type application/javascript)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "test_log")) + (send-response body: ((http-get-function 'http-transport:html-test-log) $) + headers: '((content-type text/HTML)))) + ((equal? (uri-path (request-uri (current-request))) + '(/ "dashboard")) + (send-response body: ((http-get-function 'http-transport:html-dboard) $) + headers: '((content-type text/HTML)))) + (else (continue)))))))) + #;(handle-exceptions + exn + (debug:print 0 *default-log-port* "Failed to create file " start-file ", exn=" exn) + (with-output-to-file start-file (lambda ()(print (current-process-id))))) + (http-transport:try-start-server ipaddrstr start-port))) + +;; This is recursively run by http-transport:run until sucessful +;; +(define (http-transport:try-start-server ipaddrstr portnum) + (let ((config-hostname (configf:lookup *configdat* "server" "hostname")) + (config-use-proxy (equal? (configf:lookup *configdat* "client" "use-http_proxy") "yes"))) + (if (not config-use-proxy) + (determine-proxy (constantly #f))) + (debug:print-info 0 *default-log-port* "http-transport:try-start-server time=" (seconds->time-string (current-seconds)) " ipaddrsstr=" ipaddrstr " portnum=" portnum " config-hostname=" config-hostname) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 64000) + (begin + (debug:print 0 *default-log-port* "WARNING: attempt to start server failed. Trying again ...") + (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) + (debug:print 5 *default-log-port* "exn=" (condition->list exn)) + (portlogger:open-run-close portlogger:set-failed portnum) + (debug:print 0 *default-log-port* "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + + ;; get_next_port goes here + (http-transport:try-start-server ipaddrstr + (portlogger:open-run-close portlogger:find-port))) + (begin + (print "ERROR: Tried and tried but could not start the server")))) + ;; any error in following steps will result in a retry + (set! *server-info* (make-servdat host: ipaddrstr port: portnum)) + (debug:print 0 *default-log-port* "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + ;; NEED WAY TO SET IP TO #f TO BIND ALL + ;; (start-server bind-address: ipaddrstr port: portnum) + (if config-hostname ;; this is a hint to bind directly + (start-server port: portnum bind-address: (if (equal? config-hostname "-") + ipaddrstr + config-hostname)) + (start-server port: portnum)) + (portlogger:open-run-close portlogger:set-port portnum "released") + (debug:print 1 *default-log-port* "INFO: server has been stopped")))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;;====================================================================== +;; C L I E N T S +;;====================================================================== + + +(define (http-transport:get-time-to-cleanup) + (let ((res #f)) + (mutex-lock! *http-mutex*) + (set! res (> (current-seconds) *http-connections-next-cleanup*)) + (mutex-unlock! *http-mutex*) + res)) + +(define (http-transport:inc-requests-count) + (mutex-lock! *http-mutex*) + (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)) + ;; Use this opportunity to slow things down iff there are too many requests in flight + (if (> *http-requests-in-progress* 5) + (begin + (debug:print-info 0 *default-log-port* "Whoa there buddy, ease up...") + (thread-sleep! 1))) + (mutex-unlock! *http-mutex*)) + +(define (http-transport:dec-requests-count proc) + (mutex-lock! *http-mutex*) + (proc) + (set! *http-requests-in-progress* (- *http-requests-in-progress* 1)) + (mutex-unlock! *http-mutex*)) + +(define (http-transport:dec-requests-count-and-close-all-connections) + (set! *http-requests-in-progress* (- *http-requests-in-progress* 1)) + (let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds + (if (> *http-requests-in-progress* 0) + (if (> etime (current-seconds)) + (begin + (thread-sleep! 0.052) + (loop etime)) + (debug:print-error 0 *default-log-port* "requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections")) + (close-idle-connections!))) + (set! *http-connections-next-cleanup* (+ (current-seconds) 10)) + (mutex-unlock! *http-mutex*)) + +(define (http-transport:inc-requests-and-prep-to-close-all-connections) + (mutex-lock! *http-mutex*) + (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*))) + +;; serverdat contains uuid to be used for connection validation +;; +;; NOTE: serverdat must be initialized or created by servdat-init +;; +;; DO NOT USE. Moved to rmt:set-receive-real +;; +;; (define (http-transport:send-receive conn qry-key cmd params #!key (numretries 3)) +;; (let* ((res #f) +;; (success #t) +;; (sparams (with-output-to-string +;; (lambda ()(write params))))) +;; ;; send the data and get the response extract the needed info from +;; ;; the http data and process and return it. +;; (let* ((send-recieve (lambda () +;; (set! res +;; (with-input-from-request +;; (rmt:conn->uri conn "api") +;; (list (cons 'key qry-key) +;; ;; (cons 'srvid (servdat-uuid sdat)) +;; (cons 'cmd cmd) +;; (cons 'params sparams)) +;; read-string)))) +;; (time-out (lambda () +;; (thread-sleep! 45) +;; (debug:print 0 *default-log-port* "WARNING: send-receive took more than 45 seconds!!") +;; #f)) +;; (th1 (make-thread send-recieve "with-input-from-request")) +;; (th2 (make-thread time-out "time out"))) +;; (thread-start! th1) +;; (thread-start! th2) +;; (thread-join! th1) +;; (close-idle-connections!) +;; (thread-terminate! th2) +;; (if (string? res) +;; (with-input-from-string res +;; (lambda () read)) +;; res)))) + +;; careful closing of connections stored in *runremote* +;; +(define (http-transport:close-connections #!key (area-dat #f)) + (debug:print-info 0 *default-log-port* "http-transport:close-connections doesn't do anything now!")) +;; (let* ((runremote (or area-dat *runremote*)) +;; (server-dat (if runremote +;; (remote-conndat runremote) +;; #f))) ;; (hash-table-ref/default *runremote* run-id #f))) +;; (if (vector? server-dat) +;; (let ((api-dat (http-transport:server-dat-get-api-uri server-dat))) +;; (handle-exceptions +;; exn +;; (begin +;; (print-call-chain *default-log-port*) +;; (debug:print-error 0 *default-log-port* " closing connection failed with error: " ((condition-property-accessor 'exn 'message) exn) ", exn=" exn)) +;; (close-connection! api-dat) +;; ;;(close-idle-connections!) +;; #t)) +;; #f))) + + +(define (make-http-transport:server-dat)(make-vector 6)) +(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0)) +(define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) +(define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) +(define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) +(define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) +(define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) +;(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) +(define (http-transport:server-dat-get-server-id vec) (vector-ref vec 6)) + +(define (http-transport:server-dat-make-url vec) + (if (and (http-transport:server-dat-get-iface vec) + (http-transport:server-dat-get-port vec)) + (conc "http://" + (http-transport:server-dat-get-iface vec) + ":" + (http-transport:server-dat-get-port vec)) + #f)) + +(define (http-transport:server-dat-update-last-access vec) + (if (vector? vec) + (vector-set! vec 5 (current-seconds)) + (begin + (print-call-chain (current-error-port)) + (debug:print-error 0 *default-log-port* "call to http-transport:server-dat-update-last-access with non-vector!!")))) + +;; initialize servdat for client side, setup needed parameters +;; pass in #f as sdat-in to create sdat +;; +(define (servdat-init sdat-in iface port uuid) + (let* ((sdat (or sdat-in (make-servdat)))) + (if uuid (servdat-uuid-set! sdat uuid)) + (servdat-host-set! sdat iface) + (servdat-port-set! sdat port) + (servdat-api-url-set! sdat (conc "http://" iface ":" port "/api")) + (servdat-api-uri-set! sdat (uri-reference (servdat-api-url sdat))) + (servdat-api-req-set! sdat (make-request method: 'POST + uri: (servdat-api-uri sdat))) + ;; set up the http-client parameters + (max-retry-attempts 1) + ;; consider all requests indempotent + (retry-request? (lambda (request) + #f)) + (determine-proxy (constantly #f)) + sdat)) + +;;====================================================================== +;; NEW SERVER METHOD +;;====================================================================== + +;; only use for main.db - need to re-write some of this :( +;; +(define (get-lock-db sdat dbfile) + (let* ((dbh (db:open-run-db dbfile db:initialize-db)) + (res (db:get-iam-server-lock dbh dbfile))) + (sqlite3:finalize! dbh) + res)) + + +(define *srvpktspec* + `((server (host . h) + (port . p) + (servkey . k) + (pid . i) + (ipaddr . a) + (dbpath . d)))) + +(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath) + (let* ((pkt-dat `((host . ,host) + (port . ,port) + (servkey . ,servkey) + (pid . ,(current-process-id)) + (ipaddr . ,ipaddr) + (dbpath . ,dbpath))) + (uuid (write-alist->pkt + pkts-dir + pkt-dat + pktspec: pkt-spec + ptype: 'server))) + (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid) + uuid)) + +(define (get-pkts-dir #!optional (apath #f)) + (let* ((effective-toppath (or *toppath* apath))) + (assert effective-toppath + "ERROR: get-pkts-dir called without *toppath* set. Exiting.") + (let* ((pdir (conc effective-toppath "/.meta/srvpkts"))) + (if (file-exists? pdir) + pdir + (begin + (create-directory pdir #t) + pdir))))) + +;; given a pkts dir read +;; +(define (get-all-server-pkts pktsdir-in pktspec) + (let* ((pktsdir (if (file-exists? pktsdir-in) + pktsdir-in + (begin + (create-directory pktsdir-in #t) + pktsdir-in))) + (all-pkt-files (glob (conc pktsdir "/*.pkt")))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: pktspec)) + all-pkt-files))) + +(define (server-address srv-pkt) + (conc (alist-ref 'host srv-pkt) ":" + (alist-ref 'port srv-pkt))) + +(define (server-ready? host port key) ;; server-address is host:port + ;; ping the server and ask it + ;; if it ready + ;; (let* ((sdat (servdat-init #f host port #f))) + ;; (http-transport:send-receive sdat "abc" 'ping '()))) + (let* ((res (with-input-from-request + (conc "http://"host":"port"/ping") ;; returns *toppath*/dbname + #f + read-string))) + (if (equal? res key) + #t + (begin + (debug:print-info 0 *default-log-port* "server-ready? key="key", received="res) + #f)))) + +(define (loop-test host port data) ;; server-address is host:port + ;; ping the server and ask it + ;; if it ready + ;; (let* ((sdat (servdat-init #f host port #f))) + ;; (http-transport:send-receive sdat "abc" 'ping '()))) + (let* ((payload (sexpr->string data)) + (res (with-input-from-request + (conc "http://"host":"port"/loop-test") + `((data . ,payload)) + read-string))) + (string->sexpr res))) + +; from the pkts return servers associated with dbpath +;; NOTE: Only one can be alive - have to check on each +;; in the list of pkts returned +;; +(define (get-viable-servers serv-pkts dbpath) + (let loop ((tail serv-pkts) + (res '())) + (if (null? tail) + res ;; NOTE: sort by age so oldest is considered first + (let* ((spkt (car tail))) + (loop (cdr tail) + (if (equal? dbpath (alist-ref 'dbpath spkt)) + (cons spkt res) + res)))))) + +;; from viable servers get one that is alive and ready +;; +(define (get-the-server serv-pkts) + (let loop ((tail serv-pkts)) + (if (null? tail) + #f + (let* ((spkt (car tail)) + (host (alist-ref 'ipaddr spkt)) + (port (alist-ref 'port spkt)) + (dbpth (alist-ref 'dbpath spkt)) + (addr (server-address spkt))) + (if (server-ready? host port dbpth) + spkt + (loop (cdr tail))))))) + +;; am I the "first" in line server? I.e. my D card is smallest +;; use Z card as tie breaker +;; +(define (get-best-candidate serv-pkts dbpath) + (if (null? serv-pkts) + #f + (let loop ((tail serv-pkts) + (best (car serv-pkts))) + (if (null? tail) + best + (let* ((candidate (car tail)) + (candidate-bd (string->number (alist-ref 'D candidate))) + (best-bd (string->number (alist-ref 'D best))) + ;; bigger number is younger + (candidate-z (alist-ref 'Z candidate)) + (best-z (alist-ref 'Z best)) + (new-best (cond + ((> best-bd candidate-bd) ;; best is younger than candidate + candidate) + ((< best-bd candidate-bd) ;; candidate is younger than best + best) + (else + (if (string>=? best-z candidate-z) + best + candidate))))) ;; use Z card as tie breaker + (if (null? tail) + new-best + (loop (cdr tail) new-best))))))) + + +;;====================================================================== +;; END NEW SERVER METHOD +;;====================================================================== + +(define (http-transport:wait-for-server pkts-dir db-file server-key) + (let* ((sdat *server-info*)) + (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) + (begin ;; let ((sdat #f)) + (thread-sleep! 0.01) + (debug:print-info 0 *default-log-port* "Waiting for server alive signature") + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + (if (and sdat + (not changed) + (> (- (current-seconds) start-time) 2)) + (begin + (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") + ;; create a server pkt in *toppath*/.meta/srvpkts + + ;; TODO: + ;; 1. change sdat to stuct + ;; 2. add uuid to struct + ;; 3. update uuid in sdat here + ;; + (servdat-uuid-set! sdat + (register-server + pkts-dir *srvpktspec* + (get-host-name) + (servdat-port sdat) server-key + (servdat-host sdat) db-file)) + + ;; now read pkts and see if we are a contender + (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) + (viables (get-viable-servers all-pkts db-file)) + (best-srv (get-best-candidate viables db-file)) + (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))) + (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key) + ;; am I the best-srv, compare server-keys to know + (if (equal? best-srv-key server-key) + (if (get-lock-db sdat db-file) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) + (begin + (debug:print 0 *default-log-port* "I'm the server!") + (servdat-dbfile-set! sdat db-file)) + (begin + (debug:print 0 *default-log-port* "I'm not the server, exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (thread-sleep! 0.2) + (exit))) + (begin + (debug:print 0 *default-log-port* + "Keys do not match "best-srv-key", "server-key", exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (thread-sleep! 0.2) + (exit))) + sdat)) + (begin ;; sdat not yet contains server info + (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) + (sleep 4) + (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes + (begin + (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") + (exit)) + (loop start-time + (equal? sdat last-sdat) + sdat)))))))) + +;; run http-transport:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (http-transport:keep-running dbname) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") + (let* ((run-id (let ((rid (args:get-arg "-run-id"))) ;; consider getting rid of the -run-id mechanism + (if rid ;; replace with -db + (string->number rid) + #f))) + (db-file (if dbname + (db:dbname->path *toppath* dbname) + (db:run-id->path *toppath* run-id))) + (sdat #f) + ;; (tmp-area (common:get-db-tmp-area)) + (server-start-time (current-seconds)) + (pkts-dir (get-pkts-dir)) + (server-key (server:mk-signature)) + (server-info (http-transport:wait-for-server pkts-dir db-file server-key )) + (iface (servdat-host server-info)) + (port (servdat-port server-info)) + (last-access 0) + (server-timeout (server:expiration-timeout)) + (server-log-file (args:get-arg "-log"))) ;; always set when we are a server + + (let loop ((count 0) + (server-state 'available) + (bad-sync-count 0) + (start-time (current-milliseconds))) + ;; Use this opportunity to sync the tmp db to megatest.db NOTE: This conflicts with the watchdog syncing? + (if (not *dbstruct-db* ) + (let ((watchdog (bdat-watchdog *bdat*))) + (debug:print 0 *default-log-port* "SERVER: dbprep") + + (db:setup dbname) ;; sets *dbstruct-db* as side effect + ;; NOW REGISTER THE SERVER in main.db + + + + + + + + + + + + + + + + + + + + + (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. + (if watchdog + (if (not (member (thread-state watchdog) '(ready running blocked sleeping dead))) + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") + (thread-start! watchdog))) + (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")))) + + ;; when things go wrong we don't want to be doing the various queries too often + ;; so we strive to run this stuff only every four seconds or so. + (let* ((sync-time (- (current-milliseconds) start-time)) + (rem-time (quotient (- 4000 sync-time) 1000))) + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time))) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) 'running bad-sync-count (current-milliseconds))) + + ;; Check that iface and port have not changed (can happen if server port collides) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *server-info*) + (mutex-unlock! *heartbeat-mutex*) + + (if (or (not (equal? (servdat-host sdat) iface)) + (not (equal? (servdat-port sdat) port))) + (let ((new-iface (servdat-host sdat)) + (new-port (servdat-port sdat))) + (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") + (set! iface new-iface) + (set! port new-port) + (if (not *server-id*) + (set! *server-id* (server:mk-signature))) + ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) + (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) + (flush-output *default-log-port*))) + + ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + (mutex-lock! *heartbeat-mutex*) + (set! last-access *db-last-access*) + (mutex-unlock! *heartbeat-mutex*) + + (if (common:low-noise-print 120 (conc "server running on " iface ":" port)) + (begin + (if (not *server-id*) + (set! *server-id* (server:mk-signature))) + ;; (debug:print 0 *default-log-port* (current-seconds) (current-directory) (current-process-id) (argv)) + (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds) " server-id: " *server-id*) + (flush-output *default-log-port*))) + (if (common:low-noise-print 60 "dbstats") + (begin + (debug:print 0 *default-log-port* "Server stats:") + (db:print-current-query-stats))) + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) + (cond + ((and *server-run* + (> (+ last-access server-timeout) + (current-seconds))) + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (let ((curr-time (current-seconds))) + (handle-exceptions + exn + (debug:print 0 *default-log-port* "ERROR: Failed to change timestamp on log file " server-log-file ". Are you out of space on that disk? exn=" exn) + (if (and server-log-file (not *server-overloaded*)) + (set-file-times! server-log-file curr-time curr-time))))) + (loop 0 server-state bad-sync-count (current-milliseconds))) + (else + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (http-transport:server-shutdown port))))))) + +(define (http-transport:server-shutdown port) + (begin + ;;(BB> "http-transport:server-shutdown called") + (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id)) + ;; + ;; start_shutdown + ;; + + ;; deregister the server + + + (bdat-time-to-exit-set! *bdat* #t) ;; tell on-exit to be fast as we've already cleaned up + (portlogger:open-run-close portlogger:set-port port "released") + (thread-sleep! 1) + + ;; (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) + ;; (debug:print-info 0 *default-log-port* "Number of cached writes " *number-of-writes*) + ;; (debug:print-info 0 *default-log-port* "Average cached write time " + ;; (if (eq? *number-of-writes* 0) + ;; "n/a (no writes)" + ;; (/ *writes-total-delay* + ;; *number-of-writes*)) + ;; " ms") + ;; (debug:print-info 0 *default-log-port* "Number non-cached queries " *number-non-write-queries*) + ;; (debug:print-info 0 *default-log-port* "Average non-cached time " + ;; (if (eq? *number-non-write-queries* 0) + ;; "n/a (no queries)" + ;; (/ *total-non-write-delay* + ;; *number-non-write-queries*)) + ;; " ms") + + (db:print-current-query-stats) + (common:save-pkt `((action . exit) + (T . server) + (pid . ,(current-process-id))) + *configdat* #t) + (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") + (exit))) + +;; Call this to start the actual server +;; + +;; all routes though here end in exit ... +;; +;; start_server? +;; +(define (http-transport:launch dbname) + (let* (;; (tmp-area (common:get-db-tmp-area)) + ;; (server-start (conc tmp-area "/.server-start")) + ;; (server-started (conc tmp-area "/.server-started")) + ;; (start-time (common:lazy-modification-time server-start)) + ;; (started-time (common:lazy-modification-time server-started)) + ;; (server-starting (< start-time started-time)) ;; if start-time is less than started-time then a server is still starting + ;; (start-time-old (> (- (current-seconds) start-time) 5)) + (cleanup-proc (lambda (msg) + (let* ((serv-fname (conc "server-" (current-process-id) "-" (get-host-name) ".log")) + (full-serv-fname (conc *toppath* "/logs/" serv-fname)) + (new-serv-fname (conc *toppath* "/logs/" "defunct-" serv-fname))) + (debug:print 0 *default-log-port* msg) + (if (common:file-exists? full-serv-fname) + (system (conc "sleep 1;mv -f " full-serv-fname " " new-serv-fname)) + (debug:print 0 *default-log-port* "INFO: cannot move " full-serv-fname " to " new-serv-fname)) + (exit))))) + #;(common:save-pkt `((action . start) + (T . server) + (pid . ,(current-process-id))) + *configdat* #t) + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server run thread started") + (http-transport:run + (if (args:get-arg "-server") + (args:get-arg "-server") + "-") + )) "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server monitor thread started") + (http-transport:keep-running dbname) + "Keep running")))) + (thread-start! th2) + (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. + (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (exit)))) + +;; Generate a unique signature for this server +(define (server:mk-signature) + (message-digest-string (md5-primitive) + (with-output-to-string + (lambda () + (write (list (current-directory) + (current-process-id) + (argv))))))) + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) + +;; run ping in separate process, safest way in some cases +;; +(define (server:ping-server ifaceport) + (with-input-from-pipe + (conc (common:get-megatest-exe) " -ping " ifaceport) + (lambda () + (let loop ((inl (read-line)) + (res "NOREPLY")) + (if (eof-object? inl) + (case (string->symbol res) + ((NOREPLY) #f) + ((LOGIN_OK) #t) + (else #f)) + (loop (read-line) inl)))))) + + +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; + +;; all routes though here end in exit ... +;; +;; start_server +;; +(define (server:launch dbname) + (http-transport:launch dbname)) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; +#;(define (server:login toppath) + (lambda (toppath) + (set! *db-last-access* (current-seconds)) ;; might not be needed. + (if (equal? *toppath* toppath) + #t + #f))) + +;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; (define (server:release-sync-lock) +;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; (define (server:have-sync-lock?) +;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; (have-lock? (car have-lock-pair)) +;; (lock-time (cdr have-lock-pair)) +;; (lock-age (- (current-seconds) lock-time))) +;; (cond +;; (have-lock? #t) +;; ((>lock-age +;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; (server:release-sync-lock) +;; (server:have-sync-lock?)) +;; (else #f)))) + + ) Index: servermod.scm ================================================================== --- servermod.scm +++ servermod.scm @@ -57,11 +57,11 @@ http-transportmod pkts ) -(define (server:make-server-url hostport) +#;(define (server:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) ;; reuse this for server load? @@ -77,11 +77,11 @@ ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; incidental: rotate logs in logs/ dir. ;; -(define (server:run areapath) ;; areapath is *toppath* for a given testsuite area +#;(define (server:run areapath) ;; areapath is *toppath* for a given testsuite area (let* ((curr-host (get-host-name)) (curr-ip (server:get-best-guess-address curr-host)) (curr-pid (current-process-id)) (testsuite (common:get-area-name)) (logfile (conc areapath "/logs/server.log")) @@ -105,11 +105,11 @@ (setenv "NBFAKE_LOG" logfile) (system (conc "nbfake " cmdln)) (unsetenv "NBFAKE_LOG") (pop-directory))) -(define (server:record->url servr) +#;(define (server:record->url servr) (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "Unable to get server url from " servr ", exn=" exn) #f) @@ -118,11 +118,11 @@ (if (and host port) (conc host ":" port) #f)))) -(define (server:kill servr) +#;(define (server:kill servr) (handle-exceptions exn (begin (debug:print-info 0 *default-log-port* "Unable to get host and/or port from " servr ", exn=" exn) #f)