Megatest

Check-in [3f613cadf2]
Login
Overview
Comment:Scaffolding for send-receive
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: 3f613cadf24fb972ea5485fe861a6d3a70ec13c6
User & Date: matt on 2020-01-20 22:34:03
Other Links: branch diff | manifest | tags
Context
2020-01-21
19:19
Switch from write-line and read-line to write and read for transport across the tcp connection check-in: 124ed3f5a6 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
2020-01-20
22:34
Scaffolding for send-receive check-in: 3f613cadf2 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
11:20
Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified rmtmod.scm from [a1525563eb] to [3b1ae2dfc7].

26
27
28
29
30
31
32
33

34
35
36
37
38
39
40
(declare (uses itemsmod))
(declare (uses ulex))

(module rmtmod
	*
	
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable)


(import (prefix ulex ulex:))

(import commonmod)
(import itemsmod)
(import apimod)
(import dbmod)







|
>







26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
(declare (uses itemsmod))
(declare (uses ulex))

(module rmtmod
	*
	
(import scheme chicken data-structures extras)
(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable
	s11n)

(import (prefix ulex ulex:))

(import commonmod)
(import itemsmod)
(import apimod)
(import dbmod)
85
86
87
88
89
90
91

92
93
94
95
96
97
98
99
100
101
102
103
104
105

106






107
108
109
110
111
112
113
114
;; Create the ulex connection for this process, remember it in alldat and
;; register the handlers that peers are allowed to invoke.
;; Returns the ulex data record.
;;
(define (rmt:setup-ulex alldat)
  (let ((ulex-dat (ulex:setup))) ;; establish connection to ulex
    (alldat-ulexdat-set! alldat ulex-dat)
    ;; 'ping is overridden with get-full-version, 'login forces setup of
    ;; the connection; both are served by the same procedure
    (for-each
     (lambda (handler-key)
       (ulex:register-handler ulex-dat handler-key common:get-full-version))
     '(ping login))
    ulex-dat))

;; Work out which db file rid maps to, make sure a connection to its owner
;; exists, then run the query.  In this version the query itself is still
;; executed locally via rmt:open-qry-close-locally.
;;
;; attemptnum starts at 1 (see the original note about the modulo).
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
  (let* ((alldat   *alldat*)
         (areapath (alldat-areapath alldat))
         ;; no rid, or rid below 1, is the criteria for "main.db"
         (main?    (or (not rid) (< rid 1)))
         (dbtype   (if main? 'main 'runs))
         (dbfname  (if main? "main.db" (conc rid ".db")))
         (dbfile   (conc areapath "/.db/" dbfname)))
    ;; the connection is established for its side effect only
    (rmt:connect alldat dbfname dbtype)
    (rmt:open-qry-close-locally cmd 0 params)))

;;   
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
;; ;; 
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
;; ;; payload: `((rid . ,rid)







>














>
|
>
>
>
>
>
>
|







86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
;; Create the ulex connection for this process, remember it in alldat and
;; register every handler a peer may call on us.
;; Returns the ulex data record.
;;
(define (rmt:setup-ulex alldat)
  (let ((ulex-dat (ulex:setup))) ;; establish connection to ulex
    (alldat-ulexdat-set! alldat ulex-dat)
    ;; handler table, registered in this order:
    ;;   ping    - overridden with get-full-version
    ;;   login   - forces setup of the connection
    ;;   execute - runs api requests on behalf of a remote caller
    (for-each
     (lambda (entry)
       (ulex:register-handler ulex-dat (car entry) (cdr entry)))
     (list (cons 'ping    common:get-full-version)
           (cons 'login   common:get-full-version)
           (cons 'execute api:execute-requests)))
    ulex-dat))

;; set up a connection to the current owner of the dbfile associated with rid
;; then send the query to that dbfile owner and wait for a response.
;;
;; params is serialized (s11n) into a string, handed to the 'execute handler
;; on the owning process, and the reply string is deserialized and returned.
;;
;; Signals an error when no connection could be made or no reply string came
;; back (ulex hands back #f on timeout / communication failure); previously
;; that #f was fed straight into with-input-from-string.
;;
;; NOTE(review): cmd is not part of what is transmitted - only rid and the
;;               serialized params go over the wire.  The commented call
;;               below suggests the remote side needs cmd as well.
;; NOTE(review): ulex documents that data must be a single line; confirm
;;               that the serialized form never contains a newline.
;;
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
  (let* ((alldat   *alldat*)
         (areapath (alldat-areapath alldat))
         (dbtype   (if (or (not rid)(< rid 1)) ;; this is the criteria for "main.db"
                       'main 'runs))
         (dbfname  (if (eq? dbtype 'main)
                       "main.db"
                       (conc rid ".db")))
         (dbfile   (conc areapath "/.db/" dbfname))
         (udata    (alldat-ulexdat alldat))
         (ulexconn (rmt:connect alldat dbfname dbtype)) ;; ulexconn is our new *runremote*, it is a dbowner struct < pdat lastrefresh >
         ;; need to call this on the other side
         ;; (api:execute-requests dbstruct-local (vector (symbol->string cmd) params))))
         (payload  (with-output-to-string (lambda () (serialize params))))
         (response (and ulexconn
                        (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid payload))))
    (if (string? response)
        (with-input-from-string response (lambda () (deserialize)))
        (error 'rmt:send-receive "no response from db owner" cmd rid dbfile))))

#;(rmt:open-qry-close-locally cmd 0 params)

;;   
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected
;; ;; 
;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd))
;; ;; payload: `((rid . ,rid)
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
;; 
;;      ;; not on homehost, do server query
;;      (else (extras-case-11 *default-log-port* runremote cmd params attemptnum rid)))))

