Megatest

Changes On Branch 120292c013782eb3
Login

Changes In Branch try-nanomsg Through [120292c013] Excluding Merge-Ins

This is equivalent to a diff from 738756b239 to 120292c013

2014-11-24
12:45
Fixed import-megatest.db bug check-in: 398c48390d user: mrwellan tags: v1.60, v1.6006
2014-11-23
20:56
Basic server functioning, responding to ping and login check-in: eab23b866a user: matt tags: try-nanomsg
18:14
Partial implementation. Not yet functional check-in: 120292c013 user: matt tags: try-nanomsg
2014-11-22
12:53
try nanomsg. check-in: 45e51acb2a user: matt tags: try-nanomsg
2014-11-21
13:18
Better err msg check-in: 738756b239 user: mrwellan tags: v1.60
10:56
Added layer of exception handling inside db:with-db check-in: 171838c893 user: mrwellan tags: v1.60

Modified Makefile from [64fd867d54] to [4886dc652e].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# make install CSCOPTS='-accumulate-profile -profile-name $(PWD)/profile-ww$(shell date +%V.%u)'
PREFIX=$(PWD)
CSCOPTS= 
INSTALL=install
SRCFILES = common.scm items.scm launch.scm \
           ods.scm runconfig.scm server.scm configf.scm \
           db.scm keys.scm margs.scm megatest-version.scm \
           process.scm runs.scm tasks.scm tests.scm genexample.scm \
	   http-transport.scm filedb.scm \
           client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \
	   tree.scm ezsteps.scm lock-queue.scm sdb.scm \
	   rmt.scm api.scm tdb.scm portlogger.scm

# Eggs to install (straightforward ones)
EGGS=matchable readline apropos base64 regex-literals format regex-case test coops trace csv \
     dot-locking posix-utils posix-extras directory-utils hostinfo tcp-server rpc csv-xml fmt \








|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# make install CSCOPTS='-accumulate-profile -profile-name $(PWD)/profile-ww$(shell date +%V.%u)'
PREFIX=$(PWD)
CSCOPTS= 
INSTALL=install
SRCFILES = common.scm items.scm launch.scm \
           ods.scm runconfig.scm server.scm configf.scm \
           db.scm keys.scm margs.scm megatest-version.scm \
           process.scm runs.scm tasks.scm tests.scm genexample.scm \
	   http-transport.scm nmsg-transport.scm filedb.scm \
           client.scm gutils.scm synchash.scm daemon.scm mt.scm dcommon.scm \
	   tree.scm ezsteps.scm lock-queue.scm sdb.scm \
	   rmt.scm api.scm tdb.scm portlogger.scm

# Eggs to install (straightforward ones)
EGGS=matchable readline apropos base64 regex-literals format regex-case test coops trace csv \
     dot-locking posix-utils posix-extras directory-utils hostinfo tcp-server rpc csv-xml fmt \

Modified api.scm from [a688529701] to [0d03b6cbef].

119
120
121
122
123
124
125

126
127
128
129
130
131
132
					  (run-id     (cadr params))
					  (realparams (cddr params)))
				      (db:with-db dbstruct run-id #t ;; these are all for modifying the db
						  (lambda (db)
						    (db:general-call db stmtname realparams)))))
    ((sync-inmem->db)               (db:sync-touched dbstruct run-id force-sync: #t))
    ((sdb-qry)                      (apply sdb:qry params))


    ;; TESTMETA
    ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
    ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
    ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params))
    (else
     (list "ERROR" 0))))







>







119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
					  (run-id     (cadr params))
					  (realparams (cddr params)))
				      (db:with-db dbstruct run-id #t ;; these are all for modifying the db
						  (lambda (db)
						    (db:general-call db stmtname realparams)))))
    ((sync-inmem->db)               (db:sync-touched dbstruct run-id force-sync: #t))
    ((sdb-qry)                      (apply sdb:qry params))
    ((ping)                         (current-process-id))

    ;; TESTMETA
    ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
    ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
    ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params))
    (else
     (list "ERROR" 0))))

Modified common.scm from [aa6e9ff977] to [96d40f50d8].

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
(define *inmemdb*             #f)
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    'http)
(define *rpc:listener*      #f) ;; if set up for server communication this will hold the tcp port
(define *runremote*         (make-hash-table)) ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)







|
<







63
64
65
66
67
68
69
70

71
72
73
74
75
76
77
(define *inmemdb*             #f)
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    'nm)

(define *runremote*         (make-hash-table)) ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *client-non-blocking-mode* #f)
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)

Modified db.scm from [d3e0e30a50] to [930b139284].

2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
       res))))

;;======================================================================
;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS
;;======================================================================

