Index: api.scm
==================================================================
--- api.scm
+++ api.scm
@@ -51,11 +51,13 @@
 ;; These are called by the server on recipt of /api calls
 ;;    - keep it simple, only return the actual result of the call, i.e. no meta info here
 ;;
 (define (api:execute-requests dbstruct cmd params)
   (let ((res 
-          (case (string->symbol cmd)
+          (case (if (symbol? cmd)
+                    cmd
+                    (string->symbol cmd))
             ;; SERVERS
             ((start-server)            (apply server:kind-run params))
             ((kill-server)             (set! *server-run* #f))
 
             ;; KEYS
Index: client.scm
==================================================================
--- client.scm
+++ client.scm
@@ -61,26 +61,30 @@
     (if (<= remaining-tries 0)
         (begin
           (debug:print 0 "ERROR: failed to start or connect to server for run-id " run-id)
           (exit 1))
         (let ((host-info (hash-table-ref/default *runremote* run-id #f)))
-          (if host-info
+          (if host-info ;; this is a bit circular. the host-info *is* the start-res FIXME
               (let* ((iface     (http-transport:server-dat-get-iface host-info))
                      (port      (http-transport:server-dat-get-port  host-info))
-                     (start-res (http-transport:client-connect iface port))
-                     (ping-res  (rmt:login-no-auto-client-setup start-res run-id)))
+                     (start-res (case *transport-type*
+                                  ((http)(http-transport:client-connect iface port))
+                                  ((nmsg) host-info) ;; (http-transport:server-dat-get-socket host-info))
+                                  (else #f)))
+                     (ping-res  (case *transport-type*
+                                  ((http)(rmt:login-no-auto-client-setup start-res run-id))
+                                  ((nmsg)(nmsg-transport:ping iface port timeout: 2 socket: (http-transport:server-dat-get-socket host-info))) ;; re-ping over the cached req socket (server-dat slot 6)
+                                  (else #f))))
                 (if ping-res   ;; sucessful login?
                     (begin
                       (debug:print-info 2 "client:setup, ping is good using host-info=" host-info ", remaining-tries=" remaining-tries)
-                      ;; Why add the close-connections here?
-                      ;; (http-transport:close-connections run-id)
-                      (hash-table-set! *runremote* run-id start-res)
                       start-res)  ;; return the server info
                     ;; have host info but no ping. shutdown the current connection and try again
                     (begin ;; login failed
                       (debug:print-info 1 "client:setup, ping is bad for start-res=" start-res " and *runremote*=" host-info)
-                      (http-transport:close-connections run-id)
+                      (case *transport-type*
+                        ((http)(http-transport:close-connections run-id)))
                       (hash-table-delete! *runremote* run-id)
                       (if (< remaining-tries 8)
                           (thread-sleep! 5)
                           (thread-sleep! 1))
                       (client:setup run-id remaining-tries: (- remaining-tries 1)))))
@@ -87,22 +91,28 @@
           ;; YUK: rename server-dat here
           (let* ((server-dat (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
             (debug:print-info 4 "client:setup server-dat=" server-dat ", remaining-tries=" remaining-tries)
             (if server-dat
                 (let* ((iface     (tasks:hostinfo-get-interface server-dat))
+                       (hostname  (tasks:hostinfo-get-hostname server-dat))
                        (port      (tasks:hostinfo-get-port      server-dat))
-                       (start-res (http-transport:client-connect iface port))
-                       (ping-res  (rmt:login-no-auto-client-setup start-res run-id)))
+                       (start-res (case *transport-type*
+                                    ((http)(http-transport:client-connect iface port))
+                                    ((nmsg)(nmsg-transport:client-connect hostname port)) (else #f)))
+                       (ping-res  (case *transport-type*
+                                    ((http)(rmt:login-no-auto-client-setup start-res run-id))
+                                    ((nmsg)(and start-res (http-transport:server-dat-get-socket start-res))) (else #f)))) ;; socket is the result of a ping (#f if the ping failed)
                   (if (and start-res
                            ping-res)
                       (begin
                         (hash-table-set! *runremote* run-id start-res)
                         (debug:print-info 2 "connected to " (http-transport:server-dat-make-url start-res))
                         start-res)
                       (begin    ;; login failed but have a server record, clean out the record and try again
                         (debug:print-info 0 "client:setup, login failed, will attempt to start server ... start-res=" start-res ", run-id=" run-id ", server-dat=" server-dat)
-                        (http-transport:close-connections run-id)
+                        (case *transport-type*
+                          ((http)(http-transport:close-connections run-id)))
                         (hash-table-delete! *runremote* run-id)
                         (tasks:server-force-clean-run-record (db:delay-if-busy tdbdat)
                                                              run-id
                                                              (tasks:hostinfo-get-interface server-dat)
                                                              (tasks:hostinfo-get-port server-dat)
Index: common.scm
==================================================================
--- common.scm
+++ common.scm
@@ -65,11 +65,11 @@
 (define *db-access-allowed* #t) ;; flag to allow access
 (define *db-access-mutex* (make-mutex))
 
 ;; SERVER
 (define *my-client-signature* #f)
-(define *transport-type* 'nm)
+(define *transport-type* 'nmsg)
 (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold
 (define *max-cache-size* 0)
 (define *logged-in-clients* (make-hash-table))
 (define *client-non-blocking-mode* #f)
 (define *server-id* #f)
Index: http-transport.scm
==================================================================
--- http-transport.scm
+++ http-transport.scm
@@ -318,17 +318,18 @@
         (close-connection! api-dat)
         #t)
       #f)))
 
 
-(define (make-http-transport:server-dat)(make-vector 5))
+(define (make-http-transport:server-dat)(make-vector 7)) ;; 7 slots, indices 0..6: iface port api-uri api-url api-req last-access socket
 (define (http-transport:server-dat-get-iface vec)       (vector-ref vec 0))
 (define (http-transport:server-dat-get-port vec)        (vector-ref vec 1))
 (define (http-transport:server-dat-get-api-uri vec)     (vector-ref vec 2))
 (define (http-transport:server-dat-get-api-url vec)     (vector-ref vec 3))
 (define (http-transport:server-dat-get-api-req vec)     (vector-ref vec 4))
 (define (http-transport:server-dat-get-last-access vec) (vector-ref vec 5))
+(define (http-transport:server-dat-get-socket vec)      (vector-ref vec 6)) ;; nmsg req socket, filled in by nmsg-transport:client-connect
 
 (define (http-transport:server-dat-make-url vec)
   (if (and (http-transport:server-dat-get-iface vec)
            (http-transport:server-dat-get-port  vec))
       (conc "http://"
Index: nmsg-transport.scm
==================================================================
--- nmsg-transport.scm
+++ nmsg-transport.scm
@@ -70,11 +70,12 @@
                                                     (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
                                                   "server thread"))
          (tdbdat        (tasks:open-db)))
     (thread-start! server-thread)
     (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
-        (begin
+        (let ((interface (if (or (not hostn)(equal? hostn "-"))(get-host-name) hostn))) ;; same host resolution as nmsg-transport:ping
+          (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
           (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
           (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
           (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
           (set! *inmemdb*  dbstruct)
           (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
@@ -142,12 +143,13 @@
                 (exit 0))))
           (begin ;; since we didn't get the server lock we are going to clean up and bail out
             (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
             (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
-            )))
-    (nmsg-transport:run dbstruct hostn run-id server-id)
+            ))
+      ;; locked in a server id, try to start up
+      (nmsg-transport:run dbstruct hostn run-id server-id))
     (set! *didsomething* #t)
     (exit))))
 
 ;;======================================================================
 ;; S E R V E R U T I L I T I E S
@@ -167,13 +169,13 @@
 ;; ping the server at host:port
 ;;   return the open socket if successful (return-socket == #t)
 ;;   expect the key expected-key returned in payload
 ;;   send our-key or #f as payload
 ;;
-(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f))
+(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
   ;; send a random number along with pid and check that we get it back
-  (let* ((req     (nn-socket 'req))
+  (let* ((req     (or socket (nn-socket 'req))) ;; reuse a caller-supplied socket when given, otherwise open a new req socket
          (host    (if (or (not hostn)
                           (equal? hostn "-")) ;; use localhost
                       (get-host-name)
                       hostn))
          (success #f)
@@ -203,11 +205,11 @@
                                      (if keepwaiting
                                          (begin
                                            (print "timeout waiting for ping")
                                            (thread-terminate! ping))))
                                    "timeout")))
-    (nn-connect req (conc "tcp://" host ":" port))
+    (if (not socket)(nn-connect req (conc "tcp://" host ":" port))) ;; a caller-supplied socket is assumed to be connected already
     (handle-exceptions
      exn
      (begin
        ;; (print-call-chain)
        ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
@@ -218,30 +220,13 @@
        (thread-join! ping)
        (if success (thread-terminate! timeout)))
     (if return-socket
         (if success req #f)
         (begin
-          (nn-close req)
+          (if (not socket)(nn-close req)) ;; only close sockets opened here; a caller-supplied socket still belongs to the caller
           success))))
 
-(define (nmsg-transport:client-connect iface portnum)
-  (let* ((reqsoc    (nmsg-transport:ping iface portnum))
-         (login-res #f))
-    (nn-connect reqsoc (conc "tcp://" iface ":" portnum))
-    (debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
-    (set! login-res (client:login serverdat nmsg-sockets))
-    (if (and (not (null? login-res))
-             (car login-res))
-        (begin
-          (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
-          (set! *nm-port* nmsg-sockets)
-          nmsg-sockets)
-        (begin
-          (debug:print-info 2 "Failed to login or connect to " conurl)
-          (set! *runremote* #f)
-          #f))))
-
 ;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
 ;; used and to shutdown after sometime if it is not.
 ;;
 (define (nmsg-transport:keep-running server-id)
   ;; if none running or if > 20 seconds since
@@ -292,11 +277,30 @@
             (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
             (debug:print-info 0 "Server shutdown complete. Exiting")
             ;; (exit)
             ))))))
 
+;;======================================================================
+;; C L I E N T S
+;;======================================================================
+
+(define (nmsg-transport:client-connect iface portnum)
+  (let* ((reqsoc (nmsg-transport:ping iface portnum return-socket: #t)))
+    (vector iface portnum #f #f #f (current-seconds) reqsoc))) ;; server-dat layout: slot 5 last-access, slot 6 socket (#f if the ping failed)
+
+(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
+  (let ((packet (vector cmd param))
+        (reqsoc (http-transport:server-dat-get-socket connection-info)))
+    (nn-send reqsoc (db:obj->string packet transport: 'nmsg))
+    (db:string->obj (nn-recv reqsoc) transport: 'nmsg))) ;; NOTE(review): rmt reads this as #(success result); confirm the server replies in that shape
+
+;;======================================================================
+;; J U N K
+;;======================================================================
 
+;; DO NOT USE
+;;
 (define (nmsg-transport:client-signal-handler signum)
   (handle-exceptions
    exn
    (debug:print " ... exiting ...")
    (let ((th1 (make-thread (lambda ()
Index: rmt.scm
==================================================================
--- rmt.scm
+++ rmt.scm
@@ -82,21 +82,24 @@
                     (hash-table-keys *runremote*)))
       (mutex-unlock! *db-multi-sync-mutex*)
       (let* ((run-id          (if rid rid 0))
              (connection-info (rmt:get-connection-info run-id))
              (jparams         (db:obj->string params)))
+        ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
         (if connection-info
             ;; use the server if have connection info
             (let* ((dat     (case *transport-type*
                               ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
-                              ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd jparams))
+                              ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params))
                               (else  (exit))))
                    (res     (if (vector? dat) (vector-ref dat 1) #f))
                    (success (if (vector? dat) (vector-ref dat 0) #f)))
               (http-transport:server-dat-update-last-access connection-info)
               (if success
-                  (db:string->obj res)
+                  (case *transport-type*
+                    ((http)(db:string->obj res))
+                    ((nmsg) res))
                   (begin ;; let ((new-connection-info (client:setup run-id)))
                     (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
                     (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
                     ;; no longer killing the server in http-transport:client-api-send-receive
Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -50,11 +50,11 @@
 ;; start_server
 ;;
 (define (server:launch run-id)
   (case *transport-type*
     ((http)(http-transport:launch run-id))
-    ((nm)  (nmsg-transport:launch run-id))
+    ((nmsg)(nmsg-transport:launch run-id))
     (else (debug:print 0 "ERROR: unknown server type " *transport-type*))))
 
 ;;======================================================================
 ;; Q U E U E   M A N A G E M E N T
 ;;======================================================================
Index: tasks.scm
==================================================================
--- tasks.scm
+++ tasks.scm
@@ -185,11 +185,11 @@
                 -1              ;; pubport
                 (random 1000)   ;; priority (used a tiebreaker on get-available)
                 "available"     ;; state
                 (common:version-signature) ;; mt_version
                 -1              ;; interface
-                "http"          ;; transport
+                (conc *transport-type*) ;; transport
                 run-id
                 ))
 
 (define (tasks:num-in-available-state mdb run-id)
   (let ((res 0))