Megatest

Check-in [0cb9ad87a9]
Login
Overview
Comment:server, list-runs and repl now working
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: 0cb9ad87a9541cfdda35e493176378187ed6d869
User & Date: matt on 2012-11-19 01:55:53
Other Links: branch diff | manifest | tags
Context
2012-11-19
13:04
Tweaked for testing, all calls immediate check-in: 6ac20061e7 user: mrwellan tags: interleaved-queries
01:55
server, list-runs and repl now working check-in: 0cb9ad87a9 user: matt tags: interleaved-queries
2012-11-18
23:30
Initial coding for interleaved queries done and compiles check-in: b85732a36a user: matt tags: interleaved-queries
Changes

Modified db.scm from [c306724289] to [8d3f7767b5].

1107
1108
1109
1110
1111
1112
1113
1114

1115
1116
1117
1118
1119



1120
1121

1122
1123
1124
1125
1126
1127
1128
1129
1130








1131
1132
1133
1134
1135
1136
1137
1138
1139
1140

1141
1142
1143
1144
1145


1146
1147
1148
1149
1150
1151
1152
1107
1108
1109
1110
1111
1112
1113

1114
1115
1116



1117
1118
1119
1120
1121
1122
1123








1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146

1147
1148
1149
1150
1151
1152
1153
1154
1155







-
+


-
-
-
+
+
+


+

-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+










+




-
+
+







    res))
  
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
(define (cdb:client-call zmq-sockets qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params)
  (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  (let* ((push-socket (vector-ref zmq-sockets 0))
	 (sub-socket  (vector-ref zmq-sockets 1))
	 (client-sig   (server:get-client-signature))
	 (query-sig    (message-digest-string (md5-primitive) (conc qtype immediate params)))
	 (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	 (client-sig  (server:get-client-signature))
	 (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	 (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	 (res  #f)
	 (send-receive (lambda ()
			 (debug:print-info 11 "sending message")
			 (send-message push-socket zdat)
			 (db:string->obj
			  (let ((rmsg (if *client-non-blocking-mode* receive-message* receive-message)))
			    ;; get the sender info
			    ;; this should match (server:get-client-signature)
			    ;; we will need to process "all" messages here some day
			    (rmsg sub-socket)
			    ;; now get the actual message
			    (rmsg  sub-socket)))))
			 (debug:print-info 11 "message sent")
			 (let ((rmsg receive-message*)) ;; (if *client-non-blocking-mode* receive-message* receive-message)))
			   ;; get the sender info
			   ;; this should match (server:get-client-signature)
			   ;; we will need to process "all" messages here some day
			   (rmsg sub-socket)
			   ;; now get the actual message
			   (set! res (db:string->obj (rmsg  sub-socket))))))
	 (timeout (lambda ()
		    (thread-sleep! 5)
		    (if (not res)
			(if (> numretries 0)
			    (begin
			      (debug:print 0 "WARNING: no reply to query " params ", trying again")
			      (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))
			    (begin
			      (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
			      (exit 5)))))))
    (debug:print-info 11 "Starting threads")
    (let ((th1 (make-thread send-receive "send receive"))
	  (th2 (make-thread timeout      "timeout")))
      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1)
      (thread-join!  th1)
      (debug:print-info 11 "cdb:client-call returning res=" res)
      res)))
  
(define (cdb:set-verbosity zmq-socket val)
  (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val))

(define (cdb:login zmq-sockets keyval signature)
  (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature))
1226
1227
1228
1229
1230
1231
1232
1233


1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254



1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274






1275
1276
1277
1278



1279
1280
1281
1282
1283



















1284
1285
1286
1287
1288
1289
1290
1229
1230
1231
1232
1233
1234
1235

1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256


1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274





1275
1276
1277
1278
1279
1280




1281
1282
1283





1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309







-
+
+



















-
-
+
+
+















-
-
-
-
-
+
+
+
+
+
+
-
-
-
-
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







	'(test-set-rundir         "UPDATE tests SET rundir=? WHERE run_id=? AND testname=? AND item_path=?;")
	'(delete-tests-in-state   "DELETE FROM tests WHERE state=? AND run_id=?;")
	'(tests:test-set-toplog    "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';")
    ))

;; do not run these as part of the transaction
(define db:special-queries   '(rollup-tests-pass-fail
			       db:roll-up-pass-fail-counts))
			       db:roll-up-pass-fail-counts
                               login))

;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))

