Index: testzmq/mockupclient.scm
==================================================================
--- testzmq/mockupclient.scm
+++ testzmq/mockupclient.scm
@@ -17,10 +17,17 @@
 (print "Starting client " cname " with runtime " runtime)
 
 (include "mockupclientlib.scm")
 
 (set! endtime (+ (current-seconds) runtime))
+
+;; first ping the server to ensure we have a connection
+(if (server-ping cname 5)
+    (print "SUCCESS: Client " cname " connected to server")
+    (begin
+      (print "ERROR: Client " cname " failed ping of server, exiting")
+      (exit 1)))
 
 (let loop ()
   (let ((x (random 15))
         (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4))))
     (case x
Index: testzmq/mockupclientlib.scm
==================================================================
--- testzmq/mockupclientlib.scm
+++ testzmq/mockupclientlib.scm
@@ -1,40 +1,68 @@
 (define sub (make-socket 'sub))
 (define push (make-socket 'push))
 
 (socket-option-set! sub 'subscribe cname)
-(connect-socket sub "tcp://localhost:5563")
-(connect-socket push "tcp://localhost:5564")
+(socket-option-set! sub 'hwm 1000)
+(socket-option-set! push 'hwm 1000)
+
+(connect-socket sub "tcp://localhost:6563")
+(connect-socket push "tcp://localhost:6564")
+
+(thread-sleep! 0.2)
 
+;; Ping the server: resend "<cname>:ping:<timeout>" every 0.1s until the sub
+;; socket yields "Got ping" (returns #t) or timeout seconds pass (returns #f).
+(define (server-ping cname timeout)
+  (let ((msg (conc cname ":ping:" timeout))
+        (maxtime (+ (current-seconds) timeout)))
+    (print "pinging server from " cname " with timeout " timeout)
+    (let loop ((res #f))
+      (if (< maxtime (current-seconds))
+          #f ;; failed to ping
+          (if (equal? res "Got ping")
+              #t
+              (begin
+                (print "Ping received from server " res)
+                (send-message push msg)
+                (thread-sleep! 0.1)
+                (loop (receive-message sub non-blocking: #t))))))))
+
+;; Send "<cname>:<cmd>:<var>[ <val>]" and return the second message read from
+;; sub; a watchdog thread retries (numtries left) or exits if none came in 5s.
 (define (dbaccess cname cmd var val #!key (numtries 20))
   (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var)))
          (res #f)
          (mtx1 (make-mutex))
          (do-access (lambda ()
-                      (print "Sending msg: " msg)
-                      (send-message push msg)
-                      (print "Message " msg " sent")
-                      (print "Client " cname " waiting for response to " msg)
-                      (print "Client " cname " received address " (receive-message* sub))
-                      (mutex-lock! mtx1)
-                      (set! res (receive-message* sub))
-                      (mutex-unlock! mtx1))))
-    (let ((th1 (make-thread do-access "do access"))
-          (th2 (make-thread (lambda ()
-                              (let ((result #f))
-                                (mutex-lock! mtx1)
-                                (set! result res)
-                                (mutex-unlock! mtx1)
-                                (thread-sleep! 5)
-                                (if (not result)
-                                    (if (> numtries 0)
-                                        (begin
-                                          (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries)
-                                          (dbaccess cname cmd var val numtries: (- numtries 1)))
-                                        (begin
-                                          (print "ERROR: dbaccess timed out. Exiting")
-                                          (exit)))))
-                              "timeout thread"))))
-      (thread-start! th1)
-      (thread-start! th2)
-      (thread-join! th1)
-      res)))
+                      (let ((tmpres #f))
+                        (print "Sending msg: " msg)
+                        (send-message push msg)
+                        (print "Message " msg " sent")
+                        (print "Client " cname " waiting for response to " msg)
+                        (print "Client " cname " received address " (receive-message* sub))
+                        (set! tmpres (receive-message* sub))
+                        (mutex-lock! mtx1)
+                        (set! res tmpres)
+                        (mutex-unlock! mtx1))))
+         (th1 (make-thread do-access "do access"))
+         (th2 (make-thread (lambda ()
+                             (let ((result #f))
+                               ;; sample res only AFTER the wait; reading it first never sees the reply
+                               (thread-sleep! 5)
+                               (mutex-lock! mtx1)
+                               (set! result res)
+                               (mutex-unlock! mtx1)
+                               (if (not result)
+                                   (if (> numtries 0)
+                                       (begin
+                                         (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries)
+                                         (dbaccess cname cmd var val numtries: (- numtries 1)))
+                                       (begin
+                                         (print "ERROR: dbaccess timed out. Exiting")
+                                         (exit 1))))))
+                           "timeout thread"))))
+    (thread-start! th1)
+    (thread-start! th2)
+    (thread-join! th1)
+    (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts"))
+    res))
Index: testzmq/mockupserver.scm
==================================================================
--- testzmq/mockupserver.scm
+++ testzmq/mockupserver.scm
@@ -9,12 +9,17 @@
 (define pull (make-socket 'pull))
 (define cname "server")
 (define total-db-accesses 0)
 (define start-time (current-seconds))
 
+(socket-option-set! pub 'hwm 1000)
+(socket-option-set! pull 'hwm 1000)
+
 (bind-socket pub "tcp://*:6563")
 (bind-socket pull "tcp://*:6564")
+
+(thread-sleep! 0.2)
 
 (define (open-db)
   (let* ((dbpath "mockup.db")
          (dbexists (file-exists? dbpath))
          (db (open-database dbpath)) ;; (never-give-up-open-db dbpath))
@@ -86,10 +91,11 @@
                          "SELECT val FROM vars WHERE var=?;"
                          cdata)
                         res))
                      (else (conc "unk cmd: " clcmd))))))
      queuelst)))
+;; SERVER THREAD
 (define th1 (make-thread
              (lambda ()
                (let ((last-run 0)) ;; current-seconds when run last
                  (let loop ((queuelst '()))
                    (let* ((indat (receive-message* pull))
@@ -96,12 +102,18 @@
                           (parts (string-split indat ":"))
                           (cname (car parts)) ;; client name
                           (clcmd (string->symbol (cadr parts))) ;; client cmd
                           (cdata (caddr parts)) ;; client data
                           (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
+                     ;; (print "Server received message: " indat)
                      (count-client db cname)
                      (case clcmd
+                       ((ping)
+                        (print "Got ping from " cname)
+                        (send-message pub cname send-more: #t)
+                        (send-message pub "Got ping")
+                        (loop queuelst))
                        ((sync) ;; just process the queue
                         (print "Got sync from " cname)
                         (process-queue (cons svect queuelst))
                         (loop '()))
                        ((get)
@@ -111,10 +123,11 @@
                         (loop (cons svect queuelst))))))))
              "server thread"))
 
 (include "mockupclientlib.scm")
 
+;; SYNC THREAD
 ;; send a sync to the pull port
 (define th2 (make-thread
              (lambda ()
                (let ((last-action-time (current-seconds)))
                  (let loop ()
Index: testzmq/testmockup.sh
==================================================================
--- testzmq/testmockup.sh
+++ testzmq/testmockup.sh
@@ -1,30 +1,37 @@
 #!/bin/bash
 
 rm -f mockup.db
 
 echo Compiling mockupserver.scm and mockupclient.scm
+
+# Clean up first
+killall mockupserver mockupclient -v
+
 csc random.scm
 csc mockupserver.scm
 csc mockupclient.scm
 
 echo Starting server
 ./mockupserver &
 
 sleep 1
 
+rm -f mockupclients.log
+
 echo Starting clients
-for i in a b c d e; # f g h i j k l m n o p q s t u v w x y z;
+for i in a b c d e f g h i j k l m n o p q s t u v w x y z;
 do
     for k in a b;
     do
         for j in 0 1 2 3 4 5 6 7 8 9;
         do
             waittime=`./random 0 60`
             runtime=`./random 5 120`
             echo "Starting client $i$k$j with waittime $waittime and runtime $runtime"
             (sleep $waittime;./mockupclient $i$k$j $runtime) &
+            # >> mockupclients.log &
         done
     done
 done
 
 wait