Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -28,11 +28,12 @@ (module rmtmod * (import scheme chicken data-structures extras) -(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable) +(import (prefix sqlite3 sqlite3:) posix typed-records srfi-18 srfi-69 format ports srfi-1 matchable + s11n) (import (prefix ulex ulex:)) (import commonmod) (import itemsmod) @@ -87,10 +88,11 @@ (let* ((udata (ulex:setup))) ;; establish connection to ulex (alldat-ulexdat-set! alldat udata) ;; register all needed procs (ulex:register-handler udata 'ping common:get-full-version) ;; override ping with get-full-version (ulex:register-handler udata 'login common:get-full-version) ;; force setup of the connection + (ulex:register-handler udata 'execute api:execute-requests) udata)) ;; set up a connection to the current owner of the dbfile associated with rid ;; then send the query to that dbfile owner and wait for a response. ;; @@ -101,12 +103,19 @@ 'main 'runs)) (dbfname (if (eq? 
dbtype 'main) "main.db" (conc rid ".db"))) (dbfile (conc areapath "/.db/" dbfname)) - (ulexconn (rmt:connect alldat dbfname dbtype))) - (rmt:open-qry-close-locally cmd 0 params))) + (udata (alldat-ulexdat alldat)) + (ulexconn (rmt:connect alldat dbfname dbtype))) ;; ulexconn is our new *runremote*, it is a dbowner struct < pdat lastrefresh > + ;; need to call this on the other side + ;; (api:execute-requests dbstruct-local (vector (symbol->string cmd) params)))) + (with-input-from-string + (ulex:remote-request udata ulexconn 'immediate dbfile 'execute rid (with-output-to-string (lambda ()(serialize params)))) + (lambda ()(deserialize))))) + +#;(rmt:open-qry-close-locally cmd 0 params) ;; ;; ;; #;(common:telemetry-log (conc "rmt:"(->string cmd)) ;; ;; #;(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) ;; start attemptnum at 1 so the modulo below works as expected ;; ;; @@ -239,47 +248,47 @@ ;; (else (extras-case-11 *default-log-port* runremote cmd params attemptnum rid))))) ;; bunch of small functions factored out of send-receive to make debug easier ;; -#;(define (extras-case-11 *default-log-port* runremote cmd params attemptnum rid) - ;; (mutex-unlock! *rmt-mutex*) - (debug:print-info 12 *default-log-port* "rmt:send-receive, case 9") - ;; (mutex-lock! *rmt-mutex*) - (let* ((conninfo (remote-conndat runremote)) - (dat (case (remote-transport runremote) - ((http) (condition-case ;; handling here has - ;; caused a lot of - ;; problems. However it - ;; is needed to deal with - ;; attemtped - ;; communication to - ;; servers that have gone - ;; away - #;(http-transport:client-api-send-receive 0 conninfo cmd params) - ((commfail)(vector #f "communications fail")) - ((exn)(vector #f "other fail" (print-call-chain))))) - (else - (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport runremote) " not supported") - (exit)))) - (success (if (vector? dat) (vector-ref dat 0) #f)) - (res (if (vector? 
dat) (vector-ref dat 1) #f))) - (if (and (vector? conninfo) (< 5 (vector-length conninfo))) - #t #;(http-transport:server-dat-update-last-access conninfo) ;; refresh access time - (begin - (debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo) - (set! conninfo #f) - (remote-conndat-set! *runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global. - #;(http-transport:close-connections area-dat: runremote))) - (debug:print-info 13 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote) - (mutex-unlock! *rmt-mutex*) - (if success ;; success only tells us that the transport was - ;; successful, have to examine the data to see if - ;; there was a detected issue at the other end - (extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd) - (extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params) - ))) +;;(define (extras-case-11 *default-log-port* runremote cmd params attemptnum rid) +;; ;; (mutex-unlock! *rmt-mutex*) +;; (debug:print-info 12 *default-log-port* "rmt:send-receive, case 9") +;; ;; (mutex-lock! *rmt-mutex*) +;; (let* ((conninfo (remote-conndat runremote)) +;; (dat (case (remote-transport runremote) +;; ((http) (condition-case ;; handling here has +;; ;; caused a lot of +;; ;; problems. However it +;; ;; is needed to deal with +;; ;; attemtped +;; ;; communication to +;; ;; servers that have gone +;; ;; away +;; #;(http-transport:client-api-send-receive 0 conninfo cmd params) +;; ((commfail)(vector #f "communications fail")) +;; ((exn)(vector #f "other fail" (print-call-chain))))) +;; (else +;; (debug:print 0 *default-log-port* "ERROR: transport " (remote-transport runremote) " not supported") +;; (exit)))) +;; (success (if (vector? dat) (vector-ref dat 0) #f)) +;; (res (if (vector? dat) (vector-ref dat 1) #f))) +;; (if (and (vector? 
conninfo) (< 5 (vector-length conninfo))) +;; #t #;(http-transport:server-dat-update-last-access conninfo) ;; refresh access time +;; (begin +;; (debug:print 0 *default-log-port* "INFO: Should not get here! conninfo=" conninfo) +;; (set! conninfo #f) +;; (remote-conndat-set! *runremote* #f) ;; NOTE: *runremote* is global copy of runremote. Purpose: factor out global. +;; #;(http-transport:close-connections area-dat: runremote))) +;; (debug:print-info 13 *default-log-port* "rmt:send-receive, case 9. conninfo=" conninfo " dat=" dat " runremote = " runremote) +;; (mutex-unlock! *rmt-mutex*) +;; (if success ;; success only tells us that the transport was +;; ;; successful, have to examine the data to see if +;; ;; there was a detected issue at the other end +;; (extras-transport-succeded *default-log-port* *rmt-mutex* attemptnum runremote res params rid cmd) +;; (extras-transport-failed *default-log-port* *rmt-mutex* attemptnum runremote cmd rid params) +;; ))) ;; (define (rmt:update-db-stats run-id rawcmd params duration) ;; (mutex-lock! *db-stats-mutex*) ;; (handle-exceptions ;; exn Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -1,6 +1,6 @@ -;;; ulex: Distributed sqlite3 db +;; ulex: Distributed sqlite3 db ;;; ;; Copyright (C) 2018 Matt Welland ;; Redistribution and use in source and binary forms, with or without ;; modification, is permitted. 
;;
@@ -79,12 +79,24 @@
     (begin
       (setup-as-captain udata)  ;; this saves the thread to captain-thread and starts the thread
       (setup)))))
 
 ;; connect to a specific dbfile
