Megatest

Check-in [3541d27302]
Login
Overview
Comment:wip. rmt:get-keys now works
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v2.0001
Files: files | file ages | folders
SHA1: 3541d273024fdbc321c99db6102d0aa71aa539f5
User & Date: matt on 2022-01-03 17:38:33
Other Links: branch diff | manifest | tags
Context
2022-01-03
18:24
wip, more tests passing check-in: 3333a49fd4 user: matt tags: v2.0001
17:38
wip. rmt:get-keys now works check-in: 3541d27302 user: matt tags: v2.0001
12:01
main.db and <run>.db servers working with ulex check-in: 2f2d804be0 user: matt tags: v2.0001
Changes

Modified rmtmod.scm from [77a44ba5d3] to [708190534e].

198
199
200
201
202
203
204
205
206
207
208

209
210
211

212
213
214
215
216
217

218



219
220
221
222
223
224
225
226
227
228
229
230
231
232
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
  (let* ((fullpath (db:dbname->path apath "/.db/main.db"))
	 (conns    (servdat-conns remdat))
	 (conn     (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
	 (myconn   (if *db-serv-info*
		       (servdat-uconn *db-serv-info*)
		       (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
			 (thread-start! th1)

			 (let loop ((count 0))
			   (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
			   (if (not *db-serv-info*)

			       (begin
				 (thread-sleep! 1)
				 (loop (+ count 1)))
			       (begin
				 (servdat-mode-set! *db-serv-info* 'non-db)
				 (servdat-uconn *db-serv-info*))))))))

    (cond



     ((and conn                                             ;; conn is NOT a socket, just saying ...
	   (< (current-seconds) (conndat-expires conn)))
      #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died 
     ((and conn
	   (>= (current-seconds)(conndat-expires conn)))
      (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
      (hash-table-set! conns fullpath #f) ;; clean up
      (rmt:open-main-connection remdat apath))
     (else
      ;; Below we will find or create and connect to main
      (let* ((dbname         (db:run-id->dbname #f))
	     (the-srv        (rmt:find-main-server myconn apath dbname))
	     (start-main-srv (lambda () ;; call IF there is no the-srv found
			       (mutex-lock! *connstart-mutex*)







|
<
|
|
>
|
|
|
>
|
|
|
|
|
|
>

>
>
>






|







198
199
200
201
202
203
204
205

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
(define (rmt:open-main-connection remdat apath)
  (let* ((fullpath (db:dbname->path apath "/.db/main.db"))
	 (conns    (servdat-conns remdat))
	 (conn     (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
	 (start-rmt:run (lambda ()

			  (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
			    (thread-start! th1)
			    (thread-sleep! 1)
			    (let loop ((count 0))
			      (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
			      (if (or (not *db-serv-info*)
				      (not (servdat-uconn *db-serv-info*)))
				  (begin
				    (thread-sleep! 1)
				    (loop (+ count 1)))
				  (begin
				    (servdat-mode-set! *db-serv-info* 'non-db)
				    (servdat-uconn *db-serv-info*)))))))
	 (myconn    (servdat-uconn *db-serv-info*)))
    (cond
     ((not myconn)
      (start-rmt:run)
      (rmt:open-main-connection remdat apath))
     ((and conn                                             ;; conn is NOT a socket, just saying ...
	   (< (current-seconds) (conndat-expires conn)))
      #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died 
     ((and conn
	   (>= (current-seconds)(conndat-expires conn)))
      (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
      (hash-table-delete! conns fullpath) ;; clean up
      (rmt:open-main-connection remdat apath))
     (else
      ;; Below we will find or create and connect to main
      (let* ((dbname         (db:run-id->dbname #f))
	     (the-srv        (rmt:find-main-server myconn apath dbname))
	     (start-main-srv (lambda () ;; call IF there is no the-srv found
			       (mutex-lock! *connstart-mutex*)
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
;; (define *localmode* #t)
(define *localmode* #f)
(define *dbstruct* (make-dbr:dbstruct))

;; Dispatch a db request for the current area (*toppath*).
;;
;;   cmd    - request name, passed through unchanged
;;   rid    - run id; mapped to a db name with db:run-id->dbname. When
;;            rid is #f no db-specific connection is opened.
;;   params - request parameters, passed through unchanged
;;   attemptnum, area-dat - accepted but not used in this procedure
;;
;; With *localmode* set the request is executed in-process through
;; api:execute-requests. Otherwise the main connection (and, for a
;; non-#f rid, the connection for that db) is opened and the request is
;; sent with rmt:send-receive-real, whose result is returned.
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
  (let* ((area    *toppath*)
         (srvinfo *db-serv-info*)
         (dbfile  (db:run-id->dbname rid)))
    (cond
     (*localmode*
      ;; the value of this lookup is not used here; only the call is made
      (dbr:dbstruct-get-dbdat *dbstruct* dbfile)
      (api:execute-requests *dbstruct* cmd params))
     (else
      (rmt:open-main-connection srvinfo area)
      (if rid (rmt:general-open-connection srvinfo area dbfile))
      (rmt:send-receive-real srvinfo area dbfile cmd params)))))

;; Send cmd/params to the server that owns the db at apath/.db/dbname.
;;
;; The connection record is looked up with rmt:get-conn; it must already
;; exist (the assert fires otherwise). The request goes out through the
;; ulex connection stored in sinfo, addressed to the host:port held in
;; the connection record.
;;
;; Returns the reply from send-receive, except that the literal string
;; "#<unspecified>" is mapped to #f.
;;
(define (rmt:send-receive-real sinfo apath dbname cmd params)
  (let ((conn (rmt:get-conn sinfo apath dbname)))
    (assert conn "FATAL: rmt:send-receive-real called without the needed channels opened")
    (let* ((uconn (servdat-uconn sinfo))
           (reply (send-receive uconn (conndat-hostport conn) cmd params)))
      (cond
       ((equal? reply "#<unspecified>") #f) ;; TODO - fix this in string->sexpr
       (else reply)))))

;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.







<




<
<
|
<
<
<











<
<
<
<
|







339
340
341
342
343
344
345

346
347
348
349


350



351
352
353
354
355
356
357
358
359
360
361




362
363
364
365
366
367
368
369
;; (define *localmode* #t)
(define *localmode* #f)
(define *dbstruct* (make-dbr:dbstruct))

;; Dispatch a db request for the current area (*toppath*).
;;
;;   cmd    - request name, passed through unchanged
;;   rid    - run id; mapped to a db name with db:run-id->dbname. When
;;            rid is #f no db-specific connection is opened.
;;   params - request parameters, passed through unchanged
;;   attemptnum, area-dat - accepted but not used in this procedure
;;
;; With *localmode* set the request is executed in-process through
;; api:execute-requests. Otherwise the main connection (and, for a
;; non-#f rid, the connection for that db) is opened and the request is
;; sent with rmt:send-receive-real, whose result is returned.
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
  (let* ((area    *toppath*)
         (srvinfo *db-serv-info*)
         (dbfile  (db:run-id->dbname rid)))
    (cond
     (*localmode*
      (api:execute-requests *dbstruct* cmd params))
     (else
      (rmt:open-main-connection srvinfo area)
      (if rid (rmt:general-open-connection srvinfo area dbfile))
      (rmt:send-receive-real srvinfo area dbfile cmd params)))))

;; Send cmd/params to the server that owns the db at apath/.db/dbname.
;;
;; rmt:get-conn must already return a connection record for this db;
;; the assert below fires when it does not. The request is sent over
;; the ulex connection held in sinfo to the host:port recorded in the
;; connection record.
;;
;; Returns whatever send-receive returns, with the literal string
;; "#<unspecified>" translated to #f.
;;
(define (rmt:send-receive-real sinfo apath dbname cmd params)
  (let ((conn (rmt:get-conn sinfo apath dbname)))
    (assert conn "FATAL: rmt:send-receive-real called without the needed channels opened")
    (let ((reply (send-receive (servdat-uconn sinfo)
                               (conndat-hostport conn)
                               cmd
                               params)))
      ;; TODO - fix this in string->sexpr
      (if (equal? reply "#<unspecified>")
          #f
          reply))))

;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805

1806
1807
1808
1809
1810
1811
1812
	
;; Ping the server at host-port (a host:port address) over uconn.
;;
;; The payload is an alist carrying cmd, key and a nested params alist
;; that repeats cmd and key.
;;
;; Returns #f when send-receive returns #f, otherwise the car of the
;; reply.
;;
(define (server-ready? uconn host-port key) ;; server-address is host:port
  (let* ((ping-params (list (cons 'cmd 'ping)
                            (cons 'key key)))
         (payload     (list (cons 'cmd 'ping)
                            (cons 'key key)
                            (cons 'params ping-params)))
         (reply       (send-receive uconn host-port 'ping payload)))
    (and reply (car reply))))


;; From the pkts, return the servers associated with dbpath.
;; NOTE: Only one can be alive - have to check on each
;;       in the list of pkts returned
;;
(define (get-viable-servers serv-pkts dbpath)
  (let loop ((tail serv-pkts)







|
|
|
>







1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
	
;; Ping the server at host-port (a host:port address) over uconn.
;;
;; The payload is an alist carrying cmd, key and a nested params alist
;; that repeats cmd and key.
;;
;; Returns the symbol ack when that is exactly what send-receive
;; returned, and #f for any other reply.
;;
(define (server-ready? uconn host-port key) ;; server-address is host:port
  (let* ((ping-params (list (cons 'cmd 'ping)
                            (cons 'key key)))
         (payload     (list (cons 'cmd 'ping)
                            (cons 'key key)
                            (cons 'params ping-params)))
         (reply       (send-receive uconn host-port 'ping payload)))
    ;; an ack means it is likely the server we want on the other end
    (and (eq? reply 'ack) reply)))
;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))

;; From the pkts, return the servers associated with dbpath.
;; NOTE: Only one can be alive - have to check on each
;;       in the list of pkts returned
;;
(define (get-viable-servers serv-pkts dbpath)
  (let loop ((tail serv-pkts)

Modified tests/unittests/basicserver.scm from [eb62de6943] to [d569827954].

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
(import rmtmod trace http-client apimod dbmod
	launchmod srfi-69 ulex system-information)

(trace-call-sites #t)
(trace
 ;; get-the-server
 ;; db:get-dbdat
 rmt:find-main-server
;;  rmt:send-receive-real
;;  rmt:send-receive
 ;; sexpr->string
 server-ready?
 ;; rmt:register-server
 api:run-server-process
 rmt:open-main-connection
 ;; rmt:general-open-connection
 ;; rmt:get-conny
 ;; common:watchdog
 ;; rmt:find-main-server
 ;; get-all-server-pkts
 ;; get-viable-servers
 ;; get-best-candidate







|



|

|
|







25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
(import rmtmod trace http-client apimod dbmod
	launchmod srfi-69 ulex system-information)

(trace-call-sites #t)
(trace
 ;; get-the-server
 ;; db:get-dbdat
 ;; rmt:find-main-server
;;  rmt:send-receive-real
;;  rmt:send-receive
 ;; sexpr->string
 ;; server-ready?
 ;; rmt:register-server
 ;; api:run-server-process
 ;; rmt:open-main-connection
 ;; rmt:general-open-connection
 ;; rmt:get-conny
 ;; common:watchdog
 ;; rmt:find-main-server
 ;; get-all-server-pkts
 ;; get-viable-servers
 ;; get-best-candidate
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

108
109
110
111
112
;; switch to *db-serv-info* instead of *servdat*
;; Take the ulex connection out of the server info record and check
;; that it is a listener with a string host:port.
(define *uconn* (servdat-uconn *db-serv-info*))
(print "*uconn*: " *uconn*)
(test #f #t (ulex-listener? (servdat-uconn *db-serv-info*)))
(test #f #t (string? (udat-host-port *uconn*)))

;; Ping our own listener (the address is taken from *uconn* itself);
;; the check is wrapped in run-in-thread.
(run-in-thread
 (test #f #t (server-ready? *uconn* (udat-host-port *uconn*) (servdat-uuid *db-serv-info*))))
  
;; Opening the main connection must return #t and leave a conndat
;; record retrievable for .db/main.db.
(test #f #t (rmt:open-main-connection *db-serv-info* *toppath*))
;; (pp (hash-table->alist (remotedat-conns *db-serv-info*)))
(test #f #t (conndat? (rmt:get-conn *db-serv-info* *toppath* ".db/main.db")))
;; NOTE(review): this (exit) is evaluated before any of the forms below,
;; so the remaining tests are not reached - presumably a temporary
;; work-in-progress short-circuit; confirm before relying on them.
(exit)

(define *main*  (rmt:get-conn *db-serv-info* *toppath* ".db/main.db"))

;; (for-each (lambda (tdat)
;; 	    (test #f tdat (loop-test (rmt:conn-ipaddr *main*)
;; 				     (rmt:conn-port *main*) tdat)))
;; 	  (list 'a
;; 		'(a "b" 123 1.23 )))
;; A ping with no run id (rid is #f) is expected to return #t.
(test #f #t (rmt:send-receive 'ping #f 'hello))

(define *db* (db:setup ".db/main.db"))

;; these let me cut and paste from source easily
(define apath *toppath*)
(define dbname ".db/2.db")
(define remote *db-serv-info*)
(define keyvals  '(("SYSTEM" "a")("RELEASE" "b")))


;; The string "()" must read back as the empty list.
(test #f '() (string->sexpr "()"))
;; Asking for the server of .db/2.db is expected to answer server-started.
(test #f 'server-started (api:execute-requests *db* 'get-server (list *toppath* ".db/2.db")))
(set! *dbstruct-db* #f)

(exit)







|




<

<
<
<
<
<
<
<
<
<
<
<
<
<
<



>
|
<
<


77
78
79
80
81
82
83
84
85
86
87
88

89














90
91
92
93
94


95
96
;; switch to *db-serv-info* instead of *servdat*
;; Take the ulex connection out of the server info record and check
;; that it is a listener with a string host:port.
(define *uconn* (servdat-uconn *db-serv-info*))
(print "*uconn*: " *uconn*)
(test #f #t (ulex-listener? (servdat-uconn *db-serv-info*)))
(test #f #t (string? (udat-host-port *uconn*)))

;; Ping our own listener (the address is taken from *uconn* itself);
;; server-ready? is expected to return the symbol ack. The check is
;; wrapped in run-in-thread.
(run-in-thread
 (test #f 'ack (server-ready? *uconn* (udat-host-port *uconn*) (servdat-uuid *db-serv-info*))))
  
;; Opening the main connection must return #t and leave a conndat
;; record retrievable for .db/main.db.
(test #f #t (rmt:open-main-connection *db-serv-info* *toppath*))
;; (pp (hash-table->alist (remotedat-conns *db-serv-info*)))
(test #f #t (conndat? (rmt:get-conn *db-serv-info* *toppath* ".db/main.db")))
















;; remote is another name for the server info record; keyvals is a list
;; of (key value) pairs used as the expected-data fixture below.
(define remote *db-serv-info*)
(define keyvals  '(("SYSTEM" "a")("RELEASE" "b")))

;; rmt:get-keys is expected to return the key names, i.e. the car of
;; each keyvals entry: ("SYSTEM" "RELEASE"). The check is wrapped in
;; run-in-thread.
(run-in-thread
 (test #f (map car keyvals) (rmt:get-keys)))



;; End the test script here.
(exit)

Modified ulex/ulex.scm from [5cd5e6659a] to [5f309c38c1].

208
209
210
211
212
213
214




215
216
217
218
219
220


221
222
223

224


225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
	      (close-output-port oup)
	      res)))))) ;; res will always be 'ack

;; Send cmd/data to host-port over uconn and wait for the reply.
;;
;; A mailbox/query-key pair is obtained with get-cmbox; the query key
;; travels with the request and the reply is awaited on the mailbox for
;; up to 120 seconds.
;;
;; Returns the received reply, or #f when the wait timed out or when the
;; send was not answered with ack (an error line is printed in the
;; latter case).
;;
(define (send-receive uconn host-port cmd data)
  (let* ((cmbox  (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
         (qrykey (car cmbox))
         (mbox   (cdr cmbox)))
    (cond
     ((eq? (send uconn host-port qrykey cmd data) 'ack)
      (let ((reply (mailbox-receive! mbox 120 'MBOX_TIMEOUT)))
        ;; a timeout is reported as #f - convert to raising exception?
        (and (not (eq? reply 'MBOX_TIMEOUT))
             reply)))
     (else
      (print "ERROR: Communication failed?")
      #f)))) ;; #f means failed to communicate

;;======================================================================
;; responder side
;;======================================================================

;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
  (match rdat ;;  (string-split controldat)
    ((rem-host-port qrykey cmd params)
     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
       (case cmd
	 ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
	 ((ping)
	  ;; (print "Got Ping!")
	  (add-to-work-queue uconn rdat)
	 'ack)
	 ((goodbye)
	  ;; just clear out references to the caller
	  (add-to-work-queue uconn rdat)
	  'ack)
	 ((response) ;; this is a result from remote processing, send it as mail ...
	  (if mbox







>
>
>
>
|
|
|
|
|
|
>
>
|
|
|
>
|
>
>
|
|
|
|
|



















|







208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
	      (close-output-port oup)
	      res)))))) ;; res will always be 'ack

;; Send cmd/data to host-port over uconn and return the reply.
;;
;; The cmds ping and goodbye are immediate: they are sent with the fixed
;; query key ping and the value of send itself is returned, with no
;; mailbox involved.
;;
;; For every other cmd a mailbox/query-key pair is obtained with
;; get-cmbox, the request is sent with that query key, and the reply is
;; awaited on the mailbox. The wait has no timeout on the thread named
;; primordial and is limited to 120 seconds on any other thread.
;;
;; Returns the received reply, or #f when the wait timed out (a warning
;; is printed) or when the send was not answered with ack (an error is
;; printed).
;;
;; The mailbox/query-key pair is handed back with put-cmbox only after a
;; reply has actually been received. After a timeout the pair is NOT
;; recycled: a late response carrying that query key could otherwise be
;; delivered into a mailbox that has since been given to a different
;; request, which would then receive the wrong answer.
;;
(define (send-receive uconn host-port cmd data)
  (cond
   ((member cmd '(ping goodbye)) ;; these are immediate
    (send uconn host-port 'ping cmd data))
   (else
    (let* ((cmbox  (get-cmbox uconn))
           (qrykey (car cmbox))
           (mbox   (cdr cmbox)))
      (if (eq? (send uconn host-port qrykey cmd data) 'ack)
          (let* ((mbox-timeout-secs   (if (eq? 'primordial (thread-name (current-thread)))
                                          #f    ;; no timeout on the primordial thread
                                          120))
                 (mbox-timeout-result 'MBOX_TIMEOUT)
                 (res                 (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)))
            (if (eq? res mbox-timeout-result)
                (begin
                  ;; do not recycle cmbox here, a late reply may still arrive on it
                  (print "WARNING: mbox timed out for query "cmd", with data "data)
                  #f)  ;; convert to raising exception?
                (begin
                  (put-cmbox uconn cmbox) ;; reply consumed, mbox and cookie can be reused
                  res)))
          (begin
            (print "ERROR: Communication failed?")
            #f)))))) ;; #f means failed to communicate

;;======================================================================
;; responder side
;;======================================================================

;; take a request, rdat, and if not immediate put it in the work queue
;;
;; Reserved cmds; ack ping goodbye response
;;
(define (ulex-handler uconn rdat)
  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
  (match rdat ;;  (string-split controldat)
    ((rem-host-port qrykey cmd params)
     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
     (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
       (case cmd
	 ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
	 ((ping)
	  ;; (print "Got Ping!")
	  ;; (add-to-work-queue uconn rdat)
	 'ack)
	 ((goodbye)
	  ;; just clear out references to the caller
	  (add-to-work-queue uconn rdat)
	  'ack)
	 ((response) ;; this is a result from remote processing, send it as mail ...
	  (if mbox