Megatest

Check-in [07e285fe2b]
Login
Overview
Comment:wip
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.6584-nanomsg
Files: files | file ages | folders
SHA1: 07e285fe2b84a1b0318ca365c19d061f67fb8489
User & Date: matt on 2021-06-14 05:50:35
Other Links: branch diff | manifest | tags
Context
2021-06-15
05:19
wip check-in: 17e7810cad user: matt tags: v1.6584-nanomsg
2021-06-14
05:50
wip check-in: 07e285fe2b user: matt tags: v1.6584-nanomsg
00:21
wip check-in: 8f1a13e4de user: matt tags: v1.6584-nanomsg
Changes

Modified rmtmod.scm from [95a4ff32da] to [7976b7c5ac].

249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
(define (rmt:general-open-connection remote apath dbname #!key (num-tries 5))
  (let* ((mdbname (db:run-id->dbname #f))
	 (mconn   (rmt:get-conn remote apath mdbname)))
    (cond
     ((or (not mconn) ;; no channel open to main?
	  (< (rmt:conn-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
      (rmt:open-main-connection remote apath)
      (thread-sleep! 2)
      (rmt:general-open-connection remote apath mdbname))
     ((not (rmt:get-conn remote apath dbname))                 ;; no channel open to dbname?     
      (let* ((res (rmt:send-receive-real remote apath mdbname 'get-server `(,apath ,dbname))))
	(case res
	  ((server-started)
	   (if (> num-tries 0)
	       (begin







<







249
250
251
252
253
254
255

256
257
258
259
260
261
262
(define (rmt:general-open-connection remote apath dbname #!key (num-tries 5))
  (let* ((mdbname (db:run-id->dbname #f))
	 (mconn   (rmt:get-conn remote apath mdbname)))
    (cond
     ((or (not mconn) ;; no channel open to main?
	  (< (rmt:conn-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
      (rmt:open-main-connection remote apath)

      (rmt:general-open-connection remote apath mdbname))
     ((not (rmt:get-conn remote apath dbname))                 ;; no channel open to dbname?     
      (let* ((res (rmt:send-receive-real remote apath mdbname 'get-server `(,apath ,dbname))))
	(case res
	  ((server-started)
	   (if (> num-tries 0)
	       (begin
272
273
274
275
276
277
278

279

280
281
282
283
284
285
286
		      ;;  "5e34239f48e8973b3813221e54701a01" "24310"
		      ;;  "192.168.0.9"
		      ;;  "/home/matt/data/megatest/tests/simplerun"
		 ;;  ".db/1.db")
		 (match
		  res
		  ((host port servkey pid ipaddr apath dbname)

		   (hash-table-set! (rmt:remote-conns remote)

				    (make-rmt:conn
				     apath: apath
				     dbname: dbname
				     hostport: (conc host":"port)
				     ipaddr: ipaddr
				     port: port
				     srvkey: servkey







>

>







271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
		      ;;  "5e34239f48e8973b3813221e54701a01" "24310"
		      ;;  "192.168.0.9"
		      ;;  "/home/matt/data/megatest/tests/simplerun"
		 ;;  ".db/1.db")
		 (match
		  res
		  ((host port servkey pid ipaddr apath dbname)
		   (debug:print-info 0 *default-log-port* "got "res)
		   (hash-table-set! (rmt:remote-conns remote)
				    dbname
				    (make-rmt:conn
				     apath: apath
				     dbname: dbname
				     hostport: (conc host":"port)
				     ipaddr: ipaddr
				     port: port
				     srvkey: servkey
1469
1470
1471
1472
1473
1474
1475

1476

1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514

1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529

1530
1531
1532
1533
1534
1535
1536
		       #f
		       (begin
			 (bdat-time-to-exit-set! *bdat* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
	(rmt:print-db-stats))

    (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds

                              (if *server-info*
				  (let ((dbfile   (servdat-dbfile *server-info*)))
				    (if dbfile
					(let* ((am-server  (args:get-arg "-server"))
					       (dbfile     (args:get-arg "-db"))
					       (apath      *toppath*))
					  ;; do a final sync here
					  (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
					  (db:sync-inmem->disk *dbstruct-db* apath dbfile)
					  (if am-server
					      (if (string-match ".*/main.db$" dbfile)
						  (let ((pkt-file (conc (get-pkts-dir *toppath*)
									"/" (servdat-uuid *server-info*)
									".pkt")))
						    (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
						    (delete-file* pkt-file)
						    (debug:print-info 0 *default-log-port* "Releasing lock for "dbfile)
						    (db:with-lock-db (servdat-dbfile *server-info*)
								     (lambda (dbh dbfile)
								       (db:release-lock dbh dbfile))))
						  (let* ((sdat *server-info*) ;; we have a run-id server
							 (host (servdat-host sdat))
							 (port (servdat-port sdat))
							 (uuid (servdat-uuid sdat)))
						    (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
						    #;(rmt:send-receive-real *rmt:remote* *toppath*
									   (db:run-id->dbname #f)
									   'deregister-server
									   `(,(servdat-uuid sdat)
									     ,(current-process-id)
									     ,(servdat-host sdat)   ;; iface
									     ,(servdat-port sdat)))
						    (rmt:send-receive 'deregister-server #f
								      `(,(servdat-uuid sdat)
									,(current-process-id)
									,(servdat-host sdat)   ;; iface
									,(servdat-port sdat)))
						    )))))))

			      ;; (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
			      #;(if (bdat-task-db *bdat*)    ;; TODO: Check that this is correct for task db
				  (let ((db (cdr (bdat-task-db *bdat*))))
				    (if (sqlite3:database? db)
					(begin
					  (debug:print-info 0 *default-log-port* "Closing down task db "db)
					  (sqlite3:interrupt! db)
					  (sqlite3:finalize! db #t)
					  (bdat-task-db-set! *bdat* #f)))))
                              #;(http-client#close-idle-connections!)
                              (if (not (eq? *default-log-port* (current-error-port)))
                                  (close-output-port *default-log-port*))
			      (set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
	  (th2 (make-thread (lambda ()
			      (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")

			      (if no-hurry
                                  (begin
                                    (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff
                                  (begin
				    (thread-sleep! 2)))
      			      (debug:print 4 *default-log-port* " ... done")
      			      )







>
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|

|
>







1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504












1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
		       #f
		       (begin
			 (bdat-time-to-exit-set! *bdat* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
	(rmt:print-db-stats))
    (let ((th1 (make-thread
		(lambda () ;; thread for cleaning up, give it five seconds
		  (let* ((start-time (current-seconds)))
		    (if *server-info*
			(let ((dbfile   (servdat-dbfile *server-info*)))
			  (if dbfile
			      (let* ((am-server  (args:get-arg "-server"))
				     (dbfile     (args:get-arg "-db"))
				     (apath      *toppath*))
				;; do a final sync here
				(debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
				(db:sync-inmem->disk *dbstruct-db* apath dbfile)
				(if am-server
				    (if (string-match ".*/main.db$" dbfile)
					(let ((pkt-file (conc (get-pkts-dir *toppath*)
							      "/" (servdat-uuid *server-info*)
							      ".pkt")))
					  (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
					  (delete-file* pkt-file)
					  (debug:print-info 0 *default-log-port* "Releasing lock for "dbfile)
					  (db:with-lock-db (servdat-dbfile *server-info*)
							   (lambda (dbh dbfile)
							     (db:release-lock dbh dbfile))))
					(let* ((sdat *server-info*) ;; we have a run-id server
					       (host (servdat-host sdat))
					       (port (servdat-port sdat))
					       (uuid (servdat-uuid sdat)))
					  (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)












					  )))))))
		    (debug:print-info 0 *default-log-port* "Shutdown activities completed in "(- (current-seconds) start-time)" seconds"))
		  ;; (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
		  #;(if (bdat-task-db *bdat*)    ;; TODO: Check that this is correct for task db
		  (let ((db (cdr (bdat-task-db *bdat*))))
		  (if (sqlite3:database? db)
		  (begin
		  (debug:print-info 0 *default-log-port* "Closing down task db "db)
		  (sqlite3:interrupt! db)
		  (sqlite3:finalize! db #t)
		  (bdat-task-db-set! *bdat* #f)))))
		  #;(http-client#close-idle-connections!)
		  (if (not (eq? *default-log-port* (current-error-port)))
		      (close-output-port *default-log-port*))
		  (set! *default-log-port* (current-error-port))) "Cleanup db exit thread"))
	  (th2 (make-thread (lambda ()
			      (debug:print 4 *default-log-port* "Attempting clean exit. Mode="(if no-hurry "no-hurry" "normal")
					   " Please be patient and wait a few seconds...")
			      (if no-hurry
                                  (begin
                                    (thread-sleep! 5)) ;; give the clean up few seconds to do it's stuff
                                  (begin
				    (thread-sleep! 2)))
      			      (debug:print 4 *default-log-port* " ... done")
      			      )
2123
2124
2125
2126
2127
2128
2129







2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
		 (> (+ last-access server-timeout)
		    (current-seconds)))
	    (if (common:low-noise-print 120 "server continuing")
		(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
	    (loop 0 bad-sync-count (current-milliseconds)))
	   (else
	    (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))







	    (http-transport:server-shutdown port))))))))

;; http-transport:server-shutdown
;;
;; Final step of stopping this server process.  In order, it:
;;   1. logs that shutdown has begun (with this process's pid),
;;   2. sets the time-to-exit flag on *bdat*,
;;   3. marks PORT as "released" through the portlogger,
;;   4. sleeps for one second,
;;   5. prints the current query statistics,
;;   6. saves an "exit" pkt describing this server process,
;;   7. logs completion and calls (exit).
;;
;; port - the port this server was using; handed to portlogger:set-port.
;;
;; Does not return: the last form is (exit).
;;
;; NOTE(review): nothing in this procedure deregisters the server; if that is
;; required it presumably has to happen in the caller - confirm.
(define (http-transport:server-shutdown port)
  ;; The pid is needed both for the log line and for the exit pkt.
  (let ((pid (current-process-id)))
    ;;(BB> "http-transport:server-shutdown called")
    (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid=" pid)

    ;; Per the original author's note: tell on-exit to be fast, as the
    ;; clean up has already been done by the time we get here.
    (bdat-time-to-exit-set! *bdat* #t)
    (portlogger:open-run-close portlogger:set-port port "released")
    (thread-sleep! 1)

    ;; Disabled legacy report of cache and query timings, kept for reference:
    ;;
    ;; (debug:print-info 0 *default-log-port* "Max cached queries was    " *max-cache-size*)
    ;; (debug:print-info 0 *default-log-port* "Number of cached writes   " *number-of-writes*)
    ;; (debug:print-info 0 *default-log-port* "Average cached write time "
    ;;                   (if (eq? *number-of-writes* 0)
    ;;                       "n/a (no writes)"
    ;;                       (/ *writes-total-delay*
    ;;                          *number-of-writes*))
    ;;                   " ms")
    ;; (debug:print-info 0 *default-log-port* "Number non-cached queries "  *number-non-write-queries*)
    ;; (debug:print-info 0 *default-log-port* "Average non-cached time   "
    ;;                   (if (eq? *number-non-write-queries* 0)
    ;;                       "n/a (no queries)"
    ;;                       (/ *total-non-write-delay*
    ;;                          *number-non-write-queries*))
    ;;                   " ms")

    (db:print-current-query-stats)

    ;; Record the exit of this server as a pkt.
    (common:save-pkt `((action . exit)
                       (T      . server)
                       (pid    . ,pid))
                     *configdat*
                     #t)

    (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
    (exit)))

;; Call this to start the actual server







>
>
>
>
>
>
>














|



















|







2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
		 (> (+ last-access server-timeout)
		    (current-seconds)))
	    (if (common:low-noise-print 120 "server continuing")
		(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
	    (loop 0 bad-sync-count (current-milliseconds)))
	   (else
	    (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
	    (if (not (string-match ".db/main.db" (args:get-arg "-db")))
		(let* ((res (rmt:send-receive 'deregister-server #f
					      `(,(servdat-uuid sdat)
						,(current-process-id)
						,(servdat-host sdat)   ;; iface
						,(servdat-port sdat)))))
		(debug:print-info 0 *default-log-port* "deregistered-server, res="res)))
	    (http-transport:server-shutdown port))))))))

;; http-transport:server-shutdown
;;
;; Final step of stopping this server process.  In order, it:
;;   1. logs that shutdown has begun (with this process's pid),
;;   2. sets the time-to-exit flag on *bdat*,
;;   3. sleeps for one second,
;;   4. prints the current query statistics,
;;   5. logs completion and calls (exit).
;;
;; port - accepted for interface compatibility; the only use of it in this
;;        body is inside a disabled (commented-out) portlogger call.
;;
;; Does not return: the last form is (exit).
;;
;; NOTE(review): nothing in this procedure deregisters the server; if that is
;; required it presumably has to happen in the caller - confirm.
(define (http-transport:server-shutdown port)
  ;;(BB> "http-transport:server-shutdown called")
  (debug:print-info 0
                    *default-log-port*
                    "Starting to shutdown the server. pid="
                    (current-process-id))

  ;; Per the original author's note: tell on-exit to be fast, as the
  ;; clean up has already been done by the time we get here.
  (bdat-time-to-exit-set! *bdat* #t)

  ;; Port release is disabled here; the original note says it is done in rmt:run.
  ;; (portlogger:open-run-close portlogger:set-port port "released")

  (thread-sleep! 1)

  ;; Disabled legacy report of cache and query timings, kept for reference:
  ;;
  ;; (debug:print-info 0 *default-log-port* "Max cached queries was    " *max-cache-size*)
  ;; (debug:print-info 0 *default-log-port* "Number of cached writes   " *number-of-writes*)
  ;; (debug:print-info 0 *default-log-port* "Average cached write time "
  ;;                   (if (eq? *number-of-writes* 0)
  ;;                       "n/a (no writes)"
  ;;                       (/ *writes-total-delay*
  ;;                          *number-of-writes*))
  ;;                   " ms")
  ;; (debug:print-info 0 *default-log-port* "Number non-cached queries "  *number-non-write-queries*)
  ;; (debug:print-info 0 *default-log-port* "Average non-cached time   "
  ;;                   (if (eq? *number-non-write-queries* 0)
  ;;                       "n/a (no queries)"
  ;;                       (/ *total-non-write-delay*
  ;;                          *number-non-write-queries*))
  ;;                   " ms")

  (db:print-current-query-stats)

  ;; Saving of the "exit" pkt is disabled:
  ;; (common:save-pkt `((action . exit)
  ;;                    (T      . server)
  ;;                    (pid    . ,(current-process-id)))
  ;;                  *configdat* #t)

  (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
  (exit))

;; Call this to start the actual server