Megatest

Check-in [245a3a0b6d]
Login
Overview
Comment:Add timeout on all remote calls
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | try-nanomsg
Files: files | file ages | folders
SHA1: 245a3a0b6d1daffabb61e689baef2ab4f6d83af4
User & Date: matt on 2014-11-26 21:29:00
Other Links: branch diff | manifest | tags
Context
2014-11-26
22:49
Cleaned up client:start check-in: f64f2f8ca8 user: matt tags: try-nanomsg
21:29
Add timeout on all remote calls check-in: 245a3a0b6d user: matt tags: try-nanomsg
16:09
Added debug support in newdashboard check-in: b79afa463f user: matt tags: try-nanomsg
Changes

Modified nmsg-transport.scm from [b2990ea4bd] to [a65dcd51b0].

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

86

87
88
89



90
91
92
93
94
95
96
;; Timestamp (from current-seconds) initialised when this file is loaded.
;; NOTE(review): presumably refreshed by the server loop as a liveness
;; heartbeat; the writers are not visible here, so confirm.
(define *server-loop-heart-beat* (current-seconds))
;; Mutex created at load time.
;; NOTE(review): presumably guards *server-loop-heart-beat*; its users are
;; not visible here, so confirm.
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Start the nanomsg server for run-id on a free port and block until the
;; server thread exits.
;;
;;   dbstruct  - handed to the server thread; once the server answers its
;;               self-ping it is also installed as *inmemdb*
;;   hostn     - host name to advertise; "-" means (get-host-name)
;;   retrynum: - how many more ports to try if the server does not answer its
;;               self-ping; when exhausted the process exits with status 1
;;
(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
    (thread-start! server-thread)
    ;; self-ping: the reply key must equal this process id (presumably echoed
    ;; by the server thread). return-socket: #f so the probe socket is closed
    ;; instead of being returned and leaked.
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id) return-socket: #f)
	(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)
	      ;; count down so the retry limit is actually enforced
	      (nmsg-transport:run dbstruct hostn run-id server-id retrynum: (- retrynum 1)))
	    (begin
	      (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
	      (exit 1))))))




(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let loop ((msg-in (nn-recv repsoc)))
      (cond
       ((equal? msg-in "quit")







|



















>
|
>
|
|
|
>
>
>







59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
;; Timestamp (from current-seconds) initialised when this file is loaded.
;; NOTE(review): presumably refreshed by the server loop as a liveness
;; heartbeat; the writers are not visible here, so confirm.
(define *server-loop-heart-beat* (current-seconds))
;; Mutex created at load time.
;; NOTE(review): presumably guards *server-loop-heart-beat*; its users are
;; not visible here, so confirm.
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

;; Start the nanomsg server for run-id on a free port and block until the
;; server thread exits.
;;
;;   dbstruct  - handed to the server thread; once the server answers its
;;               self-ping it is also installed as *inmemdb*
;;   hostn     - host name to advertise; "-" means (get-host-name)
;;   retrynum: - how many more ports to try if the server does not answer its
;;               self-ping; when exhausted the process exits with status 1
;;
(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
    (thread-start! server-thread)
    ;; self-ping: the reply key must equal this process id (presumably echoed
    ;; by the server thread). return-socket: #f so the probe socket is closed
    ;; instead of being returned and leaked.
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id) return-socket: #f)
	(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)
	      ;; pass the decremented count on; previously it reset to 1000 on
	      ;; every retry so the "Giving up" branch could never be reached
	      (nmsg-transport:run dbstruct hostn run-id server-id retrynum: (- retrynum 1)))
	    (begin
	      (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
	      (exit 1))))))

