Megatest

Diff
Login

Differences From Artifact [574d8a43e3]:

To Artifact [f5f57267d2]:


321
322
323
324
325
326
327






































328
329

330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348






349



350
351
352
353
354
355
356
357
358
359
360

361
362
363
364
365
366
367
368
369
370
371
;; Send one request to the server described by conn and return whatever
;; tt:send-receive-direct returns for it.
;;
;;   ttdat  - accepted for interface compatibility; not used in this procedure
;;   conn   - connection record; read through tt-conn-host and tt-conn-port
;;   cmd    - first element of the request list
;;   run-id - second element of the request list
;;   params - third element of the request list
;;
(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host (tt-conn-host conn))
	 (port (tt-conn-port conn))
	 (dat  (list cmd run-id params #f))) ;; fourth slot is for meta data, none yet
    (tt:send-receive-direct host port dat)))







































;; Open a TCP connection to host:port, send dat with serialize and return
;; the value read back with deserialize.
;;
;;   host, port      - server address; port must be a number (asserted)
;;   dat             - request payload
;;   ping-mode       - when true, an (exn i/o net) failure returns #f at once
;;                     instead of retrying
;;   tries-remaining - retry budget for (exn i/o net) failures; every retry
;;                     first sleeps (26 - tries-remaining) * 0.5 seconds
;;
;; Returns the deserialized reply, or #f for an io-error condition, for an
;; (exn i/o net) failure in ping-mode, and for any other condition. When the
;; retry budget is used up the final assert fails.
;;
;; NOTE(review): the ports are only closed on the success path; an exception
;; raised by serialize/deserialize leaves them open - confirm whether that
;; matters for long running clients.
;; NOTE(review): `io-error` is kept as written; confirm that some code
;; actually signals a condition of that kind.
;;
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)

  (let* ((retry          (lambda ()
			   ;; ping-mode is not forwarded: retry is only reached when it is #f
			   (tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
	 ;; dump the condition and the request that triggered it. The #f
	 ;; defaults keep the accessors from raising when a property is missing.
	 (full-err-print (lambda (exn)
			   (pp (condition->list exn) *default-log-port*)
			   (pp dat *default-log-port*)
			   (debug:print 0 *default-log-port*
					", error: "     ((condition-property-accessor 'exn 'message   #f) exn)
					", arguments: " ((condition-property-accessor 'exn 'arguments #f) exn)
					", location: "  ((condition-property-accessor 'exn 'location  #f) exn)
					))))
    (condition-case
     (let-values (((inp oup)(tcp-connect host port)))
       (let ((res (if (and inp oup)
		      (begin
			(serialize dat oup)
			;; closing the output side is done before the reply is read
			(close-output-port oup)
			(deserialize inp)))))
	 (close-input-port inp)
	 res))
     (exn (io-error)
	  (full-err-print exn)
	  ;; the condition itself was already dumped by full-err-print, so
	  ;; only the message text is printed here
	  (debug:print 0 *default-log-port* "ERROR: i/o error")
	  #f)
     (exn (i/o net)
	  (if ping-mode
	      #f
	      (if (>= tries-remaining 0)
		  ;; the delay grows by 0.5s for every try already used
		  (let* ((backoff-delay (* (- 26 tries-remaining) 0.5)))
		    (debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
		    (thread-sleep! backoff-delay)
		    (retry))
		  (assert #f "FATAL: Too many retries in tt:send-receive-direct"))))
     (exn ()
	  (full-err-print exn)
	  #f))))


;;======================================================================
;; server
;;======================================================================








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


>


|


|













>
>
>
>
>
>
|
>
>
>

|
|








>



|







321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
;; Send one request to the server described by conn and return whatever
;; tt:send-receive-direct returns for it.
;;
;;   ttdat  - accepted for interface compatibility; not used in this procedure
;;   conn   - connection record; read through tt-conn-host and tt-conn-port
;;   cmd    - first element of the request list
;;   run-id - second element of the request list
;;   params - third element of the request list
;;
(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host (tt-conn-host conn))
	 (port (tt-conn-port conn))
	 (dat  (list cmd run-id params #f))) ;; fourth slot is for meta data, none yet
    (tt:send-receive-direct host port dat)))

;; Backoff state kept per server in *tt:backoff-smoothing*.
;;
;;   last-ioerr - (current-seconds) at creation; refreshed by tt:backoff-incr
;;   last-adj-t - (current-seconds) at creation; refreshed by
;;                tt:backoff-decr-and-wait each time it lowers wait-delay
;;   wait-delay - seconds tt:backoff-decr-and-wait sleeps before a request;
;;                starts at 0.1 and is raised by 0.1 in tt:backoff-incr
;;
(defstruct tt:backoff
  (last-ioerr (current-seconds))
  (last-adj-t (current-seconds))
  (wait-delay 0.1))

(define *tt:backoff-smoothing* (make-hash-table)) ;; "host:port" string => tt:backoff record

;; Record an i/o failure against host:port. The first failure creates a
;; fresh tt:backoff record with its default field values; every later one
;; stamps last-ioerr with the current time and adds 0.1 to wait-delay.
;;
(define (tt:backoff-incr host port)
  (let* ((key   (conc host ":" port))
	 (entry (hash-table-ref/default *tt:backoff-smoothing* key #f)))
    (cond
     ((not entry)
      (hash-table-set! *tt:backoff-smoothing* key (make-tt:backoff)))
     (else
      (tt:backoff-last-ioerr-set! entry (current-seconds))
      (let ((bumped (+ (tt:backoff-wait-delay entry) 0.1)))
	(tt:backoff-wait-delay-set! entry bumped))))))

;; Decay the backoff delay recorded for host:port and, if any delay is left,
;; sleep for it before returning.
;;
;; Does nothing when there is no record for host:port. Otherwise wait-delay
;; is lowered by 0.01 for every second elapsed since last-adj-t. If the
;; remainder is positive it is stored, last-adj-t is refreshed and the
;; calling thread sleeps for that long; if not, the record is deleted.
;;
(define (tt:backoff-decr-and-wait host port)
  (let* ((host-port (conc host":"port))
	 (bkoff     (hash-table-ref/default *tt:backoff-smoothing* host-port #f)))
    (if bkoff
	(let* ((wait-delay (tt:backoff-wait-delay bkoff))
	       (last-adj-t (tt:backoff-last-adj-t bkoff))
	       ;; clamp at 0 so a clock that stepped backwards cannot turn the
	       ;; decay into an increase of the delay
	       (delta      (max 0 (- (current-seconds) last-adj-t)))
	       (adj        (* delta 0.01)) ;; a single 0.1s step decays away in ten seconds
	       (new-wait   (if (> wait-delay 0)
			       (if (> adj wait-delay)
				   0
				   (- wait-delay adj))
			       0)))
	  (if (> new-wait 0)
	      (begin
		(debug:print-info 0 *default-log-port* "Server loaded, DelayWait: "new-wait)
		(tt:backoff-wait-delay-set! bkoff new-wait)
		(tt:backoff-last-adj-t-set! bkoff (current-seconds))
		(thread-sleep! new-wait))
	      ;; fully recovered, forget this server
	      (hash-table-delete! *tt:backoff-smoothing* host-port))))))

;; Send one request to the server at host:port and return its result.
;;
;;   host, port      - server address; port must be a number (asserted)
;;   dat             - request payload, written to the connection with serialize
;;   ping-mode       - when true, an (exn i/o net) failure returns #f at once
;;                     instead of retrying
;;   tries-remaining - retry budget for (exn i/o net) failures; every retry
;;                     first sleeps (26 - tries-remaining) * 0.5 seconds
;;
;; The reply is expected to be a three element list
;; (result exn-result stdout-result), the shape built by handler-proc in
;; tt:start-tcp-server. A non-#f exn-result or stdout-result is logged and
;; result is returned. Any other reply shape is logged and gives #f.
;;
;; Also returns #f for an io-error condition, for an (exn i/o net) failure
;; in ping-mode, and for any other condition. When the retry budget is used
;; up the final assert fails.
;;
;; NOTE(review): the ports are only closed on the success path; an exception
;; raised by serialize/deserialize leaves them open - confirm whether that
;; matters for long running clients.
;;
(define (tt:send-receive-direct host port dat #!key (ping-mode #f)(tries-remaining 25))
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
  ;; honour any backoff delay recorded for this server before connecting
  (tt:backoff-decr-and-wait host port)
  (let* ((retry          (lambda ()
			   ;; ping-mode is not forwarded: retry is only reached when it is #f
			   (tt:send-receive-direct host port dat tries-remaining: (- tries-remaining 1))))
	 ;; dump a local condition object and the request that triggered it.
	 ;; The #f defaults keep the accessors from raising when a property
	 ;; is missing from the condition.
	 (full-err-print (lambda (exn msg)
			   (pp (condition->list exn) *default-log-port*)
			   (pp dat *default-log-port*)
			   (debug:print 0 *default-log-port* msg
					", error: "     ((condition-property-accessor 'exn 'message   #f) exn)
					", arguments: " ((condition-property-accessor 'exn 'arguments #f) exn)
					", location: "  ((condition-property-accessor 'exn 'location  #f) exn)
					)))
	 ;; the server sends its exception as the output of condition->list,
	 ;; i.e. a plain list. Feeding that to full-err-print would raise a
	 ;; type error, land in the catch-all clause below and lose the result.
	 (remote-err-print (lambda (exn-data msg)
			     (if (condition? exn-data)
				 (full-err-print exn-data msg)
				 (begin
				   (pp exn-data *default-log-port*)
				   (pp dat *default-log-port*)
				   (debug:print 0 *default-log-port* msg))))))
    (condition-case
     (let-values (((inp oup)(tcp-connect host port)))
       (let ((res (if (and inp oup)
		      (begin
			(serialize dat oup)
			;; closing the output side is done before the reply is read
			(close-output-port oup)
			(deserialize inp)))))
	 (close-input-port inp)
	 (match res
	   ((result exn-result stdout-result)
	    (if exn-result
		(remote-err-print exn-result "ERROR: Server side exception detected"))
	    (if stdout-result
		(debug:print 0 *default-log-port* "ERROR: Output detected on stdout on server side execution => "stdout-result))
	    result)
	   (else
	    (debug:print 0 *default-log-port* "ERROR: server returned non-standard output: "res)
	    #f))))
     (exn (io-error)
	  (full-err-print exn  "ERROR: i/o error")
	  (tt:backoff-incr host port)
	  #f)
     (exn (i/o net)
	  (if ping-mode
	      #f
	      (if (>= tries-remaining 0)
		  ;; the delay grows by 0.5s for every try already used
		  (let* ((backoff-delay (* (- 26 tries-remaining) 0.5)))
		    (debug:print 0 *default-log-port* "WARNING: TCP overload, trying again in "backoff-delay"s.")
		    (thread-sleep! backoff-delay)
		    (tt:backoff-incr host port)
		    (retry))
		  (assert #f "FATAL: Too many retries in tt:send-receive-direct"))))
     (exn ()
	  (full-err-print exn "Unhandled exception from client side.")
	  #f))))


;;======================================================================
;; server
;;======================================================================

683
684
685
686
687
688
689
690





















691
692
693
694
695
696
697
698
;; Start only the tcp part of the server: run setup-listener-portlogger on
;; ttdat, then build a tcp server from the socket and handler stored in
;; ttdat and invoke it. The entry point for the whole server system is
;; (tt:start-server ...) above.
;;
(define (tt:start-tcp-server ttdat)
  (setup-listener-portlogger ttdat)
  (let* ((listener   (tt-socket  ttdat))
	 (thunk      (tt-handler ttdat))
	 (run-server (make-tcp-server listener thunk)))
    ;; #f is the argument handed to the server procedure; the original note
    ;; on it reads "yes, send error messages to std-err"
    (run-server #f)))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;







|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|







732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
;; Start only the tcp part of the server: run setup-listener-portlogger on
;; ttdat, then build a tcp server from ttdat's socket and a wrapper around
;; ttdat's handler, and invoke it. The entry point for the whole server
;; system is (tt:start-server ...) above.
;;
;; The wrapper (handler-proc) reads one request with (deserialize), calls
;; the handler on it and replies with (serialize full-result), where
;; full-result is the three element list (result exn-result stdout-result):
;;   result        - value returned by the handler, #f if it raised
;;   exn-result    - (condition->list exn) if the handler raised, else #f
;;   stdout-result - text the handler wrote to the current output port while
;;                   it ran, or #f if it wrote nothing
;;
;; NOTE(review): deserialize/serialize are called without a port argument,
;; so this presumably relies on the tcp server running the thunk with the
;; current input/output ports bound to the connection - confirm.
;;
(define (tt:start-tcp-server ttdat)
  (setup-listener-portlogger ttdat)
  (let* ((socket   (tt-socket  ttdat))
	 (handler  (tt-handler ttdat))
	 (handler-proc (lambda ()
			 (let* ((indat         (deserialize))
				(result        #f)
				(exn-result    #f)
				;; run the handler with its output captured into a string;
				;; result and exn-result are filled in by side effect
				(stdout-result (with-output-to-string
						 (lambda ()
						   (let ((res (handle-exceptions
								  exn
								(begin
								  ;; keep the exception as a plain list and
								  ;; report #f as the handler result
								  (set! exn-result (condition->list exn))
								  #f)
								(handler indat))))
						     (set! result res)))))
				;; an empty capture is reported as #f rather than ""
				(full-result (list result exn-result (if (equal? stdout-result "") #f stdout-result))))
			   ;; the capture above has ended, so this serialize writes
			   ;; to the current output port again. A failure is only
			   ;; logged; no reply is sent in that case.
			   (handle-exceptions
			       exn
			     (begin
			       (debug:print 0 *default-log-port* "Serialization failure. full-result="full-result)
			       ;; (serialize '(#f #f #f)) ;; doesn't work - the first call to serialize caused failure
			       )
			     (serialize full-result))))))
    ;; NOTE(review): confirm the meaning of this #f argument against the
    ;; tcp-server documentation; the comment below is the original author's.
    ((make-tcp-server socket handler-proc)
     #f ;; yes, send error messages to std-err
     )))

;; create a tcp listener and return a populated udat struct with
;; my port, address, hostname, pid etc.
;; return #f if fail to find a port to allocate.
;;