Megatest

Check-in [aea2d07d89]
Login
Overview
Comment:tweaks eh
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: aea2d07d89e098ff450c43f609f3f5c422cb818e
User & Date: mrwellan on 2012-11-16 13:10:07
Other Links: branch diff | manifest | tags
Context
2012-11-17
07:48
Adding mock up of dual channel approach check-in: e1bc6c1905 user: matt tags: interleaved-queries
2012-11-16
13:10
tweaks eh check-in: aea2d07d89 user: mrwellan tags: interleaved-queries
09:45
Updated -list-servers, removed -kill-server check-in: 03a1b16c63 user: mrwellan tags: interleaved-queries
Changes

Modified common.scm from [48dba0a8c5] to [753ac43019].

45
46
47
48
49
50
51

52
53
54
55
56
57
58
59
;; Process-wide state shared between the client and server code.
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server; server:keep-running shuts the server down once this is more than one hour old
(define *max-cache-size*    0) ;; reported as "Max cached queries was ..." when the server shuts down
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)

(define *time-to-exit* #f) ;; set to #t by server:keep-running when it begins shutting the server down
(define *received-response* #f)

;; Caches, each an initially empty hash table.
(define *target*            (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
(define *keys*              (make-hash-table)) ;; cache the keys here
(define *keyvals*           (make-hash-table))
(define *toptest-paths*     (make-hash-table)) ;; cache toptest path settings here
(define *test-paths*        (make-hash-table)) ;; cache test-id to test run paths here







>
|







45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
;; Process-wide state shared between the client and server code.
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server; server:keep-running shuts the server down once this is more than one hour old
(define *max-cache-size*    0) ;; reported as "Max cached queries was ..." when the server shuts down
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)
(define *server-info*       #f) ;; #f until set; server:keep-running polls this (under *heartbeat-mutex*) and passes its car to tasks:server-update-heartbeat
(define *time-to-exit*      #f) ;; set to #t by server:keep-running when it begins shutting the server down
(define *received-response* #f)

;; Caches, each an initially empty hash table.
(define *target*            (make-hash-table)) ;; cache the target here; target is keyval1/keyval2/.../keyvalN
(define *keys*              (make-hash-table)) ;; cache the keys here
(define *keyvals*           (make-hash-table))
(define *toptest-paths*     (make-hash-table)) ;; cache toptest path settings here
(define *test-paths*        (make-hash-table)) ;; cache test-id to test run paths here

Modified megatest.scm from [82d9a81c2b] to [b74cc84dd9].

273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
      (server:launch)))

(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~20a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "Time" "LastBeat" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "====" "========" "=====")
	    (for-each 
	     (lambda (server)
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))







|

|
|







273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
      (server:launch)))

(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====")
	    (for-each 
	     (lambda (server)
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))

;; 		 (if (and khost-port ;; kill by host/port
;; 			  (equal? hostname (car khost-port))
;; 			  (equal? port (string->number (cadr khost-port))))
;; 		     (tasks:kill-server status hostname port pid))
;; 
;; 		 (if (and kpid
;; 			  (equal? hostname (get-host-name))
;; 			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
;; 		     (tasks:kill-server status hostname #f pid))
;; 
		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport start-time last-update
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))







<
<
<
<
<
<
<
<
<
<
|







304
305
306
307
308
309
310










311
312
313
314
315
316
317
318
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))











		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))

Modified server.scm from [84bdc3ebb7] to [a650f389d3].

162
163
164
165
166
167
168










