Overview
Context
Changes
Modified ulex/ulex.scm
from [c6786edcec]
to [bd67e74654].
︙ | | |
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
-
-
-
-
-
-
+
+
+
+
+
+
+
-
+
-
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
|
(ipaddr (alist-ref 'ipaddr captn))
(pid (alist-ref 'pid captn))
(Z (alist-ref 'Z captn)))
(udat-captain-address-set! udata ipaddr)
(udat-captain-host-set! udata host)
(udat-captain-port-set! udata port)
(udat-captain-pid-set! udata pid)
(if (ping udata (conc ipaddr ":" port))
udata
(begin
(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
(remove-captain-pkt udata captn)
(setup))))
(let-values (((success pingtime)(ping udata (conc ipaddr ":" port))))
(if success
udata
(begin
(print "Found unreachable captain at " ipaddr ":" port ", removing pkt")
(remove-captain-pkt udata captn)
(setup)))))
(begin
(setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread
(setup)))
(setup)))))
))
;; connect to a specific dbfile
(define (connect udata dbfname dbtype)
udata)
;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
(let* ((start (current-milliseconds))
(let* ((cookie (make-cookie udata))
(res (send udata host-port 'ping cookie (conc (current-seconds)) retval: #t)))
;; (print "got res=" res)
(equal? res cookie)
))
(cookie (make-cookie udata))
(dbs (udat-my-dbs udata))
(msg (string-intersperse dbs " "))
(res (send udata host-port 'ping cookie msg retval: #t))
(delta (- (current-milliseconds) start)))
(values (equal? res cookie) delta)))
;; returns: success pingtime
;;
;; NOTE: causes all references to this worker to be wiped out in the callee (ususally the captain)
;;
(define (goodbye-ping udata host-port)
(let* ((start (current-milliseconds))
(cookie (make-cookie udata))
(dbs (udat-my-dbs udata))
(res (send udata host-port 'goodbye cookie "nomsg" retval: #t))
(delta (- (current-milliseconds) start)))
(values (equal? res cookie) delta)))
(define (goodbye-captain udata)
(let* ((host-port (udat-captain-host-port udata)))
(if host-port
(goodbye-ping udata host-port)
(values #f -1))))
;;======================================================================
;; network utilities
;;======================================================================
(define (rate-ip ipaddr)
(regex-case ipaddr
|
︙ | | |
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
|
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
-
+
|
(cpkt-spec *captain-pktspec*)
;; this processes info
(my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain
(my-address #f)
(my-hostname #f)
(my-port #f)
(my-pid (current-process-id))
(my-dbs '())
;; server and handler thread
(serv-listener #f) ;; this processes server info
(handler-thread #f)
(mboxes (make-hash-table)) ;; key => mbox
;; other servers
(peers (make-hash-table)) ;; host-port => peer record
(dbowners (make-hash-table)) ;; dbfile => host-port
(handlers (make-hash-table)) ;; dbfile => peer record
(handlers (make-hash-table)) ;; dbfile => proc
(outgoing-conns (make-hash-table)) ;; host:port -> conn
(work-queue (make-queue)) ;; most stuff goes here
;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
(busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
;; app info
(appname #f)
(dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
;; cookies
(cnum 0) ;; cookie num
)
(define (udat-my-host-port udata)
(if (and (udat-my-address udata)(udat-my-port udata))
(conc (udat-my-address udata) ":" (udat-my-port udata))
#f))
(define (udat-captain-host-port udata)
(if (and (udat-captain-address udata)(udat-captain-port udata))
(conc (udat-captain-address udata) ":" (udat-captain-port udata))
#f))
;; struct for keeping track of others we are talking to
(defstruct peer
(addr-port #f)
(hostname #f)
(pid #f)
(inp #f)
(oup #f)
(owns '()) ;; list of databases this peer is currently handling
(dbs '()) ;; list of databases this peer is currently handling
)
(defstruct work
(peer-dat #f)
(handlerkey #f)
(qrykey #f)
(data #f)
|
︙ | | |
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
|
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
|
-
+
|
(udat-serv-listener-set! udata tlsn)
udata))
(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f))
;; I'm currently very fuzzy on whether it makes sense to be reusing the outgoing connections.
;; at the other end of the line I think the reciever has closed the ports - thus each message
;; requires new connection?
(let* ((pdat (or (hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(let* ((pdat (or #f #;(hash-table-ref/default (udat-outgoing-conns udata) host-port #f)
(handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
exn
#f
(let ((npdat (make-peer addr-port: host-port)))
(if hostname (peer-hostname-set! npdat hostname))
(if pid (peer-pid-set! npdat pid))
(let-values (((ninp noup)(tcp-connect host-port)))
|
︙ | | |
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
|
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
|
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
+
+
|
(print "controldat: " controldat " data: " data)
(match (string-split controldat)
((handlerkey host-port pid qrykey params ...)
(print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
(case (string->symbol handlerkey)
((ack)(print "Got ack!"))
((ping) ;; special case - return result immediately on the same connection
(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
(val (if proc (proc) "gotping"))
(peer (make-peer addr-port: host-port pid: pid)))
(let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f))
(val (if proc (proc) "gotping"))
(peer (make-peer addr-port: host-port pid: pid))
(dbshash (udat-dbowners udata)))
(peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
(for-each (lambda (dbfile)
(hash-table-set! dbshash dbfile host-port))
params) ;; register each db in the dbshash
(if (not (hash-table-exists? (udat-peers udata) host-port))
(hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
(write-line qrykey oup)
(close-input-port inp)
(close-output-port oup))) ;; End of ping
((goodbye)
;; remove all traces of the caller in db ownership etc.
(let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f))
(dbs (if peer (peer-dbs peer) '()))
(dbshash (udat-dbowners udata)))
(for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
(hash-table-delete! (udat-peers udata) host-port)
#;(send udata host-port "version" qrykey val)
(write-line qrykey oup)
)
(close-input-port inp)
(close-output-port oup))
(close-input-port inp)
(close-output-port oup)))
((rucaptain) ;; remote is asking if I'm the captain
(write-line (if (udat-my-cpkt-key udata) "yes" "no"))
(close-input-port inp)
(close-output-port oup))
((whoowns) ;; given a db name who do I send my queries to
;; look up the file in handlers, if have an entry ping them to be sure
;; they are still alive and then return that host:port.
|
︙ | | |