Index: db.scm
==================================================================
--- db.scm
+++ db.scm
@@ -1092,17 +1092,70 @@
 
 ;;======================================================================
 ;; QUEUE UP META, TEST STATUS AND STEPS
 ;;======================================================================
 
+;; db:updater is run in a thread to write out the cached data periodically
 (define (db:updater)
   (debug:print-info 4 "Starting cache processing")
   (let loop ((start-time (current-time)))
     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
     (db:write-cached-data)
     (loop start-time)))
 
+;; cdb:cached-access is called by the server loop to dispatch commands or queue up
+;; db accesses. Returns "ERROR", #t/#f (login), "CACHED" or "WRITTEN".
+;;
+;; params := qry-name cached? val1 val2 val3 ...
+(define (cdb:cached-access params)
+  (if (< (length params) 2)
+      "ERROR"
+      (let ((qry-name (car params))
+            (cached?  (cadr params))
+            (remparam (list-tail params 2)))
+        (debug:print-info 12 "cdb:cached-access qry-name=" qry-name " params=" params)
+        ;; Any special calls are dispatched here.
+        ;; Remainder are put in the db queue
+        (case qry-name
+          ((login) ;; login checks that the megatest path matches
+           (if (null? remparam)
+               #f ;; no path - fail!
+               (let ((calling-path (car remparam)))
+                 (if (equal? calling-path *toppath*)
+                     #t    ;; path matches - pass! Should vet the caller at this time ...
+                     #f)))) ;; else fail to login
+          (else
+           (mutex-lock! *incoming-mutex*)
+           (set! *last-db-access* (current-seconds))
+           (set! *incoming-data* (cons
+                                  (vector qry-name
+                                          (current-milliseconds)
+                                          remparam) ;; values only, not qry-name/cached?
+                                  *incoming-data*))
+           (mutex-unlock! *incoming-mutex*)
+           ;; NOTE: if cached? is #f then this call must be run immediately
+           ;;       but first all calls in the queue are run first in the order
+           ;;       of their time stamp
+           (if (and cached? *cache-on*)
+               (begin
+                 (debug:print-info 12 "*cache-on* is " *cache-on* ", skipping cache write")
+                 "CACHED")
+               (begin
+                 (db:write-cached-data)
+                 "WRITTEN")))))))
+
+;; cdb:client-call serializes params (qry-name cached? val1 val2 ...), sends them
+;; over zmq-socket and returns the reply received back on that same socket
+(define (cdb:client-call zmq-socket . params)
+  (debug:print-info 11 "zmq-socket " params)
+  (let ((zdat (with-output-to-string (lambda ()(serialize params))))
+        (res  #f))
+    (send-message zmq-socket zdat)
+    (set! res (receive-message zmq-socket)) ;; reply is read from the socket, not the request string
+    (debug:print-info 11 "zmq-socket " (car params) " res=" res)
+    res))
+
 (define (cdb:test-set-status-state test-id status state msg)
   (debug:print-info 4 "cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg)
   (mutex-lock! *incoming-mutex*)
   (set! *last-db-access* (current-seconds))
   (if msg
@@ -1117,51 +1170,22 @@
   (mutex-unlock! *incoming-mutex*)
   (if *cache-on*
       (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write")
       (db:write-cached-data)))
 
-(define (cdb:test-rollup-test_data-pass-fail test-id)
-  (debug:print-info 4 "Adding " test-id " for test_data rollup to the queue")
-  (mutex-lock! *incoming-mutex*)
-  (set! *last-db-access* (current-seconds))
-  (set! *incoming-data* (cons (vector 'test_data-pf-rollup
-                                      (current-milliseconds)
-                                      (list test-id test-id test-id test-id))
-                              *incoming-data*))
-  (mutex-unlock! *incoming-mutex*)
-  (if *cache-on*
-      (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write")
-      (db:write-cached-data)))
-
-(define (cdb:pass-fail-counts test-id fail-count pass-count)
-  (debug:print-info 4 "Adding " test-id " for setting pass/fail counts to the queue")
-  (mutex-lock! *incoming-mutex*)
-  (set! *last-db-access* (current-seconds))
-  (set! *incoming-data* (cons (vector 'pass-fail-counts
-                                      (current-milliseconds)
-                                      (list fail-count pass-count test-id))
-                              *incoming-data*))
-  (mutex-unlock! *incoming-mutex*)
-  (if *cache-on*
-      (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write")
-      (db:write-cached-data)))
-
-(define (cdb:tests-register-test db run-id test-name item-path #!key (force-write #f))
+;; client side wrappers; each call is: qry-name, cached? flag (#t), then the query values
+(define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id)
+  (cdb:client-call zmqsocket 'test_data-pf-rollup #t test-id test-id test-id test-id))
+
+(define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count)
+  (cdb:client-call zmqsocket 'pass-fail-counts #t fail-count pass-count test-id))
+
+(define (cdb:tests-register-test zmqsocket db run-id test-name item-path)
   (let ((item-paths (if (equal? item-path "")
                         (list item-path)
                         (list item-path ""))))
-    (debug:print-info 4 "Adding " run-id ", " test-name "/" item-path " for setting pass/fail counts to the queue")
-    (mutex-lock! *incoming-mutex*)
-    (set! *last-db-access* (current-seconds))
-    (set! *incoming-data* (cons (vector 'register-test
-                                        (current-milliseconds)
-                                        (list run-id test-name item-path)) ;; fail-count pass-count test-id))
-                                *incoming-data*))
-    (mutex-unlock! *incoming-mutex*)
-    (if (and (not force-write) *cache-on*)
-        (debug:print-info 6 "*cache-on* is " *cache-on* ", skipping cache write")
-        (db:write-cached-data))))
+    (cdb:client-call zmqsocket 'register-test #t run-id test-name item-path)))
 
 ;; The queue is a list of vectors where the zeroth slot indicates the type of query to
 ;; apply and the second slot is the time of the query and the third entry is a list of
 ;; values to be applied
 ;;

Index: server.scm
==================================================================
--- server.scm
+++ server.scm
@@ -6,11 +6,11 @@
 ;;
 ;;  This program is distributed WITHOUT ANY WARRANTY; without even the
 ;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 ;;  PURPOSE.
 
-(require-extension (srfi 18) extras tcp rpc)
+(require-extension (srfi 18) extras tcp rpc s11n zmq)
 (import (prefix rpc rpc:))
 
 (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo)
 (import (prefix sqlite3 sqlite3:))
 
@@ -21,134 +21,64 @@
 (declare (uses tests))
 
 (include "common_records.scm")
 (include "db_records.scm")
 
-;; procstr is the name of the procedure to be called as a string
-(define (server:autoremote procstr params)
-  (handle-exceptions
-   exn
-   (begin
-     (debug:print 1 "Remote failed for " proc " " params)
-     (apply (eval (string->symbol procstr)) params))
-   ;; (if *runremote*
-   ;;    (apply (eval (string->symbol (conc "remote:" procstr))) params)
-   (apply (eval (string->symbol procstr)) params)))
-
-(define (server:start db hostn)
+  (define a (with-output-to-string (lambda ()(serialize '(1 2 3 "Hello and goodbye" #t)))))
+  (define b (with-input-from-string a (lambda ()(deserialize))))
+
+;; server:run binds a zmq socket (unless a server is already recorded) and then answers requests forever
+(define (server:run hostn)
   (debug:print 0 "Attempting to start the server ...")
-  (let ((host:port (db:get-var db "SERVER"))) ;; do whe already have a server running?
+  (let ((host:port (open-run-close db:get-var #f "SERVER"))) ;; do we already have a server running?
     (if host:port
-        (set! *runremote* (let* ((lst  (string-split host:port ":"))
-                                 (port (if (> (length lst) 1)
-                                           (string->number (cadr lst))
-                                           #f)))
-                            (if port (vector (car lst) port) #f)))
-        (let* ((rpc:listener (server:find-free-port-and-open (rpc:default-server-port)))
-               (th1          (make-thread
-                              (cute (rpc:make-server rpc:listener) "rpc:server")
-                              'rpc:server))
-               ;; (th2          (make-thread (lambda ()(db:updater))))
+        (set! *runremote* host:port)
+        (let* ((zmq-socket #f)
                (hostname     (if (string=? "-" hostn)
                                  (get-host-name)
                                  hostn))
-               (ipaddrstr    (if (string=? "-" hostn)
-                                 (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
-                                 #f))
-               (host:port    (conc (if ipaddrstr ipaddrstr hostname) ":" (rpc:default-server-port))))
-          (debug:print 0 "Server started on " host:port)
-          (db:set-var db "SERVER" host:port)
+               (ipaddrstr    (let ((ipstr (if (string=? "-" hostn)
+                                              (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
+                                              #f)))
+                               (if ipstr ipstr hostname))))
+          (set! zmq-socket (server:find-free-port-and-open ipaddrstr))
           (set! *cache-on* #t)
-          ;; can use this to run most anything at the remote
-          (rpc:publish-procedure!
-           'remote:run
-           (lambda (procstr . params)
-             (server:autoremote procstr params)))
-
-          (rpc:publish-procedure!
-           'server:login
-           (lambda (toppath)
-             (set! *last-db-access* (current-seconds))
-             (if (equal? *toppath* toppath)
-                 (begin
-                   (debug:print-info 2 "login successful")
-                   #t)
-                 #f)))
-
-          ;;======================================================================
-          ;; db specials here
-          ;;======================================================================
-          ;; remote call to open-run-close
-          (rpc:publish-procedure!
-           'rdb:open-run-close
-           (lambda (procname . remargs)
-             (debug:print-info 12 "Remote call of rdb:open-run-close " procname " " remargs)
-             (set! *last-db-access* (current-seconds))
-             (apply open-run-close (eval procname) remargs)))
-
-          (rpc:publish-procedure!
-           'cdb:test-set-status-state
-           (lambda (test-id status state msg)
-             (debug:print-info 12 "Remote call of cdb:test-set-status-state test-id=" test-id ", status=" status ", state=" state ", msg=" msg)
-             (cdb:test-set-status-state test-id status state msg)))
-
-          (rpc:publish-procedure!
-           'cdb:test-rollup-test_data-pass-fail
-           (lambda (test-id)
-             (debug:print-info 12 "Remote call of cdb:test-rollup-test_data-pass-fail " test-id)
-             (cdb:test-rollup-test_data-pass-fail test-id)))
-
-          (rpc:publish-procedure!
-           'cdb:pass-fail-counts
-           (lambda (test-id fail-count pass-count)
-             (debug:print-info 12 "Remote call of cdb:pass-fail-counts " test-id " passes: " pass-count " fails: " fail-count)
-             (cdb:pass-fail-counts test-id fail-count pass-count)))
-
-          (rpc:publish-procedure!
-           'cdb:tests-register-test
-           (lambda (db run-id test-name item-path)
-             (debug:print-info 12 "Remote call of cdb:tests-register-test " run-id " testname: " test-name " item-path: " item-path)
-             (cdb:tests-register-test db run-id test-name item-path)))
-
-          (rpc:publish-procedure!
-           'cdb:flush-queue
-           (lambda ()
-             (debug:print-info 12 "Remote call of cdb:flush-queue")
-             (cdb:flush-queue)))
-
-          ;;======================================================================
-          ;; end of publish-procedure section
-          ;;======================================================================
-
-          (set! *rpc:listener* rpc:listener)
+          ;; what to do when we quit
+          ;;
           (on-exit
            (lambda ()
              (open-run-close
               (lambda (db . params)
-                (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' and val=?;" host:port))
+                (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER';"))
               #f ;; for db
               #f) ;; for a param
-             (let loop ((n 0))
+             (let loop ()
                (let ((queue-len 0))
                  (thread-sleep! (random 5))
                  (mutex-lock! *incoming-mutex*)
                  (set! queue-len (length *incoming-data*))
                  (mutex-unlock! *incoming-mutex*)
                  (if (> queue-len 0)
                      (begin
                        (debug:print-info 0 "Queue not flushed, waiting ...")
-                       (loop (+ n 1)))))
-               )))
-          (db:updater)
-          (thread-start! th1)
-          ;; (debug:print 0 "Server started on port " (rpc:default-server-port) "...")
-          ;; (thread-start! th2)
-          ;; (thread-join! th2)
-          ;; return th2 for the calling process to do a join with
-          th1
-          )))) ;; rpc:server)))
-
+                       (loop)))))))
+
+          ;; The heavy lifting
+          ;;
+          (let loop ()
+            (let* ((rawmsg (receive-message zmq-socket))
+                   (params (with-input-from-string rawmsg (lambda ()(deserialize))))
+                   (res    #f))
+              (debug:print-info 12 "server=> received msg=" rawmsg)
+              (set! res (cdb:cached-access params))
+              (debug:print-info 12 "server=> processed msg=" rawmsg)
+              (send-message zmq-socket res)
+              (loop)))))))
+
+;; run server:keep-running in a parallel thread to monitor that the db is being
+;; used and to shut down after some time if it is not.
+;;
 (define (server:keep-running db host:port)
   ;; if none running or if > 20 seconds since
   ;; server last used then start shutdown
   (let loop ((count 0))
     (thread-sleep! 20) ;; no need to do this very often
@@ -159,49 +89,50 @@
               (debug:print-info 0 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
               (loop (+ 1 count)))
             (begin
               (debug:print-info 0 "Starting to shutdown the server side")
               ;; need to delete only *my* server entry (future use)
-              (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER' AND val like ?;"  host:port)
+              (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER';")
               (thread-sleep! 10)
               (debug:print-info 0 "Max cached queries was " *max-cache-size*)
               (debug:print-info 0 "Server shutdown complete. Exiting")
               ;; (exit)))
               )))))
 
-(define (server:find-free-port-and-open port)
-  (handle-exceptions
-   exn
-   (begin
-     (print "Failed to bind to port " (rpc:default-server-port) ", trying next port")
-     (server:find-free-port-and-open (+ port 1)))
-   (rpc:default-server-port port)
-   (tcp-read-timeout 240000)
-   (tcp-listen (rpc:default-server-port) 10000)))
+;; server:find-free-port-and-open binds a zmq 'rep socket to tcp://host:port and
+;; records the url under "SERVER"; on any exception it retries with the next port.
+;; s and port are optional: s defaults to a new 'rep socket, port to 5555.
+(define (server:find-free-port-and-open host #!optional (s #f) (port #f))
+  (let ((s (if s s (make-socket 'rep)))
+        (p (if (number? port) port 5555)))
+    (handle-exceptions
+     exn
+     (begin
+       (print "Failed to bind to port " p ", trying next port")
+       (server:find-free-port-and-open host s (+ p 1)))
+     (let ((zmq-url (conc "tcp://" host ":" p)))
+       (bind-socket s zmq-url)
+       (set! *runremote* zmq-url)
+       (debug:print 0 "Server started on " zmq-url)
+       (open-run-close db:set-var #f "SERVER" zmq-url) ;; no db handle in scope here
+       s))))
 
 (define (server:client-setup)
   (if *runremote*
       (begin
         (debug:print 0 "ERROR: Attempt to connect to server but already connected")
         #f)
-      (let* ((hostinfo (open-run-close db:get-var #f "SERVER"))
-             (hostdat  (if hostinfo (string-split hostinfo ":") #f))
-             (host     (if hostinfo (car hostdat) #f))
-             (port     (if (and hostinfo (> (length hostdat) 1))(cadr hostdat) #f)))
-        (if (and port
-                 (string->number port))
-            (let ((portn (string->number port)))
-              (debug:print-info 2 "Setting up to connect to host " host ":" port)
+      (let* ((hostinfo   (open-run-close db:get-var #f "SERVER"))
+             (zmq-socket (make-socket 'req)))
+        (if hostinfo
+            (begin
+              (debug:print-info 2 "Setting up to connect to " hostinfo)
               (handle-exceptions
                exn
                (begin
                  (debug:print 0 "ERROR: Failed to open a connection to the server at host: " host " port: " port)
                  (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
-                 ;; (open-run-close
-                 ;;  (lambda (db . param)
-                 ;;    (sqlite3:execute db "DELETE FROM metadat WHERE var='SERVER'"))
-                 ;;  #f)
                  (set! *runremote* #f))
                (if (and (not (args:get-arg "-server")) ;; no point in the server using the server using the server
                         ((rpc:procedure 'server:login host portn) *toppath*))
                    (begin
                      (debug:print-info 2 "Logged in and connected to " host ":" port)

Index: testzmq/hwclient.scm
==================================================================
--- testzmq/hwclient.scm
+++ testzmq/hwclient.scm
@@ -1,9 +1,9 @@
 (use zmq posix)
 
 (define s (make-socket 'req))
-(connect-socket s "tcp://127.0.0.1:5563")
+(connect-socket s "tcp://localhost:5563")
 
 (define myname (cadr (argv)))
 
 (print "Start client...")
 

Index: testzmq/hwserver.scm
==================================================================
--- testzmq/hwserver.scm
+++ testzmq/hwserver.scm
@@ -1,15 +1,15 @@
 (use zmq srfi-18 posix)
 
 (define s (make-socket 'rep))
-(bind-socket s "tcp://127.0.0.1:5563")
+(bind-socket s "tcp://*:5563")
 
 (print "Start server...")
 (let loop ()
   (let* ((msg    (receive-message s))
          (name   (caddr (string-split msg " ")))
          (resp   (conc "World " name)))
     (print "Received request: [" msg "]")
-    (thread-sleep! 0.01)
+    (thread-sleep! 0.0001)
     (print "Sending response \"" resp "\"")
     (send-message s resp)
     (loop)))