Megatest

Check-in [97a3c4ad11]
Login
Overview
Comment:wip, close idle db connections
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.80-close-idle-connections
Files: files | file ages | folders
SHA1: 97a3c4ad117281ac58073e623e2771896b46773b
User & Date: matt on 2023-02-05 08:36:51
Other Links: branch diff | manifest | tags
Context
2023-02-07
16:24
fix for assert check-in: a7577f7a9b user: pjhatwal tags: v1.80-close-idle-connections
2023-02-05
08:36
wip, close idle db connections check-in: 97a3c4ad11 user: matt tags: v1.80-close-idle-connections
2023-02-02
12:54
Use an actual droop check-in: 19861e6399 user: matt tags: v1.80
Changes

Modified api.scm from [0676a2f9d1] to [e4088e0a3d].

16
17
18
19
20
21
22
23

24
25
26
27
28
29
30
16
17
18
19
20
21
22

23
24
25
26
27
28
29
30







-
+







;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
;;
;;======================================================================

(use srfi-69 posix)
(use srfi-69 posix srfi-18)

(declare (unit api))
(declare (uses rmt))
(declare (uses db))
(declare (uses dbmod))
(declare (uses dbfile))
(declare (uses tasks))
139
140
141
142
143
144
145

146










147
148
149
150
151
152
153
154

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

165






166
167
168
169
170
171
172
173
174




175
176
177
178
179
180
181







+

+
+
+
+
+
+
+
+
+
+







-
+
-
-
-
-
-
-









-
-
-
-








    ;; TASKS
    tasks-add
    tasks-set-state-given-param-key
    ))

