Megatest

Diff
Login

Differences From Artifact [e30b68cf23]:

To Artifact [48c02e2743]:


254
255
256
257
258
259
260
261

262
263
264
265
266
267
268
254
255
256
257
258
259
260

261
262
263
264
265
266
267
268







-
+







;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  (if (start-server-find-port udata) ;; puts the server in udata
      (if (create-captain-pkt udata)
	  (let* ((th (make-thread (lambda ()
				    (ulex-handler udata)) "Captain handler")))
				    (ulex-handler-loop udata)) "Captain handler")))
	    (udat-handler-thread-set! udata th)
	    (thread-start! th))
	  #f)
      #f))

;; given a pkts dir read 
;;
447
448
449
450
451
452
453
















































454


455
456

457
458
459
460
461
462
463
464
465
466

467
468

469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485


486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505

506
507
508
509
510
511
512
513
514
515
516
517
518
519

520
521
522
523
524
525
526
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501

502
503
504

505
506

507
508
509
510
511

512

513


514

















515
516




















517














518
519
520
521
522
523
524
525







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+

-
+

-





-

-
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+







	       (mbox-receive-time    (current-milliseconds)))
	  (hash-table-delete! mboxes qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

;; 
(define (ulex-handler udata controldat data)
  (print "controldat: " controldat " data: " data)
  (match (string-split controldat)
    ((handlerkey host-port pid qrykey params ...)
     (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
     (case (string->symbol handlerkey)
       ((ack)(print "Got ack!"))
       ((ping) ;; special case - return result immediately on the same connection
	(let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
	       (val   (if proc (proc) "gotping"))
	       (peer  (make-peer addr-port: host-port pid: pid))
	       (dbshash (udat-dbowners udata)))
	  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
	  (for-each (lambda (dbfile)
		      (hash-table-set! dbshash dbfile host-port))
		    params) ;; register each db in the dbshash
	  (if (not (hash-table-exists? (udat-peers udata) host-port))
	      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
	  qrykey)) ;; End of ping
       ((goodbye)
	;; remove all traces of the caller in db ownership etc.
	(let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
	       (dbs   (if peer (peer-dbs peer) '()))
	       (dbshash (udat-dbowners udata)))
	  (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
	  (hash-table-delete! (udat-peers udata) host-port)
	  qrykey))
       ((dropcaptain)
	;; remove all traces of the captain
	(udat-captain-address-set! udata #f)
	(udat-captain-host-set!    udata #f)
	(udat-captain-port-set!    udata #f)
	(udat-captain-pid-set!     udata #f)
	qrykey)
       ((rucaptain) ;; remote is asking if I'm the captain
	(if (udat-my-cpkt-key udata) "yes" "no"))
       ((whoowns) ;; given a db name who do I send my queries to
	;; look up the file in handlers, if have an entry ping them to be sure
	;; they are still alive and then return that host:port.
	;; if no handler found or if the ping fails pick from peers the oldest that
	;; is managing the fewest dbs
	#f)
       (else
	(add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
	#f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
;; handles the incoming messages and dispatches to queues
     #f)));; handles the incoming messages and dispatches to queues

;;
(define (ulex-handler udata)
(define (ulex-handler-loop udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (print "serv-listner: " serv-listener)
    ;; data comes as two lines
    ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
    ;;   data
    (let loop ((state 'start))
      (let-values (((inp oup)(tcp-accept serv-listener)))
	(print "got here: inp=" inp " oup=" oup)
	(let* ((controldat (read-line inp))
	       (data       (read-line inp)))
	       (data       (read-line inp))
	  (print "controldat: " controldat " data: " data)
	  (match (string-split controldat)
	       (resp       (ulex-handler udata controldat data)))
		 ((handlerkey host-port pid qrykey params ...)
		  (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
		  (case (string->symbol handlerkey)
		    ((ack)(print "Got ack!"))
		    ((ping) ;; special case - return result immediately on the same connection
		     (let* ((proc  (hash-table-ref/default (udat-handlers udata) 'ping #f))
			    (val   (if proc (proc) "gotping"))
			    (peer  (make-peer addr-port: host-port pid: pid))
			    (dbshash (udat-dbowners udata)))
		       (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
		       (for-each (lambda (dbfile)
				   (hash-table-set! dbshash dbfile host-port))
				 params) ;; register each db in the dbshash
		       (if (not (hash-table-exists? (udat-peers udata) host-port))
			   (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
		       (write-line qrykey oup)
		       (close-input-port inp)
	  (if resp (write-line resp oup))
	  (close-input-port inp)
		       (close-output-port oup))) ;; End of ping
		    ((goodbye)
		     ;; remove all traces of the caller in db ownership etc.
		     (let* ((peer  (hash-table-ref/default (udat-peers udata) host-port #f))
			    (dbs   (if peer (peer-dbs peer) '()))
			    (dbshash (udat-dbowners udata)))
		       (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs)
		       (hash-table-delete! (udat-peers udata) host-port)
		       (write-line qrykey oup)
		       (close-input-port inp)
		       (close-output-port oup)))
		    ((dropcaptain)
		     ;; remove all traces of the captain
		     (udat-captain-address-set! udata #f)
		     (udat-captain-host-set!    udata #f)
		     (udat-captain-port-set!    udata #f)
		     (udat-captain-pid-set!     udata #f)
		     (write-line qrykey oup)
		     (close-input-port inp)
		     (close-output-port oup))
	  (close-output-port oup))
		    ((rucaptain) ;; remote is asking if I'm the captain
		     (write-line (if (udat-my-cpkt-key udata) "yes" "no"))
		     (close-input-port inp)
		     (close-output-port oup))
		    ((whoowns) ;; given a db name who do I send my queries to
		     ;; look up the file in handlers, if have an entry ping them to be sure
		     ;; they are still alive and then return that host:port.
		     ;; if no handler found or if the ping fails pick from peers the oldest that
		     ;; is managing the fewest dbs
		     #f)
		    (else
		     (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data))))
		 (else (print "BAD DATA? controldat=" controldat " data=" data)))))
	(loop state))))
	(loop state)))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  (hash-table-set! (udat-handlers udata) key proc))


;;======================================================================