Megatest

Diff
Login

Differences From Artifact [0582204ff9]:

To Artifact [43798d972b]:


11
12
13
14
15
16
17

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36







37

38
39
40
41
42
43
44
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

45
46
47
48
49
50
51
52







+



















+
+
+
+
+
+
+
-
+








(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;
;; (require-library trace)
;; (import trace)
;; (trace
;; rmt:send-receive
;; api:execute-requests
;; )


;;======================================================================
;;  S U P P O R T   F U N C T I O N S
;;======================================================================

(define (rmt:call-transport run-id connection-info cmd jparams)
  (case (server:get-transport)
    ((rpc)  ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((http) (http-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((fs)   ( fs-transport:client-api-send-receive run-id connection-info cmd jparams))
    ((zmq)  (zmq-transport:client-api-send-receive run-id connection-info cmd jparams))
    (else   ( rpc-transport:client-api-send-receive run-id connection-info cmd jparams))))
;; #t means - please start a server!

;;
(define (rmt:write-frequency-over-limit? cmd run-id)
  (and (not (member cmd api:read-only-queries))
       (let* ((tmprec (hash-table-ref/default *write-frequency* run-id #f))
	      (record (if tmprec tmprec 
			  (let ((v (vector (current-seconds) 0)))
			    (hash-table-set! *write-frequency* run-id v)
61
62
63
64
65
66
67
68

69
70
71

72
73
74

75
76
77
78
79
80
81
82









83
84

85
86

87

88
89


90
91
92








93
94
95
96
97

98
99
100
101



102
103
104

105
106



107






108
109
110
111
112
113


114

115

116

117


118


119
120







121
122
123
124
125
126
127
69
70
71
72
73
74
75

76



77
78
79

80
81
82
83





84
85
86
87
88
89
90
91
92
93
94
95
96

97

98
99
100
101
102



103
104
105
106
107
108
109
110
111
112



113




114
115
116



117
118

119
120
121
122
123
124
125
126
127
128
129
130
131
132


133
134
135
136

137

138
139
140
141
142
143
144


145
146
147
148
149
150
151
152
153
154
155
156
157
158







-
+
-
-
-
+


-
+



-
-
-
-
-
+
+
+
+
+
+
+
+
+


+

-
+
-
+


+
+
-
-
-
+
+
+
+
+
+
+
+


-
-
-
+
-
-
-
-
+
+
+
-
-
-
+

-
+
+
+

+
+
+
+
+
+




-
-
+
+

+
-
+
-
+

+
+

+
+
-
-
+
+
+
+
+
+
+







	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

;; cmd is a symbol
(define *send-receive-mutex* (make-mutex)) ;; should have separate mutex per run-id
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  (let ((expire-time (- (current-seconds) 60)))
  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
    (for-each 
     (lambda (run-id)
       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
	 (if (and connection 
		  (< (http-transport:server-dat-get-last-access connection) expire-time))
	     (begin
	       (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
	       (hash-table-delete! *runremote* run-id)))))
         (if (and connection 
        	  (< (http-transport:server-dat-get-last-access connection) expire-time))
             (begin
               (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
               ;; SHOULD CLOSE THE CONNECTION HERE
	       (case *transport-type*
		 ((nmsg)(nn-close (http-transport:server-dat-get-socket 
				   (hash-table-ref *runremote* run-id)))))
               (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  ;; (mutex-lock! *send-receive-mutex*)
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (connection-info (rmt:get-connection-info run-id)))
	 (jparams         (db:obj->string params)))
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(condition-case
	(let* ((dat     (http-transport:client-api-send-receive run-id connection-info cmd jparams))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
				  (http-transport:client-api-send-receive run-id connection-info cmd params)
				  ((commfail)(vector #f "communications fail"))))
			  ((nmsg)(condition-case
				  (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
				  ((timeout)(vector #f "timeout talking to server"))))
			  (else  (exit))))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)
	      ;; (if (< attemptnum 100)
	      ;;     (begin
	      (begin
	      ;;       (hash-table-delete! *runremote* run-id)
	      ;;       (thread-sleep! 0.5)
	      ;;       (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
	      ;;     (begin
		;; (mutex-unlock! *send-receive-mutex*)
		(case *transport-type* 
		  ((http) res) ;; (db:string->obj res))
	      ;;       (print-call-chain (current-error-port))
	      ;;       (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
	      ;;       (exit 1)))))
		  ((nmsg) res))) ;; (vector-ref res 1)))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		(debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
		;; (case *transport-type*
		;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
		;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. 
		;; (if (eq? (modulo attemptnum 5) 0)
		;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
		;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
		(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
		;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		(if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		(thread-sleep! 2)
		;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		;; (thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
	;; no connection info? try to start a server
	(if (and (< attemptnum 10)
	(if (and (< attemptnum 15)
		 (tasks:need-server run-id))
		 (member cmd api:write-queries))
	    (begin
	      (hash-table-delete! *runremote* run-id)
	      ;; (mutex-unlock! *send-receive-mutex*)
	      (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	      ;; (client:setup run-id) ;; client setup happens in rmt:get-connection-info
	      (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
	      (rmt:send-receive cmd rid params (+ attemptnum 1)))
	    (rmt:open-qry-close-locally cmd run-id params)))))
	      (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
	    (begin
	      ;; (debug:print 0 "ERROR: Communication failed!")
	      ;; (mutex-unlock! *send-receive-mutex*)
	      ;; (exit)
	      (rmt:open-qry-close-locally cmd run-id params)
	      )))))

(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")
182
183
184
185
186
187
188
189


190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210









211
212
213
214
215
216
217
213
214
215
216
217
218
219

220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235







236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251







-
+
+














-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+







			     (let* ((dbdir (conc    (configf:lookup *configdat* "setup" "linktree") "/.db"))
				    (db (make-dbr:dbstruct path:  dbdir local: #t)))
			       (set! *dbstruct-db* db)
			       db)))
	 (db-file-path   (db:dbfile-path 0)))
    ;; (read-only      (not (file-read-access? db-file-path)))
    (let* ((start         (current-milliseconds))
	   (res           (api:execute-requests dbstruct-local (symbol->string cmd) params))
	   (resdat        (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))
	   (res           (vector-ref resdat 1))
	   (duration      (- (current-milliseconds) start)))
      (rmt:update-db-stats run-id cmd params duration)
      ;; mark this run as dirty if this was a write
      (if (not (member cmd api:read-only-queries))
	  (let ((start-time (current-seconds)))
	    (mutex-lock! *db-multi-sync-mutex*)
	    ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
	    ;; just set it every time. Is a write more expensive than a read and does it matter?
	    (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
	    (mutex-unlock! *db-multi-sync-mutex*)))
      res)))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  (let* ((run-id   (if run-id run-id 0))
	 (jparams  (db:obj->string params)) ;; (rmt:dat->json-str params))
	 (dat      (http-transport:client-api-send-receive run-id connection-info cmd jparams)))
    (if (and dat (vector-ref dat 0))
	(db:string->obj (vector-ref dat 1))
	(begin
	  (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
	  dat))))
	 ;; (jparams  (db:obj->string params)) ;; (rmt:dat->json-str params))
	 (res  	   (http-transport:client-api-send-receive run-id connection-info cmd params)))
    (if (and res (vector-ref res 0))
	res
	#f)))
;; 	(db:string->obj (vector-ref dat 1))
;; 	(begin
;; 	  (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
;; 	  dat))))

;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
  (with-output-to-string 
    (lambda ()
      (json-write dat))))

240
241
242
243
244
245
246

247
248

249
250



251
252
253
254
255
256
257
274
275
276
277
278
279
280
281
282
283
284


285
286
287
288
289
290
291
292
293
294







+


+
-
-
+
+
+







;;  M I S C
;;======================================================================

(define (rmt:login run-id)
  (rmt:send-receive 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))

;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
  (case *transport-type*
  (rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
  
    ((http)(rmt:send-receive-no-auto-client-setup connection-info 'login run-id (list *toppath* megatest-version run-id *my-client-signature*)))
    ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info 'login (list *toppath* megatest-version run-id *my-client-signature*)))))

;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)
  (rmt:send-receive 'general-call run-id (append (list stmtname run-id) params)))

(define (rmt:sync-inmem->db run-id)
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352





















353
354
355
356
357
358
359
360
361
362
363
364
365
366























367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







  (let ((multi-run-mutex (make-mutex))
	(run-id-list (if run-ids
			 run-ids
			 (rmt:get-all-run-ids)))
	(result      '()))
    (if (null? run-id-list)
	'()
	(for-each 
	 (lambda (th)

	   (thread-join! th)) ;; I assume that joining completed threads just moves on
	 (let loop ((hed     (car run-id-list))
		    (tal     (cdr run-id-list))
		    (threads '()))
	   (let* ((newthread (make-thread
			      (lambda ()
				(let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in))))
				  (if (list? res)
				      (begin
					(mutex-lock! multi-run-mutex)
					(set! result (append result res))
					(mutex-unlock! multi-run-mutex))
				      (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in))))
			      (conc "multi-run-thread for run-id " hed)))
		  (newthreads (cons newthread threads)))
	     (thread-start! newthread)
	     (thread-sleep! 0.5) ;; give that thread some time to start
	     (if (null? tal)
		 newthreads
		 (loop (car tal)(cdr tal) newthreads))))))
	(let loop ((hed     (car run-id-list))
		   (tal     (cdr run-id-list))
		   (threads '()))
	  (if (> (length threads) 5)
	      (loop hed tal (filter (lambda (th)(not (member (thread-state th) '(terminated dead)))) threads))
	      (let* ((newthread (make-thread
				 (lambda ()
				   (let ((res (rmt:send-receive 'get-tests-for-run-mindata hed (list hed testpatt states status not-in))))
				     (if (list? res)
					 (begin
					   (mutex-lock! multi-run-mutex)
					   (set! result (append result res))
					   (mutex-unlock! multi-run-mutex))
					 (debug:print 0 "ERROR: get-tests-for-run-mindata failed for run-id " hed ", testpatt " testpatt ", states " states ", status " status ", not-in " not-in))))
				 (conc "multi-run-thread for run-id " hed)))
		     (newthreads (cons newthread threads)))
		(thread-start! newthread)
		(thread-sleep! 0.5) ;; give that thread some time to start
		(if (null? tal)
		    newthreads
		    (loop (car tal)(cdr tal) newthreads))))))
    result))

;; ;; IDEA: Threadify these - they spend a lot of time waiting ...
;; ;;
;; (define (rmt:get-tests-for-runs-mindata run-ids testpatt states status not-in)
;;   (let ((run-id-list (if run-ids
;; 			 run-ids
480
481
482
483
484
485
486

487

488
489
490
491
492
493
494
515
516
517
518
519
520
521
522

523
524
525
526
527
528
529
530







+
-
+







(define (rmt:update-run-event_time run-id)
  (rmt:send-receive 'update-run-event_time #f (list run-id)))

(define (rmt:get-runs-by-patt  keys runnamepatt targpatt offset limit)
  (rmt:send-receive 'get-runs-by-patt #f (list keys runnamepatt targpatt offset limit)))

(define (rmt:find-and-mark-incomplete run-id ovr-deadtime)
  (if (rmt:send-receive 'have-incompletes? run-id (list run-id ovr-deadtime))
  (rmt:send-receive 'find-and-mark-incomplete run-id (list run-id ovr-deadtime)))
      (rmt:send-receive 'mark-incomplete run-id (list run-id ovr-deadtime))))

;;======================================================================
;; M U L T I R U N   Q U E R I E S
;;======================================================================

;; Need to move this to multi-run section and make associated changes
(define (rmt:find-and-mark-incomplete-all-runs #!key (ovr-deadtime #f))
571
572
573
574
575
576
577













607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626







+
+
+
+
+
+
+
+
+
+
+
+
+
  (rmt:send-receive 'testmeta-update-field #f (list test-name fld val)))

(define (rmt:test-data-rollup run-id test-id status)
  (rmt:send-receive 'test-data-rollup run-id (list run-id test-id status)))

(define (rmt:csv->test-data run-id test-id csvdata)
  (rmt:send-receive 'csv->test-data run-id (list run-id test-id csvdata)))

;;======================================================================
;;  T A S K S
;;======================================================================

(define (rmt:tasks-find-task-queue-records target run-name test-patt state-patt action-patt)
  (rmt:send-receive 'find-task-queue-records #f (list target run-name test-patt state-patt action-patt)))

(define (rmt:tasks-add action owner target runname testpatt params)
  (rmt:send-receive 'tasks-add #f (list action owner target runname testpatt params)))

(define (rmt:tasks-set-state-given-param-key param-key new-state)
  (rmt:send-receive 'tasks-set-state-given-param-key #f (list  param-key new-state)))