(define *db-write-mutexes* (make-hash-table))
(define *api-watchdog* #f)

(define (api:watchdog dbstruct) ;; trim not-used sqlite3 db handles
  (let* ((th1 (make-thread (lambda ()
			     (let loop ()
			       (thread-sleep! 60) ;; 2x the age we close at
			       (db:close-old dbstruct)
			       (loop)))
			   "api:watchdog thread")))
    (thread-start! th1)
    (set! *api-watchdog* th1)))
    
;; These are called by the server on recipt of /api calls
;;    - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
;;    - returns #( flag result )
;;
(define (api:execute-requests dbstruct dat)
  (db:open-no-sync-db) ;; sets *no-sync-db*
;;   (handle-exceptions
  (if (not *api-watchdog*)(api:watchdog dbstruct))
;;    exn
;;    (let ((call-chain (get-call-chain)))
;;      (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an exception from peer, dat=" dat ", exn=" exn)
;;      (print-call-chain (current-error-port))
;;      (debug:print 0 *default-log-port* " message: "  ((condition-property-accessor 'exn 'message) exn))       
  ;;      (vector #f (vector exn call-chain dat))) ;; return some stuff for debug if an exception happens
  (if (> *api-process-request-count* 200)
      (begin
	(if (common:low-noise-print 30 "too many threads")
	    (debug:print 0 *default-log-port* "WARNING: "*api-process-request-count*" threads, potential overload, adding 0.5 sec delay."))
	(thread-sleep! 0.5) ;; take a nap
	))
   (cond
    ((not (vector? dat))                    ;; it is an error to not receive a vector
     (vector #f (vector #f "remote must be called with a vector")))
    #;((> *api-process-request-count* 200) ;; 20)
     (debug:print 0 *default-log-port* "WARNING: api:execute-requests received an overloaded message.")
     (set! *server-overloaded* #t)
     (vector #f (vector #f 'overloaded))) ;; the inner vector is what gets returned. nope, don't know why. please refactor!
    (else  
     (let* ((cmd-in            (vector-ref dat 0))
            (cmd               (if (symbol? cmd-in)
				   cmd-in
				   (string->symbol cmd-in)))
            (params            (vector-ref dat 1))
	    (run-id            (if (null? params)

Modified db.scm from [66cca5d3c4] to [9fa55aaa76].

549
550
551
552
553
554
555
556
557

558
559
560
561

562
563
564
565
566
567
568
569
570
549
550
551
552
553
554
555


556
557
558
559

560


561
562
563
564
565
566
567







-
-
+



-
+
-
-







		       ;; (> (- (current-seconds) time1) 3)) ;; if file is changed and three seconds have passed.
		       #t)
		      ((and changed *time-to-exit*) ;; last sync
		       #t)
		      (else
		       #f))))
          (if (or dejunk do-cp)
	     (let* (
                    (start-time (current-milliseconds))
	     (let* ((start-time (current-milliseconds))

                    (subdb (or (dbfile:get-subdb dbstruct run-id) (dbfile:init-subdb dbstruct run-id dbfile:db-init-proc)))
                    (mtdb      (dbr:subdb-mtdbdat subdb))
                    (tmpdb     (dbfile:open-db dbstruct run-id dbfile:db-init-proc))
                    (tmpdb     (dbfile:open-db dbstruct run-id dbfile:db-init-proc)))

                    )
	       (debug:print-info 2 *default-log-port* "delta syncing file: " srcfile ", time diff: " (- time1 time2) " seconds")

               (if old2new
                 (begin
                   (if dejunk (db:clean-up run-id mtdb))
		   (db:sync-tables (db:sync-all-tables-list dbstruct (db:get-keys dbstruct)) #f mtdb tmpdb)
                 )
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
4372
4373
4374
4375
4376
4377
4378
4379
4380
4381




























4382
4383
4384
4385
4386
4387
4388
4344
4345
4346
4347
4348
4349
4350




























4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
4372
4373
4374
4375
4376
4377
4378
4379
4380
4381
4382
4383
4384
4385







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







;; moving watch dogs here due to dependencies
;;======================================================================

;;======================================================================
;; currently the primary job of the watchdog is to run the sync back to megatest.db from the db in /tmp
;; if we are on the homehost and we are a server (by definition we are on the homehost if we are a server)
;;
(define (common:readonly-watchdog dbstruct)
  (thread-sleep! 0.05) ;; delay for startup
  (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
  ;; sync megatest.db to /tmp/.../megatst.db
  (let* ((sync-cool-off-duration   3)
        (golden-mtdb     (dbr:dbstruct-mtdb dbstruct))
        (golden-mtpath   (db:dbdat-get-path golden-mtdb))
        (tmp-mtdb        (dbr:dbstruct-tmpdb dbstruct))
        (tmp-mtpath      (db:dbdat-get-path tmp-mtdb)))
    (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
    (let loop ((last-sync-time 0))
      (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath)
      (let* ((duration-since-last-sync (- (current-seconds) last-sync-time)))
        (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync)
        (if (and (not *time-to-exit*)
                 (< duration-since-last-sync sync-cool-off-duration))
            (thread-sleep! (- sync-cool-off-duration duration-since-last-sync)))
        (if (not *time-to-exit*)
            (let ((golden-mtdb-mtime (file-modification-time golden-mtpath))
                  (tmp-mtdb-mtime    (file-modification-time tmp-mtpath)))
	      (if (> golden-mtdb-mtime tmp-mtdb-mtime)
		  (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back
		      (let ((res (db:multi-db-sync dbstruct 'old2new)))
			(debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
              (loop (current-seconds)))
            #t)))
    (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath)))

;; (define (common:readonly-watchdog dbstruct)
;;   (thread-sleep! 0.05) ;; delay for startup
;;   (debug:print-info 13 *default-log-port* "common:readonly-watchdog entered.")
;;   ;; sync megatest.db to /tmp/.../megatst.db
;;   (let* ((sync-cool-off-duration   3)
;;         (golden-mtdb     (dbr:dbstruct-mtdb dbstruct))
;;         (golden-mtpath   (db:dbdat-get-path golden-mtdb))
;;         (tmp-mtdb        (dbr:dbstruct-tmpdb dbstruct))
;;         (tmp-mtpath      (db:dbdat-get-path tmp-mtdb)))
;;     (debug:print-info 0 *default-log-port* "Read-only periodic sync thread started.")
;;     (let loop ((last-sync-time 0))
;;       (debug:print-info 13 *default-log-port* "loop top tmp-mtpath="tmp-mtpath" golden-mtpath="golden-mtpath)
;;       (let* ((duration-since-last-sync (- (current-seconds) last-sync-time)))
;;         (debug:print-info 13 *default-log-port* "duration-since-last-sync="duration-since-last-sync)
;;         (if (and (not *time-to-exit*)
;;                  (< duration-since-last-sync sync-cool-off-duration))
;;             (thread-sleep! (- sync-cool-off-duration duration-since-last-sync)))
;;         (if (not *time-to-exit*)
;;             (let ((golden-mtdb-mtime (file-modification-time golden-mtpath))
;;                   (tmp-mtdb-mtime    (file-modification-time tmp-mtpath)))
;; 	      (if (> golden-mtdb-mtime tmp-mtdb-mtime)
;; 		  (if (< golden-mtdb-mtime (- (current-seconds) 3)) ;; file has NOT been touched in past three seconds, this way multiple servers won't fight to sync back
;; 		      (let ((res (db:multi-db-sync dbstruct 'old2new)))
;; 			(debug:print-info 13 *default-log-port* "rosync called, " res " records transferred."))))
;;               (loop (current-seconds)))
;;             #t)))
;;     (debug:print-info 0 *default-log-port* "Exiting readonly-watchdog timer, *time-to-exit* = " *time-to-exit*" pid="(current-process-id)" mtpath="golden-mtpath)))
;; 

;; Get a lock from the no-sync-db for the from-db, then copy the from-db to the to-db, otherwise return #f

(define (db:lock-and-sync no-sync-db from-db to-db)
  (assert (not *db-sync-in-progress*) "FATAL: db:lock-and-sync called while a sync is in progress.")
  (let* ((lockdat  (db:no-sync-get-lock no-sync-db from-db))
	 (gotlock  (car lockdat))

Modified dbfile.scm from [25f8271ef2] to [8d67468750].

64
65
66
67
68
69
70

71
72
73
74
75
76
77
78
79
80
81
82
83
84



85
86
87
88
89
90
91
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90
91
92
93
94







+













-
+
+
+







  (dbname      #f) ;; .megatest/1.db
  (mtdbfile    #f) ;; mtrah/.megatest/1.db
  (mtdbdat     #f) ;; only need one of these for syncing
  ;; (dbdats      (make-hash-table))  ;; id => dbdat 
  (tmpdbfile   #f) ;; /tmp/.../.megatest/1.db
  ;; (refndbfile  #f) ;; /tmp/.../.megatest/1.db_ref
  (dbstack     (make-stack)) ;; stack for tmp dbr:dbdat,
  (stack-mutex (make-mutex)) ;; gate pop, push, peek and replace with this mutex (allows safe clean up of old handles)
  (homehost    #f) ;; not used yet
  (on-homehost #f) ;; not used yet
  (read-only   #f)
  (last-sync   0)
  (last-write  (current-seconds))
  )                ;; goal is to converge on one struct for an area but for now it is too confusing

;; need to keep dbhandles and cached statements together
(defstruct dbr:dbdat
  (dbfile      #f)
  (dbh         #f)    
  (stmt-cache  (make-hash-table))
  (read-only   #f)
  (birth-sec   (current-seconds)))
  (birth-sec   (current-seconds))
  (last-used   (current-seconds))
  (in-use      #f))

(define *dbstruct-dbs* #f)
(define *db-open-mutex* (make-mutex))
(define *db-access-mutex* (make-mutex)) ;; used in common.scm
(define *no-sync-db*   #f)
(define *db-sync-in-progress* #f)
(define *db-with-db-mutex*    (make-mutex))
126
127
128
129
130
131
132
133

134
135
136
137
138
139
140
141
142
143
144






145
146
147
148
149
150
151
152






153
154
155


156
157
158
159




























160
161
162
163
164
165
166
167
129
130
131
132
133
134
135

136











137
138
139
140
141
142








143
144
145
146
147
148



149
150




151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178

179
180
181
182
183
184
185







-
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-







             (dbfile:print-err "db:safely-close-sqlite3-db: " db " is not an sqlite3 db")
	     #f
            )
        ))))

;; close all opened run-id dbs
(define (db:close-all dbstruct)
  (if (dbr:dbstruct? dbstruct)
  (assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with non-dbstruct "dbstruct)
;; (handle-exceptions
;; 	  exn
;; 	  (begin
;; 	    (debug:print 0 *default-log-port* "WARNING: Finalizing failed, "  ((condition-property-accessor 'exn 'message) exn) ", note - exn=" exn)
;; 	    (print-call-chain *default-log-port*))
	;; (db:sync-touched dbstruct 0 force-sync: #t) ;; NO. Do not do this here. Instead we rely on a server to be started when there are writes, even if the server itself is not going to be used as a server.
        (let* ((subdbs     (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
	  (for-each
	   (lambda (subdb)
	     (let* ((tdbs       (stack->list (dbr:subdb-dbstack subdb)))
		    (mtdbdat    (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb)))
  (let* ((subdbs     (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
    (for-each
     (lambda (subdb)
       (mutex-lock! (dbr:subdb-stack-mutex subdb))
       (let* ((tdbs       (stack->list (dbr:subdb-dbstack subdb)))
	      (mtdbdat    (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb))))
		    #;(rdb        (dbr:dbdat-dbh (dbr:subdb-refndb subdb))))
		    
	       (map (lambda (dbdat)
		      (let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
			     (dbh        (dbr:dbdat-dbh        dbdat)))
			(db:safely-close-sqlite3-db dbh stmt-cache)))
		    tdbs)
	       (db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache  (dbr:subdb-mtdbdat subdb))) 
	 (map (lambda (dbdat)
		(let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat))
		       (dbh        (dbr:dbdat-dbh        dbdat)))
		  (db:safely-close-sqlite3-db dbh stmt-cache)))
	      tdbs)
	 (db:safely-close-sqlite3-db mtdbdat (dbr:dbdat-stmt-cache  (dbr:subdb-mtdbdat subdb))))
               ;; (if (sqlite3:database? mdb) (sqlite3:finalize! mdb))
	       #;(db:safely-close-sqlite3-db rdb #f))) ;; stmt-cache))))) ;; (if (sqlite3:database? rdb) (sqlite3:finalize! rdb))))))
	   subdbs)
       (mutex-unlock! (dbr:subdb-stack-mutex subdb)))
     subdbs)))
           #t
          )
          #f
  )

;; close  opened run-id dbs that haven't been used in age seconds
(define (db:close-old dbstruct #!key (age 30)) ;; close dbs older than this age
  (assert (dbr:dbstruct? dbstruct) "FATAL: db:close-all called with non-dbstruct "dbstruct)
  (let* ((subdbs     (hash-table-values (dbr:dbstruct-subdbs dbstruct))))
    (for-each
     (lambda (subdb)
       (mutex-lock! (dbr:subdb-stack-mutex subdb))
       (let* ((tdbs       (stack->list (dbr:subdb-dbstack subdb)))
	      (mtdbdat    (dbr:dbdat-dbh (dbr:subdb-mtdbdat subdb))))
	 (dbr:subdb-dbstack-set! subdb (make-stack)) ;; replace the stack with a new one
	 (map (lambda (dbdat)
		(assert (dbr:dbdat-in-use dbdat) "FATAL: dbdat in stack was in use "(dbr:dbdat-dbfile dbdat))
		(if (< (- (current-seconds)
			  (dbr:dbdat-last-used dbdat))
		       age)
		    (stack-push! (dbr:subdb-dbstack subdb) dbdat)    ;; keep it
		    (let* ((stmt-cache (dbr:dbdat-stmt-cache dbdat)) ;; close and discard
			   (dbh        (dbr:dbdat-dbh        dbdat)))
		      (dbfile:print-err "INFO: closing unused dbdat for "(dbr:dbdat-dbfile dbdat))
		      (db:safely-close-sqlite3-db dbh stmt-cache))))
	      tdbs)
	 (let* ((size  (stack-count (dbr:subdb-dbstack subdb)))
		(delta (- (length tdbs) size)))
	   (if (> delta 0)
	       (dbfile:print-err "INFO: removed "delta" and "size" dbs left."))))
       (mutex-unlock! (dbr:subdb-stack-mutex subdb)))
     subdbs)))
)

;; ;; set up a single db (e.g. main.db, 1.db ... etc.)
;; ;;
;; (define (db:setup-db dbstruct areapath run-id)
;;   (let* ((dbname   (db:run-id->dbname run-id))
;; 	 (dbstruct (hash-table-ref/default dbstructs dbname #f)))
;;     (if dbstruct
232
233
234
235
236
237
238

239
240


241
242






243
244
245
246

247
248
249
250
251
252
253
250
251
252
253
254
255
256
257


258
259


260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277







+
-
-
+
+
-
-
+
+
+
+
+
+




+







;;    if run-id is a string treat it as a filename
;;    if db already open - return inmem
;;    if db not open, open inmem, rundb and sync then return inmem
;;    inuse gets set automatically for rundb's
;;
(define (dbfile:get-dbdat dbstruct run-id)
  (let* ((subdb (dbfile:get-subdb dbstruct run-id)))
    (mutex-lock! (dbr:subdb-stack-mutex subdb))
    (if (stack-empty? (dbr:subdb-dbstack subdb))
	#f
    (let* ((res (if (stack-empty? (dbr:subdb-dbstack subdb))
		    #f
	(begin
	  (stack-pop! (dbr:subdb-dbstack subdb))))))
		    (let ((dbdat (stack-pop! (dbr:subdb-dbstack subdb))))
		      (dbr:dbdat-last-used-set! dbdat (current-seconds))
		      (dbr:dbdat-in-use-set! dbdat #t)
		      dbdat))))
      (mutex-unlock! (dbr:subdb-stack-mutex subdb))
      res)))

;; return a previously opened db handle to the stack of available handles
(define (dbfile:add-dbdat dbstruct run-id dbdat)
  (let* ((subdb (dbfile:get-subdb dbstruct run-id)))
    (dbr:dbdat-in-use-set! dbdat #f)
    (stack-push! (dbr:subdb-dbstack subdb) dbdat)
    dbdat))

;; set up a subdb
;;
(define (dbfile:init-subdb dbstruct run-id init-proc)
  (let* ((dbname    (dbfile:run-id->dbname run-id))