Index: common.scm ================================================================== --- common.scm +++ common.scm @@ -50,11 +50,11 @@ ;; DATABASE (define *open-dbs* (vector #f (make-hash-table))) ;; megatestdb run-id-dbs ;; SERVER (define *my-client-signature* #f) -(define *transport-type* 'http) +(define *transport-type* #f) (define *megatest-db* #f) (define *rpc:listener* #f) ;; if set up for server communication this will hold the tcp port (define *runremote* (make-hash-table)) ;; if set up for server communication this will hold (define *last-db-access* (current-seconds)) ;; update when db is accessed via server (define *max-cache-size* 0) Index: http-transport.scm ================================================================== --- http-transport.scm +++ http-transport.scm @@ -91,31 +91,10 @@ (send-response body: (api:process-request db $) ;; the $ is the request vars proc headers: '((content-type text/plain))) (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*)) - ;; This is the /ctrl path where data is handed to the server and - ;; responses - ((equal? (uri-path (request-uri (current-request))) - '(/ "ctrl")) - (let* ((packet (db:string->obj dat)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex - ;; (set! res (open-run-close db:process-queue-item open-db packet)) - (set! res (db:process-queue-item db packet)) - ;; (mutex-unlock! *db:process-queue-mutex*) - (debug:print-info 11 "Return value from db:process-queue-item is " res) - (send-response body: (conc "ctrl data\n" - res - "") - headers: '((content-type text/plain))))) ((equal? (uri-path (request-uri (current-request))) '(/ "")) (send-response body: (http-transport:main-page))) ((equal? (uri-path (request-uri (current-request))) '(/ "runs")) @@ -329,10 +308,12 @@ ;; (* 60 1) ;; default to one minute (* 60 60 25) ;; default to 25 hours )))) (let loop ((count 0) (server-state 'available)) + + ;; Use this opportunity to sync the inmemdb to db (let ((start-time (current-milliseconds)) (sync-time #f) (rem-time #f)) Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -25,10 +25,11 @@ (declare (uses client)) (declare (uses tests)) (declare (uses genexample)) (declare (uses daemon)) (declare (uses db)) + (declare (uses tdb)) (declare (uses mt)) (declare (uses api)) (declare (uses tasks)) ;; only used for debugging. Index: rpc-transport.scm ================================================================== --- rpc-transport.scm +++ rpc-transport.scm @@ -66,23 +66,23 @@ (args:get-arg "-server") "-") run-id server-id)) "Server run")) (th3 (make-thread (lambda () - (rpc-transport:keep-running server-id)) + (rpc-transport:keep-running run-id server-id)) "Keep running"))) ;; Database connection (set! *inmemdb* (db:setup run-id)) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) - (thread-join! th2) + (thread-join! th3) (exit))))) -(define (rpc-transport:run db hostn run-id) - (debug:print 2 "Attempting to start the server ...") - (let* ((db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily +(define (rpc-transport:run hostn run-id server-id) + (debug:print 2 "Attempting to start the rpc server ...") + (let* ((db #f) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") (server:get-best-guess-address hostname) #f))) @@ -91,22 +91,25 @@ (link-tree-path (configf:lookup *configdat* "setup" "linktree")) (rpc:listener (rpc-transport:find-free-port-and-open (rpc:default-server-port))) (th1 (make-thread (cute (rpc:make-server rpc:listener) "rpc:server") 'rpc:server)) - ;; (th2 (make-thread (lambda ()(db:updater)))) (hostname (if (string=? "-" hostn) (get-host-name) hostn)) (ipaddrstr (if (string=? "-" hostn) - (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") + (server:get-best-guess-address hostname) ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f)) - (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" (rpc:default-server-port)))) + (portnum (rpc:default-server-port)) + (host:port (conc (if ipaddrstr ipaddrstr hostname) ":" portnum)) + (tdb (tasks:open-db))) (set! db *inmemdb*) + (open-run-close tasks:server-set-interface-port + tasks:open-db + server-id + ipaddrstr portnum) (debug:print 0 "Server started on " host:port) - (db:set-var db "SERVER" host:port) - (set! *cache-on* #t) ;; can use this to run most anything at the remote (rpc:publish-procedure! 'remote:run (lambda (procstr . params) @@ -161,60 +164,43 @@ ;; 'cdb:flush-queue ;; (lambda () ;; (debug:print-info 12 "Remote call of cdb:flush-queue") ;; (cdb:flush-queue))) ;; + ;;====================================================================== ;; ;; end of publish-procedure section ;;====================================================================== ;; - (set! *rpc:listener* rpc:listener) (on-exit (lambda () - (open-run-close - (lambda (db . params) - (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' and val=?;" host:port)) - #f ;; for db - #f) ;; for a param - (let loop ((n 0)) - (let ((queue-len 0)) - (thread-sleep! (random 5)) - (mutex-lock! *incoming-mutex*) - (set! queue-len (length *incoming-data*)) - (mutex-unlock! *incoming-mutex*) - (if (> queue-len 0) - (begin - (debug:print-info 0 "Queue not flushed, waiting ...") - (loop (+ n 1))))) - ))) - (db:updater) + (open-run-close tasks:server-set-state! tasks:open-db server-id "stopped"))) + (thread-start! th1) - ;; (debug:print 0 "Server started on port " (rpc:default-server-port) "...") - ;; (thread-start! th2) - ;; (thread-join! th2) - ;; return th2 for the calling process to do a join with + + (set! *rpc:listener* rpc:listener) + (tasks:server-set-state! tdb server-id "running") + ; (sqlite3:finalize! tdb) th1 )) ;; rpc:server))) -(define (rpc-transport:keep-running db host:port) +(define (rpc-transport:keep-running run-id server-id) ;; if none running or if > 20 seconds since ;; server last used then start shutdown - (let loop ((count 0)) - (thread-sleep! 20) ;; no need to do this very often - (let ((numrunning (db:get-count-tests-running db))) + (let loop ((count 0)) + (thread-sleep! 5) ;; no need to do this very often + (let ((numrunning -1)) ;; (db:get-count-tests-running db))) (if (or (> numrunning 0) (> (+ *last-db-access* 60)(current-seconds))) - (begin + (begin (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*)) (loop (+ 1 count))) - (begin + (begin (debug:print-info 0 "Starting to shutdown the server side") - ;; need to delete only *my* server entry (future use) - (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' AND val like ?;" host:port) + (open-run-close tasks:server-force-clean-run-record tasks:open-db run-id ipaddrstr portnum " rpc-transport:try-start-server stop") (thread-sleep! 10) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - ;; (exit))) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") ))))) (define (rpc-transport:find-free-port-and-open port) (handle-exceptions exn Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -59,43 +59,45 @@ ;; S E R V E R U T I L I T I E S ;;====================================================================== ;; Get the transport (define (server:get-transport) - (if * - (string->symbol - (or (args:get-arg "-transport") - (configf:lookup *configdat* "server" "transport") - "rpc"))) - + (if *transport-type* + *transport-type* + (let ((ttype (string->symbol + (or (args:get-arg "-transport") + (configf:lookup *configdat* "server" "transport") + "rpc")))) + (set! *transport-type* ttype) + ttype))) + ;; Generate a unique signature for this server (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () (write (list (current-directory) (argv))))))) - ;; When using zmq this would send the message back (two step process) ;; with spiffy or rpc this simply returns the return data to be returned ;; (define (server:reply return-addr query-sig success/fail result) (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) ;; (send-message pubsock target send-more: #t) ;; (send-message pubsock (case (server:get-transport) - ((fs) result) - ((http)(db:obj->string (vector success/fail query-sig result))) + ((rpc) (db:obj->string (vector success/fail query-sig result))) + ((http) (db:obj->string (vector success/fail query-sig result))) ((zmq) (let ((pub-socket (vector-ref *runremote* 1))) (send-message pub-socket return-addr send-more: #t) (send-message pub-socket (db:obj->string (vector success/fail query-sig result))))) + ((fs) result) (else (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*) result))) - (db:obj->string (vector success/fail query-sig result))) ;; Given a run id start a server process ### NOTE ### > file 2>&1 ;; if the run-id is zero and the target-host is set ;; try running on that host ;; Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -105,19 +105,19 @@ (define (tasks:server-set-available mdb run-id) (sqlite3:execute mdb "INSERT INTO servers (pid,hostname,port,pubport,start_time, priority,state,mt_version,heartbeat, interface,transport,run_id) VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?,-1,?, ?, ?);" - (current-process-id) ;; pid - (get-host-name) ;; hostname - -1 ;; port - -1 ;; pubport - (random 1000) ;; priority (used a tiebreaker on get-available) - "available" ;; state - (common:version-signature) ;; mt_version - -1 ;; interface - "http" ;; transport + (current-process-id) ;; pid + (get-host-name) ;; hostname + -1 ;; port + -1 ;; pubport + (random 1000) ;; priority (used a tiebreaker on get-available) + "available" ;; state + (common:version-signature) ;; mt_version + -1 ;; interface + (conc (server:get-transport)) ;; transport run-id )) (define (tasks:num-in-available-state mdb run-id) (let ((res 0))