Megatest

Diff
Login

Differences From Artifact [b2990ea4bd]:

To Artifact [a65dcd51b0]:


59
60
61
62
63
64
65
66

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

86
87
88
89








90
91
92
93
94
95
96
59
60
61
62
63
64
65

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86




87
88
89
90
91
92
93
94
95
96
97
98
99
100
101







-
+



















+
-
-
-
-
+
+
+
+
+
+
+
+







(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

(define (nmsg-transport:run dbstruct hostn run-id server-id)
(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
    (thread-start! server-thread)
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
	(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	(begin
	  (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	  (portlogger:open-run-close portlogger:set-failed start-port)
	  (nmsg-transport:run dbstruct hostn run-id server-id)))))
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)
	      (nmsg-transport:run dbstruct hostn run-id server-id))
	    (begin
	      (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
	      (exit 1))))))

(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let loop ((msg-in (nn-recv repsoc)))
      (cond
       ((equal? msg-in "quit")
169
170
171
172
173
174
175
176
177

178
179
180
181
182




183
184

185
186
187
188
189
190
191











192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
































210
211
212
213
214
215
216

217
218
219
220


221
222
223

224
225
226
227
228
229
230
231
232
233
174
175
176
177
178
179
180


181
182
183
184


185
186
187
188
189

190







191
192
193
194
195
196
197
198
199
200
201


















202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233

234
235




236

237


238
239
240


241



242
243
244
245
246
247
248







-
-
+



-
-
+
+
+
+

-
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-


-
-
-
-
+
-

-
-
+
+

-
-
+
-
-
-







;; ping the server at host:port
;;   return the open socket if successful (return-socket == #t)
;;   expect the key expected-key returned in payload
;;   send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f)(socket #f))
  ;; send a random number along with pid and check that we get it back
  (let* ((req     (or socket (nn-socket 'req)))
	 (host    (if (or (not hostn)
  (let* ((host    (if (or (not hostn)
			  (equal? hostn "-")) ;; use localhost
		      (get-host-name)
		      hostn))
	 (success #f)
	 (keepwaiting #t)
	 (req     (or socket
		      (let ((soc (nn-socket 'req)))
			(nn-connect soc (conc "tcp://" host ":" port))
			soc)))
	 (dat     (db:obj->string (vector "ping" our-key) transport: 'nmsg))
	 (ping    (make-thread
	 (result  (nmsg-transport:client-api-send-receive-raw req dat timeout: timeout))
		   (lambda ()
		     (nn-send req dat)
		     (let* ((result  (nn-recv req))
			    (key     (vector-ref (db:string->obj result transport: 'nmsg) 1)))
		       (if (or (not expected-key) ;; just getting a reply is good enough then
			       (equal? key expected-key)) 
			   (begin
	 (success (vector-ref result 0))
	 (key     (if success 
		      (vector-ref (db:string->obj (vector-ref result 1) transport: 'nmsg) 1)
		      #f)))
    (debug:print 0 "success=" success ", key=" key ", expected-key=" expected-key ", equal? " (equal? key expected-key))
    (if (and success
	     (or (not expected-key) ;; just getting a reply is good enough then
		 (equal? key expected-key)))
	(if return-socket
	    req
	    (begin
			     ;; (print "ping, success: received \"" result "\"")
			     (set! success #t))
			   (begin
			     ;; (print "ping, failed: received key \"" result "\"")
			     (set! keepwaiting #f)
			     (set! success #f)))))
		   "ping"))
	 (timeout (make-thread (lambda ()
				 (let loop ((count 0))
				   (thread-sleep! 1)
				   (print "still waiting after count seconds...")
				   (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
				       (loop (+ count 1))))
				 (if keepwaiting
				     (begin
				       (print "timeout waiting for ping")
				       (thread-terminate! ping))))
			       "timeout")))
	      (if (not socket)(nn-close req)) ;; don't want a side effect of closing socket if handed it
	      #t))
	(begin
	  (if (not socket)(nn-close req)) ;; failed to ping, close socket as side effect
	  #f))))

;; send data to server, wait max of timeout seconds for a response.
;; return #( success/fail result )
;;
(define (nmsg-transport:client-api-send-receive-raw socreq dat #!key (timeout 5))
  (let* ((success     #f)
	 (result      #f)
	 (keepwaiting #t)
	 (send-recv   (make-thread
		       (lambda ()
			 (nn-send socreq dat)
			 (let* ((res (nn-recv socreq)))
			   (set! success #t)
			   (set! result res)))
		       "send-recv"))
	 (timeout     (make-thread
		       (lambda ()
			 (let loop ((count 0))
			   (thread-sleep! 1)
			   (debug:print-info 1 "send-receive-raw, still waiting after " count " seconds...")
			   (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
			       (loop (+ count 1))))
			 (if keepwaiting
			     (begin
			       (print "timeout waiting for ping")
			       (thread-terminate! send-recv))))
		       "timeout")))
    (if (not socket)(nn-connect req (conc "tcp://" host ":" port)))
    (handle-exceptions
     exn
     (begin
       ;; (print-call-chain)
       ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       ;; (print "exn=" (condition->list exn))
     (set! result "timeout")
       (debug:print-info 1 "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (thread-start! send-recv)
     (thread-join! send-recv)
     (if success (thread-terminate! timeout)))
    (if return-socket
	(if success req #f)
    (vector success result)))
	(begin
	  (nn-close req) ;; should it be closed if we were handed a socket?
	  success))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
287
288
289
290
291
292
293


294

295
296
297
298



299
300



301
302


303
304
305
306
307
308
309
302
303
304
305
306
307
308
309
310

311
312



313
314
315


316
317
318


319
320
321
322
323
324
325
326
327







+
+
-
+

-
-
-
+
+
+
-
-
+
+
+
-
-
+
+







;; C L I E N T S
;;======================================================================

(define (nmsg-transport:client-connect iface portnum)
  (let* ((reqsoc      (nmsg-transport:ping iface portnum return-socket: #t)))
    (vector iface portnum #f #f #f (current-seconds) reqsoc)))

;; return #( success result )
;;
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param)
(define (nmsg-transport:client-api-send-receive run-id connection-info cmd param #!key (remtries 5))
  (mutex-lock! *http-mutex*)
  (let ((packet  (vector cmd param))
	(reqsoc  (http-transport:server-dat-get-socket connection-info)))
    (nn-send reqsoc (db:obj->string packet transport: 'nmsg))
  (let* ((packet  (db:obj->string (vector cmd param) transport: 'nmsg))
	 (reqsoc  (http-transport:server-dat-get-socket connection-info))
	 (rawres  (nmsg-transport:client-api-send-receive-raw reqsoc packet))
    (let ((res (db:string->obj (nn-recv reqsoc) transport: 'nmsg)))
      (mutex-unlock! *http-mutex*)
	 (status  (vector-ref rawres 0))
	 (result  (vector-ref rawres 1)))
    (mutex-unlock! *http-mutex*)
      res)))

    (vector status (if status (db:string->obj result transport: 'nmsg) result))))
	
;;======================================================================
;; J U N K 
;;======================================================================

;; DO NOT USE
;;
(define (nmsg-transport:client-signal-handler signum)