Megatest

Check-in [e01a10845a]
Login
Overview
Comment:Basic communication working, ping, get-keys.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80-tcp-inmem
Files: files | file ages | folders
SHA1: e01a10845a1da03f6285ffec0034e0371c26d0f5
User & Date: matt on 2023-02-19 18:41:25
Other Links: branch diff | manifest | tags
Context
2023-02-19
19:19
Couple fixes. basic queries, register-test, get-test-id, login, working and starting a server on demand check-in: 9e78ced13a user: matt tags: v1.80-tcp-inmem
18:41
Basic communication working, ping, get-keys. check-in: e01a10845a user: matt tags: v1.80-tcp-inmem
10:37
rmt:send-receive -> tt:handler -> tcp -> api:tcp-dispatch-request -> api:dispatch-request and back implemented and compiles. check-in: a91d15ac06 user: matt tags: v1.80-tcp-inmem
Changes

Modified api.scm from [c88d2a22c9] to [351c29f44d].

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

31
32
33






34
35
36
37
38
39
40
16
17
18
19
20
21
22


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45







-
-






+



+
+
+
+
+
+







;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
;;
;;======================================================================

(use srfi-69 posix)

(declare (unit api))
(declare (uses rmt))
(declare (uses db))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tasks))
(declare (uses tcp-transportmod))

(import dbmod)
(import dbfile)
(import tcp-transportmod)

(use srfi-69
     posix
     matchable
     s11n)

