Megatest

Check-in [19f85b577e]
Login
Overview
Comment:Merged multi-transport to trunk, all tests passed
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 19f85b577e77e4e3a47e6ac0f8b68e7ed745a8bf
User & Date: mrwellan on 2013-01-29 10:15:56
Other Links: manifest | tags
Context
2013-01-29
10:31
Bumped version to 1.5211 check-in: c670e2d4a4 user: mrwellan tags: trunk, v1.5211
10:15
Merged multi-transport to trunk, all tests passed check-in: 19f85b577e user: mrwellan tags: trunk
00:14
Added template for transaction wrapped db writes pulled from c847 Closed-Leaf check-in: f4c05ffd2b user: matt tags: multi-transport
2013-01-27
12:22
Merged http-transport to trunk check-in: 6bba674f33 user: matt tags: trunk
Changes

Modified Makefile from [a4b44c1e54] to [1a410eaa9f].

1
2
3
4
5
6
7
8

9
10
11
12
13
14
15

PREFIX=$(PWD)
CSCOPTS= 
INSTALL=install
SRCFILES = common.scm items.scm launch.scm \
           ods.scm runconfig.scm server.scm configf.scm \
           db.scm keys.scm margs.scm megatest-version.scm \
           process.scm runs.scm tasks.scm tests.scm genexample.scm


GUISRCF  = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm

OFILES   = $(SRCFILES:%.scm=%.o)
GOFILES  = $(GUISRCF:%.scm=%.o)

ADTLSCR=mt_laststep mt_runstep mt_ezstep







|
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

PREFIX=$(PWD)
CSCOPTS= 
INSTALL=install
SRCFILES = common.scm items.scm launch.scm \
           ods.scm runconfig.scm server.scm configf.scm \
           db.scm keys.scm margs.scm megatest-version.scm \
           process.scm runs.scm tasks.scm tests.scm genexample.scm \
	   fs-transport.scm zmq-transport.scm http-transport.scm

GUISRCF  = dashboard.scm dashboard-tests.scm dashboard-guimonitor.scm dashboard-main.scm

OFILES   = $(SRCFILES:%.scm=%.o)
GOFILES  = $(GUISRCF:%.scm=%.o)

ADTLSCR=mt_laststep mt_runstep mt_ezstep

Modified common.scm from [1ba863b641] to [afd3c8c16f].

38
39
40
41
42
43
44


45
46
47
48
49
50
51
(define *waiting-queue*     (make-hash-table))
(define *test-meta-updated* (make-hash-table))
(define *globalexitstatus*  0) ;; attempt to work around possible thread issues
(define *passnum*           0) ;; when running track calls to run-tests or similar

;; SERVER
(define *my-client-signature* #f)


(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)







>
>







38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
(define *waiting-queue*     (make-hash-table))
(define *test-meta-updated* (make-hash-table))
(define *globalexitstatus*  0) ;; attempt to work around possible thread issues
(define *passnum*           0) ;; when running track calls to run-tests or similar

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    #f)
(define *megatest-db*       #f)
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         #f) ;; if set up for server communication this will hold <host port>
(define *last-db-access*    (current-seconds))  ;; update when db is accessed via server
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)

Modified db.scm from [4a4c4c2fc7] to [56cbe800e8].

15
16
17
18
19
20
21



22
23
24
25
26

27
28
29
30
31
32
33

(require-extension (srfi 18) extras tcp) ;;  rpc)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))




(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))


(include "common_records.scm")
(include "db_records.scm")
(include "key_records.scm")
(include "run_records.scm")

;; timestamp type (val1 val2 ...)







>
>
>





>







15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

(require-extension (srfi 18) extras tcp) ;;  rpc)
;; (import (prefix rpc rpc:))

