@@ -20,12 +20,13 @@ (declare (uses common)) (declare (uses db)) (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (declare (uses synchash)) (declare (uses http-transport)) +(declare (uses rpc-transport)) +(declare (uses nmsg-transport)) (declare (uses launch)) -;; (declare (uses zmq-transport)) (declare (uses daemon)) (include "common_records.scm") (include "db_records.scm") @@ -47,37 +48,59 @@ ;; all routes though here end in exit ... ;; ;; start_server ;; (define (server:launch run-id) - (http-transport:launch run-id)) - -;;====================================================================== -;; Q U E U E M A N A G E M E N T -;;====================================================================== - -;; We don't want to flush the queue if it was just flushed -(define *server:last-write-flush* (current-milliseconds)) + (case *transport-type* + ((http)(http-transport:launch run-id)) + ((nmsg)(nmsg-transport:launch run-id)) + ((rpc) (rpc-transport:launch run-id)) + (else (debug:print 0 "ERROR: unknown server type " *transport-type*)))) +;; (else (debug:print 0 "ERROR: No known transport set, transport=" transport ", using rpc") +;; (rpc-transport:launch run-id))))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== +;; Get the transport +(define (server:get-transport) + (if *transport-type* + *transport-type* + (let ((ttype (string->symbol + (or (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + "rpc")))) + (set! *transport-type* ttype) + ttype))) + ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) - ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) - (db:obj->string (vector success/fail query-sig result))) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (case (server:get-transport) + ((rpc) (db:obj->string (vector success/fail query-sig result))) + ((http) (db:obj->string (vector success/fail query-sig result))) + ((zmq) + (let ((pub-socket (vector-ref *runremote* 1))) + (send-message pub-socket return-addr send-more: #t) + (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) + ((fs) result) + (else + (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) + result))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; @@ -108,10 +131,16 @@ (system (conc "nbfake " cmdln)) (unsetenv "TARGETHOST_LOGF") (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST")) ;; (system cmdln) (pop-directory))) + +(define (server:get-client-signature) + (if *my-client-signature* *my-client-signature* + (let ((sig (server:mk-signature))) + (set! *my-client-signature* sig) + *my-client-signature*))) ;; kind start up of servers, wait 40 seconds before allowing another server for a given ;; run-id to be launched (define (server:kind-run run-id) (let ((last-run-time (hash-table-ref/default *server-kind-run* run-id #f))) @@ -136,13 +165,17 @@ ;; note: client:start will set *runremote*. this needs to be changed ;; also, client:start will login to the server, also need to change that. ;; ;; client:start returns #t if login was successful. ;; - (let ((res (server:ping-server run-id - (tasks:hostinfo-get-interface server) - (tasks:hostinfo-get-port server)))) + (let ((res (case *transport-type* + ((http)(server:ping-server run-id + (tasks:hostinfo-get-interface server) + (tasks:hostinfo-get-port server))) + ((nmsg)(nmsg-transport:ping (tasks:hostinfo-get-interface server) + (tasks:hostinfo-get-port server) + timeout: 2))))) ;; if the server didn't respond we must remove the record (if res #t (begin (debug:print-info 0 "server at " server " not responding, removing record") @@ -196,5 +229,27 @@ (case (string->symbol res) ((NOREPLY) #f) ((LOGIN_OK) #t) (else #f)) (loop (read-line) inl)))))) + +(define (server:login toppath) + (lambda (toppath) + (set! *last-db-access* (current-seconds)) + (if (equal? *toppath* toppath) + (begin + ;; (debug:print-info 2 "login successful") + #t) + (begin + ;; (debug:print-info 2 "login failed") + #f)))) + +(define (server:get-timeout) + (let ((tmo (configf:lookup *configdat* "server" "timeout"))) + (if (and (string? tmo) + (string->number tmo)) + (* 60 60 (string->number tmo)) + ;; (* 3 24 60 60) ;; default to three days + (* 60 1) ;; default to one minute + ;; (* 60 60 25) ;; default to 25 hours + ))) +