Megatest

Diff
Login

Differences From Artifact [4e95475281]:

To Artifact [49589d1923]:


11
12
13
14
15
16
17

18
19
20
21
22
23
24

(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))


;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;







>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87

88
89

90




91
92
93
94

95
96
97
98
99
100
101
102
103
104
105
106


107




108
109
110
111
112
113
114

115
116
117

118


119



120

121
122
123
124
125
126
127
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

;; Send the request CMD with PARAMS to the server handling run RID and
;; return the decoded reply.
;;
;;   cmd        - a symbol naming the request
;;   rid        - run id; run id 0 is used when this is #f
;;   params     - request parameters; encoded with db:obj->string before
;;                being handed to the http transport
;;   attemptnum - (keyword) number of attempts already made; callers
;;                normally leave this at its default
;;
;; Before sending, entries in *runremote* whose last access is more than
;; 60 seconds old are discarded.
;;
;; With connection info available the request goes through
;; http-transport:client-api-send-receive; a failed exchange drops the
;; cached connection, waits two seconds and retries. Without connection
;; info a server is started (while attemptnum is below 10 and
;; tasks:need-server agrees) and the call is retried; otherwise the query
;; is run via rmt:open-qry-close-locally.
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  (let ((expire-time (- (current-seconds) 60)))
    (for-each
     (lambda (run-id)
       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
         (if (and connection
                  (< (http-transport:server-dat-get-last-access connection) expire-time))
             (begin
               (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
               (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  (let* ((run-id          (if rid rid 0))
         (connection-info (rmt:get-connection-info run-id))
         (jparams         (db:obj->string params)))
    (if connection-info
        ;; use the server if have connection info
        (let* ((dat     (http-transport:client-api-send-receive run-id connection-info cmd jparams))
               ;; anything other than a vector reply is treated as a failure
               (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f))
               (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
          (http-transport:server-dat-update-last-access connection-info)
          (if success
              (db:string->obj res)
              ;; (if (< attemptnum 100)
              ;;     (begin
              ;;       (hash-table-delete! *runremote* run-id)
              ;;       (thread-sleep! 0.5)
              ;;       (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
              ;;     (begin
              ;;       (print-call-chain (current-error-port))
              ;;       (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
              ;;       (exit 1)))))
              (begin ;; let ((new-connection-info (client:setup run-id)))
                (debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
                (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
                ;; no longer killing the server in http-transport:client-api-send-receive
                ;; may kill it here but what are the criteria?
                ;; start with three calls then kill server
                (if (= attemptnum 3) (tasks:kill-server-run-id run-id))
                (thread-sleep! 2)
                (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
        ;; no connection info: try to get a server going, else query locally
        (if (and (< attemptnum 10)
                 (tasks:need-server run-id))
            (begin
              (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
              ;; attemptnum is a keyword parameter; it must be passed with
              ;; its keyword or the retry limit above never takes effect
              (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
            (rmt:open-qry-close-locally cmd run-id params)))))


(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")







<
<
<
|


|
|
|
|
|
|
|
|
>
|
|


|
<
>


>
|
>
>
>
>
|
|


>
|
|
<
<
<
<
<
<
<
<

|
>
>

>
>
>
>




|
|

>
|


>

>
>
|
>
>
>
|
>







62
63
64
65
66
67
68



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101








102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))




;; Send the request CMD with PARAMS to the server handling run RID and
;; return the payload of the reply.
;;
;;   cmd        - a symbol naming the request
;;   rid        - run id; run id 0 is used when this is #f
;;   params     - request parameters, handed to the transport as-is
;;   attemptnum - (keyword) attempt counter, starting at 1
;;
;; The transport is selected by *transport-type* (http or nmsg). A reply is
;; expected to be a vector: element 0 is the success flag, element 1 the
;; result. On a failed exchange the cached connection is dropped, the
;; server is killed on every fifth attempt, a server is (re)started and
;; the call is retried. Without connection info a server is started while
;; attemptnum is below 15 and tasks:need-server agrees; otherwise the
;; process exits with a non-zero status.
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  ;; (let ((expire-time (- (current-seconds) 60)))
  ;;   (for-each 
  ;;    (lambda (run-id)
  ;;      (let ((connection (hash-table-ref/default *runremote* run-id #f)))
  ;;        (if (and connection 
  ;;       	  (< (http-transport:server-dat-get-last-access connection) expire-time))
  ;;            (begin
  ;;              (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
  ;;              ;; SHOULD CLOSE THE CONNECTION HERE
  ;;              (hash-table-delete! *runremote* run-id)))))
  ;;    (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  (let* ((run-id          (if rid rid 0))
         (connection-info (rmt:get-connection-info run-id)))
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (if connection-info
        ;; use the server if have connection info
        (let* ((dat     (case *transport-type*
                          ((http) (http-transport:client-api-send-receive run-id connection-info cmd params))
                          ((nmsg) (condition-case
                                   (nmsg-transport:client-api-send-receive run-id connection-info cmd params)
                                   ;; a timeout is reported as an ordinary failed reply
                                   ((timeout) (vector #f "timeout talking to server"))))
                          (else
                           ;; unknown transport is fatal; report it and exit non-zero
                           (debug:print 0 "ERROR: unrecognised transport type " *transport-type*)
                           (exit 1))))
               ;; anything other than a vector reply is treated as a failure
               (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
               (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
          (http-transport:server-dat-update-last-access connection-info)
          (if success
              (case *transport-type*
                ((http) res) ;; (db:string->obj res))
                ((nmsg) res)) ;; (vector-ref res 1)))
              (begin ;; let ((new-connection-info (client:setup run-id)))
                (debug:print 0 "WARNING: Communication failed, trying call to rmt:send-receive again.")
                ;; (case *transport-type*
                ;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
                (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
                ;; every fifth attempt the server is killed before restarting
                (if (zero? (modulo attemptnum 5))
                    (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
                (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
                ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

                ;; no longer killing the server in http-transport:client-api-send-receive
                ;; may kill it here but what are the criteria?
                ;; start with three calls then kill server
                ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
                ;; (thread-sleep! 2)
                (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
        ;; no connection info? try to start a server
        (if (and (< attemptnum 15)
                 (tasks:need-server run-id))
            (begin
              (hash-table-delete! *runremote* run-id)
              (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
              (client:setup run-id)
              (thread-sleep! (random 5)) ;; give some time to settle and minimize collision?
              (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
            (begin
              (debug:print 0 "ERROR: Communication failed!")
              ;; exit non-zero so the failure is visible to the calling process
              ;; (rmt:open-qry-close-locally cmd run-id params))))
              (exit 1))))))

(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")
182
183
184
185
186
187
188
189

190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206


207
208
209
210
211
212
213
214
215
216
217
			     (let* ((dbdir (conc    (configf:lookup *configdat* "setup" "linktree") "/.db"))
				    (db (make-dbr:dbstruct path:  dbdir local: #t)))
			       (set! *dbstruct-db* db)
			       db)))
	 (db-file-path   (db:dbfile-path 0)))
    ;; (read-only      (not (file-read-access? db-file-path)))
    (let* ((start         (current-milliseconds))
	   (res           (api:execute-requests dbstruct-local (symbol->string cmd) params))

	   (duration      (- (current-milliseconds) start)))
      (rmt:update-db-stats run-id cmd params duration)
      ;; mark this run as dirty if this was a write
      (if (not (member cmd api:read-only-queries))
	  (let ((start-time (current-seconds)))
	    (mutex-lock! *db-multi-sync-mutex*)
	    ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
	    ;; just set it every time. Is a write more expensive than a read and does it matter?
	    (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
	    (mutex-unlock! *db-multi-sync-mutex*)))
      res)))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  ;; Single-shot request over the http transport using the caller-supplied
  ;; CONNECTION-INFO. Run id 0 is used when RUN-ID is #f.
  ;; Returns the decoded result when the reply's success flag (element 0)
  ;; is true; otherwise logs an error and returns the raw reply.
  (let* ((effective-id (or run-id 0))
         (encoded      (db:obj->string params)) ;; (rmt:dat->json-str params))
         (reply        (http-transport:client-api-send-receive effective-id connection-info cmd encoded)))
    (cond
     ((and reply (vector-ref reply 0))
      (db:string->obj (vector-ref reply 1)))
     (else
      (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " reply)
      reply))))

;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
  ;; Capture what json-write prints for DAT and return it as a string.
  (let ((emit-json (lambda () (json-write dat))))
    (with-output-to-string emit-json)))








|
>














|
|
|
>
>
|
|
|
|







193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
			     (let* ((dbdir (conc    (configf:lookup *configdat* "setup" "linktree") "/.db"))
				    (db (make-dbr:dbstruct path:  dbdir local: #t)))
			       (set! *dbstruct-db* db)
			       db)))
	 (db-file-path   (db:dbfile-path 0)))
    ;; (read-only      (not (file-read-access? db-file-path)))
    (let* ((start         (current-milliseconds))
	   (resdat        (api:execute-requests dbstruct-local (symbol->string cmd) params))
	   (res           (vector-ref resdat 1))
	   (duration      (- (current-milliseconds) start)))
      (rmt:update-db-stats run-id cmd params duration)
      ;; mark this run as dirty if this was a write
      (if (not (member cmd api:read-only-queries))
	  (let ((start-time (current-seconds)))
	    (mutex-lock! *db-multi-sync-mutex*)
	    ;; (if (not (hash-table-ref/default *db-local-sync* run-id #f))
	    ;; just set it every time. Is a write more expensive than a read and does it matter?
	    (hash-table-set! *db-local-sync* (or run-id 0) start-time) ;; the oldest "write"
	    (mutex-unlock! *db-multi-sync-mutex*)))
      res)))

(define (rmt:send-receive-no-auto-client-setup connection-info cmd run-id params)
  ;; Single-shot request over the http transport using the caller-supplied
  ;; CONNECTION-INFO. Run id 0 is used when RUN-ID is #f; PARAMS is passed
  ;; through unencoded.
  ;; Returns the whole reply when its element 0 is true, otherwise #f.
  (let ((reply (http-transport:client-api-send-receive (or run-id 0) connection-info cmd params)))
    (and reply
         (vector-ref reply 0)
         reply)))
;; 	(db:string->obj (vector-ref dat 1))
;; 	(begin
;; 	  (debug:print 0 "ERROR: rmt:send-receive-no-auto-client-setup failed, attempting to continue. Got " dat)
;; 	  dat))))

;; Wrap json library for strings (why the ports crap in the first place?)
(define (rmt:dat->json-str dat)
  ;; Return, as a string, the output json-write produces for DAT.
  (define (write-dat)
    (json-write dat))
  (with-output-to-string write-dat))

240
241
242
243
244
245
246

247
248

249

250
251
252
253
254
255
256
257
;;  M I S C
;;======================================================================

(define (rmt:login run-id)
  ;; Issue a 'login request for RUN-ID through rmt:send-receive, sending the
  ;; top path, megatest version, run id and this client's signature.
  (let ((login-params (list *toppath* megatest-version run-id *my-client-signature*)))
    (rmt:send-receive 'login run-id login-params)))

;; This login does no retries under the hood - it acts a bit like a ping.

;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
  ;; Issue a 'login request over the supplied CONNECTION-INFO via
  ;; rmt:send-receive-no-auto-client-setup.
  (let ((login-params (list *toppath* megatest-version run-id *my-client-signature*)))
    (rmt:send-receive-no-auto-client-setup connection-info 'login run-id login-params)))

  
;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)
  ;; Forward a 'general-call request; the payload is STMTNAME and RUN-ID
  ;; followed by any extra PARAMS.
  (let ((payload (cons stmtname (cons run-id params))))
    (rmt:send-receive 'general-call run-id payload)))

(define (rmt:sync-inmem->db run-id)







>


>
|
>
|







254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
;;  M I S C
;;======================================================================

(define (rmt:login run-id)
  ;; Issue a 'login request for RUN-ID through rmt:send-receive, sending the
  ;; top path, megatest version, run id and this client's signature.
  (let ((credentials (list *toppath* megatest-version run-id *my-client-signature*)))
    (rmt:send-receive 'login run-id credentials)))

;; This login does no retries under the hood - it acts a bit like a ping.
;; Deprecated for nmsg-transport.
;;
(define (rmt:login-no-auto-client-setup connection-info run-id)
  ;; Issue a 'login request over the supplied CONNECTION-INFO, using
  ;; whichever transport *transport-type* selects (http or nmsg).
  (let ((login-params (list *toppath* megatest-version run-id *my-client-signature*)))
    (case *transport-type*
      ((http)
       (rmt:send-receive-no-auto-client-setup connection-info 'login run-id login-params))
      ((nmsg)
       (nmsg-transport:client-api-send-receive run-id connection-info 'login login-params)))))

;; hand off a call to one of the db:queries statements
;; added run-id to make looking up the correct db possible 
;;
(define (rmt:general-call stmtname run-id . params)
  ;; Forward a 'general-call request; the payload is STMTNAME and RUN-ID
  ;; followed by any extra PARAMS.
  (let ((call-args (cons stmtname (cons run-id params))))
    (rmt:send-receive 'general-call run-id call-args)))

(define (rmt:sync-inmem->db run-id)