Megatest

Check-in [12d923326a]
Login
Overview
Comment:Heartbeat monitoring, on-the-fly server starting all working in simple manual testing
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 12d923326abacc7e1f37a17e04fd6d97183330f5
User & Date: matt on 2012-11-03 17:11:18
Other Links: manifest | tags
Context
2012-11-03
17:18
Added more instrumentation info on server pulse. Removed rm of monitor.db in Makefile check-in: 04c31891a9 user: matt tags: trunk
17:11
Heartbeat monitoring, on-the-fly server starting all working in simple manual testing check-in: 12d923326a user: matt tags: trunk
2012-11-02
18:33
borked server heartbeat logic check-in: ece909ab1c user: mrwellan tags: trunk
Changes

Modified db.scm from [0f75487810] to [7d7e161003].

1145
1146
1147
1148
1149
1150
1151


1152
1153
1154
1155
1156
1157
1158
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160







+
+







	   (set! *time-to-exit* #t)
	   #t)
	  ((set-verbosity)
	   (set! *verbosity* (caddr params))
	   *verbosity*)
	  ((get-verbosity)
	   *verbosity*)
	  ((ping)
	   'hi)
	  (else
	   (mutex-lock! *incoming-mutex*)
	   (set! *last-db-access* (current-seconds))
	   (set! *incoming-data* (cons 
				  (vector qry-name
					  (current-milliseconds)
					  remparam)
1174
1175
1176
1177
1178
1179
1180

1181
1182
1183
1184
1185
1186
1187
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190







+








(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
(define (cdb:client-call zmq-socket . params)
  (debug:print-info 11 "cdb:client-call zmq-socket=" zmq-socket " params=" params)
  (let ((zdat (db:obj->string params)) ;; (with-output-to-string (lambda ()(serialize params))))
	(res  #f))
    (send-message zmq-socket zdat)
    (set! res (db:string->obj (if *client-non-blocking-mode* 
				  (receive-message* zmq-socket zdat)

Modified megatest.scm from [71ef6a547c] to [c4c3f3d52b].

289
290
291
292
293
294
295
296

297
298
299
300
301
302
303
304
305
306
307
308
309
310


311
312
313
314
315
316

317
318
319
320
321
322


323
324
325
326
327
328
329
330
331
332
333
289
290
291
292
293
294
295

296
297
298
299
300
301
302
303
304






305
306




307

308
309





310
311




312
313
314
315
316
317
318







-
+








-
-
-
-
-
-
+
+
-
-
-
-

-
+

-
-
-
-
-
+
+
-
-
-
-







		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (port       (vector-ref server 4))
		      (start-time (vector-ref server 5))
		      (priority   (vector-ref server 6))
		      (state      (vector-ref server 7))
		      (mt-ver     (vector-ref server 8))
		      (status     (open-run-close tasks:server-alive? tasks:open-db hostname port: port))
		      (status     (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (killed     #f)
		      (zmq-socket (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (or (not status)    ;; no point in keeping dead records in the db
			 (and khost-port ;; kill by host/port
			      (equal? hostname (car khost-port))
			      (equal? port (string->number (cadr khost-port)))))
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
		       (if status ;; #t means alive
			   (begin
			     (if (equal? hostname (get-host-name))
				 (process-signal pid signal/term) ;; local machine, send sig term
		     (tasks:kill-server status hostname port pid))

				 (cdb:kill-server zmq-socket))    ;; remote machine, try telling server to commit suicide
			     (debug:print-info 1 "Killed server by host:port at " hostname ":" port))
			   (debug:print-info 1 "Removing defunct server record for " hostname ":" port))
		       (set! killed #t)))
		 (if (and kpid
			  ;; (equal? hostname (car khost-port))
			  (equal? hostname (car khost-port))
			  (equal? kpid pid)) ;;; YEP, ALL WITH PID WILL BE KILLED!!!
		     (begin
		       (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)
		       (set! killed #t)
		       (if status 
			   (if (equal? hostname (get-host-name))
		     (tasks:kill-server status hostname #f pid))

			       (process-signal pid signal/term) ;; local machine, send sig term
			       (debug:print 0 "WARNING: Can't kill a dead server on host " hostname)))
		       (debug:print-info 1 "Killed server by pid at " hostname ":" port)))
		 ;; (if zmq-socket (close-socket  zmq-socket))
		 (format #t fmtstr id mt-ver pid hostname interface port start-time priority 
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )

Modified server.scm from [aa1a28766d] to [442b819d32].

25
26
27
28
29
30
31
32


33











34
35
36
37
38
39
40
41

42
43
44
45
46
47
48
49


50
51

52
53

54


55




56
57
58
59
60

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80




81
82
83
84
85
86
87



88
89
90
91
92
93
94
95
96
97
98



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115






116

117

118
119
120
121
122
123
124







125
126
127






128
129
130
131
132
133
134
135

136
137
138
139
140
141
142
25
26
27
28
29
30
31

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

62
63
64

65
66

67
68
69
70
71
72
73
74
75
76
77
78
79

80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96




97
98
99
100
101
102
103
104



105
106
107
108
109
110
111
112
113
114
115
116
117

118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134



135
136
137
138
139
140
141
142

143
144
145





146
147
148
149
150
151
152

153

154
155
156
157
158
159
160
161
162
163
164
165
166

167
168
169
170
171
172
173
174







-
+
+

+
+
+
+
+
+
+
+
+
+
+








+







-
+
+

-
+

-
+

+
+

+
+
+
+




-
+
















-
-
-
-
+
+
+
+




-
-
-
+
+
+










-
+
+
+














-
-
-
+
+
+
+
+
+

+
-
+


-
-
-
-
-
+
+
+
+
+
+
+
-

-
+
+
+
+
+
+







-
+







(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (list 'start (current-seconds)))
(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

(define (server:self-ping iface port)
  (let ((zsocket (server:client-connect iface port)))
    (let loop ()
      (thread-sleep! 5)
      (cdb:client-call zsocket 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!")
      (mutex-lock! *heartbeat-mutex*)
      (set! *server-loop-heart-beat* (current-seconds))
      (mutex-unlock! *heartbeat-mutex*)
      (loop))))
    
(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (zmq-socket-dat #f)
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname))))
			   (if ipstr ipstr hostname)))
	 (actual-port    #f))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
    (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
									      (string->number (args:get-arg "-port"))
									      5555)
									      (+ 5000 (random 1001)))
						     0))
    (set! zmq-socket  (cadr  zmq-socket-dat))
    (set! actual-port (caddr zmq-socket-dat))
    (set! *cache-on* #t)

    ;; (set! th1 (make-thread (lambda ()
    ;;     		     (server:self-ping ipaddrstr actual-port))))
    ;; (thread-start! th1)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-id*)
	       (if (and *toppath* *server-info*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    (let loop ()
      ;; Ugly yuk. 
      (mutex-lock! *incoming-mutex*)
      (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      (mutex-unlock! *incoming-mutex*)
      ;; ;; Ugly yuk. 
      ;; (mutex-lock! *incoming-mutex*)
      ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      ;; (mutex-unlock! *incoming-mutex*)
      (let* ((rawmsg (receive-message* zmq-socket))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
	     (res    #f))
	;;; Ugly yuk. 
	(mutex-lock! *incoming-mutex*)
	(set! *server-loop-heart-beat* (list 'working (current-seconds)))
	(mutex-unlock! *incoming-mutex*)
	;; (mutex-lock! *incoming-mutex*)
	;; (set! *server-loop-heart-beat* (list 'working (current-seconds)))
	;; (mutex-unlock! *incoming-mutex*)
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> processed res=" res)
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
	      (open-run-close tasks:server-deregister-self tasks:open-db #f)
	      (db:write-cached-data)
	      (exit)
	      ))))))
	      ))))
    (thread-join! th1)))


;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  (let loop ((count 0))
    (thread-sleep! 3) ;; no need to do this very often
    (db:write-cached-data)
    ;; (print "Server running, count is " count)
    (if (< count 2) ;; 3x3 = 9 secs aprox
	(loop (+ count 1))
	(let ((numrunning            (open-run-close db:get-count-tests-running #f))
	      (server-loop-heartbeat #f))
	;;; Ugly yuk. 
	  (mutex-lock! *incoming-mutex*)
	      (server-loop-heartbeat #f)
	      (server-info           #f)
	      (pulse                 0))
	  ;; BUG add a wait on server alive here!!
	  ;; ;; Ugly yuk. 
	  (mutex-lock! *heartbeat-mutex*)
	  (set! server-loop-heartbeat *server-loop-heart-beat*)
	  (set! server-info *server-info*)
	  (mutex-unlock! *incoming-mutex*)
	  (mutex-unlock! *heartbeat-mutex*)
	  ;; The logic here is that if the server loop gets stuck blocked in working
	  ;; we don't want to update our heartbeat
	  (let ((server-state  (car server-loop-heartbeat))
		(server-update (cadr server-loop-heartbeat)))
	    (if (or (eq? server-state 'waiting)
		    (< (- (current-seconds) server-update) 10))
		(open-run-close tasks:server-update-heartbeat  tasks:open-db *server-id*)
	  (set! pulse (- (current-seconds) server-loop-heartbeat))
	  (debug:print-info 1 "Heartbeat period is " pulse " on " (cadr server-info) ":" (caddr server-info))
	  (if (> pulse 11) ;; must stay less than 10 seconds 
	      (begin
		(debug:print 0 "ERROR: Heartbeat failed, committing servercide")
		(exit))
	      (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)))
		(debug:print "ERROR: No heartbeat update, server appears stuck")))
	  (if (or (> numrunning 0) ;; stay alive for two days after last access
		  (> (+ *last-db-access* (* 48 60 60))(current-seconds)))
		  (> (+ *last-db-access* 
			;; (* 48 60 60)    ;; 48 hrs
			;; 60              ;; one minute
			(* 60 60)       ;; one hour
			)
		     (current-seconds)))
	      (begin
		(debug:print-info 2 "Server continuing, tests running: " numrunning ", seconds since last db access: " (- (current-seconds) *last-db-access*))
		(loop 0))
	      (begin
		(debug:print-info 0 "Starting to shutdown the server.")
		;; need to delete only *my* server entry (future use)
		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db)
		(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
154
155
156
157
158
159
160

161
162



163
164
165
166
167
168
169
186
187
188
189
190
191
192
193


194
195
196
197
198
199
200
201
202
203







+
-
-
+
+
+







	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (mutex-lock! *heartbeat-mutex*)
       (set! *server-id* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       s))))
       (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       (mutex-unlock! *heartbeat-mutex*)
       (list iface s port)))))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))
195
196
197
198
199
200
201
202

203
204
205
206
207
208

209
210
211
212
213
214
215
216


217
218
219
220

221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241




242
243

244
245
246
247
248
249
250
251
252
253
254












255
256


257
258



259
260
261
262
263


264
265
266
267
268
269
270
229
230
231
232
233
234
235

236
237
238
239
240
241

242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305


306
307
308

309
310
311
312
313
314
315

316
317
318
319
320
321
322
323
324







-
+





-
+








+
+




+


















-
-
-
+
+
+
+


+











+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+

-
+
+
+




-
+
+







(define (server:client-logout zmq-socket)
  (let ((ok (and (socket? zmq-socket)
		 (cdb:logout zmq-socket *toppath* (server:get-client-signature)))))
    ;; (close-socket zmq-socket)
    ok))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50)(do-ping #f))
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db do-ping: do-ping)))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host    (car   hostinfo))
	      (iface   (cadr  hostinfo))
	      (port    (caddr hostinfo)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (handle-exceptions
	   exn
	   (begin
	     ;; something went wrong in connecting to the server. In this scenario it is ok
	     ;; to try again
	     (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo)
	     (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 0 "   perhaps jobs killed with -9? Removing server records")
	     (open-run-close tasks:server-deregister tasks:open-db host port: port)
	     (server:client-setup (- numtries 1))
	     #f)
	   (let* ((zmq-socket (server:client-connect iface port))
		  (login-res  (server:client-login zmq-socket))
		  (connect-ok (if (null? login-res) #f (car login-res)))
		  (conurl     (server:make-server-url (list iface port))))
	     (if connect-ok
		 (begin
		   (debug:print-info 2 "Logged in and connected to " conurl)
		   (set! *runremote* zmq-socket)
		   #t)
		 (begin
		   (debug:print-info 2 "Failed to login or connect to " conurl)
		   (set! *runremote* #f)
		   #f)))))
	(if (> numtries 0)
	    (let ((exe (car (argv))))
	      (debug:print-info 1 "No server available, attempting to start one...")
	      (process-run exe (list "-server" "-" "-debug" (conc *verbosity*)))
	      (sleep 2)
	      ;; not doing ping, assume the server started and registered itself
	      (server:client-setup numtries: (- numtries 1) do-ping: #f))
	      (sleep 5) ;; give server time to start
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; all routes though here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 0 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th1 (make-thread (lambda ()
				       (let ((server-info #f))
					 ;; wait for the server to be online and available
					 (let loop ()
					   (debug:print-info 1 "Waiting for the server to come online before starting heartbeat")
					   (thread-sleep! 5)
					   (mutex-lock! *heartbeat-mutex*)
					   (set! server-info *server-info* )
					   (mutex-unlock! *heartbeat-mutex*)
					   (if (not server-info)(loop)))
					 (debug:print 1 "Server alive, starting self-ping")
					 (server:self-ping (cadr server-info)(caddr server-info)))) "Self ping"))
	    (let* ((th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server")))))
		   (th2 (make-thread (lambda ()
				       (server:run (args:get-arg "-server"))) "Server run"))
		   (th3 (make-thread (lambda ()
				       (server:keep-running)))))
				       (server:keep-running)) "Keep running")))
	      (set! *client-non-blocking-mode* #t)
	      (thread-start! th1)
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th3))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-launch #!key (do-ping #f))
  (if (server:client-setup do-ping: do-ping)
      (debug:print-info 2 "connected as client")
      (begin
	(debug:print 0 "ERROR: Failed to connect as client")
	(exit))))
285
286
287
288
289
290
291
292


293
294
295
296
297
298
299
300
301


302
303
304
305
306
307
308
339
340
341
342
343
344
345

346
347
348
349
350
351
352
353
354
355

356
357
358
359
360
361
362
363
364







-
+
+








-
+
+







				    (begin
				      (server:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))
		  "Ping: th1"))
	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT" #f))))))
		    (set! res (list #f "TIMED OUT" #f)))
		  "Ping: th2")))
       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))

Modified tasks.scm from [efc84088da] to [6c83278de0].

78
79
80
81
82
83
84

85



86
87
88
89
90
91


92
93


94
95
96
97
98
99
100
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
93

94
95
96

97
98
99
100
101
102
103
104
105







+
-
+
+
+





-
+
+

-
+
+








;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) VALUES(?,?,?,strftime('%s','now'),?,?,?,strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (list 
  (tasks:server-get-server-id mdb (get-host-name) port pid))
   (tasks:server-get-server-id mdb (get-host-name) port pid)
   interface
   port))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      ;; (sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)
      (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)
      (if port
	  (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  ;; (sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port)
	  (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

(define (tasks:server-get-server-id mdb hostname port pid)
  (let ((res #f))
117
118
119
120
121
122
123

124
125
126
127
128
129
130

131
132
133
134
135
136
137
122
123
124
125
126
127
128
129
130
131
132
133
134
135

136
137
138
139
140
141
142
143







+






-
+







			 server-id
			 (tasks:server-get-server-id mdb hostname port pid)))
	 (heartbeat-delta 99e9))
    (sqlite3:for-each-row
     (lambda (delta)
       (set! heartbeat-delta delta))
     mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id)
    (debug:print 1 "Found heartbeat-delta of " heartbeat-delta " for server with id " server-id)
    (< heartbeat-delta 10)))

(define (tasks:client-register mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));")
  (tasks:server-get-server-id mdb)
  (tasks:server-get-server-id mdb hostname #f pid)
  pid hostname cmdline)

(define (tasks:client-logout mdb pid hostname cmdline)
  (sqlite3:execute
   mdb
   "UPDATE clients SET logout_time=strftime('%s','now') WHERE pid=? AND hostname=? AND cmdline=?;"
   pid hostname cmdline))
146
147
148
149
150
151
152
153

154
155
156
157
158
159

160
161
162
163
164
165
166
167
168
169
170
171
172

173
174
175
176

177
178


179
180
181
182
183
184
185
186
187
188
189

190

191
192
193
194



























195
196
197
198
199
200
201
152
153
154
155
156
157
158

159
160
161
162
163
164

165
166
167
168
169
170
171
172
173
174
175
176


177



178
179


180
181










182
183

184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222







-
+





-
+











-
-
+
-
-
-

+
-
-
+
+
-
-
-
-
-
-
-
-
-
-

+
-
+




+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







     server-id)))

(define (tasks:have-clients? mdb server-id)
  (null? (tasks:get-logged-in-clients mdb server-id)))

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb #!key (do-ping #f))
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid) res))
       (debug:print-info 1 "Found " hostname ":" port))
       (debug:print-info 1 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers WHERE state='live' AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
    ;; (print "res=" res)
    (if (null? res) #f
	(let loop ((hed (car res))
		   (tal (cdr res)))
	  ;; (print "hed=" hed ", tal=" tal)
	  (let* ((host     (car    hed))
		 (iface    (cadr   hed))
		 (port     (caddr  hed))
		 (pid      (cadddr hed))
		 ;; (ping-res (if do-ping (server:ping host port return-socket: #f) '(#t "NO PING" #f)))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db host port: port)) ;; (car ping-res))
		 (alive    (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port)))
		 ;; (reason   (cadr ping-res))
		 ;; (zsocket  (caddr ping-res))
		 )
	    (if alive
		(begin
		;; (if (server:ping iface port)
		    (list host iface port)
		  (debug:print 1 "Found an existing, alive, server " host ":" port ".")
		  (list host iface port))
		 ;;    ;; not actually alive, destroy!
		 ;;    (begin
		 ;;      (if (equal? host (get-host-name))
		 ;;          (begin
		 ;;            (debug:print-info 0 "Killing process " pid " on host " host " with signal/term")
		 ;;            (send-signal pid signal/term))
		 ;;          (debug:print 0 "WARNING: Can't kill process " pid " on host " host))
		 ;;      (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		 ;;      #f))
		;; remove defunct server from table
		(begin
		  (debug:print-info 1 "Removing " host ":" port " from server registry as it appears to be dead")
		  (open-run-close tasks:server-deregister tasks:open-db  host port: port)
		  (tasks:kill-server #f host port pid)
		  (if (null? tal)
		      #f
		      (loop (car tal)(cdr tal))))))))))

(define (tasks:kill-server status hostname port pid)
  (debug:print-info 1 "Removing defunct server record for " hostname ":" port)
  (if port
      (open-run-close tasks:server-deregister tasks:open-db  hostname port: port)
      (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))
  
  (if status ;; #t means alive
      (begin
	(if (equal? hostname (get-host-name))
	    (begin
	      (debug:print 1 "Sending signal/term to " pid " on " hostname)
	      (process-signal pid signal/term)
	      (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
	      (process-signal pid signal/kill)) ;; local machine, send sig term
	    (begin
	      (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
	      (cdb:kill-server zmq-socket))))    ;; remote machine, try telling server to commit suicide
      (begin
	(if status 
	    (if (equal? hostname (get-host-name))
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
		  (process-signal pid signal/kill)) 
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))

(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version FROM servers ORDER BY start_time DESC;")