Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -41,22 +41,76 @@ (define (client:logout serverdat) (let ((ok (and (socket? serverdat) (cdb:logout serverdat *toppath* (client:get-signature))))) ok)) -(define (client:connect iface port) - (case (server:get-transport) - ((rpc) (rpc:client-connect iface port)) - ((http) (http:client-connect iface port)) - ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) - -(define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) - (case (server:get-transport) - ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) - ((http)(client:setup-http run-id)) - (else (rpc-transport:client-setup run-id)))) ;; (client:setup-rpc run-id)))) +;; BB: commenting out orphan code +;;;;; +;; (define (client:connect iface port) +;; (case (server:get-transport) +;; ((rpc) (rpc:client-connect iface port)) +;; ((http) (http:client-connect iface port)) +;; ((zmq) (zmq:client-connect iface port)) +;; (else (rpc:client-connect iface port)))) + +(define (client:setup run-id #!key (remaining-tries 10)) + (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) + (let* ((server-dat (tasks:bb-get-server-info run-id)) + (transport (if server-dat (string->symbol (tasks:hostinfo-get-transport server-dat)) 'noserver))) + ;;(BB> "transport >"transport"< string? transport >"(string? transport)"< symbol? transport >"(symbol? transport)"<") + (case transport + ((noserver) ;; no server registered + (if (<= remaining-tries 0) + (begin + (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) + (exit 1)) + (begin + (let ((num-available (tasks:bb-num-in-available-state run-id))) + (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) + (if (< num-available 2) + (server:try-running run-id)) + (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. + (client:setup run-id remaining-tries: (- remaining-tries 1)))))) + ((http)(client:setup-http run-id server-dat remaining-tries)) + ;; ((rpc) (rpc-transport:client-setup run-id)) ;;(client:setup-rpc run-id)) rpc not implemented; want to see a failure here for now. + (else + (debug:print-error 0 *default-log-port* "Transport [" + transport "] specified for run-id [" run-id "] is not implemented in client:setup. Cannot proceed.") + (exit 1))))) + +;; client:setup-http +;; +;; For http transport, robustly ensure an advertised-running server is actually working and responding, and +;; establish tcp connection to server. For servers marked running but not responding, kill them and clear from mdb +;; +(define (client:setup-http run-id server-dat remaining-tries) + (let* ((iface (tasks:hostinfo-get-interface server-dat)) + (hostname (tasks:hostinfo-get-hostname server-dat)) + (port (tasks:hostinfo-get-port server-dat)) + + (start-res (http-transport:client-connect iface port)) + (ping-res (rmt:login-no-auto-client-setup start-res run-id))) + (if (and start-res ping-res) + (begin + (hash-table-set! *runremote* run-id start-res) ;; side-effect - *runremote* cache init fpr rmt:* + (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) + start-res) + (begin ;; login failed but have a server record, clean out the record and try again + (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) + (http-transport:close-connections run-id) + (hash-table-delete! *runremote* run-id) + (tasks:kill-server-run-id run-id) + (tasks:bb-server-force-clean-run-record run-id iface port + " client:setup (server-dat = #t)") + (if (> remaining-tries 8) + (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little + (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time + (server:try-running run-id) + (thread-sleep! 5) ;; give server a little time to start up + (client:setup run-id remaining-tries: (- remaining-tries 1)) + )))) + ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) @@ -95,11 +149,11 @@ ;; (begin ;; (debug:print 25 *default-log-port* "INFO: client:setup failed to connect, start-res=" start-res ", run-id=" run-id ", host-info=" host-info) ;; (thread-sleep! 5) ;; (client:setup run-id remaining-tries: (- remaining-tries 1)))))) ;; ;; YUK: rename server-dat here -;; (let* ((server-dat (open-run-close tasks:get-server tasks:open-db run-id))) +;; (let* ((server-dat (open-run-close tasks:get-server-info tasks:open-db run-id))) ;; (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) ;; (if server-dat ;; (let* ((iface (tasks:hostinfo-get-interface server-dat)) ;; (port (tasks:hostinfo-get-port server-dat)) ;; (start-res (http-transport:client-connect iface port)) @@ -152,71 +206,17 @@ ;; ;; client:setup ;; ;; lookup_server, need to remove *runremote* stuff ;; -(define (client:setup-http run-id #!key (remaining-tries 10) (failed-connects 0)) - (debug:print-info 2 *default-log-port* "client:setup remaining-tries=" remaining-tries) - (let* ((tdbdat (tasks:open-db))) - (if (<= remaining-tries 0) - (begin - (debug:print-error 0 *default-log-port* "failed to start or connect to server for run-id " run-id) - (exit 1)) - (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) - (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) - (if server-dat - (let* ((iface (tasks:hostinfo-get-interface server-dat)) - (hostname (tasks:hostinfo-get-hostname server-dat)) - (port (tasks:hostinfo-get-port server-dat)) - (start-res (case *transport-type* - ((http)(http-transport:client-connect iface port)) - ;;((nmsg)(nmsg-transport:client-connect hostname port)) - )) - (ping-res (case *transport-type* - ((http)(rmt:login-no-auto-client-setup start-res run-id)) - ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) - ;; (if logininfo - ;; (car (vector-ref logininfo 1)) - ;; #f))) - - ))) - (if (and start-res - ping-res) - (begin - (hash-table-set! *runremote* run-id start-res) - (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) - start-res) - (begin ;; login failed but have a server record, clean out the record and try again - (debug:print-info 0 *default-log-port* "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat) - (case *transport-type* - ((http)(http-transport:close-connections run-id))) - (hash-table-delete! *runremote* run-id) - (tasks:kill-server-run-id run-id) - (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat) - run-id - (tasks:hostinfo-get-interface server-dat) - (tasks:hostinfo-get-port server-dat) - " client:setup (server-dat = #t)") - (if (> remaining-tries 8) - (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little - (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time - (server:try-running run-id) - (thread-sleep! 5) ;; give server a little time to start up - (client:setup run-id remaining-tries: (- remaining-tries 1)) - ))) - (begin ;; no server registered - (let ((num-available (tasks:num-in-available-state (db:dbdat-get-db tdbdat) run-id))) - (debug:print-info 0 *default-log-port* "client:setup, no server registered, remaining-tries=" remaining-tries " num-available=" num-available) - (if (< num-available 2) - (server:try-running run-id)) - (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. - (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) - -;; keep this as a function to ease future -(define (client:start run-id server-info) - (http-transport:client-connect (tasks:hostinfo-get-interface server-info) - (tasks:hostinfo-get-port server-info))) + +;; BB: commenting out orphan code. +;; +;; ;; keep this as a function to ease future +;; (define (client:start run-id server-info) +;; (http-transport:client-connect (tasks:hostinfo-get-interface server-info) +;; (tasks:hostinfo-get-port server-info))) ;; ;; client:signal-handler ;; (define (client:signal-handler signum) ;; (signal-mask! signum) ;; (set! *time-to-exit* #t) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -323,11 +323,11 @@ (define (http-transport:server-dat-get-port vec) (vector-ref vec 1)) (define (http-transport:server-dat-get-api-uri vec) (vector-ref vec 2)) (define (http-transport:server-dat-get-api-url vec) (vector-ref vec 3)) (define (http-transport:server-dat-get-api-req vec) (vector-ref vec 4)) (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5)) -(define (http-transport:server-dat-get-socket vec) (vector-ref vec 6)) +(define (http-transport:server-dat-get-transport vec) (vector-ref vec 6)) (define (http-transport:server-dat-make-url vec) (if (and (http-transport:server-dat-get-iface vec) (http-transport:server-dat-get-port vec)) (conc "http://" @@ -348,11 +348,11 @@ ;; (define (http-transport:client-connect iface port) (let* ((api-url (conc "http://" iface ":" port "/api")) (api-uri (uri-reference (conc "http://" iface ":" port "/api"))) (api-req (make-request method: 'POST uri: api-uri)) - (server-dat (vector iface port api-uri api-url api-req (current-seconds)))) + (server-dat (vector iface port api-uri api-url api-req (current-seconds) 'http))) server-dat)) ;; run http-transport:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; @@ -535,17 +535,17 @@ (current-output-port *alt-log-file*))))) (if (server:check-if-running run-id) (begin (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http)) (remtries 4)) (if (not server-id) (if (> remtries 0) (begin (thread-sleep! 2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'http) (- remtries 1))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch") Index: launch.scm ================================================================== --- launch.scm +++ launch.scm @@ -700,11 +700,11 @@ ;; *toppath* ;; side effects: ;; sets; *configdat* (megatest.config info) ;; *runconfigdat* (runconfigs.config info) ;; *configstatus* (status of the read data) -;; +;; *transport-type* (define (launch:setup #!key (force #f)) (let* ((toppath (or *toppath* (getenv "MT_RUN_AREA_HOME"))) ;; preserve toppath (runname (common:args-get-runname)) (target (common:args-get-target)) (linktree (common:get-linktree)) @@ -827,10 +827,12 @@ (if (and *toppath* (directory-exists? *toppath*)) (setenv "MT_RUN_AREA_HOME" *toppath*) (begin (debug:print-error 0 *default-log-port* "failed to find the top path to your Megatest area."))) + (server:set-transport) + ;;(BB> "Transport is >"*transport-type*"<") *toppath*)) (define (get-best-disk confdat testconfig) (let* ((disks (or (and testconfig (hash-table-ref/default testconfig "disks" #f)) (hash-table-ref/default confdat "disks" #f))) Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -81,70 +81,67 @@ (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin (for-each (lambda (run-id) (let ((connection (hash-table-ref/default *runremote* run-id #f))) (if (and (vector? connection) - (< (http-transport:server-dat-get-last-access connection) expire-time)) + (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific. (begin (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses") - ;; bb- disabling nanomsg - ;; SHOULD CLOSE THE CONNECTION HERE - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket - ;; (hash-table-ref *runremote* run-id))))) (hash-table-delete! *runremote* run-id))))) (hash-table-keys *runremote*))) ;; (mutex-unlock! *db-multi-sync-mutex*) ;; (mutex-lock! *send-receive-mutex*) (let* ((run-id (if rid rid 0)) (connection-info (rmt:get-connection-info run-id))) ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also) (if connection-info ;; use the server if have connection info - (let* ((dat (case *transport-type* + (let* ((transport-type (vector-ref connection-info 6)) + (dat (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type, item 6 in server-info vector which was populated by *-transport:client-connect with >> (vector iface port api-uri api-url api-req (current-seconds) 'http ) << ((http)(condition-case (http-transport:client-api-send-receive run-id connection-info cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail")))) - ;; ((nmsg)(condition-case - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd params) - ;; ((timeout)(vector #f "timeout talking to server")))) - (else (exit)))) + ;;((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now + (else + (debug:print-error 0 *default-log-port* "Transport [" + transport "] specified for run-id [" run-id "] is not implemented in rmt:send-receive. Cannot proceed.") + (exit 1)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) - (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) + (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific. (if success (begin ;; (mutex-unlock! *send-receive-mutex*) (case *transport-type* - ((http) res) ;; (db:string->obj res)) + ((http rpc) res) ;; (db:string->obj res)) ;; ((nmsg) res) )) ;; (vector-ref res 1))) (begin ;; let ((new-connection-info (client:setup run-id))) (debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.") - ;; (case *transport-type* - ;; ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info)))) - (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection - ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. - ;; (if (eq? (modulo attemptnum 5) 0) - ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) - ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications - (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) - ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) - - ;; no longer killing the server in http-transport:client-api-send-receive - ;; may kill it here but what are the criteria? - ;; start with three calls then kill server - ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) - ;; (thread-sleep! 2) - (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))) + (case *transport-type* + ((http) + (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection + ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. + ;; (if (eq? (modulo attemptnum 5) 0) + ;; (tasks:kill-server-run-id run-id tag: "api-send-receive-failed")) + ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications + (tasks:start-and-wait-for-server (tasks:open-db) run-id 15) + ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1)))))) + + ;; no longer killing the server in http-transport:client-api-send-receive + ;; may kill it here but what are the criteria? + ;; start with three calls then kill server + ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id)) + ;; (thread-sleep! 2) + (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))) ;; no connection info? try to start a server, or access locally if no ;; server and the query is read-only ;; ;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call ;; - (if (and (< attemptnum 15) + (if (and (< attemptnum 15) (member cmd api:write-queries)) (let ((faststart (configf:lookup *configdat* "server" "faststart"))) (hash-table-delete! *runremote* run-id) ;; (mutex-unlock! *send-receive-mutex*) (if (and faststart (equal? faststart "no")) Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -47,43 +47,51 @@ (daemon:ize)) (if (server:check-if-running run-id) (begin (debug:print 0 *default-log-port* "INFO: Server for run-id " run-id " already running") (exit 0))) - (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'rpc)) (remtries 4)) (if (not server-id) (if (> remtries 0) (begin (thread-sleep! 2) - (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id) + (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id 'rpc) (- remtries 1))) (begin ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " rpc-transport:launch"))) (begin + (rpc-transport:run (if (args:get-arg "-server")(args:get-arg "-server") "-") run-id server-id) (exit)))))) +(define *rpc-listener-port* #f) +(define *rpc-listener-port-bind-timestamp* #f) + (define (rpc-transport:run hostn run-id server-id) + (BB> "rpc-transport:run fired for hostn="hostn" run-id="run-id" server-id="server-id) (debug:print 2 *default-log-port* "Attempting to start the rpc server ...") ;; (trace rpc:publish-procedure!) (rpc:publish-procedure! 'server:login server:login) (rpc:publish-procedure! 'testing (lambda () "Just testing")) - + (BB> "flag1") (let* ((db #f) - (hostname (get-host-name)) - (ipaddrstr (let ((ipstr (if (string=? "-" hostn) + (tdbdat (tasks:open-db)) + (hostname (let ((res (get-host-name))) (BB> "hostname="res) res)) + (ipaddrstr (let* ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) - #f))) - (if ipstr ipstr hostn))) ;; hostname))) - (start-port (open-run-close tasks:server-get-next-port tasks:open-db)) + #f)) + (res (if ipstr ipstr hostn))) + (BB> "ipaddrstr="res) + res)) ;; hostname))) + (start-port (let ((res (portlogger:open-run-close portlogger:find-port))) (BB> "start-port="res) res)) (link-tree-path (configf:lookup *configdat* "setup" "linktree")) - (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) + (rpc:listener (rpc-transport:find-free-port-and-open start-port)) (th1 (make-thread (lambda () ((rpc:make-server rpc:listener) #t)) "rpc:server")) ;; (cute (rpc:make-server rpc:listener) "rpc:server") @@ -92,19 +100,21 @@ (get-host-name) hostn)) (ipaddrstr (if (string=? "-" hostn) (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f)) - (portnum (rpc:default-server-port)) + (portnum (let ((res (rpc:default-server-port))) (BB> "rpc:default-server-port="res" rpc-listener-port="*rpc-listener-port*) res)) (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) (tdb (tasks:open-db))) + (BB> "Got here before thread start of rpc listener") (thread-start! th1) + (BB> "started thread th1="th1) (set! db *inmemdb*) - (open-run-close tasks:server-set-interface-port - tasks:open-db - server-id - ipaddrstr portnum) + (tasks:server-set-interface-port + (db:delay-if-busy tdbdat) + server-id + ipaddrstr portnum) (debug:print 0 *default-log-port* "Server started on " host:port) ;; (trace rpc:publish-procedure!) ;; (rpc:publish-procedure! 'server:login server:login) ;; (rpc:publish-procedure! 'testing (lambda () "Just testing")) @@ -112,14 +122,14 @@ ;;====================================================================== ;; ;; end of publish-procedure section ;;====================================================================== ;; (on-exit (lambda () - (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "stopped"))) (set! *rpc:listener* rpc:listener) - (tasks:server-set-state! tdb server-id "running") + (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running") (set! *inmemdb* (db:setup run-id)) ;; if none running or if > 20 seconds since ;; server last used then start shutdown (let loop ((count 0)) (thread-sleep! 5) ;; no need to do this very often @@ -135,20 +145,23 @@ (thread-sleep! 10) (debug:print-info 0 *default-log-port* "Max cached queries was " *max-cache-size*) (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting") )))))) -(define (rpc-transport:find-free-port-and-open port) +(define (rpc-transport:find-free-port-and-open port #!key ) (handle-exceptions exn - (begin + (begin (print "Failed to bind to port " (rpc:default-server-port) ", trying next port") (rpc-transport:find-free-port-and-open (+ port 1))) (rpc:default-server-port port) + (set! *rpc-listener-port* port) ;; a bit paranoid about rpc:default-server-port parameter not changing across threads (as params are wont to do). keeping this global in my back pocket in case this causes problems + (set! *rpc-listener-port-bind-timestamp* (current-milliseconds)) ;; may want to test how long it has been since the last bind attempt happened... (tcp-read-timeout 240000) + (BB> "rpc-transport> attempting to bind tcp port "port) (tcp-listen (rpc:default-server-port) 10000))) - + (define (rpc-transport:ping run-id host port) (handle-exceptions exn (begin (print "SERVER_NOT_FOUND") @@ -179,11 +192,11 @@ server-dat) (begin (server:try-running run-id) (thread-sleep! 2) (rpc-transport:client-setup run-id (- remtries 1))))) - (let* ((server-db-info (open-run-close tasks:get-server tasks:open-db run-id))) + (let* ((server-db-info (open-run-close tasks:get-server-info tasks:open-db run-id))) (debug:print-info 0 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-db-info (let* ((iface (tasks:hostinfo-get-interface server-db-info)) (port (tasks:hostinfo-get-port server-db-info)) (server-dat (list iface port #f #f #f)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -61,21 +61,27 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== -;; Get the transport -(define (server:get-transport) - (if *transport-type* - *transport-type* - (let ((ttype (string->symbol - (or (args:get-arg "-transport") - (configf:lookup *configdat* "server" "transport") - "rpc")))) - (set! *transport-type* ttype) - ttype))) - +;; set global *transport-type* based on -transport switch and serer/transport configuration. default http otherwise. +;; called by launch:setup +(define (server:set-transport) + (let ((ttype (string->symbol + (or (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + "http")))) + (set! *transport-type* ttype) + ttype)) + +;; Get the transport -- DO NOT call this from client code. In client code, this is run-id sensitive and not a global + + (define (server:get-transport) + (if *transport-type* + *transport-type* + (server:set-transport))) + ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -180,11 +186,11 @@ (server:run run-id) (rmt:start-server run-id))) (define (server:check-if-running run-id) (let ((tdbdat (tasks:open-db))) - (let loop ((server (tasks:get-server (db:delay-if-busy tdbdat) run-id)) + (let loop ((server (tasks:get-server-info (db:delay-if-busy tdbdat) run-id)) (trycount 0)) (if server ;; note: client:start will set *runremote*. this needs to be changed ;; also, client:start will login to the server, also need to change that. ;; @@ -215,11 +221,11 @@ (let* ((host-port (let ((slst (string-split host:port ":"))) (if (eq? (length slst) 2) (list (car slst)(string->number (cadr slst))) #f))) (toppath (launch:setup)) - (server-db-dat (if (not host-port)(tasks:get-server (db:delay-if-busy tdbdat) run-id) #f))) + (server-db-dat (if (not host-port)(tasks:get-server-info (db:delay-if-busy tdbdat) run-id) #f))) (if (not run-id) (begin (debug:print-error 0 *default-log-port* "must specify run-id when doing ping, -run-id n") (print "ERROR: No run-id") (exit 1)) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -170,21 +170,21 @@ (define (tasks:hostinfo-get-pubport vec) (vector-ref vec 3)) (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) -(define (tasks:server-lock-slot mdb run-id) +(define (tasks:server-lock-slot mdb run-id transport-type) (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot") (if (< (tasks:num-in-available-state mdb run-id) 4) (begin - (tasks:server-set-available mdb run-id) + (tasks:server-set-available mdb run-id transport-type) (thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? mdb run-id)) #f)) ;; register that this server may come online (first to register goes though with the process) -(define (tasks:server-set-available mdb run-id) +(define (tasks:server-set-available mdb run-id transport-type) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" (current-process-id) ;; pid @@ -194,11 +194,11 @@ (random 1000) ;; priority (used a tiebreaker on get-available) "available" ;; state (common:version-signature) ;; mt_version -1 ;; interface ;; (conc (server:get-transport)) ;; transport - (conc *transport-type*) ;; transport + (conc transport-type) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0)) @@ -327,27 +327,42 @@ mdb (conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running','dbprep') ORDER BY start_time DESC;") run-id) (vector header res))) -(define (tasks:get-server mdb run-id #!key (retries 10)) + +;; BB> bb opinion - want to push responsibility into api (encapsulation), like waiting if db is busy and finding the db handle in the first place. why should the caller need to be concerned?? If my opinion carries, we'll remove the bb- and make other needful adjustments. +(define (bb-mdb-inserter mdb-expecting-proc mdbless-args) + (let ((mdb (db:delay-if-busy (tasks:open-db)))) + (apply mdb-expecting-proc (cons mdb mdbless-args)))) + +(define (tasks:bb-get-server-info . args) + (bb-mdb-inserter tasks:get-server-info args)) + +(define (tasks:bb-num-in-available-state . args) + (bb-mdb-inserter tasks:num-in-available-state args)) + + + +;; BB: renaming tasks:get-server to get-server-info to make clear we aren't creating servers here +(define (tasks:get-server-info mdb run-id #!key (retries 10)) (let ((res #f) (best #f)) (handle-exceptions exn (begin (print-call-chain (current-error-port)) - (debug:print 0 *default-log-port* "WARNING: tasks:get-server db access error.") + (debug:print 0 *default-log-port* "WARNING: tasks:get-server-info db access error.") (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 *default-log-port* " for run " run-id) (print-call-chain (current-error-port)) (if (> retries 0) (begin - (debug:print 0 *default-log-port* " trying call to tasks:get-server again in 10 seconds") + (debug:print 0 *default-log-port* " trying call to tasks:get-server-info again in 10 seconds") (thread-sleep! 10) - (tasks:get-server mdb run-id retries: (- retries 0))) - (debug:print 0 *default-log-port* "10 tries of tasks:get-server all crashed and burned. Giving up and returning \"no server found\""))) + (tasks:get-server-info mdb run-id retries: (- retries 0))) + (debug:print 0 *default-log-port* "10 tries of tasks:get-server-info all crashed and burned. Giving up and returning \"no server found\""))) (sqlite3:for-each-row (lambda (id interface port pubport transport pid hostname) (set! res (vector id interface port pubport transport pid hostname))) mdb ;; removed: @@ -394,21 +409,21 @@ ;; try to start a server and wait for it to be available ;; (define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries) ;; ensure a server is running for this run - (let loop ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)) + (let loop ((server-dat (tasks:get-server-info (db:delay-if-busy tdbdat) run-id)) (delay-time 0)) (if (and (not server-dat) (< delay-time delay-max-tries)) (begin (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id) (debug:print 0 *default-log-port* "Try starting server for run-id " run-id)) (thread-sleep! (/ (random 2000) 1000)) (server:kind-run run-id) (thread-sleep! (min delay-time 1)) - (loop (tasks:get-server (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) + (loop (tasks:get-server-info (db:delay-if-busy tdbdat) run-id)(+ delay-time 1)))))) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) @@ -443,18 +458,18 @@ ;; look up a server by run-id and send it a kill, also delete the record for that server ;; (define (tasks:kill-server-run-id run-id #!key (tag "default")) (let* ((tdbdat (tasks:open-db)) - (sdat (tasks:get-server (db:delay-if-busy tdbdat) run-id))) + (sdat (tasks:get-server-info (db:delay-if-busy tdbdat) run-id))) (if sdat (let ((hostname (vector-ref sdat 6)) (pid (vector-ref sdat 5)) (server-id (vector-ref sdat 0))) (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "killed") (debug:print-info 0 *default-log-port* "Killing server " server-id " for run-id " run-id " on host " hostname " with pid " pid) - (tasks:kill-server hostname pid) + (tasks:kill-server hostname pid kill-switch: "-9") ;; BB: added -9, let's not be kind here. we need it to die (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id tag) ) (debug:print-info 0 *default-log-port* "No server found for run-id " run-id ", nothing to kill")) ;; (sqlite3:finalize! tdb) ))