;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:process-queue pubsock indata)
  (open-run-close
   (lambda (db . junkparams)
     (let ((queries    (make-hash-table))
	   (data       (sort indata (lambda (a b)
				      (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b))))))
       (if (> (length data) 0)
	   (debug:print-info 4 "Writing cached data " data))

       ;; prepare the needed statements, do each only once
       (for-each (lambda (request-item)
		   (let ((stmt-key (cdb:get-qtype request-item)))
		     (if (not (hash-table-ref/default queries stmt-key #f))
		   (let ((stmt-key (cdb:packet-get-qtype request-item)))
		     (if (and (not (hash-table-ref/default queries stmt-key #f))
			      (not (member stmt-key db:special-queries)))
			 (let ((stmt (alist-ref stmt-key db:queries)))
			   (if stmt
			       (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
			       (if (procedure? stmt-key)
				   (hash-table-set! queries stmt-key #f)
				   (debug:print 0 "ERROR: Missing query spec for " stmt-key "!")))))))
		 data)
       
       ;; outer loop to handle special queries that cannot be handled in the
       ;; transaction.
       (let outerloop ((special-qry #f)
		       (stmts       data))
	 (if special-qry

	     ;; handle a query that cannot be part of the grouped queries
	     (let* ((stmt-key       (cdb:get-qtype special-qry))
		    (return-address (cdb:get-client-sig special-qry))
		    (qry            (hash-table-ref queries stmt-key))
		    (params         (cdb:get-params special-qry)))
	       (if (string? qry)
	     (let* ((stmt-key       (cdb:packet-get-qtype special-qry))
		    (return-address (cdb:packet-get-client-sig special-qry))
		    (qry            (hash-table-ref/default queries stmt-key #f))
		    (params         (cdb:packet-get-params special-qry)))
	       (cond
		((string? qry)
		   (begin
		     (apply sqlite3:execute db qry params)
		     (server:reply return-address #t))
		   (if (procedure? stmt-key)
		 (apply sqlite3:execute db qry params)
		 (server:reply pubsock return-address #t))
		((procedure? stmt-key)
		       (begin
			 ;; we are being handed a procedure so call it
			 (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")")
			 (server:reply return-address (apply stmt-key db params)))
		       (debug:print 0 "ERROR: Unrecognised queued call " qry " " params)))
		 ;; we are being handed a procedure so call it
		 (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")")
		 (server:reply pubsock return-address (apply stmt-key db params)))
		(else 
		 (case stmt-key
		   ((login)
		    (if (< (length params) 3) ;; should get toppath, version and signature
			'(#f "login failed due to missing params") ;; missing params
			(let ((calling-path (car   params))
			      (calling-vers (cadr  params))
			      (client-key   (caddr params)))
			  (if (and (equal? calling-path *toppath*)
				   (equal? megatest-version calling-vers))
			      (begin
				(hash-table-set! *logged-in-clients* client-key (current-seconds))
				(server:reply  pubsock return-address '(#t "successful login")))      ;; path matches - pass! Should vet the caller at this time ...
			      (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))
		   (else
		    (debug:print 0 "ERROR: Unrecognised queued call " qry " " params)))))
	       (if (not (null? stmts))
		   (outerloop #f stmts)))

	     ;; handle normal queries
	     (let ((rem (sqlite3:with-transaction 
			 db
			 (lambda ()
1300
1301
1302
1303
1304
1305
1306
1307

1308
1309
1310
1311
1312
1313
1314
1319
1320
1321
1322
1323
1324
1325

1326
1327
1328
1329
1330
1331
1332
1333







-
+







					   (member stmt-key db:special-queries))
				       (begin
					 (debug:print-info 11 "Handling special statement " stmt-key)
					 (cons hed tal))
				       (begin
					 (debug:print-info 11 "Executing " stmt-key " for " params)
					 (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
					 (server:reply return-address #t)
					 (server:reply pubsock return-address #t)
					 (if (not (null? tal))
					     (innerloop (car tal)(cdr tal))
					     '()))
				       ))))))))
	       (if (not (null? rem))
		   (outerloop (car rem)(cdr rem))))))
       (for-each (lambda (stmt-key)

Modified server.scm from [67b3f4e15b] to [a18aca67cf].

134
135
136
137
138
139
140
141

142
143
144
145
146
147

148
149

150
151

152
153

154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

175
176
177
178

179
180
181
182
183
184
185
134
135
136
137
138
139
140

141
142
143
144
145
146

147
148

149
150
151
152
153

154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

175
176
177
178

179
180
181
182
183
184
185
186







-
+





-
+

-
+


+

-
+




















-
+



-
+







			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ((queue-lst '()))
      ;; (print "GOT HERE EH?")
      (print "GOT HERE EH?")
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg)))
	(debug:print-info 12 "server=> received packet=" packet)
	(if (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (db:process-queue pubsock (cons packet queue))
	      (db:process-queue pub-socket (cons packet queue-lst))
	      (loop '()))
	    (loop (cons packet queue)))))))
	    (loop (cons packet queue-lst)))))))

(define (server:reply pubsock target result)
  (debug:print-info 11 "server:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock result))
  (send-message pubsock (db:obj->string result)))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 ;; (zmq-sockets (server:client-connect iface pullport pubport)))
	 ;; (zmq-sockets (server:client-connect iface pullport pubport))
	 )
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;;  (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1))))
      ;; (print "Server running, count is " count)
      (if (< count 1) ;; 3x3 = 9 secs aprox
	  (loop (+ count 1)))
      
      ;; NOTE: Get rid of this mechanism! It really is not needed...
      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))
      
221
222
223
224
225
226
227
228

229
230

231
232
233
234
235
236
237
222
223
224
225
226
227
228

229
230

231
232
233
234
235
236
237
238







-
+

-
+







       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipaddrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub))
  (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
	 (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))