;; NOTE: Can remove the regex and base64 encoding for zmq
(define (db:obj->string obj)
  (case *transport-type*
    ;; ((fs) obj)
    ((http fs)
     (string-substitute
      (regexp "=") "_"
      (base64:base64-encode 
       (z3:encode-buffer
	(with-output-to-string
	  (lambda ()(serialize obj)))))
      #t))
    ((zmq)(with-output-to-string (lambda ()(serialize obj))))
    (else obj)))

(define (db:string->obj msg)
  (case *transport-type*
    ;; ((fs) msg)
    ((http fs)
     (if (string? msg)
	 (with-input-from-string 
	     (z3:decode-buffer
	      (base64:base64-decode
	       (string-substitute 
		(regexp "_") "=" msg #t)))
	   (lambda ()(deserialize)))
	 (begin
	   (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.")
	   #f))) ;; crude reply for when things go awry
    ((zmq)(with-input-from-string msg (lambda ()(deserialize))))
    (else msg)))

(define (db:test-set-status-state dbstruct run-id test-id status state msg)
  (let ((dbdat  (db:get-db dbstruct run-id)))
    (if (member state '("LAUNCHED" "REMOTEHOSTSTART"))
	(db:general-call dbdat 'set-test-start-time (list test-id)))
    (if msg







|
|









|


|
|












|







2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
       res))))

;;======================================================================
;; QUEUE UP META, TEST STATUS AND STEPS REMOTE ACCESS
;;======================================================================

;; NOTE: Can remove the regex and base64 encoding for zmq
;; Convert obj into the wire representation used by the given transport.
;;   transport 'http or 'fs : serialize, z3 compress, base64 encode and
;;                            replace every "=" with "_"
;;   transport 'zmq or 'nm  : plain serialized string
;;   anything else          : obj is returned untouched
(define (db:obj->string obj #!key (transport 'http))
  (define (serialized)
    (with-output-to-string
      (lambda () (serialize obj))))
  (cond
   ((memq transport '(http fs))
    (let* ((compressed (z3:encode-buffer (serialized)))
           (encoded    (base64:base64-encode compressed)))
      (string-substitute (regexp "=") "_" encoded #t)))
   ((memq transport '(zmq nm))
    (serialized))
   (else obj)))

;; Inverse of db:obj->string: turn a received message back into an object.
;;   transport 'http or 'fs : replace "_" with "=", base64 decode, z3
;;                            decompress and deserialize; a non-string msg
;;                            is reported and #f is returned
;;   transport 'zmq or 'nm  : deserialize msg directly
;;   anything else          : msg is returned untouched
(define (db:string->obj msg #!key (transport 'http))
  (define (read-back str)
    (with-input-from-string str
      (lambda () (deserialize))))
  (cond
   ((memq transport '(http fs))
    (cond
     ((string? msg)
      (let* ((padded   (string-substitute (regexp "_") "=" msg #t))
             (unpacked (z3:decode-buffer (base64:base64-decode padded))))
        (read-back unpacked)))
     (else
      (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.")
      #f))) ;; crude reply for when things go awry
   ((memq transport '(zmq nm))
    (read-back msg))
   (else msg)))

(define (db:test-set-status-state dbstruct run-id test-id status state msg)
  (let ((dbdat  (db:get-db dbstruct run-id)))
    (if (member state '("LAUNCHED" "REMOTEHOSTSTART"))
	(db:general-call dbdat 'set-test-start-time (list test-id)))
    (if msg

Added nmsg-transport.scm version [2f563bf500].













































































































































































































































































































































































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374

;; Copyright 2006-2012, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.

(require-extension (srfi 18) extras tcp s11n)

(use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest)
(import (prefix sqlite3 sqlite3:))

(use nanomsg)

(declare (unit nmsg-transport))

(declare (uses common))
(declare (uses db))
(declare (uses tests))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses server))

(include "common_records.scm")
(include "db_records.scm")

;; Transition to pub --> sub with pull <-- push
;;
;;   1. client sends request to server via push to the pull port
;;   2. server puts request in queue or processes immediately as appropriate
;;   3. server puts responses from completed requests into pub port 
;;
;; TODO
;;
;; Done Tested
;; [x]  [ ]    1. Add columns pullport pubport to servers table
;; [x]  [ ]    2. Add rm of monitor.db if older than 11/12/2012 
;; [x]  [ ]    3. Add create of pullport and pubport with finding of available ports
;; [x]  [ ]    4. Add client compose of request
;; [x]  [ ]        - name of client: testname/itempath-test_id-hostname 
;; [x]  [ ]        - name of request: callname, params
;; [x]  [ ]        - request key: f(clientname, callname, params)
;; [x]  [ ]    5. Add processing of subscription hits
;; [x]  [ ]        - done when get key 
;; [x]  [ ]        - return results
;; [x]  [ ]    6. Add timeout processing
;; [x]  [ ]        - after 60 seconds
;; [ ]  [ ]            i. check server alive, connect to new if necessary
;; [ ]  [ ]           ii. resend request
;; [ ]  [ ]    7. Turn self ping back on

;; Build the "tcp://<host>:<port>" url for a server.
;;   hostport : list of (host port), or #f
;;   bindall  : when true "*" is used in place of the host
;; Returns #f when hostport is #f.
(define (nmsg-transport:make-server-url hostport #!key (bindall #f))
  (and hostport
       (let ((host (if bindall "*" (car hostport)))
             (port (cadr hostport)))
         (conc "tcp://" host ":" port))))

(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Start the nanomsg server for run-id and block until the server thread ends.
;;
;;   dbstruct  : db structure handed to the request loop and stored in *inmemdb*
;;   hostn     : host name used to ping the freshly started server
;;   run-id    : run this server is serving
;;   server-id : id of this server's record in the tasks db
;;
;; A port is obtained from portlogger, the request loop is started in its own
;; thread and the server is pinged. If the ping succeeds the server record is
;; moved through "dbprep" to "running", the keep-running monitor is started and
;; this call joins the server thread. If the ping fails the server record is
;; deleted, the port is marked failed and the whole procedure is retried.
;;
;; NOTE(review): the retry is unbounded and the server thread of a failed
;; attempt is left running; confirm whether a retry limit is wanted.
(define (nmsg-transport:run dbstruct hostn run-id server-id)
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port    (portlogger:open-run-close portlogger:find-port))
         (server-thread (make-thread
                         (lambda ()
                           (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
                         "server thread"))
         (tdbdat        (tasks:open-db)))
    (thread-start! server-thread)
    ;; Only a yes/no answer is needed here. With return-socket: #f the ping
    ;; closes its probe socket instead of handing back a socket that nobody
    ;; would ever close.
    (if (nmsg-transport:ping hostn start-port return-socket: #f)
        (begin
          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
          (set! *server-info* (list hostn start-port)) ;; probably not needed anymore?
          (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
          (set! *inmemdb*  dbstruct)
          (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
          (thread-start! (make-thread nmsg-transport:keep-running "keep running"))
          (thread-join! server-thread))
        (begin
          (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
          (portlogger:open-run-close portlogger:set-failed start-port)
          (nmsg-transport:run dbstruct hostn run-id server-id)))))

;; Request loop of the server: bind a rep socket on portnum and answer
;; requests until the literal message "quit" arrives.
;;
;;   "quit"            -> reply "Ok, quitting" and leave the loop
;;   starts with "ping" -> reply with this process id as a string
;;   anything else     -> decode as a vector of (cmd params), run it through
;;                        api:execute-requests and reply with the encoded result
(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (define sock (nn-socket 'rep))
  (define (ping-request? msg)
    (and (>= (string-length msg) 4)
         (equal? (substring msg 0 4) "ping")))
  (define (api-reply msg)
    (let* ((request (db:string->obj msg transport: 'nm))
           (cmd     (vector-ref request 0))
           (params  (vector-ref request 1))
           (outcome (api:execute-requests dbstruct cmd params)))
      (db:obj->string outcome transport: 'nm)))
  (nn-bind sock (conc "tcp://*:" portnum))
  (let serve ()
    (let ((msg (nn-recv sock)))
      (if (equal? msg "quit")
          (nn-send sock "Ok, quitting")
          (begin
            (nn-send sock (if (ping-request? msg)
                              (conc (current-process-id))
                              (api-reply msg)))
            (serve))))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running)
  ;; Monitor thread body: wait until *server-info* is set, then poll
  ;; *last-db-access* and shut the server down (deregister, exit) once the
  ;; last access is 45 minutes or more in the past.
  ;;
  ;; NOTE(review): a second definition of nmsg-transport:keep-running appears
  ;; later in this file; being later it presumably replaces this one - confirm
  ;; which of the two is meant to survive.
  ;; NOTE(review): nmsg-transport:run sets *server-info* to a two element list
  ;; (hostn start-port) while this code takes cadr, caddr and cadddr of it
  ;; (the "id interface pullport pubport" layout) - looks stale, confirm.
  (let* ((server-info (let loop ()
			(let ((sdat #f))
			  (mutex-lock! *heartbeat-mutex*)
			  (set! sdat *server-info*)
			  (mutex-unlock! *heartbeat-mutex*)
			  (if sdat sdat
			      (begin
				(debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again")
				(sleep 4)
				(loop))))))
	 (iface       (cadr server-info))
	 (pullport    (caddr server-info))
	 (pubport     (cadddr server-info)) ;; id interface pullport pubport)
	 ;; (nmsg-sockets (nmsg-transport:client-connect iface pullport pubport))
	 (last-access 0))
    (debug:print-info 11 "heartbeat started for nmsg server on " iface " " pullport " " pubport)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      ;; GET REAL QUEUE LENGTH FROM THE VARIABLE
      (let ((queue-len 0)) ;; FOR NOW DO NOT DO THIS (cdb:client-call nmsg-sockets 'sync #t 1)))
      ;; (print "Server running, count is " count)
	(if (< count 1) ;; take one more 4 second sleep before the first check
	    (loop (+ count 1)))

	;; NOTE: Get rid of this mechanism! It really is not needed...
	;; (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))

	;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
	;; read the shared last access time under the heartbeat mutex
	(mutex-lock! *heartbeat-mutex*)
	(set! last-access *last-db-access*)
	(mutex-unlock! *heartbeat-mutex*)
	;; keep going while last access + 45 minutes is still in the future
	(if (> (+ last-access
		  ;; (* 50 60 60)    ;; 48 hrs
		  ;; 60              ;; one minute
		  ;; (* 60 60)       ;; one hour
		  (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
		  )
	       (current-seconds))
	    (begin
	      (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
	      (loop 0))
	    (begin
	      (debug:print-info 0 "Starting to shutdown the server.")
	      ;; need to delete only *my* server entry (future use)
	      (set! *time-to-exit* #t)
	      (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	      (thread-sleep! 1)
	      (debug:print-info 0 "Max cached queries was " *max-cache-size*)
	      (debug:print-info 0 "Server shutdown complete. Exiting")
	      (exit)))))))

;; all routes through here end in exit ...
;;
;; Launch a nanomsg server for run-id.
;;
;;   1. exit 0 immediately if server:check-if-running reports a server for run-id
;;   2. try to lock a server slot in the tasks db, retrying up to 4 more times
;;      with a 2 second pause; exit 0 if another server shows up meanwhile
;;   3. with a slot: run the server (nmsg-transport:run blocks), then exit
;;   4. without a slot after all retries: delete this pid's server records
;;      and exit 0 WITHOUT starting a server
;;
;; Step 4 used to fall through into nmsg-transport:run with server-id = #f;
;; the server is now only started when a slot was actually obtained.
(define (nmsg-transport:launch run-id)
  (let* ((tdbdat   (tasks:open-db))
         (dbstruct (db:setup run-id))
         (hostn    (or (args:get-arg "-server") "-")))
    (set! *run-id*   run-id)
    ;; with nbfake daemonize isn't really needed, so no (daemon:ize) here
    (if (server:check-if-running run-id)
        (begin
          (debug:print-info 0 "Server for run-id " run-id " already running")
          (exit 0)))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
               (remtries  4))
      (cond
       ;; got the slot: serve until the server thread finishes
       (server-id
        (nmsg-transport:run dbstruct hostn run-id server-id)
        (set! *didsomething* #t)
        (exit))
       ;; no slot yet: wait a little and try again unless somebody else made it
       ((> remtries 0)
        (thread-sleep! 2)
        (if (server:check-if-running run-id)
            (begin
              (debug:print-info 0 "Another server took the slot, exiting")
              (exit 0))
            (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
                  (- remtries 1))))
       ;; since we didn't get the server lock we are going to clean up and bail out
       (else
        (debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
        ;; the tag string previously said http-transport:launch (copy/paste);
        ;; presumably it is only used for logging - confirm
        (tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " nmsg-transport:launch")
        (exit 0))))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Signature for this process: the md5 digest of the written form of
;; (list <current directory> <argv>).
(define (nmsg-transport:mk-signature)
  (let* ((digest  (md5-primitive))
         (written (with-output-to-string
                    (lambda ()
                      (write (list (current-directory) (argv)))))))
    (message-digest-string digest written)))

;;======================================================================
;; C L I E N T S
;;======================================================================

;; ping the server at host:port
;;
;;   hostn         : host name; #f or "-" means this host (get-host-name)
;;   port          : tcp port of the server's rep socket
;;   return-socket : #t (default) => return the connected req socket on
;;                   success and #f on failure
;;                   #f           => return #t on success and #f on failure
;;   expected-key  : when given, the key carried by the reply must be equal?
;;                   to it; when #f any decodable reply counts as success
;;   our-key       : sent to the server as second element of the request
;;
;; The request is the encoded vector #("ping" our-key). The reply is decoded
;; with db:string->obj; if it is a vector the key is its element 1, otherwise
;; the decoded reply itself is taken as the key.
;;
;; A companion thread gives up after about 11 seconds and terminates the ping
;; thread. The req socket is closed here whenever it is not returned.
;;
(define (nmsg-transport:ping hostn port #!key (return-socket #t)(expected-key #f)(our-key #f))
  (let* ((req     (nn-socket 'req))
         (host    (if (or (not hostn)
                          (equal? hostn "-")) ;; use localhost
                      (get-host-name)
                      hostn))
         (success #f)
         (keepwaiting #t)
         (dat     (db:obj->string (vector "ping" our-key) transport: 'nm))
         (ping    (make-thread
                   (lambda ()
                     (nn-send req dat)
                     (let* ((result  (nn-recv req))
                            (reply   (db:string->obj result transport: 'nm))
                            ;; the server answers with whatever the api call
                            ;; returned, which need not be a vector
                            (key     (if (and (vector? reply)
                                              (> (vector-length reply) 1))
                                         (vector-ref reply 1)
                                         reply)))
                       (if (or (not expected-key) ;; just getting a reply is good enough then
                               (equal? key expected-key))
                           (set! success #t)
                           (begin
                             (set! keepwaiting #f)
                             (set! success #f)))))
                   "ping"))
         (timeout (make-thread (lambda ()
                                 (let loop ((count 0))
                                   (thread-sleep! 1)
                                   (print "still waiting after " (+ count 1) " seconds...")
                                   (if (and keepwaiting (< count 10))
                                       (loop (+ count 1))))
                                 (if keepwaiting
                                     (begin
                                       (print "timeout waiting for ping")
                                       (thread-terminate! ping))))
                               "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    (handle-exceptions
     exn
     (begin
       ;; reached when the ping thread raised or was terminated by the timeout
       (print-call-chain)
       (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       (print "exn=" (condition->list exn))
       (print "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    ;; whatever happened, the timeout thread has nothing left to wait for
    (set! keepwaiting #f)
    (if (and return-socket success)
        req
        (begin
          ;; not handing the socket out (either the caller did not ask for it
          ;; or the ping failed), so release it here
          (nn-close req)
          success))))

;; Connect a client to the server at iface:portnum and log in.
;;
;;   iface   : host/interface of the server
;;   portnum : tcp port of the server
;;
;; Returns the connected req socket (also stored in *nm-port*) when both the
;; ping and the login succeed, otherwise #f.
;;
;; The socket comes from nmsg-transport:ping, which already connected it, so
;; no second nn-connect is made here.
;;
;; NOTE(review): the original referred to serverdat, nmsg-sockets, pullport,
;; pubport and conurl, none of which were bound. serverdat is assumed to be
;; (list iface portnum) and client:login is assumed to take (serverdat socket)
;; as in the original call - confirm against client.scm.
;; NOTE(review): *runremote* is created as a hash table in common.scm but is
;; set to #f on failure here (kept from the original) - confirm intent.
(define (nmsg-transport:client-connect iface portnum)
  (let* ((conurl    (conc "tcp://" iface ":" portnum))
         (serverdat (list iface portnum))
         (reqsoc    (nmsg-transport:ping iface portnum)))
    (if (not reqsoc)
        (begin
          ;; ping failed, there is no socket to log in with
          (debug:print-info 2 "Failed to login or connect to " conurl)
          (set! *runremote* #f)
          #f)
        (begin
          (debug:print-info 11 "nmsg-transport:client-connect started. Next is login")
          (let ((login-res (client:login serverdat reqsoc)))
            (if (and (not (null? login-res))
                     (car login-res))
                (begin
                  (debug:print-info 2 "Logged in and connected to " conurl ".")
                  (set! *nm-port* reqsoc)
                  reqsoc)
                (begin
                  (debug:print-info 2 "Failed to login or connect to " conurl)
                  (nn-close reqsoc) ;; do not leave the unused socket open
                  (set! *runremote* #f)
                  #f)))))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being
;; used and to shutdown after some time if it is not.
;;
;; Takes no arguments and never returns normally: it either keeps looping or
;; calls (exit).
;;
;;   1. wait until *server-info* holds (host port); nmsg-transport:run stores
;;      that list once the server answered its ping
;;   2. look up this server's id in the tasks db
;;   3. every 4 seconds (8 on the first round) update the heartbeat and compare
;;      *last-db-access* against the clock; after 45 minutes without db access
;;      deregister the server and exit
;;
;; Changes from the previous version of this block:
;;   - the server location is read from *server-info*; *runremote* is a hash
;;     table of client connections and car/cadr on it cannot work
;;   - thread-sleep! replaces sleep so only this thread waits
;;   - the tasks db handle is passed through db:delay-if-busy like every other
;;     tasks:* call in this file
;;
(define (nmsg-transport:keep-running)
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat sdat
                              (begin
                                (thread-sleep! 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
         (tdbdat      (tasks:open-db))
         (spid        (tasks:server-get-server-id (db:delay-if-busy tdbdat) #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (if (< count 1) ;; one extra 4 second sleep before the first check
          (loop (+ count 1)))

      ;; NOTE: Get rid of this mechanism! It really is not needed...
      (tasks:server-update-heartbeat (db:delay-if-busy tdbdat) spid)

      ;; read the shared last access time under the heartbeat mutex
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *last-db-access*)
      (mutex-unlock! *heartbeat-mutex*)
      (if (> (+ last-access
                (* 45 60)          ;; 45 minutes, until the db deletion bug is fixed.
                )
             (current-seconds))
          (begin
            (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
            (loop 0))
          (begin
            (debug:print-info 0 "Starting to shutdown the server.")
            ;; need to delete only *my* server entry (future use)
            (set! *time-to-exit* #t)
            (tasks:server-deregister-self (db:delay-if-busy tdbdat) (get-host-name))
            (thread-sleep! 1)
            (debug:print-info 0 "Max cached queries was " *max-cache-size*)
            (debug:print-info 0 "Server shutdown complete. Exiting")
            (exit))))))


;; Signal handler for clients (^C).
;;
;;   signum : the signal number (not used in the body)
;;
;; Two threads are started: "eat response" tries to read a pending reply when
;; *received-response* is false, and "exit on ^C timer" prints a notice, waits
;; three seconds and exits with status 4. The handler joins the timer thread,
;; so it ends in (exit 4). Any exception raised while doing so is only logged.
;;
;; debug:print is given its level argument (0) in the exception branch, as in
;; every other debug:print call in this file.
;;
;; NOTE(review): receive-message* and *received-response* are not defined in
;; this file; they look like leftovers from another transport - confirm.
(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print 0 " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
                             (if (not *received-response*)
                                 (receive-message* *runremote*))) ;; flush out last call if applicable
                           "eat response"))
         (th2 (make-thread (lambda ()
                             (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
                             (thread-sleep! 3) ;; give the flush three seconds to do its stuff
                             (debug:print 0 "       Done.")
                             (exit 4))
                           "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))

Modified rmt.scm from [a4cf4136f4] to [a7d45cc611].

11
12
13
14
15
16
17

18
19
20
21
22
23
24

(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))


;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;







>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

(use json format)

(declare (unit rmt))
(declare (uses api))
(declare (uses tdb))
(declare (uses http-transport))
(declare (uses nmsg-transport))

;;
;; THESE ARE ALL CALLED ON THE CLIENT SIDE!!!
;;

;; ;; For debugging add the following to ~/.megatestrc
;;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87
88
89

90


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

;; cmd is a symbol
;; vars is a json string encoding the parameters for the call
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  (let ((expire-time (- (current-seconds) 60)))
    (for-each 
     (lambda (run-id)
       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
	 (if (and connection 
		  (< (http-transport:server-dat-get-last-access connection) expire-time))
	     (begin
	       (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")

	       (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (jparams         (db:obj->string params)))
    (if connection-info
	;; use the server if have connection info

	(let* ((dat     (http-transport:client-api-send-receive run-id connection-info cmd jparams))


	       (res     (if (vector? dat) (vector-ref dat 1) #f))
	       (success (if (vector? dat) (vector-ref dat 0) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)
	      ;; (if (< attemptnum 100)
	      ;;     (begin
	      ;;       (hash-table-delete! *runremote* run-id)
	      ;;       (thread-sleep! 0.5)
	      ;;       (rmt:send-receive cmd rid params attempnum: (+ attemptnum 1)))
	      ;;     (begin
	      ;;       (print-call-chain (current-error-port))
	      ;;       (debug:print 0 "ERROR: too many attempts to communicate have failed. Giving up. Kill your mtest processes and start over")
	      ;;       (exit 1)))))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server







<
<
<











>








>
|
>
>





<
<
<
<
<
<
<
<
<







62
63
64
65
66
67
68



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97









98
99
100
101
102
103
104
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))




(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  (let ((expire-time (- (current-seconds) 60)))
    (for-each 
     (lambda (run-id)
       (let ((connection (hash-table-ref/default *runremote* run-id #f)))
	 (if (and connection 
		  (< (http-transport:server-dat-get-last-access connection) expire-time))
	     (begin
	       (debug:print-info 0 "Discarding connection to server for run-id " run-id ", too long between accesses")
	       ;; SHOULD CLOSE THE CONNECTION HERE
	       (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  (mutex-unlock! *db-multi-sync-mutex*)
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (jparams         (db:obj->string params)))
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
			  ((nm)  (nm-transport:client-api-send-receive   run-id connection-info cmd jparams))
			  (else  (exit))))
	       (res     (if (vector? dat) (vector-ref dat 1) #f))
	       (success (if (vector? dat) (vector-ref dat 0) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)









	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server

Modified server.scm from [f2b9d5f3d9] to [a2c69bc84d].

18
19
20
21
22
23
24

25
26
27
28
29
30
31
(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))

(declare (uses launch))
;; (declare (uses zmq-transport))
(declare (uses daemon))

(include "common_records.scm")
(include "db_records.scm")








>







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
(declare (unit server))

(declare (uses common))
(declare (uses db))
(declare (uses tasks)) ;; tasks are where stuff is maintained about what is running.
(declare (uses synchash))
(declare (uses http-transport))
(declare (uses nmsg-transport))
(declare (uses launch))
;; (declare (uses zmq-transport))
(declare (uses daemon))

(include "common_records.scm")
(include "db_records.scm")

45
46
47
48
49
50
51

52


53
54
55
56
57
58
59
;;

;; all routes through here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
  ;; Start the server for RUN-ID by handing off to http-transport:launch.

  (http-transport:launch run-id))



;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================

;; We don't want to flush the queue if it was just flushed.
;; Holds a (current-milliseconds) reading, initialised when this file is loaded.
(define *server:last-write-flush* (current-milliseconds))







>
|
>
>







46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
;;

;; all routes through here end in exit ...
;;
;; start_server
;;
(define (server:launch run-id)
  ;; Start the server for RUN-ID using the transport selected by the global
  ;; *transport-type*:
  ;;   'http -> http-transport:launch
  ;;   'nm   -> nmsg-transport:launch
  (case *transport-type*
    ((http)(http-transport:launch run-id))
    ((nm)  (nmsg-transport:launch run-id))
    (else
     ;; No server can be started for an unknown transport. Report it and
     ;; exit non-zero rather than returning to the caller as though a
     ;; server had run.
     (debug:print 0 "ERROR: unknown server type " *transport-type*)
     (exit 1))))

;;======================================================================
;; Q U E U E   M A N A G E M E N T
;;======================================================================

;; We don't want to flush the queue if it was just flushed.
;; Holds a (current-milliseconds) reading, initialised when this file is loaded.
(define *server:last-write-flush* (current-milliseconds))

Added testnanomsg/basic-req-rep.scm version [1436c827c9].







>
>
>
1
2
3
(use nanomsg srfi-18 sqlite3 numbers)

;; Reply-side ('rep) nanomsg socket. Nothing in this file binds or reads from it yet.
(define resp (nn-socket 'rep))

Added testnanomsg/mockupclient.scm version [63a8c6685a].





















































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
(use zmq posix numbers)

;; Client name and run duration in seconds. Both defaults are replaced by
;; the command-line values parsed below.
(define cname "Bob")
(define runtime 10)

;; Expected invocation: mockupclient clientname runtime
;; Too few arguments, or a runtime that is not a number, prints the usage
;; line and exits. Previously a non-numeric runtime was stored as #f and
;; only failed later when added to (current-seconds).
(let* ((args    (argv))
       (seconds (and (>= (length args) 3)
		     (string->number (caddr args)))))
  (if seconds
      (begin
	(set! cname (cadr args))
	(set! runtime seconds))
      (begin
	(print "Usage: mockupclient clientname runtime")
	(exit))))
      
;; (define start-delay (/ (random 100) 9))
;; (define runtime     (+ 1 (/ (random 200) 2)))

;; Announce the settings in effect after argument parsing.
(print "Starting client " cname " with runtime " runtime)

;; Brings in server-ping and dbaccess, both used below.
(include "mockupclientlib.scm")

;; Second (per current-seconds) at which the main loop below stops iterating.
(set! endtime (+ (current-seconds) runtime))

;; Make sure the server answers a ping (5 second budget) before doing any
;; real work; give up immediately if it does not.
(cond
 ((server-ping cname 5)
  (print "SUCCESS: Client " cname " connected to server"))
 (else
  (print "ERROR: Client " cname " failed ping of server, exiting")
  (exit)))

;; Main loop: on every pass roll 0..14 and pick one of four variable names.
;;   2..5   -> set the variable to a random value below 999
;;   6..10  -> get the variable and print what came back
;;   others -> short sleep (the 'sync case for 1 is disabled)
;; Repeats until the current second reaches endtime.
(let loop ()
  (let* ((roll    (random 15))
	 (names   '("hello" "goodbye" "saluton" "kiaorana"))
	 (varname (list-ref names (random 4))))
    (cond
     ;; ((eqv? roll 1)(dbaccess cname 'sync "nodat"    #f))
     ((memv roll '(2 3 4 5))
      (dbaccess cname 'set varname (random 999)))
     ((memv roll '(6 7 8 9 10))
      (print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f)))
     (else
      (thread-sleep! 0.011)))
    (if (< (current-seconds) endtime)
	(loop))))

(print "Client " cname " all done!!")

Added testnanomsg/mockupclientlib.scm version [3b245ba7a9].





















































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
;; Request ('req) socket created for the helpers in this file.
(define reqs (nn-socket 'req))

;; The socket comes from nn-socket, so it has to be connected with the
;; nanomsg call nn-connect (connect-socket belongs to the zmq API).
(nn-connect reqs "tcp://localhost:6563")

;; Short pause after connecting -- presumably to let the connection come
;; up before the first send; confirm whether it is still needed.
(thread-sleep! 0.2)

;; Ping the server until it answers "Got ping" or TIMEOUT seconds pass.
;;
;;   cname   - client name, sent as the first field of the message
;;   timeout - seconds to keep trying; also sent as the third field
;;
;; Returns #t when the reply "Got ping" was received, #f otherwise.
;;
;; NOTE(review): `push` and `sub` are not defined in this file; the only
;; socket created here is `reqs`. Confirm where they are meant to come from.
(define (server-ping cname timeout)
  (let ((msg      (conc cname ":ping:" timeout))
	(deadline (+ (current-seconds) timeout)))
    (print "pinging server from " cname " with timeout " timeout)
    (let loop ((res #f))
      (cond
       ;; Look at the reply first so that an answer arriving just as the
       ;; deadline passes still counts as success.
       ((equal? res "Got ping") #t)
       ((< deadline (current-seconds)) #f) ;; failed to ping
       (else
	(print "Ping received from server " res)
	(send-message push msg)
	(thread-sleep! 0.1)
	(loop (receive-message sub non-blocking: #t)))))))
  
;; Send one request to the server and wait for its answer.
;;
;;   cname    - client name, first field of the message
;;   cmd      - command symbol, second field
;;   var, val - payload; sent as "var val" when VAL is true, else just VAR
;;   numtries - retries left for the timeout thread (default 20)
;;
;; Returns the response stored by the access thread. When no response has
;; arrived after 5 seconds the timeout thread sends the request again with
;; one fewer try, and exits the process once the tries are used up.
;;
;; NOTE(review): `push` and `sub` are not defined in this file; the only
;; socket created here is `reqs`. Confirm where they are meant to come from.
(define (dbaccess cname cmd var val #!key (numtries 20))
  (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var)))
	 (res #f)
	 (mtx1 (make-mutex)) ;; guards res between the two threads
	 (do-access (lambda ()
		      (let ((tmpres #f))
			(print "Sending msg: " msg)
			(send-message push msg)
			(print "Message " msg " sent")
			(print "Client " cname " waiting for response to " msg)
			;; two receives: the first is printed as the address,
			;; the second is kept as the response
			(print "Client " cname " received address " (receive-message* sub))
			(set! tmpres (receive-message* sub))
			(mutex-lock! mtx1)
			(set! res tmpres)
			(mutex-unlock! mtx1))))
	 (th1 (make-thread do-access "do access"))
	 (th2 (make-thread (lambda ()
			     (let ((result #f))
			       ;; Wait first, then look at res. Reading res
			       ;; before the sleep always saw #f, so every
			       ;; call triggered a retry 5 seconds later
			       ;; even when the answer had already arrived.
			       (thread-sleep! 5)
			       (mutex-lock! mtx1)
			       (set! result res)
			       (mutex-unlock! mtx1)
			       (if (not result)
				   (if (> numtries 0)
				       (begin
					 (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries)
					 (dbaccess cname cmd var val numtries: (- numtries 1)))
				       (begin
					 (print "ERROR: dbaccess timed out. Exiting")
					 (exit)))))
			     "timeout thread"))))
    (thread-start! th1)
    (thread-start! th2)
    ;; only the access thread is joined; the timeout thread runs on its own
    (thread-join! th1)
    (if res (print "SUCCESS: received " res " with " numtries " remaining possible attempts"))
    res))

Added testnanomsg/mockupserver.scm version [a4d3e5594c].





































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use nanomsg srfi-18 sqlite3 numbers)

;; Reply ('rep) socket; bound below to TCP port 6563.
(define resp (nn-socket 'rep))
;; NOTE(review): no visible code reads this global binding (the server
;; thread shadows cname locally) -- confirm whether it is still needed.
(define cname "server")
;; Running count of SQL statements issued; bumped by get-client-id,
;; count-client and process-queue, reported when the server exits.
(define total-db-accesses 0)
;; Start of the run, used for the queries/second figure printed at exit.
(define start-time (current-seconds))

(nn-bind resp  "tcp://*:6563")

;; Short pause after binding -- presumably to let the endpoint settle; confirm.
(thread-sleep! 0.2)

;; Open "mockup.db", creating the clients and vars tables when the file
;; did not exist beforehand. Returns the database handle.
(define (open-db)
  (let* ((dbpath    "mockup.db")
	 (dbexists  (file-exists? dbpath))
	 (db        (open-database dbpath)) ;; (never-give-up-open-db dbpath))
	 (handler   (make-busy-timeout 10)))
    (set-busy-handler! db handler)
    ;; The synchronous setting belongs to the connection, not to the file,
    ;; so apply it on every open instead of only when the file is created.
    (execute db "PRAGMA SYNCHRONOUS=0;")
    (if (not dbexists)
	(for-each
	 (lambda (stmt)
	   (execute db stmt))
	 (list
	  "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
	  "CREATE TABLE vars    (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    db))

(define cid-cache (make-hash-table))

;; Return the clients.id for CNAME.
;; A cached id is returned as is. Otherwise a row is inserted, its id is
;; read back and cached, and two database accesses are counted.
(define (get-client-id db cname)
  (or (hash-table-ref/default cid-cache cname #f)
      (let ((found #f))
	(execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname)
	;; the last row visited wins when several rows match
	(for-each-row
	 (lambda (id) (set! found id))
	 db
	 "SELECT id FROM clients WHERE name=?;" cname)
	(hash-table-set! cid-cache cname found)
	(set! total-db-accesses (+ total-db-accesses 2))
	found)))

;; Bump num_accesses for client CNAME and count one more database access.
(define (count-client db cname)
  (execute db
	   "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;"
	   (get-client-id db cname))
  (set! total-db-accesses (+ total-db-accesses 1)))

(define db (open-db))
;; (define queuelst '())
;; (define mx1 (make-mutex))

(define max-queue-len 0)

;; Answer every request in QUEUELST, in list order.
;; Each item is a vector #(timestamp cname clcmd cdata) as built by the
;; server thread. For every item the client name is sent first (with
;; send-more: #t), followed by the reply for its command:
;;   sync -> the queue length as a string
;;   set  -> "ok" after storing "var val" (cdata split on whitespace)
;;   get  -> the stored value, or "noval" when no row matches
;;   else -> "unk cmd: <cmd>"
;; Also keeps max-queue-len at the longest queue seen so far.
;; NOTE(review): `pub` is not defined in the visible files; the only socket
;; created in this file is `resp`. Confirm where it is meant to come from.
(define (process-queue queuelst)
  (let ((queuelen (length queuelst)))
    (if (> queuelen max-queue-len)
	(set! max-queue-len queuelen))
    (for-each
     (lambda (item)
       (let ((cname (vector-ref item 1))
	     (clcmd (vector-ref item 2))
	     (cdata (vector-ref item 3)))
	 (send-message pub cname send-more: #t)
	 (send-message pub (case clcmd
			     ((sync)
			      (conc queuelen))
			     ((set)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			      "ok")
			     ((get)
			      (set! total-db-accesses (+ total-db-accesses 1))
			      ;; "noval" survives when the query returns no row
			      (let ((res "noval"))
				(for-each-row
				 (lambda (val)
				   (set! res val))
				 db 
				 "SELECT val FROM vars WHERE var=?;" cdata)
				res))
			     (else (conc "unk cmd: " clcmd))))))
     queuelst)))

;; SERVER THREAD
;; Server thread: receives "cname:cmd:data" messages in an endless loop.
;; Every message is counted against its client, then handled by command:
;;   ping       -> answered at once, the queue is left untouched
;;   sync / get -> the message is added to the queue, the whole queue is
;;                 processed, and the loop continues with an empty queue
;;   otherwise  -> the message is queued for a later sync or get
;; NOTE(review): `pull` and `pub` are not defined in the visible files; the
;; only socket created in this file is `resp`. last-run is bound but not
;; read in the visible code.
(define th1 (make-thread 
	     (lambda ()
	       (let ((last-run 0)) ;; current-seconds when run last
		 (let loop ((queuelst '()))
		   (let* ((indat (receive-message* pull))
			  (parts (string-split indat ":"))
			  (cname (car parts))                   ;; client name
			  (clcmd (string->symbol (cadr parts))) ;; client cmd
			  (cdata (caddr parts))                 ;; client data
			  (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
		     ;; (print "Server received message: " indat)
		     (count-client db cname)
		     (case clcmd
		       ((ping)
			(print "Got ping from " cname)
			(send-message pub cname send-more: #t)
			(send-message pub "Got ping")
			(loop queuelst))
		       ((sync) ;; just process the queue
			(print "Got sync from " cname)
			(process-queue (cons svect queuelst))
			(loop '()))
		       ((get)
			(process-queue (cons svect queuelst))
			(loop '()))
		       (else
			(loop (cons svect queuelst))))))))
	     "server thread"))

(include "mockupclientlib.scm")

;; SYNC THREAD
;; send a sync to the pull port
;; Sync thread: every 5 seconds sends a 'sync request as client "server",
;; which makes the server thread process its queue and reply with the
;; queue length. A length above 1 counts as activity. The thread ends once
;; 60 seconds have passed without activity.
(define th2 (make-thread
	     (lambda ()
	       (let ((last-action-time (current-seconds)))
		 (let loop ()
		   (thread-sleep! 5)
		   (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f)))
			 (last-action-delta #f))
		     (if (> queuelen 1)(set! last-action-time (current-seconds)))
		     (set! last-action-delta (- (current-seconds) last-action-time))
		     (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta)
		     (if (< last-action-delta 60)
			 (loop)
			 ;; report the real idle time; the old text said
			 ;; "25 seconds" although the limit is 60
			 (print "Server exiting, " last-action-delta " seconds since last access"))))))
	     "sync thread"))

;; Start both threads and block until the sync thread decides to stop.
(thread-start! th1)
(thread-start! th2)
(thread-join! th2)

;; Final report: total accesses, elapsed seconds, throughput and the
;; longest queue seen.
(let ((elapsed (- (current-seconds) start-time)))
  (print "Server exited! Total db accesses=" total-db-accesses
	 " in " elapsed
	 " seconds for " (/ total-db-accesses elapsed)
	 " queries/second with max queue length of: " max-queue-len))

Added testnanomsg/pipeline.scm version [1d4d831eb6].



















































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
;; watch nanomsg's pipeline load-balancer in action.
(use nanomsg)

;; One push socket feeding two pull sockets over an in-process endpoint.
(define push  (nn-socket 'push))
(define pull1 (nn-socket 'pull))
(define pull2 (nn-socket 'pull))

(nn-bind push "inproc://test")
(for-each (lambda (sock) (nn-connect sock "inproc://test"))
	  (list pull1 pull2))

;; Queue four messages on the push side, in order.
(for-each (lambda (msg) (nn-send push msg))
	  '("a" "b" "c" "d"))

;; Build a thunk that receives two messages from SOCK, printing each one
;; tagged with the current thread, and then reports that it is done.
(define (th sock)
  (lambda ()
    (print (current-thread) ": " (nn-recv sock))
    (print (current-thread) ": " (nn-recv sock))
    (print (current-thread) " is done")))

;; One receiver per pull socket.
(thread-start! (th pull1))
(thread-start! (th pull2))

;; Keep the main thread alive for a second while the receivers run.
(thread-sleep! 1)

Added testnanomsg/req-rep-client.scm version [7998d54555].





























































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
;; nanomsg req/rep client: sends names to the server at tcp://localhost:22022 and prints each reply.
(use nanomsg)

;; Request ('req) socket, connected to port 22022 on localhost.
(define req   (nn-socket 'req))

(nn-connect req  "tcp://localhost:22022")

;; (with-output-to-string (lambda ()(serialize obj)))
;; Send MSG on SOC and return whatever is received next on the same socket.
(define client-send-receive
  (lambda (soc msg)
    (nn-send soc msg)
    (nn-recv soc)))

;; Build a thunk that talks to the server over SOC: it sends 21 randomly
;; chosen names (cnt runs from 20 down to 0), printing each reply, then
;; sends "quit", prints that reply, closes SOC and exits the process.
;; All traffic goes through the SOC argument; the body used to ignore it
;; and reach for the global `req` instead.
(define ((talk-to-server soc))
  (let loop ((cnt 20))
    (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6))))
      (print "Sending " name)
      (print (client-send-receive soc name))
      (if (> cnt 0)(loop (- cnt 1)))))
  (print (client-send-receive soc "quit"))
  (nn-close soc)
  (exit))

;; (thread-start! (lambda ()
;; 		 (thread-sleep! 20)
;; 		 (print "Give up on waiting for the server")
;; 		 (nn-close req)
;; 		 (exit)))

(thread-join! (thread-start! (talk-to-server req)))

Added testnanomsg/req-rep-server.scm version [d9de6da037].





















































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
;; nanomsg req/rep server: binds a 'rep socket on TCP port 22022, pings itself, then answers requests until it receives "quit".
(use nanomsg)

;; (use trace)
;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close )

;; Port the reply socket is bound to, and the host ping-self connects to.
(define port  22022)
(define host  "127.0.0.1")

;; Reply ('rep) socket served by the `server` procedure below.
(define rep   (nn-socket 'rep))

;; Binds on all interfaces ("*") and prints whatever nn-bind returned.
(print "connecting, got: " (nn-bind    rep  (conc "tcp://" "*" ":" port)))

;; Reply loop on SOC. Each received message is printed, then:
;;   "quit"             -> reply "Ok, quitting" and return
;;   starts with "ping" -> reply with this process id and keep serving
;;   anything else      -> sleep a random 0..14 seconds, reply with a
;;                         greeting that states the delay, keep serving
(define (server soc)
  (print "server starting")
  (let loop ()
    (let ((msg-in (nn-recv soc)))
      (print "server received: " msg-in)
      (cond
       ((equal? msg-in "quit")
	(nn-send soc "Ok, quitting"))
       ((and (>= (string-length msg-in) 4)
	     (equal? (substring msg-in 0 4) "ping"))
	(nn-send soc (conc (current-process-id)))
	(loop))
       (else
	(let ((delay-secs (random 15)))
	  (thread-sleep! delay-secs)
	  (nn-send soc (conc "hello " msg-in " this task took " delay-secs " seconds to complete"))
	  (loop)))))))

;; Check that the server at HOST:PORT is this very process: send "ping" on
;; a fresh req socket and expect our own process id back.
;;
;;   host, port     - where to connect
;;   return-socket  - when true (the default) return the connected socket
;;                    on success and #f on failure; when false return
;;                    only the success flag
;;
;; The socket is closed in every case except when it is handed back to
;; the caller. A second thread terminates the ping thread when no verdict
;; has been reached after roughly 11 seconds.
(define (ping-self host port #!key (return-socket #t))
  (let* ((req     (nn-socket 'req))
	 (key     "ping")
	 (success #f)
	 (keepwaiting #t)
	 (ping    (make-thread
		   (lambda ()
		     (print "ping: sending string \"" key "\", expecting " (current-process-id))
		     (nn-send req key)
		     (let ((result  (nn-recv req)))
		       (if (equal? (conc (current-process-id)) result)
			   (begin
			     (print "ping, success: received \"" result "\"")
			     (set! success #t))
			   (begin
			     (print "ping, failed: received key \"" result "\"")
			     (set! keepwaiting #f)
			     (set! success #f)))))
		   "ping"))
	 (timeout (make-thread (lambda ()
				 (let loop ((count 0))
				   (thread-sleep! 1)
				   ;; one second has been slept per pass
				   (print "still waiting after " (+ count 1) " seconds...")
				   (if (and keepwaiting (< count 10))
				       (loop (+ count 1))))
				 (if keepwaiting
				     (begin
				       (print "timeout waiting for ping")
				       (thread-terminate! ping))))
			       "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    (handle-exceptions
     exn
     (begin
       (print-call-chain)
       ;; the #f default keeps the handler from raising on conditions
       ;; that carry no exn message (e.g. a terminated ping thread)
       (print " message: " ((condition-property-accessor 'exn 'message #f) exn))
       (print "exn=" (condition->list exn))
       (print "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    (cond
     ;; the caller takes ownership of the live socket
     ((and return-socket success) req)
     (else
      ;; nobody receives the socket on this path, so release it here
      (nn-close req)
      (and (not return-socket) success)))))

;; Start the reply loop on its own thread, verify it with a self ping,
;; then wait for it to finish (it returns after a "quit" request).
(let ((server-thread (make-thread (lambda ()(server rep)) "server")))
  (thread-start! server-thread)
  ;; (thread-sleep! 1)
  ;; Only the yes/no answer is needed here. With return-socket: #f
  ;; ping-self closes its own req socket instead of handing back a socket
  ;; that nothing would ever close.
  (if (ping-self host port return-socket: #f)
      (begin
	(thread-join! server-thread)
	(nn-close rep))
      (print "ping failed")))

(exit)

Added testnanomsg/req-rep.scm version [b77ebf1421].





























































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
;; nanomsg req/rep round trip over an in-process ("inproc://test") endpoint.
(use nanomsg)

;; Request and reply sockets, joined through the same in-process endpoint.
(define req   (nn-socket 'req))
(define rep   (nn-socket 'rep))

(nn-bind    rep  "inproc://test")
(nn-connect req  "inproc://test")

;; Send MSG on SOC and return whatever is received next on the same socket.
(define client-send-receive
  (lambda (soc msg)
    (nn-send soc msg)
    (nn-recv soc)))

;; Build a thunk that serves SOC: every message other than "quit" is
;; answered with "hello <message>"; "quit" ends the loop without a reply.
(define (server soc)
  (lambda ()
    (let loop ()
      (let ((msg-in (nn-recv soc)))
	(unless (equal? msg-in "quit")
	  (nn-send soc (conc "hello " msg-in))
	  (loop))))))

;; Run the reply loop on its own thread, serving the rep socket.
(thread-start! (server rep))

;; Two request/reply round trips; each reply is printed.
(print (client-send-receive req "Matt"))
(print (client-send-receive req "Tom"))

;; (client-send-receive req "quit")

;; The "quit" request above is commented out, so the server thread is never
;; asked to stop; both sockets are simply closed before exiting.
(nn-close req)
(nn-close rep)
(exit)

Modified tests/Makefile from [502a984b43] to [a8b041cd34].

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# Rebuild and install from the parent directory, then stop server 0 in fullrun.
# $(MAKE) keeps flags and the jobserver flowing into the sub-make; && stops
# the recipe line when a cd fails instead of running in the wrong directory.
stopserver :
	cd .. && $(MAKE) -j && $(MAKE) install
	cd fullrun && $(MEGATEST) -stop-server 0

# Rebuild and install from the parent directory, then start a megatest repl
# in fullrun. $(MAKE) keeps flags flowing into the sub-make; && stops the
# recipe line when a cd fails.
repl :
	cd .. && $(MAKE) -j && $(MAKE) install
	cd fullrun && $(MEGATEST) -repl

# Start a megatest server from simplerun at debug level $(DEBUG).
# && keeps megatest from running in the wrong directory when the cd fails.
test0 : cleanprep
	cd simplerun && $(MEGATEST) -server - -debug $(DEBUG)

test1 : cleanprep

test2 : fullprep







|







32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# Rebuild and install from the parent directory, then stop server 0 in fullrun.
# $(MAKE) keeps flags and the jobserver flowing into the sub-make; && stops
# the recipe line when a cd fails instead of running in the wrong directory.
stopserver :
	cd .. && $(MAKE) -j && $(MAKE) install
	cd fullrun && $(MEGATEST) -stop-server 0

# Rebuild and install from the parent directory, then start a megatest repl
# in fullrun (the -:b argument is passed through to megatest unchanged).
# $(MAKE) keeps flags flowing into the sub-make; && stops the recipe line
# when a cd fails.
repl :
	cd .. && $(MAKE) -j && $(MAKE) install
	cd fullrun && $(MEGATEST) -:b -repl

# Start a megatest server from simplerun at debug level $(DEBUG).
# && keeps megatest from running in the wrong directory when the cd fails.
test0 : cleanprep
	cd simplerun && $(MEGATEST) -server - -debug $(DEBUG)

test1 : cleanprep

test2 : fullprep