Megatest

Check-in [f249e200e4]
Login
Overview
Comment:Moved sync to inside the server
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80-revolution
Files: files | file ages | folders
SHA1: f249e200e4e0dec56ae90ac24ad073b5745c42e5
User & Date: mrwellan on 2023-11-16 15:49:16
Other Links: branch diff | manifest | tags
Context
2023-11-17
20:16
small tweaks - not there yet check-in: 8a3f889655 user: matt tags: v1.80-revolution
2023-11-16
15:49
Moved sync to inside the server check-in: f249e200e4 user: mrwellan tags: v1.80-revolution
2023-11-15
20:24
Servers consolidated on machine where main.db started check-in: 9a71fda483 user: mrwellan tags: v1.80-revolution
Changes

Modified api.scm from [5e50d724f1] to [f8a9578235].

309
310
311
312
313
314
315
316

317
318
319
320









321
322
323
324
325
326
327
		(payload (list status errmsg result meta)))
	   (set! *api-process-request-count* (- *api-process-request-count* 1))
	   ;; (serialize payload)
	   (api:unregister-thread (current-thread))
	   payload))
	(else
	 (assert #f "FATAL: failed to deserialize indat "indat))))))
       


(define (api:dispatch-request dbstruct cmd run-id params)
  (if (not *no-sync-db*)
      (db:open-no-sync-db))









  (case cmd
    ;;===============================================
    ;; READ/WRITE QUERIES
    ;;===============================================

    ((get-keys-write)                        (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl
    







|
>




>
>
>
>
>
>
>
>
>







309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
		(payload (list status errmsg result meta)))
	   (set! *api-process-request-count* (- *api-process-request-count* 1))
	   ;; (serialize payload)
	   (api:unregister-thread (current-thread))
	   payload))
	(else
	 (assert #f "FATAL: failed to deserialize indat "indat))))))

(define *api-halt-writes* #f)

(define (api:dispatch-request dbstruct cmd run-id params)
  (if (not *no-sync-db*)
      (db:open-no-sync-db))
  (if (member cmd api:write-queries)
      (let loop ((start-time (current-milliseconds)))
	(if *api-halt-writes*
	    (begin
	      (thread-sleep! 0.2)
	      (if (< (- (current-milliseconds) start-time)
		     5000) ;; hope it don't take more than five seconds to sync
		  (loop start-time)
		  (debug:print 0 *default-log-port* "ERROR: writes halted for more than 5 seconds, sync might be taking too long"))))))
  (case cmd
    ;;===============================================
    ;; READ/WRITE QUERIES
    ;;===============================================

    ((get-keys-write)                        (db:get-keys dbstruct)) ;; force a dummy "write" query to force server; for debug in -repl
    

Modified dbmod.scm from [d4cca4ae8c] to [bc19f724a5].

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261

262




263
264


265
266
267
268
269
270
271
272
273
274
275
276
    (dbr:dbstruct-dbfile-set!    dbstruct dbfullname)
    (dbr:dbstruct-dbtmpname-set! dbstruct tmpdb)
    (dbr:dbstruct-dbfname-set!   dbstruct dbfname)
    (dbr:dbstruct-sync-proc-set! dbstruct
				 (lambda (last-update)
				   (if *sync-in-progress*
				       (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk")
				       (let* ((syncer-logfile    (conc areapath"/logs/"dbfname"-syncer.log"))
					      (sync-cmd          (if (eq? syncdir 'todisk)
								     (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "tmpdb" -to "dbfullname" -period 5 -timeout 10 > /dev/null 2&>1)&")
								     (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "dbfullname" -to "tmpdb" -period 5 -timeout 10 > /dev/null 2&>1)&")))
					      (synclock-file     (conc dbfullname".lock"))
					      (syncer-running-file (conc dbfullname"-sync-running"))
					      (synclock-mod-time (if (file-exists? synclock-file)
								     (handle-exceptions
									 exn
								       #f
								       (file-modification-time synclock-file))
								     #f))
					      (thethread         (lambda ()
								   (thread-start!
								    (make-thread
								     (lambda ()
								       (set! *sync-in-progress* #t)
								       (debug:print-info "Running "sync-cmd)
								       (if (file-exists? syncer-running-file)
									   (debug:print-info 0 *default-log-port* "Syncer still running, skipping syncer start.")
									   (system sync-cmd))
								       (set! *sync-in-progress* #f)))))))
					 (if ((if (eq? syncdir 'todisk) < >) ;; use less than for todisk, greater than for from disk
					      (file-modification-time tmpdb)
					      (file-modification-time dbfullname))
					     (debug:print 4 *default-log-port* "Skipping sync, "tmpdb" older than "dbfullname)
					     (if synclock-mod-time
						 (if (> (- (current-seconds) synclock-mod-time) 20) ;; something wrong with sync, remove file
						     (begin
						       (handle-exceptions
							   exn
							 #f

                                                         (begin




                                                           (debug:print 0 *default-log-port* "Sync lock file " synclock-file "is older than 20 seconds ("  synclock-mod-time " seconds). Removing it")
							   (delete-file synclock-file)


                                                         )
                                                       )
						       (thethread))
						     (debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
						 (thethread)))))))
    ;; (dbmod:sync-tables tables #f db cachedb)
    ;; 
    (thread-sleep! 1) ;; let things settle before syncing in needed data
    (dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
    (dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second?
    dbstruct))








<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
>
|
>
>
>
>
|
<
>
>
|
<
<
<
<







223
224
225
226
227
228
229




























230



231
232
233
234
235
236
237

238
239
240




241
242
243
244
245
246
247
    (dbr:dbstruct-dbfile-set!    dbstruct dbfullname)
    (dbr:dbstruct-dbtmpname-set! dbstruct tmpdb)
    (dbr:dbstruct-dbfname-set!   dbstruct dbfname)
    (dbr:dbstruct-sync-proc-set! dbstruct
				 (lambda (last-update)
				   (if *sync-in-progress*
				       (debug:print 3 *default-log-port* "WARNING: overlapping calls to sync to disk")




























				       (begin



					 ;; turn off writes - send busy or block?
					 ;; call db2db internally
					 ;; turn writes back on
					 ;;
					 (set! *api-halt-writes* #t) ;; do we need a mutex?
					 ;; (dbmod:db-to-db-sync src-db dest-db last-update (dbfile:db-init-proc) keys)
					 (debug:print-info 2 *default-log-port* "Internal sync running from "tmpdb" to "dbfullname)

					 (dbmod:db-to-db-sync tmpdb dbfullname last-update (dbfile:db-init-proc) keys)
					 (set! *api-halt-writes* #f)
					 ))))




    ;; (dbmod:sync-tables tables #f db cachedb)
    ;; 
    (thread-sleep! 1) ;; let things settle before syncing in needed data
    (dbmod:sync-gasket tables #f cachedb db dbfullname 'fromdest keys) ;; ) ;; load into cachedb
    (dbr:dbstruct-last-update-set! dbstruct (+ (current-seconds) -10)) ;; should this be offset back in time by one second?
    dbstruct))

855
856
857
858
859
860
861












































		   (ddb    (dbmod:safely-open-db dest-db init-proc d-wr))
		   (res    (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todisk keys)))
	      (sqlite3:finalize! sdb)
	      (sqlite3:finalize! ddb)
	      res)))
      #f))
)



















































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
		   (ddb    (dbmod:safely-open-db dest-db init-proc d-wr))
		   (res    (dbmod:sync-gasket tables last-update sdb ddb dest-db 'todisk keys)))
	      (sqlite3:finalize! sdb)
	      (sqlite3:finalize! ddb)
	      res)))
      #f))
)


;; ATTIC

					 #;(let* ((syncer-logfile    (conc areapath"/logs/"dbfname"-syncer.log"))
					      (sync-cmd          (if (eq? syncdir 'todisk)
								     (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "tmpdb" -to "dbfullname" -period 5 -timeout 10 > /dev/null 2&>1)&")
								     (conc "(NBFAKE_LOG="syncer-logfile" nbfake megatest -db2db -from "dbfullname" -to "tmpdb" -period 5 -timeout 10 > /dev/null 2&>1)&")))
					      (synclock-file     (conc dbfullname".lock"))
					      (syncer-running-file (conc dbfullname"-sync-running"))
					      (synclock-mod-time (if (file-exists? synclock-file)
								     (handle-exceptions
									 exn
								       #f
								       (file-modification-time synclock-file))
								     #f))
					      (thethread         (lambda ()
								   (thread-start!
								    (make-thread
								     (lambda ()
								       (set! *sync-in-progress* #t)
								       (debug:print-info "Running "sync-cmd)
								       (if (file-exists? syncer-running-file)
									   (debug:print-info 0 *default-log-port* "Syncer still running, skipping syncer start.")
									   (system sync-cmd))
								       (set! *sync-in-progress* #f)))))))
					 (if ((if (eq? syncdir 'todisk) < >) ;; use less than for todisk, greater than for from disk
					      (file-modification-time tmpdb)
					      (file-modification-time dbfullname))
					     (debug:print 4 *default-log-port* "Skipping sync, "tmpdb" older than "dbfullname)
					     (if synclock-mod-time
						 (if (> (- (current-seconds) synclock-mod-time) 20) ;; something wrong with sync, remove file
						     (begin
						       (handle-exceptions
							   exn
							 #f
                                                         (begin
                                                           (debug:print 0 *default-log-port* "Sync lock file " synclock-file "is older than 20 seconds ("  synclock-mod-time " seconds). Removing it")
							   (delete-file synclock-file)
                                                         )
                                                       )
						       (thethread))
						     (debug:print 0 *default-log-port* "Skipping sync, lockfile "synclock-file" found."))
						 (thethread))))

Modified tcp-transportmod.scm from [ff1d2811a4] to [8a99124972].

256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
			 (debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds")
			 (thread-sleep! delay-wait)))))
	     (case status
	       ((busy) ;; result will be how long the server wants you to delay
		(let* ((dly  (if (number? result) result 0.1)))
		  (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, will try again in "dly" seconds.")
		  (thread-sleep! dly)
		  (tt:handler  ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)))
	       ((loaded)
		(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.")
		(tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))
		result) ;; (tt:handler  ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
	       (else
		result)))
	    (else ;; did not receive properly formated result
	     (if (not res) ;; tt:send-receive telling us that communication failed
		 (let* ((host    (tt-conn-host conn))
			(port    (tt-conn-port conn))
			;; (dbfname (tt-conn-port conn)) ;; 192.168.0.127:4242-726924:4.db
			(pid     (tt-conn-pid  conn))
                        ;;(servinf (tt-conn-servinf-file conn))) 
			(servinf (tt-servinf-file ttdat))) ;; (conc areapath"/.servinfo/"host":"port"-"pid":"dbfname))) ;; TODO, use (server:get-servinfo-dir areapath)
		   (hash-table-set! (tt-conns ttdat) dbfname #f) ;; clear out the conn for this dbfname to force finding new server
		   (if (and servinf (file-exists? servinf))
		       (begin
			 (if (< attemptnum 10)
			     (begin
			       (thread-sleep! 0.5)
			       (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
			     (begin
			       (debug:print 0 *default-log-port* "INFO: no response from server "host":"port" for "dbfname)
			       (if (and (file-exists? servinf)
					(> (- (current-seconds)(file-modification-time servinf)) 60))
				   (begin
				     (debug:print 0 *default-log-port* "INFO: "servinf" file seems old and no ping response, removing it.")
				     (handle-exceptions
					 exn
				       #f
				       (delete-file* servinf))
				     (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
				   (begin
				     ;; start server - addressed in client-connect-to-server
				     ;; delay        - addressed in client-connect-to-server
				     ;; try again
				     (thread-sleep! 0.25) ;; dunno, I think this needs to be here
				     (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
				   ))))
		       (begin ;; no server file, delay and try again
			 (debug:print 2 *default-log-port* "INFO: connection to server "host":"port" broken for "dbfname", no servinf file. Server exited? ")
			 (thread-sleep! 0.5)
			 (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))))
		 (begin ;; this case is where res is malformed. Probably should abort
		   (assert #f "FATAL: tt:handler received bad data "res)
		   ;; (debug:print 0 *default-log-port* "INFO: got corrupt data from server "host":"port", "res", for "dbfname", will try again.")
		   ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)
		   )))))
	(begin
	  (thread-sleep! 1) ;; no conn yet set up, give it a rest and try again
	  (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe)))))

(define (tt:bid-for-servership run-id)
  #f)

;; gets server info and appends path to server file
;; sorts by age, oldest first
;;







|




















|










|





|




|







|







256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
			 (debug:print 0 *default-log-port* "Server is loaded, delaying "delay-wait" seconds")
			 (thread-sleep! delay-wait)))))
	     (case status
	       ((busy) ;; result will be how long the server wants you to delay
		(let* ((dly  (if (number? result) result 0.1)))
		  (debug:print 0 *default-log-port* "WARNING: server for "dbfname" is busy, will try again in "dly" seconds.")
		  (thread-sleep! dly)
		  (tt:handler  ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc)))
	       ((loaded)
		(debug:print 0 *default-log-port* "WARNING: server for "dbfname" is loaded, slowing queries.")
		(tt:backoff-incr (tt-conn-host conn)(tt-conn-port conn))
		result) ;; (tt:handler  ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe))
	       (else
		result)))
	    (else ;; did not receive properly formated result
	     (if (not res) ;; tt:send-receive telling us that communication failed
		 (let* ((host    (tt-conn-host conn))
			(port    (tt-conn-port conn))
			;; (dbfname (tt-conn-port conn)) ;; 192.168.0.127:4242-726924:4.db
			(pid     (tt-conn-pid  conn))
                        ;;(servinf (tt-conn-servinf-file conn))) 
			(servinf (tt-servinf-file ttdat))) ;; (conc areapath"/.servinfo/"host":"port"-"pid":"dbfname))) ;; TODO, use (server:get-servinfo-dir areapath)
		   (hash-table-set! (tt-conns ttdat) dbfname #f) ;; clear out the conn for this dbfname to force finding new server
		   (if (and servinf (file-exists? servinf))
		       (begin
			 (if (< attemptnum 10)
			     (begin
			       (thread-sleep! 0.5)
			       (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
			     (begin
			       (debug:print 0 *default-log-port* "INFO: no response from server "host":"port" for "dbfname)
			       (if (and (file-exists? servinf)
					(> (- (current-seconds)(file-modification-time servinf)) 60))
				   (begin
				     (debug:print 0 *default-log-port* "INFO: "servinf" file seems old and no ping response, removing it.")
				     (handle-exceptions
					 exn
				       #f
				       (delete-file* servinf))
				     (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
				   (begin
				     ;; start server - addressed in client-connect-to-server
				     ;; delay        - addressed in client-connect-to-server
				     ;; try again
				     (thread-sleep! 0.25) ;; dunno, I think this needs to be here
				     (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))
				   ))))
		       (begin ;; no server file, delay and try again
			 (debug:print 2 *default-log-port* "INFO: connection to server "host":"port" broken for "dbfname", no servinf file. Server exited? ")
			 (thread-sleep! 0.5)
			 (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe server-start-proc))))
		 (begin ;; this case is where res is malformed. Probably should abort
		   (assert #f "FATAL: tt:handler received bad data "res)
		   ;; (debug:print 0 *default-log-port* "INFO: got corrupt data from server "host":"port", "res", for "dbfname", will try again.")
		   ;; (tt:handler ttdat cmd run-id params (+ attemptnum 1) readonly-mode dbfname testsuite mtexe)
		   )))))
	(begin
	  (thread-sleep! 1) ;; no conn yet set up, give it a rest and try again
	  (tt:handler ttdat cmd run-id params attemptnum readonly-mode dbfname testsuite mtexe server-start-proc)))))

(define (tt:bid-for-servership run-id)
  #f)

;; gets server info and appends path to server file
;; sorts by age, oldest first
;;
730
731
732
733
734
735
736
737
738
739



740
741
742
743
744
745
746
747
	 (goodfiles '()))

    ;; filter the files here by looking in processes table (if we are not main.db)
    ;; and or look at the time stamp on the servinfo file, a running server will
    ;; touch the file every minute (again, this will only apply for main.db)
    (for-each (lambda (fname)
		(let* ((age (- (current-seconds)(file-modification-time fname))))
		  (if (> age 10) ;; can't trust it if over ten seconds old
		      (begin
			(debug:print 0 *default-log-port* "WARNING: removing stale servinfo file "fname)



			(delete-file fname))
		      (set! goodfiles (cons fname goodfiles)))))
	      sfiles)
    goodfiles))

;; given a path to a server info file return: host port startseconds server-id pid dbfname logf
;; example of what it's looking for in the log file:
;;     SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4 







|


>
>
>
|







730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
	 (goodfiles '()))

    ;; filter the files here by looking in processes table (if we are not main.db)
    ;; and or look at the time stamp on the servinfo file, a running server will
    ;; touch the file every minute (again, this will only apply for main.db)
    (for-each (lambda (fname)
		(let* ((age (- (current-seconds)(file-modification-time fname))))
		  (if (> age 20) ;; can't trust it if over twenty seconds old
		      (begin
			(debug:print 0 *default-log-port* "WARNING: removing stale servinfo file "fname)
			(handle-exceptions
			 exn
			 (debug:print 0 *default-log-port* "WARNING: error attempting to remove stale servinfo file "fname)
			 (delete-file fname))) ;; 
		      (set! goodfiles (cons fname goodfiles)))))
	      sfiles)
    goodfiles))

;; given a path to a server info file return: host port startseconds server-id pid dbfname logf
;; example of what it's looking for in the log file:
;;     SERVER STARTED: 10.38.175.67:50216 AT 1616502350.0 server-id: 4907e90fc55c7a09694e3f658c639cf4