Megatest

Check-in [7d309a338c]
Login
Overview
Comment:Ulex ping and goodbye working
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: 7d309a338c13815e277e3404907d666d368598eb
User & Date: matt on 2020-01-19 23:00:20
Other Links: branch diff | manifest | tags
Context
2020-01-20
03:01
Speculatively removed inp and oup saving and close all ports check-in: 5263e9f2ce user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
2020-01-19
23:00
Ulex ping and goodbye working check-in: 7d309a338c user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
2020-01-18
21:55
Added placeholder for whoowns check-in: 407a60fccd user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified ulex/ulex.scm from [c6786edcec] to [bd67e74654].

65
66
67
68
69
70
71
72

73
74
75
76
77
78
79
80
81
82
83
84
85
86




87

88


89


90










91
92





93
94
95
96
97
98
99
	       (ipaddr (alist-ref 'ipaddr captn))
	       (pid    (alist-ref 'pid    captn))
	       (Z      (alist-ref 'Z      captn)))
	  (udat-captain-address-set! udata ipaddr)
	  (udat-captain-host-set!    udata host)
	  (udat-captain-port-set!    udata port)
	  (udat-captain-pid-set!     udata pid)
	  (if (ping udata (conc ipaddr ":" port))

	      udata
	      (begin
		(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
		(remove-captain-pkt udata captn)
		(setup))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))
    ))

;; connect to a specific dbfile
;;
;; NOTE: placeholder - dbfname and dbtype are currently ignored and udata
;; is returned unchanged
(define (connect udata dbfname dbtype)
  udata)





;; Send a 'ping to host-port carrying a freshly made cookie, with the
;; current time in seconds (as a string) for the message.
;;
;; returns: #t when the reply equals the cookie that was sent, #f otherwise
;;
(define (ping udata host-port)

  (let* ((cookie (make-cookie udata))


	 (res (send udata host-port 'ping cookie (conc (current-seconds)) retval: #t)))


    ;; (print "got res=" res)










    (equal? res cookie)
    ))






;;======================================================================
;; network utilities
;;======================================================================

(define (rate-ip ipaddr)
  (regex-case ipaddr







|
>
|
|
|
|
|


|
<





>
>
>
>

>
|
>
>
|
>
>
|
>
>
>
>
>
>
>
>
>
>
|
|
>
>
>
>
>







65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
	       (ipaddr (alist-ref 'ipaddr captn))
	       (pid    (alist-ref 'pid    captn))
	       (Z      (alist-ref 'Z      captn)))
	  (udat-captain-address-set! udata ipaddr)
	  (udat-captain-host-set!    udata host)
	  (udat-captain-port-set!    udata port)
	  (udat-captain-pid-set!     udata pid)
	  (let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
	    (if success
		udata
		(begin
		  (print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))


;; connect to a specific dbfile
;;
;; NOTE: placeholder - dbfname and dbtype are currently ignored and udata
;; is returned unchanged
(define (connect udata dbfname dbtype)
  udata)

;; Ping the server at host-port and time the round trip.
;;
;; The message sent with the 'ping is the list in (udat-my-dbs udata)
;; joined with single spaces.
;;
;; returns: success pingtime
;;   success  - #t when the reply equals the cookie that was sent
;;   pingtime - difference between (current-milliseconds) taken before
;;              the cookie is made and after the reply is received
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
  (let* ((t0       (current-milliseconds))
	 (qrykey   (make-cookie udata))
	 (dbs-msg  (string-intersperse (udat-my-dbs udata) " "))
	 (reply    (send udata host-port 'ping qrykey dbs-msg retval: #t))
	 (pingtime (- (current-milliseconds) t0)))
    (values (equal? reply qrykey) pingtime)))

;; Tell the server at host-port that this process is going away.
;;
;; returns: success pingtime
;;   success  - #t when the reply equals the cookie that was sent
;;   pingtime - difference between (current-milliseconds) taken before
;;              the cookie is made and after the reply is received
;;
;; NOTE: causes all references to this worker to be wiped out in the callee (usually the captain)
;;
(define (goodbye-ping udata host-port)
  (let* ((start  (current-milliseconds))
	 (cookie (make-cookie udata))
	 ;; "nomsg" is a placeholder payload; the list of dbs owned by this
	 ;; process is not sent with a goodbye
	 (res    (send udata host-port 'goodbye cookie "nomsg" retval: #t))
	 (delta  (- (current-milliseconds) start)))
    (values (equal? res cookie) delta)))

;; Send a goodbye to the captain recorded in udata.
;;
;; returns: success pingtime (the values from goodbye-ping); when
;; udat-captain-host-port yields #f nothing is sent and the
;; result is #f -1
;;
(define (goodbye-captain udata)
  (let ((captain-hp (udat-captain-host-port udata)))
    (if (not captain-hp)
	(values #f -1)
	(goodbye-ping udata captain-hp))))

;;======================================================================
;; network utilities
;;======================================================================

(define (rate-ip ipaddr)
  (regex-case ipaddr
164
165
166
167
168
169
170

171
172
173
174
175
176

177
178
179
180
181
182
183
184
185
186
187
188











189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
  (cpkt-spec       *captain-pktspec*)
  ;; this processes info
  (my-cpkt-key     #f)   ;; put Z card here when I create a pkt for myself as captain
  (my-address      #f)
  (my-hostname     #f)
  (my-port         #f)
  (my-pid          (current-process-id))

  ;; server and handler thread
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (mboxes          (make-hash-table))  ;; key => mbox
  ;; other servers
  (peers           (make-hash-table))  ;; host-port => peer record

  (handlers        (make-hash-table))  ;; dbfile => peer record
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (work-queue      (make-queue))       ;; most stuff goes here
  ;; (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )












;; struct for keeping track of others we are talking to

(defstruct peer
  (addr-port       #f)  ;; host-port of the peer, as passed to make-peer
  (hostname        #f)
  (pid             #f)  ;; pid reported by the peer
  (inp             #f)  ;; NOTE(review): presumably the input port of the connection to this peer - confirm
  (oup             #f)  ;; NOTE(review): presumably the output port of the connection to this peer - confirm
  (owns            '()) ;; list of databases this peer is currently handling
  )

(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)







>






>
|











>
>
>
>
>
>
>
>
>
>
>








|







188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
  (cpkt-spec       *captain-pktspec*)
  ;; this processes info
  (my-cpkt-key     #f)   ;; put Z card here when I create a pkt for myself as captain
  (my-address      #f)
  (my-hostname     #f)
  (my-port         #f)
  (my-pid          (current-process-id))
  (my-dbs          '())
  ;; server and handler thread
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (mboxes          (make-hash-table))  ;; key => mbox
  ;; other servers
  (peers           (make-hash-table))  ;; host-port => peer record
  (dbowners        (make-hash-table))  ;; dbfile => host-port
  (handlers        (make-hash-table))  ;; dbfile => proc
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (work-queue      (make-queue))       ;; most stuff goes here
  ;; (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
  (cnum            0) ;; cookie num
  )


;; "address:port" for this process built from my-address and my-port,
;; or #f while either of the two is still #f
(define (udat-my-host-port udata)
  (let ((addr (udat-my-address udata))
	(port (udat-my-port    udata)))
    (and addr port (conc addr ":" port))))

;; "address:port" for the captain built from captain-address and
;; captain-port, or #f while either of the two is still #f
(define (udat-captain-host-port udata)
  (let ((addr (udat-captain-address udata))
	(port (udat-captain-port    udata)))
    (and addr port (conc addr ":" port))))

;; struct for keeping track of others we are talking to

(defstruct peer
  (addr-port       #f)  ;; host-port of the peer, as passed to make-peer
  (hostname        #f)  ;; optional, filled in by get-peer-dat when a hostname is supplied
  (pid             #f)  ;; pid reported by the peer
  (inp             #f)  ;; NOTE(review): presumably the input port of the connection to this peer - confirm
  (oup             #f)  ;; NOTE(review): presumably the output port of the connection to this peer - confirm
  (dbs            '()) ;; list of databases this peer is currently handling
  )

(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
    (udat-serv-listener-set! udata tlsn)
    udata))

(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
  ;; at the other end of the line I think the receiver has closed the ports - thus each message
  ;; requires new connection?
  (let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
		   (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
		    exn
		    #f
		    (let ((npdat (make-peer addr-port: host-port)))
		      (if hostname (peer-hostname-set! npdat hostname))
		      (if pid (peer-pid-set! npdat pid))
		      (let-values (((ninp noup)(tcp-connect host-port)))







|







358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
    (udat-serv-listener-set! udata tlsn)
    udata))

(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
  ;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
  ;; at the other end of the line I think the receiver has closed the ports - thus each message
  ;; requires new connection?
  (let* ((pdat (or #f #;(hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
		   (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
		    exn
		    #f
		    (let ((npdat (make-peer addr-port: host-port)))
		      (if hostname (peer-hostname-set! npdat hostname))
		      (if pid (peer-pid-set! npdat pid))
		      (let-values (((ninp noup)(tcp-connect host-port)))
439
440
441
442
443
444
445
446
447
448





449
450
451









452
453
454
455
456
457
458
459
460
461
462
	  (print "controldat: " controldat " data: " data)
	  (match (string-split controldat)
		 ((handlerkey host-port pid qrykey params ...)
		  (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
		  (case (string->symbol handlerkey)
		    ((ack)(print "Got ack!"))
		    ((ping) ;; special case - return result immediately on the same connection
		     (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
			    (val  (if proc (proc) "gotping"))
			    (peer (make-peer addr-port: host-port pid: pid)))





		       (if (not (hash-table-exists? (udat-peers udata) host-port))
			   (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
		       (write-line qrykey oup)









		       #;(send udata host-port "version" qrykey val)
		       )
		     (close-input-port inp)
		     (close-output-port oup))
		    ((rucaptain) ;; remote is asking if I'm the captain
		     (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
		     (close-input-port inp)
		     (close-output-port oup))
		    ((whoowns) ;; given a db name who do I send my queries to
		     ;; look up the file in handlers, if have an entry ping them to be sure
		     ;; they are still alive and then return that host:port.







|
|
|
>
>
>
>
>



>
>
>
>
>
>
>
>
>
|
<
|
|







476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503

504
505
506
507
508
509
510
511
512
	  (print "controldat: " controldat " data: " data)
	  (match (string-split controldat)
		 ((handlerkey host-port pid qrykey params ...)
		  (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
		  (case (string->symbol handlerkey)
		    ((ack)(print "Got ack!"))
		    ((ping) ;; special case - return result immediately on the same connection
		     (let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
			    (val   (if proc (proc) "gotping"))
			    (peer  (make-peer addr-port: host-port pid: pid))
			    (dbshash (udat-dbowners udata)))
		       (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
		       (for-each (lambda (dbfile)
				   (hash-table-set! dbshash dbfile host-port))
				 params) ;; register each db in the dbshash
		       (if (not (hash-table-exists? (udat-peers udata) host-port))
			   (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
		       (write-line qrykey oup)
		       (close-input-port inp)
		       (close-output-port oup))) ;; End of ping
		    ((goodbye)
		     ;; remove all traces of the caller in db ownership etc.
		     (let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
			    (dbs   (if peer (peer-dbs peer) '()))
			    (dbshash (udat-dbowners udata)))
		       (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
		       (hash-table-delete! (udat-peers udata) host-port)
		       (write-line qrykey oup)

		       (close-input-port inp)
		       (close-output-port oup)))
		    ((rucaptain) ;; remote is asking if I'm the captain
		     (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
		     (close-input-port inp)
		     (close-output-port oup))
		    ((whoowns) ;; given a db name who do I send my queries to
		     ;; look up the file in handlers, if have an entry ping them to be sure
		     ;; they are still alive and then return that host:port.