@@ -25,10 +25,11 @@
 (declare (uses db))
 (declare (uses tests))
 (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
 (declare (uses server))
 (declare (uses daemon))
+(declare (uses portlogger))
 
 (include "common_records.scm")
 (include "db_records.scm")
 
 (define (http-transport:make-server-url hostport)
@@ -67,13 +68,14 @@
          (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
                                            ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
                                            (server:get-best-guess-address hostname)
                                            #f)))
                             (if ipstr ipstr hostn))) ;; hostname)))
-         (start-port      (open-run-close tasks:server-get-next-port tasks:open-db))
+         (start-port      (portlogger:open-run-close portlogger:find-port))
          (link-tree-path  (configf:lookup *configdat* "setup" "linktree")))
-    (set! db *inmemdb*)
+    ;; (set! db *inmemdb*)
+    (debug:print-info 0 "portlogger recommended port: " start-port)
     (root-path     (if link-tree-path
                        link-tree-path
                        (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
     (handle-directory spiffy-directory-listing)
     ;; http-transport:handle-directory) ;; simple-directory-handler)
@@ -86,11 +88,11 @@
                                (dat ($ 'dat))
                                (res #f))
                           (cond
                            ((equal? (uri-path (request-uri (current-request)))
                                     '(/ "api"))
-                            (send-response body: (api:process-request db $) ;; the $ is the request vars proc
+                            (send-response body: (api:process-request *inmemdb* $) ;; the $ is the request vars proc
                                            headers: '((content-type text/plain)))
                             (mutex-lock! *heartbeat-mutex*)
                             (set! *last-db-access* (current-seconds))
                             (mutex-unlock! *heartbeat-mutex*))
                            ((equal? (uri-path (request-uri (current-request)))
@@ -111,38 +113,52 @@
     (http-transport:try-start-server run-id ipaddrstr start-port server-id)))
 
 ;; This is recursively run by http-transport:run until sucessful
 ;;
 (define (http-transport:try-start-server run-id ipaddrstr portnum server-id)
-  (handle-exceptions
-   exn
-   (begin
-     (print-error-message exn)
-     (if (< portnum 9000)
-         (begin
-           (debug:print 0 "WARNING: failed to start on portnum: " portnum ", trying next port")
-           (thread-sleep! 0.1)
-
-           ;; get_next_port goes here
-
-           (http-transport:try-start-server run-id ipaddrstr (+ portnum 1) server-id))
-         (begin
-           (open-run-close tasks:server-force-clean-run-record tasks:open-db run-id ipaddrstr portnum " http-transport:try-start-server")
-           (print "ERROR: Tried and tried but could not start the server"))))
-   ;; any error in following steps will result in a retry
-   (set! *server-info* (list ipaddrstr portnum))
-   (open-run-close tasks:server-set-interface-port
-                   tasks:open-db
-                   server-id
-                   ipaddrstr portnum)
-   (debug:print 1 "INFO: Trying to start server on " ipaddrstr ":" portnum)
-   ;; This starts the spiffy server
-   ;; NEED WAY TO SET IP TO #f TO BIND ALL
-   ;; (start-server bind-address: ipaddrstr port: portnum)
-   (start-server port: portnum)
-   (open-run-close tasks:server-force-clean-run-record tasks:open-db run-id ipaddrstr portnum " http-transport:try-start-server")
-   (debug:print 1 "INFO: server has been stopped")))
+  (let ((config-hostname (configf:lookup *configdat* "server" "hostname"))
+        (tdbdat          (tasks:open-db)))
+    (debug:print-info 0 "http-transport:try-start-server run-id=" run-id " ipaddrsstr=" ipaddrstr " portnum=" portnum " server-id=" server-id " config-hostname=" config-hostname)
+    (handle-exceptions
+     exn
+     (begin
+       (print-error-message exn)
+       (if (< portnum 64000)
+           (begin
+             (debug:print 0 "WARNING: attempt to start server failed. Trying again ...")
+             (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
+             (debug:print 0 "exn=" (condition->list exn))
+             (portlogger:open-run-close portlogger:set-failed portnum)
+             (debug:print 0 "WARNING: failed to start on portnum: " portnum ", trying next port")
+             (thread-sleep! 0.1)
+
+             ;; get_next_port goes here
+             (http-transport:try-start-server run-id
+                                              ipaddrstr
+                                              (portlogger:open-run-close portlogger:find-port)
+                                              server-id))
+           (begin
+             (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
+             (print "ERROR: Tried and tried but could not start the server"))))
+     ;; any error in following steps will result in a retry
+     (set! *server-info* (list ipaddrstr portnum))
+     (tasks:server-set-interface-port
+      (db:delay-if-busy tdbdat)
+      server-id
+      ipaddrstr portnum)
+     (debug:print 0 "INFO: Trying to start server on " ipaddrstr ":" portnum)
+     ;; This starts the spiffy server
+     ;; NEED WAY TO SET IP TO #f TO BIND ALL
+     ;; (start-server bind-address: ipaddrstr port: portnum)
+     (if config-hostname ;; this is a hint to bind directly
+         (start-server port: portnum bind-address: (if (equal? config-hostname "-")
+                                                       ipaddrstr
+                                                       config-hostname))
+         (start-server port: portnum))
+     ;; (portlogger:open-run-close portlogger:set-port portnum "released")
+     (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) run-id ipaddrstr portnum " http-transport:try-start-server")
+     (debug:print 1 "INFO: server has been stopped"))))
 
 ;;======================================================================
 ;; S E R V E R U T I L I T I E S
 ;;======================================================================
 
@@ -199,43 +215,72 @@
   (mutex-lock! *http-mutex*)
   (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)))
 
 ;; Send "cmd" with json payload "params" to serverdat and receive result
 ;;
-(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 30))
-  (let* ((fullurl (if (list? serverdat)
-                      (list-ref serverdat 4) ;; (cadddr serverdat) ;; this is the uri for /api
+(define (http-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
+  (let* ((fullurl (if (vector? serverdat)
+                      (http-transport:server-dat-get-api-req serverdat)
                       (begin
                         (debug:print 0 "FATAL ERROR: http-transport:client-api-send-receive called with no server info")
                         (exit 1))))
-         (res     #f))
-    (handle-exceptions
-     exn
-     (if (> numretries 0)
-         (begin
-           (mutex-unlock! *http-mutex*)
-           (thread-sleep! 10)
-           (http-transport:client-api-send-receive run-id serverdat cmd params (- numretries 1)))
-         #f)
-     (begin
+         (res     #f)
+         (success #t)
+         (sparams (db:obj->string params transport: 'http)))
+;;     (condition-case
+;;      handle-exceptions
+;;      exn
+;;      (if (> numretries 0)
+;;          (begin
+;;            (mutex-unlock! *http-mutex*)
+;;            (thread-sleep! 1)
+;;            (handle-exceptions
+;;             exn
+;;             (debug:print 0 "WARNING: closing connections failed. Server at " fullurl " almost certainly dead")
+;;             (close-all-connections!))
+;;            (debug:print 0 "WARNING: Failed to communicate with server, trying again, numretries left: " numretries)
+;;            (http-transport:client-api-send-receive run-id serverdat cmd sparams numretries: (- numretries 1)))
+;;          (begin
+;;            (mutex-unlock! *http-mutex*)
+;;            (tasks:kill-server-run-id run-id)
+;;            #f))
+;;      (begin
         (debug:print-info 11 "fullurl=" fullurl ", cmd=" cmd ", params=" params ", run-id=" run-id "\n")
         ;; set up the http-client here
-        (max-retry-attempts 5)
+        (max-retry-attempts 1)
         ;; consider all requests indempotent
         (retry-request? (lambda (request)
-                          #t))
+                          #f))
         ;; send the data and get the response
         ;; extract the needed info from the http data and
         ;; process and return it.
         (let* ((send-recieve (lambda ()
                                (mutex-lock! *http-mutex*)
-                               (set! res (with-input-from-request ;; was dat
-                                          fullurl
-                                          (list (cons 'key "thekey")
-                                                (cons 'cmd cmd)
-                                                (cons 'params params))
-                                          read-string))
+                               ;; (condition-case (with-input-from-request "http://localhost"; #f read-lines)
+                               ;; ((exn http client-error) e (print e)))
+                               (set! res (let ((payload ;; bound first: operand order is unspecified and the handler below may clear success
+                                                (db:string->obj
+                                                 (handle-exceptions
+                                                  exn
+                                                  (begin
+                                                    (set! success #f)
+                                                    (debug:print 0 "WARNING: failure in with-input-from-request to " fullurl ".")
+                                                    (debug:print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
+                                                    (hash-table-delete! *runremote* run-id)
+                                                    ;; Killing associated server to allow clean retry.")
+                                                    (tasks:kill-server-run-id run-id) ;; better to kill the server in the logic that called this routine?
+                                                    ;; (signal (make-composite-condition
+                                                    ;; (make-property-condition 'commfail 'message "failed to connect to server")))
+                                                    "communications failed")
+                                                  (with-input-from-request ;; was dat
+                                                   fullurl
+                                                   (list (cons 'key "thekey")
+                                                         (cons 'cmd cmd)
+                                                         (cons 'params sparams))
+                                                   read-string))
+                                                 transport: 'http)))
+                                           (vector success payload))) ;; res = #(success-flag decoded-payload)
                                ;; Shouldn't this be a call to the managed call-all-connections stuff above?
                                (close-all-connections!)
                                (mutex-unlock! *http-mutex*)
                                ))
                (time-out     (lambda ()
@@ -246,89 +291,127 @@
           (thread-start! th1)
           (thread-start! th2)
           (thread-join! th1)
           (thread-terminate! th2)
           (debug:print-info 11 "got res=" res)
-          res)))))
+          (if (vector? res)
+              (if (vector-ref res 0)
+                  res
+                  (begin ;; note: this code also called in nmsg-transport - consider consolidating it
+                    (debug:print 0 "ERROR: error occured at server, info=" (vector-ref res 1)) ;; res has only slots 0 and 1
+                    (debug:print 0 " client call chain:")
+                    (print-call-chain (current-error-port))
+                    (debug:print 0 " server call chain:")
+                    (pp (vector-ref res 1) (current-error-port))
+                    (signal (make-composite-condition (make-property-condition 'exn 'message "communication with server failed") (make-property-condition 'commfail))))))
+              (signal (make-composite-condition
+                       (make-property-condition
+                        'timeout
+                        'message "http-transport:client-api-send-receive timed out talking to server")))))))
+
+;; Close the connection recorded in *runremote* for run-id; returns #t if an entry was found, #f otherwise
+;;
+(define (http-transport:close-connections run-id)
+  (let* ((server-dat (hash-table-ref/default *runremote* run-id #f)))
+    (if (vector? server-dat)
+        (let ((api-dat (http-transport:server-dat-get-api-uri server-dat)))
+          (close-connection! api-dat)
+          #t)
+        #f)))
+
+
+(define (make-http-transport:server-dat)(make-vector 7)) ;; 7 slots, indexes 0-6, one per accessor below
+(define (http-transport:server-dat-get-iface vec) (vector-ref vec 0))
+(define (http-transport:server-dat-get-port vec) (vector-ref vec 1))
+(define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2))
+(define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3))
+(define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4))
+(define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5))
+(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6))
+
+(define (http-transport:server-dat-make-url vec)
+  (if (and (http-transport:server-dat-get-iface vec)
+           (http-transport:server-dat-get-port vec))
+      (conc "http://"
+            (http-transport:server-dat-get-iface vec)
+            ":"
+            (http-transport:server-dat-get-port vec))
+      #f))
+
+(define (http-transport:server-dat-update-last-access vec)
+  (vector-set! vec 5 (current-seconds)))
 
 
 ;;
 ;; connect
 ;;
 (define (http-transport:client-connect iface port)
-  (let* ((api-url (conc "http://" iface ":" port "/api"))
-         (uri-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl"))))
-         ;; (uri-dat (make-request method: 'GET uri: (uri-reference (conc "http://" iface ":" port "/ctrl"))))
-         (uri-api-dat (make-request method: 'POST uri: api-url)) ;; (uri-reference (conc "http://" iface ":" port "/api"))))
-         ;; (uri-api-dat (make-request method: 'GET uri: (uri-reference (conc "http://" iface ":" port "/api"))))
-         (server-dat (list iface port uri-dat uri-api-dat api-url)))
-;; (login-res (server:ping-server run-id server-dat))) ;; login-no-auto-client-setup server-dat run-id)))
+  (let* ((api-url (conc "http://" iface ":" port "/api"))
+         (api-uri (uri-reference (conc "http://" iface ":" port "/api")))
+         (api-req (make-request method: 'POST uri: api-uri))
+         (server-dat (vector iface port api-uri api-url api-req (current-seconds) #f))) ;; trailing #f fills the socket slot (index 6)
     server-dat))
-;; (if (and (list? login-res)
-;; (car login-res))
-;; (begin
-;; (hash-table-set! *runremote* run-id server-dat)
-;; (debug:print-info 2 "Logged in and connected to " iface ":" port)
-;; (hash-table-set! *runremote* run-id server-dat)
-;; server-dat)
-;; (begin
-;; (debug:print-info 0 "ERROR: Failed to login or connect to " iface ":" port)
-;; #f))))
 
 
 ;; run http-transport:keep-running in a parallel thread to monitor that the db is being
 ;; used and to shutdown after sometime if it is not.
 ;;
-(define (http-transport:keep-running server-id)
+(define (http-transport:keep-running server-id run-id)
   ;; if none running or if > 20 seconds since
   ;; server last used then start shutdown
   ;; This thread waits for the server to come alive
-  (let* ((server-info (let loop ((start-time (current-seconds))
+  (debug:print-info 0 "Starting the sync-back, keep alive thread in server for run-id=" run-id)
+  (let* ((tdbdat (tasks:open-db))
+         (server-info (let loop ((start-time (current-seconds))
                                  (changed #t)
                                  (last-sdat "not this"))
                         (let ((sdat #f))
+                          (thread-sleep! 0.01)
+                          (debug:print-info 0 "Waiting for server alive signature")
                           (mutex-lock! *heartbeat-mutex*)
                           (set! sdat *server-info*)
                           (mutex-unlock! *heartbeat-mutex*)
                           (if (and sdat
                                    (not changed)
                                    (> (- (current-seconds) start-time) 2))
                               sdat
                               (begin
+                                (debug:print-info 0 "Still waiting, last-sdat=" last-sdat)
                                 (sleep 4)
-                                (loop start-time
-                                      (equal? sdat last-sdat)
-                                      sdat))))))
+                                (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+                                    (begin
+                                      (debug:print 0 "ERROR: transport appears to have died, exiting server " server-id " for run " run-id)
+                                      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
+                                      (exit))
+                                    (loop start-time
+                                          (equal? sdat last-sdat)
+                                          sdat)))))))
          (iface (car server-info))
          (port (cadr server-info))
          (last-access 0)
-         (tdb (tasks:open-db))
-         (server-timeout (let ((tmo (configf:lookup *configdat* "server" "timeout")))
-                           (if (and (string? tmo)
-                                    (string->number tmo))
-                               (* 60 60 (string->number tmo))
-                               ;; (* 3 24 60 60) ;; default to three days
-                               ;; (* 60 1) ;; default to one minute
-                               (* 60 60 25) ;; default to 25 hours
-                               ))))
+         (server-timeout (server:get-timeout)))
     (let loop ((count 0)
                (server-state 'available))
 
       ;; Use this opportunity to sync the inmemdb to db
       (let ((start-time (current-milliseconds))
             (sync-time #f)
             (rem-time #f))
-
-        (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t))
+        ;; inmemdb is a dbstruct
+        (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
         (set! sync-time (- (current-milliseconds) start-time))
         (set! rem-time (quotient (- 4000 sync-time) 1000))
-        (debug:print 0 "SYNC: time= " sync-time ", rem-time=" rem-time)
+        (debug:print 2 "SYNC: time= " sync-time ", rem-time=" rem-time)
         ;;
-        ;; set_running after our first pass through
+        ;; set_running after our first pass through and start the db
         ;;
         (if (eq? server-state 'available)
-            (tasks:server-set-state! tdb server-id "running"))
+            (begin
+              (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
+              (thread-sleep! 5) ;; give some margin for queries to complete before switching from file based access to server based access
+              (set! *inmemdb* (db:setup run-id))
+              (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")))
 
         (if (and (<= rem-time 4)
                  (> rem-time 0))
             (thread-sleep! rem-time)
             (thread-sleep! 4))) ;; fallback for if the math is changed ...
@@ -353,11 +436,13 @@
         (set! last-access *last-db-access*)
         (mutex-unlock! *heartbeat-mutex*)
 
         ;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout)
         ;;
-        ;; no_traffic
+        ;; no_traffic, no running tests, if server 0, no running servers
+        ;;
+        ;; (let ((wait-on-running (configf:lookup *configdat* "server" "wait-on-running"))) ;; wait on running tasks (if not true then exit on time out)
         ;;
         (if (and *server-run*
                  (> (+ last-access server-timeout)
                     (current-seconds)))
             (begin
@@ -368,81 +453,92 @@
               ;;
               ;; (if (tasks:server-am-i-the-server? tdb run-id)
               ;; (tasks:server-set-state! tdb server-id "running"))
               ;;
               (loop 0 server-state))
-            (begin
-              (debug:print-info 0 "Starting to shutdown the server.")
-              ;; need to delete only *my* server entry (future use)
-              (set! *time-to-exit* #t)
-              (if *inmemdb* (db:sync-touched *inmemdb* force-sync: #t))
-              ;;
-              ;; start_shutdown
-              ;;
-              ( tasks:server-set-state! tdb server-id "shutting-down")
-              (thread-sleep! 5)
-              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
-              (debug:print-info 0 "Number of cached writes " *number-of-writes*)
-              (debug:print-info 0 "Average cached write time "
-                                (if (eq? *number-of-writes* 0)
-                                    "n/a (no writes)"
-                                    (/ *writes-total-delay*
-                                       *number-of-writes*))
-                                " ms")
-              (debug:print-info 0 "Number non-cached queries " *number-non-write-queries*)
-              (debug:print-info 0 "Average non-cached time "
-                                (if (eq? *number-non-write-queries* 0)
-                                    "n/a (no queries)"
-                                    (/ *total-non-write-delay*
-                                       *number-non-write-queries*))
-                                " ms")
-              (debug:print-info 0 "Server shutdown complete. Exiting")
-              (tasks:server-delete-record tdb server-id " http-transport:keep-running")
-              (exit))))))
+            (http-transport:server-shutdown server-id port)))))
+
+(define (http-transport:server-shutdown server-id port)
+  (let ((tdbdat (tasks:open-db)))
+    (debug:print-info 0 "Starting to shutdown the server.")
+    ;; need to delete only *my* server entry (future use)
+    (set! *time-to-exit* #t)
+    (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t))
+    ;;
+    ;; start_shutdown
+    ;;
+    (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "shutting-down")
+    (portlogger:open-run-close portlogger:set-port port "released")
+    (thread-sleep! 5)
+    (debug:print-info 0 "Max cached queries was " *max-cache-size*)
+    (debug:print-info 0 "Number of cached writes " *number-of-writes*)
+    (debug:print-info 0 "Average cached write time "
+                      (if (eq? *number-of-writes* 0)
+                          "n/a (no writes)"
+                          (/ *writes-total-delay*
+                             *number-of-writes*))
+                      " ms")
+    (debug:print-info 0 "Number non-cached queries " *number-non-write-queries*)
+    (debug:print-info 0 "Average non-cached time "
+                      (if (eq? *number-non-write-queries* 0)
+                          "n/a (no queries)"
+                          (/ *total-non-write-delay*
+                             *number-non-write-queries*))
+                      " ms")
+    (debug:print-info 0 "Server shutdown complete. Exiting")
+    (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running complete")
+    (exit)))
 
 ;; all routes though here end in exit ...
 ;;
 ;; start_server?
 ;;
 (define (http-transport:launch run-id)
-  (set! *run-id* run-id)
-  (if (args:get-arg "-daemonize")
-      (daemon:ize))
-  (if (server:check-if-running run-id)
-      (begin
-        (debug:print 0 "INFO: Server for run-id " run-id " already running")
-        (exit 0)))
-  (let loop ((server-id (open-run-close tasks:server-lock-slot tasks:open-db run-id))
-             (remtries 4))
-    (if (not server-id)
-        (if (> remtries 0)
-            (begin
-              (thread-sleep! 2)
-              (loop (open-run-close tasks:server-lock-slot tasks:open-db run-id)
-                    (- remtries 1)))
-            (begin
-              ;; since we didn't get the server lock we are going to clean up and bail out
-              (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
-              (open-run-close tasks:server-delete-records-for-this-pid tasks:open-db " http-transport:launch")
-              ))
-        (let* ((th2 (make-thread (lambda ()
-                                   (http-transport:run
-                                    (if (args:get-arg "-server")
-                                        (args:get-arg "-server")
-                                        "-")
-                                    run-id
-                                    server-id)) "Server run"))
-               (th3 (make-thread (lambda ()
-                                   (http-transport:keep-running server-id))
-                                 "Keep running")))
-          ;; Database connection
-          (set! *inmemdb* (db:setup run-id))
-          (thread-start! th2)
-          (thread-start! th3)
-          (set! *didsomething* #t)
-          (thread-join! th2)
-          (exit)))))
+  (let* ((tdbdat (tasks:open-db)))
+    (set! *run-id* run-id)
+    (if (args:get-arg "-daemonize")
+        (begin
+          (daemon:ize)
+          (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
+              (begin
+                (current-error-port *alt-log-file*)
+                (current-output-port *alt-log-file*)))))
+    (if (server:check-if-running run-id)
+        (begin
+          (debug:print 0 "INFO: Server for run-id " run-id " already running")
+          (exit 0)))
+    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
+               (remtries 4))
+      (if (not server-id)
+          (if (> remtries 0)
+              (begin
+                (thread-sleep! 2)
+                (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
+                      (- remtries 1)))
+              (begin
+                ;; since we didn't get the server lock we are going to clean up and bail out
+                (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
+                (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
+                ))
+          (let* ((th2 (make-thread (lambda ()
+                                     (debug:print-info 0 "Server run thread started")
+                                     (http-transport:run
+                                      (if (args:get-arg "-server")
+                                          (args:get-arg "-server")
+                                          "-")
+                                      run-id
+                                      server-id)) "Server run"))
+                 (th3 (make-thread (lambda ()
+                                     (debug:print-info 0 "Server monitor thread started")
+                                     (http-transport:keep-running server-id run-id))
+                                   "Keep running")))
+            (thread-start! th2)
+            (thread-sleep! 0.25) ;; give the server time to settle before starting the keep-running monitor.
+            (thread-start! th3)
+            (set! *didsomething* #t)
+            (thread-join! th2)
+            (exit))))))
 
 (define (http:ping run-id host-port)
   (let* ((server-dat (http-transport:client-connect (car host-port)(cadr host-port)))
          (login-res (rmt:login-no-auto-client-setup server-dat run-id)))
     (if (and (list? login-res)
@@ -453,10 +549,11 @@
         (begin
           (print "LOGIN_FAILED")
           (exit 1)))))
 
 (define (http-transport:server-signal-handler signum)
+  (signal-mask! signum)
   (handle-exceptions
    exn
    (debug:print " ... exiting ...")
    (let ((th1 (make-thread (lambda ()
                              (thread-sleep! 1))