ADDED rpctest/rpctest-continuous-client.scm Index: rpctest/rpctest-continuous-client.scm ================================================================== --- /dev/null +++ rpctest/rpctest-continuous-client.scm @@ -0,0 +1,131 @@ +;;;; rpc-demo.scm +;;;; Simple database server / client + +;;; start server thusly: ./rpctest server test.db +;;; you will need to init test.db: +;;; sqlite3 test.db "CREATE TABLE foo (id INTEGER PRIMARY KEY, var TEXT, val TEXT);" + +(require-extension (srfi 18) extras tcp rpc sql-de-lite) + +;;; Common things + +(define total-queries 0) +(define start-time (current-seconds)) + +(define operation (string->symbol (car (command-line-arguments)))) +(define param (cadr (command-line-arguments))) +(print "Operation: " operation ", param: " param) + +;; have a pool of db's to pick from +(define *dbpool* '()) +(define *pool-mutex* (make-mutex)) + +(define (get-db) + (mutex-lock! *pool-mutex*) + (if (null? *dbpool*) + (begin + (mutex-unlock! *pool-mutex*) + (let ((db (open-database param))) + (set-busy-handler! db (busy-timeout 10000)) + (exec (sql db "PRAGMA synchronous=0;")) + db)) + (let ((res (car *dbpool*))) + (set! *dbpool* (cdr *dbpool*)) + (mutex-unlock! *pool-mutex*) + res))) + +(define (return-db db) + (mutex-lock! *pool-mutex*) + (set! *dbpool* (cons db *dbpool* )) + (let ((res (length *dbpool*))) + (mutex-unlock! *pool-mutex*) + res)) + +(define rpc:listener + (if (eq? operation 'server) + (tcp-listen (rpc:default-server-port)) + (tcp-listen 0))) + +;; Start server thread +(define rpc:server + (make-thread + (cute (rpc:make-server rpc:listener) "rpc:server") + 'rpc:server)) + +(thread-start! rpc:server) + +;;; Server side + +(define (server) + (rpc:publish-procedure! + 'change-response-port + (lambda (port) + (rpc:default-server-port port)) + #f) + ;;(let ((db (get-db))(open-database param))) + ;; (set-finalizer! db finalize!) + (rpc:publish-procedure! + 'query + (lambda (sqlstmt callback) + (set! total-queries (+ total-queries 1)) + (print "Executing query '" sqlstmt "' ...") + (let ((db (get-db))) + (query (for-each-row + callback) + (sql db sqlstmt)) + (print "Query rate: " (/ total-queries (/ (- (current-seconds) start-time) 60)) " per minute") + (print "num dbs: " (return-db db)) + ))) + (thread-join! rpc:server)) + +;;; Client side + +(define (callback1 . columns) + (let loop ((c columns) (i 0)) + (unless (null? c) + (printf "~a=~s " i (car c)) + (loop (cdr c) (+ i 1)))) + (newline)) + +(define callback2-results '()) + +(define (callback2 . columns) + (set! callback2-results (cons columns callback2-results))) + +(define (client param) + ((rpc:procedure 'change-response-port "localhost") + (tcp-listener-port rpc:listener)) + ((rpc:procedure 'query "localhost") param callback1) + (rpc:publish-procedure! 'callback2 callback2) + ((rpc:procedure 'query "localhost") param callback2) + (pp callback2-results) + (rpc:close-all-connections!) + ;; (rpc:close-connection! "localhost" (rpc:default-server-port)) + ) + +(define (run-query param) + ((rpc:procedure 'query "localhost") param callback1) + ((rpc:procedure 'query "localhost") param callback2) + callback2-results) + +(define (continuous-client #!key (duration 600)) ;; default - run for 10 minutes + ((rpc:procedure 'change-response-port "localhost") + (tcp-listener-port rpc:listener)) + (rpc:publish-procedure! 'callback2 callback2) + (let loop () + (if (< (- (current-seconds) start-time) duration) + (begin + (run-query (conc "INSERT INTO foo (var,val) VALUES (" (random 1000) "," (random 1000) ");")) + (let ((numrows (caaar (run-query "SELECT COUNT(id) FROM foo;")))) + (if (and (number? numrows) + (> numrows 300)) + (print (run-query (conc "DELETE FROM foo WHERE var > " (random 1000) ";"))))) + (loop)))) + (rpc:close-all-connections!)) + +;;; Run it + +(if (eq? operation 'server) + (server) + (continuous-client)) +