62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
-
+
|
chicken.string
chicken.sort
chicken.pretty-print
address-info
mailbox
matchable
queues
;; queues
regex
regex-case
s11n
srfi-1
srfi-18
srfi-4
srfi-69
|
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
-
-
-
-
-
+
+
+
+
+
+
|
;; the listener side
(port #f)
(host-port #f)
(socket #f)
;; the peers
(peers (make-hash-table)) ;; host:port->peer
;; work handling
(work-queue (make-queue))
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table))
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
(work-queue (make-mailbox))
(work-proc #f) ;; set by user
(cnum 0) ;; cookie number
(mboxes (make-hash-table)) ;; for the replies
(avail-cmboxes '()) ;; list of (<cookie> . <mbox>) for re-use
;; threads
(numthreads 10)
(cmd-thread #f)
(work-queue-thread #f)
)
;; ;; struct for keeping track of others we are talking to
;; ;;
;; (defstruct pdat
|
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
|
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
-
+
+
-
-
-
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
;;======================================================================
;; work queues - this is all happening on the listener side
;;======================================================================
;; rdat is (rem-host-port qrykey cmd params)
(define (add-to-work-queue uconn rdat)
(queue-add! (udat-work-queue uconn) rdat))
#;(queue-add! (udat-work-queue uconn) rdat)
(mailbox-send! (udat-work-queue uconn) rdat))
(define (do-work uconn rdat)
(let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change
;; put this following into a do-work procedure
(match rdat
((rem-host-port qrykey cmd params)
(let* ((result (proc rem-host-port qrykey cmd params)))
;; send 'response as cmd and result as params
(send uconn rem-host-port qrykey 'response result))) ;; could check for ack
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
(define (process-work-queue uconn)
(let ((wqueue (udat-work-queue uconn))
(proc (udat-work-proc uconn)))
(let loop ()
(proc (udat-work-proc uconn))
(numthr (udat-numthreads uconn)))
(let loop ((thnum 1)
(if (queue-empty? wqueue)
(thread-sleep! 0.1)
(let ((rdat (queue-remove! wqueue)))
(do-work uconn rdat)))
(loop))))
(threads '()))
(let ((thlst (cons (make-thread (lambda ()
(let ((rdat (mailbox-receive! wqueue #f 'MBOX_TIMEOUT)))
(do-work uconn rdat)))
(conc "work thread " thnum))
threads)))
(if (< thnum numthr)
(loop (+ thnum 1)
thlst)
(begin
(print "ULEX: Starting "(length thlst)" worker threads.")
(map thread-start! thlst)
(print "ULEX: Threads started. Joining all.")
(map thread-join! thlst)))))))
;; below was to enable re-use of connections. This seems non-trivial so for
;; now lets open on each call
;;
;; ;; given host-port get or create peer struct
;; ;;
;; (define (udat-get-peer uconn host-port)
|