(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64)
(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

;; Note, try to remove this dependency 
(use zmq)

(declare (unit db))
(declare (uses common))
(declare (uses keys))
(declare (uses ods))
(declare (uses fs-transport))

(include "common_records.scm")
(include "db_records.scm")
(include "key_records.scm")
(include "run_records.scm")

;; timestamp type (val1 val2 ...)
1101
1102
1103
1104
1105
1106
1107
1108




1109



1110
1111
1112
1113


1114
1115



1116
1117
1118
1119
1120


1121
1122
1123
1124
1125
1126
1127
1128
1129
1130






1131
1132
1133
1134
1135
1136
1137
1138


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152








1153



1154



1155











1156
1157
1158
1159
1160
1161
1162

1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
;; db:updater is run in a thread to write out the cached data periodically
;; (define (db:updater)
;;   (debug:print-info 4 "Starting cache processing")
;;   (let loop ()
;;     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
;;     (db:write-cached-data)
;;     (loop)))





(define (db:obj->string obj)



  (string-substitute
   (regexp "=") "_"
   (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
   #t))



(define (db:string->obj msg)



  (with-input-from-string 
      (base64:base64-decode
       (string-substitute 
	(regexp "_") "=" msg #t))
    (lambda ()(deserialize))))



(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime






;;
(define (cdb:client-call serverdat qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  ;; (handle-exceptions
  ;;  exn
  ;;  (begin
  ;;    (thread-sleep! 5) 
  ;;    (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params)))


   (let* ((client-sig  (server:get-client-signature))
	  (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	  (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	  )
     (debug:print-info 11 "zdat=" zdat)
     (let* (
	  (res  #f)
	  (rawdat      (server:client-send-receive serverdat zdat))
	  (tmp         #f))
     (debug:print-info 11 "Sent " zdat ", received " rawdat)
     (set! tmp (db:string->obj rawdat))
     ;; (if (equal? query-sig (vector-ref myres 1))
     ;; (set! res
     (vector-ref tmp 2)








     ;; (loop (server:client-send-receive serverdat zdat)))))))



	  ;; (timeout (lambda ()



	  ;;            (let loop ((n numretries))











	  ;;              (thread-sleep! 15)
	  ;;              (if (not res)
	  ;;       	   (if (> numretries 0)
	  ;;       	       (begin
	  ;;       		 (debug:print 2 "WARNING: no reply to query " params ", trying resend")
	  ;;       		 (debug:print-info 11 "re-sending message")
	  ;;       		 (apply cdb:client-call serverdat qtype immediate numretries params)

	  ;;       		 (debug:print-info 11 "message re-sent")
	  ;;       		 (loop (- n 1)))
	  ;;       	       ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params))
	  ;;       	       (begin
	  ;;       		 (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
	  ;;       		 (exit 5))))))))
     ;; (send-receive)
     )))
     ;; (debug:print-info 11 "Starting threads")
     ;; (let ((th1 (make-thread send-receive "send receive"))
     ;;       (th2 (make-thread timeout      "timeout")))
     ;;   (thread-start! th1)
     ;;   (thread-start! th2)
     ;;   (thread-join!  th1)
     ;;   (debug:print-info 11 "cdb:client-call returning res=" res)
     ;;   res))))
  
(define (cdb:set-verbosity serverdat val)
  (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))

(define (cdb:login serverdat keyval signature)
  (cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature))








|
>
>
>
>

>
>
>
|
|
|
|
>
>


>
>
>
|
|
|
|
|
>
>










>
>
>
>
>
>



|
<
|
<
|
>
>
|
|
|
<
|
|
<
|
|
|
|
<
<
|
>
>
>
>
>
>
>
>
|
>
>
>
|
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
<
>
|
|
|
|
|
|
<
<
|
|
|
|
|
|
|
|







1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158

1159

1160
1161
1162
1163
1164
1165

1166
1167

1168
1169
1170
1171


1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206

1207
1208
1209
1210
1211
1212
1213


1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
;; db:updater is run in a thread to write out the cached data periodically
;; (define (db:updater)
;;   (debug:print-info 4 "Starting cache processing")
;;   (let loop ()
;;     (thread-sleep! 10) ;; move save time around to minimize regular collisions?
;;     (db:write-cached-data)
;;     (loop)))
;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
;; NOTE: Can remove the regex and base64 encoding for zmq
(define (db:obj->string obj)
  (case *transport-type*
    ((fs) obj)
	((http)
     (string-substitute
       (regexp "=") "_"
         (base64:base64-encode (with-output-to-string (lambda ()(serialize obj))))
        #t))
    ((zmq)(with-output-to-string (lambda ()(serialize obj))))
    (else obj)))

(define (db:string->obj msg)
  (case *transport-type*
   ((fs) msg)
   ((http)
    (with-input-from-string 
       (base64:base64-decode
         (string-substitute 
	   (regexp "_") "=" msg #t))
       (lambda ()(deserialize))))
   ((zmq)(with-input-from-string msg (lambda ()(deserialize))))
   (else msg)))

(define (cdb:use-non-blocking-mode proc)
  (set! *client-non-blocking-mode* #t)
  (let ((res (proc)))
    (set! *client-non-blocking-mode* #f)
    res))
  
;; params = 'target cached remparams
;;
;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
;;
;; cdb:client-call is the unified interface to all the transports. It dispatches the
;;                 query to a server routine (e.g. server:client-send-recieve) that 
;;                 transports the data to the server where it is passed to db:process-queue-item
;;                 which either returns the data to the calling server routine or 
;;                 directly calls the returning procedure (e.g. zmq).
;;
(define (cdb:client-call serverdat qtype immediate numretries . params)
  (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params)
  (case *transport-type* 

    ((fs)

     (let ((packet (vector "na" qtype immediate "na" params 0)))
       (fs:process-queue-item packet)))
    ((http)
     (let* ((client-sig  (server:get-client-signature))
	    (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	    (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds))))) ;; (with-output-to-string (lambda ()(serialize params))))

       (debug:print-info 11 "zdat=" zdat)
       (let* ((res  #f)

	      (rawdat      (http-transport:client-send-receive serverdat zdat))
	      (tmp         #f))
	 (debug:print-info 11 "Sent " zdat ", received " rawdat)
	 (set! tmp (db:string->obj rawdat))


	 (vector-ref tmp 2))))
    ((zmq)
     (handle-exceptions
      exn
      (begin
	(thread-sleep! 5) 
	(if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)))
      (let* ((push-socket (vector-ref zmq-sockets 0))
	     (sub-socket  (vector-ref zmq-sockets 1))
	     (client-sig  (server:get-client-signature))
	     (query-sig   (message-digest-string (md5-primitive) (conc qtype immediate params)))
	     (zdat        (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params))))
	     (res  #f)
	     (send-receive (lambda ()
			     (debug:print-info 11 "sending message")
			     (send-message push-socket zdat)
			     (debug:print-info 11 "message sent")
			     (let loop ()
			       ;; get the sender info
			       ;; this should match (server:get-client-signature)
			       ;; we will need to process "all" messages here some day
			       (receive-message* sub-socket)
			       ;; now get the actual message
			       (let ((myres (db:string->obj (receive-message* sub-socket))))
				 (if (equal? query-sig (vector-ref myres 1))
				     (set! res (vector-ref myres 2))
				     (loop))))))
	     (timeout (lambda ()
			(let loop ((n numretries))
			  (thread-sleep! 15)
			  (if (not res)
			      (if (> numretries 0)
				  (begin
				    (debug:print 2 "WARNING: no reply to query " params ", trying resend")
				    (debug:print-info 11 "re-sending message")

				    (send-message push-socket zdat)
				    (debug:print-info 11 "message re-sent")
				    (loop (- n 1)))
				  ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))
				  (begin
				    (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.")
				    (exit 5))))))))


	(debug:print-info 11 "Starting threads")
	(let ((th1 (make-thread send-receive "send receive"))
	      (th2 (make-thread timeout      "timeout")))
	  (thread-start! th1)
	  (thread-start! th2)
	  (thread-join!  th1)
	  (debug:print-info 11 "cdb:client-call returning res=" res)
	  res))))))
  
(define (cdb:set-verbosity serverdat val)
  (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val))

(define (cdb:login serverdat keyval signature)
  (cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature))

1268
1269
1270
1271
1272
1273
1274

























































1275
1276
1277
1278
1279
1280
1281
			       flush
			       sync
			       set-verbosity
			       killserver))

;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))


























































;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:process-queue db pubsock indata)
  (let* ((data       (sort indata (lambda (a b)







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
			       flush
			       sync
			       set-verbosity
			       killserver))

;; not used, intended to indicate to run in calling process
(define db:run-local-queries '()) ;; rollup-tests-pass-fail))

(define (db:write-cached-data)
  (open-run-close
   (lambda (db . junkparams)
     (let ((queries    (make-hash-table))
	   (data       #f))
       (mutex-lock! *incoming-mutex*)
       (set! data (sort *incoming-data* (lambda (a b)(< (vector-ref a 1)(vector-ref b 1)))))
       (set! *incoming-data* '())
       (mutex-unlock! *incoming-mutex*)
       (if (> (length data) 0)
	   (debug:print-info 4 "Writing cached data " data))
       ;; prepare the needed statements
       (for-each (lambda (request-item)
		   (let ((stmt-key (vector-ref request-item 0)))
		     (if (not (hash-table-ref/default queries stmt-key #f))
			 (let ((stmt (alist-ref stmt-key db:queries)))
			   (if stmt
			       (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt)))
			       (debug:print 0 "ERROR: Missing query spec for " stmt-key "!"))))))
		 data)
       (let outerloop ((special-qry #f)
		       (stmts       data))
	 (if special-qry
	     ;; handle a query that cannot be part of the grouped queries
	     (let* ((stmt-key (vector-ref special-qry 0))
		    (qry      (hash-table-ref queries stmt-key))
		    (params   (vector-ref speical-qry 2)))
	       (apply sqlite3:execute db qry params)
	       (if (not (null? stmts))
		   (outerloop #f stmts)))
	     ;; handle normal queries
	     (sqlite3:with-transaction 
	      db
	      (lambda ()
		(debug:print-info 11 "flushing " stmts " to db")
		(if (not (null? stmts))
		    (let innerloop ((hed (car stmts))
				    (tal (cdr stmts)))
		      (let ((params   (vector-ref hed 2))
			    (stmt-key (vector-ref hed 0)))
			(if (not (member stmt-key db:special-queries))
			    (begin
			      (debug:print-info 11 "Executing " stmt-key " for " params)
			      (apply sqlite3:execute (hash-table-ref queries stmt-key) params)
			      (if (not (null? tal))
				  (innerloop (car tal)(cdr tal))))
			    (outerloop hed tal)))))))))
       (for-each (lambda (stmt-key)
		   (sqlite3:finalize! (hash-table-ref queries stmt-key)))
		 (hash-table-keys queries))
       (let ((cache-size (length data)))
	 (if (> cache-size *max-cache-size*)
	     (set! *max-cache-size* cache-size)))
       ))
   #f))


;; The queue is a list of vectors where the zeroth slot indicates the type of query to
;; apply and the second slot is the time of the query and the third entry is a list of 
;; values to be applied
;;
(define (db:process-queue db pubsock indata)
  (let* ((data       (sort indata (lambda (a b)

Added fs-transport.scm version [d187681c70].

























































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit fs-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")


;;======================================================================
;; F S   T R A N S P O R T   S E R V E R
;;======================================================================

;; There is no "server" per se but a convience routine to make it non
;; necessary to be reopening the db over and over again.
;;

(define (fs:process-queue-item packet)
  (if (not *megatest-db*) ;; we will require that (setup-for-run) has already been called
      (set! *megatest-db* (open-db)))
  (debug:print-info 11 "fs:process-queue-item called with packet=" packet)
  (db:process-queue-item *megatest-db* packet))
      

Added http-transport.scm version [f097187aa7].































































































































































































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit http-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

(define (http-transport:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "http://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Call this to start the actual server
;;

(define *db:process-queue-mutex* (make-mutex))

(define (http-transport:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* (;; (iface           (if (string=? "-" hostn)
	 ;;        	      #f ;; (get-host-name) 
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname)))
	 (start-port    (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
			    (+ 5000 (random 1001))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!

    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (if (not db)(set! db (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "hey"))
				   (send-response body: "hey there!\n"
						  headers: '((content-type text/plain))))
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "ctrl"))
				   (let* ((packet (db:string->obj dat))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
				     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
				     ;; (set! res (open-run-close db:process-queue-item open-db packet))
				     (set! res (db:process-queue-item db packet))
				     ;; (mutex-unlock! *db:process-queue-mutex*)
				     (debug:print-info 11 "Return value from db:process-queue-item is " res)
				     (send-response body: (conc "<head>ctrl data</head>\n<body>"
								res
								"</body>")
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (http-transport:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))

;; This is recursively run by http-transport:run until sucessful
;;
(define (http-transport:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (http-transport:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live 'http)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (http-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S
;;======================================================================

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (http-transport:client-send-receive serverdat msg)
  (let* ((url        (http-transport:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (http-transport:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (http-transport:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))


;; run http-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (http-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (http-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "http-transport:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(http-transport:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

Modified megatest.scm from [d00ef9a849] to [135d8ed2af].

95
96
97
98
99
100
101

102
103
104
105
106
107
108
109
  -rebuild-db             : bring the database schema up to date
  -update-meta            : update the tests metadata for all tests
  -env2file fname         : write the environment to fname.csh and fname.sh
  -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are
                                 overwritten by values set in config files.
  -server -|hostname      : start the server (reduces contention on megatest.db), use
                            - to automatically figure out hostname

  -list-servers            : list the servers 
  -repl                   : start a repl (useful for extending megatest)

Spreadsheet generation
  -extract-ods fname.ods  : extract an open document spreadsheet from the database
  -pathmod path           : insert path, i.e. path/runame/itempath/logfile.html
                            will clear the field if no rundir/testname/itempath/logfile
                            if it contains forward slashes the path will be converted







>
|







95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
  -rebuild-db             : bring the database schema up to date
  -update-meta            : update the tests metadata for all tests
  -env2file fname         : write the environment to fname.csh and fname.sh
  -setvars VAR1=val1,VAR2=val2 : Add environment variables to a run NB// these are
                                 overwritten by values set in config files.
  -server -|hostname      : start the server (reduces contention on megatest.db), use
                            - to automatically figure out hostname
  -transport http|zmq     : use http or zmq for transport (default is http) 
  -list-servers           : list the servers 
  -repl                   : start a repl (useful for extending megatest)

Spreadsheet generation
  -extract-ods fname.ods  : extract an open document spreadsheet from the database
  -pathmod path           : insert path, i.e. path/runame/itempath/logfile.html
                            will clear the field if no rundir/testname/itempath/logfile
                            if it contains forward slashes the path will be converted
155
156
157
158
159
160
161

162
163
164
165
166
167
168
			":variable"
			":value"
			":expected"
			":tol"
			":units"
			;; misc
			"-server"

			"-kill-server"
			"-port"
			"-extract-ods"
			"-pathmod"
			"-env2file"
			"-setvars"
			"-set-state-status"







>







156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
			":variable"
			":value"
			":expected"
			":tol"
			":units"
			;; misc
			"-server"
			"-transport"
			"-kill-server"
			"-port"
			"-extract-ods"
			"-pathmod"
			"-env2file"
			"-setvars"
			"-set-state-status"
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347

;;======================================================================
;; Start the server - can be done in conjunction with -runall or -runtests (one day...)
;;   we start the server if not running else start the client thread
;;======================================================================

(if (args:get-arg "-server")
    (begin
      (debug:print 2 "Launching server...")
      (server:launch)))

(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====")
	    (for-each 
	     (lambda (server)
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) ;;   (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))

		      (killed     #f)
		      (status     (< last-update 20)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))

		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update
			 (if status "alive" "dead"))))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
	    (eq? (length (hash-table-keys args:arg-hash)) 0))
	(debug:print-info 1 "Server connection not needed")
	
	(server:client-launch)))

;;======================================================================
;; Weird special calls that need to run *after* the server has started?
;;======================================================================

(if (args:get-arg "-list-targets")







|
|
|






|

|
|
















>












|










|







283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350

;;======================================================================
;; Start the server - can be done in conjunction with -runall or -runtests (one day...)
;;   we start the server if not running else start the client thread
;;======================================================================

(if (args:get-arg "-server")
    (let ((transport (args:get-arg "-transport" "http")))
      (debug:print 2 "Launching server using transport " transport)
      (server:launch (string->symbol transport))))

(if (args:get-arg "-list-servers")
	;; (args:get-arg "-kill-server"))
    (let ((tl (setup-for-run)))
      (if tl 
	  (let ((servers (open-run-close tasks:get-all-servers tasks:open-db))
		(fmtstr  "~5a~8a~8a~20a~20a~10a~10a~10a~10a~10a\n")
		(servers-to-kill '()))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface" "OutPort" "InPort" "LastBeat" "State" "Transport")
	    (format #t fmtstr "==" "=====" "===" "====" "=========" "=======" "======" "========" "=====" "=========")
	    (for-each 
	     (lambda (server)
	       (let* (;; (killinfo   (args:get-arg "-kill-server"))
		      ;; (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		      ;; (kpid       (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f))
		      (id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3))
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) ;;   (open-run-close tasks:server-alive? tasks:open-db #f hostname: hostname port: port))
		      (transport  (vector-ref server 11))
		      (killed     #f)
		      (status     (< last-update 20)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (open-run-close tasks:server-deregister tasks:open-db hostname pullport: pullport pid: pid)))

		 (format #t fmtstr id mt-ver pid hostname interface pullport pubport last-update
			 (if status "alive" "dead") transport)))
	     servers)
	    (debug:print-info 1 "Done with listservers")
	    (set! *didsomething* #t)
	    (exit) ;; must do, would have to add checks to many/all calls below
	    )
	  (exit)))
    ;; if not list or kill then start a client (if appropriate)
    (if (or (args-defined? "-h" "-version" "-gen-megatest-area" "-gen-megatest-test")
	    (eq? (length (hash-table-keys args:arg-hash)) 0))
	(debug:print-info 1 "Server connection not needed")
	;; ok, so lets connect to the server
	(server:client-launch)))

;;======================================================================
;; Weird special calls that need to run *after* the server has started?
;;======================================================================

(if (args:get-arg "-list-targets")

Added rpc-transport.scm version [12f5deda72].

























































































































































































































































































































































































































































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "http://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Call this to start the actual server
;;

(define *db:process-queue-mutex* (make-mutex))

(define (server:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* (;; (iface           (if (string=? "-" hostn)
	 ;;        	      #f ;; (get-host-name) 
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname)))
	 (start-port    (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
			    (+ 5000 (random 1001))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!

    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (if (not db)(set! db (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request))) 
					   '(/ "hey"))
				   (send-response body: "hey there!\n"
						  headers: '((content-type text/plain))))
				  ;; This is the /ctrl path where data is handed to the server and
				  ;; responses 
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "ctrl"))
				   (let* ((packet (db:string->obj dat))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
				     ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex
				     ;; (set! res (open-run-close db:process-queue-item open-db packet))
				     (set! res (db:process-queue-item db packet))
				     ;; (mutex-unlock! *db:process-queue-mutex*)
				     (debug:print-info 11 "Return value from db:process-queue-item is " res)
				     (send-response body: (conc "<head>ctrl data</head>\n<body>"
								res
								"</body>")
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (server:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	(let* ((packet (db:string->obj dat))
;; 	       (qtype  (cdb:packet-get-qtype packet)))
;; 	  (debug:print-info 12 "server=> received packet=" packet)
;; 	  (if (not (member qtype '(sync ping)))
;; 	      (begin
;; 		(mutex-lock! *heartbeat-mutex*)
;; 		(set! *last-db-access* (current-seconds))
;; 		(mutex-unlock! *heartbeat-mutex*)))
;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	    res))))))

;;; (use spiffy uri-common intarweb)
;;; 
;;; (root-path "/var/www")
;;; 
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request))) 
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;; 
;;; (start-server port: 12345)

;; This is recursively run by server:run until sucessful
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (server:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (db:obj->string (vector success/fail query-sig result)))

;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (server:client-send-receive serverdat msg)
  (let* ((url        (server:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (server:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

(define (server:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))
	      (iface    (list-ref hostinfo 1))
	      (port     (list-ref hostinfo 2))
	      (pid      (list-ref hostinfo 3)))
	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (server:client-connect iface port)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv)))
		  (pid #f))
	      (debug:print-info 0 "No server available, attempting to start one...")
	      (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
	        							  (string-intersperse *verbosity* ",")
	        							  (conc *verbosity*)))))
	      ;; (set! pid (process-fork (lambda ()
	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
	      ;;   			(current-output-port (open-output-file "/dev/null"))
	      ;;   			(current-error-port  (open-output-file "/dev/null"))
	      ;;   			(server:launch))))
	      (let loop ((count 0))
		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
		  (if (not hostinfo)
		      (begin
			(debug:print-info 0 "Waiting for server pid=" pid " to start")
			(sleep 2) ;; give server time to start
			(if (< count 5)
			    (loop (+ count 1)))))))
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")
  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
    (debug:print 11 "server:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (server:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     "") ;; do nothing for now (was flush out last call if applicable)
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 1) ;; give the flush one second to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (server:client-launch)
  (set-signal-handler! signal/int server:client-signal-handler)
   (if (server:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

Modified server.scm from [12f5deda72] to [f2ac4bfe5b].

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25


26
27
28
29
30
31
32
(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(tcp-buffer-size 2048)

(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.



(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f







<
<






>
>







11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use spiffy uri-common intarweb http-client spiffy-request-vars)



(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses http-transport))
(declare (uses zmq-transport))

(include "common_records.scm")
(include "db_records.scm")

(define (server:make-server-url hostport)
  (if (not hostport)
      #f
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189


190






191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263


264
265
266
267
268
269


270
271
272

273
274
275

276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367

368

369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))



;; (define (server:main-loop)
;;   (print "INFO: Exectuing main server loop")
;;   (access-log "megatest-http.log")
;;   (server-bind-address #f)
;;   (define-page (main-page-path)
;;     (lambda ()
;;       (let ((dat ($ "dat")))
;;       ;; (with-request-variables (dat)
;;         (debug:print-info 12 "Got dat=" dat)
;; 	(let* ((packet (db:string->obj dat))
;; 	       (qtype  (cdb:packet-get-qtype packet)))
;; 	  (debug:print-info 12 "server=> received packet=" packet)
;; 	  (if (not (member qtype '(sync ping)))
;; 	      (begin
;; 		(mutex-lock! *heartbeat-mutex*)
;; 		(set! *last-db-access* (current-seconds))
;; 		(mutex-unlock! *heartbeat-mutex*)))
;; 	  (let ((res (open-run-close db:process-queue-item open-db packet)))
;; 	    (debug:print-info 11 "Return value from db:process-queue-item is " res)
;; 	    res))))))

;;; (use spiffy uri-common intarweb)
;;; 
;;; (root-path "/var/www")
;;; 
;;; (vhost-map `(((* any) . ,(lambda (continue)
;;;                            (if (equal? (uri-path (request-uri (current-request))) 
;;;                                        '(/ "hey"))
;;;                                (send-response body: "hey there!\n"
;;;                                               headers: '((content-type text/plain)))
;;;                                (continue))))))
;;; 
;;; (start-server port: 12345)

;; This is recursively run by server:run until sucessful
;;
(define (server:try-start-server ipaddrstr portnum)
  (handle-exceptions
   exn
   (begin
     (print-error-message exn)
     (if (< portnum 9000)
	 (begin 
	   (print "WARNING: failed to start on portnum: " portnum ", trying next port")
	   (thread-sleep! 0.1)
	   (open-run-close tasks:remove-server-records tasks:open-db)
	   (server:try-start-server ipaddrstr (+ portnum 1)))
	 (print "ERROR: Tried and tried but could not start the server")))
   (set! *runremote* (list ipaddrstr portnum))
   (open-run-close tasks:remove-server-records tasks:open-db)
   (open-run-close tasks:server-register 
		   tasks:open-db 
		   (current-process-id)
		   ipaddrstr portnum 0 'live)
   (print "INFO: Trying to start server on " ipaddrstr ":" portnum)
   ;; This starts the spiffy server
   (start-server port: portnum)
   (print "INFO: server has been stopped")))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 


  (db:obj->string (vector success/fail query-sig result)))







;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))

;; <html>
;; <head></head>
;; <body>1 Hello, world! Goodbye Dolly</body></html>
;; Send msg to serverdat and receive result
(define (server:client-send-receive serverdat msg)
  (let* ((url        (server:make-server-url serverdat))
	 (fullurl    (conc url "/ctrl")) ;; (conc url "/?dat=" msg)))
	 (numretries 0))     
    (handle-exceptions
     exn
     (if (< numretries 200)
	 (server:client-send-receive serverdat msg))
     (begin
       (debug:print-info 11 "fullurl=" fullurl "\n")
       ;; set up the http-client here
       (max-retry-attempts 100)
       (retry-request? (lambda (request)
			 (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10))
			 (set! numretries (+ numretries 1))
			 #t))
       ;; send the data and get the response
       ;; extract the needed info from the http data and 
       ;; process and return it.
       (let* ((res   (with-input-from-request fullurl 
					      ;; #f
					      ;; msg 
					      (list (cons 'dat msg)) 
					      read-string)))
	 (debug:print-info 11 "got res=" res)
	 (let ((match (string-search (regexp "<body>(.*)<.body>") res)))
	   (debug:print-info 11 "match=" match)
	   (let ((final (cadr match)))
	     (debug:print-info 11 "final=" final)
	     final)))))))

(define (server:client-login serverdat)
  (max-retry-attempts 100)
  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))
    ;; (close-socket serverdat)
    ok))

(define (server:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, start a server if not already running


(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))


  (let ((hostinfo   (open-run-close tasks:get-best-server tasks:open-db)))
    (if hostinfo
	(let ((host     (list-ref hostinfo 0))

	      (iface    (list-ref hostinfo 1))
	      (port     (list-ref hostinfo 2))
	      (pid      (list-ref hostinfo 3)))

	  (debug:print-info 2 "Setting up to connect to " hostinfo)
	  (server:client-connect iface port)) ;; )
	(if (> numtries 0)
	    (let ((exe (car (argv)))
		  (pid #f))
	      (debug:print-info 0 "No server available, attempting to start one...")
	      (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*)
	        							  (string-intersperse *verbosity* ",")
	        							  (conc *verbosity*)))))
	      ;; (set! pid (process-fork (lambda ()
	      ;;   			(current-input-port  (open-input-file  "/dev/null"))
	      ;;   			(current-output-port (open-output-file "/dev/null"))
	      ;;   			(current-error-port  (open-output-file "/dev/null"))
	      ;;   			(server:launch))))
	      (let loop ((count 0))
		(let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))
		  (if (not hostinfo)
		      (begin
			(debug:print-info 0 "Waiting for server pid=" pid " to start")
			(sleep 2) ;; give server time to start
			(if (< count 5)
			    (loop (+ count 1)))))))
	      ;; we are starting a server, do not try again! That can lead to 
	      ;; recursively starting many processes!!!
	      (server:client-setup numtries: 0))
	    (debug:print-info 1 "Too many attempts, giving up")))))

;; run server:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (server:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (server:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting the standalone server")

  (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db)))

    (debug:print 11 "server:launch hostinfo=" hostinfo)
    (if hostinfo
	(debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (server:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread (lambda ()(server:keep-running)) "Keep running"))
		   )
	      (thread-start! th2)
	      (thread-start! th3)
	      (set! *didsomething* #t)
	      (thread-join! th2)
	      )
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     "") ;; do nothing for now (was flush out last call if applicable)







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<


















>
>
|
>
>
>
>
>
>











<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

<






<

















|
>
>






>
>
|
|
|
>
|
|
|
>
|
|
|
|
<
<
<
<
|
<
<
<
<
<
<
<
|
|
<
<
<
<
<
<
|
<
|
<
<
<
<
<
<
<
|
|
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<


|





|
>
|
>
|
<
<
|
<
|
<
<
<
<
<
<
<
<
<
<
|
|







105
106
107
108
109
110
111




























































112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149



































150

151
152
153
154
155
156

157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196




197







198
199






200

201







202
203
204
205


















206


207























208
209
210
211
212
213
214
215
216
217
218
219
220


221

222










223
224
225
226
227
228
229
230
231
						    headers: '((content-type text/plain)))))
				  (else (continue))))))))
    (server:try-start-server ipaddrstr start-port)
    ;; lite3:finalize! db)))
    ))






























































(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; When using zmq this would send the message back (two step process)
;; with spiffy or rpc this simply returns the return data to be returned
;; 
(define (server:reply return-addr query-sig success/fail result)
  (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result)
  ;; (send-message pubsock target send-more: #t)
  ;; (send-message pubsock 
  (case *transport-type*
    ((fs) result)
    ((http)(db:obj->string (vector success/fail query-sig result)))
    ((zmq)
     (send-message pubsock target send-more: #t)
     (send-message pubsock (db:obj->string (vector success/fail query-sig result))))
    (else 
     (debug:print 0 "ERROR: unrecognised transport type: " *transport-type*)
     result)))

;;======================================================================
;; C L I E N T S
;;======================================================================

(define (server:get-client-signature)
  (if *my-client-signature* *my-client-signature*
      (let ((sig (server:mk-signature)))
	(set! *my-client-signature* sig)
	*my-client-signature*)))




































(define (server:client-login serverdat)

  (cdb:login serverdat *toppath* (server:get-client-signature)))

;; Not currently used! But, I think it *should* be used!!!
(define (server:client-logout serverdat)
  (let ((ok (and (socket? serverdat)
		 (cdb:logout serverdat *toppath* (server:get-client-signature)))))

    ok))

(define (server:client-connect iface port)
  (let* ((login-res   #f)
	 (serverdat   (list iface port)))
    (set! login-res (server:client-login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " iface ":" port)
	  (set! *runremote* #f)
	  #f))))

;; Do all the connection work, look up the transport type and set up the
;; connection if required.
;;
(define (server:client-setup #!key (numtries 50))
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: failed to find megatest.config, exiting")
	    (exit))))
  (debug:print-info 11 "*transport-type* is " *transport-type*)
  (let* ((hostinfo   (if (not *transport-type*) ;; If we dont' already have transport type set then figure it out
			 (open-run-close tasks:get-best-server tasks:open-db)
			 #f)))
    ;; if have hostinfo then extract the transport type 
    ;; else fall back to fs
    (debug:print-info 11 "CLIENT SETUP, hostinfo=" hostinfo)
    (set! *transport-type* (if hostinfo 
			       (string->symbol (tasks:hostinfo-get-transport hostinfo))
			       'fs))
    (debug:print-info 1 "Using transport type of " *transport-type* (if hostinfo (conc " to connect to " hostinfo) ""))
    (case *transport-type* 
    ((fs)(if (not *megatest-db*)(set! *megatest-db* (open-db))))
    ((http)




     (http-transport:client-connect (tasks:hostinfo-get-interface hostinfo)







				    (tasks:hostinfo-get-port hostinfo)))
    ((zmq)






     (zmq-transport:client-connect (tasks:hostinfo-get-interface hostinfo)

				   (tasks:hostinfo-get-port      hostinfo)







				   (tasks:hostinfo-get-pubport   hostinfo)))
    (else  ;; default to fs
     (set! *transport-type* 'fs)
     (set! *megatest-db*    (open-db))))))














































;; all routes though here end in exit ...
(define (server:launch transport)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting server using " transport " transport")
  (set! *transport-type* transport)
  (case transport
    ((fs)   (exit)) ;; there is no "fs" transport
    ((http) (http-transport:launch))


    ((zmq)  (zmq-transport:launch))

    (else










     (debug:print "WARNING: unrecognised transport " transport)
     (exit))))

(define (server:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     "") ;; do nothing for now (was flush out last call if applicable)

Modified tasks.scm from [7e2c4cdfd8] to [6283f820c0].

59
60
61
62
63
64
65

66
67
68
69
70

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87









88
89
90
91
92
93
94
95
96
97
98
99


100
101
102
103
104
105
106
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,

                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,

                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
                                CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
                                  
	  ))
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================










;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state)
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface)
                             VALUES(?,  ?,       ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);"
   pid (get-host-name) port priority (conc state) megatest-version interface)
  (list 
   (tasks:server-get-server-id mdb (get-host-name) interface port pid)
   interface
   port


   ))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (case action







>





>

















>
>
>
>
>
>
>
>
>

|



|
|
|
|



>
>







59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
                                username TEXT,
                               CONSTRAINT monitors_constraint UNIQUE (pid,hostname));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY,
                                  pid INTEGER,
                                  interface TEXT,
                                  hostname TEXT,
                                  port INTEGER,
                                  pubport INTEGER,
                                  start_time TIMESTAMP,
                                  priority INTEGER,
                                  state TEXT,
                                  mt_version TEXT,
                                  heartbeat TIMESTAMP,
                                  transport TEXT,
                               CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));")
	  (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY,
                                  server_id INTEGER,
                                  pid INTEGER,
                                  hostname TEXT,
                                  cmdline TEXT,
                                  login_time TIMESTAMP,
                                  logout_time TIMESTAMP DEFAULT -1,
                                CONSTRAINT clients_constraint UNIQUE (pid,hostname));")
                                  
	  ))
    mdb))
    
;;======================================================================
;; Server and client management
;;======================================================================

;; make-vector-record tasks hostinfo id interface port pubport transport pid hostname
(define (tasks:hostinfo-get-id          vec)    (vector-ref  vec 0))
(define (tasks:hostinfo-get-interface   vec)    (vector-ref  vec 1))
(define (tasks:hostinfo-get-port        vec)    (vector-ref  vec 2))
(define (tasks:hostinfo-get-pubport     vec)    (vector-ref  vec 3))
(define (tasks:hostinfo-get-transport   vec)    (vector-ref  vec 4))
(define (tasks:hostinfo-get-pid         vec)    (vector-ref  vec 5))
(define (tasks:hostinfo-get-hostname    vec)    (vector-ref  vec 6))

;; state: 'live, 'shutting-down, 'dead
(define (tasks:server-register mdb pid interface port priority state transport #!key (pubport -1))
  (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state)
  (sqlite3:execute 
   mdb 
   "INSERT OR REPLACE INTO servers (pid,hostname,port,pubport,start_time,priority,state,mt_version,heartbeat,interface,transport)
                             VALUES(?,  ?,       ?,   ?,  strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?,?);"
   pid (get-host-name) port pubport priority (conc state) megatest-version interface (conc transport))
  (vector 
   (tasks:server-get-server-id mdb (get-host-name) interface port pid)
   interface
   port
   pubport
   transport
   ))

;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used!
(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead))
  (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid)
  (if pid
      (case action
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id hostname interface port pid)
       (set! res (cons (list hostname interface port pid id) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,hostname,interface,port,pid FROM servers
         WHERE strftime('%s','now')-heartbeat < 10
               AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; for now we are keeping only one server registered in the db, return #f or first server found
    (if (null? res) #f (car res))))

;; BUG: This logic is probably needed unless methodology changes completely...
;;







|
|


|







187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205

;; ping each server in the db and return first found that responds. 
;; remove any others. will not necessarily remove all!
(define (tasks:get-best-server mdb)
  (let ((res '())
	(best #f))
    (sqlite3:for-each-row
     (lambda (id interface port pubport transport pid hostname)
       (set! res (cons (vector id interface port pubport transport pid hostname) res))
       (debug:print-info 2 "Found existing server " hostname ":" port " registered in db"))
     mdb
     "SELECT id,interface,port,pubport,transport,pid,hostname FROM servers
         WHERE strftime('%s','now')-heartbeat < 10
               AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version)
    ;; for now we are keeping only one server registered in the db, return #f or first server found
    (if (null? res) #f (car res))))

;; BUG: This logic is probably needed unless methodology changes completely...
;;
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))



(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port start-time priority state mt-version last-update)
       (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================








|
|

|







264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
		(debug:print 0 "WARNING: Can't kill frozen server on remote host " hostname))))))



(define (tasks:get-all-servers mdb)
  (let ((res '()))
    (sqlite3:for-each-row
     (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport)
       (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res)))
     mdb
     "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;")
    res))
       

;;======================================================================
;; Tasks and Task monitors
;;======================================================================

Added zmq-transport.scm version [84997153bf].























































































































































































































































































































































































































































































































































































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use zmq)

(declare (unit zmq-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

;; Transition to pub --> sub with pull <-- push
;;
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [x]  [ ]    1. Add columns pullport pubport to servers table
;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [x]  [ ]    4. Add client compose of request
;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [x]  [ ]        - name of request: callname, params
;; [x]  [ ]        - request key: f(clientname, callname, params)
;; [x]  [ ]    5. Add processing of subscription hits
;; [x]  [ ]        - done when get key 
;; [x]  [ ]        - return results
;; [x]  [ ]    6. Add timeout processing
;; [x]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

(define (zmq-transport:make-server-url hostport)
  (if (not hostport)
      #f
      (conc "tcp://" (car hostport) ":" (cadr hostport))))

(define  *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (zmq-transport:run hostn)
  (debug:print 2 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-sdat1       #f)
	 (zmq-sdat2       #f)
	 (pull-socket     #f)
	 (pub-socket      #f)
	 (p1              #f)
	 (p2              #f)
	 (zmq-sockets-dat #f)
	 (iface           (if (string=? "-" hostn)
			      "*" ;; (get-host-name) 
			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname)))
	 (last-run       0))
    (set! zmq-sockets-dat (zmq-transport:setup-ports ipaddrstr (if (args:get-arg "-port")
			    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-sockets-dat))
    (set! pull-socket  (cadr  zmq-sdat1)) ;; (iface s  port)
    (set! p1           (caddr zmq-sdat1))
    
    (set! zmq-sdat2    (cadr  zmq-sockets-dat))
    (set! pub-socket   (cadr  zmq-sdat2))
    (set! p2           (caddr zmq-sdat2))

    (set! *cache-on* #t)

    ;; what to do when we quit
    ;;
;;     (on-exit (lambda ()
;; 	       (if (and *toppath* *server-info*)
;; 		   (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*))
;; 		   (let loop () 
;; 		     (let ((queue-len 0))
;; 		       (thread-sleep! (random 5))
;; 		       (mutex-lock! *incoming-mutex*)
;; 		       (set! queue-len (length *incoming-data*))
;; 		       (mutex-unlock! *incoming-mutex*)
;; 		       (if (> queue-len 0)
;; 			   (begin
;; 			     (debug:print-info 0 "Queue not flushed, waiting ...")
;; 			     (loop))))))))

    ;; The heavy lifting
    ;;
    ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime
    ;;
    (let loop ((queue-lst '()))
      (let* ((rawmsg (receive-message* pull-socket))
	     (packet (db:string->obj rawmsg))
					  (qtype  (cdb:packet-get-qtype packet)))
				     (debug:print-info 12 "server=> received packet=" packet)
				     (if (not (member qtype '(sync ping)))
					 (begin
					   (mutex-lock! *heartbeat-mutex*)
					   (set! *last-db-access* (current-seconds))
					   (mutex-unlock! *heartbeat-mutex*)))
	(if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue
	    (begin
	      (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))
	      (loop '()))
	    (loop (cons packet queue-lst)))))))

;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 (zmq-sockets (zmq-transport:client-connect iface pullport pubport))
	 (last-access 0))
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1)))
      ;; (print "Server running, count is " count)
	(if (< count 1) ;; 3x3 = 9 secs aprox
	    (loop (+ count 1)))

	;; NOTE: Get rid of this mechanism! It really is not needed...
	(open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))

	;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	(mutex-lock! *heartbeat-mutex*)
	(set! last-access *last-db-access*)
	(mutex-unlock! *heartbeat-mutex*)
	(if (> (+ last-access
		  ;; (* 50 60 60)    ;; 48 hrs
		  ;; 60              ;; one minute
		  ;; (* 60 60)       ;; one hour
		  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
		  )
	       (current-seconds))
	    (begin
	      (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
	      (loop 0))
	    (begin
	      (debug:print-info 0 "Starting to shutdown the server.")
	      ;; need to delete only *my* server entry (future use)
	      (set! *time-to-exit* #t)
	      (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	      (thread-sleep! 1)
	      (debug:print-info 0 "Max cached queries was " *max-cache-size*)
	      (debug:print-info 0 "Server shutdown complete. Exiting")
	      (exit)))))))

(define (zmq-transport:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
        (p (if (number? port) port 5555))
        (old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
           (zmq-transport:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
           (debug:print-info 0 "Tried ports up to " p 
                             " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (debug:print 2 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (zmq-transport:setup-ports ipaddrstr startport)
  (let* ((s1 (zmq-transport:find-free-port-and-open ipaddrstr #f startport 'pull))
         (p1 (caddr s1))
         (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub))
         (p2 (caddr s2)))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2)
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (zmq-transport:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;;======================================================================
;; C L I E N T S
;;======================================================================

;; 
(define (zmq-transport:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '()))
  (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions)
  (let ((connect-ok #f)
	(zmq-socket (if context 
			(make-socket type context)
			(make-socket type)))
	(conurl     (zmq-transport:make-server-url (list iface port))))
    (if (socket? zmq-socket)
     (begin
	  ;; first apply subscriptions
	  (for-each (lambda (subscription)
		      (debug:print 2 "Subscribing to " subscription)
		      (socket-option-set! zmq-socket 'subscribe subscription))
		    subscriptions)
	  (connect-socket zmq-socket conurl)
	  zmq-socket)
	(begin
	  (debug:print 0 "ERROR: Failed to open socket to " conurl)
	  #f))))

(define (zmq-transport:client-connect iface pullport pubport)
  (let* ((push-socket (zmq-transport:client-socket-connect iface pullport type: 'push))
	 (sub-socket  (zmq-transport:client-socket-connect iface pubport
						    type: 'sub
						    subscriptions: (list (zmq-transport:get-client-signature) "all")))
	 (zmq-sockets (vector push-socket sub-socket))
	 (login-res   #f))
    (set! login-res (zmq-transport:client-login zmq-sockets))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")
	  (set! *runremote* zmq-sockets)
	  zmq-sockets)
	(begin
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; run zmq-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (zmq-transport:keep-running)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (sleep 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdb         (tasks:open-db))
	 (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; NOTE: Get rid of this mechanism! It really is not needed...
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
        (if (> (+ last-access
                  ;; (* 50 60 60)    ;; 48 hrs
                  ;; 60              ;; one minute
                  ;; (* 60 60)       ;; one hour
                  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                  )
               (current-seconds))
            (begin
              (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)
              (tasks:server-deregister-self tdb (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was " *max-cache-size*)
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (zmq-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
	    (exit))))
  (debug:print-info 2 "Starting zmq server")
  (if *toppath* 
      (let* (;; (th1 (make-thread (lambda ()
	     ;;      	       (let ((server-info #f))
	     ;;      		 ;; wait for the server to be online and available
	     ;;      		 (let loop ()
	     ;;			   (debug:print-info 2 "Waiting for the server to come online before starting heartbeat")
	     ;;      		   (thread-sleep! 2)
	     ;;      		   (mutex-lock! *heartbeat-mutex*)
	     ;;      		   (set! server-info *server-info* )
	     ;;      		   (mutex-unlock! *heartbeat-mutex*)
	     ;;      		   (if (not server-info)(loop)))
	     ;;			 (debug:print 2 "Server alive, starting self-ping")
	     ;;      		 (zmq-transport:self-ping server-info)
	     ;;      		 ))
	     ;;      	     "Self ping"))
	     (th2 (make-thread (lambda ()
				 (zmq-transport:run 
				  (if (args:get-arg "-server")
				      (args:get-arg "-server")
				      "-"))) "Server run"))
	     (th3 (make-thread (lambda ()(zmq-transport:keep-running)) "Keep running"))
	     )
	(set! *client-non-blocking-mode* #t)
	;; (thread-start! th1)
	(thread-start! th2)
	(thread-start! th3)
	(set! *didsomething* #t)
	;; (thread-join! th3)
	(thread-join! th2)
	)
      (debug:print 0 "ERROR: Failed to setup for megatest")))

(define (zmq-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

(define (zmq-transport:client-launch)
  (set-signal-handler! signal/int zmq-transport:client-signal-handler)
   (if (zmq-transport:client-setup)
       (debug:print-info 2 "connected as client")
       (begin
	 (debug:print 0 "ERROR: Failed to connect as client")
	 (exit))))

;;======================================================================
;; Defunct functions
;;======================================================================

;; ping a server and return number of clients or #f (if no response)
;; NOT IN USE!
(define (zmq-transport:ping host port #!key (secs 10)(return-socket #f))
  (cdb:use-non-blocking-mode
   (lambda ()
     (let* ((res #f)
	    (th1 (make-thread
		  (lambda ()
		    (let* ((zmq-context (make-context 1))
			   (zmq-socket  (zmq-transport:client-connect host port context: zmq-context)))
		      (if zmq-socket
			  (if (zmq-transport:client-login zmq-socket)
			      (let ((numclients (cdb:num-clients zmq-socket)))
				(if (not return-socket)
				    (begin
				      (zmq-transport:client-logout zmq-socket)
				      (close-socket  zmq-socket)))
				(set! res (list #t numclients (if return-socket zmq-socket #f))))
			      (begin
				;; (close-socket zmq-socket)
				(set! res (list #f "CAN'T LOGIN" #f))))
			  (set! res (list #f "CAN'T CONNECT" #f)))))
		  "Ping: th1"))
	    (th2 (make-thread
		  (lambda ()
		    (let loop ((count 1))
		      (debug:print-info 1 "Ping " count " server on " host " at port " port)
		      (thread-sleep! 2)
		      (if (< count (/ secs 2))
			  (loop (+ count 1))))
		    ;; (thread-terminate! th1)
		    (set! res (list #f "TIMED OUT" #f)))
		  "Ping: th2")))
       (thread-start! th2)
       (thread-start! th1)
       (handle-exceptions
	exn
	(set! res (list #f "TIMED OUT" #f))
	(thread-join! th1 secs))
       res))))

;; (define (zmq-transport:self-ping server-info)
;;   ;; server-info: server-id interface pullport pubport
;;   (let ((iface    (list-ref server-info 1))
;; 	(pullport (list-ref server-info 2))
;; 	(pubport  (list-ref server-info 3)))
;;     (zmq-transport:client-connect iface pullport pubport)
;;     (let loop ()
;;       (thread-sleep! 2)
;;       (cdb:client-call *runremote* 'ping #t)
;;       (debug:print 4 "zmq-transport:self-ping - I'm alive on " iface ":" pullport "/" pubport "!")
;;       (mutex-lock! *heartbeat-mutex*)
;;       (set! *server-loop-heart-beat* (current-seconds))
;;       (mutex-unlock! *heartbeat-mutex*)
;;       (loop))))
    
(define (zmq-transport:reply pubsock target query-sig success/fail result)
  (debug:print-info 11 "zmq-transport:reply target=" target ", result=" result)
  (send-message pubsock target send-more: #t)
  (send-message pubsock (db:obj->string (vector success/fail query-sig result))))