+;;   - if already connected - return the cached dbowner record
+;;   - ask the captain who to talk to for this db
+;;   - put the entry in the dbowners hash (same key for lookup and store)
+;;
 (define (connect udata dbfname dbtype)
-  udata)
+  (let ((key (cons 'conn dbfname))) ;; namespaced key: the captain's db-owner handler stores peer records under the bare dbfile name in this same table
+    (or (hash-table-ref/default (udat-dbowners udata) key #f)
+        (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype)))
+          (if success
+              (let* ((pdat    (udat-get-peer udata dbowner-host-port))
+                     (dbowner (make-dbowner pdat: pdat)))
+                (hash-table-set! (udat-dbowners udata) key dbowner) ;; just clobber the record, this is the new data no matter what
+                dbowner)
+              #f)))))
 
 ;; returns: success pingtime
 ;;
 ;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns
 ;;
@@ -112,10 +124,57 @@
 (define (goodbye-captain udata)
   (let* ((host-port (udat-captain-host-port udata)))
     (if host-port
         (goodbye-ping udata host-port)
         (values #f -1))))
+
+;; ask the captain who owns dbname; returns (values success owner-host-port), or (values #f -1) on any failure
+(define (get-db-owner udata dbname dbtype)
+  (let* ((host-port (udat-captain-host-port udata)))
+    (if host-port
+        (let* ((cookie (make-cookie udata))
+               (params (map ->string (list dbname dbtype))) ;; send joins params with string-intersperse, so they must all be strings
+               (res    (send udata host-port 'db-owner cookie "" params: params retval: #t))) ;; "" rather than #f: send emits the data line with write-line
+          (match (if (string? res) (string-split res) '()) ;; send yields #f when the exchange fails
+            ((retcookie owner-host-port) (values (and (equal? retcookie cookie) (not (equal? owner-host-port "BADDATA"))) owner-host-port))
+            (else (values #f -1)))) ;; malformed or missing reply
+        (values #f -1))))
+
+;; called in ulex-handler to dispatch work, called on the workers side
+;;     calls (proc dbrec procparam data)
+;;     returns the result of proc, or #f when no proc is registered under prockey
+;;
+;; pdat is the info of the caller (not used here yet)
+;; prockey is key into udat-handlers hash dereferencing a proc
+;; procparam is a first param handed to proc - often to do further dereferencing
+;; NOTE: params is intended to be a list of strings, encoding on data
+;;       is up to the user but data must be a single line
+;;
+(define (process-request udata pdat dbname cookie prockey procparam data)
+  (let* ((dbrec (ulex-open-db udata dbname)) ;; this will be a dbconn record, looks for in udata first
+         (key   (if (string? prockey) (string->symbol prockey) prockey)) ;; arrives as a string from the control line; handlers are registered under symbols
+         (proc  (hash-table-ref/default (udat-handlers udata) key #f)))
+    (if proc (proc dbrec procparam data) (begin (print "ERROR: no handler registered for " key) #f))))
+
+;; remote-request - send to remote to process in process-request
+;;     uconn comes from a call to connect and can be used instead of calling connect again
+;;     uconn is somewhat redundant with dbname but it tells us what host-port to call
+;;     uconn is a dbowner struct < pdat lastupdate >
+;;     we send dbname to the worker so they know which file to open
+;;     data must be a string with no newlines, it will be handed to the proc
+;;     at the remote site unchanged. 
It is up to the user to encode/decode its contents
+;;     (uconn is a dbowner struct < pdat last-update >)
+;; rtype: immediate, read-only, normal, low-priority
+;; dbname, prockey and procparam travel as space-separated strings on the control line, so none of them may contain whitespace
+(define (remote-request udata uconn rtype dbname prockey procparam data)
+  (let* ((cookie    (make-cookie udata)) ;; NOTE(review): udata passed to agree with get-db-owner - confirm against make-cookie
+         (host-port (peer-addr-port (dbowner-pdat uconn)))
+         (params    (map ->string (list dbname prockey procparam)))) ;; the remote handler matches (dbfile prockey procparam)
+    (send-receive udata host-port rtype cookie data params: params))) ;; params is a keyword argument of send-receive
+
+(define (ulex-open-db udata dbname)
+  #f) ;; stub: always returns #f for now
 
 ;;======================================================================
 ;; network utilities
 ;;======================================================================
 
@@ -199,11 +258,11 @@
   (mboxes (make-hash-table)) ;; key => mbox
   ;; other servers
   (peers (make-hash-table)) ;; host-port => peer record
   (dbowners (make-hash-table)) ;; dbfile => host-port
   (handlers (make-hash-table)) ;; dbfile => proc
-  (outgoing-conns (make-hash-table)) ;; host:port -> conn
+  ;; (outgoing-conns (make-hash-table)) ;; host:port -> conn
   (work-queue (make-queue)) ;; most stuff goes here
   ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping)
   (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately
   ;; app info
   (appname #f)
@@ -241,10 +300,14 @@
   (handlerkey #f)
   (qrykey #f)
   (data #f)
   (start (current-milliseconds)))
 
+(defstruct dbowner
+  (pdat #f)
+  (last-update (current-seconds)))
+
 ;;======================================================================
 ;; Captain functions
 ;;======================================================================
 
 ;; NB// This needs to be started in a thread
@@ -255,16 +318,24 @@
 ;;  - start server port handler
 ;;
 (define (setup-as-captain udata)
   (if (start-server-find-port udata) ;; puts the server in udata
       (if (create-captain-pkt udata)
-          (let* ((th (make-thread (lambda ()
+          (let* ((my-addr (udat-my-address udata))
+                 (my-port (udat-my-port udata))
+                 (th (make-thread (lambda ()
                                    (ulex-handler-loop udata))
                                  "Captain handler")))
             (udat-handler-thread-set! udata th)
+            (udat-captain-address-set! 
udata my-addr) + (udat-captain-port-set! udata my-port) (thread-start! th)) - #f) - #f)) + (begin + (print "ERROR: failed to create captain pkt") + #f)) + (begin + (print "ERROR: failed to start server.") + #f))) ;; given a pkts dir read ;; (define (get-all-captain-pkts udata) (let* ((pktsdir (let ((d (udat-cpkts-dir udata))) @@ -392,47 +463,50 @@ ;; ;; retval tells send to expect and wait for return data (one line) and return it or time out ;; this is for ping where we don't want to necessarily have set up our own server yet. ;; (define (send udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f)) - (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC - exn - #f - (let-values (((inp oup)(tcp-connect host-port))) - ;; - ;; CONTROL LINE: - ;; handlerkey host:port pid qrykey params ... - ;; - (let ((res - (if (and inp oup) - (let* ((myhost (udat-my-address udata)) - (myport (udat-my-port udata)) - (dat (conc - handler " " - (udat-my-address udata) ":" (udat-my-port udata) " " - ;; (udat-my-hostname udata) " " - (udat-my-pid udata) " " - qrykey - (if (null? params) "" (conc " " (string-intersperse params " ")))))) - (if (and myhost myport) - (begin - (write-line dat oup) - (write-line data oup) - ;; (print "Sent dat: " dat " data: " data) - (if retval - (read-line inp) - #t)) - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)) - ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! - ;; (there is a listener for handling that) - ) - #f))) ;; #f means failed to connect and send - (close-input-port inp) - (close-output-port oup) - res)))) + (let* ((my-host-port (udat-my-host-port udata)) + (isme (equal? host-port my-host-port)) ;; am I calling myself? + (dat (conc + handler " " + my-host-port " " + (udat-my-pid udata) " " + qrykey + (if (null? params) "" (conc " " (string-intersperse params " ")))))) + ;; (print "send isme is " (if isme "true!" 
"false!") ", my-host-port: " my-host-port ", host-port: " host-port) + (if isme + (ulex-handler udata dat data) + (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC + exn + #f + (let-values (((inp oup)(tcp-connect host-port))) + ;; + ;; CONTROL LINE: + ;; handlerkey host:port pid qrykey params ... + ;; + (let ((res + (if (and inp oup) + (let* () + (if my-host-port + (begin + (write-line dat oup) + (write-line data oup) + ;; (print "Sent dat: " dat " data: " data) + (if retval + (read-line inp) + #t)) + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)) + ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! + ;; (there is a listener for handling that) + ) + #f))) ;; #f means failed to connect and send + (close-input-port inp) + (close-output-port oup) + res)))))) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; (define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20)) @@ -451,14 +525,14 @@ res)) #f))) ;; #f means failed to communicate ;; (define (ulex-handler udata controldat data) - (print "controldat: " controldat " data: " data) + ;; (print "controldat: " controldat " data: " data) (match (string-split controldat) ((handlerkey host-port pid qrykey params ...) - (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) + ;; (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) (case (string->symbol handlerkey) ((ack)(print "Got ack!")) ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) @@ -486,18 +560,43 @@ (udat-captain-port-set! udata #f) (udat-captain-pid-set! 
udata #f) qrykey) ((rucaptain) ;; remote is asking if I'm the captain (if (udat-my-cpkt-key udata) "yes" "no")) - ((whoowns) ;; given a db name who do I send my queries to + ((db-owner) ;; given a db name who do I send my queries to ;; look up the file in handlers, if have an entry ping them to be sure ;; they are still alive and then return that host:port. ;; if no handler found or if the ping fails pick from peers the oldest that ;; is managing the fewest dbs - #f) + (match params + ((dbfile dbtype) + (let* ((curr-owner (hash-table-ref/default (udat-dbowners udata) dbfile #f)) + (owner-host-port (and curr-owner (peer-addr-port curr-owner)))) + (if owner-host-port + (conc qrykey " " owner-host-port) + (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it! + (make-peer addr-port: host-port pid: pid dbs: `(,dbfile))))) + (hash-table-set! (udat-dbowners udata) dbfile pdat) + (conc qrykey " " host-port))))) + (else (conc qrykey " BADDATA")))) + ;; for work items: + ;; handler is one of; immediate, read-only, read-write, high-priority + ((immediate read-only normal low-priority) ;; do this work immediately + ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line + ;; data => a single line encoded however you want, or should I build json into it? + (let* ((pdat (get-peer-dat udata host-port))) + (match params ;; dbfile prockey procparam + ((dbfile prockey procparam) + (case (string->symbol handlerkey) + ((immediate read-only) + (process-request udata pdat dbfile qrykey prockey procparam data)) + ((normal low-priority) ;; split off later and add logic to support low priority + (add-to-work-queue udata pdat dbfile qrykey prockey procparam data)) + (else + #f)))))) (else - (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) + ;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) #f))) (else (print "BAD DATA? 
controldat=" controldat " data=" data) #f)));; handles the incoming messages and dispatches to queues @@ -515,11 +614,13 @@ (if resp (write-line resp oup)) (close-input-port inp) (close-output-port oup)) (loop state))))) -;; add a proc to the handler list +;; add a proc to the handler list, these are done symetrically (i.e. in all instances) +;; so that the proc can be dereferenced remotely +;; (define (register-handler udata key proc) (hash-table-set! (udat-handlers udata) key proc)) ;;====================================================================== @@ -554,10 +655,11 @@ ;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct ;; return the stuct ;;====================================================================== (defstruct dbconn + (fname #f) (inmem #f) (conn #f) (sync #f) ;; sync proc (init #f) ;; init proc (lastsync (current-seconds))