(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let loop ((msg-in (nn-recv repsoc)))
      (cond
       ((equal? msg-in "quit")
169
170
171
172
173
174
175
176
177
178
179
180
181
182


183
184
185
186
187
188



189
190


191
192

193
194


195






196




197

198

199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
;; ping the server at host:port
;;   return the open socket if successful (return-socket == #t)
;;   expect the key expected-key returned in payload
;;   send our-key or #f as payload
;;
;;   timeout: - approximate number of seconds to wait for the reply
;;   socket:  - reuse this req socket instead of creating and connecting one
;;
;;   Returns the socket (return-socket #t) or #t on success, and #f on failure.
;;   A socket created here is closed unless it is returned; a socket handed in
;;   via socket: is never closed by this procedure.
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
  ;; send ("ping" our-key) and check that the reply carries expected-key
  (let* ((req     (or socket (nn-socket 'req)))
	 (host    (if (or (not hostn)
			  (equal? hostn "-")) ;; use localhost
		      (get-host-name)
		      hostn))
	 (success #f)
	 (keepwaiting #t)
	 (dat     (db:obj->string (vector "ping" our-key) transport: 'nmsg))
	 (ping    (make-thread
		   (lambda ()
		     (nn-send req dat)
		     (let* ((result  (nn-recv req))
			    (key     (vector-ref (db:string->obj result transport: 'nmsg) 1)))
		       ;; just getting a reply is good enough when no key is expected
		       (set! success (or (not expected-key)
					 (equal? key expected-key)))
		       ;; a reply (good or bad) arrived, so the watchdog can stand down
		       (set! keepwaiting #f)))
		   "ping"))
	 ;; named watchdog (not timeout) so it no longer shadows the timeout: keyword
	 (watchdog (make-thread (lambda ()
				  (let loop ((count 0))
				    (thread-sleep! 1)
				    (debug:print-info 1 "ping, still waiting after " count " seconds...")
				    (if (and keepwaiting (< count timeout)) ;; yes, this is very approximate
					(loop (+ count 1))))
				  (if keepwaiting
				      (begin
					(print "timeout waiting for ping")
					(thread-terminate! ping))))
				"timeout")))
    (handle-exceptions
     exn
     (begin
       ;; connect failed, the ping thread raised, or the watchdog killed it.
       ;; Stop the watchdog from later killing a dead thread or reporting a timeout.
       (set! keepwaiting #f)
       (debug:print-info 1 "ping failed to connect to " host ":" port))
     (if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
     (thread-start! watchdog)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! watchdog)))
    (if (and success return-socket)
	req
	(begin
	  (if (not socket)(nn-close req)) ;; only close sockets created here; never one we were handed
	  success))))

;; Run nmsg-transport:keep-running in a parallel thread to monitor whether the db is being
;; used, and to shut down after some time if it is not.
;;
(define (nmsg-transport:keep-running server-id)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown







<
|



|
|
>
>

|
<
|
|
|
>
>
>
|
|
>
>
|
<
>
|
|
>
>
|
>
>
>
>
>
>
|
>
>
>
>
|
>
|
>
|
|
|
|
|
|
|
|
|
|
|
<


<
<
<
|
<

|
|

<
|
<
<
<







174
175
176
177
178
179
180

181
182
183
184
185
186
187
188
189
190

191
192
193
194
195
196
197
198
199
200
201

202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233

234
235



236

237
238
239
240

241



242
243
244
245
246
247
248
;; Ping the server listening on host:port.
;;
;;   hostn          - host name; #f or "-" means this host (get-host-name)
;;   port           - tcp port of the server
;;   timeout:       - seconds to wait, passed on to send-receive-raw
;;   return-socket: - when #t, hand back the live socket on success
;;   expected-key:  - if set, the key in the reply must be equal? to it
;;   our-key:       - key sent along with the "ping" request
;;   socket:        - already-connected req socket to reuse instead of opening one
;;
;; Returns the socket (return-socket #t) or #t on success, and #f on failure.
;; A socket opened here is closed unless it is returned; a socket handed in
;; via socket: is never closed.
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
  (let* ((target-host (if (and hostn (not (equal? hostn "-")))
			  hostn
			  (get-host-name))) ;; #f or "-" means localhost
	 (soc         (if socket
			  socket
			  (let ((fresh (nn-socket 'req)))
			    (nn-connect fresh (conc "tcp://" target-host ":" port))
			    fresh)))
	 (payload     (db:obj->string (vector "ping" our-key) transport: 'nmsg))
	 (reply       (nmsg-transport:client-api-send-receive-raw soc payload timeout: timeout))
	 (got-reply   (vector-ref reply 0))
	 (their-key   (and got-reply
			   (vector-ref (db:string->obj (vector-ref reply 1) transport: 'nmsg) 1))))
    (debug:print 0 "success=" got-reply ", key=" their-key ", expected-key=" expected-key ", equal? " (equal? their-key expected-key))
    (cond
     ((not (and got-reply
		(or (not expected-key) ;; any reply will do when no key is expected
		    (equal? their-key expected-key))))
      (if (not socket)(nn-close soc)) ;; failed ping: release a socket we opened ourselves
      #f)
     (return-socket soc)
     (else
      (if (not socket)(nn-close soc)) ;; never close a caller-supplied socket
      #t))))

;; send data to server, wait max of timeout seconds for a response.
;; return #( success/fail result )
;;   success - #t when a reply was received, otherwise #f
;;   result  - the raw reply on success; "timeout" when no reply was captured
;;             (this also covers errors raised while sending/receiving)
;;
(define (nmsg-transport:client-api-send-receive-raw socreq dat #!key (timeout 5))
  (let* ((success     #f)
	 (result      #f)
	 (keepwaiting #t)
	 (send-recv   (make-thread
		       (lambda ()
			 (nn-send socreq dat)
			 (let ((res (nn-recv socreq)))
			   ;; store the reply before flagging success, so a late
			   ;; terminate can never leave success=#t with no result
			   (set! result res)
			   (set! success #t)
			   ;; reply is in; tell the watchdog to stand down
			   (set! keepwaiting #f)))
		       "send-recv"))
	 ;; named watchdog (not timeout) so it no longer shadows the timeout: keyword
	 (watchdog    (make-thread
		       (lambda ()
			 (let loop ((count 0))
			   (thread-sleep! 1)
			   (debug:print-info 1 "send-receive-raw, still waiting after " count " seconds...")
			   (if (and keepwaiting (< count timeout)) ;; yes, this is very approximate
			       (loop (+ count 1))))
			 (if keepwaiting
			     (begin
			       (print "send-receive-raw, timeout waiting for reply")
			       (thread-terminate! send-recv))))
		       "timeout")))
    (handle-exceptions
     exn
     (begin
       ;; send-recv raised or was terminated by the watchdog. Make sure the
       ;; watchdog does not act later, and only report "timeout" if no reply
       ;; was actually captured.
       (set! keepwaiting #f)
       (if (not success) (set! result "timeout")))
     (thread-start! watchdog)
     (thread-start! send-recv)
     (thread-join! send-recv)
     (if success (thread-terminate! watchdog)))
    (vector success result)))




;; Run nmsg-transport:keep-running in a parallel thread to monitor whether the db is being
;; used, and to shut down after some time if it is not.
;;
(define (nmsg-transport:keep-running server-id)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
287
288
289
290
291
292
293


294
295
296
297
298
299


300
301

302
303
304
305
306
307
308
309
;; C L I E N T S
;;======================================================================

;; Open a client connection to the server at iface:portnum by pinging it and
;; keeping the ping socket.
;; Returns #( iface portnum #f #f #f <seconds-at-connect> <req-socket> ).
;; NOTE(review): the socket slot holds #f when the ping fails; callers should check.
(define (nmsg-transport:client-connect iface portnum)
  (let* ((req-socket (nmsg-transport:ping iface portnum return-socket: #t))
	 (created-at (current-seconds)))
    (vector iface portnum #f #f #f created-at req-socket)))



;; Send #(cmd param) over the connection's req socket and return the decoded reply.
;; *http-mutex* serialises use of the shared socket. It is released even if
;; encoding, the socket lookup or nn-send/nn-recv raises (the error is then
;; re-raised), so one failing call can no longer deadlock every later call.
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
  (mutex-lock! *http-mutex*)
  (let ((res (handle-exceptions
	      exn
	      (begin
		(mutex-unlock! *http-mutex*)
		(abort exn))
	      (let ((packet  (vector cmd param))
		    (reqsoc  (http-transport:server-dat-get-socket connection-info)))
		(nn-send reqsoc (db:obj->string packet transport: 'nmsg))
		(db:string->obj (nn-recv reqsoc) transport: 'nmsg)))))
    (mutex-unlock! *http-mutex*)
    res))


;;======================================================================
;; J U N K 
;;======================================================================

;; DO NOT USE
;;
(define (nmsg-transport:client-signal-handler signum)







>
>
|

|
|
|
<
>
>
|
<
>
|







302
303
304
305
306
307
308
309
310
311
312
313
314
315

316
317
318

319
320
321
322
323
324
325
326
327
;; C L I E N T S
;;======================================================================

;; Open a client connection to the server at iface:portnum by pinging it and
;; keeping the ping socket.
;; Returns #( iface portnum #f #f #f <seconds-at-connect> <req-socket> ).
;; NOTE(review): the socket slot holds #f when the ping fails; callers should check.
(define (nmsg-transport:client-connect iface portnum)
  (let* ((req-socket (nmsg-transport:ping iface portnum return-socket: #t))
	 (created-at (current-seconds)))
    (vector iface portnum #f #f #f created-at req-socket)))

;; Send #(cmd param) to the server described by connection-info and wait
;; for the reply (using the default timeout of send-receive-raw).
;; return #( success result )
;;   success - #t when a reply arrived
;;   result  - the decoded reply when success; otherwise the raw failure
;;             value from nmsg-transport:client-api-send-receive-raw
;;   remtries: is accepted for interface compatibility but is not used here;
;;             retrying is left to callers such as rmt:send-receive.
;;
;; *http-mutex* serialises use of the shared req socket. It is released even
;; if encoding or the socket lookup raises (the error is then re-raised), so
;; one bad call cannot deadlock every later call.
;;
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5))
  (mutex-lock! *http-mutex*)
  (let ((rawres (handle-exceptions
		 exn
		 (begin
		   (mutex-unlock! *http-mutex*)
		   (abort exn))
		 (let* ((packet  (db:obj->string (vector cmd param) transport: 'nmsg))
			(reqsoc  (http-transport:server-dat-get-socket connection-info)))
		   (nmsg-transport:client-api-send-receive-raw reqsoc packet)))))
    (mutex-unlock! *http-mutex*)
    (let ((status  (vector-ref rawres 0))
	  (result  (vector-ref rawres 1)))
      (vector status (if status (db:string->obj result transport: 'nmsg) result)))))
	
;;======================================================================
;; J U N K 
;;======================================================================

;; DO NOT USE
;;
(define (nmsg-transport:client-signal-handler signum)

Modified rmt.scm from [2b271743b3] to [ac5c069146].

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102


103



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
			  ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params))
			  (else  (exit))))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (case *transport-type* 
		((http)(db:string->obj res))
		((nmsg) res))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")


		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection




		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		(if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		(thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
	(if (and (< attemptnum 10)
		 (tasks:need-server run-id))
	    (begin
	      (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	      (rmt:send-receive cmd rid params (+ attemptnum 1)))
	    (rmt:open-qry-close-locally cmd run-id params)))))

(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")







|
|







>
>

>
>
>




|
|
|
|
|
|
|
|
|







87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
			  ((nmsg)(nmsg-transport:client-api-send-receive run-id connection-info cmd params))
			  (else  (exit))))
	       (success (if (and dat (vector? dat)) (vector-ref dat 0) #f))
	       (res     (if (and dat (vector? dat)) (vector-ref dat 1) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (case *transport-type* 
		((http)(db:string->obj res))
		((nmsg) res))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		(case *transport-type*
		  ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
		(tasks:kill-server-run-id run-id tag: "api-send-receive-failed")
		(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
		;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		;; (thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1))))))
    (if (and (< attemptnum 10)
	     (tasks:need-server run-id))
	(begin
	  (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	  (rmt:send-receive cmd rid params (+ attemptnum 1)))
	(rmt:open-qry-close-locally cmd run-id params))))

(define (rmt:update-db-stats run-id rawcmd params duration)
  (mutex-lock! *db-stats-mutex*)
  (handle-exceptions
   exn
   (begin
     (debug:print 0 "WARNING: stats collection failed in update-db-stats")