Megatest

Check-in [cf3341f204]
Login
Overview
Comment:fixed bug where client stopped working when server needed restart (cached rpc stub was not refreshing when port changed)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | rpc-transport
Files: files | file ages | folders
SHA1: cf3341f20435119561aed393b90fd6252d5a4484
User & Date: bjbarcla on 2016-11-15 18:14:41
Other Links: branch diff | manifest | tags
Context
2016-11-16
16:08
Merged v1.62 into rpc-transport Closed-Leaf check-in: 534875ccf1 user: mrwellan tags: rpc-transport-merge-v1.62
14:45
deadlock msg check-in: a66741d98b user: bjbarcla tags: rpc-transport
2016-11-15
18:14
fixed bug where client stopped working when server needed restart (cached rpc stub was not refreshing when port changed) check-in: cf3341f204 user: bjbarcla tags: rpc-transport
16:37
sync point check-in: 1cf31de4df user: bjbarcla tags: rpc-transport
Changes

Modified megatest.scm from [56597888dd] to [debfa33c05].

780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808

809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
(if (or (args:get-arg "-list-servers")
	(args:get-arg "-stop-server")
        (args:get-arg "-kill-server"))
    (let ((tl (launch:setup)))
      (if tl 
	  (let* ((tdbdat  (tasks:open-db))
		 (servers (tasks:get-all-servers (db:delay-if-busy tdbdat)))
		 (fmtstr  "~5a~12a~8a~20a~24a~10a~10a~10a~10a\n")
		 (servers-to-kill '())
                 (kill-switch  (if (args:get-arg "-kill-server") "-9" ""))
                 (killinfo   (or (args:get-arg "-stop-server") (args:get-arg "-kill-server") ))
		 (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		 (sid        (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)))
	    (format #t fmtstr "Id" "MTver" "Pid" "Host" "Interface:OutPort" "InPort" "LastBeat" "State" "Transport")
	    (format #t fmtstr "==" "=====" "===" "====" "=================" "======" "========" "=====" "=========")
	    (for-each 
	     (lambda (server)
	       (let* ((id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3)) 
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) 
		      (transport  (vector-ref server 11))

		      (killed     #f)
		      (status     (< last-update 20)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid)))
		 (format #t fmtstr id mt-ver pid hostname (conc interface ":" pullport) pubport last-update
			 (if status "alive" "dead") transport)
		 (if (or (equal? id sid)
			 (equal? sid 0)) ;; kill all/any
		     (begin
		       (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid)
		       (tasks:kill-server hostname pid kill-switch: kill-switch)))))
	     servers)
	    (debug:print-info 1 *default-log-port* "Done with listservers")







|





|
|














>










|
|







780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
(if (or (args:get-arg "-list-servers")
	(args:get-arg "-stop-server")
        (args:get-arg "-kill-server"))
    (let ((tl (launch:setup)))
      (if tl 
	  (let* ((tdbdat  (tasks:open-db))
		 (servers (tasks:get-all-servers (db:delay-if-busy tdbdat)))
		 (fmtstr  "~5a~6a~12a~8a~20a~24a~10a~10a~10a~10a\n")
		 (servers-to-kill '())
                 (kill-switch  (if (args:get-arg "-kill-server") "-9" ""))
                 (killinfo   (or (args:get-arg "-stop-server") (args:get-arg "-kill-server") ))
		 (khost-port (if killinfo (if (substring-index ":" killinfo)(string-split ":") #f) #f))
		 (sid        (if killinfo (if (substring-index ":" killinfo) #f (string->number killinfo)) #f)))
	    (format #t fmtstr "Id" "RunId" "MTver" "Pid" "Host" "Interface:OutPort" "InPort" "LastBeat" "State" "Transport")
	    (format #t fmtstr "==" "=====" "=====" "===" "====" "=================" "======" "========" "=====" "=========")
	    (for-each 
	     (lambda (server)
	       (let* ((id         (vector-ref server 0))
		      (pid        (vector-ref server 1))
		      (hostname   (vector-ref server 2))
		      (interface  (vector-ref server 3)) 
		      (pullport   (vector-ref server 4))
		      (pubport    (vector-ref server 5))
		      (start-time (vector-ref server 6))
		      (priority   (vector-ref server 7))
		      (state      (vector-ref server 8))
		      (mt-ver     (vector-ref server 9))
		      (last-update (vector-ref server 10)) 
		      (transport  (vector-ref server 11))
                      (run-id  (vector-ref server 12))
		      (killed     #f)
		      (status     (< last-update 20)))
		 ;;   (zmq-sockets (if status (server:client-connect hostname port) #f)))
		 ;; no need to login as status of #t indicates we are connecting to correct 
		 ;; server
		 (if (equal? state "dead")
		     (if (> last-update (* 25 60 60)) ;; keep records around for slighly over a day.
			 (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid action: 'delete))
		     (if (> last-update 20)        ;; Mark as dead if not updated in last 20 seconds
			 (tasks:server-deregister (db:delay-if-busy tdbdat) hostname pullport: pullport pid: pid)))
		 (format #t fmtstr id run-id mt-ver pid hostname (conc interface ":" pullport) pubport last-update
			 (if status state "dead") transport)
		 (if (or (equal? id sid)
			 (equal? sid 0)) ;; kill all/any
		     (begin
		       (debug:print-info 0 *default-log-port* "Attempting to kill "kill-switch" server with pid " pid)
		       (tasks:kill-server hostname pid kill-switch: kill-switch)))))
	     servers)
	    (debug:print-info 1 *default-log-port* "Done with listservers")

Modified rmt.scm from [eac4b98e46] to [d69ae51ab1].

116
117
118
119
120
121
122





123


124
125
126
127
128
129
130
131
132
133
134
135
136
137

138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179




180
181
182
183
184
185
186
187
188


189
190
191
192
193
194
195
196
197
198
199
200
201


202
203
204
205
206
207
208
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;

;; Mutex used by rmt:send-receive: it is locked on entry (before stale
;; connections are expired) and unlocked on each exit path shown below
;; (success, communication failure, and the no-connection-info branch).
(define *rmt:srmutex* (make-mutex))

(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; side-effect: clean out old connections





  (mutex-lock! *rmt:srmutex*)


  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
    (for-each 
     (lambda (run-id)
       (let ((connection (rmt:get-cinfo run-id)))
         (if (and (vector? connection)
        	  (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific.
             (begin
               (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
               (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  
  (let* ((run-id     (if rid rid 0))
	 (connection-info (rmt:get-connection-info-start-server-if-none run-id)))
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)

    (if connection-info
	;; use the server if have connection info
	(let* ((transport-type (rmt:run-id->transport-type run-id))
               
               ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
               ;;  Here, we make request to remote server
               ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
               (dat     (begin
                          
                          (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type
                            ((http)(condition-case
                                    (http-transport:client-api-send-receive run-id connection-info cmd params) 
                                    ((commfail)(vector #f "communications fail"))
                                    ((exn)(vector #f "other fail"))))
                            ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now
                            (else  
                             (debug:print-error 0 *default-log-port* "(1) Transport [" transport-type
                                                "] specified for run-id [" run-id
                                                "] is not implemented in rmt:send-receive.  Cannot proceed." (symbol? transport-type))
                             (vector #f (conc "transport ["transport-type"] unimplemented"))))))
               
               
	       (success (if (vector? dat) (vector-ref dat 0) #f))
	       (res     (if (vector? dat) (vector-ref dat 1) #f)))


	  (if (vector? connection-info)(http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific.
	  (if success
	      (begin
                (mutex-unlock! *rmt:srmutex*)
		;; (mutex-unlock! *send-receive-mutex*)
		(case transport-type 
		  ((http rpc) res) ;; (db:string->obj res))
                  (else
                   (debug:print-error 0 *default-log-port* "(2) Transport [" transport-type
                                      "] specified for run-id [" run-id
                                      "] is not implemented in rmt:send-receive.  Cannot proceed. Also unexpected since this branch follows success which would follow a suported transport...")
                   #f)
                  )) ;; (vector-ref res 1)))
              
              ;; no success...
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
                (mutex-unlock! *rmt:srmutex*)




                (case transport-type
                  
                  ((http rpc)
                   (hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
                   ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. 
                   ;; (if (eq? (modulo attemptnum 5) 0)
                   ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
                   ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
                   (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)


                   ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
                   
                   ;; no longer killing the server in http-transport:client-api-send-receive
                   ;; may kill it here but what are the criteria?
                   ;; start with three calls then kill server
                   ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
                   ;; (thread-sleep! 2)
                   (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))
                  (else
                   (debug:print-error 0 *default-log-port* "(3) Transport [" transport-type
                                      "] specified for run-id [" run-id
                                      "] is not implemented in rmt:send-receive.  Cannot proceed.")
                   (exit 1))))))


        
	;; no connection info; try to start a server
	;;
	;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
	;;
        (begin
          (mutex-unlock! *rmt:srmutex*)







>
>
>
>
>

>
>














>



|




















>
>
|

















>
>
>
>
|

|
|
|
|
|
|
|
>
>
|

|
|
|
|
|
|
|
|
|
|
|
>
>







116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
;; RA => e.g. usage (rmt:send-receive 'get-var #f (list varname))
;;

;; Mutex used by rmt:send-receive: it is locked after the attempt-count check
;; and unlocked on each exit path shown below (success, communication failure
;; before the retry, and the no-connection-info branch).
(define *rmt:srmutex* (make-mutex))

(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; side-effect: clean out old connections

  (when (eq? (modulo attemptnum 5) 0)
    (debug:print-error 0 *default-log-port* "rmt:send-receive did not succeed after "(sub1 attemptnum)" tries.  Aborting. (cmd="cmd" rid="rid" param="params)
    (exit 1))

  (mutex-lock! *rmt:srmutex*)

  ;; expire connections
  (let ((expire-time (- (current-seconds) (server:get-timeout) 10))) ;; don't forget the 10 second margin
    (for-each 
     (lambda (run-id)
       (let ((connection (rmt:get-cinfo run-id)))
         (if (and (vector? connection)
        	  (< (http-transport:server-dat-get-last-access connection) expire-time)) ;; BB> BBTODO: make this generic, not http transport specific.
             (begin
               (debug:print-info 0 *default-log-port* "Discarding connection to server for run-id " run-id ", too long between accesses")
               (hash-table-delete! *runremote* run-id)))))
     (hash-table-keys *runremote*)))
  
  (let* ((run-id     (if rid rid 0))
	 (connection-info (rmt:get-connection-info-start-server-if-none run-id)))
    ;; the nmsg method does the encoding under the hood (the http method should be changed to do this also)
    (BB> "in rmt:send-receive; run-id="run-id";;connection-info="connection-info)
    (if connection-info
	;; use the server if have connection info
	(let* ((transport-type (rmt:run-id->transport-type run-id))

               ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
               ;;  Here, we make request to remote server
               ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
               (dat     (begin
                          
                          (case transport-type ;; BB: replaced *transport-type* global with run-id specific transport-type
                            ((http)(condition-case
                                    (http-transport:client-api-send-receive run-id connection-info cmd params) 
                                    ((commfail)(vector #f "communications fail"))
                                    ((exn)(vector #f "other fail"))))
                            ((rpc) (rpc-transport:client-api-send-receive run-id connection-info cmd params)) ;; BB: let us error out for now
                            (else  
                             (debug:print-error 0 *default-log-port* "(1) Transport [" transport-type
                                                "] specified for run-id [" run-id
                                                "] is not implemented in rmt:send-receive.  Cannot proceed." (symbol? transport-type))
                             (vector #f (conc "transport ["transport-type"] unimplemented"))))))
               
               
	       (success (if (vector? dat) (vector-ref dat 0) #f))
	       (res     (if (vector? dat) (vector-ref dat 1) #f)))
          (BB> "in rmt:send-receive; transport-type="transport-type" success="success" connection-info="connection-info" res="res " dat="dat)
          (if (and success (vector? connection-info))
              (http-transport:server-dat-update-last-access connection-info)) ;; BB> BBTODO: make this generic, not http transport specific.
	  (if success
	      (begin
                (mutex-unlock! *rmt:srmutex*)
		;; (mutex-unlock! *send-receive-mutex*)
		(case transport-type 
		  ((http rpc) res) ;; (db:string->obj res))
                  (else
                   (debug:print-error 0 *default-log-port* "(2) Transport [" transport-type
                                      "] specified for run-id [" run-id
                                      "] is not implemented in rmt:send-receive.  Cannot proceed. Also unexpected since this branch follows success which would follow a suported transport...")
                   #f)
                  )) ;; (vector-ref res 1)))
              
              ;; no success...
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 *default-log-port* "WARNING: Communication failed, trying call to rmt:send-receive again.")
                (mutex-unlock! *rmt:srmutex*)
                (rmt:del-cinfo run-id) ;; don't keep using the same connection
                (rmt:send-receive cmd rid params attemptnum: attemptnum)


                ;; (case transport-type
                  
                ;;   ((http rpc)

                ;;    ;; NOTE: killing server causes this process to block forever. No idea why. Dec 2. 
                ;;    ;; (if (eq? (modulo attemptnum 5) 0)
                ;;    ;;     (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
                ;;    ;; (mutex-unlock! *send-receive-mutex*) ;; close the mutex here to allow other threads access to communications
                ;;    (tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
                ;;    (thread-sleep! 5)
                                   
                ;;    ;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))
                   
                ;;    ;; no longer killing the server in http-transport:client-api-send-receive
                ;;    ;; may kill it here but what are the criteria?
                ;;    ;; start with three calls then kill server
                ;;    ;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
                ;;    ;; (thread-sleep! 2)
                ;;    (rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))
                ;;   (else
                ;;    (debug:print-error 0 *default-log-port* "(3) Transport [" transport-type
                ;;                       "] specified for run-id [" run-id
                ;;                       "] is not implemented in rmt:send-receive.  Cannot proceed.")
                ;;    (exit 1)))

                )))
        
	;; no connection info; try to start a server
	;;
	;; Note: The tasks db was checked for a server in starting mode in the rmt:get-connection-info call
	;;
        (begin
          (mutex-unlock! *rmt:srmutex*)

Modified rpc-transport.scm from [d50c6c50d7] to [f8b4c106d1].

242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
	(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))


;; Hash table in which rpc-transport:get-api-exec caches the rpc stub it
;; obtains from rpc:procedure.
(define *api-exec-ht* (make-hash-table))

;; let's see if caching the rpc stub curbs thread-profusion on server side
;;
;; Returns the rpc stub for the remote 'api-exec procedure. On a cache miss
;; the stub is created with rpc:procedure for IFACE/PORT and stored in
;; *api-exec-ht*; on a hit the stored stub is returned.
;;
;; NOTE(review): the key '(iface . port) is a quoted literal, i.e. a pair of
;; the symbols `iface` and `port`, not the values of the arguments. Lookups
;; and stores therefore do not depend on the actual iface/port passed in, so
;; a stub created for one port keeps being returned after the port changes.
(define (rpc-transport:get-api-exec iface port)
  (let* ((lu (hash-table-ref/default *api-exec-ht* '(iface . port) #f)))
    (if lu
        lu
        (let ((res (rpc:procedure 'api-exec iface port)))
          (hash-table-set! *api-exec-ht* '(iface . port) res)
          res))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
  (BB> "entered nsport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
  (if (not (vector? serverdat))
      (begin
        (BB> "WHAT?? for run-id="run-id", serverdat="serverdat)
        (print-call-chain)
        (exit 1)))
  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
         (port  (rpc-transport:server-dat-get-port serverdat))
         (res #f)
         (api-exec (rpc-transport:get-api-exec iface port))  
         (send-receive (lambda ()
                         (tcp-buffer-size 0)
                         (set! res (retry-thunk
                                    (lambda ()
                                      (condition-case
                                       ;;(vector #t (run-remote cmd params))
                                       (vector 'success (api-exec cmd params))







|



|






|








|







242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
	(debug:print-error 0 *default-log-port* "call to rpc-transport:server-dat-update-last-access with non-vector!!"))))


;; Hash table in which rpc-transport:get-api-exec caches rpc stubs, keyed by
;; the pair (iface . port).
(define *api-exec-ht* (make-hash-table))

;; Cache the rpc stub per (iface . port) to see whether that curbs
;; thread-profusion on the server side.
;;
;; Returns the stub for the remote 'api-exec procedure at IFACE/PORT: the
;; entry already stored in *api-exec-ht* when there is one, otherwise a new
;; stub from rpc:procedure, which is stored before being returned.
(define (rpc-transport:get-api-exec iface port)
  (let ((key (cons iface port)))
    (or (hash-table-ref/default *api-exec-ht* key #f)
        (let ((stub (rpc:procedure 'api-exec iface port)))
          (hash-table-set! *api-exec-ht* key stub)
          stub))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; this client-side procedure makes rpc call to server and returns result
;;
(define (rpc-transport:client-api-send-receive run-id serverdat cmd params #!key (numretries 3))
  (BB> "entered rpc-transport:client-api-send-receive with run-id="run-id " serverdat="serverdat" cmd="cmd" params="params" numretries="numretries)
  (if (not (vector? serverdat))
      (begin
        (BB> "WHAT?? for run-id="run-id", serverdat="serverdat)
        (print-call-chain)
        (exit 1)))
  (let* ((iface (rpc-transport:server-dat-get-iface serverdat))
         (port  (rpc-transport:server-dat-get-port serverdat))
         (res #f)
         (api-exec (rpc-transport:get-api-exec iface port))  ;; cached by host/port. may need to clear...
         (send-receive (lambda ()
                         (tcp-buffer-size 0)
                         (set! res (retry-thunk
                                    (lambda ()
                                      (condition-case
                                       ;;(vector #t (run-remote cmd params))
                                       (vector 'success (api-exec cmd params))
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
	 (thread-terminate! th2)
         (BB> "alt got res="res)
	 (debug:print-info 11 *default-log-port* "got res=" res)
	 (if (vector? res)
             (case (vector-ref res 0)
               ((success) (vector #t (vector-ref res 1)))
               ((comms-fail)
                (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request")
                ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
                (vector #f (vector-ref res 1)))
               (else
                (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
                (debug:print 0 *default-log-port* " client call chain:")
                (print-call-chain (current-error-port))
                (debug:print 0 *default-log-port* " server call chain:")







|







302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
	 (thread-terminate! th2)
         (BB> "alt got res="res)
	 (debug:print-info 11 *default-log-port* "got res=" res)
	 (if (vector? res)
             (case (vector-ref res 0)
               ((success) (vector #t (vector-ref res 1)))
               ((comms-fail)
                (debug:print 0 *default-log-port* "WARNING: comms failure for rpc request >>"res"<<")
                ;;(debug:print 0 *default-log-port* " message: " ((condition-property-accessor 'exn 'message) exn))
                (vector #f (vector-ref res 1)))
               (else
                (debug:print-error 0 *default-log-port* "error occured at server, info=" (vector-ref res 1))
                (debug:print 0 *default-log-port* " client call chain:")
                (print-call-chain (current-error-port))
                (debug:print 0 *default-log-port* " server call chain:")