Megatest

Check-in [a2267e910d]
Login
Overview
Comment:Refactored handler loop to facilitate calling locally
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.70-captain-ulex | v1.70-defunct-try
Files: files | file ages | folders
SHA1: a2267e910db658798619a48f050321a437bb7ccc
User & Date: matt on 2020-01-20 11:20:44
Other Links: branch diff | manifest | tags
Context
2020-01-20
22:34
Scaffolding for send-receive check-in: 3f613cadf2 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
11:20
Refactored handler loop to facilitate calling locally check-in: a2267e910d user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
03:34
Added drop captain send to all peers check-in: d20ee11c75 user: matt tags: v1.70-captain-ulex, v1.70-defunct-try
Changes

Modified ulex/ulex.scm from [e30b68cf23] to [48c02e2743].

254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  ;; Each step must succeed before the next is attempted; the first
  ;; failing step makes the whole form evaluate to #f. On success the
  ;; result is whatever thread-start! returns.
  (and (start-server-find-port udata) ;; puts the server in udata
       (create-captain-pkt udata)
       (let ((handler-thread (make-thread (lambda () (ulex-handler udata))
                                          "Captain handler")))
         ;; remember the thread in udata before starting it
         (udat-handler-thread-set! udata handler-thread)
         (thread-start! handler-thread))))

;; given a pkts dir read 
;;







|







254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
;;   - create pkt
;;   - start server port handler
;;
(define (setup-as-captain udata)
  ;; Each step must succeed before the next is attempted; the first
  ;; failing step makes the whole form evaluate to #f. On success the
  ;; result is whatever thread-start! returns.
  (and (start-server-find-port udata) ;; puts the server in udata
       (create-captain-pkt udata)
       (let ((handler-thread (make-thread (lambda () (ulex-handler-loop udata))
                                          "Captain handler")))
         ;; remember the thread in udata before starting it
         (udat-handler-thread-set! udata handler-thread)
         (thread-start! handler-thread))))

;; given a pkts dir read 
;;
447
448
449
450
451
452
453
















































454

