Megatest

Diff
Login

Differences From Artifact [f8b8e960d6]:

To Artifact [e99055ba4e]:


77
78
79
80
81
82
83
84

85
86

87
88
89
90
91
92

93
94
95
96


97
98
99
100
101
102
103
77
78
79
80
81
82
83

84
85

86
87
88
89
90
91

92

93


94
95
96
97
98
99
100
101
102







-
+

-
+





-
+
-

-
-
+
+







		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect to a specific dbfile
;;   - if already connected - return the pdat
;;   - if already connected - return the dbowner host-port
;;   - ask the captain who to talk to for this db
;;   - put the entry in the dbowners hash
;;   - put the entry in the dbowners hash as dbfile => host-port
;;
(define (connect udata dbfname dbtype)
  (or (hash-table-ref/default (udat-dbowners udata) dbfname #f)
      (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype)))
	(if success
	    (let* ((pdat     (udat-get-peer udata dbowner-host-port))
	    (begin
		   (dbowner  (make-dbowner pdat: pdat)))
	      ;; just clobber the record, this is the new data no matter what
	      (hash-table-set! (udat-dbowners udata) dbowner-host-port dbowner)
	      dbowner)
	      (hash-table-set! (udat-dbowners udata) dbfname dbowner-host-port)
	      dbowner-host-port)
	    #f))))

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
153
154
155
156
157
158
159
160
161

162
163
164
165
166
167
168
169

170
171
172

173
174
175
176
177
178
179
152
153
154
155
156
157
158


159
160
161
162
163
164
165
166

167



168
169
170
171
172
173
174
175







-
-
+







-
+
-
-
-
+







  (let* ((dbrec (ulex-open-db udata dbname))     ;; this will be a dbconn record, looks for in udata first
	 (proc  (hash-table-ref udata prockey)))
    (let* ((result (proc dbrec procparam data)))
      result)))

;; remote-request - send to remote to process in process-request
;; uconn comes from a call to connect and can be used instead of calling connect again
;; uconn is somewhat redundant with dbname but it tells us what host-port to call
;; uconn is a dbowner struct < pdat lastupdate >
;; uconn is the host-port to call
;; we send dbname to the worker so they know which file to open
;; data must be a string with no newlines, it will be handed to the proc
;; at the remote site unchanged. It is up to the user to encode/decode it's contents
;;
;;   rtype: immediate, read-only, normal, low-priority
;; 
(define (remote-request udata uconn rtype dbname prockey procparam data)
  (let* ((cookie    (make-cookie))
  (let* ((cookie    (make-cookie udata)))
	 (pdat      (dbowner-pdat uconn))
	 (host-port (peer-addr-port pdat)))
    (send-receive udata host-port rtype cookie data `(,prockey procparam))))
    (send-receive udata uconn rtype cookie data `(,prockey procparam))))

(define (ulex-open-db udata dbname)
  #f)

;;======================================================================
;; network utilities
;;======================================================================
298
299
300
301
302
303
304
305

306
307
308
309
310
311
312
294
295
296
297
298
299
300

301
302
303
304
305
306
307
308







-
+







(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
  (start      (current-milliseconds)))

(defstruct dbowner
#;(defstruct dbowner
  (pdat        #f)
  (last-update (current-seconds)))

;;======================================================================
;; Captain functions
;;======================================================================

414
415
416
417
418
419
420
421

422
423
424
425
426
427
428
410
411
412
413
414
415
416

417
418
419
420
421
422
423
424







-
+







     peers)))

;;======================================================================
;; server primitives
;;======================================================================

(define (make-cookie udata)
  (let ((newcnum (+ (udat-cnum udata))))
  (let ((newcnum (+ (udat-cnum udata) 1)))
    (udat-cnum-set! udata newcnum)
    (conc (udat-my-address udata) ":"
	  (udat-my-port    udata) "-"
	  (udat-my-pid     udata) "-"
	  newcnum)))

;; create a tcp listener and return a populated udat struct with
536
537
538
539
540
541
542
543

544
545
546
547
548
549
550
532
533
534
535
536
537
538

539
540
541
542
543
544
545
546







-
+







       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))
	  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
	  (for-each (lambda (dbfile)
		      (hash-table-set! dbshash dbfile host-port))
		      (hash-table-set! dbshash dbfile host-port)) ;; WRONG?
		    params) ;; register each db in the dbshash
	  (if (not (hash-table-exists? (udat-peers udata) host-port))
	      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
	  qrykey)) ;; End of ping
       ((goodbye)
	;; remove all traces of the caller in db ownership etc.
	(let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
565
566
567
568
569
570
571
572

573
574
575
576
577

578

579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595


596
597
598
599
600
601
602
561
562
563
564
565
566
567

568

569
570
571
572
573

574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590

591
592
593
594
595
596
597
598
599







-
+
-




+
-
+
















-
+
+







       ((db-owner) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs
	(match params
	  ((dbfile dbtype)
	   (let* ((curr-owner (hash-table-ref/default (udat-dbowners udata) dbfile #f))
	   (let* ((owner-host-port (hash-table-ref/default (udat-dbowners udata) dbfile #f)))
		  (owner-host-port (and curr-owner (peer-addr-port curr-owner))))
	     (if owner-host-port
		 (conc qrykey " " owner-host-port)
		 (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it!
				  (make-peer addr-port: host-port pid: pid dbs: `(,dbfile)))))
		   (hash-table-set! (udat-peers udata) host-port pdat)
		   (hash-table-set! (udat-dbowners udata) dbfile pdat)
		   (hash-table-set! (udat-dbowners udata) dbfile host-port)
		   (conc qrykey " " host-port)))))
	  (else (conc qrykey " BADDATA"))))
       ;; for work items:
       ;;    handler is one of; immediate, read-only, read-write, high-priority
       ((immediate read-only normal low-priority) ;; do this work immediately
	;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
	;; data => a single line encoded however you want, or should I build json into it?
	(let* ((pdat (get-peer-dat udata host-port)))
	  (match params ;; dbfile prockey procparam
	    ((dbfile prockey procparam)
	     (case (string->symbol handlerkey)
	       ((immediate read-only)
		(process-request udata pdat dbfile qrykey prockey procparam data))
	       ((normal low-priority) ;; split off later and add logic to support low priority
		(add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
	       (else
		#f))))))
		#f)))
	    (else (print "ERROR: params=" params)))))
       (else
	;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)));; handles the incoming messages and dispatches to queues