Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -3284,11 +3284,26 @@ (create-directory pktsdir #t)) (with-output-to-file (conc pktsdir "/" uuid ".pkt") (lambda () (print pkt))))))))) - + +;;====================================================================== +;; write pktalist out as <uuid>.pkt in pktsdir (pktsdir is created if missing); a failure is only logged +(define (common:minimal-save-pkt pktalist pktsdir) + (let-values (((uuid pkt) + (alist->pkt pktalist common:pkts-spec))) + (handle-exceptions + exn + (debug:print-info 0 *default-log-port* "failed to write out packet to " pktsdir ", exn=" exn) ;; don't care if this failed for now but MUST FIX - BUG!! + (if (not (file-exists? pktsdir)) + (create-directory pktsdir #t)) + (with-output-to-file + (conc pktsdir "/" uuid ".pkt") + (lambda () + (print pkt)))))) + ;; common:with-queue-db was here ;; common:load-pkts-to-db was here (define (common:get-pkt-alists pkts) (map (lambda (x) Index: http-transportmod.scm ================================================================== --- http-transportmod.scm +++ http-transportmod.scm @@ -25,10 +25,11 @@ (declare (uses mtargs)) (declare (uses mtver)) (declare (uses dbmod)) (declare (uses stml2)) (declare (uses portloggermod)) +(declare (uses pkts)) (module http-transportmod * (import scheme @@ -80,10 +81,11 @@ debugprint mtver dbmod stml2 portloggermod + pkts ) ;; (require-extension (srfi 18) extras tcp s11n) ;; @@ -94,37 +96,22 @@ ;; ;; Configurations for server (tcp-buffer-size 2048) (max-connections 2048) -;; (declare (unit http-transport)) -;; -;; (declare (uses common)) -;; (declare (uses db)) -;; (declare (uses tests)) -;; (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. 
-;; (declare (uses server)) -;; ;; (declare (uses daemon)) -;; (declare (uses portlogger)) -;; (declare (uses rmt)) -;; -;; (include "common_records.scm") -;; (include "db_records.scm") -;; (include "js-path.scm") - -;; (require-library stml) (define (http-transport:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport)))) ;;====================================================================== ;; S E R V E R ;; ====================================================================== -;; Call this to start the actual server -;; +;; NOTE: http-transport:launch is the entry point +;; -> http-transport:run +;; -> http-transport:try-start-server -> http-transport:try-start-server (until success) (define (http-get-function fnkey) (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) (define (http-transport:run hostn) @@ -201,11 +188,11 @@ ((equal? (uri-path (request-uri (current-request))) '(/ "dashboard")) (send-response body: ((http-get-function 'http-transport:html-dboard) $) headers: '((content-type text/HTML)))) (else (continue)))))))) - (handle-exceptions + #;(handle-exceptions exn (debug:print 0 *default-log-port* "Failed to create file " start-file ", exn=" exn) (with-output-to-file start-file (lambda ()(print (current-process-id))))) (http-transport:try-start-server ipaddrstr start-port))) @@ -451,10 +438,139 @@ (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) (server-dat (vector iface port api-uri api-url api-req (current-seconds) server-id))) server-dat)) + +;;====================================================================== +;; NEW SERVER METHOD +;;====================================================================== + +(define *srvpktspec* + `((server (host . h) + (port . p) + (servkey . k) + (pid . i) + (ipaddr . a) + (dbpath . 
d)))) + +(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath) + (let* ((pkt-dat `((host . ,host) + (port . ,port) + (servkey . ,servkey) + (pid . ,(current-process-id)) + (ipaddr . ,ipaddr) + (dbpath . ,dbpath)))) + (write-alist->pkt + pkts-dir + pkt-dat + pktspec: pkt-spec + ptype: 'server))) + +;; stub: always returns #t, registration in the db is faked for now +;; +(define (register-server-in-db db-file) + #t) + +;; open db-file with db:open-db and return the result (NOTE(review): intended to load the db into inmem - confirm) +;; +(define (load-up-database db-file) + (let* ((db (db:open-db db-file))) + db)) + +(define (get-pkts-dir) + (assert *toppath* "ERROR: get-pkts-dir called without *toppath* set. Exiting.") + (let* ((pdir (conc *toppath* "/.meta/srvpkts"))) + (if (file-exists? pdir) + pdir + (begin + (create-directory pdir #t) + pdir)))) + +;; given a pkts dir (created if missing) read every *.pkt file in it into an alist, return the list of alists +;; +(define (get-all-server-pkts pktsdir-in pktspec) + (let* ((pktsdir (if (file-exists? pktsdir-in) + pktsdir-in + (begin + (create-directory pktsdir-in #t) + pktsdir-in))) + (all-pkt-files (glob (conc pktsdir "/*.pkt")))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: pktspec)) + all-pkt-files))) + +(define (server-address srv-pkt) + (conc (alist-ref 'host srv-pkt) ":" + (alist-ref 'port srv-pkt))) + +(define (server-ready? server-address) + ;; TODO: ping the server and ask it + ;; if it is ready; for now this stub always answers #f + #f) + +;; from the pkts return servers associated with dbpath +;; NOTE: Only one can be alive - have to check on each +;; in the list of pkts returned +;; +(define (get-viable-servers serv-pkts dbpath) + (let loop ((tail serv-pkts) + (res '())) + (if (null? tail) + res ;; TODO: sort by age so oldest is considered first (not done yet, res is in reverse input order) + (let* ((spkt (car tail))) + (loop (cdr tail) + (if (equal? dbpath (alist-ref 'dbpath spkt)) + (cons spkt res) + res)))))) + +;; from viable servers get the first one that is alive and ready, #f if none (dbpath is unused) +;; +(define (get-the-server serv-pkts dbpath) + (let loop ((tail serv-pkts)) + (if (null? tail) + #f + (let* ((spkt (car tail)) + (addr (server-address spkt))) + (if (server-ready? 
addr) + spkt + (loop (cdr tail))))))) + +;; pick the "first" in line server pkt, i.e. the one whose D card is smallest (#f if there are no pkts) +;; use Z card as tie breaker (the larger Z wins); dbpath is unused +;; +(define (get-best-candidate serv-pkts dbpath) + (if (null? serv-pkts) + #f + (let loop ((tail serv-pkts) + (best (car serv-pkts))) + (if (null? tail) + best + (let* ((candidate (car tail)) + (candidate-bd (string->number (alist-ref 'D candidate))) + (best-bd (string->number (alist-ref 'D best))) + ;; bigger number is younger + (candidate-z (alist-ref 'Z candidate)) + (best-z (alist-ref 'Z best)) + (new-best (cond + ((> best-bd candidate-bd) ;; best is younger than candidate + candidate) + ((< best-bd candidate-bd) ;; candidate is younger than best + best) + (else + (if (string>=? best-z candidate-z) + best + candidate))))) ;; use Z card as tie breaker + (if (null? tail) ;; never true here, tail was already checked non-null above + new-best + (loop (cdr tail) new-best))))))) + + + +;;====================================================================== +;; END NEW SERVER METHOD +;;====================================================================== ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (http-transport:keep-running) @@ -464,10 +580,13 @@ (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") (let* ((sdat #f) (tmp-area (common:get-db-tmp-area)) (started-file (conc tmp-area "/.server-started")) (server-start-time (current-seconds)) + (pkts-dir (get-pkts-dir)) + (server-key (server:mk-signature)) + (db-file (conc *toppath* "/.db/" (or (args:get-arg "-db") "main.db"))) (server-info (let loop ((start-time (current-seconds)) (changed #t) (last-sdat "not this")) (begin ;; let ((sdat #f)) (thread-sleep! 0.01) @@ -477,25 +596,32 @@ (mutex-unlock! *heartbeat-mutex*) (if (and sdat (not changed) (> (- (current-seconds) start-time) 2)) (begin - (debug:print-info 0 *default-log-port* "Received server alive signature") - (common:save-pkt `((action . 
alive) - (T . server) - (pid . ,(current-process-id)) - (ipaddr . ,(car sdat)) - (port . ,(cadr sdat))) - *configdat* #t) - sdat) + (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") + ;; create a server pkt in *toppath*/.meta/srvpkts + (register-server pkts-dir *srvpktspec* (get-host-name) (cadr sdat) server-key (car sdat) db-file) + + ;; now read pkts and see if we are a contender + (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) + (viables (get-viable-servers all-pkts db-file)) + (best-srv (get-best-candidate viables db-file)) + (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))) + ;; am I the best-srv, compare server-keys to know + (if (and (equal? best-srv-key server-key) + (register-server-in-db db-file)) + (load-up-database db-file) ;; ready to go! + (bdat-time-to-exit-set! *bdat* #t)) ;; nope, we are not needed, exit when can do + sdat)) (begin (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) (sleep 4) (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes (begin (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") - (common:save-pkt `((action . died) + #;(common:save-pkt `((action . died) (T . server) (pid . ,(current-process-id)) (ipaddr . ,(car sdat)) (port . ,(cadr sdat)) (msg . "Transport died?")) @@ -526,12 +652,14 @@ (debug:print 0 *default-log-port* "SERVER: dbprep") (set! *dbstruct-db* (db:setup #t)) ;; run-id)) (set! server-going #t) (debug:print 0 *default-log-port* "SERVER: running, megatest version: " (common:get-full-version)) ;; NOTE: the server is NOT yet marked as running in the log. We do that in the keep-running routine. (if watchdog - (if (not (member (thread-state watchdog) '(ready running blocked sleeping))) - (thread-start! 
watchdog)) + (if (not (member (thread-state watchdog) '(ready running blocked suspended sleeping terminated dead))) ;; only a never-started thread may be started + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state " (thread-state watchdog) ")") + (thread-start! watchdog))) (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")))) ;; when things go wrong we don't want to be doing the various queries too often ;; so we strive to run this stuff only every four seconds or so. (let* ((sync-time (- (current-milliseconds) start-time)) @@ -585,11 +713,11 @@ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) (let ((curr-time (current-seconds))) (handle-exceptions exn (debug:print 0 *default-log-port* "ERROR: Failed to change timestamp on log file " server-log-file ". Are you out of space on that disk? exn=" exn) - (if (not *server-overloaded*) + (if (and server-log-file (not *server-overloaded*)) (set-file-times! server-log-file curr-time curr-time))))) (loop 0 server-state bad-sync-count (current-milliseconds))) (else (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) (http-transport:server-shutdown port))))))) @@ -626,17 +754,19 @@ (T . server) (pid . ,(current-process-id))) *configdat* #t) (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") (exit))) + +;; Call this to start the actual server +;; ;; all routes though here end in exit ... ;; ;; start_server? ;; (define (http-transport:launch) - ;; check that a server start is in progress, pause or exit if so (let* ((tmp-area (common:get-db-tmp-area)) (server-start (conc tmp-area "/.server-start")) (server-started (conc tmp-area "/.server-started")) (start-time (common:lazy-modification-time server-start)) (started-time (common:lazy-modification-time server-started)) @@ -649,25 +779,14 @@ (debug:print 0 *default-log-port* msg) (if (common:file-exists? 
full-serv-fname) (system (conc "sleep 1;mv -f " full-serv-fname " " new-serv-fname)) (debug:print 0 *default-log-port* "INFO: cannot move " full-serv-fname " to " new-serv-fname)) (exit))))) - #;(if (and (not start-time-old) ;; last server start try was less than five seconds ago - (not server-starting)) - (begin - (cleanup-proc "NOT starting server, there is either a recently started server or a server in process of starting") - (exit))) - ;; lets not even bother to start if there are already three or more server files ready to go - #;(let* ((num-alive (server:get-num-alive (server:get-list *toppath*)))) - (if (> num-alive 3) - (begin - (cleanup-proc (conc "ERROR: Aborting server start because there are already " num-alive " possible servers either running or starting up")) - (exit)))) - (common:save-pkt `((action . start) - (T . server) - (pid . ,(current-process-id))) - *configdat* #t) + (common:save-pkt `((action . start) + (T . server) + (pid . ,(current-process-id))) + *configdat* #t) (let* ((th2 (make-thread (lambda () (debug:print-info 0 *default-log-port* "Server run thread started") (http-transport:run (if (args:get-arg "-server") (args:get-arg "-server") @@ -700,22 +819,10 @@ ;; "exit on ^C timer"))) ;; (thread-start! th2) ;; (thread-start! th1) ;; (thread-join! th2)))) - - -;; Get the transport -(define (server:get-transport) - (if *transport-type* - *transport-type* - (let ((ttype (string->symbol - (or (args:get-arg "-transport") - (configf:lookup *configdat* "server" "transport") - "rpc")))) - (set! 
*transport-type* ttype) - ttype))) ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string Index: servermod.scm ================================================================== --- servermod.scm +++ servermod.scm @@ -57,97 +57,10 @@ http-transportmod pkts ) -;;====================================================================== -;; NEW SERVER METHOD -;;====================================================================== - -(define *srvpktspec* - `((server (host . h) - (port . p) - (servkey . k) - (pid . i) - (ipaddr . a) - (dbpath . d)))) - -(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath) - (let* ((pkt-dat `((host . ,host) - (port . ,port) - (servkey . ,servkey) - (pid . ,(current-process-id)) - (ipaddr . ,ipaddr) - (dbpath . ,dbpath)))) - (write-alist->pkt - pkts-dir - pkt-dat - pktspec: pkt-spec - ptype: 'server))) - -(define (get-pkts-dir) - (assert *toppath* "ERROR: get-pkts-dir called without *toppath* set. Exiting.") - (let* ((pdir (conc *toppath* "/.pkts"))) - (if (file-exists? pdir) - pdir - (begin - (create-directory pdir #t) - pdir)))) - -;; given a pkts dir read -;; -(define (get-all-server-pkts pktsdir-in pktspec) - (let* ((pktsdir (if (file-exists? pktsdir-in) - pktsdir-in - (begin - (create-directory pktsdir-in #t) - pktsdir-in))) - (all-pkt-files (glob (conc pktsdir "/*.pkt")))) - (map (lambda (pkt-file) - (read-pkt->alist pkt-file pktspec: pktspec)) - all-pkt-files))) - -(define (server-address srv-pkt) - (conc (alist-ref 'host srv-pkt) ":" - (alist-ref 'port srv-pkt))) - -(define (server-ready? server-address) - ;; ping the server and ask it - ;; if it ready - #f) - -;; from the pkts return servers associated with dbpath -;; NOTE: Only one can be alive - have to check on each -;; in the list of pkts returned -;; -(define (get-viable-servers serv-pkts dbpath) - (let loop ((tail serv-pkts) - (res '())) - (if (null? 
tail) - res ;; NOTE: sort by age so oldest is considered first - (let* ((spkt (car tail))) - (loop (cdr tail) - (if (equal? dbpath (alist-ref 'dbpath spkt)) - (cons spkt res) - res)))))) - -;; from viable servers get one that is alive and ready -;; -(define (get-the-server serv-pkts dbpath) - (let loop ((tail serv-pkts)) - (if (null? tail) - #f - (let* ((spkt (car tail)) - (addr (server-address spkt))) - (if (server-ready? addr) - spkt - (loop (cdr tail))))))) - -;;====================================================================== -;; END NEW SERVER METHOD -;;====================================================================== - (define (server:make-server-url hostport) (if (not hostport) #f (conc "http://" (car hostport) ":" (cadr hostport))))