Megatest

Check-in [ea060a034b]
Login
Overview
Comment: Use run-id in queries looking for test info. Can't assume test-ids are unique. Added a separate exception handler for serialization to help with debugging (still causes grief when hit, but at least you can find the issue by looking in the server logs).
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80
Files: files | file ages | folders
SHA1: ea060a034b1e8656fa66c527e7d52f537bbf0248
User & Date: matt on 2023-04-09 21:07:39
Other Links: branch diff | manifest | tags
Context
2023-04-09
21:51
Partial implementation of using the loaded flag to throttle on client side. Need to turn off all server side throttling check-in: b79f855fba user: matt tags: v1.80
21:07
Use run-id in queries looking for test info. Can't assume test-ids are unique. Added a separate exception handler for serialization to help with debugging (still causes grief when hit, but at least you can find the issue by looking in the server logs). check-in: ea060a034b user: matt tags: v1.80
14:45
Clean up uses of tcp. tcp6 did not seem to work. Increased tcp backlog (didn't seem to help) and improved backoff check-in: 4860a4e6aa user: matt tags: v1.80
Changes

Modified api.scm from [2ad118d009] to [00192d5ed4].

224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
;; WARNING: Do not print anything in the lambda of this function as it
;;          reads/writes to current in/out port
;;
(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
  (if (not *server-signature*)
      (set! *server-signature* (tt:mk-signature *toppath*)))
  (lambda ()
    (let* ((indat      (deserialize))
	   (newcount   (+ *api-process-request-count* 1))
	   (delay-wait (if (> newcount 10)
			   (- newcount 10)
			   0))
	   (normal-proc (lambda (cmd run-id params)
			  (case cmd
			    ((ping) *server-signature*)







|
|







224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
;; WARNING: Do not print anything in the lambda of this function as it
;;          reads/writes to current in/out port
;;
(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
  (assert *toppath* "FATAL: api:tcp-dispatch-request-make-handler called but *toppath* not set.")
  (if (not *server-signature*)
      (set! *server-signature* (tt:mk-signature *toppath*)))
  (lambda (indat)
    (let* (;; (indat      (deserialize))
	   (newcount   (+ *api-process-request-count* 1))
	   (delay-wait (if (> newcount 10)
			   (- newcount 10)
			   0))
	   (normal-proc (lambda (cmd run-id params)
			  (case cmd
			    ((ping) *server-signature*)
269
270
271
272
273
274
275
276

277
278
279
280
281
282
283
			   (else
			    (normal-proc cmd run-id params))))
		(meta   (case cmd
			  ((ping) `((sstate . ,server-state)))
			  (else   `((wait . ,delay-wait)))))
		(payload (list status errmsg result meta)))
	   (set! *api-process-request-count* (- *api-process-request-count* 1))
	   (serialize payload)))

	(else
	 (assert #f "FATAL: failed to deserialize indat "indat))))))
       

(define (api:dispatch-request dbstruct cmd run-id params)
  (if (not *no-sync-db*)
      (db:open-no-sync-db))







|
>







269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
			   (else
			    (normal-proc cmd run-id params))))
		(meta   (case cmd
			  ((ping) `((sstate . ,server-state)))
			  (else   `((wait . ,delay-wait)))))
		(payload (list status errmsg result meta)))
	   (set! *api-process-request-count* (- *api-process-request-count* 1))
	   ;; (serialize payload)
	   payload))
	(else
	 (assert #f "FATAL: failed to deserialize indat "indat))))))
       

(define (api:dispatch-request dbstruct cmd run-id params)
  (if (not *no-sync-db*)
      (db:open-no-sync-db))

Modified db.scm from [cf4037913b] to [2ad1867042].

2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (db:first-result-default 
      db
      "SELECT attemptnum FROM tests WHERE id=?;"
      #f
      test-id))))

;; Column names of the tests table that make up a test record, listed in
;; record order. The row callback that builds the test-record vector takes
;; its arguments in this same order (id, run-id, testname ... last-update).
;; NOTE(review): presumably db:test-record-qry-selector is derived from this
;; list - confirm where it is defined.
(define db:test-record-fields '("id"           "run_id"        "testname"  "state"      "status"      "event_time"
				"host"         "cpuload"       "diskfree"  "uname"      "rundir"      "item_path"
                                "run_duration" "final_logf"    "comment"   "shortdir"   "attemptnum"  "archived" "last_update"))

;; fields *must* be a non-empty list
;;







|

|







2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (db:first-result-default 
      db
      "SELECT attemptnum FROM tests WHERE id=? AND run_id=?;"
      #f
      test-id run-id))))

;; Column names of the tests table that make up a test record, listed in
;; record order. The row callback that builds the test-record vector takes
;; its arguments in this same order (id, run-id, testname ... last-update).
;; NOTE(review): presumably db:test-record-qry-selector is derived from this
;; list - confirm where it is defined.
(define db:test-record-fields '("id"           "run_id"        "testname"  "state"      "status"      "event_time"
				"host"         "cpuload"       "diskfree"  "uname"      "rundir"      "item_path"
                                "run_duration" "final_logf"    "comment"   "shortdir"   "attemptnum"  "archived" "last_update"))

;; fields *must* be a non-empty list
;;
2760
2761
2762
2763
2764
2765
2766

2767

2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
   #f
   (lambda (dbdat db)
     (let ((res #f))
       (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test
	(lambda (id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)
	  ;;                0    1       2      3      4        5       6      7        8     9     10      11          12          13           14         15          16
	  (set! res (vector id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)))

	(db:get-cache-stmth dbdat db

			    (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=?;"))
	test-id)
       res))))

;; Get test state, status using test_id
;;
;; Runs the query through db:with-db for run-id and hands back res, which
;; starts out as the pair (#f . #f).
;; NOTE(review): the row callback builds (cons state status) but never
;; stores it, so res is still (#f . #f) when it is returned - confirm
;; whether a (set! res ...) was intended.
;; 
(define (db:get-test-state-status-by-id dbstruct run-id test-id)
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (let ((res   (cons #f #f)))
;;	   (stmth (db:get-cache-stmth dbdat db "SELECT state,status FROM tests WHERE id=?;")))
       (sqlite3:for-each-row ;; query is given as a plain string, not a cached statement handle (see note on the query line)
	(lambda (state status)
	  (cons state status)) ;; value is discarded here
	db
	"SELECT state,status FROM tests WHERE id=?;" ;; stmth try not compiling this one - yes, this fixed the bind issue
	test-id)
       res))))

;; Use db:test-get* to access
;; Get test data using test_ids. NB// Only works within a single run!!
;;
(define (db:get-test-info-by-ids dbstruct run-id test-ids)
  (db:with-db







>
|
>
|
|
















|
|







2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
   #f
   (lambda (dbdat db)
     (let ((res #f))
       (sqlite3:for-each-row ;; attemptnum added to hold pid of top process (not Megatest) controlling a test
	(lambda (id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)
	  ;;                0    1       2      3      4        5       6      7        8     9     10      11          12          13           14         15          16
	  (set! res (vector id run-id testname state status event-time host cpuload diskfree uname rundir-id item-path run_duration final-logf-id comment short-dir-id attemptnum archived last-update)))
	db
	;; (db:get-cache-stmth dbdat db
	;; 		    (conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;"))
	(conc "SELECT " db:test-record-qry-selector " FROM tests WHERE id=? AND run_id=?;")
	test-id run-id)
       res))))

;; Get test state and status for test-id within run-id.
;;
;;   dbstruct - passed through to db:with-db
;;   run-id   - selects the db via db:with-db and is also part of the query,
;;              since test ids cannot be assumed unique across runs
;;   test-id  - id of the test row to look up
;;
;; The inner procedure yields the pair (state . status) of the matching row,
;; or (#f . #f) when no row matches.
;; NOTE(review): assumes db:with-db returns the value of the procedure it is
;; given - confirm against its definition.
;;
(define (db:get-test-state-status-by-id dbstruct run-id test-id)
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (let ((res (cons #f #f)))
       ;; The query is passed as a plain string instead of a cached statement
       ;; handle (db:get-cache-stmth); not compiling this one fixed a bind issue.
       (sqlite3:for-each-row
	(lambda (state status)
	  ;; store the row; previously the pair was built and then discarded,
	  ;; so the caller always received (#f . #f)
	  (set! res (cons state status)))
	db
	"SELECT state,status FROM tests WHERE id=? AND run_id=?;"
	test-id run-id)
       res))))

;; Use db:test-get* to access
;; Get test data using test_ids. NB// Only works within a single run!!
;;
(define (db:get-test-info-by-ids dbstruct run-id test-ids)
  (db:with-db
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (db:first-result-default
      db
      "SELECT rundir FROM tests WHERE id=?;"
      #f ;; default result
      test-id))))

(define (db:get-test-times dbstruct run-name target)
  (let ((res `())
        (qry 	(conc "select testname, item_path, run_duration, "
		      (string-join (db:get-keys dbstruct) " || '/' || ")
		      " as target from tests inner join runs on tests.run_id = runs.id where runs.runname = ? and target = ?  ;")))
  (db:with-db 







|

|







2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
  (db:with-db
   dbstruct
   run-id
   #f
   (lambda (dbdat db)
     (db:first-result-default
      db
      "SELECT rundir FROM tests WHERE id=? AND run_id=?;"
      #f ;; default result
      test-id run-id))))

(define (db:get-test-times dbstruct run-name target)
  (let ((res `())
        (qry 	(conc "select testname, item_path, run_duration, "
		      (string-join (db:get-keys dbstruct) " || '/' || ")
		      " as target from tests inner join runs on tests.run_id = runs.id where runs.runname = ? and target = ?  ;")))
  (db:with-db 

Modified tcp-transportmod.scm from [574d8a43e3] to [f5f57267d2].

321
322
323
324
325
326
327






































328
329

330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348






349



350
351
352
353
354
355
356
357
358
359
360

361
362
363
364
365
366
367
368
369
370
371
;; Send one request to the server described by conn.
;; The request is packed as (cmd run-id params #f) and handed to
;; tt:send-receive-direct together with the host and port read from conn.
;; ttdat is not referenced in this procedure, and host-port is bound but
;; not used below.
(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn)))
	 (host      (tt-conn-host conn))
	 (port      (tt-conn-port conn))
	 (dat       (list cmd run-id params #f))) ;; no meta data yet
    (tt:send-receive-direct host port dat)))







































;; Connect to host:port, serialize dat to the server and return the
;; deserialized reply.
;;
;;   ping-mode       - when true, an (i/o net) failure returns #f with no retry
;;   tries-remaining - retry budget for (i/o net) failures; each retry sleeps
;;                     (* (- 26 tries-remaining) 0.5) seconds first
;;
;; Returns #f for the (io-error) clause and for any other exception; asserts
;; once the retry budget is used up.
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)

  (let* ((retry          (lambda ()
			   ;; ping-mode is not forwarded; retry is only reached when ping-mode is #f
			   (tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
	 (full-err-print (lambda (exn)
			   ;; dump the condition and the request that triggered it to the log
			   (pp (condition->list exn) *default-log-port*)
			   (pp dat *default-log-port*)
			   (debug:print 0 *default-log-port*
					", error: "     ((condition-property-accessor 'exn 'message)   exn)
					", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
					", location: "  ((condition-property-accessor 'exn 'location)  exn)
					))))
    (condition-case
     (let-values (((inp oup)(tcp-connect host port)))
       (let ((res (if (and inp oup)
		      (begin
			(serialize dat oup)
			(close-output-port oup) ;; output side is closed before the reply is read
			(deserialize inp))
		      )))
	 (close-input-port inp)






	 res))



     (exn (io-error)
	  (full-err-print exn)
	  (debug:print 0 *default-log-port* exn "ERROR: i/o error")
	  #f)
     (exn (i/o net)
	  (if ping-mode
	      #f
	      (if (>= tries-remaining 0)
		  (let* ((backoff-delay (* (- 26 tries-remaining) 0.5))) ;; delay grows as the budget shrinks
		    (debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
		    (thread-sleep! backoff-delay)

		    (retry))
		  (assert #f "FATAL: Too many retries in tt:send-receive-direct"))))
     (exn ()
	  (full-err-print exn)
	  #f))))


;;======================================================================
;; server
;;======================================================================








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


>


|


|













>
>
>
>
>
>
|
>
>
>

|
|








>



|







321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
;; Send one request to the server described by conn and return the reply.
;;
;;   ttdat  - not referenced here; kept so existing callers are unaffected
;;   conn   - connection record; host and port are read with tt-conn-host
;;            and tt-conn-port
;;   cmd, run-id, params - packed as (cmd run-id params #f); the fourth slot
;;            is the meta data position, which is not populated yet
;;
;; Returns whatever tt:send-receive-direct returns.
;;
(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host (tt-conn-host conn))
	 (port (tt-conn-port conn))
	 (dat  (list cmd run-id params #f))) ;; no meta data yet
    (tt:send-receive-direct host port dat)))

;; Backoff state kept per server; maintained by tt:backoff-incr and
;; tt:backoff-decr-and-wait.
;;   last-ioerr - (current-seconds) value stored when an error is registered
;;   last-adj-t - (current-seconds) value stored when wait-delay is reduced
;;   wait-delay - delay handed to thread-sleep! before the next request
;; NOTE(review): assumes the default expressions are evaluated each time
;; make-tt:backoff is called - confirm against the defstruct documentation.
(defstruct tt:backoff
  (last-ioerr (current-seconds))
  (last-adj-t (current-seconds))
  (wait-delay 0.1))

;; Maps a "host:port" string to its tt:backoff record.
(define *tt:backoff-smoothing* (make-hash-table)) ;; host:port => tt:backoff record

;; Register a failed request against host:port (call if tcp fails i/o net).
;; An existing entry gets its error time refreshed and its wait-delay raised
;; by 0.1; otherwise a fresh tt:backoff record is stored for the server.
(define (tt:backoff-incr host port)
  (let* ((key   (conc host":"port))
	 (entry (hash-table-ref/default *tt:backoff-smoothing* key #f)))
    (cond
     (entry
      (tt:backoff-last-ioerr-set! entry (current-seconds))
      (tt:backoff-wait-delay-set! entry (+ (tt:backoff-wait-delay entry) 0.1)))
     (else
      (hash-table-set! *tt:backoff-smoothing* key (make-tt:backoff))))))

;; Sleep off any outstanding backoff for host:port before a request is sent.
;;
;; The stored wait-delay is reduced by 0.01 for every second elapsed since
;; the last adjustment. If anything is left, the reduced value is stored,
;; the adjustment time is reset and the calling thread sleeps for that long.
;; Once the delay has decayed to zero the entry is removed from
;; *tt:backoff-smoothing*. Does nothing when host:port has no entry.
;;
(define (tt:backoff-decr-and-wait host port)
  (let* ((host-port (conc host":"port))
	 (bkoff     (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
    (if bkoff
	(let* ((wait-delay (tt:backoff-wait-delay bkoff))
	       (last-adj-t (tt:backoff-last-adj-t bkoff))
	       (delta      (- (current-seconds) last-adj-t))
	       (adj        (* delta 0.01)) ;; a single 0.1 increment decays away in ten seconds
	       (new-wait   (cond
			    ((<= wait-delay 0)  0)   ;; nothing outstanding
			    ((> adj wait-delay) 0)   ;; fully decayed since last adjustment
			    (else               (- wait-delay adj)))))
	  (if (> new-wait 0)
	      (begin
		(debug:print-info 0 *default-log-port* "Server loaded, DelayWait: "new-wait)
		(tt:backoff-wait-delay-set! bkoff new-wait)
		(tt:backoff-last-adj-t-set! bkoff (current-seconds))
		(thread-sleep! new-wait))
	      (hash-table-delete! *tt:backoff-smoothing* host-port))))))

;; Connect to host:port, send dat and return the result part of the reply.
;;
;;   host, port      - server address; port must be a number
;;   dat             - request to serialize to the server
;;   ping-mode       - when true, an (i/o net) failure returns #f with no retry
;;   tries-remaining - retry budget for (i/o net) failures; each retry sleeps
;;                     (* (- 26 tries-remaining) 0.5) seconds first
;;
;; The reply is expected to be a three element list
;; (result exn-result stdout-result). A non-false exn-result or stdout-result
;; is logged and result is still returned. Any other reply shape, the
;; (io-error) clause and the catch-all clause return #f. Asserts once the
;; retry budget is used up.
;;
;; NOTE(review): if serialize or deserialize raises, the ports from
;; tcp-connect are not explicitly closed here.
;;
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
  ;; honour any backoff accumulated for this server before connecting
  (tt:backoff-decr-and-wait host port)
  (let* ((retry          (lambda ()
			   ;; ping-mode is not forwarded; retry is only reached when ping-mode is #f
			   (tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
	 ;; for condition objects caught on the client side
	 (full-err-print (lambda (exn msg)
			   (pp (condition->list exn) *default-log-port*)
			   (pp dat *default-log-port*)
			   (debug:print 0 *default-log-port* msg
					", error: "     ((condition-property-accessor 'exn 'message)   exn)
					", arguments: " ((condition-property-accessor 'exn 'arguments) exn)
					", location: "  ((condition-property-accessor 'exn 'location)  exn)
					)))
	 ;; for exceptions reported by the server: tt:start-tcp-server sends
	 ;; them as (condition->list exn) data, not as condition objects, so
	 ;; the condition accessors used by full-err-print must not be applied
	 (remote-err-print (lambda (exn-list msg)
			     (pp exn-list *default-log-port*)
			     (pp dat *default-log-port*)
			     (debug:print 0 *default-log-port* msg))))
    (condition-case
     (let-values (((inp oup)(tcp-connect host port)))
       (let ((res (if (and inp oup)
		      (begin
			(serialize dat oup)
			(close-output-port oup) ;; output side is closed before the reply is read
			(deserialize inp))
		      )))
	 (close-input-port inp)
	 (match res
	   ((result exn-result stdout-result)
	    (if exn-result
		(remote-err-print exn-result "ERROR: Server side exception detected"))
	    (if stdout-result
		(debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
	    result)
	   (else
	    (debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
	    #f))))
     (exn (io-error)
	  (full-err-print exn  "ERROR: i/o error")
	  (tt:backoff-incr host port)
	  #f)
     (exn (i/o net)
	  (if ping-mode
	      #f
	      (if (>= tries-remaining 0)
		  (let* ((backoff-delay (* (- 26 tries-remaining) 0.5))) ;; delay grows as the budget shrinks
		    (debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
		    (thread-sleep! backoff-delay)
		    (tt:backoff-incr host port)
		    (retry))
		  (assert #f "FATAL: Too many retries in tt:send-receive-direct"))))
     (exn ()
	  (full-err-print exn "Unhandled exception from client side.")
	  #f))))


;;======================================================================
;; server
;;======================================================================

683
684
685
686
687
688
689
690





















691
692
693
694
695
696
697
698
;; find a port and start tcp-server. This only starts the tcp portion of
;; the server, look at (tt:start-server ...) above for the entry point
;; for the entire server system
;;
;; Calls setup-listener-portlogger on ttdat first, then reads the socket and
;; handler back from ttdat and runs make-tcp-server with them. The handler
;; stored in ttdat is passed to make-tcp-server directly.
;;
(define (tt:start-tcp-server ttdat)
  (setup-listener-portlogger ttdat)
  (let* ((socket   (tt-socket  ttdat))
	 (handler  (tt-handler ttdat)))





















    ((make-tcp-server socket handler)
     #f ;; yes, send error messages to std-err
     )))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;







|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|







732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
;; Find a port and start the tcp-server. Only the tcp portion of the server
;; is started here; see (tt:start-server ...) above for the entry point of
;; the entire server system.
;;
;; Each request is deserialized, run through the handler stored in ttdat
;; and answered with the list (result exn-result stdout-result):
;;   result        - value returned by the handler, #f if it raised
;;   exn-result    - (condition->list exn) of a raised exception, else #f
;;   stdout-result - text the handler printed while running, #f if none
;; A failure while serializing the reply is written to *default-log-port*.
;;
(define (tt:start-tcp-server ttdat)
  (setup-listener-portlogger ttdat)
  (let* ((socket  (tt-socket  ttdat))
	 (handler (tt-handler ttdat))
	 (handler-proc
	  (lambda ()
	    (let* ((request     (deserialize))
		   (answer      #f)
		   (failure     #f)
		   ;; runs the handler, recording its value or its exception
		   (run-handler (lambda ()
				  (set! answer
				    (handle-exceptions
					exn
				      (begin
					(set! failure (condition->list exn))
					#f)
				      (handler request)))))
		   ;; anything the handler prints is captured instead of
		   ;; being written into the reply stream
		   (captured    (with-output-to-string run-handler))
		   (reply       (list answer
				      failure
				      (if (equal? captured "") #f captured))))
	      (handle-exceptions
		  exn
		(begin
		  (debug:print 0 *default-log-port* "Serialization failure. full-result="reply)
		  ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
		  )
		(serialize reply))))))
    ((make-tcp-server socket handler-proc)
     #f ;; yes, send error messages to std-err
     )))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;