Megatest

Check-in [04ba871116]
Login
Overview
Comment:Added logic to not attempt to process cached-writes if they were processed within the last 400 ms. Added stats on cached writes
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | newdashboard
Files: files | file ages | folders
SHA1: 04ba871116e284db3059e19b628ee853a0bcdbd4
User & Date: mrwellan on 2013-03-19 13:29:33
Other Links: branch diff | manifest | tags
Context
2013-03-19
14:23
Switched tests data to minimal amount needed for runs display check-in: 1b7e157405 user: mrwellan tags: newdashboard
13:29
Added logic to not attempt to process cached-writes if they were processed within the last 400 ms. Added stats on cached writes check-in: 04ba871116 user: mrwellan tags: newdashboard
11:26
Partial re-implementation (again) of transaction wrapped writes, now working except for transaction within transaction issue check-in: 69c6ef28ac user: mrwellan tags: newdashboard
Changes

Modified db.scm from [69d9359c67] to [0291a77e98].

1296
1297
1298
1299
1300
1301
1302

1303
1304
1305
1306
1307
1308
1309
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310







+








(define (db:process-cached-writes db)
  (let ((queries    (make-hash-table))
	(data       #f))
    (mutex-lock! *incoming-mutex*)
    ;; data is a list of query packets <vector qry-sig query params
    (set! data (reverse *incoming-writes*)) ;;  (sort ... (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
    (set! *server:last-write-flush* (current-milliseconds))
    (set! *incoming-writes* '())
    (mutex-unlock! *incoming-mutex*)
    (if (> (length data) 0)
	;; Process if we have data
	(begin
	  (debug:print-info 7 "Writing cached data " data)
    
1349
1350
1351
1352
1353
1354
1355



1356
1357
1358
1359
1360
1361
1362
1363
1364
1365

1366
1367
1368
1369
1370
1371
1372
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377







+
+
+










+







	  (let ((cache-size (length data)))
	    (if (> cache-size *max-cache-size*)
		(set! *max-cache-size* cache-size)))
	  #t)
	#f)))

(define *db:process-queue-mutex* (make-mutex))

(define *number-of-writes*   0)
(define *writes-total-delay* 0)

;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:queue-write-and-wait db qry-sig query params)
  (let ((queue-len  0)
	(res        #f)
	(got-it     #f)
	(qry-pkt    (vector qry-sig query params))
	(start-time (current-milliseconds))
	(timeout    (+ 10 (current-seconds)))) ;; set the time out to 10 secs in future

    ;; Put the item in the queue *incoming-writes* 
    (mutex-lock! *incoming-mutex*)
    (set! *incoming-writes* (cons qry-pkt *incoming-writes*))
    (set! queue-len (length *incoming-writes*))
    (mutex-unlock! *incoming-mutex*)
1383
1384
1385
1386
1387
1388
1389


1390
1391
1392
1393
1394
1395
1396
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403







+
+







	  (begin
	    (hash-table-delete! *completed-writes* qry-sig)
	    (set! got-it #t)))
      (mutex-unlock! *completed-mutex*)
      (if (and (not got-it)
	       (< (current-seconds) timeout))
	  (loop)))
    (set! *number-of-writes*   (+ *number-of-writes*   1))
    (set! *writes-total-delay* (+ *writes-total-delay* 1))
    got-it))
	  
(define (db:process-queue-item db item)
  (let* ((stmt-key       (cdb:packet-get-qtype item))
	 (qry-sig        (cdb:packet-get-query-sig item))
	 (return-address (cdb:packet-get-client-sig item))
	 (params         (cdb:packet-get-params item))

Modified http-transport.scm from [f36a48c1d7] to [9e3e9e2469].

248
249
250
251
252
253
254
255



256
257
258
259
260
261
262
248
249
250
251
252
253
254

255
256
257
258
259
260
261
262
263
264







-
+
+
+







              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Max cached queries was    " *max-cache-size*)
	      (debug:print-info 0 "Number of cached writes   " *number-of-writes*)
	      (debug:print-info 0 "Average cached write time " (/ *writes-total-delay* *number-of-writes*) " ms")
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (http-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))

Modified server.scm from [34877c239b] to [65c872df95].

57
58
59
60
61
62
63



64
65
66
67
68
69
70






71
72
73
74




75
76
77
78
79
80
81
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79




80
81
82
83
84
85
86
87
88
89
90







+
+
+







+
+
+
+
+
+
-
-
-
-
+
+
+
+







    (else
     (debug:print "WARNING: unrecognised transport " transport)
     (exit))))

;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================

;; We don't want to flush the queue if it was just flushed
(define *server:last-write-flush* (current-milliseconds))

;; Flush the queue every third of a second. Can we assume that setup-for-run 
;; has already been done?
(define (server:write-queue-handler)
  (if (setup-for-run)
      (let ((db (open-db)))
	(let loop ()
	  (let ((last-write-flush-time #f))
	    (mutex-lock! *incoming-mutex*)
	    (set! last-write-flush-time *server:last-write-flush*)
	    (mutex-unlock! *incoming-mutex*)
	    (if (> (- (current-milliseconds) last-write-flush-time) 400)
		(begin
	  (mutex-lock! *db:process-queue-mutex*)
	  (db:process-cached-writes db)
	  (mutex-unlock! *db:process-queue-mutex*)
	  (thread-sleep! 0.3)
		  (mutex-lock! *db:process-queue-mutex*)
		  (db:process-cached-writes db)
		  (mutex-unlock! *db:process-queue-mutex*)
		  (thread-sleep! 0.5))))
	  (loop)))
      (begin
	(debug:print 0 "ERROR: failed to setup for Megatest in server:write-queue-handler")
	(exit 1))))
    
;;======================================================================
;; S E R V E R   U T I L I T I E S