Megatest

Check-in [5f757480e6]
Login
Overview
Comment:Added interface to the monitor db and appropriate handling thereof.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk | v1.506
Files: files | file ages | folders
SHA1: 5f757480e6441e210f1f71d1a87ddbd9af482886
User & Date: mrwellan on 2012-11-02 17:36:08
Other Links: manifest | tags
Context
2012-11-02
18:33
borked server heartbeat logic check-in: ece909ab1c user: mrwellan tags: trunk
17:36
Added interface to the monitor db and appropriate handling thereof. check-in: 5f757480e6 user: mrwellan tags: trunk, v1.506
13:19
Made repl use non-blocking client mode check-in: 50f33a00a7 user: mrwellan tags: trunk, v1.5105
Changes

Modified db.scm from [2a2b5ea15a] to [0f75487810].

231
232
233
234
235
236
237

238
239
240
241
242
243
244
245
;;======================================================================
;; T E S T   S P E C I F I C   D B 
;;======================================================================

;; Create the sqlite db for the individual test(s)
(define (open-test-db testpath) 
  (debug:print-info 11 "open-test-db " testpath)

  (if (and (directory? testpath)
	   (file-read-access? testpath))
      (let* ((dbpath    (conc testpath "/testdat.db"))
	     (dbexists  (file-exists? dbpath))
	     (db        (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	     (handler   (make-busy-timeout (if (args:get-arg "-override-timeout")
					       (string->number (args:get-arg "-override-timeout"))
					       136000))))







>
|







231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
;;======================================================================
;; T E S T   S P E C I F I C   D B 
;;======================================================================

;; Create the sqlite db for the individual test(s)
(define (open-test-db testpath) 
  (debug:print-info 11 "open-test-db " testpath)
  (if (and testpath 
	   (directory? testpath)
	   (file-read-access? testpath))
      (let* ((dbpath    (conc testpath "/testdat.db"))
	     (dbexists  (file-exists? dbpath))
	     (db        (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	     (handler   (make-busy-timeout (if (args:get-arg "-override-timeout")
					       (string->number (args:get-arg "-override-timeout"))
					       136000))))

Modified megatest-version.scm from [0ac622c7df] to [4bf7ad110a].

1
2
3
4
5
6
7
;; Always use two digit decimal
;; 1.01, 1.02...1.10,1.11 ... 1.99,2.00..
;; NOTE(review): the value defined below carries four decimals, which does
;; not match the two-digit rule stated above - confirm which is intended.

(declare (unit megatest-version))

;; Version number of this build, held as a number rather than a string.
(define megatest-version 1.5105)






|

1
2
3
4
5
6
7
;; Always use two digit decimal
;; 1.01, 1.02...1.10,1.11 ... 1.99,2.00..
;; NOTE(review): the value defined below carries four decimals, which does
;; not match the two-digit rule stated above - confirm which is intended.

(declare (unit megatest-version))

;; Version number of this build, held as a number rather than a string.
(define megatest-version 1.5106)

Modified megatest.scm from [9c15c3f066] to [1433ed4462].

270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306


307
308
309
310
311
312
313
314
315
316
317



318
319
320
321
322
323

324
325

326
327
328
329
330
331
332
      (server:launch)))

;; Top-level handling of -listservers / -killserver.
;;
;; When either switch is given: fetch every row from the servers table, print
;; a header plus one formatted line per server, deregister servers whose
;; status is false, and kill/deregister the server(s) selected by -killserver
;; (either "host:port" or a bare pid).  The process then exits.
;; When neither switch is given: launch a client connection unless only
;; help/version/generator switches (or no switches at all) were supplied.
(if (or (args:get-arg "-listservers")
	(args:get-arg "-killserver"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~5a~20a~9a~20a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Port" "Time" "Priority" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "====" "====" "========" "=====")
	    (for-each 
	     (lambda (server)
	       ;; NOTE(review): (string-split ":") below splits the literal ":"
	       ;; and never looks at killinfo - confirm whether
	       ;; (string-split killinfo ":") was intended.
	       (let* ((killinfo   (args:get-arg "-killserver"))
		      (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      ;; slots follow the column order used by tasks:get-all-servers
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))

		      (port       (vector-ref server 3))
		      (start-time (vector-ref server 4))
		      (priority   (vector-ref server 5))
		      (state      (vector-ref server 6))
		      (mt-ver     (vector-ref server 7))
		      (status     (open-run-close tasks:server-alive? tasks:open-db hostname port: port))
		      (killed     #f)
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db
			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
		       (if status ;; #t means alive
			   (begin


			     (cdb:kill-server zmq-socket)
			     (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
			   (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
		       (set! killed #t)))
		 (if (and kpid
			  ;; (equal? hostname (car khost-port))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
		       (set! killed #t)
		       (if status (cdb:kill-server zmq-socket))



		       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))
		 ;; (if zmq-socket (close-socket  zmq-socket))
		 (format #t fmtstr id mt-ver pid hostname port start-time priority 
			 status)))
	     servers)
	    (debug:print-info 1 "Done with listservers")

	    ;; NOTE(review): the set! after (exit) is written after the exit call
	    (exit) ;; must do, would have to add checks to many/all calls below
	    (set! *didsomething* #t))

	  (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
	    (eq? (length (hash-table-keys args:arg-hash)) 0))
	(debug:print-info 1 "Server connection not needed")
	(server:client-launch do-ping: #t)))








|

|
|








>
|
|
|
|
|













>
>
|









|
>
>
>


|
|


>

<
>







270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331

332
333
334
335
336
337
338
339
      (server:launch)))

;; Top-level handling of -listservers / -killserver.
;;
;; When either switch is given: fetch every row from the servers table, print
;; a header plus one formatted line per server, deregister servers whose
;; status is false, and kill/deregister the server(s) selected by -killserver.
;; The -killserver value is either "host:port" (kill that one server) or a
;; bare pid (every record with that pid is removed).  The process then exits.
;; When neither switch is given: launch a client connection unless only
;; help/version/generator switches (or no switches at all) were supplied.
(if (or (args:get-arg "-listservers")
        (args:get-arg "-killserver"))
    (let ((tl (setup-for-run)))
      (if tl
          (let* ((servers    (open-run-close tasks:get-all-servers tasks:open-db))
                 (fmtstr     "~5a~8a~8a~20a~20a~10a~20a~10a~10a\n")
                 ;; The -killserver argument does not change per server, so it
                 ;; is parsed once here rather than inside the loop.
                 (killinfo   (args:get-arg "-killserver"))
                 (by-host?   (and killinfo (substring-index ":" killinfo) #t))
                 ;; Split the argument itself on ":" (the previous code split
                 ;; the literal ":" so host:port never matched).  Anything
                 ;; that does not yield both a host and a port is ignored.
                 (khost-port (if by-host?
                                 (let ((parts (string-split killinfo ":")))
                                   (if (and (pair? parts) (pair? (cdr parts)))
                                       parts
                                       #f))
                                 #f))
                 (kpid       (if (and killinfo (not by-host?))
                                 (string->number killinfo)
                                 #f)))
            (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "Port" "Time" "Priority" "State")
            (format #t fmtstr "==" "=====" "===" "====" "=========" "====" "====" "========" "=====")
            (for-each
             (lambda (server)
               ;; slots follow the column order used by tasks:get-all-servers
               (let* ((id         (vector-ref server 0))
                      (pid        (vector-ref server 1))
                      (hostname   (vector-ref server 2))
                      (interface  (vector-ref server 3))
                      (port       (vector-ref server 4))
                      (start-time (vector-ref server 5))
                      (priority   (vector-ref server 6))
                      (state      (vector-ref server 7))
                      (mt-ver     (vector-ref server 8))
                      ;; NOTE(review): hostname lands in the positional
                      ;; server-id slot of tasks:server-alive? - confirm
                      ;; whether hostname: was intended.
                      (status     (open-run-close tasks:server-alive? tasks:open-db hostname port: port))
                      (killed     #f)
                      (zmq-socket (if status (server:client-connect hostname port) #f)))
                 ;; no need to login as status of #t indicates we are connecting to correct
                 ;; server
                 (if (or (not status)    ;; no point in keeping dead records in the db
                         (and khost-port ;; kill by host/port
                              (equal? hostname (car khost-port))
                              (equal? port (string->number (cadr khost-port)))))
                     (begin
                       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
                       (if status ;; #t means alive
                           (begin
                             ;; local servers are signalled directly, remote
                             ;; ones are asked to quit over the socket
                             (if (equal? hostname (get-host-name))
                                 (process-signal pid signal/term)
                                 (cdb:kill-server zmq-socket))
                             (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
                           (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
                       (set! killed #t)))
                 (if (and kpid
                          ;; (equal? hostname (car khost-port))
                          (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
                     (begin
                       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
                       (set! killed #t)
                       ;; NOTE(review): for a live server on another host only
                       ;; the warning below is issued, yet the "Killed" line
                       ;; is still printed - confirm intended behaviour.
                       (if status
                           (if (equal? hostname (get-host-name))
                               (process-signal pid signal/term)
                               (debug:print 0 "WARNING: Can't kill a dead server on host " hostname)))
                       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))
                 ;; (if zmq-socket (close-socket  zmq-socket))
                 (format #t fmtstr id mt-ver pid hostname interface port start-time priority
                         (if status "alive" "dead"))))
             servers)
            (debug:print-info 1 "Done with listservers")
            (set! *didsomething* #t)
            (exit)) ;; must do, would have to add checks to many/all calls below
          (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
            (eq? (length (hash-table-keys args:arg-hash)) 0))
        (debug:print-info 1 "Server connection not needed")
        (server:client-launch do-ping: #t)))

Modified server.scm from [59c1e6d986] to [000964c6b9].

33
34
35
36
37
38
39
40
41
42

43
44
45
46

47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (hostname       (if (string=? "-" hostn)
			     (get-host-name) 
			     hostn))

	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname))))

    (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0))
    (set! *cache-on* #t)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-id*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db #f))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)







|
|

>




>








|







33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname))))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket 5555 0))
    (set! *cache-on* #t)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-id*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
107
108
109
110
111
112
113
114
115
116

117
118
119
120
121


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

;; Bind a zmq 'rep socket to tcp://host:port, walking upwards through port
;; numbers until a bind succeeds or the retry budget is used up.
;;
;;   host    - host/address placed in the tcp:// url
;;   s       - existing socket to reuse, or #f to create a new 'rep socket
;;   port    - first port to try; 5555 is used when this is not a number
;;   trynum: - remaining number of further ports to try (default 50)
;;
;; On success: clears *runremote*, registers the server in the db (setting
;; *server-id*) and returns the bound socket.  Any exception raised in the
;; body - not only a failed bind - is logged and leads to a retry on the next
;; port.
(define (server:find-free-port-and-open host s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
	(p (if (number? port) port 5555)))

    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))


       ;; retry on p+1 with the same socket while the budget lasts
       (if (> trynum 0)
	   (server:find-free-port-and-open host s (+ p 1) trynum: (- trynum 1))
	   (debug:print-info 0 "Tried ports from " (- p trynum) " to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
     (let ((zmq-url (conc "tcp://" host ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) host p 0 'live))
       s))))

;; Build a signature string for this process: the md5 digest of the written
;; form of (current-directory argv).
(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;; Return this process's client signature, computing it with
;; server:mk-signature on first use and caching it in *my-client-signature*.
(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; Create a zmq 'req socket (inside context: when one is given) and connect
;; it to the url built by server:make-server-url from host and port.
;; Returns the connected socket, or #f when no socket object was obtained.
;; The connect-ok binding is not used in the body.
(define (server:client-connect host port #!key (context #f))
  (debug:print 3 "client-connect " host ":" port)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket 'req context)
			(make-socket 'req)))
	(conurl     (server:make-server-url (list host port))))
    (if (socket? zmq-socket)
	(begin
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	#f)))
  








|

|
>





>
>

|
|

|




|
















|
|




|







109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

;; Bind a zmq 'rep socket to tcp://iface:port, walking upwards through port
;; numbers until a bind succeeds or the retry budget is used up.
;;
;;   iface   - interface/address placed in the tcp:// url and recorded in the db
;;   s       - existing socket to reuse, or #f to create a new 'rep socket
;;   port    - first port to try; 5555 is used when this is not a number
;;   trynum: - remaining number of further ports to try (default 50)
;;
;; On success: clears *runremote*, registers the server in the db (setting
;; *server-id*) and returns the bound socket.  Returns #f once every port in
;; the range has been tried without success.  Any exception raised in the
;; body - not only a failed bind - is logged and leads to a retry on the next
;; port.
(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
        (p (if (number? port) port 5555)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       ;; default of #f keeps the handler itself from raising when the
       ;; condition carries no exn message
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message #f) exn))
       (if (> trynum 0)
           ;; retry on p+1 with the same socket while the budget lasts
           (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
           (begin
             (debug:print-info 0 "Tried ports up to " p
                               " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")
             ;; explicit failure value instead of whatever the logger returns
             #f)))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       s))))

;; Build a signature string for this process: the md5 digest of the written
;; form of the list (current-directory argv).
(define (server:mk-signature)
  (let* ((identity (list (current-directory) (argv)))
         (written  (with-output-to-string
                     (lambda () (write identity)))))
    (message-digest-string (md5-primitive) written)))

;; Return this process's client signature.  The value is computed with
;; server:mk-signature on first use and cached in *my-client-signature*.
(define (server:get-client-signature)
  (or *my-client-signature*
      (begin
        (set! *my-client-signature* (server:mk-signature))
        *my-client-signature*)))

;; Create a zmq 'req socket (inside context: when one is given) and connect
;; it to the url built by server:make-server-url from iface and port.
;; Returns the connected socket, or #f when no socket object was obtained.
(define (server:client-connect iface port #!key (context #f))
  (debug:print 3 "client-connect " iface ":" port)
  (let* ((sock (if context
                   (make-socket 'req context)
                   (make-socket 'req)))
         (url  (server:make-server-url (list iface port))))
    (and (socket? sock)
         (begin
           (connect-socket sock url)
           sock))))
  

173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping)))
    (if hostinfo
	(let ((host    (car hostinfo))
	      (port    (cadr hostinfo)))
	  ;; (zsocket (caddr hostinfo)))
	;; (set! *runremote* zsocket))
	  (let* ((host       (car hostinfo))
		 (port       (cadr hostinfo)))
	    (debug:print-info 2 "Setting up to connect to " hostinfo)
	    (handle-exceptions
	     exn
	     (begin
	       (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	       (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	       (open-run-close tasks:server-deregister tasks:open-db host port: port)
	       #f)
	     (let* ((zmq-socket (server:client-connect host port))
		    (login-res  (server:client-login zmq-socket))
		    (connect-ok (if (null? login-res) #f (car login-res)))
		    (conurl     (server:make-server-url hostinfo)))
	       (if connect-ok
		   (begin
		     (debug:print-info 2 "Logged in and connected to " conurl)
		     (set! *runremote* zmq-socket)
		     #t)
		   (begin
		     (debug:print-info 2 "Failed to login or connect to " conurl)
		     (set! *runremote* #f)
		     #f))))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 2)
	      ;; not doing ping, assume the server started and registered itself
	      (server:client-setup numtries: (- numtries 1) do-ping: #f))







|
|
|
<
<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







178
179
180
181
182
183
184
185
186
187



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))



	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 2)
	      ;; not doing ping, assume the server started and registered itself
	      (server:client-setup numtries: (- numtries 1) do-ping: #f))

Modified tasks.scm from [5ae2b507c5] to [3a1458e323].

47
48
49
50
51
52
53

54
55
56
57
58
59
60
                                start_time TIMESTAMP,
                                last_update TIMESTAMP,
                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,

                                  hostname TEXT,
                                  port INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,







>







47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
                                start_time TIMESTAMP,
                                last_update TIMESTAMP,
                                hostname TEXT,
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; Record a server in the servers table and return its row id.
;; The row is written with INSERT OR REPLACE; start_time and heartbeat are
;; both set to the current time and mt_version to megatest-version.
;; state: 'live, 'shutting-down, 'dead  (stored via conc)
(define (tasks:server-register mdb pid hostname port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'));"
   pid hostname port priority (conc state) megatest-version)
  (tasks:server-get-server-id mdb hostname port pid))

;; Delete server record(s) from the servers table.
;; With pid: every row with that pid is deleted and hostname is not consulted;
;; otherwise with port: the row matching hostname and port is deleted.
;; With neither key an error is logged and nothing is deleted.
;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (if port
	  (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

;; Remove the record(s) of the current process from the servers table by
;; delegating to tasks:server-deregister with pid: (current-process-id).
(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

;; Look up the id of a server row.  When both host and pid are given the row
;; is matched on hostname+pid, otherwise on hostname+port.
;; Returns the id of the last matching row, or #f when none matches.
(define (tasks:server-get-server-id mdb host port pid)
  (let ((res #f))
    (sqlite3:for-each-row
     (lambda (id)
       (set! res id))
     mdb
     (if (and host  pid)
	 "SELECT id FROM servers WHERE hostname=? AND pid=?;"
	 "SELECT id FROM servers WHERE hostname=? AND port=?;")
     ;; second bound value is pid whenever pid is given, else port
     host (if pid pid port))
    res))

;; Set the heartbeat column of the given server row to the current time.
(define (tasks:server-update-heartbeat mdb server-id)
  (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
;;
;; server-id is used as given; when it is #f the id is looked up from the
;; hostname:/port:/pid: keys.  heartbeat-delta starts at 99e9 and is replaced
;; by (now - heartbeat) when the row is found.
;; NOTE(review): the final test subtracts the delta from (current-seconds)
;; before comparing with 10 - confirm whether (< heartbeat-delta 10) was
;; intended.
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f))
  (let* ((server-id  (if server-id 
			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (< (- (current-seconds) heartbeat-delta) 10)))

;; Intended to record a client (pid, hostname, cmdline) in the clients table.
;; NOTE(review): as parenthesised, sqlite3:execute receives only the statement
;; text - no values for its four ? placeholders - and
;; tasks:server-get-server-id is then called with mdb alone although it is
;; defined with four parameters.  pid, hostname and cmdline are evaluated as
;; separate body expressions, so cmdline is the return value.  Needs fixing
;; once the source of server_id is decided.
(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  pid hostname cmdline)







|


|
|
|













|





|


|















|







73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; Record this host's server in the servers table and return its row id.
;; The hostname column is taken from (get-host-name); the interface argument
;; goes into the interface column.  start_time and heartbeat are both set to
;; the current time and mt_version to megatest-version.
;; state: 'live, 'shutting-down, 'dead  (stored via conc)
(define (tasks:server-register mdb pid interface port priority state)
  (let ((this-host  (get-host-name))
        (insert-sql "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"))
    (sqlite3:execute mdb insert-sql
                     pid this-host port priority (conc state) megatest-version interface)
    (tasks:server-get-server-id mdb this-host port pid)))

;; Delete server record(s) from the servers table.
;;   pid:  given -> every row with that pid goes (hostname is not consulted,
;;                  so same-pid servers on other hosts are removed as well)
;;   port: given -> the row matching hostname and port goes
;;   neither     -> an error is logged and nothing is deleted
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (cond
   (pid
    (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
   (port
    (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port))
   (else
    (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

;; Remove the record(s) of the current process from the servers table by
;; delegating to tasks:server-deregister with pid: (current-process-id).
;;
;; hostname is optional: the pid-based delete does not consult it (it only
;; appears in the debug trace), and the server shutdown path invokes this
;; procedure without one.
(define (tasks:server-deregister-self mdb #!optional (hostname #f))
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

;; Look up the id of a server row.  When both hostname and pid are given the
;; row is matched on hostname+pid, otherwise on hostname+port.
;; Returns the id of the last matching row, or #f when none matches.
(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((found #f)
        (query (if (and hostname pid)
                   "SELECT id FROM servers WHERE hostname=? AND pid=?;"
                   "SELECT id FROM servers WHERE hostname=? AND port=?;"))
        ;; second bound value: pid whenever one is given, else port
        (key   (or pid port)))
    (sqlite3:for-each-row
     (lambda (id) (set! found id))
     mdb query hostname key)
    found))

;; Stamp the given server row with the current time in its heartbeat column.
(define (tasks:server-update-heartbeat mdb server-id)
  (let ((touch-sql "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;"))
    (sqlite3:execute mdb touch-sql server-id)))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
;;
;; Report whether a registered server is alive, judged by the age of its
;; heartbeat column.
;;
;;   mdb       - open handle on the db holding the servers table
;;   server-id - servers.id (a number), or #f to look the row up from the
;;               keys.  Callers in this code base pass the hostname string in
;;               this position ("... hostname port: port"), so a string here
;;               is treated as hostname: rather than as a row id.
;;   hostname:, port:, pid: - identify the server when no numeric id is given
;;
;; Returns #t when the heartbeat is less than 10 seconds old; #f when it is
;; older or when no matching row exists.
(define (tasks:server-alive? mdb server-id #!key (hostname #f)(port #f)(pid #f))
  (let* ((hostname        (if (string? server-id) server-id hostname))
         (server-id       (if (number? server-id)
                              server-id
                              (tasks:server-get-server-id mdb hostname port pid)))
         ;; sentinel meaning "infinitely stale"; kept when no row is found
         (heartbeat-delta 99e9))
    (if server-id
        (sqlite3:for-each-row
         (lambda (delta)
           ;; ignore a non-numeric result (presumably a NULL heartbeat -
           ;; TODO confirm) so the comparison below cannot fail
           (if (number? delta)
               (set! heartbeat-delta delta)))
         mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id))
    ;; a fresh heartbeat means alive; the previous (> delta 10) was inverted
    (< heartbeat-delta 10)))

;; Intended to record a client (pid, hostname, cmdline) in the clients table.
;; NOTE(review): as parenthesised, sqlite3:execute receives only the statement
;; text - no values for its four ? placeholders - and
;; tasks:server-get-server-id is then called with mdb alone although it is
;; defined with four parameters.  pid, hostname and cmdline are evaluated as
;; separate body expressions, so cmdline is the return value.  Needs fixing
;; once the source of server_id is decided.
(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  pid hostname cmdline)
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

167

168
169
170
171
172
173











174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194

;; Select the most recently started 'live' server whose mt_version equals
;; megatest-version (the query carries LIMIT 1) and return (host port) when
;; tasks:server-alive? accepts it.  A server that is not accepted is
;; deregistered and the next candidate, if any, is tried; #f is returned when
;; none is left.  No ping is sent: the ping code is commented out and the
;; do-ping key is not used in the body.
(define (tasks:get-best-server mdb #!key (do-ping #f))
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname port)
       (set! res (cons (list hostname port) res))
       (debug:print-info 1 "Found " hostname ":" port))
     mdb
     "SELECT id,hostname,port FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car hed))

		 (port     (cadr hed))

		 ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f)))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res))
		 ;; (reason   (cadr ping-res))
		 ;; (zsocket  (caddr ping-res))
		 )
	    (if alive (list host port)











		;; remove defunct server from table
		(begin
		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

;; Return every row of the servers table as a list of vectors
;; #(id pid hostname port start-time priority state mt-version).
;; Rows are read ordered by start_time DESC and consed onto the result, so
;; the returned list is in the reverse of the query order.
(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================








|
|


|





|
>
|
>





|
>
>
>
>
>
>
>
>
>
>
>










|
|

|







150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

;; Select the most recently started 'live' server whose mt_version equals
;; megatest-version (the query carries LIMIT 1) and return
;; (host interface port) when tasks:server-alive? accepts it.  A server that
;; is not accepted is deregistered and the next candidate, if any, is tried;
;; #f is returned when none is left.  No ping is sent and the do-ping key is
;; not used in the body.
(define (tasks:get-best-server mdb #!key (do-ping #f))
  (let ((candidates '()))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! candidates (cons (list hostname interface port pid) candidates))
       (debug:print-info 1 "Found " hostname ":" port))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    (let loop ((remaining candidates))
      (if (null? remaining)
          #f
          (let* ((entry (car remaining))
                 (host  (car   entry))
                 (iface (cadr  entry))
                 (port  (caddr entry)))
            (cond
             ((open-run-close tasks:server-alive? tasks:open-db host port: port)
              (list host iface port))
             (else
              ;; remove defunct server from table, then try the next one
              (open-run-close tasks:server-deregister tasks:open-db  host port: port)
              (loop (cdr remaining)))))))))

;; Return every row of the servers table as a list of vectors
;; #(id pid hostname interface port start-time priority state mt-version).
;; Rows are read ordered by start_time DESC and consed onto the result, so
;; the returned list is in the reverse of the query order.
(define (tasks:get-all-servers mdb)
  (let ((rows '()))
    (sqlite3:for-each-row
     ;; one argument per selected column; pack them into a vector as-is
     (lambda columns
       (set! rows (cons (list->vector columns) rows)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")
    rows))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================