︙ | | | ︙ | |
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
address-info
mailbox
matchable
;; queues
regex
regex-case
simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
|
|
|
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
address-info
mailbox
matchable
;; queues
regex
regex-case
simple-exceptions
s11n
srfi-1
srfi-18
srfi-4
srfi-69
system-information
tcp6
|
︙ | | | ︙ | |
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
(if (setup-listener uconn port-suggestion)
(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
(th2 (make-thread (lambda ()
(case (work-method)
((mailbox limited)
(process-work-queue uconn))))
"Ulex work queue processor")))
(tcp-buffer-size 2048)
;; (max-connections 2048)
(thread-start! th1)
(thread-start! th2)
(udat-cmd-thread-set! uconn th1)
(udat-work-queue-thread-set! uconn th2)
(print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
uconn)
(assert #f "ERROR: run-listener called without proper setup."))))
|
|
<
|
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
(if (setup-listener uconn port-suggestion)
(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
(th2 (make-thread (lambda ()
(case (work-method)
((mailbox limited)
(process-work-queue uconn))))
"Ulex work queue processor")))
;; (tcp-buffer-size 2048)
(thread-start! th1)
(thread-start! th2)
(udat-cmd-thread-set! uconn th1)
(udat-work-queue-thread-set! uconn th2)
(print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
uconn)
(assert #f "ERROR: run-listener called without proper setup."))))
|
︙ | | | ︙ | |
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
(cond
(isme (ulex-handler udata dat)) ;; no transmission needed
(else
(handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
exn
(message exn)
(begin
;; (mutex-lock! *send-mutex*)
(let-values (((inp oup)(tcp-connect host-port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp)
)
(begin
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
#f))))
(close-input-port inp)
;; (mutex-unlock! *send-mutex*)
res)))))))) ;; res will always be 'ack unless return-method is direct
(define (send-via-polling uconn host-port cmd data)
(let* ((qrykey (make-cookie uconn))
(sres (send uconn host-port qrykey cmd data)))
(case sres
((ack)
|
|
|
|
|
|
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
(cond
(isme (ulex-handler udata dat)) ;; no transmission needed
(else
(handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
exn
(message exn)
(begin
;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
(let-values (((inp oup)(tcp-connect host-port)))
(let ((res (if (and inp oup)
(begin
(serialize dat oup)
(close-output-port oup)
(deserialize inp)
)
(begin
(print "ERROR: send called but no receiver has been setup. Please call setup first!")
#f))))
(close-input-port inp)
;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
res)))))))) ;; res will always be 'ack unless return-method is direct
(define (send-via-polling uconn host-port cmd data)
(let* ((qrykey (make-cookie uconn))
(sres (send uconn host-port qrykey cmd data)))
(case sres
((ack)
|
︙ | | | ︙ | |
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
|
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
(let* ((serv-listener (udat-socket uconn))
(listener (lambda ()
(let loop ((state 'start))
(let-values (((inp oup)(tcp-accept serv-listener)))
(let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
(resp (ulex-handler uconn rdat)))
(if resp (serialize resp oup))
(close-input-port inp)
(close-output-port oup))
(loop state))))))
;; start N of them
(let loop ((thnum 0)
(threads '()))
(if (< thnum 100)
(let* ((th (make-thread listener (conc "listener" thnum))))
(thread-start! th)
|
>
|
|
>
>
|
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
|
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
(let* ((serv-listener (udat-socket uconn))
(listener (lambda ()
(let loop ((state 'start))
(let-values (((inp oup)(tcp-accept serv-listener)))
;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
(let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
(resp (ulex-handler uconn rdat)))
(serialize resp oup)
(close-input-port inp)
(close-output-port oup)
;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
)
(loop state))))))
;; start N of them
(let loop ((thnum 0)
(threads '()))
(if (< thnum 100)
(let* ((th (make-thread listener (conc "listener" thnum))))
(thread-start! th)
|
︙ | | | ︙ | |
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
|
(case (work-method)
((direct) result)
(else
(print "ULEX: work "cmd", "params" done in "run-time" ms")
;; send 'response as cmd and result as params
(send uconn rem-host-port qrykey 'response result) ;; could check for ack
(print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
(MBOX_TIMEOUT #f)
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;; NEW APPROACH:
;;
(define (process-work-queue uconn)
(let ((wqueue (udat-work-queue uconn))
|
|
|
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
|
(case (work-method)
((direct) result)
(else
(print "ULEX: work "cmd", "params" done in "run-time" ms")
;; send 'response as cmd and result as params
(send uconn rem-host-port qrykey 'response result) ;; could check for ack
(print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
(MBOX_TIMEOUT 'do-work-timeout)
(else
(print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
;; NEW APPROACH:
;;
(define (process-work-queue uconn)
(let ((wqueue (udat-work-queue uconn))
|
︙ | | | ︙ | |