1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
;; Client-side socket setup: one 'sub socket and one 'push socket, both
;; connected to localhost.
;; NOTE(review): `cname` is not defined in this section; it must already be
;; bound when these top-level forms are evaluated -- confirm against the
;; rest of the file.
(define sub (make-socket 'sub))
(define push (make-socket 'push))
;; Set the 'subscribe option of the sub socket to cname (presumably so that
;; only messages addressed to this client are delivered -- confirm).
(socket-option-set! sub 'subscribe cname)
;; sub connects to port 5563, push connects to port 5564.
(connect-socket sub "tcp://localhost:5563")
(connect-socket push "tcp://localhost:5564")
;; Send a request over `push` and wait for the reply on `sub`.
;;
;; The outgoing message is "<cname>:<cmd>:<var>" or, when val is not #f,
;; "<cname>:<cmd>:<var> <val>".
;;
;; Two threads are started:
;;   th1 ("do access")      sends the message, reads one message from `sub`
;;                          (printed as the address) and then a second one,
;;                          which is stored in res.
;;   th2 ("timeout thread") sleeps 5 seconds and then checks whether res has
;;                          been set; if not, it re-issues the request with
;;                          numtries decremented, or calls (exit) once
;;                          numtries has reached 0.
;;
;; Arguments:
;;   cname     client name, used as the message prefix
;;   cmd       command string placed after the client name
;;   var, val  payload; val may be #f
;;   numtries: number of retries still allowed (default 20)
;;
;; Returns res after th1 has finished.
;;
;; NOTE(review): the value returned by the recursive retry call is discarded
;; and the caller still joins the original th1, so a retry only helps if the
;; original th1 ends up receiving the reply -- confirm that is intended.
(define (dbaccess cname cmd var val #!key (numtries 20))
  (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var)))
         (res #f)
         (mtx1 (make-mutex))
         (do-access
          (lambda ()
            (print "Sending msg: " msg)
            (send-message push msg)
            (print "Message " msg " sent")
            (print "Client " cname " waiting for response to " msg)
            (print "Client " cname " received address " (receive-message* sub))
            ;; Receive BEFORE taking the mutex: holding mtx1 across a
            ;; blocking receive would stall the timeout thread on
            ;; mutex-lock! and prevent it from ever reporting a timeout.
            (let ((tmpres (receive-message* sub)))
              (mutex-lock! mtx1)
              (set! res tmpres)
              (mutex-unlock! mtx1)))))
    (let ((th1 (make-thread do-access "do access"))
          (th2 (make-thread
                (lambda ()
                  ;; Wait first, then look at res.  Sampling res before the
                  ;; sleep always saw the initial #f and triggered a retry
                  ;; even when the reply arrived in time.
                  (thread-sleep! 5)
                  (let ((result #f))
                    (mutex-lock! mtx1)
                    (set! result res)
                    (mutex-unlock! mtx1)
                    (if (not result)
                        (if (> numtries 0)
                            (begin
                              (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries)
                              (dbaccess cname cmd var val numtries: (- numtries 1)))
                            (begin
                              (print "ERROR: dbaccess timed out. Exiting")
                              (exit))))))
                ;; Thread name belongs here, as make-thread's second argument.
                "timeout thread")))
      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1)
      res)))
|
>
>
>
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
;; Client-side socket setup: one 'sub socket and one 'push socket, both
;; connected to localhost.
;; NOTE(review): `cname` is not defined in this section; it must already be
;; bound when these top-level forms are evaluated -- confirm against the
;; rest of the file.
(define sub (make-socket 'sub))
(define push (make-socket 'push))
;; Set the 'subscribe option of the sub socket to cname (presumably so that
;; only messages addressed to this client are delivered -- confirm).
(socket-option-set! sub 'subscribe cname)
;; Set the 'hwm option to 1000 on both sockets (presumably the ZeroMQ
;; high-water mark, i.e. a queue limit -- confirm against the socket library).
(socket-option-set! sub 'hwm 1000)
(socket-option-set! push 'hwm 1000)
;; sub connects to port 6563, push connects to port 6564.
(connect-socket sub "tcp://localhost:6563")
(connect-socket push "tcp://localhost:6564")
;; Pause 0.2 seconds before continuing (presumably to let the connections
;; settle before the first message is sent -- confirm).
(thread-sleep! 0.2)
;; Ping the server until it answers or the deadline passes.
;;
;; Repeatedly sends "<cname>:ping:<timeout>" on `push`, sleeps 0.1 seconds,
;; and polls `sub` with a non-blocking receive.
;;
;; Arguments:
;;   cname    client name, used as the message prefix
;;   timeout  number of seconds (added to current-seconds) to keep trying
;;
;; Returns #t as soon as a polled message equals "Got ping", or #f once the
;; current time has passed the deadline.
(define (server-ping cname timeout)
  (let ((ping-msg (conc cname ":ping:" timeout))
        (deadline (+ (current-seconds) timeout)))
    (print "pinging server from " cname " with timeout " timeout)
    (let retry ((reply #f))
      (cond
       ;; Deadline passed: failed to ping.
       ((< deadline (current-seconds)) #f)
       ;; Server acknowledged the ping.
       ((equal? reply "Got ping") #t)
       (else
        ;; Report whatever was last polled (this is #f on the first pass),
        ;; then send another ping and poll again.
        (print "Ping received from server " reply)
        (send-message push ping-msg)
        (thread-sleep! 0.1)
        (retry (receive-message sub non-blocking: #t)))))))
;; Send a request over `push` and wait for the reply on `sub`.
;;
;; The outgoing message is "<cname>:<cmd>:<var>" or, when val is not #f,
;; "<cname>:<cmd>:<var> <val>".
;;
;; Two threads are started:
;;   th1 ("do access")      sends the message, reads one message from `sub`
;;                          (printed as the address) and then a second one,
;;                          which is stored in res under mtx1.
;;   th2 ("timeout thread") sleeps 5 seconds and then checks whether res has
;;                          been set; if not, it re-issues the request with
;;                          numtries decremented, or calls (exit) once
;;                          numtries has reached 0.
;;
;; Arguments:
;;   cname     client name, used as the message prefix
;;   cmd       command string placed after the client name
;;   var, val  payload; val may be #f
;;   numtries: number of retries still allowed (default 20)
;;
;; Returns res after th1 has finished; prints a SUCCESS line when res is
;; not #f.
;;
;; NOTE(review): the value returned by the recursive retry call is discarded
;; and the caller still joins the original th1, so a retry only helps if the
;; original th1 ends up receiving the reply -- confirm that is intended.
(define (dbaccess cname cmd var val #!key (numtries 20))
  (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var)))
         (res #f)
         (mtx1 (make-mutex))
         (do-access
          (lambda ()
            (let ((tmpres #f))
              (print "Sending msg: " msg)
              (send-message push msg)
              (print "Message " msg " sent")
              (print "Client " cname " waiting for response to " msg)
              (print "Client " cname " received address " (receive-message* sub))
              ;; The blocking receive happens outside the mutex so the
              ;; timeout thread is never stalled on mutex-lock!.
              (set! tmpres (receive-message* sub))
              (mutex-lock! mtx1)
              (set! res tmpres)
              (mutex-unlock! mtx1))))
         (th1 (make-thread do-access "do access"))
         (th2 (make-thread
               (lambda ()
                 ;; Wait first, then look at res.  Sampling res before the
                 ;; sleep always saw the initial #f and triggered a retry
                 ;; even when the reply arrived in time.
                 (thread-sleep! 5)
                 (let ((result #f))
                   (mutex-lock! mtx1)
                   (set! result res)
                   (mutex-unlock! mtx1)
                   (if (not result)
                       (if (> numtries 0)
                           (begin
                             (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries)
                             (dbaccess cname cmd var val numtries: (- numtries 1)))
                           (begin
                             (print "ERROR: dbaccess timed out. Exiting")
                             (exit))))))
               ;; Thread name belongs here, as make-thread's second argument.
               "timeout thread")))
    (thread-start! th1)
    (thread-start! th2)
    (thread-join! th1)
    (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts"))
    res))
|