Megatest

Check-in [09d825a5e1]
Login
Overview
Comment: This combo of (no) mutexes seems to work best but the blocking still happens
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v2.0001
Files: files | file ages | folders
SHA1: 09d825a5e1f4a9c0b1cf882616cb244402cd4dc3
User & Date: matt on 2022-01-14 08:13:37
Other Links: branch diff | manifest | tags
Context
2022-01-14
17:57
merged work for using csm for compiling check-in: fc3edb2f32 user: matt tags: v2.0001
08:13
This combo of (no) mutexes seems to work best but the blocking still happens check-in: 09d825a5e1 user: matt tags: v2.0001
06:25
Fixed serialize to be compatible with scm check-in: 829acf0839 user: matt tags: v2.0001
Changes

Modified tests/simplerun/debug.scm from [6634dce456] to [16d17455ce].





1






2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24




(import big-chicken trace rmtmod apimod dbmod ulex srfi-18)






;; Tracing setup for this debug script.
;; NOTE(review): trace-call-sites presumably comes from the imported `trace`
;; module and controls whether call-site information is reported -- confirm
;; against the trace egg documentation.
(trace-call-sites #t)
;; Every candidate procedure below is commented out, so `trace` is currently
;; invoked with no arguments. Uncomment a name to pass it to `trace`.
(trace 
  ;; db:get-tests-for-run
  ;; rmt:general-open-connection
  ;; rmt:open-main-connection
  ;; rmt:drop-conn
  ;; rmt:send-receive
  ;; rmt:log-to-main
  )


(module junk
	*

  (import big-chicken rmtmod apimod dbmod srfi-18)
  
;; Derive a small numeric run id from the current process id.
;;
;; Returns the value formed by the last three decimal digits of the pid,
;; i.e. an integer in the range 0..999.
;;
;; The previous implementation took (substring s (- l 3) l) of the pid's
;; decimal string, which raised an error whenever the pid had fewer than
;; three digits (negative start index). Taking the pid modulo 1000 yields
;; the same number for pids of three or more digits and also works for
;; short pids.
(define (make-run-id)
  (modulo (current-process-id) 1000))

(define (run)
>
>
>
>
|
>
>
>
>
>
>










<
<
<
<
<
<







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21






22
23
24
25
26
27
28

(module junk
	*

(import big-chicken
	rmtmod
	apimod
	dbmod
	srfi-18
	trace)

;; Tracing setup for this debug script.
;; NOTE(review): trace-call-sites presumably comes from the imported `trace`
;; module and controls whether call-site information is reported -- confirm
;; against the trace egg documentation.
(trace-call-sites #t)
;; Every candidate procedure below is commented out, so `trace` is currently
;; invoked with no arguments. Uncomment a name to pass it to `trace`.
(trace 
  ;; db:get-tests-for-run
  ;; rmt:general-open-connection
  ;; rmt:open-main-connection
  ;; rmt:drop-conn
  ;; rmt:send-receive
  ;; rmt:log-to-main
  )







;; Derive a small numeric run id from the current process id.
;;
;; Returns the value formed by the last three decimal digits of the pid,
;; i.e. an integer in the range 0..999.
;;
;; The previous implementation took (substring s (- l 3) l) of the pid's
;; decimal string, which raised an error whenever the pid had fewer than
;; three digits (negative start index). Taking the pid modulo 1000 yields
;; the same number for pids of three or more digits and also works for
;; short pids.
(define (make-run-id)
  (modulo (current-process-id) 1000))

(define (run)
45
46
47
48
49
50
51
52
53
54
55
56
57
58
				 (print "Got "(length (rmt:get-tests-for-run run-id "%" '() '() 0 #f #f #f #f #f 0 #f))" tests for run "run-id)
				 (print "Average query time: "avg-query-time)
				 (loop (+ r 1) 0 tot-query-time))))))))
	       )))
    (thread-start! th1)
    (thread-join! th1)))

)

(import junk)
(run)










<
<
<

|


49
50
51
52
53
54
55



56
57
58
59
				 (print "Got "(length (rmt:get-tests-for-run run-id "%" '() '() 0 #f #f #f #f #f 0 #f))" tests for run "run-id)
				 (print "Average query time: "avg-query-time)
				 (loop (+ r 1) 0 tot-query-time))))))))
	       )))
    (thread-start! th1)
    (thread-join! th1)))




(run)
)


Modified ulex/ulex.scm from [b06701b724] to [e34b2a5b05].

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
    simple-exceptions
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	tcp6







|