169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; Monitor loop. Every 4 seconds the cached db writes are flushed; on every
  ;; second pass the heartbeat record is updated and the idle time is checked.
  ;; Once *last-db-access* is more than one hour old the server deregisters
  ;; itself and the process exits. Takes no arguments; the only way out of
  ;; the loop is (exit).










  (let loop ((count 0))
    (thread-sleep! 4) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 1) ;; two passes of 4 secs = approx. 8 secs between heartbeat updates
	(loop (+ count 1))
	(let (;; (numrunning            (open-run-close db:get-count-tests-running #f))
	      (server-loop-heartbeat #f)
	      ;; NOTE(review): server-info is bound to #f here and the only code that
	      ;; would reassign it is commented out below, so the live
	      ;; (car server-info) call further down is applied to #f.
	      (server-info           #f)
	      (pulse                 0))
	  ;; BUG add a wait on server alive here!!
	  ;; ;; Ugly yuk. 
	  ;; == (mutex-lock! *heartbeat-mutex*)
	  ;; == (set! server-loop-heartbeat *server-loop-heart-beat*)
	  ;; == (set! server-info *server-info*)
	  ;; == (mutex-unlock! *heartbeat-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  ;; == (set! pulse (- (current-seconds) server-loop-heartbeat))
	  ;; == (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago")
	  ;; == (if (> pulse 15) ;; must stay less than 10 seconds 
	  ;; ==     (begin
	  ;; ==       (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info))
	  ;; ==       (debug:print 0 "ERROR: Heartbeat failed, committing servercide")
	  ;; ==       (exit))

	  ;; NOTE: Get rid of this mechanism! It really is not needed...
	  (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))

	  ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	  ;; Keep running while the last db access is less than one hour old.
	  (if (> (+ *last-db-access* 
		    ;; (* 48 60 60)    ;; 48 hrs
		    ;; 60              ;; one minute
		    (* 60 60)       ;; one hour
		    )
		 (current-seconds))
	      (begin
		;; (debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
    (handle-exceptions
     exn







>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
|
|
|
|
|
|
|
|
|
|
<
|
|
|
|
|
|
|
|
|
|
|







162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184



















185
186
187
188
189
190
191
192
193
194
195
196

197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

;; server:keep-running
;;
;; Run this in a parallel thread to monitor that the db is being used and
;; to shut the server down after some time if it is not. It
;;   1. waits until *server-info* has been set (read under *heartbeat-mutex*),
;;   2. flushes cached db writes every 4 seconds,
;;   3. on every second pass (approx. every 8 seconds) updates the heartbeat
;;      record and checks how long ago the db was last accessed,
;;   4. deregisters this server and exits the process once *last-db-access*
;;      is more than one hour old.
;;
;; Takes no arguments and never returns normally; the only way out is (exit).
;;
(define (server:keep-running)
  ;; This thread waits for the server to come alive, i.e. for *server-info*
  ;; to become non-#f.
  (let ((server-info
         (let wait-for-server ()
           (let ((sdat #f))
             (mutex-lock! *heartbeat-mutex*)
             (set! sdat *server-info*)
             (mutex-unlock! *heartbeat-mutex*)
             (or sdat
                 (begin
                   ;; thread-sleep! (not the process-level sleep) so that only
                   ;; this thread is suspended and the thread that sets
                   ;; *server-info* can keep running while we wait.
                   (thread-sleep! 4)
                   (wait-for-server)))))))
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      (db:write-cached-data)
      ;; Both arms of this `if` leave every call to `loop` in tail position, so
      ;; the monitor runs in constant space for the whole life of the server.
      (if (< count 1) ;; two passes of 4 secs = approx. 8 secs between heartbeat updates
          (loop (+ count 1))
          (begin
            ;; NOTE: Get rid of this mechanism! It really is not needed...
            ;; presumably (car server-info) is this server's id -- TODO confirm
            (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))

            ;; Stay alive while the last db access is less than one hour old.
            ;; (Previously tried limits: 48 hrs, one minute.)
            (if (> (+ *last-db-access* (* 60 60))
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) *last-db-access*))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))

(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
    (handle-exceptions
     exn
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (pullport (list-ref hostinfo 2))
	      (pubport  (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  ;;(handle-exceptions
	  ;;  exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
	     (server:client-setup (- numtries 1))
	     #f)
	   (server:client-connect iface pullport pubport)))))
	;; (if (> numtries 0)
	;;     (let ((exe (car (argv))))
	;;       (debug:print-info 1 "No server available, attempting to start one...")
	;;       (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	;;       (sleep 5) ;; give server time to start
	;;       ;; we are starting a server, do not try again! That can lead to 
	;;       ;; recursively starting many processes!!!
	;;       (server:client-setup numtries: 0))
	;;     (debug:print-info 1 "Too many attempts, giving up")))))

;; all routes through here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (pullport (list-ref hostinfo 2))
	      (pubport  (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  ;; (handle-exceptions
	  ;;   exn
	  ;;  (begin
	  ;;    ;; something went wrong in connecting to the server. In this scenario it is ok
	  ;;    ;; to try again
	  ;;    (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	  ;;    (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	  ;;    (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	  ;;    (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
	  ;;    (server:client-setup (- numtries 1))
	  ;;    #f)
	   (server:client-connect iface pullport pubport)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 5) ;; give server time to start
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; all routes through here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
405
406
407
408
409
410
411

412
413
414
415
416
417
418
   (if (server:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;; ping a server and return number of clients or #f (if no response)

(define (server:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))







>







395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
   (if (server:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (server:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))

Modified tasks.scm from [eee4b54298] to [acea04adee].

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
  (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid)
  (if pid
      (case action
	((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
	(else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
      (if pullport
	  (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport)))
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

;; Deregister the server running in the current process.
;;   mdb      - database handle, passed straight through to tasks:server-deregister
;;   hostname - host name to pass along
;; Forwards to tasks:server-deregister with pid: (current-process-id) and no
;; action: keyword.
;; NOTE(review): with no action: given this looks like it marks the record
;; 'dead' rather than deleting it -- confirm against the default for action:
;; in tasks:server-deregister.
(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname pullport pid)







|







104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
  (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid)
  (if pid
      (case action
	((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
	(else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
      (if pullport
	  (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND pullport=?;" hostname port))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport)))
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

;; Deregister the server running in the current process.
;;   mdb      - database handle, passed straight through to tasks:server-deregister
;;   hostname - host name to pass along
;; Forwards to tasks:server-deregister with pid: (current-process-id) and no
;; action: keyword.
;; NOTE(review): with no action: given this looks like it marks the record
;; 'dead' rather than deleting it -- confirm against the default for action:
;; in tasks:server-deregister.
(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname pullport pid)
173
174
175
176
177
178
179
180
181

182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface pullport pubport pid)
       (set! res (cons (list hostname interface pullport pubport pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db"))
     mdb
     "SELECT id,hostname,interface,pullport,pubport,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; (print "res=" res)

    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (list-ref hed 0))
		 (iface    (list-ref hed 1))
		 (pullport (list-ref hed 2))
		 (pubport  (list-ref hed 3))
		 (pid      (list-ref hed 4))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport)))
	    (if alive
		(begin
		  (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".")
		  (list host iface pullport pubport))
		(begin
		  (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.")
		  (if port
		      (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
		      (open-run-close tasks:server-deregister tasks:open-db host pid:  pid))
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:mark-server hostname pullport pid state)







|
|
>
















|







173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface pullport pubport pid)
       (set! res (cons (list hostname interface pullport pubport pid) res))
       (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db"))
     mdb
     "SELECT id,hostname,interface,pullport,pubport,pid FROM servers
         WHERE strftime('%s','now')-heartbeat < 10
               AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (list-ref hed 0))
		 (iface    (list-ref hed 1))
		 (pullport (list-ref hed 2))
		 (pubport  (list-ref hed 3))
		 (pid      (list-ref hed 4))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport)))
	    (if alive
		(begin
		  (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".")
		  (list host iface pullport pubport))
		(begin
		  (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.")
		  (if pullport
		      (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport)
		      (open-run-close tasks:server-deregister tasks:open-db host pid:  pid))
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:mark-server hostname pullport pid state)