;; bunch of small functions factored out of send-receive to make debug easier
;;

#;(define (extras-case-11 *default-log-port* runremote cmd params attemptnum rid)
  ;; (mutex-unlock! *rmt-mutex*)
  (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9")
  ;; (mutex-lock! *rmt-mutex*)
  (let* ((conninfo (remote-conndat runremote))
	 (dat      (case (remote-transport runremote)
		     ((http) (condition-case ;; handling here has
					     ;; caused a lot of
					     ;; problems. However it
					     ;; is needed to deal with
					     ;; attemtped
					     ;; communication to
					     ;; servers that have gone
					     ;; away
				 #;(http-transport:client-api-send-receive 0 conninfo cmd params)
			      ((commfail)(vector #f "communications fail"))
			      ((exn)(vector #f "other fail" (print-call-chain)))))
		     (else
		      (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport runremote) " not supported")
		      (exit))))
	 (success  (if (vector? dat) (vector-ref dat 0) #f))
	 (res      (if (vector? dat) (vector-ref dat 1) #f)))
    (if (and (vector? conninfo) (< 5 (vector-length conninfo)))
	#t #;(http-transport:server-dat-update-last-access conninfo) ;; refresh access time
	(begin
	  (debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo)
	  (set! conninfo #f)
	  (remote-conndat-set! *runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global.
	  #;(http-transport:close-connections  area-dat: runremote)))
    (debug:print-info 13 *default-log-port* "rmt:send-receive, case  9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
    (mutex-unlock! *rmt-mutex*)
    (if success ;; success only tells us that the transport was
	;; successful, have to examine the data to see if
	;; there was a detected issue at the other end
	(extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd)
	(extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params)
	)))

;; (define (rmt:update-db-stats run-id rawcmd params duration)
;;   (mutex-lock! *db-stats-mutex*)
;;   (handle-exceptions
;;    exn
;;    (begin
;;      (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats")







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
;; 
;;      ;; not on homehost, do server query
;;      (else (extras-case-11 *default-log-port* runremote cmd params attemptnum rid)))))

;; bunch of small functions factored out of send-receive to make debug easier
;;

;;(define (extras-case-11 *default-log-port* runremote cmd params attemptnum rid)
;;  ;; (mutex-unlock! *rmt-mutex*)
;;  (debug:print-info 12 *default-log-port* "rmt:send-receive, case  9")
;;  ;; (mutex-lock! *rmt-mutex*)
;;  (let* ((conninfo (remote-conndat runremote))
;;	 (dat      (case (remote-transport runremote)
;;		     ((http) (condition-case ;; handling here has
;;					     ;; caused a lot of
;;					     ;; problems. However it
;;					     ;; is needed to deal with
;;					     ;; attemtped
;;					     ;; communication to
;;					     ;; servers that have gone
;;					     ;; away
;;				 #;(http-transport:client-api-send-receive 0 conninfo cmd params)
;;			      ((commfail)(vector #f "communications fail"))
;;			      ((exn)(vector #f "other fail" (print-call-chain)))))
;;		     (else
;;		      (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport runremote) " not supported")
;;		      (exit))))
;;	 (success  (if (vector? dat) (vector-ref dat 0) #f))
;;	 (res      (if (vector? dat) (vector-ref dat 1) #f)))
;;    (if (and (vector? conninfo) (< 5 (vector-length conninfo)))
;;	#t #;(http-transport:server-dat-update-last-access conninfo) ;; refresh access time
;;	(begin
;;	  (debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo)
;;	  (set! conninfo #f)
;;	  (remote-conndat-set! *runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global.
;;	  #;(http-transport:close-connections  area-dat: runremote)))
;;    (debug:print-info 13 *default-log-port* "rmt:send-receive, case  9. conninfo=" conninfo " dat=" dat " runremote = " runremote)
;;    (mutex-unlock! *rmt-mutex*)
;;    (if success ;; success only tells us that the transport was
;;	;; successful, have to examine the data to see if
;;	;; there was a detected issue at the other end
;;	(extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd)
;;	(extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params)
;;	)))

;; (define (rmt:update-db-stats run-id rawcmd params duration)
;;   (mutex-lock! *db-stats-mutex*)
;;   (handle-exceptions
;;    exn
;;    (begin
;;      (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats")

Modified ulex/ulex.scm from [48c02e2743] to [210fc6977d].

1
2
3
4
5
6
7
8
;;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|







1
2
3
4
5
6
7
8
;; ulex: Distributed sqlite3 db
;;;
;; Copyright (C) 2018 Matt Welland
;; Redistribution and use in source and binary forms, with or without
;; modification, is permitted.
;;
;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
77
78
79
80
81
82
83




84
85








86
87
88
89
90
91
92
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect to a specific dbfile




;; Stub: no per-db connection is made here; dbfname and dbtype are ignored
;; and the udata record that was passed in is returned unchanged.
(define (connect udata dbfname dbtype)
  udata)









;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
  (let* ((start  (current-milliseconds))







>
>
>
>

|
>
>
>
>
>
>
>
>







77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect to a specific dbfile
;;   - if already connected - return the cached entry from the dbowners hash
;;   - otherwise ask the captain who to talk to for this db (get-db-owner)
;;   - put a fresh dbowner record in the dbowners hash, keyed by dbfname,
;;     and return it
;;   - returns #f when the owner could not be determined
;;
;; The entry is stored under dbfname because that is the key the lookup
;; uses; storing it under the owner's host-port meant the cache never hit.
;;
;; NOTE(review): the db-owner branch of ulex-handler puts *peer* records
;;               into this same table keyed by dbfile.  On the captain both
;;               kinds of record can therefore collide - confirm which
;;               record type the table is meant to hold.
;;
(define (connect udata dbfname dbtype)
  (let ((dbowners (udat-dbowners udata)))
    (or (hash-table-ref/default dbowners dbfname #f)
        (let-values (((success dbowner-host-port) (get-db-owner udata dbfname dbtype)))
          (and success
               (let* ((pdat    (udat-get-peer udata dbowner-host-port))
                      (dbowner (make-dbowner pdat: pdat)))
                 ;; just clobber the record, this is the new data no matter what
                 (hash-table-set! dbowners dbfname dbowner)
                 dbowner))))))

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
  (let* ((start  (current-milliseconds))
110
111
112
113
114
115
116















































117
118
119
120
121
122
123
    (values (equal? res cookie) delta)))

;; Say goodbye to the captain recorded in udata.
;; With no captain host-port on record the result is (values #f -1),
;; otherwise whatever goodbye-ping returns.
(define (goodbye-captain udata)
  (let ((captain (udat-captain-host-port udata)))
    (cond
     (captain (goodbye-ping udata captain))
     (else    (values #f -1)))))
















































;;======================================================================
;; network utilities
;;======================================================================

(define (rate-ip ipaddr)
  (regex-case ipaddr







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
    (values (equal? res cookie) delta)))

;; Send the goodbye ping to the captain known to udata.
;; Yields (values #f -1) when no captain host-port is recorded; in every
;; other case the values come from goodbye-ping.
(define (goodbye-captain udata)
  (let ((captain-hp (udat-captain-host-port udata)))
    (if (not captain-hp)
        (values #f -1)
        (goodbye-ping udata captain-hp))))

;; Ask the captain which host:port owns dbname.
;;
;; returns: success owner-host-port
;;   success is #t only when the reply echoes our cookie and names an owner
;;   (values #f -1) when no captain is known, the send failed, or the reply
;;   could not be parsed
;;
(define (get-db-owner udata dbname dbtype)
  (let ((host-port (udat-captain-host-port udata)))
    (if host-port
        (let* ((cookie (make-cookie udata))
               ;; nothing to put on the data line; use an empty string since
               ;; send writes it with write-line
               (msg    "")
               ;; send joins params with string-intersperse, so every element
               ;; must be a string (dbtype arrives here as a symbol)
               (params (map ->string (list dbname dbtype)))
               (res    (send udata host-port 'db-owner cookie msg params: params retval: #t)))
          ;; res is #f when the send failed and may be a non-string (e.g. eof)
          ;; when the captain closed the connection without answering
          (match (if (string? res) (string-split res) '())
            ((retcookie owner-host-port)
             (values (and (equal? retcookie cookie)
                          ;; the captain answers "<cookie> BADDATA" for a malformed request
                          (not (equal? owner-host-port "BADDATA")))
                     owner-host-port))
            (else
             (values #f -1))))
        (values #f -1))))

;; called in ulex-handler to dispatch work, called on the workers side
;;     calls (proc dbrec procparam data)
;;     returns the result of proc, or #f when no handler is registered
;;
;; pdat is the info of the caller, used to send the result data
;; prockey is key into udat-handlers hash dereferencing a proc; it arrives
;;   from the control line as a string and is converted to the symbol used
;;   at registration time
;; procparam is a first param handed to proc - often to do further dereferencing
;; NOTE: params is intended to be a list of strings, encoding on data
;;       is up to the user but data must be a single line
;;
(define (process-request udata pdat dbname cookie prockey procparam data)
  (let* ((dbrec (ulex-open-db udata dbname))     ;; this will be a dbconn record, looks for in udata first
         (key   (if (string? prockey) (string->symbol prockey) prockey))
         ;; handlers live in the udat-handlers table, not in udata itself
         (proc  (hash-table-ref/default (udat-handlers udata) key #f)))
    (if proc
        (proc dbrec procparam data)
        (begin
          (print "ERROR: no handler registered for " prockey)
          #f))))

;; remote-request - send to remote to process in process-request
;; uconn comes from a call to connect and can be used instead of calling connect again
;; uconn is somewhat redundant with dbname but it tells us what host-port to call
;; uconn is a dbowner struct < pdat lastupdate >
;; we send dbname to the worker so they know which file to open
;; data must be a string with no newlines, it will be handed to the proc
;; at the remote site unchanged. It is up to the user to encode/decode its contents
;;
;;   rtype: immediate, read-only, normal, low-priority
;;
;; The control-line params are "dbname prockey procparam", which is the
;; shape the work-item branch of ulex-handler matches on.  Each is turned
;; into a string because the control line is built with string-intersperse.
;; NOTE(review): the control line is split on whitespace by the receiver, so
;;               a dbname containing spaces will not parse - confirm paths.
;;
(define (remote-request udata uconn rtype dbname prockey procparam data)
  (let* ((cookie    (make-cookie udata))
         (pdat      (dbowner-pdat uconn))
         (host-port (peer-addr-port pdat))
         (params    (map ->string (list dbname prockey procparam))))
    (send-receive udata host-port rtype cookie data params: params)))

;; Placeholder for opening the db named dbname.
;; Both arguments are currently ignored and the result is always #f.
(define (ulex-open-db udata dbname)
  #f)

;;======================================================================
;; network utilities
;;======================================================================

(define (rate-ip ipaddr)
  (regex-case ipaddr
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (mboxes          (make-hash-table))  ;; key => mbox
  ;; other servers
  (peers           (make-hash-table))  ;; host-port => peer record
  (dbowners        (make-hash-table))  ;; dbfile => host-port
  (handlers        (make-hash-table))  ;; dbfile => proc
  (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (work-queue      (make-queue))       ;; most stuff goes here
  ;; (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies







|







256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
  (serv-listener   #f)                 ;; this processes server info
  (handler-thread  #f)
  (mboxes          (make-hash-table))  ;; key => mbox
  ;; other servers
  (peers           (make-hash-table))  ;; host-port => peer record
  (dbowners        (make-hash-table))  ;; dbfile => host-port
  (handlers        (make-hash-table))  ;; dbfile => proc
  ;; (outgoing-conns  (make-hash-table))  ;; host:port -> conn
  (work-queue      (make-queue))       ;; most stuff goes here
  ;; (fast-queue      (make-queue))       ;; super quick stuff goes here (e.g. ping)
  (busy            #f)                 ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
  ;; app info
  (appname         #f)
  (dbtypes         (make-hash-table))  ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ]
  ;; cookies
239
240
241
242
243
244
245




246
247
248
249
250
251
252
253
254
255
256
257
258
259


260
261
262


263


264


265
266
267
268
269
270
271
272
;; One queued unit of work.
;; NOTE(review): field meanings below are inferred from the field names and
;; from the arguments handed to add-to-work-queue - confirm against it.
(defstruct work
  (peer-dat   #f) ;; presumably the peer record of the requester
  (handlerkey #f) ;; presumably the key of the handler to run
  (qrykey     #f) ;; presumably the query key (a.k.a. cookie) of the request
  (data       #f) ;; presumably the data line of the request
  (start      (current-milliseconds))) ;; defaults to a millisecond timestamp





;;======================================================================
;; Captain functions
;;======================================================================

;; NB// This needs to be started in a thread
;;
;; setup to be a captain
;;   - start server
;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  ;; Both steps have to succeed, in this order, before the handler thread
  ;; is launched; a failure at either step makes the whole call yield #f.
  (and (start-server-find-port udata) ;; puts the server in udata
       (create-captain-pkt udata)
       (let ((handler-thread (make-thread
                              (lambda () (ulex-handler-loop udata))
                              "Captain handler")))
         ;; remember the thread in udata, then start it
         (udat-handler-thread-set! udata handler-thread)
         (thread-start! handler-thread))))

;; given a pkts dir read 
;;
(define (get-all-captain-pkts udata)
  (let* ((pktsdir       (let ((d (udat-cpkts-dir udata)))
			  (if (file-exists? d)
			      d







>
>
>
>














>
>
|


>
>

>
>
|
>
>
|







298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
;; One queued unit of work.
;; NOTE(review): field meanings below are inferred from the field names
;; only - confirm against add-to-work-queue.
(defstruct work
  (peer-dat   #f) ;; presumably the peer record of the requester
  (handlerkey #f) ;; presumably the key of the handler to run
  (qrykey     #f) ;; presumably the query key (a.k.a. cookie) of the request
  (data       #f) ;; presumably the data line of the request
  (start      (current-milliseconds))) ;; defaults to a millisecond timestamp

;; Record describing who owns a db; connect builds one with
;; (make-dbowner pdat: pdat) and remote-request reads pdat back out of it.
(defstruct dbowner
  (pdat        #f) ;; peer record of the owner
  (last-update (current-seconds))) ;; defaults to a timestamp in seconds

;;======================================================================
;; Captain functions
;;======================================================================

;; NB// This needs to be started in a thread
;;
;; setup to be a captain
;;   - start server
;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  ;; Guard clauses first: each failing step reports itself and yields #f.
  (cond
   ((not (start-server-find-port udata)) ;; puts the server in udata
    (print "ERROR: failed to start server.")
    #f)
   ((not (create-captain-pkt udata))
    (print "ERROR: failed to create captain pkt")
    #f)
   (else
    ;; we are the captain: record our own address/port as the captain's
    ;; and launch the handler loop in its own thread
    (let* ((own-addr   (udat-my-address udata))
           (own-port   (udat-my-port    udata))
           (handler-th (make-thread
                        (lambda () (ulex-handler-loop udata))
                        "Captain handler")))
      (udat-handler-thread-set!  udata handler-th)
      (udat-captain-address-set! udata own-addr)
      (udat-captain-port-set!    udata own-port)
      (thread-start! handler-th)))))

;; given a pkts dir read 
;;
(define (get-all-captain-pkts udata)
  (let* ((pktsdir       (let ((d (udat-cpkts-dir udata)))
			  (if (file-exists? d)
			      d
390
391
392
393
394
395
396











397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; Open a tcp connection to host-port, write the control line and the data
;; line, and optionally (retval) read one line back.
;;
;; returns: the line read when retval is true, #t when sent without retval,
;;          #f on any failure (connect error, no local server set up, ...)
;;
;; params may hold any values; each is converted with ->string before being
;; joined onto the control line.  The ports are closed on every exit path,
;; including when writing or reading raises.
;;
(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f))
  (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
   exn
   #f
   (let-values (((inp oup)(tcp-connect host-port)))
     (dynamic-wind
      (lambda () #f)
      (lambda ()
        ;;
        ;; CONTROL LINE:
        ;;    handlerkey host:port pid qrykey params ...
        ;;
        (let* ((myhost (udat-my-address udata))
               (myport (udat-my-port    udata))
               (dat    (conc
                        handler " "
                        myhost ":" myport " "
                        ;; (udat-my-hostname udata) " "
                        (udat-my-pid  udata) " "
                        qrykey
                        (if (null? params)
                            ""
                            (conc " " (string-intersperse (map ->string params) " "))))))
          (cond
           ((not (and inp oup)) #f) ;; #f means failed to connect and send
           ((and myhost myport)
            (write-line dat  oup)
            (write-line data oup)
            ;; (print "Sent dat: " dat " data: " data)
            ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE
            ;;       unless retval was requested (there is a listener for handling that)
            (if retval
                (read-line inp)
                #t))
           (else
            (print "ERROR: send called but no receiver has been setup. Please call setup first!")
            #f))))
      (lambda ()
        ;; runs on normal return and when an exception escapes the body
        (close-input-port inp)
        (close-output-port oup))))))

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
  (let ((mbox      (make-mailbox))
	(mbox-time (current-milliseconds))







>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
<
<
<
<
|
<
<
<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489




490




491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
;;
;;  NOTE: qrykey is what was called the "cookie" previously
;;
;;     retval tells send to expect and wait for return data (one line) and return it or time out
;;       this is for ping where we don't want to necessarily have set up our own server yet.
;;
;; Deliver a control line plus a data line to host-port.
;;
;; When host-port is this process's own host-port the message is handed
;; directly to ulex-handler and its result is returned.  Otherwise a tcp
;; connection is opened, both lines are written and, if retval is true, one
;; line is read back.
;;
;; returns: result of ulex-handler when calling ourselves; otherwise the
;;          line read when retval is true, #t when sent without retval,
;;          #f on any failure
;;
;; params may hold any values; each is converted with ->string before being
;; joined onto the control line.  The ports are closed on every exit path,
;; including when writing or reading raises.
;;
(define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f))
  (let* ((my-host-port (udat-my-host-port udata))
         (isme         (equal? host-port my-host-port)) ;; am I calling myself?
         ;;
         ;; CONTROL LINE:
         ;;    handlerkey host:port pid qrykey params ...
         ;;
         (dat          (conc
                        handler " "
                        my-host-port " "
                        (udat-my-pid  udata) " "
                        qrykey
                        (if (null? params)
                            ""
                            (conc " " (string-intersperse (map ->string params) " "))))))
    ;; (print "send isme is " (if isme "true!" "false!") ", my-host-port: " my-host-port ", host-port: " host-port)
    (if isme
        (ulex-handler udata dat data)
        (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC
            exn
            #f
          (let-values (((inp oup)(tcp-connect host-port)))
            (dynamic-wind
             (lambda () #f)
             (lambda ()
               (cond
                ((not (and inp oup)) #f) ;; #f means failed to connect and send
                (my-host-port
                 (write-line dat  oup)
                 (write-line data oup)
                 ;; (print "Sent dat: " dat " data: " data)
                 ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE
                 ;;       unless retval was requested (there is a listener for handling that)
                 (if retval
                     (read-line inp)
                     #t))
                (else
                 (print "ERROR: send called but no receiver has been setup. Please call setup first!")
                 #f)))
             (lambda ()
               ;; runs on normal return and when an exception escapes the body
               (close-input-port inp)
               (close-output-port oup))))))))

;; send a request to the given host-port and register a mailbox in udata
;; wait for the mailbox data and return it
;;
(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20))
  (let ((mbox      (make-mailbox))
	(mbox-time (current-milliseconds))
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

;; 
(define (ulex-handler udata controldat data)
  (print "controldat: " controldat " data: " data)
  (match (string-split controldat)
    ((handlerkey host-port pid qrykey params ...)
     (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
     (case (string->symbol handlerkey)
       ((ack)(print "Got ack!"))
       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))







|


|







523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

;; 
(define (ulex-handler udata controldat data)
  ;; (print "controldat: " controldat " data: " data)
  (match (string-split controldat)
    ((handlerkey host-port pid qrykey params ...)
     ;; (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
     (case (string->symbol handlerkey)
       ((ack)(print "Got ack!"))
       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))
484
485
486
487
488
489
490
491
492
493
494
495

























496
497
498
499
500
501
502
503
504
505
	(udat-captain-address-set! udata #f)
	(udat-captain-host-set!    udata #f)
	(udat-captain-port-set!    udata #f)
	(udat-captain-pid-set!     udata #f)
	qrykey)
       ((rucaptain) ;; remote is asking if I'm the captain
	(if (udat-my-cpkt-key udata) "yes" "no"))
       ((whoowns) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs

























	#f)
       (else
	(add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)));; handles the incoming messages and dispatches to queues

;;
(define (ulex-handler-loop udata)







|




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

|







558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
	(udat-captain-address-set! udata #f)
	(udat-captain-host-set!    udata #f)
	(udat-captain-port-set!    udata #f)
	(udat-captain-pid-set!     udata #f)
	qrykey)
       ((rucaptain) ;; remote is asking if I'm the captain
	(if (udat-my-cpkt-key udata) "yes" "no"))
       ((db-owner) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs
	(match params
	  ((dbfile dbtype)
	   (let* ((curr-owner (hash-table-ref/default (udat-dbowners udata) dbfile #f))
		  (owner-host-port (and curr-owner (peer-addr-port curr-owner))))
	     (if owner-host-port
		 (conc qrykey " " owner-host-port)
		 (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it!
				  (make-peer addr-port: host-port pid: pid dbs: `(,dbfile)))))
		   (hash-table-set! (udat-dbowners udata) dbfile pdat)
		   (conc qrykey " " host-port)))))
	  (else (conc qrykey " BADDATA"))))
       ;; for work items:
       ;;    handler is one of; immediate, read-only, read-write, high-priority
       ((immediate read-only normal low-priority) ;; do this work immediately
	;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
	;; data => a single line encoded however you want, or should I build json into it?
	(let* ((pdat (get-peer-dat udata host-port)))
	  (match params ;; dbfile prockey procparam
	    ((dbfile prockey procparam)
	     (case (string->symbol handlerkey)
	       ((immediate read-only)
		(process-request udata pdat dbfile qrykey prockey procparam data))
	       ((normal low-priority) ;; split off later and add logic to support low priority
		(add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
	       (else
		#f))))))
       (else
	;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)));; handles the incoming messages and dispatches to queues

;;
(define (ulex-handler-loop udata)
513
514
515
516
517
518
519
520


521
522
523
524
525
526
527
	       (data       (read-line inp))
	       (resp       (ulex-handler udata controldat data)))
	  (if resp (write-line resp oup))
	  (close-input-port inp)
	  (close-output-port oup))
	(loop state)))))

;; add a proc to the handler list


;; Store proc under key in the handlers table of udata.
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))


;;======================================================================
;; work queues
;;======================================================================







|
>
>







612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
	       (data       (read-line inp))
	       (resp       (ulex-handler udata controldat data)))
	  (if resp (write-line resp oup))
	  (close-input-port inp)
	  (close-output-port oup))
	(loop state)))))

;; add a proc to the handler list, these are done symmetrically (i.e. in all instances)
;; so that the proc can be dereferenced remotely
;;
;; Stores proc under key in the handlers table of udata.
;;
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))


;;======================================================================
;; work queues
;;======================================================================
552
553
554
555
556
557
558

559
560
561
562
563
564
565
;;   sync on-disk db to inmem
;;   get lock in on-disk db for dbowner of this db
;;   put sync-proc, init-proc, on-disk handle, inmem handle in dbconn struct
;;   return the struct
;;======================================================================

;; Per-db connection record.
;; NOTE(review): inmem/conn meanings are taken from the field names -
;; confirm which is the on-disk and which the in-memory handle.
(defstruct dbconn

  (inmem  #f) ;; presumably the in-memory db handle
  (conn   #f) ;; presumably the on-disk db handle
  (sync   #f) ;; sync proc
  (init   #f) ;; init proc
  (lastsync (current-seconds)) ;; defaults to a timestamp in seconds
  )








>







653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
;;   sync on-disk db to inmem
;;   get lock in on-disk db for dbowner of this db
;;   put sync-proc, init-proc, on-disk handle, inmem handle in dbconn struct
;;   return the struct
;;======================================================================

;; Per-db connection record.
;; NOTE(review): fname/inmem/conn meanings are taken from the field names -
;; confirm which is the on-disk and which the in-memory handle.
(defstruct dbconn
  (fname  #f) ;; presumably the db file name
  (inmem  #f) ;; presumably the in-memory db handle
  (conn   #f) ;; presumably the on-disk db handle
  (sync   #f) ;; sync proc
  (init   #f) ;; init proc
  (lastsync (current-seconds)) ;; defaults to a timestamp in seconds
  )