Megatest

Diff
Login

Differences From Artifact [f8b8e960d6]:

To Artifact [e99055ba4e]:


77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect to a specific dbfile
;;   - if already connected - return the cached dbowner record
;;   - otherwise ask the captain who to talk to for this db (get-db-owner)
;;   - put the entry in the dbowners hash as dbfname => dbowner record
;;
;; returns: a dbowner record on success, #f when get-db-owner reports failure
;;
(define (connect udata dbfname dbtype)
  (or (hash-table-ref/default (udat-dbowners udata) dbfname #f)
      (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype)))
	(if success
	    (let* ((pdat     (udat-get-peer udata dbowner-host-port))
		   (dbowner  (make-dbowner pdat: pdat)))
	      ;; just clobber the record, this is the new data no matter what.
	      ;; The key must be dbfname: that is what the lookup above uses, so
	      ;; keying by host-port would make the cache impossible to hit.
	      (hash-table-set! (udat-dbowners udata) dbfname dbowner)
	      dbowner)
	    #f))))

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)







|

|





|
<

|
|







77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

93
94
95
96
97
98
99
100
101
102
		  (remove-captain-pkt udata captn)
		  (setup)))))
	(begin
	  (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
	  (setup)))))

;; connect: resolve which host-port serves a given dbfile
;;   - a hit in the dbowners hash is returned as-is
;;   - on a miss, get-db-owner is asked for the owner of dbfname
;;   - a successful answer is recorded as dbfname => host-port and returned
;;   - an unsuccessful answer yields #f and records nothing
;;
(define (connect udata dbfname dbtype)
  (let ((known (hash-table-ref/default (udat-dbowners udata) dbfname #f)))
    (cond
     (known known)
     (else
      (call-with-values
	  (lambda () (get-db-owner udata dbfname dbtype))
	(lambda (ok? owner-host-port)
	  (cond
	   (ok?
	    ;; overwrite unconditionally; the fresh answer always wins
	    (hash-table-set! (udat-dbowners udata) dbfname owner-host-port)
	    owner-host-port)
	   (else #f))))))))

;; returns: success pingtime
;;
;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
;;
(define (ping udata host-port)
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
  (let* ((dbrec (ulex-open-db udata dbname))     ;; this will be a dbconn record, looks for in udata first
	 (proc  (hash-table-ref udata prockey)))
    (let* ((result (proc dbrec procparam data)))
      result)))

;; remote-request - send to remote to process in process-request
;; uconn comes from a call to connect and can be used instead of calling connect again
;; uconn is somewhat redundant with dbname but it tells us what host-port to call
;; uconn is a dbowner struct < pdat lastupdate >
;; we send dbname to the worker so they know which file to open
;; data must be a string with no newlines, it will be handed to the proc
;; at the remote site unchanged. It is up to the user to encode/decode its contents
;;
;;   rtype: immediate, read-only, normal, low-priority
;;
;; returns: whatever send-receive returns
;;
(define (remote-request udata uconn rtype dbname prockey procparam data)
  (let* ((cookie    (make-cookie udata)) ;; make-cookie needs udata for address/port/pid/counter
	 (pdat      (dbowner-pdat uconn))
	 (host-port (peer-addr-port pdat)))
    ;; The receiving work-item handler matches params as (dbfile prockey procparam),
    ;; so send all three values. Build with `list` so procparam's value is sent,
    ;; not the literal symbol procparam.
    (send-receive udata host-port rtype cookie data (list dbname prockey procparam))))

;; ulex-open-db: placeholder - both arguments are ignored and #f is always returned
(define (ulex-open-db udata dbname) #f)

;;======================================================================
;; network utilities
;;======================================================================







<
|







|
<
<
|







152
153
154
155
156
157
158

159
160
161
162
163
164
165
166
167


168
169
170
171
172
173
174
175
  (let* ((dbrec (ulex-open-db udata dbname))     ;; this will be a dbconn record, looks for in udata first
	 (proc  (hash-table-ref udata prockey)))
    (let* ((result (proc dbrec procparam data)))
      result)))

;; remote-request - send to remote to process in process-request
;; uconn comes from a call to connect and can be used instead of calling connect again
;; uconn is the host-port to call
;; we send dbname to the worker so they know which file to open
;; data must be a string with no newlines, it will be handed to the proc
;; at the remote site unchanged. It is up to the user to encode/decode its contents
;;
;;   rtype: immediate, read-only, normal, low-priority
;;
;; returns: whatever send-receive returns
;;
(define (remote-request udata uconn rtype dbname prockey procparam data)
  (let ((cookie (make-cookie udata)))
    ;; The receiving work-item handler matches params as (dbfile prockey procparam),
    ;; so send all three values. Build with `list` so procparam's value is sent,
    ;; not the literal symbol procparam.
    (send-receive udata uconn rtype cookie data (list dbname prockey procparam))))

;; ulex-open-db: placeholder - both arguments are ignored and #f is always returned
(define (ulex-open-db udata dbname) #f)

;;======================================================================
;; network utilities
;;======================================================================
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
;; work: record describing one unit of work.
;; Every field defaults to #f except start, whose default is the expression
;; (current-milliseconds).
;; NOTE(review): presumably start is the enqueue timestamp and the other fields
;; mirror the request (peer, handler key, query key/cookie, payload) - confirm
;; against the code that calls make-work.
(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
  (start      (current-milliseconds)))

;; dbowner: record wrapping the peer record (pdat) that owns a db.
;; connect builds it with (make-dbowner pdat: ...) and remote-request reads it
;; back with dbowner-pdat. last-update defaults to the expression (current-seconds).
(defstruct dbowner
  (pdat        #f)
  (last-update (current-seconds)))

;;======================================================================
;; Captain functions
;;======================================================================








|







294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
;; work: record describing one unit of work.
;; Every field defaults to #f except start, whose default is the expression
;; (current-milliseconds).
;; NOTE(review): presumably start is the enqueue timestamp and the other fields
;; mirror the request (peer, handler key, query key/cookie, payload) - confirm
;; against the code that calls make-work.
(defstruct work
  (peer-dat   #f)
  (handlerkey #f)
  (qrykey     #f)
  (data       #f)
  (start      (current-milliseconds)))

;; The dbowner record is disabled with a #; datum comment (the reader skips the
;; whole form). connect now stores dbfile => host-port directly in the dbowners hash.
;; NOTE(review): presumably safe to delete once nothing references make-dbowner
;; or dbowner-pdat - confirm.
#;(defstruct dbowner
  (pdat        #f)
  (last-update (current-seconds)))

;;======================================================================
;; Captain functions
;;======================================================================

414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
     peers)))

;;======================================================================
;; server primitives
;;======================================================================

;; make-cookie: build a new cookie string for a request
;;
;; Increments the counter stored in udata (udat-cnum) and returns
;;   <my-address>:<my-port>-<my-pid>-<counter>
;;
;; The counter must actually advance on every call, otherwise successive
;; cookies from the same process are identical.
;;
(define (make-cookie udata)
  (let ((newcnum (+ (udat-cnum udata) 1)))
    (udat-cnum-set! udata newcnum)
    (conc (udat-my-address udata) ":"
	  (udat-my-port    udata) "-"
	  (udat-my-pid     udata) "-"
	  newcnum)))

;; create a tcp listener and return a populated udat struct with







|







410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
     peers)))

;;======================================================================
;; server primitives
;;======================================================================

;; make-cookie: advance the counter held in udata by one, store it back, and
;; return the string <my-address>:<my-port>-<my-pid>-<counter>
(define (make-cookie udata)
  (let* ((next-count (+ 1 (udat-cnum udata)))
	 (addr       (udat-my-address udata))
	 (port       (udat-my-port udata))
	 (pid        (udat-my-pid udata)))
    (udat-cnum-set! udata next-count)
    (conc addr ":" port "-" pid "-" next-count)))

;; create a tcp listener and return a populated udat struct with
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))
	  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
	  (for-each (lambda (dbfile)
		      (hash-table-set! dbshash dbfile host-port))
		    params) ;; register each db in the dbshash
	  (if (not (hash-table-exists? (udat-peers udata) host-port))
	      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
	  qrykey)) ;; End of ping
       ((goodbye)
	;; remove all traces of the caller in db ownership etc.
	(let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))







|







532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))
	  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
	  (for-each (lambda (dbfile)
		      (hash-table-set! dbshash dbfile host-port)) ;; WRONG?
		    params) ;; register each db in the dbshash
	  (if (not (hash-table-exists? (udat-peers udata) host-port))
	      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
	  qrykey)) ;; End of ping
       ((goodbye)
	;; remove all traces of the caller in db ownership etc.
	(let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
565
566
567
568
569
570
571
572
573
574
575
576
577

578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595

596
597
598
599
600
601
602
       ((db-owner) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs
	(match params
	  ((dbfile dbtype)
	   (let* ((curr-owner (hash-table-ref/default (udat-dbowners udata) dbfile #f))
		  (owner-host-port (and curr-owner (peer-addr-port curr-owner))))
	     (if owner-host-port
		 (conc qrykey " " owner-host-port)
		 (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it!
				  (make-peer addr-port: host-port pid: pid dbs: `(,dbfile)))))

		   (hash-table-set! (udat-dbowners udata) dbfile pdat)
		   (conc qrykey " " host-port)))))
	  (else (conc qrykey " BADDATA"))))
       ;; for work items:
       ;;    handler is one of: immediate, read-only, normal, low-priority
       ((immediate read-only normal low-priority) ;; do this work immediately
	;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
	;; data => a single line encoded however you want, or should I build json into it?
	(let* ((pdat (get-peer-dat udata host-port)))
	  (match params ;; dbfile prockey procparam
	    ((dbfile prockey procparam)
	     (case (string->symbol handlerkey)
	       ((immediate read-only)
		(process-request udata pdat dbfile qrykey prockey procparam data))
	       ((normal low-priority) ;; split off later and add logic to support low priority
		(add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
	       (else
		#f))))))

       (else
	;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)));; handles the incoming messages and dispatches to queues








|
<




>
|
















|
>







561
562
563
564
565
566
567
568

569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
       ((db-owner) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs
	(match params
	  ((dbfile dbtype)
	   (let* ((owner-host-port (hash-table-ref/default (udat-dbowners udata) dbfile #f)))

	     (if owner-host-port
		 (conc qrykey " " owner-host-port)
		 (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it!
				  (make-peer addr-port: host-port pid: pid dbs: `(,dbfile)))))
		   (hash-table-set! (udat-peers udata) host-port pdat)
		   (hash-table-set! (udat-dbowners udata) dbfile host-port)
		   (conc qrykey " " host-port)))))
	  (else (conc qrykey " BADDATA"))))
       ;; for work items:
       ;;    handler is one of: immediate, read-only, normal, low-priority
       ((immediate read-only normal low-priority) ;; do this work immediately
	;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line
	;; data => a single line encoded however you want, or should I build json into it?
	(let* ((pdat (get-peer-dat udata host-port)))
	  (match params ;; dbfile prockey procparam
	    ((dbfile prockey procparam)
	     (case (string->symbol handlerkey)
	       ((immediate read-only)
		(process-request udata pdat dbfile qrykey prockey procparam data))
	       ((normal low-priority) ;; split off later and add logic to support low priority
		(add-to-work-queue udata pdat dbfile qrykey prockey procparam data))
	       (else
		#f)))
	    (else (print "ERROR: params=" params)))))
       (else
	;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)));; handles the incoming messages and dispatches to queues