Megatest

Check-in [9b4be80a9a]
Login
Overview
Comment:wip
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.6584-ck5
Files: files | file ages | folders
SHA1: 9b4be80a9a656c42bd3b7c5d1d66cd1003009a21
User & Date: matt on 2021-05-23 22:32:03
Other Links: branch diff | manifest | tags
Context
2021-05-24
04:16
wip Leaf check-in: 82185ccf67 user: matt tags: v1.6584-ck5
2021-05-23
22:32
wip check-in: 9b4be80a9a user: matt tags: v1.6584-ck5
2021-05-22
05:15
Fixed server registration for run dbs, cleaned up a bit in rmtmod.scm check-in: 36de6e8829 user: matt tags: v1.6584-ck5
Changes

Modified rmtmod.scm from [12431684cd] to [ee8447e2f7].

99
100
101
102
103
104
105

























106
107
108
109
110
111
112
	)

;; Record with two fields, areapath and ulexdat; both default to #f.
;; NOTE(review): presumably holds the area path and the ulex state for that
;; area -- confirm against the code that constructs alldat records.
(defstruct alldat
  (areapath #f)
  (ulexdat  #f)
  )


























;; (include "db_records.scm")

;;======================================================================
;; return the handle struct for sending queries to a specific database
;;  - initializes the connection object if this is the first access
;;    - finds the "captain" and asks who to talk to for the given dbfname
;;    - establishes the connection to the current dbowner







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
	)

;; Record with two fields, areapath and ulexdat; both default to #f.
;; NOTE(review): presumably holds the area path and the ulex state for that
;; area -- confirm against the code that constructs alldat records.
(defstruct alldat
  (areapath #f)
  (ulexdat  #f)
  )


;; (require-extension (srfi 18) extras tcp s11n)
;; 
;; 
;; (use  srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras)
;; 
;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing)
;; 
;; Configurations for server
;; Both parameters are set to 2048 when this file is loaded.
;; NOTE(review): presumably tcp-buffer-size comes from the CHICKEN tcp unit and
;; max-connections from spiffy -- confirm which modules export them.
(tcp-buffer-size 2048)
(max-connections 2048) 

;; Record describing one server. Every field defaults to #f except status,
;; which starts out as the symbol 'starting.
;; NOTE(review): the api-url / api-uri / api-req fields look like cached
;; request objects for talking to the server -- confirm with the client code.
(defstruct servdat
  (host #f)   ;; combined with port by servdat->url
  (port #f)
  (uuid #f)
  (dbfile #f)
  (api-url #f)
  (api-uri #f)
  (api-req #f)
  (status 'starting)) ;; initial value; changed later with servdat-status-set!

;; Build the "host:port" string for the server record sdat.
;; Note that no scheme prefix (e.g. "http://") is added here.
(define (servdat->url sdat)
  (let ((host (servdat-host sdat))
	(port (servdat-port sdat)))
    (conc host ":" port)))

;; (include "db_records.scm")

;;======================================================================
;; return the handle struct for sending queries to a specific database
;;  - initializes the connection object if this is the first access
;;    - finds the "captain" and asks who to talk to for the given dbfname
;;    - establishes the connection to the current dbowner
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
		       (begin
			 (bdat-time-to-exit-set! *bdat* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
	(rmt:print-db-stats))
    (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds
                              (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
			      (if *server-info*
				  (let ((pkt-file (conc (get-pkts-dir *toppath*)
							"/" (servdat-uuid *server-info*)
							".pkt"))
					(dbfile   (servdat-dbfile *server-info*)))
				    (if dbfile
					(begin








<
|







1479
1480
1481
1482
1483
1484
1485

1486
1487
1488
1489
1490
1491
1492
1493
		       (begin
			 (bdat-time-to-exit-set! *bdat* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
	(rmt:print-db-stats))
    (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds

                              (if *server-info*
				  (let ((pkt-file (conc (get-pkts-dir *toppath*)
							"/" (servdat-uuid *server-info*)
							".pkt"))
					(dbfile   (servdat-dbfile *server-info*)))
				    (if dbfile
					(begin

1481
1482
1483
1484
1485
1486
1487

1488
1489
1490
1491
1492
1493
1494
					      (rmt:send-receive-real *rmt:remote* *toppath*
								     (db:run-id->dbname #f)
								     'deregister-server
								     `(,(servdat-uuid sdat)
								       ,(current-process-id)
								       ,(servdat-host sdat)   ;; iface
								       ,(servdat-port sdat)))))))))

			      (if (bdat-task-db *bdat*)    ;; TODO: Check that this is correct for task db
				  (let ((db (cdr (bdat-task-db *bdat*))))
				    (if (sqlite3:database? db)
					(begin
					  (sqlite3:interrupt! db)
					  (sqlite3:finalize! db #t)
					  (bdat-task-db-set! *bdat* #f)))))







>







1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
					      (rmt:send-receive-real *rmt:remote* *toppath*
								     (db:run-id->dbname #f)
								     'deregister-server
								     `(,(servdat-uuid sdat)
								       ,(current-process-id)
								       ,(servdat-host sdat)   ;; iface
								       ,(servdat-port sdat)))))))))
			      (if *dbstruct-db* (db:close-all *dbstruct-db*)) ;; one second allocated
			      (if (bdat-task-db *bdat*)    ;; TODO: Check that this is correct for task db
				  (let ((db (cdr (bdat-task-db *bdat*))))
				    (if (sqlite3:database? db)
					(begin
					  (sqlite3:interrupt! db)
					  (sqlite3:finalize! db #t)
					  (bdat-task-db-set! *bdat* #f)))))
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
  (let* ((sdat       (servdat-init #f host port server-id)))
    (rmt:send-receive sdat 'ping '())))

;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================

;; (require-extension (srfi 18) extras tcp s11n)
;; 
;; 
;; (use  srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest posix-extras)
;; 
;; (use spiffy uri-common intarweb http-client spiffy-request-vars intarweb spiffy-directory-listing)
;; 
;; Configurations for server
;; Both parameters are set to 2048 when this file is loaded.
;; NOTE(review): presumably tcp-buffer-size comes from the CHICKEN tcp unit and
;; max-connections from spiffy -- confirm which modules export them.
(tcp-buffer-size 2048)
(max-connections 2048) 

;; Record describing one server. Every field defaults to #f.
;; NOTE(review): the api-url / api-uri / api-req fields look like cached
;; request objects for talking to the server -- confirm with the client code.
(defstruct servdat
  (host #f)   ;; combined with port by servdat->url
  (port #f)
  (uuid #f)
  (dbfile #f)
  (api-url #f)
  (api-uri #f)
  (api-req #f))

;; Build the "host:port" string for the server record sdat.
;; Note that no scheme prefix (e.g. "http://") is added here.
(define (servdat->url sdat)
  (let ((host (servdat-host sdat))
	(port (servdat-port sdat)))
    (conc host ":" port)))

;; Turn a (host port ...) list into "http://host:port".
;; Returns #f when hostport itself is #f.
(define (http-transport:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
	     (port (cadr hostport)))
	 (conc "http://" host ":" port))))

;;======================================================================
;; S E R V E R







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







1553
1554
1555
1556
1557
1558
1559























1560
1561
1562
1563
1564
1565
1566
  (let* ((sdat       (servdat-init #f host port server-id)))
    (rmt:send-receive sdat 'ping '())))

;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
























;; Turn a (host port ...) list into "http://host:port".
;; Returns #f when hostport itself is #f.
(define (http-transport:make-server-url hostport)
  (and hostport
       (let ((host (car hostport))
	     (port (cadr hostport)))
	 (conc "http://" host ":" port))))

;;======================================================================
;; S E R V E R
2051
2052
2053
2054
2055
2056
2057

2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076



2077
2078

2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103

2104
2105
2106
2107
2108
2109

2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121

2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
								   ,(current-process-id)
								   ,iface
								   ,apath
								   ,dbname)))

;; Poll *server-info* until its host and port match the values seen on the
;; previous poll.
;;
;;   num-tries-allowed - optional, default 100; once the poll count exceeds
;;                       this the process logs a message and calls (exit 1).
;;
;; Each unsuccessful poll sleeps 0.25 seconds. On success *server-id* is set
;; (only if it was still unset), "SERVER STARTED: ..." is logged, the log port
;; is flushed and #t is returned.
;;
(define (http-transport:wait-for-stable-interface #!optional (num-tries-allowed 100))
  (let loop ((sdat  #f) ;; our copy of *server-info* from the previous poll
	     (tries 0))
    ;; compare host and port against the previous poll; *server-info* is only
    ;; read here, never modified
    (cond
     ((> tries num-tries-allowed)
      ;; the messages now name this function (they used to claim to come from
      ;; keep-running) and report the real try count instead of "several minutes"
      (debug:print 0 *default-log-port* "http-transport:wait-for-stable-interface, giving up after " tries " tries.")
      (exit 1))
     ((not *server-info*)
      (thread-sleep! 0.25)
      (loop *server-info* (+ tries 1)))
     ((not sdat)
      ;; *server-info* exists but we have no earlier copy to compare with yet
      (debug:print 0 *default-log-port* "http-transport:wait-for-stable-interface, still no interface, tries="tries)
      (thread-sleep! 0.25)
      (loop *server-info* (+ tries 1)))
     ((or (not (equal? (servdat-host sdat)(servdat-host *server-info*)))
	  (not (equal? (servdat-port sdat)(servdat-port *server-info*))))
      (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
      (thread-sleep! 0.25)
      (loop *server-info* (+ tries 1)))
     (else
      (if (not *server-id*)(set! *server-id* (server:mk-signature)))
      (debug:print 0 *default-log-port*
		   "SERVER STARTED: " (servdat-host *server-info*)
		   ":" (servdat-port *server-info*)
		   " AT " (current-seconds) " server-id: " *server-id*)
      (flush-output *default-log-port*)
      #t))))

;; Run http-transport:keep-running in a parallel thread to monitor that the db is
;; being used and to shut down after some time if it is not.
;;
(define (http-transport:keep-running dbname) 
  ;; Keep-alive monitor for the server.
  ;;   dbname - handed to http-transport:wait-for-server, db:setup and
  ;;            rmt:register-server.
  ;; Keeps looping while *server-run* is true and the last db access plus
  ;; (server:expiration-timeout) is still later than the current time;
  ;; otherwise it calls http-transport:server-shutdown.
  ;; This thread waits for the server to come alive
  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")

  (let* ((server-start-time (current-seconds))
	 (pkts-dir          (get-pkts-dir))
	 (server-key        (server:mk-signature))
	 ;; "main" is decided purely from the -db command line argument
	 (is-main           (equal? (args:get-arg "-db") ".db/main.db"))
	 (last-access       0)
	 (server-timeout    (server:expiration-timeout)))
    ;; exits if nothing found in 100 tries (switch to a duration would be good)
    (http-transport:wait-for-stable-interface)
    (if is-main (http-transport:wait-for-server pkts-dir dbname server-key))

    (let* ((iface             (servdat-host *server-info*))
	   (port              (servdat-port *server-info*)))
      ;; NOTE(review): server-state is only ever passed along to the next
      ;; iteration; nothing in this function reads it.
      (let loop ((count         0)
		 (server-state 'available)
		 (bad-sync-count 0)
		 (start-time     (current-milliseconds)))

	;; set up the database handle
	(if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
	    (let ((watchdog (bdat-watchdog *bdat*)))
	      (debug:print 0 *default-log-port* "SERVER: dbprep")
	      (db:setup dbname) ;; sets *dbstruct-db* as side effect
	      
	      ;; IFF I'm not main, call into main and register self
	      (if (not is-main)
		  (let ((res (rmt:register-server *rmt:remote*
						  *toppath* iface port
						  server-key dbname)))
		    (if (not res) ;; we are not the server!

			(begin
			  (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting.")
			  (exit)))))
	      (debug:print 0 *default-log-port*
			   "SERVER: running, megatest version: "
			   (common:get-full-version)) 
	      ;; start the watchdog thread unless thread-state reports one of the
	      ;; listed states for it
	      (if watchdog
		  (if (not (member (thread-state watchdog)
				   '(ready running blocked
					   sleeping dead)))
		      (begin
			(debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
			(thread-start! watchdog)))
		  (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
	      ;; NOTE(review): this call is not in tail position -- if the nested
	      ;; loop ever returns, execution resumes with the pacing code below.
	      (loop (+ count 1) 'running bad-sync-count start-time)))
	
	;; when things go wrong we don't want to be doing the various
	;; queries too often so we strive to run this stuff only every
	;; four seconds or so.
	;; rem-time is in whole seconds: what is left of a 4000 ms budget since start-time
	(let* ((sync-time (- (current-milliseconds) start-time))
	       (rem-time  (quotient (- 4000 sync-time) 1000)))
	  (if (and (<= rem-time 4)
		   (>  rem-time 0))
	      (thread-sleep! rem-time)))
	
	;; when count is 0, run one more pacing pass before the timeout check.
	;; NOTE(review): also a non-tail call -- the code below runs again once
	;; the nested loop returns, if it ever does.
	(if (< count 1)
	    (loop (+ count 1) 'running bad-sync-count (current-milliseconds)))
	
	;; Transfer *db-last-access* to last-access to use in checking that we are still alive
	(mutex-lock! *heartbeat-mutex*)
	(set! last-access *db-last-access*)
	(mutex-unlock! *heartbeat-mutex*)
	
	;; NOTE(review): presumably common:low-noise-print rate-limits this
	;; message to once per 60 seconds -- confirm.
	(if (common:low-noise-print 60 "dbstats")
	    (begin
	      (debug:print 0 *default-log-port* "Server stats:")
	      (db:print-current-query-stats)))
	;; NOTE(review): hrs-since-start is computed but never used below.
	(let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600)))
	  (cond
	   ;; still wanted: server enabled and the last access is recent enough
	   ((and *server-run*
		 (> (+ last-access server-timeout)
		    (current-seconds)))
	    (if (common:low-noise-print 120 "server continuing")
		(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
	    (loop 0 server-state bad-sync-count (current-milliseconds)))
	   (else
	    (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
	    (http-transport:server-shutdown port))))))))

(define (http-transport:server-shutdown port)
  (begin
    ;;(BB> "http-transport:server-shutdown called")







>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
|
|
>
|
|
|
|
|
|



















>



<


>


|








|
>














|











|

















|







2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114

2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
								   ,(current-process-id)
								   ,iface
								   ,apath
								   ,dbname)))

;; Poll *server-info* until its host and port match the values seen on the
;; previous poll and at least three seconds have passed since this call began.
;;
;;   num-tries-allowed - optional, default 100; once the poll count exceeds
;;                       this the process logs a message and calls (exit 1).
;;
;; On success *server-id* is set (only if it was still unset), the status of
;; *server-info* becomes 'interface-alive, "SERVER STARTED: ..." is logged,
;; the log port is flushed and #t is returned.
;;
(define (http-transport:wait-for-stable-interface #!optional (num-tries-allowed 100))
  (let* ((stime (current-seconds)))
    (let loop ((sdat  #f) ;; our copy of *server-info* from the previous poll
	       (tries 0))
      ;; compare host and port against the previous poll; *server-info* is only
      ;; read here until the final branch sets its status
      (cond
       ((> tries num-tries-allowed)
	;; the messages now name this function (they used to claim to come from
	;; keep-running) and report the real try count instead of "several minutes"
	(debug:print 0 *default-log-port* "http-transport:wait-for-stable-interface, giving up after " tries " tries.")
	(exit 1))
       ((not *server-info*)
	(thread-sleep! 0.25)
	(loop *server-info* (+ tries 1)))
       ((not sdat)
	;; *server-info* exists but we have no earlier copy to compare with yet
	(debug:print 0 *default-log-port* "http-transport:wait-for-stable-interface, still no interface, tries="tries)
	(thread-sleep! 0.25)
	(loop *server-info* (+ tries 1)))
       ((or (not (equal? (servdat-host sdat)(servdat-host *server-info*)))
	    (not (equal? (servdat-port sdat)(servdat-port *server-info*))))
	(debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
	(thread-sleep! 0.25)
	(loop *server-info* (+ tries 1)))
       ((< (- (current-seconds) stime) 3) ;; keep up the looping until at least 3 seconds have passed
	(thread-sleep! 1)
	(loop *server-info* (+ tries 1)))
       (else
	(if (not *server-id*)(set! *server-id* (server:mk-signature)))
	(servdat-status-set! *server-info* 'interface-alive)
	(debug:print 0 *default-log-port*
		     "SERVER STARTED: " (servdat-host *server-info*)
		     ":" (servdat-port *server-info*)
		     " AT " (current-seconds) " server-id: " *server-id*)
	(flush-output *default-log-port*)
	#t)))))

;; Run http-transport:keep-running in a parallel thread to monitor that the db is
;; being used and to shut down after some time if it is not.
;;
(define (http-transport:keep-running dbname) 
  ;; Keep-alive monitor for the server.
  ;;   dbname - handed to http-transport:wait-for-server, db:setup and
  ;;            rmt:register-server.
  ;; Keeps looping while *server-run* is true and the last db access plus
  ;; (server:expiration-timeout) is still later than the current time;
  ;; otherwise it calls http-transport:server-shutdown.
  ;; This thread waits for the server to come alive
  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")

  (let* ((server-start-time (current-seconds))
	 (pkts-dir          (get-pkts-dir))
	 (server-key        (server:mk-signature))
	 ;; "main" is decided purely from the -db command line argument
	 (is-main           (equal? (args:get-arg "-db") ".db/main.db"))
	 (last-access       0)
	 (server-timeout    (server:expiration-timeout)))
    ;; exits if nothing found in 100 tries (switch to a duration would be good)
    (http-transport:wait-for-stable-interface)
    (if is-main (http-transport:wait-for-server pkts-dir dbname server-key))
    ;; this is our forever loop
    (let* ((iface             (servdat-host *server-info*))
	   (port              (servdat-port *server-info*)))
      (let loop ((count         0)

		 (bad-sync-count 0)
		 (start-time     (current-milliseconds)))
	(debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *server-info*) ", is-main="is-main)
	;; set up the database handle
	(if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
	    (let ((watchdog (bdat-watchdog *bdat*)))		 
	      (debug:print 0 *default-log-port* "SERVER: dbprep")
	      (db:setup dbname) ;; sets *dbstruct-db* as side effect
	      
	      ;; IFF I'm not main, call into main and register self
	      (if (not is-main)
		  (let ((res (rmt:register-server *rmt:remote*
						  *toppath* iface port
						  server-key dbname)))
		    (if res ;; registration accepted: record that we have interface and db
			(servdat-status-set! *server-info* 'have-interface-and-db)
			;; registration refused: we are not the server for this db
			(begin
			  (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting.")
			  (exit)))))
	      (debug:print 0 *default-log-port*
			   "SERVER: running, megatest version: "
			   (common:get-full-version)) 
	      ;; start the watchdog thread unless thread-state reports one of the
	      ;; listed states for it
	      (if watchdog
		  (if (not (member (thread-state watchdog)
				   '(ready running blocked
					   sleeping dead)))
		      (begin
			(debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
			(thread-start! watchdog)))
		  (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
	      ;; NOTE(review): this call is not in tail position -- if the nested
	      ;; loop ever returns, execution resumes with the pacing code below.
	      (loop (+ count 1) bad-sync-count start-time)))
	
	;; when things go wrong we don't want to be doing the various
	;; queries too often so we strive to run this stuff only every
	;; four seconds or so.
	;; rem-time is in whole seconds: what is left of a 4000 ms budget since start-time
	(let* ((sync-time (- (current-milliseconds) start-time))
	       (rem-time  (quotient (- 4000 sync-time) 1000)))
	  (if (and (<= rem-time 4)
		   (>  rem-time 0))
	      (thread-sleep! rem-time)))
	
	;; when count is 0, run one more pacing pass before the timeout check.
	;; NOTE(review): also a non-tail call -- the code below runs again once
	;; the nested loop returns, if it ever does.
	(if (< count 1)
	    (loop (+ count 1) bad-sync-count (current-milliseconds)))
	
	;; Transfer *db-last-access* to last-access to use in checking that we are still alive
	(mutex-lock! *heartbeat-mutex*)
	(set! last-access *db-last-access*)
	(mutex-unlock! *heartbeat-mutex*)
	
	;; NOTE(review): presumably common:low-noise-print rate-limits this
	;; message to once per 60 seconds -- confirm.
	(if (common:low-noise-print 60 "dbstats")
	    (begin
	      (debug:print 0 *default-log-port* "Server stats:")
	      (db:print-current-query-stats)))
	;; NOTE(review): hrs-since-start is computed but never used below.
	(let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600)))
	  (cond
	   ;; still wanted: server enabled and the last access is recent enough
	   ((and *server-run*
		 (> (+ last-access server-timeout)
		    (current-seconds)))
	    (if (common:low-noise-print 120 "server continuing")
		(debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
	    (loop 0 bad-sync-count (current-milliseconds)))
	   (else
	    (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
	    (http-transport:server-shutdown port))))))))

(define (http-transport:server-shutdown port)
  (begin
    ;;(BB> "http-transport:server-shutdown called")

Modified tests/unittests/basicserver.scm from [d928562f12] to [fc6484b63a].

339
340
341
342
343
344
345
346
;;     (list "" "a" "b" "c" "d" "e" "f" "g" "h" "i" "j")))
;;  (list "test1" "test2" "test3" "test4" "test5"))
;; 
;; 
;; (test #f '(#t "exit process started") (rmt:kill-server)) ;; *toppath* *my-client-signature* #f)))
;; 

(exit)







|
339
340
341
342
343
344
345
346
;;     (list "" "a" "b" "c" "d" "e" "f" "g" "h" "i" "j")))
;;  (list "test1" "test2" "test3" "test4" "test5"))
;; 
;; 
;; (test #f '(#t "exit process started") (rmt:kill-server)) ;; *toppath* *my-client-signature* #f)))
;; 

;; (exit)