Megatest

Check-in [27aae9f29d]
Login
Overview
Comment:Partially disabled transaction based write coallesing
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 27aae9f29d2462d1f2518130b446b5a8ae79d2e7
User & Date: matt on 2013-11-10 15:14:49
Other Links: manifest | tags
Context
2013-11-10
17:06
Manual merge from api branch check-in: d79fb960e6 user: matt tags: trunk
15:14
Partially disabled transaction based write coallesing check-in: 27aae9f29d user: matt tags: trunk
03:43
Added syncing of runs table check-in: 7693c01883 user: matt tags: trunk
Changes

Modified db.scm from [6f20a1d6d0] to [8d243ee646].

1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958








1959
1960
1961
1962
1963
1964
1965
1966
			       set-verbosity
			       killserver
			       ))

;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))

(define (db:process-cached-writes db)
  (let ((queries    (make-hash-table))
	(data       #f))
    (mutex-lock! *incoming-mutex*)
    ;; data is a list of query packets <vector qry-sig query params
    (set! data (reverse *incoming-writes*)) ;;  (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
    (set! *server:last-write-flush* (current-milliseconds))
    (set! *incoming-writes* '())
    (mutex-unlock! *incoming-mutex*)
    (if (> (length data) 0)
	;; Process if we have data
	(begin
	  (debug:print-info 7 "Writing cached data " data)
	  
	  ;; Prepare the needed sql statements
	  ;;
	  (for-each (lambda (request-item)
		      (let ((stmt-key (vector-ref request-item 0))
			    (query    (vector-ref request-item 1)))
			(hash-table-set! queries stmt-key (sqlite3:prepare db query))))
		    data)
	  
	  ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue
	  ;; and then are executed.
	  (sqlite3:with-transaction 
	   db
	   (lambda ()
	     (for-each
	      (lambda (hed)
		(let* ((params   (vector-ref hed 2))
		       (stmt-key (vector-ref hed 0))
		       (stmt     (hash-table-ref/default queries stmt-key #f)))
		  (if stmt
		      (apply sqlite3:execute stmt params)
		      (debug:print 0 "ERROR: Problem Executing " stmt-key " for " params))))
	      data)))
	  
	  ;; let all the waiting calls know all is done
	  (mutex-lock! *completed-mutex*)
	  (for-each (lambda (item)
		      (let ((qry-sig (cdb:packet-get-client-sig item)))
			(debug:print-info 7 "Registering query " qry-sig " as done")
			(hash-table-set! *completed-writes* qry-sig #t)))
		    data)
	  (mutex-unlock! *completed-mutex*)
	  
	  ;; Finalize the statements. Should this be done inside the mutex above?
	  ;; I think sqlite3 mutexes will keep the data safe
	  (for-each (lambda (stmt-key)
		      (sqlite3:finalize! (hash-table-ref queries stmt-key)))
		    (hash-table-keys queries))
	  
	  ;; Do a little record keeping
	  (let ((cache-size (length data)))
	    (if (> cache-size *max-cache-size*)
		(set! *max-cache-size* cache-size)))
	  #t)








	#f)))

(define *db:process-queue-mutex* (make-mutex))

(define *number-of-writes*         0)
(define *writes-total-delay*       0)
(define *total-non-write-delay*    0)
(define *number-non-write-queries* 0)







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
>
>
>
>
>
|







1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
			       set-verbosity
			       killserver
			       ))

;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))

;; (define (db:process-cached-writes db)
;;   (let ((queries    (make-hash-table))
;; 	(data       #f))
;;     (mutex-lock! *incoming-mutex*)
;;     ;; data is a list of query packets <vector qry-sig query params
;;     (set! data (reverse *incoming-writes*)) ;;  (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
;;     (set! *server:last-write-flush* (current-milliseconds))
;;     (set! *incoming-writes* '())
;;     (mutex-unlock! *incoming-mutex*)
;;     (if (> (length data) 0)
;; 	;; Process if we have data
;; 	(begin
;; 	  (debug:print-info 7 "Writing cached data " data)
;; 	  
;; 	  ;; Prepare the needed sql statements
;; 	  ;;
;; 	  (for-each (lambda (request-item)
;; 		      (let ((stmt-key (vector-ref request-item 0))
;; 			    (query    (vector-ref request-item 1)))
;; 			(hash-table-set! queries stmt-key (sqlite3:prepare db query))))
;; 		    data)
;; 	  
;; 	  ;; No outer loop needed. Single loop for write items only. Reads trigger flush of queue
;; 	  ;; and then are executed.
;; 	  (sqlite3:with-transaction 
;; 	   db
;; 	   (lambda ()
;; 	     (for-each
;; 	      (lambda (hed)
;; 		(let* ((params   (vector-ref hed 2))
;; 		       (stmt-key (vector-ref hed 0))
;; 		       (stmt     (hash-table-ref/default queries stmt-key #f)))
;; 		  (if stmt
;; 		      (apply sqlite3:execute stmt params)
;; 		      (debug:print 0 "ERROR: Problem Executing " stmt-key " for " params))))
;; 	      data)))
;; 	  
;; 	  ;; let all the waiting calls know all is done
;; 	  (mutex-lock! *completed-mutex*)
;; 	  (for-each (lambda (item)
;; 		      (let ((qry-sig (cdb:packet-get-client-sig item)))
;; 			(debug:print-info 7 "Registering query " qry-sig " as done")
;; 			(hash-table-set! *completed-writes* qry-sig #t)))
;; 		    data)
;; 	  (mutex-unlock! *completed-mutex*)
;; 	  
;; 	  ;; Finalize the statements. Should this be done inside the mutex above?
;; 	  ;; I think sqlite3 mutexes will keep the data safe
;; 	  (for-each (lambda (stmt-key)
;; 		      (sqlite3:finalize! (hash-table-ref queries stmt-key)))
;; 		    (hash-table-keys queries))
;; 	  
;; 	  ;; Do a little record keeping
;; 	  (let ((cache-size (length data)))
;; 	    (if (> cache-size *max-cache-size*)
;; 		(set! *max-cache-size* cache-size)))
;; 	  #t)
;; 	#f)))

(define (db:process-write db request-item)
  (let ((stmt-key (vector-ref request-item 0))
	(query    (vector-ref request-item 1))
	(params   (vector-ref request-item 2))
	(queryh   (sqlite3:prepare db query)))
    (apply sqlite3:execute stmt params)
    #f))

(define *db:process-queue-mutex* (make-mutex))

(define *number-of-writes*         0)
(define *writes-total-delay*       0)
(define *total-non-write-delay*    0)
(define *number-non-write-queries* 0)
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045

2046
2047
2048
2049
2050
2051
2052
2053
2054
	(begin
	  (cond
	   ((member stmt-key db:special-queries)
	    (let ((starttime (current-milliseconds)))
	      (debug:print-info 9 "Handling special statement " stmt-key)
	      (case stmt-key
		((immediate)
		 ;; This is a read or mixed read-write query, must clear the cache
		 (case *transport-type*
		   ((http)
		    (mutex-lock! *db:process-queue-mutex*)
		    (db:process-cached-writes db)
		    (mutex-unlock! *db:process-queue-mutex*)))
		 (let* ((proc      (car params))
			(remparams (cdr params))
			;; we are being handed a procedure so call it
			;; (debug:print-info 11 "Running (apply " proc " " remparams ")")
			(result (server:reply return-address qry-sig #t (apply proc remparams))))

		   (set! *total-non-write-delay* (+ *total-non-write-delay* (- (current-milliseconds) starttime))) 
		   (set! *number-non-write-queries* (+ *number-non-write-queries* 1))
		   result))
		((login)
		 (if (< (length params) 3) ;; should get toppath, version and signature
		     (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params
		     (let ((calling-path (car   params))
			   (calling-vers (cadr  params))
			   (client-key   (caddr params)))







|
<
<
<
<
<



<

>
|
|







2036
2037
2038
2039
2040
2041
2042
2043





2044
2045
2046

2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
	(begin
	  (cond
	   ((member stmt-key db:special-queries)
	    (let ((starttime (current-milliseconds)))
	      (debug:print-info 9 "Handling special statement " stmt-key)
	      (case stmt-key
		((immediate)
		 (debug:print 0 "WARNING: Immediate calls are verboten now!")





		 (let* ((proc      (car params))
			(remparams (cdr params))
			;; we are being handed a procedure so call it

			(result (server:reply return-address qry-sig #t (apply proc remparams))))
		   (debug:print-info 11 "Ran (apply " proc " " remparams ")")
		   ;; (set! *total-non-write-delay* (+ *total-non-write-delay* (- (current-milliseconds) starttime))) 
		   ;; (set! *number-non-write-queries* (+ *number-non-write-queries* 1))
		   result))
		((login)
		 (if (< (length params) 3) ;; should get toppath, version and signature
		     (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params
		     (let ((calling-path (car   params))
			   (calling-vers (cadr  params))
			   (client-key   (caddr params)))

Modified http-transport.scm from [deb40a4fd3] to [40aad33287].

91
92
93
94
95
96
97

98






99
100
101
102
103
104
105
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
    (handle-directory spiffy-directory-listing)
    ;; http-transport:handle-directory) ;; simple-directory-handler)
    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 

			       (if (not db)(set! db *inmemdb*)) ;; (open-db)))






			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))







>
|
>
>
>
>
>
>







91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
    (handle-directory spiffy-directory-listing)
    ;; http-transport:handle-directory) ;; simple-directory-handler)
    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (let loop ()
				 (if (not db)
				     (if (not (sqlite3:database? *inmemdb*))
					 (begin
					   (debug:print 0 "WARNING: db not ready yet. Waiting for it to be ready")
					   (thread-sleep! 5)
					   (loop)))
				     (set! db *inmemdb*))) ;; (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
	(debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread http-transport:keep-running "Keep running"))
		   (th1 (make-thread server:write-queue-handler  "write queue")))
	      ;; This is were we set up the database connections
	      (set! *db* (open-db))
	      (set! *inmemdb* (open-in-mem-db))
	      (db:sync-to *db* *inmemdb*)
	      (thread-start! th2)
	      (thread-start! th3)
	      (thread-start! th1)
	      (set! *didsomething* #t)
	      (thread-join! th2))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

;; (use trace)
;; (trace http-transport:keep-running 







|
|






|







379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
	(debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread http-transport:keep-running "Keep running")))
;;		   (th1 (make-thread server:write-queue-handler  "write queue")))
	      ;; This is were we set up the database connections
	      (set! *db* (open-db))
	      (set! *inmemdb* (open-in-mem-db))
	      (db:sync-to *db* *inmemdb*)
	      (thread-start! th2)
	      (thread-start! th3)
	      ;; (thread-start! th1)
	      (set! *didsomething* #t)
	      (thread-join! th2))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

;; (use trace)
;; (trace http-transport:keep-running 

Modified server.scm from [9e4ffe8744] to [00385235c5].

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
;;======================================================================

;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))

;; Flush the queue every third of a second. Can we assume that setup-for-run 
;; has already been done?
(define (server:write-queue-handler)
  (if (setup-for-run)
      (let ((db (open-db)))
	(let loop ()
	  (let ((last-write-flush-time #f))
	    (mutex-lock! *incoming-mutex*)
	    (set! last-write-flush-time *server:last-write-flush*)
	    (mutex-unlock! *incoming-mutex*)
	    (if (> (- (current-milliseconds) last-write-flush-time) 10)
		(begin
		  (mutex-lock! *db:process-queue-mutex*)
		  (db:process-cached-writes db)
		  (mutex-unlock! *db:process-queue-mutex*)
		  (thread-sleep! 0.005))))
	  (loop)))
      (begin
	(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
	(exit 1))))
    
;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Generate a unique signature for this server
(define (server:mk-signature)







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
;;======================================================================

;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))

;; Flush the queue every third of a second. Can we assume that setup-for-run 
;; has already been done?
;; (define (server:write-queue-handler)
;;   (if (setup-for-run)
;;       (let ((db (open-db)))
;; 	(let loop ()
;; 	  (let ((last-write-flush-time #f))
;; 	    (mutex-lock! *incoming-mutex*)
;; 	    (set! last-write-flush-time *server:last-write-flush*)
;; 	    (mutex-unlock! *incoming-mutex*)
;; 	    (if (> (- (current-milliseconds) last-write-flush-time) 10)
;; 		(begin
;; 		  (mutex-lock! *db:process-queue-mutex*)
;; 		  (db:process-cached-writes db)
;; 		  (mutex-unlock! *db:process-queue-mutex*)
;; 		  (thread-sleep! 0.005))))
;; 	  (loop)))
;;       (begin
;; 	(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
;; 	(exit 1))))
    
;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Generate a unique signature for this server
(define (server:mk-signature)