Index: client.scm ================================================================== --- client.scm +++ client.scm @@ -46,17 +46,21 @@ (define (client:connect iface port) (case (server:get-transport) ((rpc) (rpc:client-connect iface port)) ((http) (http:client-connect iface port)) ((zmq) (zmq:client-connect iface port)) - (else (rpc:client-connect iface port)))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (5)") + (exit)))) (define (client:setup run-id #!key (remaining-tries 10) (failed-connects 0)) (case (server:get-transport) ((rpc) (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)) ;;(client:setup-rpc run-id)) ((http)(client:setup-http run-id remaining-tries: remaining-tries failed-connects: failed-connects)) - (else (rpc-transport:client-setup run-id remaining-tries: remaining-tries failed-connects: failed-connects)))) ;; (client:setup-rpc run-id)))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (6)") + (exit)))) ;; (client:setup-rpc run-id)))) ;; (define (client:login-no-auto-setup server-info run-id) ;; (case (server:get-transport) ;; ((rpc) (rpc:login-no-auto-client-setup server-info run-id)) ;; ((http) (rmt:login-no-auto-client-setup server-info run-id)) @@ -165,22 +169,12 @@ (debug:print-info 4 *default-log-port* "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries) (if server-dat (let* ((iface (tasks:hostinfo-get-interface server-dat)) (hostname (tasks:hostinfo-get-hostname server-dat)) (port (tasks:hostinfo-get-port server-dat)) - (start-res (case *transport-type* - ((http)(http-transport:client-connect iface port)) - ;;((nmsg)(nmsg-transport:client-connect hostname port)) - )) - (ping-res (case *transport-type* - ((http)(rmt:login-no-auto-client-setup start-res)) - ;; ((nmsg)(let ((logininfo (rmt:login-no-auto-client-setup start-res run-id))) - ;; (if logininfo - ;; (car (vector-ref logininfo 1)) - ;; #f))) - - ))) + (start-res (http-transport:client-connect iface port)) + (ping-res (rmt:login-no-auto-client-setup start-res))) (if (and start-res ping-res) (begin (remote-conndat-set! *runremote* start-res) ;; (hash-table-set! *runremote* run-id start-res) (debug:print-info 2 *default-log-port* "connected to " (http-transport:server-dat-make-url start-res)) @@ -209,11 +203,11 @@ (if (< num-available 2) (server:try-running run-id)) (thread-sleep! (+ 5 (random (- 20 remaining-tries)))) ;; give server a little time to start up, randomize a little to avoid start storms. (client:setup run-id remaining-tries: (- remaining-tries 1))))))))) -;; keep this as a function to ease future +;; keep this as a function to ease future ;; this is unused, not porting for rpc -BB (define (client:start run-id server-info) (http-transport:client-connect (tasks:hostinfo-get-interface server-info) (tasks:hostinfo-get-port server-info))) ;; ;; client:signal-handler Index: rmt.scm ================================================================== --- rmt.scm +++ rmt.scm @@ -13,10 +13,11 @@ (declare (unit rmt)) (declare (uses api)) (declare (uses tdb)) (declare (uses http-transport)) +(declare (uses rpc-transport)) ;;(declare (uses nmsg-transport)) (include "common_records.scm") ;; ;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!! @@ -167,20 +168,24 @@ (dat (case (remote-transport *runremote*) ((http) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away (http-transport:client-api-send-receive 0 conninfo cmd params) ((commfail)(vector #f "communications fail")) ((exn)(vector #f "other fail" (print-call-chain))))) + ((rpc) (condition-case ;; handling here has caused a lot of problems. However it is needed to deal with attemtped communication to servers that have gone away + (rpc-transport:client-api-send-receive 0 conninfo cmd params) + ((commfail)(vector #f "communications fail")) + ((exn)(vector #f "other fail" (print-call-chain))))) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported") (exit)))) (success (if (vector? dat) (vector-ref dat 0) #f)) (res (if (vector? dat) (vector-ref dat 1) #f))) (if (vector? conninfo)(http-transport:server-dat-update-last-access conninfo)) ;; refresh access time (print "case 9. conninfo=" conninfo " dat=" dat) (if success (case (remote-transport *runremote*) - ((http) res) + ((http rpc) res) (else (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " is unknown") (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) @@ -283,11 +288,18 @@ (define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params) (let* ((run-id (if run-id run-id 0)) (res (handle-exceptions exn #f - (http-transport:client-api-send-receive run-id connection-info cmd params)))) + (case (remote-transport *runremote*) + ((http) (http-transport:client-api-send-receive run-id connection-info cmd params)) + ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (2)") + (exit)) + + ))) (if (and res (vector-ref res 0)) (vector-ref res 1) ;;; YES!! THIS IS CORRECT!! CHANGE IT HERE, THEN CHANGE rmt:send-receive ALSO!!! #f))) ;; ;; Wrap json library for strings (why the ports crap in the first place?) @@ -327,11 +339,16 @@ ;; This login does no retries under the hood - it acts a bit like a ping. ;; Deprecated for nmsg-transport. ;; (define (rmt:login-no-auto-client-setup connection-info) (case *transport-type* ;; run-id of 0 is just a placeholder - ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + ((http rpc)(rmt:send-receive-no-auto-client-setup connection-info 'login 0 (list *toppath* megatest-version *my-client-signature*))) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (3)") + (exit)) + + ;;((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*))) )) ;; hand off a call to one of the db:queries statements ;; added run-id to make looking up the correct db possible Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -151,19 +151,23 @@ (define (rpc-transport:server-shutdown server-id rpc:listener #!key (from-on-exit #f)) (on-exit (lambda () #t)) ;; turn off on-exit stuff ;;(tcp-close rpc:listener) ;; gotta exit nicely - ;;(tasks:bb-server-set-state! server-id "stopped") + ;;(tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "stopped") ;; TODO: (low) the following is extraordinaritly slow. Maybe we don't even need portlogger for rpc anyway?? the exception-based failover when ports are taken is fast! ;;(portlogger:open-run-close portlogger:set-port (rpc:default-server-port) "released") (set! *time-to-exit* #t) (if *inmemdb* (db:sync-touched *inmemdb* *run-id* force-sync: #t)) - (tasks:bb-server-delete-record server-id " rpc-transport:keep-running complete") + + + (tasks:server-delete-record (db:delay-if-busy (tasks:open-db)) server-id " rpc-transport:keep-running complete") + + ;;(BB> "Before (exit) (from-on-exit="from-on-exit")") (unless from-on-exit (exit)) ;; sometimes we hang (around) here with 100% cpu. ;;(BB> "After") ;; strace reveals endless: ;; getrusage(RUSAGE_SELF, {ru_utime={413, 917868}, ru_stime={0, 60003}, ...}) = 0 @@ -204,17 +208,17 @@ (exit 0)) ;; let's get a server-id for this server ;; if at first we do not suceed, try 3 more times. (let ((server-id (retry-thunk - (lambda () (tasks:bb-server-lock-slot run-id 'rpc)) + (lambda () (tasks:server-lock-slot (db:delay-if-busy (tasks:open-db)) run-id 'rpc)) chatty: #f retries: 4))) (when (not server-id) ;; dang we couldn't get a server-id. ;; since we didn't get the server lock we are going to clean up and bail out (debug:print-info 2 *default-log-port* "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue") - (tasks:bb-server-delete-records-for-this-pid " rpc-transport:launch") + (tasks:server-delete-records-for-this-pid (db:delay-if-busy (tasks:open-db)) " rpc-transport:launch") (exit 1)) ;; we got a server-id (and a corresponding entry in servers table in globally shared mdb) ;; all systems go. Proceed to setup rpc server. (rpc-transport:run @@ -393,11 +397,11 @@ ;; (when (not (equal? start-port portnum)) ;; (BB> "portlogger proffered "start-port" but rpc grabbed "portnum) ;; (portlogger:open-run-close portlogger:set-port start-port "released") ;; (portlogger:open-run-close portlogger:take-port portnum)) - (tasks:bb-server-set-interface-port server-id ipaddrstr portnum) + (tasks:server-set-interface-port (db:delay-if-busy (tasks:open-db)) server-id ipaddrstr portnum) ;;============================================================ ;; activate thread th1 to attach opened tcp port to rpc server ;;============================================================= (thread-start! th1) @@ -416,23 +420,23 @@ (on-exit (lambda () (rpc-transport:server-shutdown server-id rpc:listener from-on-exit: #t))) ;; check again for running servers for this run-id in case one has snuck in since we checked last in rpc-transport:launch - (if (not (equal? server-id (tasks:bb-server-am-i-the-server? run-id)));; try to ensure no double registering of servers + (if (not (equal? server-id (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id)));; try to ensure no double registering of servers (begin ;; i am not the server, another server snuck in and beat this one to the punch (tcp-close rpc:listener) ;; gotta exit nicely and free up that tcp port - (tasks:bb-server-set-state! server-id "collision")) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "collision")) (begin ;; i am the server ;; setup the in-memory db (set! *inmemdb* (db:setup run-id)) (db:get-db *inmemdb* run-id) ;; let's make it official (set! *rpc:listener* rpc:listener) - (tasks:bb-server-set-state! server-id "running") ;; update our mdb servers entry + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running") ;; update our mdb servers entry ;; this let loop will hold open this thread until we want the server to shut down. ;; if no requests received within the last 20 seconds : @@ -510,12 +514,12 @@ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) ;; ;; Consider implementing some smarts here to re-insert the record or kill self is ;; the db indicates so ;; - (if (tasks:bb-server-am-i-the-server? run-id) - (tasks:bb-server-set-state! server-id "running")) + (if (tasks:server-am-i-the-server? (db:delay-if-busy (tasks:open-db)) run-id) + (tasks:server-set-state! (db:delay-if-busy (tasks:open-db)) server-id "running")) ;; (loop 0 bad-sync-count)) (begin ;;(BB> "SERVER SHUTDOWN CALLED! last-access="last-access" current-seconds="(current-seconds)" server-timeout="server-timeout) (rpc-transport:server-shutdown server-id rpc:listener))))) @@ -587,13 +591,13 @@ (rmt:set-cinfo run-id runremote-server-dat) ;; (hash-table-set! *runremote* run-id runremote-server-dat) ;; side-effect - *runremote* cache init fpr rmt:* runremote-server-dat) (begin ;; login failed but have a server record, clean out the record and try again (debug:print-info 0 *default-log-port* "rpc-transport:client-setup UNABLE TO CONNECT run-id="run-id" server-dat=" server-dat) (tasks:kill-server-run-id run-id) - (tasks:bb-server-force-clean-run-record run-id iface port + (tasks:server-force-clean-run-record (db:delay-if-busy (tasks:open-db)) run-id iface port " rpc-transport:client-setup (server-dat = #t)") (if (> remtries 2) (thread-sleep! (+ 1 (random 5))) ;; spread out the starts a little (thread-sleep! (+ 15 (random 20)))) ;; it isn't going well. give it plenty of time (server:try-running run-id) (thread-sleep! 5) ;; give server a little time to start up (client:setup run-id remaining-tries: (sub1 remtries)))))) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -226,11 +226,11 @@ ;; (define (server:check-if-running areapath) (let* ((dotserver (server:read-dotserver areapath))) ;; tdbdat (tasks:open-db))) (if dotserver (let* ((res (case *transport-type* - ((http)(server:ping-server dotserver)) + ((http rpc)(server:ping-server dotserver)) ;; ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) ))) (if res dotserver #f)) @@ -259,11 +259,17 @@ (begin (debug:print 0 *default-log-port* "ERROR: bad host:port") (if do-exit (exit 1))) (let* ((iface (car host-port)) (port (cadr host-port)) - (server-dat (http-transport:client-connect iface port)) + (server-dat + (case (remote-transport *runremote*) + ((http) (http-transport:client-connect iface port)) + ((rpc) (rpc-transport:client-connect iface port)) + (else + (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport *runremote*) " not supported (4)") + (exit)))) (login-res (rmt:login-no-auto-client-setup server-dat))) (if (and (list? login-res) (car login-res)) (begin (print "LOGIN_OK")