Megatest

Diff
Login

Differences From Artifact [4adf87fded]:

To Artifact [d74494eba8]:


19
20
21
22
23
24
25
26
27
28
29








30
31

32
33
34
35
36
37
38
19
20
21
22
23
24
25




26
27
28
29
30
31
32
33
34

35
36
37
38
39
40
41
42







-
-
-
-
+
+
+
+
+
+
+
+

-
+







(include "task_records.scm")

;;======================================================================
;; Tasks db
;;======================================================================

(define (tasks:open-db)
  (let* ((dbpath  (conc *toppath* "/monitor.db"))
	 (exists  (file-exists? dbpath))
	 (mdb     (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler (make-busy-timeout 36000)))
  (let* ((dbpath       (conc *toppath* "/monitor.db"))
	 (exists       (file-exists? dbpath))
	 (write-access (file-write-access? dbpath))
	 (mdb          (sqlite3:open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler      (make-busy-timeout 36000)))
    (if (and exists
	     (not write-access))
	(set! *db-write-access* write-access)) ;; only unset so other db's also can use this control
    (sqlite3:set-busy-handler! mdb handler)
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 1;"))
    (sqlite3:execute mdb (conc "PRAGMA synchronous = 0;"))
    (if (not exists)
	(begin
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS tasks_queue (id INTEGER PRIMARY KEY,
                                action TEXT DEFAULT '',
                                owner TEXT,
                                state TEXT DEFAULT 'new',
                                target TEXT DEFAULT '',
91
92
93
94
95
96
97
98




99
100
101
102
103
104
105
106
107
108

109

110
111
112
113
114
115
116
117
118









119
120
121
122
123
124
125
95
96
97
98
99
100
101

102
103
104
105
106
107
108
109
110
111
112
113
114

115
116
117









118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133







-
+
+
+
+









-
+

+
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+







;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
                             VALUES(?,  ?,       ?,   ?,  strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
   pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport))
   pid (get-host-name) port pubport priority (conc state) 
   (common:version-signature)
   interface 
   (conc transport))
  (vector 
   (tasks:server-get-server-id mdb (get-host-name) interface port pid)
   interface
   port
   pubport
   transport
   ))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'delete))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if *db-write-access*
  (if pid
      (case action
	((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
	(else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
      (if port
	  (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE  hostname=? AND port=?;" hostname port))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port)))
	  (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified"))))
      (if pid
	  (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid)))
	  (if port
	      (case action
	    ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port))
	    (else    (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE (interface=? or hostname=?) AND port=?;" hostname hostname port)))
	      (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))))

(define (tasks:server-deregister-self mdb hostname)
  (tasks:server-deregister mdb hostname pid: (current-process-id)))

;; need a simple call for robustly removing records given host and port
(define (tasks:server-delete mdb hostname port)
  (tasks:server-deregister mdb hostname port: port action: 'delete))
139
140
141
142
143
144
145
146
147








148
149
150
151
152
153
154
147
148
149
150
151
152
153


154
155
156
157
158
159
160
161
162
163
164
165
166
167
168







-
-
+
+
+
+
+
+
+
+







       (begin
	 (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)")
	 "SELECT id FROM servers WHERE pid=-999;")))
     (if hostname hostname iface)(if pid pid port))
    res))

(define (tasks:server-update-heartbeat mdb server-id)
  (debug:print-info 0 "Heart beat update of server id=" server-id)
  (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id))
  (debug:print-info 1 "Heart beat update of server id=" server-id)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: probable timeout on monitor.db access")
     (thread-sleep! 1)
     (tasks:server-update-heartbeat mdb server-id))
   (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)))

;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds
(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f))
  (let* ((server-id  (if server-id 
			 server-id
			 (tasks:server-get-server-id mdb hostname iface port pid)))
	 (heartbeat-delta 99e9))
193
194
195
196
197
198
199
200

201
202
203
204
205
206
207
207
208
209
210
211
212
213

214
215
216
217
218
219
220
221







-
+







       (set! res (cons (vector id interface port pubport transport pid hostname) res))
       ;;(debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
       )
     mdb
     
     "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
          WHERE strftime('%s','now')-heartbeat < 10 
          AND mt_version=? ORDER BY start_time DESC LIMIT 1;" megatest-version)
          AND mt_version=? ORDER BY start_time DESC LIMIT 1;" (common:version-signature))
    ;; for now we are keeping only one server registered in the db, return #f or first server found
    (if (null? res) #f (car res))))

;; BUG: This logic is probably needed unless methodology changes completely...
;;
;;     (if (null? res) #f
;; 	(let loop ((hed (car res))
248
249
250
251
252
253
254
255
256
257
258
259
260
261







262
263
264
265
266
267
268
262
263
264
265
266
267
268







269
270
271
272
273
274
275
276
277
278
279
280
281
282







-
-
-
-
-
-
-
+
+
+
+
+
+
+







			       "  EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
	     (debug:print 1 "Sending signal/term to " pid " on " hostname)
	     (process-signal pid signal/term)
	     (thread-sleep! 5) ;; give it five seconds to die peacefully then do a brutal kill
	     ;;(process-signal pid signal/kill)
	     ) ;; local machine, send sig term
	    (begin
		(debug:print-info 1 "Stopping remote servers not yet supported."))))
	;;      (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
	;;      (let ((serverdat (list hostname port)))
	;;	(case (string->symbol transport)
	;;	  ((http)(http-transport:client-connect hostname port))
	;;	  (else  (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
	;;	(cdb:kill-server serverdat)))))    ;; remote machine, try telling server to commit suicide
	      ;;(debug:print-info 1 "Stopping remote servers not yet supported."))))
	      (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide")
	      (let ((serverdat (list hostname port)))
	      	(case (if (string? transport) (string->symbol transport) transport)
	      	  ((http)(http-transport:client-connect hostname port))
	      	  (else  (debug:print "ERROR: remote stopping servers of type " transport " not supported yet")))
	      	(cdb:kill-server serverdat pid)))))    ;; remote machine, try telling server to commit suicide
      (begin
	(if status 
	    (if (equal? hostname (get-host-name))
		(begin
		  (debug:print-info 1 "Sending signal/term to " pid " on " hostname)
		  (process-signal pid signal/term)  ;; local machine, send sig term
		  (thread-sleep! 5)                 ;; give it five seconds to die peacefully then do a brutal kill
529
530
531
532
533
534
535
536

537
538
539
540
541
542

543
544
545
543
544
545
546
547
548
549

550
551
552
553
554
555

556
557
558
559







-
+





-
+



		    (tasks:task-get-owner  task)
		    flags)
    (tasks:set-state mdb (tasks:task-get-id task) "waiting")))

(define (tasks:rollup-runs db mdb task)
  (let* ((flags (make-hash-table)) 
	 (keys  (db:get-keys db))
	 (keyvallst (keys:target->keyval keys (tasks:task-get-target task))))
	 (keyvals (keys:target-keyval keys (tasks:task-get-target task))))
    ;; (hash-table-set! flags "-rerun" "NOT_STARTED")
    (print "Starting rollup " task)
    ;; sillyness, just call the damn routine with the task vector and be done with it. FIXME SOMEDAY
    (runs:rollup-run db
		     keys 
		     keyvallst
		     keyvals
		     (tasks:task-get-name  task)
		     (tasks:task-get-owner  task))
    (tasks:set-state mdb (tasks:task-get-id task) "waiting")))