Megatest

Check-in [f2556f210b]
Login
Overview
Comment:Get list of servers now works
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | server-log-handshaking
Files: files | file ages | folders
SHA1: f2556f210b10e1b4877a92e4bc3120b9cd50c70e
User & Date: matt on 2017-01-25 23:48:54
Other Links: branch diff | manifest | tags
Context
2017-01-27
22:47
Most needed changes made for multi-server with log handshaking. However there is a segfault ... check-in: 8a4205f90b user: matt tags: server-log-handshaking
2017-01-25
23:48
Get list of servers now works check-in: f2556f210b user: matt tags: server-log-handshaking
22:02
Stripped out nearly all server management stuff related to monitor.db server tables. partially replaced with log file based handshaking check-in: 6d32c79c1e user: matt tags: server-log-handshaking
Changes

Modified http-transport.scm from [c66a3571fe] to [bbf4c4febb].

334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running) 
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server for run-id=" run-id)
  (let* ((server-start-time (current-seconds))
	 (server-info (let loop ((start-time (current-seconds))
				 (changed    #t)
				 (last-sdat  "not this"))
                        (let ((sdat #f))
			  (thread-sleep! 0.01)
			  (debug:print-info 0 *default-log-port* "Waiting for server alive signature")







|







334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
;; Run http-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shut down after some time if it is not.
;;
(define (http-transport:keep-running) 
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
  (let* ((server-start-time (current-seconds))
	 (server-info (let loop ((start-time (current-seconds))
				 (changed    #t)
				 (last-sdat  "not this"))
                        (let ((sdat #f))
			  (thread-sleep! 0.01)
			  (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
				(debug:print-info 0 *default-log-port* "Received server alive signature")
				sdat)
                              (begin
				(debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
                                (sleep 4)
				(if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
				    (begin
				      (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server " server-id " for run " run-id)
				      ;; (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
				      (exit))
				    (loop start-time
					  (equal? sdat last-sdat)
					  sdat)))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)







|
<







356
357
358
359
360
361
362
363

364
365
366
367
368
369
370
				(debug:print-info 0 *default-log-port* "Received server alive signature")
				sdat)
                              (begin
				(debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
                                (sleep 4)
				(if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
				    (begin
				      (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")

				      (exit))
				    (loop start-time
					  (equal? sdat last-sdat)
					  sdat)))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410

411
412
413
414
415
416
417

418

419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
	  (loop (+ count 1) 'running bad-sync-count (current-milliseconds)))
      
      ;; Check that iface and port have not changed (can happen if server port collides)
      (mutex-lock! *heartbeat-mutex*)
      (set! sdat *server-info*)
      (mutex-unlock! *heartbeat-mutex*)
      
      (if (or (not (equal? sdat (list iface port)))
	      (not server-id))
	  (let ((new-iface (car sdat))
		(new-port  (cadr sdat)))
	    (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
	    (set! iface new-iface)
	    (set! port  new-port)
	    (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " " (current-seconds))))

      
      ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *db-last-access*)
      (mutex-unlock! *heartbeat-mutex*)
      
      (if (common:low-noise-print 120 (conc "server running on " iface ":" port))

	  (debug:print-info 0 *default-log-port* "SERVER STARTED: " iface ":" port " " (current-seconds)))


      (let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600))
	     (adjusted-timeout (if (> hrs-since-start 1)
				   (- server-timeout (inexact->exact (round (* hrs-since-start 60))))  ;; subtract 60 seconds per hour
				   server-timeout)))
	(if (common:low-noise-print 120 "server timeout")
	    (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout))
	(cond
         ((and *server-run*
		 (> (+ last-access server-timeout)
		    (current-seconds)))
          (if (common:low-noise-print 120 "server continuing")
              (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
          (loop 0 server-state bad-sync-count (current-milliseconds)))
         (else
          (debug:print-info 0 *default-log-port* "Server timeed out. seconds since last db access: " (- (current-seconds) last-access))
          (http-transport:server-shutdown server-id port)))))))

(define (http-transport:server-shutdown server-id port)
  (let ((tdbdat (tasks:open-db)))
    ;;(BB> "http-transport:server-shutdown called")
    (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
    ;;
    ;; start_shutdown
    ;;
    ;; (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "shutting-down")







|
<





|
>







>
|
>
















|

|







395
396
397
398
399
400
401
402

403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
	  (loop (+ count 1) 'running bad-sync-count (current-milliseconds)))
      
      ;; Check that iface and port have not changed (can happen if server port collides)
      (mutex-lock! *heartbeat-mutex*)
      (set! sdat *server-info*)
      (mutex-unlock! *heartbeat-mutex*)
      
      (if (not (equal? sdat (list iface port)))

	  (let ((new-iface (car sdat))
		(new-port  (cadr sdat)))
	    (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
	    (set! iface new-iface)
	    (set! port  new-port)
	    (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds))
	    (flush-output *default-log-port*)))
      
      ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *db-last-access*)
      (mutex-unlock! *heartbeat-mutex*)
      
      (if (common:low-noise-print 120 (conc "server running on " iface ":" port))
	  (begin
	    (debug:print 0 *default-log-port* "SERVER STARTED: " iface ":" port " AT " (current-seconds))
	    (flush-output *default-log-port*)))

      (let* ((hrs-since-start  (/ (- (current-seconds) server-start-time) 3600))
	     (adjusted-timeout (if (> hrs-since-start 1)
				   (- server-timeout (inexact->exact (round (* hrs-since-start 60))))  ;; subtract 60 seconds per hour
				   server-timeout)))
	(if (common:low-noise-print 120 "server timeout")
	    (debug:print-info 0 *default-log-port* "Adjusted server timeout: " adjusted-timeout))
	(cond
         ((and *server-run*
		 (> (+ last-access server-timeout)
		    (current-seconds)))
          (if (common:low-noise-print 120 "server continuing")
              (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
          (loop 0 server-state bad-sync-count (current-milliseconds)))
         (else
          (debug:print-info 0 *default-log-port* "Server timeed out. seconds since last db access: " (- (current-seconds) last-access))
          (http-transport:server-shutdown port)))))))

(define (http-transport:server-shutdown port)
  (let ((tdbdat (tasks:open-db)))
    ;;(BB> "http-transport:server-shutdown called")
    (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
    ;;
    ;; start_shutdown
    ;;
    ;; (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "shutting-down")
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
    ;; 			  (/ *total-non-write-delay* 
    ;; 			     *number-non-write-queries*))
    ;; 		      " ms")
    
    (db:print-current-query-stats)
    
    (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")
    ;; (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running complete")
    ;; if the .server file contained :myport then we can remove it
    ;; (server:remove-dotserver-file *toppath* port)
    ;;(BB> "http-transport:server-shutdown -> exit")
    (exit)))

;; all routes though here end in exit ...
;;
;; start_server? 
;;
(define (http-transport:launch)







<
<
<
<







462
463
464
465
466
467
468




469
470
471
472
473
474
475
    ;; 			  (/ *total-non-write-delay* 
    ;; 			     *number-non-write-queries*))
    ;; 		      " ms")
    
    (db:print-current-query-stats)
    
    (debug:print-info 0 *default-log-port* "Server shutdown complete. Exiting")




    (exit)))

;; all routes through here end in exit ...
;;
;; start_server? 
;;
(define (http-transport:launch)

Modified server.scm from [d398b938c6] to [e489317c03].

45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
;;

;; all routes though here end in exit ...
;;
;; start_server
;;
;; Launch a server of the requested transport type from this process.
;;
;;   run-id         - passed on to rpc-transport:launch only
;;   transport-type - symbol selecting the launcher: http or rpc
;;
;; The process exits early when server:start-attempted? or
;; server:check-if-running returns a true value for *toppath*.
(define (server:launch run-id transport-type)
  ;;(BB> "server:launch fired for run-id="run-id" transport-type="transport-type)

  ;; another process is already trying to start a server: log it and leave
  (let ((attempt-in-progress (server:start-attempted? *toppath*))) ; check for .server-starting
    (when attempt-in-progress
      (debug:print-info 0 *default-log-port* "Server start attempt in progress in other process (=> "attempt-in-progress"<=).  Aborting server launch attempt in this process ("(current-process-id)")")
      (exit)))
      
  ;; a server is already registered as running: log it and leave
  (let ((dotserver-url (server:check-if-running *toppath*))) ;; check for .server  
    (when dotserver-url
      (debug:print-info 0 *default-log-port* "Server already running (=> "dotserver-url"<=).  Aborting server launch attempt in this process ("(current-process-id)")")
      (exit)
      ))
  
  ;; dispatch on the transport symbol; unknown types are only reported
  (case transport-type
    ((http)(http-transport:launch))
    ;;((nmsg)(nmsg-transport:launch run-id))
    ((rpc)  (rpc-transport:launch run-id))
    (else (debug:print-error 0 *default-log-port* "unknown server type " transport-type)))

  ;; is this a good place to print server exit stats?
  ;; NOTE(review): every other debug:print call visible in this file passes
  ;; *default-log-port* as the second argument; this one passes the message
  ;; string there - confirm against debug:print's signature.
  (debug:print 0 "SERVER: max parallel api requests: " *max-api-process-requests*)
  
  )
;;       (else   (debug:print-error 0 *default-log-port* "No known transport set, transport=" transport ", using rpc")
;; 	      (rpc-transport:launch run-id)))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get the transport
(define (server:get-transport)







<
<
<
<
<
<
<
<
<
<
<
<
<




|
<
<
<
<
<
<
<







45
46
47
48
49
50
51













52
53
54
55
56







57
58
59
60
61
62
63
;;

;; all routes through here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id transport-type)
  ;; Hand control to the launcher that matches transport-type.
  ;;
  ;;   run-id         - forwarded to rpc-transport:launch; not used for http
  ;;   transport-type - symbol naming the transport: http or rpc
  ;;
  ;; Any other transport-type is reported through debug:print-error.
  (cond
   ((eqv? transport-type 'http)
    (http-transport:launch))
   ;;((nmsg)(nmsg-transport:launch run-id))
   ((eqv? transport-type 'rpc)
    (rpc-transport:launch run-id))
   (else
    (debug:print-error 0 *default-log-port* "unknown server type " transport-type))))








;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get the transport
(define (server:get-transport)
129
130
131
132
133
134
135
136
137
138

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185



186
187
188

189
190
191
192
193
194
195
196
197
198

199
200
201
202
203



























204
205
206
207
208
209
210
	 (curr-ip     (server:get-best-guess-address curr-host))
	 (curr-pid    (current-process-id))
	 (homehost    (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" ))
	 (target-host (car homehost))
	 (testsuite   (common:get-testsuite-name))
	 (logfile     (conc areapath "/logs/server-" curr-pid "-" target-host ".log"))
	 (cmdln (conc (common:get-megatest-exe)
		      " -server " (or target-host "-") " -run-id " 0 (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
									      (conc " -daemonize -log " logfile)
									      "")

		      " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &")))))
	 (log-rotate  (make-thread common:rotate-logs  "server run, rotate logs thread")))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory areapath)
    (cond
     ;; (attempt-in-progress
     ;;  (debug:print 0 *default-log-port* "INFO: Not trying to start server because attempt is in progress: "attempt-in-progress))
     ;; (dot-server-url
     ;;        (debug:print 0 *default-log-port* "INFO: Not trying to start server because one is already running : "dot-server-url))
     (else
      (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...")
      (thread-start! log-rotate)

      ;; host.domain.tld match host?
      (if (and target-host 
               ;; look at target host, is it host.domain.tld or ip address and does it 
               ;; match current ip or hostname
               (not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host))
               (not (equal? curr-ip target-host)))
          (begin
            (debug:print-info 0 *default-log-port* "Starting server on " target-host ", logfile is " logfile)
            (setenv "TARGETHOST" target-host)))
      
      (setenv "TARGETHOST_LOGF" logfile)
      (common:wait-for-normalized-load 4 " delaying server start due to load" remote-host: (get-environment-variable "TARGETHOST")) ;; do not try starting servers on an already overloaded machine, just wait forever
      (system (conc "nbfake " cmdln))
      (unsetenv "TARGETHOST_LOGF")
      (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
      (thread-join! log-rotate)
      (pop-directory)))))

;; given a path to a server log return: host port startseconds
;;
;; Scan the server log at path logf for the first line matching
;;   SERVER STARTED: host:port AT timesecs
;; and return the tail (cdr) of the match result for that line.
;; Returns (#f #f #f) when end of file is reached or the line limit is hit
;; without a match.
(define (server:logf-get-start-info logf)
  (let ((rx (regexp "^SERVER STARTED: (\\S+):(\\d+) AT (\\d+)"))) ;; SERVER STARTED: host:port AT timesecs
    (with-input-from-file
	logf
      (lambda ()
	;; inl is the current line, lnum counts the lines read so far
	(let loop ((inl  (read-line))
		   (lnum 0))
	  (if (not (eof-object? inl))
	      (let ((mlst (string-match rx inl)))
		(if (not mlst)
		    (if (< lnum 500) ;; give up if more than 500 lines of server log read
			(loop (read-line)(+ lnum 1))
			(list #f #f #f))
		    ;; drop the first element of the match result, keep the rest
		    (cdr mlst)))



	      (list #f #f #F)))))))

;; get a list of servers with all relevant data

;;
;; Collect one record per file matching <areapath>/logs/server-*.log, built
;; from the file's modification time and server:logf-get-start-info.
;; Returns '() when no log files match.
;; NOTE(review): there is no else branch for a missing areapath, so the
;; result is unspecified in that case.
(define (server:get-list areapath)
  (if (directory-exists? areapath)
      (let ((server-logs (glob (conc areapath "/logs/server-*.log"))))
	(if (null? server-logs)
	    '()
	    ;; hed = log being processed, tal = logs still to do, res = accumulator
	    (let loop ((hed  (car server-logs))
		       (tal  (cdr server-logs))
		       (res '()))
	      (let* ((mod-time (file-modification-time hed))

		     (serv-rec (cons mod-time (server:logf-get-start-info hed)))
		     ;; NOTE(review): this conses the accumulator onto the record
		     ;; (accumulator in the car) rather than the record onto the
		     ;; accumulator - the result is not a flat list of records.
		     (new-res  (cons res serv-rec)))
		(if (null? tal)
		    new-res
		    (loop (car tal)(cdr tal) new-res))))))))




























;; Return *my-client-signature*. When it is not yet set, it is first set to
;; the result of server:mk-signature and then returned.
(define (server:get-client-signature) ;; BB> why is this proc named "get-"?  it returns nothing -- set! has not return value.
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
        (set! *my-client-signature* sig)
        ;; the global is returned explicitly, so the value of set! is not relied on
        *my-client-signature*)))








|
|
|
>




<
<
<
<
<
<
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|




|











|
>
>
>
|


>










>
|
|



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







109
110
111
112
113
114
115
116
117
118
119
120
121
122
123






124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
	 (curr-ip     (server:get-best-guess-address curr-host))
	 (curr-pid    (current-process-id))
	 (homehost    (common:get-homehost)) ;; configf:lookup *configdat* "server" "homehost" ))
	 (target-host (car homehost))
	 (testsuite   (common:get-testsuite-name))
	 (logfile     (conc areapath "/logs/server-" curr-pid "-" target-host ".log"))
	 (cmdln (conc (common:get-megatest-exe)
		      " -server " (or target-host "-") (if (equal? (configf:lookup *configdat* "server" "daemonize") "yes")
							   " -daemonize "
							   "")
		      " -log " logfile
		      " -m testsuite:" testsuite)) ;; (conc " >> " logfile " 2>&1 &")))))
	 (log-rotate  (make-thread common:rotate-logs  "server run, rotate logs thread")))
    ;; we want the remote server to start in *toppath* so push there
    (push-directory areapath)






    (debug:print 0 *default-log-port* "INFO: Trying to start server (" cmdln ") ...")
    (thread-start! log-rotate)
    
    ;; host.domain.tld match host?
    (if (and target-host 
	     ;; look at target host, is it host.domain.tld or ip address and does it 
	     ;; match current ip or hostname
	     (not (string-match (conc "("curr-host "|" curr-host"\\..*)") target-host))
	     (not (equal? curr-ip target-host)))
	(begin
	  (debug:print-info 0 *default-log-port* "Starting server on " target-host ", logfile is " logfile)
	  (setenv "TARGETHOST" target-host)))
      
    (setenv "TARGETHOST_LOGF" "server.log") ;; logfile)
    (common:wait-for-normalized-load 4 " delaying server start due to load" remote-host: (get-environment-variable "TARGETHOST")) ;; do not try starting servers on an already overloaded machine, just wait forever
    (system (conc "nbfake " cmdln))
    (unsetenv "TARGETHOST_LOGF")
    (if (get-environment-variable "TARGETHOST")(unsetenv "TARGETHOST"))
    (thread-join! log-rotate)
    (pop-directory)))

;; Given a path to a server log, return ( host port start-seconds ).
;;
;; The log is scanned for the first line of the form
;;   SERVER STARTED: host:port AT timesecs
;; host is returned as a string; port and start-seconds are converted with
;; string->number.
;;
;; ( #f #f #f ) is returned when:
;;   - the log does not exist or is not readable (e.g. it went away between
;;     being listed and being opened),
;;   - end of file is reached without a match, or
;;   - more than 500 lines were read without a match.
;;
(define (server:logf-get-start-info logf)
  (let ((rx      (regexp "^SERVER STARTED: (\\S+):(\\d+) AT ([\\d\\.]+)")) ;; SERVER STARTED: host:port AT timesecs
        (no-info (list #f #f #f)))
    (if (not (and (file-exists? logf)
                  (file-read-access? logf)))
        ;; nothing to read: report "nothing known" instead of raising out of
        ;; with-input-from-file and taking the caller down with us
        no-info
        (with-input-from-file
            logf
          (lambda ()
            (let loop ((inl  (read-line))
                       (lnum 0))
              (cond
               ((eof-object? inl) no-info)
               ((string-match rx inl)
                => (lambda (mlst)
                     (let ((dat (cdr mlst))) ;; the three captured groups
                       (list (car dat)                        ;; host
                             (string->number (cadr dat))      ;; port
                             (string->number (caddr dat)))))) ;; start time
               ;; give up if more than 500 lines of server log read
               ((< lnum 500) (loop (read-line) (+ lnum 1)))
               (else no-info))))))))

;; Get a list of servers with all relevant data, one record per file
;; matching <areapath>/logs/server-*.log:
;;
;;   ( mod-time host port start-time )
;;
;; mod-time is the modification time of the log file; host, port and
;; start-time come from server:logf-get-start-info.
;;
;; Always returns a list: '() when areapath is not a directory or when no
;; server logs are found.
;;
(define (server:get-list areapath)
  (if (directory-exists? areapath)
      (let loop ((logs (glob (conc areapath "/logs/server-*.log")))
                 (res  '()))
        (if (null? logs)
            res
            (let* ((logf     (car logs))
                   (mod-time (file-modification-time logf))
                   (serv-dat (server:logf-get-start-info logf))
                   (serv-rec (cons mod-time serv-dat)))
              ;; records are consed onto the front, so the result is in the
              ;; reverse of glob order
              (loop (cdr logs) (cons serv-rec res)))))
      ;; no such area: hand back an empty list rather than an unspecified
      ;; value, so callers can filter/sort the result unconditionally
      '()))

;; Given a list of servers, get the list of valid servers, i.e. those that
;; have started, have been running for more than 1 second and less than
;; 1 hour, and are active (i.e. log mod-time less than 10 seconds ago).
;;
;; Each record is: ( mod-time host port start-time )
;;
;; Sorted by start-time ascending, i.e. the oldest server comes first. Young servers will thus drop off
;; and servers should stick around for about two hours or so.
;;
(define (server:get-best srvlst)
  ;; srvlst holds records of the form ( mod-time host port start-time ).
  ;; Records that pass usable? are kept and returned ordered by start-time,
  ;; smallest (oldest) first.
  (let* ((now      (current-seconds))
         (start-of (lambda (rec) (list-ref rec 3)))
         (older?   (lambda (a b) (< (start-of a) (start-of b))))
         (usable?  (lambda (rec)
                     (let ((start-time (start-of rec))
                           (mod-time   (list-ref rec 0)))
                       (print "start-time: " start-time " mod-time: " mod-time)
                       (cond
                        ((not start-time)             #f) ;; never announced its start
                        ((not mod-time)               #f)
                        ((<= (- now start-time) 1)    #f) ;; must have been running more than 1 second
                        ((>= (- now mod-time)   10)   #f) ;; log not touched in the last 10 seconds
                        (else (< (- now start-time) 3600))))))) ;; under one hour running time
    (sort (filter usable? srvlst) older?)))
		

(define (server:get-client-signature)
  ;; Return the cached client signature. On first use the cache
  ;; (*my-client-signature*) is filled from server:mk-signature.
  (if (not *my-client-signature*)
      (set! *my-client-signature* (server:mk-signature)))
  *my-client-signature*)

Modified tasks.scm from [af79560081] to [6f2c907335].

168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
;; Accessors for a hostinfo vector. Each one returns the slot at a fixed index:
;;   1 interface, 2 port, 3 pubport, 4 transport, 5 pid, 6 hostname
(define (tasks:hostinfo-get-interface vec)
  (vector-ref vec 1))

(define (tasks:hostinfo-get-port vec)
  (vector-ref vec 2))

(define (tasks:hostinfo-get-pubport vec)
  (vector-ref vec 3))

(define (tasks:hostinfo-get-transport vec)
  (vector-ref vec 4))

(define (tasks:hostinfo-get-pid vec)
  (vector-ref vec 5))

(define (tasks:hostinfo-get-hostname vec)
  (vector-ref vec 6))

(define (tasks:server-lock-slot mdb run-id)
  ;; Try to claim the server slot for run-id.
  ;; Old records for the run are marked defunct first. When fewer than 4
  ;; entries are in the available state, this process registers itself, waits
  ;; a random 0 to 1.5 seconds and returns the result of
  ;; tasks:server-am-i-the-server?. Otherwise returns #f.
  (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot")
  (let ((num-available (tasks:num-in-available-state mdb run-id)))
    (cond
     ((>= num-available 4) #f)
     (else
      (tasks:server-set-available mdb run-id)
      (thread-sleep! (/ (random 1500) 1000)) ;; (thread-sleep! 2) ;; Try removing this. It may not be needed.
      (tasks:server-am-i-the-server? mdb run-id)))))
	
;; Register that this server may come online (first to register goes through with the process).
;; Inserts a row in the servers table for this pid/host in state "available";
;; port, pubport and interface are filled with -1 and heartbeat is set to -1
;; by the statement itself.
(define (tasks:server-set-available mdb run-id)
  (let ((unset      -1) ;; value stored for port, pubport and interface
        (insert-qry
   "INSERT INTO servers (pid,hostname,port,pubport,start_time,      priority,state,mt_version,heartbeat,   interface,transport,run_id)
                   VALUES(?, ?,       ?,   ?, strftime('%s','now'), ?,       ?,    ?,-1,?,        ?,        ?);"))
    (sqlite3:execute
     mdb
     insert-qry
     (current-process-id)          ;; pid
     (get-host-name)               ;; hostname
     unset                         ;; port
     unset                         ;; pubport
     (random 1000)                 ;; priority (used a tiebreaker on get-available)
     "available"                   ;; state
     (common:version-signature)    ;; mt_version
     unset                         ;; interface
     (conc *transport-type*)       ;; transport
     run-id)))

(define (tasks:num-in-available-state mdb run-id)
  ;; Count the servers rows for run-id that are in state 'available' and
  ;; whose start_time is less than 30 seconds old. Returns 0 when the query
  ;; yields no row.
  (let ((qry   "SELECT count(id) FROM servers WHERE run_id=? AND state = 'available' AND (strftime('%s','now') - start_time) < 30 ;")
        (count 0))
    (sqlite3:for-each-row
     (lambda (n)
       (set! count n))
     mdb
     qry
     run-id)
    count))

(define (tasks:num-servers-non-zero-running mdb)
  ;; Count the servers rows in state 'running' whose run_id is not 0.
  ;; Returns 0 when the query yields no row.
  (let ((qry   "SELECT count(id) FROM servers WHERE run_id != 0 AND state = 'running';")
        (count 0))
    (sqlite3:for-each-row
     (lambda (n)
       (set! count n))
     mdb
     qry)
    count))

(define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag)
  ;; Mark as "defunct<tag>" every servers row for run-id that is still in
  ;; state available, dbprep or shutting-down more than 50 seconds after
  ;; its start_time. The heartbeat column is refreshed at the same time.
  (let ((new-state (conc "defunct" tag))
        (qry       "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state in ('available','dbprep','shutting-down') AND (strftime('%s','now') - start_time) > 50 AND run_id=?;"))
    (sqlite3:execute mdb qry new-state run-id)))

(define (tasks:server-force-clean-running-records-for-run-id mdb run-id tag)
  ;; Mark every servers row for run-id that is in state 'running' as
  ;; "defunct<tag>", refreshing its heartbeat.
  (let ((new-state (conc "defunct" tag))
        (qry       "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=?;"))
    (sqlite3:execute mdb qry new-state run-id)))

(define (tasks:server-force-clean-run-record mdb run-id iface port tag)
  ;; Mark the 'running' servers row(s) matching run-id, iface and port as
  ;; "defunct<tag>", refreshing the heartbeat.
  (let ((new-state (conc "defunct" tag))
        (qry       "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=? AND interface=? AND port=?;"))
    (sqlite3:execute mdb qry new-state run-id iface port)))


;; BB> adding missing func for --list-servers
(define (tasks:server-deregister mdb hostname #!key (pullport #f) (pid #f) (action #f)) ;;pullport pid: pid action: 'delete))
  ;; With action 'delete the row matching pid, pullport and hostname is
  ;; removed from servers. With any other action the rows matching hostname
  ;; and pid are marked 'defunct' and their heartbeat is refreshed.
  (case action
    ((delete)
     (sqlite3:execute mdb "DELETE FROM servers WHERE pid=? AND port=? AND hostname=?;" pid pullport hostname))
    (else
     (sqlite3:execute mdb "UPDATE servers SET state='defunct', heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;"
                      hostname pid))))

(define (tasks:server-delete-records-for-this-pid mdb tag)
  ;; Despite the name nothing is deleted: the servers rows for this host and
  ;; this process id are marked "defunct<tag>" and their heartbeat refreshed.
  (let ((new-state (conc "defunct" tag))
        (host      (get-host-name))
        (this-pid  (current-process-id))
        (qry       "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;"))
    (sqlite3:execute mdb qry new-state host this-pid)))

(define (tasks:server-delete-record mdb server-id tag)
  ;; Mark the servers row with id server-id as "defunct<tag>", then use the
  ;; opportunity to purge stale rows:
  ;;   - rows not in state running/shutting-down/dbprep whose start_time is
  ;;     more than 2628000 seconds (about one month) old
  ;;   - defunct rows with port -1 (a never used placeholder) whose
  ;;     start_time is more than 600 seconds (10 minutes) old
  (let ((mark-defunct   "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;")
        (purge-old      "DELETE FROM servers WHERE state not in ('running','shutting-down','dbprep') AND (strftime('%s','now') - start_time) > 2628000;")
        (purge-unused   "DELETE FROM servers WHERE state like 'defunct%' AND port=-1 AND (strftime('%s','now') - start_time) > 600;"))
    (sqlite3:execute mdb mark-defunct (conc "defunct" tag) server-id)
    (sqlite3:execute mdb purge-old)
    (sqlite3:execute mdb purge-unused)))

(define (tasks:server-set-state! mdb server-id state)
  ;; Store state on the servers row with id server-id and refresh its heartbeat.
  (let ((qry "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;"))
    (sqlite3:execute mdb qry state server-id)))

(define (tasks:server-set-interface-port mdb server-id interface port)
  ;; Store interface and port on the servers row with id server-id and
  ;; refresh its heartbeat.
  (let ((qry "UPDATE servers SET interface=?,port=?,heartbeat=strftime('%s','now') WHERE id=?;"))
    (sqlite3:execute mdb qry interface port server-id)))

;; Choose the port a new server should listen on.
;;
;; When -port is given on the command line and parses as a number, that
;; number is returned. Otherwise a random port in [30000, 64000) that does
;; not appear in the port column of the servers table is returned. After 100
;; collisions the search gives up and returns a random port in that range
;; without checking it.
;;
(define (tasks:server-get-next-port mdb)
  (let* ((lownum        30000)
         (highnum       64000)
         (used-ports    '())
         (get-rand-port (lambda ()
                          (+ lownum (random (- highnum lownum)))))
         ;; #f unless -port was supplied and is numeric
         (port-param    (let ((arg (args:get-arg "-port")))
                          (and arg (string->number arg)))))
    ;; collect every port recorded in the servers table
    (sqlite3:for-each-row
     (lambda (port)
       (set! used-ports (cons port used-ports)))
     mdb
     "SELECT port FROM servers;")
    (cond
     ;; An earlier clause here compared port-param against `res', a variable
     ;; that is not bound in this procedure, so supplying -port raised an
     ;; unbound-variable error. The explicit port is simply honoured.
     (port-param port-param)
     (else
      (let loop ((port     (get-rand-port))
                 (remtries 100))
        (if (member port used-ports)
            (if (> remtries 0)
                (loop (get-rand-port) (- remtries 1))
                (get-rand-port))
            port))))))

;; Decide whether this process is the one that should become the server.
;;
;; Looks at the first row returned by
;; tasks:server-get-servers-vying-for-run-id. Returns that row's id when its
;; hostname and pid are those of this process, otherwise #f. Also returns #f
;; when there are no rows at all.
;;
(define (tasks:server-am-i-the-server? mdb run-id)
  (let* ((all   (tasks:server-get-servers-vying-for-run-id mdb run-id))
         ;; `all' is a #( header rows ) structure and is never null itself;
         ;; it is the row list that has to be tested before taking its car
         (rows  (db:get-rows all))
         (first (if (null? rows)
                    #f
                    (car rows))))
    (if first
        (let* ((header   (db:get-header all))
               (id       (db:get-value-by-header first header "id"))
               (hostname (db:get-value-by-header first header "hostname"))
               (pid      (db:get-value-by-header first header "pid")))
          ;; (debug:print 0 *default-log-port* "INFO: am-i-the-server got record " first)
          ;; for now a basic check. add tiebreaking by priority later
          (and (equal? hostname (get-host-name))
               (equal? pid      (current-process-id))
               id))
        #f)))
	     
;; Return #( header rows ) for all servers rows in state available, running
;; or dbprep. header is the list of column names, rows is a list of vectors.
;;
;; Use: (db:get-value-by-header (car (db:get-rows dat)) (db:get-header dat) "fieldname")
;;  to extract info from the structure returned
;;
;; The query is ordered by start_time DESC and each row is consed onto the
;; front of the result, so the returned rows are in the reverse of query order.
;; NOTE(review): run-id is accepted but the query does not filter on it.
;;
(define (tasks:server-get-servers-vying-for-run-id mdb run-id)
  (let* ((header (list "id" "hostname" "pid" "interface" "port" "pubport" "state" "run_id" "priority" "start_time"))
         (qry    (conc "SELECT "
                       (string-intersperse header ",")
                       " FROM servers WHERE state in ('available','running','dbprep') ORDER BY start_time DESC;"))
         (rows   '()))
    (sqlite3:for-each-row
     (lambda (first-col . other-cols)
       (set! rows (cons (apply vector first-col other-cols) rows)))
     mdb
     qry)
    (vector header rows)))

;; Look up the most recently started server in state 'running' for run-id.
;;
;;   mdb     - database handle passed to sqlite3:for-each-row
;;   run-id  - run to look up
;;   retries - how many more times to retry after a db access error
;;             (default 10); each retry waits 10 seconds first
;;
;; Returns #( id interface port pubport transport pid hostname ) or #f when
;; no server is found or every attempt failed.
;;
(define (tasks:get-server mdb run-id #!key (retries 10))
  (let ((res #f))
    (handle-exceptions
     exn
     (begin
       (print-call-chain (current-error-port))
       (debug:print 0 *default-log-port* "WARNING: tasks:get-server db access error.")
       (debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
       (debug:print 0 *default-log-port* " for run " run-id)
       (if (> retries 0)
           (begin
             (debug:print 0 *default-log-port* " trying call to tasks:get-server again in 10 seconds")
             (thread-sleep! 10)
             ;; count the attempt down; subtracting 0 here retried forever
             (tasks:get-server mdb run-id retries: (- retries 1)))
           (begin
             (debug:print 0 *default-log-port* "10 tries of tasks:get-server all crashed and burned. Giving up and returning \"no server found\"")
             ;; "no server found" is #f, not whatever debug:print returns
             #f)))
     (sqlite3:for-each-row
      (lambda (id interface port pubport transport pid hostname)
	(set! res (vector id interface port pubport transport pid hostname)))
      mdb
      ;; removed:
      ;; strftime('%s','now')-heartbeat < 10 AND mt_version = ?
      "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
          WHERE run_id=? AND state='running'
          ORDER BY start_time DESC LIMIT 1;" run-id) ;; (common:version-signature) run-id)
     res)))

;; Return the id of a server for run-id that is either in state 'running',
;; or in state 'dbprep' and started less than 60 seconds ago. Returns #f
;; when no such server record exists. If several rows match, the id from
;; the last row delivered by the query wins.
;;
(define (tasks:server-running-or-starting? mdb run-id)
  (let ((found-id #f)
        (query    "SELECT id FROM servers WHERE run_id=? AND (state = 'running' OR (state = 'dbprep' AND  (strftime('%s','now') - start_time) < 60));"))
    (sqlite3:for-each-row
     (lambda (server-id)
       (set! found-id server-id))
     mdb
     query
     run-id)
    found-id))

;; Return the id of a server record for run-id whose state is 'running',
;; or #f when there is none. If several rows match, the id from the last
;; row delivered by the query wins.
;;
;; NOTE(review): the original carried a "NEEDS dbprep ADDED" remark here;
;; the query still only considers state = 'running'.
;;
(define (tasks:server-running? mdb run-id)
  (let ((found-id #f)
        (query    "SELECT id FROM servers WHERE run_id=? AND state = 'running';"))
    (sqlite3:for-each-row
     (lambda (server-id)
       (set! found-id server-id))
     mdb
     query
     run-id)
    found-id))

;; #t when the config has [server] required set to exactly "yes", else #f.
;; run-id is accepted but not consulted.
;;
(define (tasks:need-server run-id)
  (let ((required-setting (configf:lookup *configdat* "server" "required")))
    (equal? required-setting "yes")))

;; 	(maxqry (cdr (rmt:get-max-query-average run-id)))
;; 	(threshold   (string->number (or (configf:lookup *configdat* "server" "server-query-threshold") "10"))))
;;     (cond
;;      (forced 
;;       (if (common:low-noise-print 60 run-id "server required is set")
;; 	  (debug:print-info 0 *default-log-port* "Server required is set, starting server for run-id " run-id "."))
;;       #t)
;;      ((> maxqry threshold)
;;       (if (common:low-noise-print 60 run-id "Max query time execeeded")
;; 	  (debug:print-info 0 *default-log-port* "Max avg query time of " maxqry "ms exceeds limit of " threshold "ms, server needed for run-id " run-id "."))
;;       #t)
;;      (else
;;       #f))))

;; Try to get a server started for run-id, polling the servers table between
;; attempts.
;;
;;   tdbdat          - db handle wrapper, passed through db:delay-if-busy
;;   run-id          - run the server is wanted for
;;   delay-max-tries - upper bound on the number of start attempts
;;
;; Each attempt: log (rate limited), sleep a random 0-2 seconds, call
;; server:kind-run, sleep min(attempt-count, 1) seconds, then poll again
;; only if neither server:start-attempted? nor server:read-dotserver
;; reports anything for *toppath*.
;;
;; NOTE(review): every exit path yields #f, including the one where a
;; server record was found - callers cannot get the record from here.
;;
(define (tasks:start-and-wait-for-server tdbdat run-id delay-max-tries)
  (let retry ((found    (tasks:get-server (db:delay-if-busy tdbdat) run-id))
              (attempts 0))
    (cond
     ;; a server already exists, or the attempt budget is spent
     ((or found (>= attempts delay-max-tries)) #f)
     (else
      (if (common:low-noise-print 60 "tasks:start-and-wait-for-server" run-id)
          (debug:print 0 *default-log-port* "Try starting server for run-id " run-id))
      ;; random stagger before launching
      (thread-sleep! (/ (random 2000) 1000))
      (server:kind-run *toppath*)
      (thread-sleep! (min attempts 1))
      (cond
       ((server:start-attempted? *toppath*) #f)
       ((server:read-dotserver *toppath*)   #f)
       (else
        (retry (tasks:get-server (db:delay-if-busy tdbdat) run-id)
               (+ attempts 1))))))))

;; Fetch every server record whose state does not start with "defunct".
;;
;; Each record is a vector laid out as:
;;   0 id, 1 pid, 2 hostname, 3 interface, 4 port, 5 pubport, 6 start_time,
;;   7 priority, 8 state, 9 mt_version, 10 last_update, 11 transport, 12 run_id
;; where last_update is computed in SQL as strftime('%s','now')-heartbeat.
;;
;; The query sorts by start_time DESC; rows are consed on as they arrive
;; and not reversed, so the returned list is in the opposite order.
;;
(define (tasks:get-all-servers mdb)
  (let ((records '())
        (query   "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id 
        FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;"))
    (sqlite3:for-each-row
     ;; the 13 selected columns arrive as the argument list, in SELECT order
     (lambda columns
       (set! records (cons (list->vector columns) records)))
     mdb
     query)
    records))

;; Fetch the server record with the given id, or #f if there is none.
;;
;; The record is a vector laid out as:
;;   0 id, 1 pid, 2 hostname, 3 interface, 4 port, 5 pubport, 6 start_time,
;;   7 priority, 8 state, 9 mt_version, 10 last_update, 11 transport, 12 run_id
;; where last_update is computed in SQL as strftime('%s','now')-heartbeat.
;;
(define (tasks:get-server-by-id mdb id)
  (let ((record #f)
        (query  "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id 
        FROM servers WHERE id=?;"))
    (sqlite3:for-each-row
     ;; the 13 selected columns arrive as the argument list, in SELECT order
     (lambda columns
       (set! record (list->vector columns)))
     mdb
     query
     id)
    record))

;; Fetch all non-defunct server records for run-id, newest start_time first.
;;
;; Each record is a vector laid out as:
;;   0 id, 1 pid, 2 hostname, 3 interface, 4 port, 5 pubport, 6 start_time,
;;   7 priority, 8 state, 9 mt_version, 10 last_update, 11 transport, 12 run_id
;; where last_update is computed in SQL as strftime('%s','now')-heartbeat.
;;
(define (tasks:get-server-records mdb run-id)
  (let ((records '())
        (query   "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id 
        FROM servers WHERE run_id=? AND state NOT LIKE 'defunct%' ORDER BY start_time DESC;"))
    (sqlite3:for-each-row
     ;; the 13 selected columns arrive as the argument list, in SELECT order
     (lambda columns
       (set! records (cons (list->vector columns) records)))
     mdb
     query
     run-id)
    ;; accumulation reversed the rows; flip back to the query's order
    (reverse records)))

;; Blunt-instrument server kill: remove the dotserver file(s) for *toppath*,
;; then run "nbfake kill <kill-switch> <pid>" through the shell with
;; TARGETHOST set to hostname and TARGETHOST_LOGF set to "server-kills.log".
;; Both environment variables are unset again afterwards.
;;
;;   hostname    - host the server process lives on
;;   pid         - process id to kill
;;   kill-switch - extra argument placed between "kill" and the pid (e.g. a signal flag)
;;
(define (tasks:kill-server hostname pid #!key (kill-switch ""))
  (server:remove-dotserver-file *toppath* ".*")
  (debug:print-info 0 *default-log-port* "Attempting to kill server process " pid " on host " hostname)
  (let ((kill-command (conc "nbfake kill " kill-switch " " pid)))
    ;; environment read by the launched command
    (for-each
     (lambda (binding)
       (setenv (car binding) (cdr binding)))
     (list (cons "TARGETHOST"      hostname)
           (cons "TARGETHOST_LOGF" "server-kills.log")))
    (system kill-command)
    ;; restore the environment, same order as before
    (unsetenv "TARGETHOST_LOGF")
    (unsetenv "TARGETHOST")))
 
;; Find the running server for run-id, kill it and drop its record.
;;
;; When a server is found: mark it "killed", log, remove the dotserver
;; file(s) for *toppath*, kill the process via tasks:kill-server and then
;; delete the server record (passing tag along). When none is found only
;; an informational message is logged.
;;
;; NOTE(review): the db opened via tasks:open-db is not closed or finalized here.
;;
(define (tasks:kill-server-run-id run-id #!key (tag "default"))
  (let* ((tdbdat     (tasks:open-db))
         (server-rec (tasks:get-server (db:delay-if-busy tdbdat) run-id)))
    (cond
     ((not server-rec)
      (debug:print-info 0 *default-log-port* "No server found for run-id " run-id ", nothing to kill"))
     (else
      ;; server-rec layout: 0 = id, 5 = pid, 6 = hostname
      (let ((server-id (vector-ref server-rec 0))
            (pid       (vector-ref server-rec 5))
            (hostname  (vector-ref server-rec 6)))
        (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "killed")
        (debug:print-info 0 *default-log-port* "Killing server " server-id " for run-id " run-id " on host " hostname " with pid " pid)
        (server:remove-dotserver-file *toppath* ".*")
        (tasks:kill-server hostname pid)
        (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id tag))))))
    
;;======================================================================
;; M O N I T O R S
;;======================================================================

(define (tasks:remove-monitor-record mdb)
  (sqlite3:execute mdb "DELETE FROM monitors WHERE pid=? AND hostname=?;"
		   (current-process-id)







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<



<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<



<








<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







168
169
170
171
172
173
174











































































































































































































175
176
177





































































178
179
180

181
182
183
184
185
186
187
188


















189
190
191
192
193
194
195
;; Positional accessors for a hostinfo vector:
;;   slot 1 interface, 2 port, 3 pubport, 4 transport, 5 pid, 6 hostname
(define (tasks:hostinfo-get-interface vec)
  (vector-ref vec 1))

(define (tasks:hostinfo-get-port vec)
  (vector-ref vec 2))

(define (tasks:hostinfo-get-pubport vec)
  (vector-ref vec 3))

(define (tasks:hostinfo-get-transport vec)
  (vector-ref vec 4))

(define (tasks:hostinfo-get-pid vec)
  (vector-ref vec 5))

(define (tasks:hostinfo-get-hostname vec)
  (vector-ref vec 6))












































































































































































































;; #t when the config has [server] required set to exactly "yes", else #f.
;; run-id is accepted but not consulted.
;;
(define (tasks:need-server run-id)
  (let ((required-setting (configf:lookup *configdat* "server" "required")))
    (equal? required-setting "yes")))






































































;; Blunt-instrument server kill: run "nbfake kill <kill-switch> <pid>"
;; through the shell with TARGETHOST set to hostname and TARGETHOST_LOGF
;; set to "server-kills.log". Both environment variables are unset again
;; afterwards.
;;
;;   hostname    - host the server process lives on
;;   pid         - process id to kill
;;   kill-switch - extra argument placed between "kill" and the pid (e.g. a signal flag)
;;
(define (tasks:kill-server hostname pid #!key (kill-switch ""))
  (debug:print-info 0 *default-log-port* "Attempting to kill server process " pid " on host " hostname)
  (let ((kill-command (conc "nbfake kill " kill-switch " " pid)))
    ;; environment read by the launched command
    (for-each
     (lambda (binding)
       (setenv (car binding) (cdr binding)))
     (list (cons "TARGETHOST"      hostname)
           (cons "TARGETHOST_LOGF" "server-kills.log")))
    (system kill-command)
    ;; restore the environment, same order as before
    (unsetenv "TARGETHOST_LOGF")
    (unsetenv "TARGETHOST")))
 


















;;======================================================================
;; M O N I T O R S
;;======================================================================

(define (tasks:remove-monitor-record mdb)
  (sqlite3:execute mdb "DELETE FROM monitors WHERE pid=? AND hostname=?;"
		   (current-process-id)