455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
	       (mbox-receive-time    (current-milliseconds)))
	  (hash-table-delete! mboxes qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

















































;; handles the incoming messages and dispatches to queues

;;
;; Accept connections on the server listener stored in udata, forever.
;; Each connection carries one request; built-in handler keys (ack, ping,
;; goodbye, dropcaptain, rucaptain, whoowns) are served inline, anything
;; else is pushed onto the work queue.
;;
;; Every branch now closes the connection when it is done (previously the
;; ack, whoowns, work-queue and bad-data paths left both ports open), and
;; the rucaptain answer is written to the connection instead of stdout.
(define (ulex-handler udata)
  (let* ((serv-listener (udat-serv-listener udata)))
    (print "serv-listner: " serv-listener)
    ;; data comes as two lines
    ;;   handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db]
    ;;   data
    (let loop ((state 'start))
      (let-values (((inp oup) (tcp-accept serv-listener)))
        (print "got here: inp=" inp " oup=" oup)
        (let* ((controldat (read-line inp))
               (data       (read-line inp))
               ;; close both halves of the connection
               (hangup     (lambda ()
                             (close-input-port inp)
                             (close-output-port oup)))
               ;; send one line back to the caller, then close
               (reply      (lambda (msg)
                             (write-line msg oup)
                             (hangup))))
          (print "controldat: " controldat " data: " data)
          (match (string-split controldat)
            ((handlerkey host-port pid qrykey params ...)
             (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
             (case (string->symbol handlerkey)
               ((ack)
                (print "Got ack!")
                (hangup))
               ((ping) ;; special case - return result immediately on the same connection
                (let* ((proc    (hash-table-ref/default (udat-handlers udata) 'ping #f))
                       (val     (if proc (proc) "gotping")) ;; result currently unused
                       (peer    (make-peer addr-port: host-port pid: pid))
                       (dbshash (udat-dbowners udata)))
                  (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
                  (for-each (lambda (dbfile)
                              (hash-table-set! dbshash dbfile host-port))
                            params) ;; register each db in the dbshash
                  (if (not (hash-table-exists? (udat-peers udata) host-port))
                      (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
                  (reply qrykey))) ;; End of ping
               ((goodbye)
                ;; remove all traces of the caller in db ownership etc.
                (let* ((peer    (hash-table-ref/default (udat-peers udata) host-port #f))
                       (dbs     (if peer (peer-dbs peer) '()))
                       (dbshash (udat-dbowners udata)))
                  (for-each (lambda (dbfile) (hash-table-delete! dbshash dbfile)) dbs)
                  (hash-table-delete! (udat-peers udata) host-port)
                  (reply qrykey)))
               ((dropcaptain)
                ;; remove all traces of the captain
                (udat-captain-address-set! udata #f)
                (udat-captain-host-set!    udata #f)
                (udat-captain-port-set!    udata #f)
                (udat-captain-pid-set!     udata #f)
                (reply qrykey))
               ((rucaptain) ;; remote is asking if I'm the captain
                ;; the answer must go to the connection, not to stdout
                (reply (if (udat-my-cpkt-key udata) "yes" "no")))
               ((whoowns) ;; given a db name who do I send my queries to
                ;; look up the file in handlers, if have an entry ping them to be sure
                ;; they are still alive and then return that host:port.
                ;; if no handler found or if the ping fails pick from peers the oldest that
                ;; is managing the fewest dbs
                ;; (not implemented yet - nothing is sent back)
                (hangup))
               (else
                (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
                (hangup))))
            (else
             (print "BAD DATA? controldat=" controldat " data=" data)
             (hangup)))))
      (loop state))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  ;; store proc under key in the handler table held by udata,
  ;; replacing any handler previously stored under the same key
  (let ((handler-table (udat-handlers udata)))
    (hash-table-set! handler-table key proc)))


;;======================================================================







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>

|

<





<

|
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
|







447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506

507
508
509
510
511

512
513

514















515
516



















517













518
519
520
521
522
523
524
525
	       (mbox-receive-time    (current-milliseconds)))
	  (hash-table-delete! mboxes qrykey)
	  (if (eq? res 'MBOX_TIMEOUT)
	      #f
	      res))
	#f))) ;; #f means failed to communicate

;; 
;; Decode one request and act on it.
;;
;;   udata      - ulex state record (peers, dbowners, handlers, captain info)
;;   controldat - first line of the request, whitespace separated:
;;                  handlerkey host-port pid qrykey [params ...]
;;   data       - second line of the request; only forwarded to the work queue
;;
;; Returns the string to send back to the requester, or #f when there is
;; nothing to send (ack, whoowns, queued work, unparseable input).
(define (ulex-handler udata controldat data)
  (print "controldat: " controldat " data: " data)
  ;; read-line yields the eof object when the peer disconnects without
  ;; sending anything; string-split would raise on it, so treat anything
  ;; that is not a string as an empty (unmatchable) request.
  (match (if (string? controldat) (string-split controldat) '())
    ((handlerkey host-port pid qrykey params ...)
     (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params)
     (case (string->symbol handlerkey)
       ((ack)
        (print "Got ack!")
        ;; explicitly no reply: the value of print is not #f, and the
        ;; caller writes back any non-#f result
        #f)
       ((ping) ;; special case - return result immediately on the same connection
        (let* ((proc    (hash-table-ref/default (udat-handlers udata) 'ping #f))
               (val     (if proc (proc) "gotping")) ;; result currently unused
               (peer    (make-peer addr-port: host-port pid: pid))
               (dbshash (udat-dbowners udata)))
          (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger
          (for-each (lambda (dbfile)
                      (hash-table-set! dbshash dbfile host-port))
                    params) ;; register each db in the dbshash
          (if (not (hash-table-exists? (udat-peers udata) host-port))
              (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers
          qrykey)) ;; End of ping
       ((goodbye)
        ;; remove all traces of the caller in db ownership etc.
        (let* ((peer    (hash-table-ref/default (udat-peers udata) host-port #f))
               (dbs     (if peer (peer-dbs peer) '()))
               (dbshash (udat-dbowners udata)))
          (for-each (lambda (dbfile) (hash-table-delete! dbshash dbfile)) dbs)
          (hash-table-delete! (udat-peers udata) host-port)
          qrykey))
       ((dropcaptain)
        ;; remove all traces of the captain
        (udat-captain-address-set! udata #f)
        (udat-captain-host-set!    udata #f)
        (udat-captain-port-set!    udata #f)
        (udat-captain-pid-set!     udata #f)
        qrykey)
       ((rucaptain) ;; remote is asking if I'm the captain
        (if (udat-my-cpkt-key udata) "yes" "no"))
       ((whoowns) ;; given a db name who do I send my queries to
        ;; look up the file in handlers, if have an entry ping them to be sure
        ;; they are still alive and then return that host:port.
        ;; if no handler found or if the ping fails pick from peers the oldest that
        ;; is managing the fewest dbs
        ;; (not implemented yet)
        #f)
       (else
        ;; not a built-in key: hand the request to the work queue
        (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data)
        #f)))
    (else
     (print "BAD DATA? controldat=" controldat " data=" data)
     #f)))

;; handles the incoming messages and dispatches to queues

;;
;; Server accept loop; never returns. Used as the body of the captain
;; handler thread.
;;
;; Each accepted connection carries one request as two lines:
;;   handlerkey host-port pid qrykey [params ...]   (as destructured by ulex-handler)
;;   data
;; Both lines are passed to ulex-handler. If it returns a string, that
;; string is written back on the same connection; either way both ports
;; are closed before the next connection is accepted.
(define (ulex-handler-loop udata)
  (let ((serv-listener (udat-serv-listener udata)))
    (let loop ((state 'start))
      (let-values (((inp oup) (tcp-accept serv-listener)))
        (let* ((controldat (read-line inp))
               (data       (read-line inp))
               (resp       (ulex-handler udata controldat data)))
          ;; Only a string is a valid reply. Testing just for non-#f would
          ;; hand other values (e.g. the unspecified result of a branch
          ;; that ends in print) to write-line, which requires a string.
          (if (string? resp) (write-line resp oup))
          (close-input-port inp)
          (close-output-port oup))
        (loop state)))))

;; add a proc to the handler list
(define (register-handler udata key proc)
  ;; store proc under key in the handler table held by udata,
  ;; replacing any handler previously stored under the same key
  (let ((handler-table (udat-handlers udata)))
    (hash-table-set! handler-table key proc)))


;;======================================================================