71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	
	address-info
	mailbox
	matchable
	;; queues
	regex
	regex-case
	simple-exceptions
	s11n
	srfi-1
	srfi-18
	srfi-4
	srfi-69
	system-information
	tcp6
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
    (if (setup-listener uconn port-suggestion)
	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
	       (th2 (make-thread (lambda ()
				   (case (work-method)
				     ((mailbox limited)
				      (process-work-queue uconn))))
				 "Ulex work queue processor")))
	  (tcp-buffer-size 2048)
	  ;; (max-connections 2048) 
	  (thread-start! th1)
	  (thread-start! th2)
	  (udat-cmd-thread-set! uconn th1)
	  (udat-work-queue-thread-set! uconn th2)
	  (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
	  uconn)
	(assert #f "ERROR: run-listener called without proper setup."))))







|
<







184
185
186
187
188
189
190
191

192
193
194
195
196
197
198
    (if (setup-listener uconn port-suggestion)
	(let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
	       (th2 (make-thread (lambda ()
				   (case (work-method)
				     ((mailbox limited)
				      (process-work-queue uconn))))
				 "Ulex work queue processor")))
	  ;; (tcp-buffer-size 2048)

	  (thread-start! th1)
	  (thread-start! th2)
	  (udat-cmd-thread-set! uconn th1)
	  (udat-work-queue-thread-set! uconn th2)
	  (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
	  uconn)
	(assert #f "ERROR: run-listener called without proper setup."))))
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
    (cond
     (isme (ulex-handler udata dat)) ;; no transmission needed
     (else
      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
	  exn
	  (message exn)
	(begin
	  ;; (mutex-lock! *send-mutex*)
	  (let-values (((inp oup)(tcp-connect host-port)))
	    (let ((res (if (and inp oup)
			   (begin
			     (serialize dat oup)
                 (close-output-port oup)
			     (deserialize inp)
               )
			   (begin
			     (print "ERROR: send called but no receiver has been setup. Please call setup first!")
			     #f))))
	      (close-input-port inp)
	      ;; (mutex-unlock! *send-mutex*)
	      res)))))))) ;; res will always be 'ack unless return-method is direct

(define (send-via-polling uconn host-port cmd data)
  (let* ((qrykey (make-cookie uconn))
	 (sres   (send uconn host-port qrykey cmd data)))
    (case sres
      ((ack)







|




|

|




|







226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
    (cond
     (isme (ulex-handler udata dat)) ;; no transmission needed
     (else
      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
	  exn
	  (message exn)
	(begin
	  ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
	  (let-values (((inp oup)(tcp-connect host-port)))
	    (let ((res (if (and inp oup)
			   (begin
			     (serialize dat oup)
			     (close-output-port oup)
			     (deserialize inp)
			     )
			   (begin
			     (print "ERROR: send called but no receiver has been setup. Please call setup first!")
			     #f))))
	      (close-input-port inp)
	      ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
	      res)))))))) ;; res will always be 'ack unless return-method is direct

(define (send-via-polling uconn host-port cmd data)
  (let* ((qrykey (make-cookie uconn))
	 (sres   (send uconn host-port qrykey cmd data)))
    (case sres
      ((ack)
373
374
375
376
377
378
379

380
381
382
383
384


385
386
387
388
389
390
391
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
  (let* ((serv-listener (udat-socket uconn))
	 (listener      (lambda ()
			  (let loop ((state 'start))
			    (let-values (((inp oup)(tcp-accept serv-listener)))

			      (let* ((rdat  (deserialize inp)) ;; '(my-host-port qrykey cmd params)
				     (resp  (ulex-handler uconn rdat)))
				(if resp (serialize resp oup))
				(close-input-port inp)
				(close-output-port oup))


			      (loop state))))))
    ;; start N of them
    (let loop ((thnum   0)
	       (threads '()))
      (if (< thnum 100)
	  (let* ((th (make-thread listener (conc "listener" thnum))))
	    (thread-start! th)







>


|

|
>
>







372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
;; given an already set up uconn start the cmd-loop
;;
(define (ulex-cmd-loop uconn)
  (let* ((serv-listener (udat-socket uconn))
	 (listener      (lambda ()
			  (let loop ((state 'start))
			    (let-values (((inp oup)(tcp-accept serv-listener)))
			      ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
			      (let* ((rdat  (deserialize inp)) ;; '(my-host-port qrykey cmd params)
				     (resp  (ulex-handler uconn rdat)))
				(serialize resp oup)
				(close-input-port inp)
				(close-output-port oup)
				;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
				)
			      (loop state))))))
    ;; start N of them
    (let loop ((thnum   0)
	       (threads '()))
      (if (< thnum 100)
	  (let* ((th (make-thread listener (conc "listener" thnum))))
	    (thread-start! th)
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
	 (case (work-method)
	   ((direct) result)
	   (else
	    (print "ULEX: work "cmd", "params" done in "run-time" ms")
	    ;; send 'response as cmd and result as params
	    (send uconn rem-host-port qrykey 'response result) ;; could check for ack
	    (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
      (MBOX_TIMEOUT #f)
      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))

;; NEW APPROACH:
;;   
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))







|







436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
	 (case (work-method)
	   ((direct) result)
	   (else
	    (print "ULEX: work "cmd", "params" done in "run-time" ms")
	    ;; send 'response as cmd and result as params
	    (send uconn rem-host-port qrykey 'response result) ;; could check for ack
	    (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
      (MBOX_TIMEOUT 'do-work-timeout)
      (else
       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))

;; NEW APPROACH:
;;   
(define (process-work-queue uconn) 
  (let ((wqueue (udat-work-queue uconn))