Megatest

Diff
Login

Differences From Artifact [c28712df60]:

To Artifact [9c105ef210]:


59
60
61
62
63
64
65
66

67
68

69
70
71
72

73
74
75
76
77
78


79
80
81
82

83
84

85
86
87
88
89
90
91
92



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111


112
113
114
115
116
117
118
119
120
121
122
123
124
125

126
127
128
129

130
131
132
133
134
135
136


137
138
139
140
141
142
143
144

145
146
147

148
149
150
151
152
153
154
59
60
61
62
63
64
65

66
67

68
69
70
71

72
73
74
75
76


77
78
79
80
81

82
83

84
85
86
87
88
89



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

125
126
127
128

129
130
131
132
133
134


135
136
137
138
139
140
141
142
143

144
145
146

147
148
149
150
151
152
153
154







-
+

-
+



-
+




-
-
+
+



-
+

-
+





-
-
-
+
+
+

















-
-
+
+













-
+



-
+





-
-
+
+







-
+


-
+







(define *server-loop-heart-beat* (current-seconds))
(define *heartbeat-mutex* (make-mutex))

;;======================================================================
;; S E R V E R
;;======================================================================

(define (nmsg-transport:run dbstruct hostn run-id server-id #!key (retrynum 1000))
(define (nmsg-transport:run dbstruct area-dat hostn run-id server-id #!key (retrynum 1000))
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port area-dat))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
	 (tdbdat          (tasks:open-db area-dat)))
    (thread-start! server-thread)
    (thread-sleep! 0.1)
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
	(let ((interface (if (equal? hostn "-")(get-host-name) hostn)))
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat area-dat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat area-dat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  ;; (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (tasks:server-set-state! (db:delay-if-busy tdbdat area-dat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id run-id))
			  (lambda ()(nmsg-transport:keep-running server-id run-id area-dat))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)
	      (nmsg-transport:run dbstruct hostn run-id server-id))
	      (tasks:server-delete-record (db:delay-if-busy tdbdat area-dat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed area-dat start-port)
	      (nmsg-transport:run dbstruct area-dat hostn run-id server-id))
	    (begin
	      (debug:print 0 "ERROR: could not find an open port to start server on. Giving up")
	      (exit 1))))))

(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let loop ((msg-in (nn-recv repsoc)))
      (let* ((dat    (db:string->obj msg-in transport: 'nmsg)))
	(debug:print 0 "server, received: " dat)
	(let ((result (api:execute-requests dbstruct dat)))
	  (debug:print 0 "server, sending: " result)
	  (nn-send repsoc (db:obj->string result  transport: 'nmsg)))
	(loop (nn-recv repsoc))))))

;; all routes though here end in exit ...
;;
(define (nmsg-transport:launch run-id)
  (let* ((tdbdat   (tasks:open-db))
(define (nmsg-transport:launch run-id area-dat)
  (let* ((tdbdat   (tasks:open-db area-dat))
	 (dbstruct (db:setup run-id))
	 (hostn    (or (args:get-arg "-server") "-")))
    (set! *run-id*   run-id)
    (set! *inmemdb* dbstruct)
    ;; with nbfake daemonize isn't really needed
    ;;
    ;; (if (args:get-arg "-daemonize")
    ;;     (begin
    ;;       (daemon:ize)
    ;;       (if *alt-log-file* ;; we should re-connect to this port, I think daemon:ize disrupts it
    ;;           (begin
    ;;     	(current-error-port *alt-log-file*)
    ;;     	(current-output-port *alt-log-file*)))))
    (if (server:check-if-running run-id)
    (if (server:check-if-running run-id area-dat)
	(begin
	  (debug:print-info 0 "Server for run-id " run-id " already running")
	  (exit 0)))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id))
    (let loop ((server-id (tasks:server-lock-slot (db:delay-if-busy tdbdat area-dat) run-id))
	       (remtries  4))
      (if (not server-id)
	  (if (> remtries 0)
	      (begin
		(thread-sleep! 2)
		(if (not (server:check-if-running run-id))
		    (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat) run-id)
		(if (not (server:check-if-running run-id area-dat))
		    (loop (tasks:server-lock-slot (db:delay-if-busy tdbdat area-dat) run-id)
			  (- remtries 1))
		    (begin
		      (debug:print-info 0 "Another server took the slot, exiting")
		      (exit 0))))
	      (begin
		;; since we didn't get the server lock we are going to clean up and bail out
		(debug:print-info 2 "INFO: server pid=" (current-process-id) ", hostname=" (get-host-name) " not starting due to other candidates ahead in start queue")
		(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat) " http-transport:launch")
		(tasks:server-delete-records-for-this-pid (db:delay-if-busy tdbdat area-dat) " http-transport:launch")
		))
	  ;; locked in a server id, try to start up
	  (nmsg-transport:run dbstruct hostn run-id server-id))
	  (nmsg-transport:run dbstruct area-dat hostn run-id server-id))
      (set! *didsomething* #t)
      (exit))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

250
251
252
253
254
255
256
257

258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277


278
279
280
281
282
283
284
250
251
252
253
254
255
256

257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275


276
277
278
279
280
281
282
283
284







-
+


















-
-
+
+







	      (signal (vector-ref result 0))))
	(signal (make-composite-condition
		 (make-property-condition 'timeout 'message "nmsg-transport:client-api-send-receive-raw timed out talking to server"))))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id run-id)
(define (nmsg-transport:keep-running server-id run-id area-dat)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat 
			      (begin
				(debug:print-info 0 "keep-running got sdat=" sdat)
				sdat)
                              (begin
                                (thread-sleep! 0.5)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
	 (tdbdat      (tasks:open-db))
	 (server-timeout (let ((tmo (configf:lookup  *configdat* "server" "timeout")))
	 (tdbdat      (tasks:open-db area-dat))
	 (server-timeout (let ((tmo (configf:lookup  (megatest:area-configdat area-dat) "server" "timeout")))
			   (if (and (string? tmo)
				    (string->number tmo))
			       (* 60 60 (string->number tmo))
			       ;; (* 3 24 60 60) ;; default to three days
			       (* 60 1)         ;; default to one minute
			       ;; (* 60 60 25)      ;; default to 25 hours
			       ))))
301
302
303
304
305
306
307
308

309
310
311
312
313
314
315
301
302
303
304
305
306
307

308
309
310
311
312
313
314
315







-
+







            (begin
              (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              (set! *time-to-exit* #t)
	      (db:sync-touched *inmemdb* run-id force-sync: #t)
              (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
              (tasks:server-delete-record (db:delay-if-busy tdbdat area-dat) server-id " http-transport:keep-running")
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)
	      ))))))

;;======================================================================
;; C L I E N T S
;;======================================================================
340
341
342
343
344
345
346
347

348
349
350
351
352
353
354
355
356
357
358
340
341
342
343
344
345
346

347
348
349
350
351
352
353
354
355
356
357
358







-
+











;;
(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()
			     (if (not *received-response*)
				 (receive-message* *runremote*))) ;; flush out last call if applicable
				 (receive-message* (common:get-remote remote #f)))) ;; flush out last call if applicable
			   "eat response"))
	 (th2 (make-thread (lambda ()
			     (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.")
			     (thread-sleep! 3) ;; give the flush three seconds to do it's stuff
			     (debug:print 0 "       Done.")
			     (exit 4))
			   "exit on ^C timer")))
     (thread-start! th2)
     (thread-start! th1)
     (thread-join! th2))))