;; allow these queries through without starting a server
;;
(define api:read-only-queries
  '(get-key-val-pairs
    get-var
    get-keys
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245






















246
247
248
249



250
251

252
253
254
255
256
257
258
228
229
230
231
232
233
234
















235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256




257
258
259


260
261
262
263
264
265
266
267







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
-
-
+







             (begin
               #;(common:telemetry-log (conc "api-out:"(->string cmd))
               payload: `((params . ,params)
               (ok-res . #f)))
               (vector #t res))))))))

;; indat is (cmd run-id params meta)
(define (api:tcp-dispatch-request dbstruct indat) ;; cmd run-id params)
  (set! *api-process-request-count* (+ *api-process-request-count* 1))
  (match (deserialize indat)
    ((cmd run-id params meta)
     (let* ((status  (cond
		      ((> *api-process-request-count* 50) 'busy)
		      ((> *api-process-request-count* 25) 'loaded)
		      (else 'ok)))
	    (errmsg  (case status
		       ((busy)   (conc "Server overloaded, "*api-process-request-count*" threads in flight"))
		       ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight"))
		       (else     #f)))
	    (result  (case status
		       ((busy) #f)
		       (else (api:dispatch-request dbstruct cmd run-id params))))
	    (payload (list status errmsg result '()))
(define (api:tcp-dispatch-request-make-handler dbstruct) ;; cmd run-id params)
  (lambda ()
    (let* ((indat (deserialize)))
      (set! *api-process-request-count* (+ *api-process-request-count* 1))
      (match indat
	((cmd run-id params meta)
	 (let* ((status  (cond
			  ((> *api-process-request-count* 50) 'busy)
			  ((> *api-process-request-count* 25) 'loaded)
			  (else 'ok)))
		(errmsg  (case status
			   ((busy)   (conc "Server overloaded, "*api-process-request-count*" threads in flight"))
			   ((loaded) (conc "Server loaded, "*api-process-request-count*" threads in flight"))
			   (else     #f)))
		(result  (case status
			   ((busy) #f)
			   (else
			    (case cmd
			      ((ping) (tt:mk-signature *toppath*))
			      (else
			       (api:dispatch-request dbstruct cmd run-id params))))))
		(payload (list status errmsg result '())))
	    (pdat    (serialize payload)))
       (set! *api-process-request-count* (- *api-process-request-count* 1))
       pdat))
    (else
	   (set! *api-process-request-count* (- *api-process-request-count* 1))
	   (serialize payload)))
	(else
     (let* ((msg (conc "(deserialize indat)="(deserialize indat)", indat="indat)))
       (assert #f "FATAL: failed to deserialize indat "msg)))))
	 (assert #f "FATAL: failed to deserialize indat "indat))))))
       

(define (api:dispatch-request dbstruct cmd run-id params)
  (case cmd
    ;;===============================================
    ;; READ/WRITE QUERIES
    ;;===============================================

Modified megatest.scm from [ae2b7cbe8a] to [6f2fe2c4df].

939
940
941
942
943
944
945
946

947
948
949
950
951
952
953
939
940
941
942
943
944
945

946
947
948
949
950
951
952
953







-
+







	   (dbfname    (args:get-arg "-db"))
	   (tl         (launch:setup)))
      (case (rmt:transport-mode)
	((http)(http-transport:launch))
	((tcp)
	 (debug:print 0 *default-log-port* "INFO: Running using tcp method.")
	 (if run-id
	     (tt:start-server tl run-id dbfname api:tcp-dispatch-request)
	     (tt:start-server tl run-id dbfname api:tcp-dispatch-request-make-handler)
	     (begin
	       (debug:print 0 *default-log-port* "ERROR: transport mode is tcp - -run-id is required.")
	       (exit 1))))
	(else (debug:print 0 *default-log-port* "ERROR: rmt:transport-mode value not recognised "(rmt:transport-mode))))
      (set! *didsomething* #t)))

;; The adjutant is a bit different, it does NOT run (launch:setup) as it is not necessarily tied to

Modified tcp-transportmod.scm from [5c83e85f41] to [050a577392].

96
97
98
99
100
101
102
103

104
105
106
107
108
109








110
111
112
113
114
115
116
117
118
119
120
121











122


123






124
125

126
127
128
129
130
131
















132
133
134
135
136
137
138
96
97
98
99
100
101
102

103
104
105
106
107


108
109
110
111
112
113
114
115
116
117
118
119








120
121
122
123
124
125
126
127
128
129
130
131
132
133

134
135
136
137
138
139
140

141




142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166







-
+




-
-
+
+
+
+
+
+
+
+




-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+

+
+
-
+
+
+
+
+
+

-
+
-
-
-
-


+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







  (thread       #f)
  (host-port    #f)
  (cmd-thread   #f)
  (last-access  (current-seconds))
  )

(define (tt:make-remote areapath)
  (make-tt area: areapath))
  (make-tt areapath: areapath))

;; do all the busy work of finding and setting up conn for
;; connecting to a server
;; 
(define (tt:client-connect-to-server ttdat dbfname run-id)
  (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
(define (tt:client-connect-to-server ttdat dbfname run-id )
  (let* ((conn (hash-table-ref/default (tt-conns ttdat) dbfname #f))
	 (server-start-proc (lambda ()
			      (tt:server-process-run
			       (tt-areapath ttdat)
			       (dbfile:testsuite-name)
			       (common:find-local-megatest)
			       run-id))))
    (if conn
	conn ;; we are already connected to the server
	(let* ((sdat (tt:get-current-server-info ttdat dbfname run-id)))
	  (match sdat
	    ((host port start-time server-id pid)
	     (let ((conn (make-tt-conn
			  host: host
			  port: port
			  dbfname: dbfname
			  server-id: server-id
			  server-start: start-time
			  pid: pid)))
	    ((host port start-time server-id pid dbfname2)
	     (assert (equal? dbfname dbfname2) "FATAL: read server info from wrong file.")
	     (let* ((host-port (conc host":"port))
		    (conn (make-tt-conn
			   host: host
			   port: port
			   host-port: host-port
			   dbfname: dbfname
			   server-id: server-id
			   server-start: start-time
			   pid: pid)))
	       (hash-table-set! (tt-conns ttdat) dbfname conn)
	       ;; verify we can talk to this server
	       (if (tt:ping host port server-id)
	       conn))
		   conn
		   (begin
		     ;; rm the (last server) would go here
		     (server-start-proc)
		     (thread-sleep! 1)
		     (tt:client-connect-to-server ttdat dbfname run-id)))))
	    (else
	     (tt:server-process-run
	     (server-start-proc)
	      (tt-areapath ttdat)
	      (dbfile:testsuite-name)
	      (common:find-local-megatest)
	      run-id)
	     (thread-sleep! 1)
	     (tt:client-connect-to-server ttdat dbfname run-id)))))))
    
(define (tt:ping host port server-id)
  (let*  ((res (tt:send-receive-direct host port `(ping #f #f #f)))) ;; please send me your server-id
    ;;
    ;; need two threads, one a 5 second timer
    ;;
    (match res
      ((status errmsg result meta)
       (if (equal? result server-id)
	   #t ;; then we are good
	   (begin
	     (debug:print 0 *default-log-port* "WARNING: server-id does not match, expected: "server-id", got: "result)
	     #f)))
      (else
       (debug:print 0 *default-log-port* "res not in form (status errmsg resutl meta), got: "res)
       #f))))

;; client side handler
;;
(define (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)
  ;; NOTE: areapath is passed in and in tt struct. We'll use passed in value for now.
  (let* ((conn (tt:client-connect-to-server ttdat dbfname run-id))) ;; (hash-table-ref/default (tt-conns ttdat) dbfname #f)))
    (if conn
147
148
149
150
151
152
153
154



155
156
157
158
159
160
161
175
176
177
178
179
180
181

182
183
184
185
186
187
188
189
190
191







-
+
+
+







		(thread-sleep! 2)
		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	       ((loaded)
		(debug:print 0 *default-log-port* "WARNING: server is loaded, will try again in a second.")
		(thread-sleep! 1)
		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe))
	       (else
		result)))))
		result)))
	    (else
	     (assert #f "FATAL: tt:handler received bad data "res))))
	(begin
	  (thread-sleep! 1) ;; give it a rest and try again
	  (tt:handler ttdat cmd run-id params attemptnum area-dat areapath readonly-mode dbfname testsuite mtexe)))))

	;; no conn yet, find and or start and find a server
;; 	(let* ((server (tt:find-server ttdat dbfname)))
;; 	  (if server
170
171
172
173
174
175
176

177

178
179
180
181
182







183
184


185
186
187










188
189
190
191
192
193
194
195

196
197
198


199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

215
216
217

218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
























243
244
245
246
247
248
249
200
201
202
203
204
205
206
207

208





209
210
211
212
213
214
215
216
217
218
219



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234



235
236

237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258

259

260























261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291







+
-
+
-
-
-
-
-
+
+
+
+
+
+
+


+
+
-
-
-
+
+
+
+
+
+
+
+
+
+





-
-
-
+

-

+
+
















+


-
+
-

-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







;; 		(tt:handler  ttdat cmd run-id params attemptnum area-dat areapath
;; 			     readonly-mode dbfname testsuite mtexe)))))))

(define (tt:bid-for-servership run-id)
  #f)

(define (tt:get-current-server-info ttdat dbfname run-id)
  (assert (tt-areapath ttdat) "FATAL: areapath not set in ttdat.")
  (let* ((sfiles (tt:find-server ttdat dbfname)))
  (let* ((areapath (tt-areapath ttdat))
    (case (length sfiles)
      ((0) #f) ;; no server around
      ((1) (tt:server-get-info (car sfiles)))
      (else #f) ;; we'll want to wait until extra servers have exited
      )))
	 (sfiles   (tt:find-server areapath dbfname))
	 (sdats    (filter car (map tt:server-get-info sfiles))) ;; first element is #f if the file disappeared while being read
	 (sorted   (sort sdats (lambda (a b)
				 (< (list-ref a 2)(list-ref b 2))))))
    (if (null? sorted)
	#f  ;; we'll want to wait until extra servers have exited
	(car sorted))))

(define (tt:send-receive ttdat conn cmd run-id params)
  (let* ((host-port (tt-conn-host-port conn)) ;; (conc (tt-conn-host conn)":"(tt-conn-port conn)))
	 (host      (tt-conn-host conn))
  (let* ((host-port (conc (tt-conn-host conn)":"(tt-conn-port conn)))
	 (dat       (list cmd run-id params)))
    (let-values (((inp oup)(tcp-connect host-port)))
	 (port      (tt-conn-port conn))
	 (dat       (list cmd run-id params #f))) ;; no meta data yet
    (tt:send-receive-direct host port dat)))

(define (tt:send-receive-direct host port dat)
  (assert (number? port) "FATAL: tt:send-receive-direct called with port not a number "port)
  (handle-exceptions
      exn
    #f ;; Add condition-case or better handling here
    (let-values (((inp oup)(tcp-connect host port)))
      (let ((res (if (and inp oup)
		     (begin
		       (serialize dat oup)
		       (close-output-port oup)
		       (deserialize inp))
		     (begin
		       (debug:print 0 *default-log-port* "ERROR: send called but no receiver has been setup. Please call setup first!")
		       #f))))
		     )))
	(close-input-port inp)
	;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
	res))))



;;======================================================================
;; server
;;======================================================================

(define (tt:sync-dbs ttdat)
  #f)

;; start the listener and start responding to requests
;;
;; NOTE: organise by dbfname, not run-id so we don't need
;;       to pull in more modules
;;
;; This is the routine called in megatest.scm to start a server
;;
(define (tt:start-server areapath run-id dbfname handler)
  (assert areapath "FATAL: areapath not provided for tt:start-server")
  ;; is there already a server for this dbfile? Then exit.
  (let* ((ttdat   (make-tt areapath: areapath))
	 (servers (tt:find-server ttdat dbfname)))
	 (servers (tt:find-server areapath dbfname))) ;; should use tt:get-current-server-info instead
    (tt-handler-set! ttdat handler)
    (if (null? servers)
	(let* ((dbstruct   (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc)))
	       (tcp-thread (make-thread
			    (lambda ()
			      (tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
			    "tcp-server-thread"))
	       (run-thread (make-thread
			    (lambda ()
			      (tt:keep-running ttdat dbfname)))))
	  (thread-start! tcp-thread)
	  (thread-start! run-thread)
	  (thread-join! run-thread) ;; run thread will exit on timeout or other conditions
	  ;;
	  ;; set a flag here to tell tcp-thread to stop running
	  ;;
	  ;; (thread-join! tcp-thread) ;; can't wait 
	  ;;
	  ;; remove the servinfo file
	  ;;
	  ;; close the database, remove lock in on-disk db
	  ;;
	  ;; close the listener ports
	  ;;
	  (exit))
	(let* ((dbstruct   (dbmod:open-dbmoddb areapath run-id (dbfile:db-init-proc))))
	  (tt-handler-set! ttdat (handler dbstruct))
	  (let* ((tcp-thread (make-thread
			      (lambda ()
				(tt:start-tcp-server ttdat)) ;; start the tcp-server which applies handler to incoming data
			      "tcp-server-thread"))
		 (run-thread (make-thread
			      (lambda ()
				(tt:keep-running ttdat dbfname)))))
	    (thread-start! tcp-thread)
	    (thread-start! run-thread)
	    (thread-join! run-thread) ;; run thread will exit on timeout or other conditions
	    ;;
	    ;; set a flag here to tell tcp-thread to stop running
	    ;;
	    ;; (thread-join! tcp-thread) ;; can't wait 
	    ;;
	    ;; remove the servinfo file
	    ;;
	    ;; close the database, remove lock in on-disk db
	    ;;
	    ;; close the listener ports
	    ;;
	    (exit)))
	(begin
	  (debug:print 0 *default-log-port* "INFO: found server(s) already running for db "dbfname", "(string-intersperse servers ",")" Exiting.")
	  (exit)))))

(define (tt:keep-running ttdat dbfname)
  ;; verfiy conn for ready
  ;; listener socket has been started by this stage
258
259
260
261
262
263
264
265

266
267
268
269
270
271
272
300
301
302
303
304
305
306

307
308
309
310
311
312
313
314







-
+







	      (thread-sleep! 1)
	      (loop (+ count 1))))))
  
  (tt:create-server-registration-file ttdat dbfname)
  ;; now start watching the last-access, if it hasn't been touched
  ;; in over ten seconds we exit
  (let loop ()
    (if (< (- (current-seconds) (tt-last-access ttdat)) 10)
    (if (< (- (current-seconds) (tt-last-access ttdat)) 60)
	(begin
	  (thread-sleep! 2)
	  (loop))))
  (if (tt-cleanup-proc ttdat)
      ((tt-cleanup-proc ttdat)))
  (debug:print 0 *default-log-port* "INFO: Server timed out, exiting."))

334
335
336
337
338
339
340
341

342
343

344
345
346
347
348
349
350
351
352

353
354
355

356
357
358
359
360
361
362
376
377
378
379
380
381
382

383


384
385
386
387
388
389
390
391
392

393
394
395

396
397
398
399
400
401
402
403







-
+
-
-
+








-
+


-
+







      serv-id))

;; find valid server
;; get servers listed, last part of name must match :<dbfname>
;; if more than one, wait one second and look again
;; future: ping oldest, if alive remove other :<dbfname> files
;;
(define (tt:find-server ttdat dbfname)
(define (tt:find-server areapath dbfname)
  (let* ((areapath (tt-areapath ttdat))
	 (servdir  (tt:get-servinfo-dir areapath))
  (let* ((servdir  (tt:get-servinfo-dir areapath))
	 (sfiles   (glob (conc servdir"/*:"dbfname))))
    sfiles))

;; given a path to a server info file return: host port startseconds server-id
;; example of what it's looking for in the log file:
;;     SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 
;;
(define (tt:server-get-info logf)
  (let ((server-rx    (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+)")) ;; SERVER STARTED: host:port AT timesecs server id
  (let ((server-rx    (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+) server-id: (\\S+) pid: (\\d+) dbfname: (\\S+)")) ;; SERVER STARTED: host:port AT timesecs server id
        (dbprep-rx    (regexp "^SERVER: dbprep"))
        (dbprep-found 0)
	(bad-dat      (list #f #f #f #f #f)))
	(bad-dat      (list #f #f #f #f #f #f)))
    (handle-exceptions
     exn
     (begin
       ;; WARNING: this is potentially dangerous to blanket ignore the errors
       (if (file-exists? logf)
	   (debug:print-info 2 *default-log-port* "Unable to get server info from "logf", exn=" exn))
       bad-dat) ;; no idea what went wrong, call it a bad server
372
373
374
375
376
377
378
379

380
381
382
383
384


385
386
387
388
389
390
391
413
414
415
416
417
418
419

420
421
422
423
424

425
426
427
428
429
430
431
432
433







-
+




-
+
+







		 (if (not mlst)
		     (if (< lnum 500) ;; give up if more than 500 lines of server log read
			 (loop (read-line)(+ lnum 1))
			 (begin 
                           (debug:print-info 0 *default-log-port* "Unable to get server info from first 500 lines of " logf )
                           bad-dat))
		     (match mlst
			    ((_ host port start server-id pid)
			    ((_ host port start server-id pid dbfname)
			     (list host
				   (string->number port)
				   (string->number start)
				   server-id
				   (string->number pid)))
				   (string->number pid)
				   dbfname))
			    (else
			     (debug:print 0 *default-log-port* "ERROR: did not recognise SERVER line info "mlst)
			     bad-dat))))
	       (begin 
		 (if dbprep-found
		     (begin
		       (debug:print-info 2 *default-log-port* "Server is in dbprep at " (common:human-time))