ADDED ulex/ulex-prev-orig.scm Index: ulex/ulex-prev-orig.scm ================================================================== --- /dev/null +++ ulex/ulex-prev-orig.scm @@ -0,0 +1,2252 @@ +;; ulex: Distributed sqlite3 db +;;; +;; Copyright (C) 2018 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;;====================================================================== +;; ABOUT: +;; See README in the distribution at https://www.kiatoa.com/fossils/ulex +;; NOTES: +;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. +;; +;;====================================================================== + +(use mailbox) + +(module ulex + * + +(import scheme posix chicken data-structures ports extras files mailbox) +(import srfi-18 pkts matchable regex + typed-records srfi-69 srfi-1 + srfi-4 regex-case + (prefix sqlite3 sqlite3:) + foreign + tcp6 + ;; ulex-netutil + hostinfo + ) + +;; make it a global? Well, it is local to area module + +(define *captain-pktspec* + `((captain (host . h) + (port . p) + (pid . i) + (ipaddr . a) + ) + #;(data (hostname . h) ;; sender hostname + (port . p) ;; sender port + (ipaddr . a) ;; sender ip + (hostkey . k) ;; sending host key - store info at server under this key + (servkey . s) ;; server key - this needs to match at server end or reject the msg + (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json + (data . d) ;; base64 encoded slln data + ))) + +;; struct for keeping track of our world + +(defstruct udat + ;; captain info + (captain-address #f) + (captain-host #f) + (captain-port #f) + (captain-pid #f) + (captain-lease 0) ;; time (unix epoc) seconds when the lease is up + (ulex-dir (conc (get-environment-variable "HOME") "/.ulex")) + (cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts")) + (cpkt-spec *captain-pktspec*) + ;; this processes info + (my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain + (my-address #f) + (my-hostname #f) + (my-port #f) + (my-pid (current-process-id)) + (my-dbs '()) + ;; server and handler thread + (serv-listener #f) ;; this processes server info + (handler-thread #f) + (mboxes (make-hash-table)) ;; key => mbox + ;; other servers + (peers (make-hash-table)) ;; host-port => peer record + (dbowners (make-hash-table)) ;; dbfile => host-port + (handlers (make-hash-table)) ;; dbfile => proc + ;; (outgoing-conns (make-hash-table)) ;; host:port -> conn + (work-queue (make-queue)) ;; most stuff goes here + ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) + (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately + ;; app info + (appname #f) + (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] + ;; cookies + (cnum 0) ;; cookie num + ) + +;;====================================================================== +;; NEW APPROACH +;;====================================================================== + +;; start-server-find-port ;; gotta have a server port ready from the very begining + +;; udata - all the connection info, captain, server, ulex db etc. MUST BE PASSED IN +;; dbpath - full path and filename of the db to talk to or a symbol naming the db? +;; callname - the remote call to execute +;; params - parameters to pass to the remote call +;; +(define (remote-call udata dbpath dbtype callname . params) + (start-server-find-port udata) ;; ensure we have a local server + (find-or-setup-captain udata) + ;; look at connect, process-request, send, send-receive + (let-values (((cookie-key host-port)(get-db-owner udata dbpath dbtype))) + (send-receive udata host-port callname cookie-key params))) + +;;====================================================================== +;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED +;;====================================================================== + +;; connection setup and management functions + +;; This is the basic setup command. Must always be +;; called before connecting to a db using connect. +;; +;; find or become the captain +;; setup and return a ulex object +;; +(define (find-or-setup-captain udata) + ;; see if we already have a captain and if the lease is ok + (if (and (udat-captain-address udata) + (udat-captain-port udata) + (< (current-seconds) (udat-captain-lease udata))) + udata + (let* ((cpkts (get-all-captain-pkts udata)) ;; read captain pkts + (captn (get-winning-pkt cpkts))) + (if captn + (let* ((port (alist-ref 'port captn)) + (host (alist-ref 'host captn)) + (ipaddr (alist-ref 'ipaddr captn)) + (pid (alist-ref 'pid captn)) + (Z (alist-ref 'Z captn))) + (udat-captain-address-set! udata ipaddr) + (udat-captain-host-set! udata host) + (udat-captain-port-set! udata port) + (udat-captain-pid-set! udata pid) + (udat-captain-lease-set! udata (+ (current-seconds) 10)) + (let-values (((success pingtime)(ping udata (conc ipaddr ":" port)))) + (if success + udata + (begin + (print "Found unreachable captain at " ipaddr ":" port ", removing pkt") + (remove-captain-pkt udata captn) + (find-or-setup-captain udata)))) + (begin + (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread + (find-or-setup-captain udata))))))) + +;; connect to a specific dbfile +;; - if already connected - return the dbowner host-port +;; - ask the captain who to talk to for this db +;; - put the entry in the dbowners hash as dbfile => host-port +;; +(define (connect udata dbfname dbtype) + (or (hash-table-ref/default (udat-dbowners udata) dbfname #f) + (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype))) + (if success + (begin + ;; just clobber the record, this is the new data no matter what + (hash-table-set! (udat-dbowners udata) dbfname dbowner-host-port) + dbowner-host-port) + #f)))) + +;; returns: success pingtime +;; +;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns +;; +(define (ping udata host-port) + (let* ((start (current-milliseconds)) + (cookie (make-cookie udata)) + (dbs (udat-my-dbs udata)) + (msg (string-intersperse dbs " ")) + (res (send udata host-port 'ping cookie msg retval: #t)) + (delta (- (current-milliseconds) start))) + (values (equal? res cookie) delta))) + +;; returns: success pingtime +;; +;; NOTE: causes all references to this worker to be wiped out in the +;; callee (ususally the captain) +;; +(define (goodbye-ping udata host-port) + (let* ((start (current-milliseconds)) + (cookie (make-cookie udata)) + (dbs (udat-my-dbs udata)) + (res (send udata host-port 'goodbye cookie "nomsg" retval: #t)) + (delta (- (current-milliseconds) start))) + (values (equal? res cookie) delta))) + +(define (goodbye-captain udata) + (let* ((host-port (udat-captain-host-port udata))) + (if host-port + (goodbye-ping udata host-port) + (values #f -1)))) + +(define (get-db-owner udata dbname dbtype) + (let* ((host-port (udat-captain-host-port udata))) + (if host-port + (let* ((cookie (make-cookie udata)) + (msg #f) ;; (conc dbname " " dbtype)) + (params `(,dbname ,dbtype)) + (res (send udata host-port 'db-owner cookie msg + params: params retval: #t))) + (match (string-split res) + ((retcookie owner-host-port) + (values (equal? retcookie cookie) owner-host-port)))) + (values #f -1)))) + +;; called in ulex-handler to dispatch work, called on the workers side +;; calls (proc params data) +;; returns result with cookie +;; +;; pdat is the info of the caller, used to send the result data +;; prockey is key into udat-handlers hash dereferencing a proc +;; procparam is a first param handed to proc - often to do further derefrencing +;; NOTE: params is intended to be a list of strings, encoding on data +;; is up to the user but data must be a single line +;; +(define (process-request udata pdat dbname cookie prockey procparam data) + (let* ((dbrec (ulex-open-db udata dbname)) ;; this will be a dbconn record, looks for in udata first + (proc (hash-table-ref udata prockey))) + (let* ((result (proc dbrec procparam data))) + result))) + +;; remote-request - send to remote to process in process-request +;; uconn comes from a call to connect and can be used instead of calling connect again +;; uconn is the host-port to call +;; we send dbname to the worker so they know which file to open +;; data must be a string with no newlines, it will be handed to the proc +;; at the remote site unchanged. It is up to the user to encode/decode it's contents +;; +;; rtype: immediate, read-only, normal, low-priority +;; +(define (remote-request udata uconn rtype dbname prockey procparam data) + (let* ((cookie (make-cookie udata))) + (send-receive udata uconn rtype cookie data `(,prockey procparam)))) + +(define (ulex-open-db udata dbname) + #f) + + +;;====================================================================== +;; Ulex db +;; +;; - track who is captain, lease expire time +;; - track who owns what db, lease +;; +;;====================================================================== + +;; +;; +(define (ulex-dbfname) + (let ((dbdir (conc (get-environment-variable "HOME") "/.ulex"))) + (if (not (file-exists? dbdir)) + (create-directory dbdir #t)) + (conc dbdir "/network.db"))) + +;; always goes in ~/.ulex/network.db +;; role is captain, adjutant, node +;; +(define (ulexdb-setup) + (let* ((dbfname (ulex-dbfname)) + (have-db (file-exists? dbfname)) + (db (sqlite3:open-database dbfname))) + (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) + (sqlite3:execute db "PRAGMA synchronous = 0;") + (if (not have-db) + (sqlite3:with-transaction + db + (lambda () + (for-each + (lambda (stmt) + (if stmt (sqlite3:execute db stmt))) + `("CREATE TABLE IF NOT EXISTS nodes + (id INTEGER PRIMARY KEY, + role TEXT NOT NULL, + host TEXT NOT NULL, + port TEXT NOT NULL, + ipadr TEXT NOT NULL, + pid INTEGER NOT NULL, + zcard TEXT NOT NULL, + regtime INTEGER DEFAULT (strftime('%s','now')), + lease_thru INTEGER DEFAULT (strftime('%s','now')), + last_update INTEGER DEFAULT (strftime('%s','now')));" + "CREATE TRIGGER IF NOT EXISTS update_nodes_trigger AFTER UPDATE ON nodes + FOR EACH ROW + BEGIN + UPDATE nodes SET last_update=(strftime('%s','now')) + WHERE id=old.id; + END;" + "CREATE TABLE IF NOT EXISTS dbs + (id INTEGER PRIMARY KEY, + dbname TEXT NOT NULL, + dbfile TEXT NOT NULL, + dbtype TEXT NOT NULL, + host_port TEXT NOT NULL, + regtime INTEGER DEFAULT (strftime('%s','now')), + lease_thru INTEGER DEFAULT (strftime('%s','now')), + last_update INTEGER DEFAULT (strftime('%s','now')));" + "CREATE TRIGGER IF NOT EXISTS update_dbs_trigger AFTER UPDATE ON dbs + FOR EACH ROW + BEGIN + UPDATE dbs SET last_update=(strftime('%s','now')) + WHERE id=old.id; + END;"))))) + db)) + +(define (get-host-port-lease db dbfname) + (sqlite3:fold-row + (lambda (rem host-port lease-thru) + (list host-port lease-thru)) + #f db "SELECT host_port,lease_thru FROM dbs WHERE dbfile = ?" dbfname)) + +(define (register-captain db host ipadr port pid zcard #!key (lease 20)) + (let* ((dbfname (ulex-dbfname)) + (host-port (conc host ":" port))) + (sqlite3:with-transaction + db + (lambda () + (match (get-host-port-lease db dbfname) + ((host-port lease-thru) + (if (> (current-seconds) lease-thru) + (begin + (sqlite3:execute db "UPDATE dbs SET host_port=?,lease_thru=? WHERE dbname=?" + (conc host ":" port) + (+ (current-seconds) lease) + dbfname) + #t) + #f)) + (#f (sqlite3:execute db "INSERT INTO dbs (dbname,dbfile,dbtype,host_port,lease_thru) VALUES (?,?,?,?,?)" + "captain" dbfname "captain" host-port (+ (current-seconds) lease))) + (else (print "ERROR: Unrecognised result from fold-row") + (exit 1))))))) + +;;====================================================================== +;; network utilities +;;====================================================================== + +(define (rate-ip ipaddr) + (regex-case ipaddr + ( "^127\\..*" _ 0 ) + ( "^(10\\.0|192\\.168)\\..*" _ 1 ) + ( else 2 ) )) + +;; Change this to bias for addresses with a reasonable broadcast value? +;; +(define (ip-pref-less? a b) + (> (rate-ip a) (rate-ip b))) + + +(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips)) + ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) + ) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (car all-my-addresses)) ;; only one to choose from, just go with it + + (else + (car (sort all-my-addresses ip-pref-less?))) + ;; (else + ;; (ip->string (car (filter (lambda (x) ;; take any but 127. + ;; (not (eq? (u8vector-ref x 0) 127))) + ;; all-my-addresses)))) + + ))) + +(define (get-all-ips-sorted) + (sort (get-all-ips) ip-pref-less?)) + +(define (get-all-ips) + (map ip->string (vector->list + (hostinfo-addresses + (host-information (current-hostname)))))) + +(define (udat-my-host-port udata) + (if (and (udat-my-address udata)(udat-my-port udata)) + (conc (udat-my-address udata) ":" (udat-my-port udata)) + #f)) + +(define (udat-captain-host-port udata) + (if (and (udat-captain-address udata)(udat-captain-port udata)) + (conc (udat-captain-address udata) ":" (udat-captain-port udata)) + #f)) + +(define (udat-get-peer udata host-port) + (hash-table-ref/default (udat-peers udata) host-port #f)) + +;; struct for keeping track of others we are talking to + +(defstruct peer + (addr-port #f) + (hostname #f) + (pid #f) + ;; (inp #f) + ;; (oup #f) + (dbs '()) ;; list of databases this peer is currently handling + ) + +(defstruct work + (peer-dat #f) + (handlerkey #f) + (qrykey #f) + (data #f) + (start (current-milliseconds))) + +#;(defstruct dbowner + (pdat #f) + (last-update (current-seconds))) + +;;====================================================================== +;; Captain functions +;;====================================================================== + +;; NB// This needs to be started in a thread +;; +;; setup to be a captain +;; - local server MUST be started already +;; - create pkt +;; - start server port handler +;; +(define (setup-as-captain udata) + (if (create-captain-pkt udata) + (let* ((my-addr (udat-my-address udata)) + (my-port (udat-my-port udata)) + (th (make-thread (lambda () + (ulex-handler-loop udata)) "Captain handler"))) + (udat-handler-thread-set! udata th) + (udat-captain-address-set! udata my-addr) + (udat-captain-port-set! udata my-port) + (thread-start! th)) + (begin + (print "ERROR: failed to create captain pkt") + #f))) + +;; given a pkts dir read +;; +(define (get-all-captain-pkts udata) + (let* ((pktsdir (let ((d (udat-cpkts-dir udata))) + (if (file-exists? d) + d + (begin + (create-directory d #t) + d)))) + (all-pkt-files (glob (conc pktsdir "/*.pkt"))) + (pkt-spec (udat-cpkt-spec udata))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: pkt-spec)) + all-pkt-files))) + +;; sort by D then Z, return one, choose the oldest then +;; differentiate if needed using the Z key +;;l +(define (get-winning-pkt pkts) + (if (null? pkts) + #f + (car (sort pkts (lambda (a b) + (let ((ad (string->number (alist-ref 'D a))) + (bd (string->number (alist-ref 'D b)))) + (if (eq? a b) + (let ((az (alist-ref 'Z a)) + (bz (alist-ref 'Z b))) + (string>=? az bz)) + (> ad bd)))))))) + +;; put the host, ip, port and pid into a pkt in +;; the captain pkts dir +;; - assumes user has already fired up a server +;; which will be in the udata struct +;; +(define (create-captain-pkt udata) + (if (not (udat-serv-listener udata)) + (begin + (print "ERROR: create-captain-pkt called with out a listener") + #f) + (let* ((pktdat `((port . ,(udat-my-port udata)) + (host . ,(udat-my-hostname udata)) + (ipaddr . ,(udat-my-address udata)) + (pid . ,(udat-my-pid udata)))) + (pktdir (udat-cpkts-dir udata)) + (pktspec (udat-cpkt-spec udata)) + ) + (udat-my-cpkt-key-set! + udata + (write-alist->pkt + pktdir + pktdat + pktspec: pktspec + ptype: 'captain)) + (udat-my-cpkt-key udata)))) + +;; remove pkt associated with captn (the Z key .pkt) +;; +(define (remove-captain-pkt udata captn) + (let ((Z (alist-ref 'Z captn)) + (cpktdir (udat-cpkts-dir udata))) + (delete-file* (conc cpktdir "/" Z ".pkt")))) + +;; call all known peers and tell them to delete their info on the captain +;; thus forcing them to re-read pkts and connect to a new captain +;; call this when the captain needs to exit and if an older captain is +;; detected. Due to delays in sending file meta data in NFS multiple +;; captains can be initiated in a "Storm of Captains", book soon to be +;; on Amazon +;; +(define (drop-captain udata) + (let* ((peers (hash-table-keys (udat-peers udata))) + (cookie (make-cookie udata))) + (for-each + (lambda (host-port) + (send udata host-port 'dropcaptain cookie "nomsg" retval: #t)) + peers))) + +;;====================================================================== +;; server primitives +;;====================================================================== + +(define (make-cookie udata) + (let ((newcnum (+ (udat-cnum udata) 1))) + (udat-cnum-set! udata newcnum) + (conc (udat-my-address udata) ":" + (udat-my-port udata) "-" + (udat-my-pid udata) "-" + newcnum))) + +;; create a tcp listener and return a populated udat struct with +;; my port, address, hostname, pid etc. +;; return #f if fail to find a port to allocate. +;; +;; if udata-in is #f create the record +;; if there is already a serv-listener return the udata +;; +(define (start-server-find-port udata-in #!optional (port 4242)) + (let ((udata (or udata-in (make-udat)))) + (if (udat-serv-listener udata) ;; TODO - add check that the listener is alive and ready? + udata + (handle-exceptions + exn + (if (< port 65535) + (start-server-find-port udata (+ port 1)) + #f) + (connect-server udata port))))) + +(define (connect-server udata port) + ;; (tcp-listener-socket LISTENER)(socket-name so) + ;; sockaddr-address, sockaddr-port, sockaddr->string + (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) + (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (udat-my-address-set! udata addr) + (udat-my-port-set! udata port) + (udat-my-hostname-set! udata (get-host-name)) + (udat-serv-listener-set! udata tlsn) + udata)) + +(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) + (let* ((pdat (or (udat-get-peer udata host-port) + (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC + exn + #f + (let ((npdat (make-peer addr-port: host-port))) + (if hostname (peer-hostname-set! npdat hostname)) + (if pid (peer-pid-set! npdat pid)) + npdat))))) + pdat)) + +;; send structured data to recipient +;; +;; NOTE: qrykey is what was called the "cookie" previously +;; +;; retval tells send to expect and wait for return data (one line) and return it or time out +;; this is for ping where we don't want to necessarily have set up our own server yet. +;; +(define (send udata host-port handler qrykey data + #!key (hostname #f)(pid #f)(params '())(retval #f)) + (let* ((my-host-port (udat-my-host-port udata)) + (isme (equal? host-port my-host-port)) ;; am I calling + ;; myself? + (dat (list + handler ;; " " + my-host-port ;; " " + (udat-my-pid udata) ;; " " + qrykey + params ;;(if (null? params) "" (conc " " + ;;(string-intersperse params " "))) + ))) + ;; (print "send isme is " (if isme "true!" "false!") ", + ;; my-host-port: " my-host-port ", host-port: " host-port) + (if isme + (ulex-handler udata dat data) + (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE + ;; SPECIFIC + exn + #f + (let-values (((inp oup)(tcp-connect host-port))) + ;; + ;; CONTROL LINE: + ;; handlerkey host:port pid qrykey params ... + ;; + (let ((res + (if (and inp oup) + (let* () + (if my-host-port + (begin + (write dat oup) + (write data oup) ;; send as sexpr + ;; (print "Sent dat: " dat " data: " data) + (if retval + (read inp) + #t)) + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)) + ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! + ;; (there is a listener for handling that) + ) + #f))) ;; #f means failed to connect and send + (close-input-port inp) + (close-output-port oup) + res)))))) + +;; send a request to the given host-port and register a mailbox in udata +;; wait for the mailbox data and return it +;; +(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20)) + (let ((mbox (make-mailbox)) + (mbox-time (current-milliseconds)) + (mboxes (udat-mboxes udata))) + (hash-table-set! mboxes qrykey mbox) + (if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params) + (let* ((mbox-timeout-secs timeout) + (mbox-timeout-result 'MBOX_TIMEOUT) + (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) + (mbox-receive-time (current-milliseconds))) + (hash-table-delete! mboxes qrykey) + (if (eq? res 'MBOX_TIMEOUT) + #f + res)) + #f))) ;; #f means failed to communicate + +;; +(define (ulex-handler udata controldat data) + (print "controldat: " controldat " data: " data) + (match controldat ;; (string-split controldat) + ((handlerkey host-port pid qrykey params ...) + ;; (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) + (case handlerkey ;; (string->symbol handlerkey) + ((ack)(print "Got ack!")) + ((ping) ;; special case - return result immediately on the same connection + (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) + (val (if proc (proc) "gotping")) + (peer (make-peer addr-port: host-port pid: pid)) + (dbshash (udat-dbowners udata))) + (peer-dbs-set! peer params) ;; params for ping is list of dbs owned by pinger + (for-each (lambda (dbfile) + (hash-table-set! dbshash dbfile host-port)) ;; WRONG? + params) ;; register each db in the dbshash + (if (not (hash-table-exists? (udat-peers udata) host-port)) + (hash-table-set! (udat-peers udata) host-port peer)) ;; save the details of this caller in peers + qrykey)) ;; End of ping + ((goodbye) + ;; remove all traces of the caller in db ownership etc. + (let* ((peer (hash-table-ref/default (udat-peers udata) host-port #f)) + (dbs (if peer (peer-dbs peer) '())) + (dbshash (udat-dbowners udata))) + (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) + (hash-table-delete! (udat-peers udata) host-port) + qrykey)) + ((dropcaptain) + ;; remove all traces of the captain + (udat-captain-address-set! udata #f) + (udat-captain-host-set! udata #f) + (udat-captain-port-set! udata #f) + (udat-captain-pid-set! udata #f) + qrykey) + ((rucaptain) ;; remote is asking if I'm the captain + (if (udat-my-cpkt-key udata) "yes" "no")) + ((db-owner) ;; given a db name who do I send my queries to + ;; look up the file in handlers, if have an entry ping them to be sure + ;; they are still alive and then return that host:port. + ;; if no handler found or if the ping fails pick from peers the oldest that + ;; is managing the fewest dbs + (match params + ((dbfile dbtype) + (let* ((owner-host-port (hash-table-ref/default (udat-dbowners udata) dbfile #f))) + (if owner-host-port + (conc qrykey " " owner-host-port) + (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it! + (make-peer addr-port: host-port pid: pid dbs: `(,dbfile))))) + (hash-table-set! (udat-peers udata) host-port pdat) + (hash-table-set! (udat-dbowners udata) dbfile host-port) + (conc qrykey " " host-port))))) + (else (conc qrykey " BADDATA")))) + ;; for work items: + ;; handler is one of; immediate, read-only, read-write, high-priority + ((immediate read-only normal low-priority) ;; do this work immediately + ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line + ;; data => a single line encoded however you want, or should I build json into it? + (print "handlerkey=" handlerkey) + (let* ((pdat (get-peer-dat udata host-port))) + (match params ;; dbfile prockey procparam + ((dbfile prockey procparam) + (case handlerkey + ((immediate read-only) + (process-request udata pdat dbfile qrykey prockey procparam data)) + ((normal low-priority) ;; split off later and add logic to support low priority + (add-to-work-queue udata pdat dbfile qrykey prockey procparam data)) + (else + #f))) + (else + (print "INFO: params=" params " handlerkey=" handlerkey " controldat=" controldat) + #f)))) + (else + ;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) + #f))) + (else + (print "BAD DATA? controldat=" controldat " data=" data) + #f)));; handles the incoming messages and dispatches to queues + +;; +(define (ulex-handler-loop udata) + (let* ((serv-listener (udat-serv-listener udata))) + ;; data comes as two lines + ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] + ;; data + (let loop ((state 'start)) + (let-values (((inp oup)(tcp-accept serv-listener))) + (let* ((controldat (read inp)) + (data (read inp)) + (resp (ulex-handler udata controldat data))) + (if resp (write resp oup)) + (close-input-port inp) + (close-output-port oup)) + (loop state))))) + +;; add a proc to the handler list, these are done symetrically (i.e. in all instances) +;; so that the proc can be dereferenced remotely +;; +(define (register-handler udata key proc) + (hash-table-set! (udat-handlers udata) key proc)) + + +;;====================================================================== +;; work queues +;;====================================================================== + +(define (add-to-work-queue udata peer-dat handlerkey qrykey data) + (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) + (if (udat-busy udata) + (queue-add! (udat-work-queue udata) wdat) + (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat + )) + +(define (do-work udata wdat) + #f) + +(define (process-work udata #!optional wdat) + (if wdat (do-work udata wdat)) ;; process wdat + (let ((wqueue (udat-work-queue udata))) + (if (not (queue-empty? wqueue)) + (let loop ((wd (queue-remove! wqueue))) + (do-work udata wd) + (if (not (queue-empty? wqueue)) + (loop (queue-remove! wqueue))))))) + +;;====================================================================== +;; Generic db handling +;; setup a inmem db instance +;; open connection to on-disk db +;; sync on-disk db to inmem +;; get lock in on-disk db for dbowner of this db +;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct +;; return the stuct +;;====================================================================== + +(defstruct dbconn + (fname #f) + (inmem #f) + (conn #f) + (sync #f) ;; sync proc + (init #f) ;; init proc + (lastsync (current-seconds)) + ) + +(defstruct dbinfo + (initproc #f) + (syncproc #f)) + +;; open inmem and disk database +;; init with initproc +;; return db struct +;; +;; appname; megatest, ulex or something else. +;; +(define (setup-db-connection udata fname-in appname dbtype) + (let* ((is-ulex (eq? appname 'ulex)) + (dbinf (if is-ulex ;; ulex is a built-in special case + (make-dbinfo initproc: ulexdb-init syncproc: ulexdb-sync) + (hash-table-ref/default (udat-dbtypes udata) dbtype #f))) + (initproc (dbinfo-initproc dbinf)) + (syncproc (dbinfo-syncproc dbinf)) + (fname (if is-ulex + (conc (udat-ulex-dir udata) "/ulex.db") + fname-in)) + (inmem-db (open-and-initdb udata #f 'inmem (dbinfo-initproc dbinf))) + (disk-db (open-and-initdb udata fname 'disk (dbinfo-initproc dbinf)))) + (make-dbconn inmem: inmem-db conn: disk-db sync: syncproc init: initproc))) + +;; dest='inmem or 'disk +;; +(define (open-and-initdb udata filename dest init-proc) + (let* ((inmem (eq? dest 'inmem)) + (dbfile (if inmem + ":INMEM:" + filename)) + (dbexists (if inmem #t (file-exists? dbfile))) + (db (sqlite3:open-database dbfile))) + (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) + (if (not dbexists) + (init-proc db)) + db)) + + +;;====================================================================== +;; Previous Ulex db stuff +;;====================================================================== + +(define (ulexdb-init db inmem) + (sqlite3:with-transaction + db + (lambda () + (for-each + (lambda (stmt) + (if stmt (sqlite3:execute db stmt))) + `("CREATE TABLE IF NOT EXISTS processes + (id INTEGER PRIMARY KEY, + host TEXT NOT NULL, + ipadr TEXT NOT NULL, + port INTEGER NOT NULL, + pid INTEGER NOT NULL, + regtime INTEGER DEFAULT (strftime('%s','now')), + last_update INTEGER DEFAULT (strftime('%s','now')));" + (if inmem + "CREATE TRIGGER IF NOT EXISTS update_proces_trigger AFTER UPDATE ON processes + FOR EACH ROW + BEGIN + UPDATE processes SET last_update=(strftime('%s','now')) + WHERE id=old.id; + END;" + #f)))))) + +;; open databases, do initial sync +(define (ulexdb-sync dbconndat udata) + #f) + + +) ;; END OF ULEX + + +;;; ;;====================================================================== +;;; ;; D E B U G H E L P E R S +;;; ;;====================================================================== +;;; +;;; (define (dbg> . args) +;;; (with-output-to-port (current-error-port) +;;; (lambda () +;;; (apply print "dbg> " args)))) +;;; +;;; (define (debug-pp . args) +;;; (if (get-environment-variable "ULEX_DEBUG") +;;; (with-output-to-port (current-error-port) +;;; (lambda () +;;; (apply pp args))))) +;;; +;;; (define *default-debug-port* (current-error-port)) +;;; +;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message) +;;; (if (get-environment-variable "ULEX_DEBUG") +;;; (with-output-to-port *default-debug-port* +;;; (lambda () +;;; (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. " +;;; (if start-time +;;; (conc "total time " (- (current-milliseconds) start-time) +;;; " ms.") +;;; "") +;;; message +;;; ))))) + +;;====================================================================== +;; M A C R O S +;;====================================================================== +;; iup callbacks are not dumping the stack, this is a work-around +;; + +;; Some of these routines use: +;; +;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html +;; +;; Syntax for defining macros in a simple style similar to function definiton, +;; when there is a single pattern for the argument list and there are no keywords. +;; +;; (define-simple-syntax (name arg ...) body ...) +;; +;; +;; (define-syntax define-simple-syntax +;; (syntax-rules () +;; ((_ (name arg ...) body ...) +;; (define-syntax name (syntax-rules () ((name arg ...) (begin body ...))))))) +;; +;; (define-simple-syntax (catch-and-dump proc procname) +;; (handle-exceptions +;; exn +;; (begin +;; (print-call-chain (current-error-port)) +;; (with-output-to-port (current-error-port) +;; (lambda () +;; (print ((condition-property-accessor 'exn 'message) exn)) +;; (print "Callback error in " procname) +;; (print "Full condition info:\n" (condition->list exn))))) +;; (proc))) +;; +;; +;;====================================================================== +;; R E C O R D S +;;====================================================================== + +;;; ;; information about me as a server +;;; ;; +;;; (defstruct area +;;; ;; about this area +;;; (useportlogger #f) +;;; (lowport 32768) +;;; (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all) +;;; (conn #f) +;;; (port #f) +;;; (myaddr (get-my-best-address)) +;;; pktid ;; get pkt from hosts table if needed +;;; pktfile +;;; pktsdir +;;; dbdir +;;; (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one? +;;; (mutex (make-mutex)) +;;; (rtable (make-hash-table)) ;; registration table of available actions +;;; (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve +;;; ;; about other servers +;;; (hosts (make-hash-table)) ;; key => hostdat +;;; (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime ) +;;; (reqs (make-hash-table)) ;; uri => queue +;;; ;; work queues +;;; (wqueues (make-hash-table)) ;; fname => qdat +;;; (stats (make-hash-table)) ;; fname => totalqueries +;;; (last-srvup (current-seconds)) ;; last time we updated the known servers +;;; (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call +;;; (ready #f) +;;; (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping +;;; ) +;;; +;;; ;; host stats +;;; ;; +;;; (defstruct hostdat +;;; (pkt #f) +;;; (dbload (make-hash-table)) ;; "dbfile.db" => queries/min +;;; (hostload #f) ;; normalized load ( 5min load / numcpus ) +;;; ) +;;; +;;; ;; dbdat +;;; ;; +;;; (defstruct dbdat +;;; (dbh #f) +;;; (fname #f) +;;; (write-access #f) +;;; (sths (make-hash-table)) ;; hash mapping query strings to handles +;;; ) +;;; +;;; ;; qdat +;;; ;; +;;; (defstruct qdat +;;; (writeq (make-queue)) +;;; (readq (make-queue)) +;;; (rwq (make-queue)) +;;; (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging +;;; (osshort (make-queue)) +;;; (oslong (make-queue)) +;;; (misc (make-queue)) ;; used for things like ping-full +;;; ) +;;; +;;; ;; calldat +;;; ;; +;;; (defstruct calldat +;;; (ctype 'dbwrite) +;;; (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc. +;;; (rtime (current-milliseconds))) +;;; +;;; ;; make it a global? Well, it is local to area module +;;; +;;; (define *pktspec* +;;; `((server (hostname . h) +;;; (port . p) +;;; (pid . i) +;;; (ipaddr . a) +;;; ) +;;; (data (hostname . h) ;; sender hostname +;;; (port . p) ;; sender port +;;; (ipaddr . a) ;; sender ip +;;; (hostkey . k) ;; sending host key - store info at server under this key +;;; (servkey . s) ;; server key - this needs to match at server end or reject the msg +;;; (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json +;;; (data . d) ;; base64 encoded slln data +;;; ))) +;;; +;;; ;; work item +;;; ;; +;;; (defstruct witem +;;; (rhost #f) ;; return host +;;; (ripaddr #f) ;; return ipaddr +;;; (rport #f) ;; return port +;;; (servkey #f) ;; the packet representing the client of this workitem, used by final send-message +;;; (rdat #f) ;; the request - usually an sql query, type is rdat +;;; (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort +;;; (cookie #f) ;; cookie id for response +;;; (data #f) ;; the data payload, i.e. parameters +;;; (result #f) ;; the result from processing the data +;;; (caller #f)) ;; the calling peer according to rpc itself +;;; +;;; (define (trim-pktid pktid) +;;; (if (string? pktid) +;;; (substring pktid 0 4) +;;; "nopkt")) +;;; +;;; (define (any->number num) +;;; (cond +;;; ((number? num) num) +;;; ((string? num) (string->number num)) +;;; (else num))) +;;; +;;; (use trace) +;;; (trace-call-sites #t) +;;; +;;; ;;====================================================================== +;;; ;; D A T A B A S E H A N D L I N G +;;; ;;====================================================================== +;;; +;;; ;; look in dbhandles for a db, return it, else return #f +;;; ;; +;;; (define (get-dbh acfg fname) +;;; (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '()))) +;;; (if (null? dbh-lst) +;;; (begin +;;; ;; (print "opening db for " fname) +;;; (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls +;;; (let ((rem-lst (cdr dbh-lst))) +;;; ;; (print "re-using saved connection for " fname) +;;; (hash-table-set! (area-dbhandles acfg) fname rem-lst) +;;; (car dbh-lst))))) +;;; +;;; (define (save-dbh acfg fname dbdat) +;;; ;; (print "saving dbh for " fname) +;;; (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '())))) +;;; +;;; ;; open the database, if never before opened init it. put the handle in the +;;; ;; open db's hash table +;;; ;; returns: the dbdat +;;; ;; +;;; (define (open-db acfg fname) +;;; (let* ((fullname (conc (area-dbdir acfg) "/" fname)) +;;; (exists (file-exists? fullname)) +;;; (write-access (if exists +;;; (file-write-access? fullname) +;;; (file-write-access? (area-dbdir acfg)))) +;;; (db (sqlite3:open-database fullname)) +;;; (handler (sqlite3:make-busy-timeout 136000)) +;;; ) +;;; (sqlite3:set-busy-handler! db handler) +;;; (sqlite3:execute db "PRAGMA synchronous = 0;") +;;; (if (not exists) ;; need to init the db +;;; (if write-access +;;; (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements +;;; ;; (sqlite3:with-transaction +;;; ;; db +;;; ;; (lambda () +;;; (if isql +;;; (for-each +;;; (lambda (sql) +;;; (sqlite3:execute db sql)) +;;; isql))) +;;; (print "ERROR: no write access to " (area-dbdir acfg)))) +;;; (make-dbdat dbh: db fname: fname write-access: write-access))) +;;; +;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statment +;;; ;; you must extract the db handle +;;; ;; +;;; (define (get-sth db cache stmt) +;;; (if (hash-table-exists? cache stmt) +;;; (begin +;;; ;; (print "Reusing cached stmt for " stmt) +;;; (hash-table-ref/default cache stmt #f)) +;;; (let ((sth (sqlite3:prepare db stmt))) +;;; (hash-table-set! cache stmt sth) +;;; ;; (print "prepared stmt for " stmt) +;;; sth))) +;;; +;;; ;; a little more expensive but does all the tedious deferencing - only use if you don't already +;;; ;; have dbdat and db sitting around +;;; ;; +;;; (define (full-get-sth acfg fname stmt) +;;; (let* ((dbdat (get-dbh acfg fname)) +;;; (db (dbdat-dbh dbdat)) +;;; (sths (dbdat-sths dbdat))) +;;; (get-sth db sths stmt))) +;;; +;;; ;; write to a db +;;; ;; acfg: area data +;;; ;; rdat: request data +;;; ;; hdat: (host . port) +;;; ;; +;;; ;; (define (dbwrite acfg rdat hdat data-in) +;;; ;; (let* ((dbname (car data-in)) +;;; ;; (dbdat (get-dbh acfg dbname)) +;;; ;; (db (dbdat-dbh dbdat)) +;;; ;; (sths (dbdat-sths dbdat)) +;;; ;; (stmt (calldat-obj rdat)) +;;; ;; (sth (get-sth db sths stmt)) +;;; ;; (data (cdr data-in))) +;;; ;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data) +;;; ;; (print "dbdat: " (dbdat->alist dbdat)) +;;; ;; (apply sqlite3:execute sth data) +;;; ;; (save-dbh acfg dbname dbdat) +;;; ;; #t +;;; ;; )) +;;; +;;; (define (finalize-all-db-handles acfg) +;;; (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat +;;; (num 0)) +;;; (for-each +;;; (lambda (area-name) +;;; (print "Closing handles for " area-name) +;;; (let ((dbdats (hash-table-ref/default dbhandles area-name '()))) +;;; (for-each +;;; (lambda (dbdat) +;;; ;; first close all statement handles +;;; (for-each +;;; (lambda (sth) +;;; (sqlite3:finalize! sth) +;;; (set! num (+ num 1))) +;;; (hash-table-values (dbdat-sths dbdat))) +;;; ;; now close the dbh +;;; (set! num (+ num 1)) +;;; (sqlite3:finalize! (dbdat-dbh dbdat))) +;;; dbdats))) +;;; (hash-table-keys dbhandles)) +;;; (print "FINALIZED " num " dbhandles"))) +;;; +;;; ;;====================================================================== +;;; ;; W O R K Q U E U E H A N D L I N G +;;; ;;====================================================================== +;;; +;;; (define (register-db-as-mine acfg dbname) +;;; (let ((ht (area-dbs acfg))) +;;; (if (not (hash-table-ref/default ht dbname #f)) +;;; (hash-table-set! ht dbname (random 10000))))) +;;; +;;; (define (work-queue-add acfg fname witem) +;;; (let* ((work-queue-start (current-milliseconds)) +;;; (action (witem-action witem)) ;; NB the action is the index into the rdat actions +;;; (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f) +;;; (let ((newqdat (make-qdat))) +;;; (hash-table-set! (area-wqueues acfg) fname newqdat) +;;; newqdat))) +;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f))) +;;; (if rdat +;;; (queue-add! +;;; (case (calldat-ctype rdat) +;;; ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat)) +;;; ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat)) +;;; ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat)) +;;; ((oslong) (qdat-oslong qdat)) +;;; ((osshort) (qdat-osshort qdat)) +;;; ((full-ping) (qdat-misc qdat)) +;;; (else +;;; (print "ERROR: no queue for " action ". Adding to dbwrite queue.") +;;; (qdat-writeq qdat))) +;;; witem) +;;; (case action +;;; ((full-ping)(qdat-misc qdat)) +;;; (else +;;; (print "ERROR: No action " action " was registered")))) +;;; (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f) +;;; #t)) ;; for now, simply return #t to indicate request got to the queue +;;; +;;; (define (doqueue acfg q fname dbdat dbh) +;;; ;; (print "doqueue: " fname) +;;; (let* ((start-time (current-milliseconds)) +;;; (qlen (queue-length q))) +;;; (if (> qlen 1) +;;; (print "Processing queue of length " qlen)) +;;; (let loop ((count 0) +;;; (responses '())) +;;; (let ((delta (- (current-milliseconds) start-time))) +;;; (if (or (queue-empty? q) +;;; (> delta 400)) ;; stop working on this queue after 400ms have passed +;;; (list count delta responses) ;; return count, delta and responses list +;;; (let* ((witem (queue-remove! q)) +;;; (action (witem-action witem)) +;;; (rdat (witem-rdat witem)) +;;; (stmt (calldat-obj rdat)) +;;; (sth (full-get-sth acfg fname stmt)) +;;; (ctype (calldat-ctype rdat)) +;;; (data (witem-data witem)) +;;; (cookie (witem-cookie witem))) +;;; ;; do the processing and save the result in witem-result +;;; (witem-result-set! +;;; witem +;;; (case ctype ;; action +;;; ((noblockwrite) ;; blind write, no ack of success returned +;;; (apply sqlite3:execute sth data) +;;; (sqlite3:last-insert-rowid dbh)) +;;; ((dbwrite) ;; blocking write +;;; (apply sqlite3:execute sth data) +;;; #t) +;;; ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query +;;; (apply sqlite3:map-row (lambda x x) sth data)) +;;; ((full-ping) 'full-ping) +;;; (else (print "Not ready for action " action) #f))) +;;; (loop (add1 count) +;;; (if cookie +;;; (cons witem responses) +;;; responses)))))))) +;;; +;;; ;; do up to 400ms of processing on each queue +;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded +;;; ;; +;;; (define (process-db-queries acfg fname) +;;; (if (hash-table-exists? (area-wqueues acfg) fname) +;;; (let* ((process-db-queries-start-time (current-milliseconds)) +;;; (qdat (hash-table-ref/default (area-wqueues acfg) fname #f)) +;;; (queue-sym->queue (lambda (queue-sym) +;;; (case queue-sym ;; lookup the queue from qdat given a name (symbol) +;;; ((wqueue) (qdat-writeq qdat)) +;;; ((rqueue) (qdat-readq qdat)) +;;; ((rwqueue) (qdat-rwq qdat)) +;;; ((misc) (qdat-misc qdat)) +;;; (else #f)))) +;;; (dbdat (get-dbh acfg fname)) +;;; (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f)) +;;; (nowtime (current-seconds))) +;;; ;; handle the queues that require a transaction +;;; ;; +;;; (map ;; +;;; (lambda (queue-sym) +;;; ;; (print "processing queue " queue-sym) +;;; (let* ((queue (queue-sym->queue queue-sym))) +;;; (if (not (queue-empty? queue)) +;;; (let ((responses +;;; (sqlite3:with-transaction ;; todo - catch exceptions... +;;; dbh +;;; (lambda () +;;; (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work! +;;; ;; (print "res=" res) +;;; (match res +;;; ((count delta responses) +;;; (update-stats acfg fname queue-sym delta count) +;;; (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f) +;;; responses) ;; return responses +;;; (else +;;; (print "ERROR: bad return data from doqueue " res))) +;;; ))))) +;;; ;; having completed the transaction, send the responses. +;;; ;; (print "INFO: sending " (length responses) " responses.") +;;; (let loop ((responses-left responses)) +;;; (cond +;;; ((null? responses-left) #t) +;;; (else +;;; (let* ((witem (car responses-left)) +;;; (response (cdr responses-left))) +;;; (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem) +;;; (witem-cookie witem)(witem-result witem))) +;;; (loop (cdr responses-left)))))) +;;; ))) +;;; '(wqueue rwqueue rqueue)) +;;; +;;; ;; handle misc queue +;;; ;; +;;; ;; (print "processing misc queue") +;;; (let ((queue (queue-sym->queue 'misc))) +;;; (doqueue acfg queue fname dbdat dbh)) +;;; ;; .... +;;; (save-dbh acfg fname dbdat) +;;; #t ;; just to let the tests know we got here +;;; ) +;;; #f ;; nothing processed +;;; )) +;;; +;;; ;; run all queues in parallel per db but sequentially per queue for that db. +;;; ;; - process the queues every 500 or so ms +;;; ;; - allow for long running queries to continue but all other activities for that +;;; ;; db will be blocked. +;;; ;; +;;; (define (work-queue-processor acfg) +;;; (let* ((threads (make-hash-table))) ;; fname => thread +;;; (let loop ((fnames (hash-table-keys (area-wqueues acfg))) +;;; (target-time (+ (current-milliseconds) 50))) +;;; ;;(if (not (null? fnames))(print "Processing for these databases: " fnames)) +;;; (for-each +;;; (lambda (fname) +;;; ;; (print "processing for " fname) +;;; ;;(process-db-queries acfg fname)) +;;; (let ((th (hash-table-ref/default threads fname #f))) +;;; (if (and th (not (member (thread-state th) '(dead terminated)))) +;;; (begin +;;; (print "WARNING: worker thread for " fname " is taking a long time.") +;;; (print "Thread is in state " (thread-state th))) +;;; (let ((th1 (make-thread (lambda () +;;; ;; (catch-and-dump +;;; ;; (lambda () +;;; ;; (print "Process queries for " fname) +;;; (let ((start-time (current-milliseconds))) +;;; (process-db-queries acfg fname) +;;; ;; (thread-sleep! 0.01) ;; need the thread to take at least some time +;;; (hash-table-delete! threads fname)) ;; no mutexes? +;;; fname) +;;; "th1"))) ;; )) +;;; (hash-table-set! threads fname th1) +;;; (thread-start! th1))))) +;;; fnames) +;;; ;; (thread-sleep! 0.1) ;; give the threads some time to process requests +;;; ;; burn time until 400ms is up +;;; (let ((now-time (current-milliseconds))) +;;; (if (< now-time target-time) +;;; (let ((delta (- target-time now-time))) +;;; (thread-sleep! (/ delta 1000))))) +;;; (loop (hash-table-keys (area-wqueues acfg)) +;;; (+ (current-milliseconds) 50))))) +;;; +;;; ;;====================================================================== +;;; ;; S T A T S G A T H E R I N G +;;; ;;====================================================================== +;;; +;;; (defstruct stat +;;; (qcount-avg 0) ;; coarse running average +;;; (qtime-avg 0) ;; coarse running average +;;; (qcount 0) ;; total +;;; (qtime 0) ;; total +;;; (last-qcount 0) ;; last +;;; (last-qtime 0) ;; last +;;; (dbs '()) ;; list of db files handled by this node +;;; (when 0)) ;; when the last query happened - seconds +;;; +;;; +;;; (define (update-stats acfg fname bucket duration numqueries) +;;; (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough +;;; (stats (or (hash-table-ref/default (area-stats acfg) key #f) +;;; (let ((newstats (make-stat))) +;;; (hash-table-set! (area-stats acfg) key newstats) +;;; newstats)))) +;;; ;; when the last query happended (used to remove the fname from the active list) +;;; (stat-when-set! stats (current-seconds)) +;;; ;; last values +;;; (stat-last-qcount-set! stats numqueries) +;;; (stat-last-qtime-set! stats duration) +;;; ;; total over process lifetime +;;; (stat-qcount-set! stats (+ (stat-qcount stats) numqueries)) +;;; (stat-qtime-set! stats (+ (stat-qtime stats) duration)) +;;; ;; coarse average +;;; (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2)) +;;; (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2)) +;;; +;;; ;; here is where we add the stats for a given dbfile +;;; (if (not (member fname (stat-dbs stats))) +;;; (stat-dbs-set! stats (cons fname (stat-dbs stats)))) +;;; +;;; )) +;;; +;;; ;;====================================================================== +;;; ;; S E R V E R S T U F F +;;; ;;====================================================================== +;;; +;;; ;; this does NOT return! +;;; ;; +;;; (define (find-free-port-and-open acfg) +;;; (let ((port (or (area-port acfg) 3200))) +;;; (handle-exceptions +;;; exn +;;; (begin +;;; (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port") +;;; (area-port-set! acfg (+ port 1)) +;;; (find-free-port-and-open acfg)) +;;; (rpc:default-server-port port) +;;; (area-port-set! acfg port) +;;; (tcp-read-timeout 120000) +;;; ;; ((rpc:make-server (tcp-listen port)) #t) +;;; (tcp-listen (rpc:default-server-port) +;;; )))) +;;; +;;; ;; register this node by putting a packet into the pkts dir. +;;; ;; look for other servers +;;; ;; contact other servers and compile list of servers +;;; ;; there are two types of server +;;; ;; main servers - dashboards, runners and dedicated servers - need pkt +;;; ;; passive servers - test executers, step calls, list-runs - no pkt +;;; ;; +;;; (define (register-node acfg hostip port-num) +;;; ;;(mutex-lock! (area-mutex acfg)) +;;; (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created) +;;; (best-ip (or hostip (get-my-best-address))) +;;; (mtdir (area-dbdir acfg)) +;;; (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts"))) +;;; (print "Registering node " best-ip ":" port-num) +;;; (if (not mtdir) ;; require a home for this node to put or find databases +;;; #f +;;; (begin +;;; (if (not (directory? pktdir))(create-directory pktdir)) +;;; ;; server is started, now create pkt if needed +;;; (print "Starting server in " server-type " mode with port " port-num) +;;; (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt +;;; (begin +;;; (area-pktid-set! acfg +;;; (write-alist->pkt +;;; pktdir +;;; `((hostname . ,(get-host-name)) +;;; (ipaddr . ,best-ip) +;;; (port . ,port-num) +;;; (pid . ,(current-process-id))) +;;; pktspec: *pktspec* +;;; ptype: 'server)) +;;; (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt")))) +;;; (area-port-set! acfg port-num) +;;; #;(mutex-unlock! (area-mutex acfg)))))) +;;; +;;; (define *cookie-seqnum* 0) +;;; (define (make-cookie key) +;;; (set! *cookie-seqnum* (add1 *cookie-seqnum*)) +;;; ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*) +;;; (conc key "-" *cookie-seqnum*) +;;; ) +;;; +;;; ;; dispatch locally if possible +;;; ;; +;;; (define (call-deliver-response acfg ipaddr port cookie data) +;;; (if (and (equal? (area-myaddr acfg) ipaddr) +;;; (equal? (area-port acfg) port)) +;;; (deliver-response acfg cookie data) +;;; ((rpc:procedure 'response ipaddr port) cookie data))) +;;; +;;; (define (deliver-response acfg cookie data) +;;; (let ((deliver-response-start (current-milliseconds))) +;;; (thread-start! (make-thread +;;; (lambda () +;;; (let loop ((tries-left 5)) +;;; ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left) +;;; ;;(pp (hash-table->alist (area-cookie2mbox acfg))) +;;; (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f))) +;;; (cond +;;; ((eq? 0 tries-left) +;;; (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie) +;;; ) +;;; (mbox +;;; ;;(print "got mbox="mbox" got data="data" send.") +;;; (mailbox-send! mbox data)) +;;; (else +;;; ;;(print "no mbox yet. look for "cookie) +;;; (thread-sleep! (/ (- 6 tries-left) 10)) +;;; (loop (sub1 tries-left)))))) +;;; ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data)) +;;; (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie) +;;; ) +;;; (conc "deliver-response thread for cookie="cookie)))) +;;; #t) +;;; +;;; ;; action: +;;; ;; immediate - quick actions, no need to put in queues +;;; ;; dbwrite - put in dbwrite queue +;;; ;; dbread - put in dbread queue +;;; ;; oslong - os actions, e.g. du, that could take a long time +;;; ;; osshort - os actions that should be quick, e.g. df +;;; ;; +;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler +;;; ;; NOTE: Use rpc:current-peer for getting return address +;;; (let* ((std-peer-handler-start (current-milliseconds)) +;;; ;; (raw-data (alist-ref 'data dat)) +;;; (rdat (hash-table-ref/default +;;; (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action +;;; (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host +;;; rport: from-port action: action +;;; rdat: rdat cookie: cookie +;;; servkey: servkey data: params ;; TODO - rename data to params +;;; caller: (rpc:current-peer)))) +;;; (if (not (equal? servkey (area-pktid acfg))) +;;; `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this +;;; (let* ((ctype (if rdat +;;; (calldat-ctype rdat) ;; is this necessary? these should be identical +;;; action))) +;;; (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f) +;;; (case ctype +;;; ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data))) +;;; ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie)) +;;; ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params))) +;;; ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie)) +;;; ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie )) +;;; ((dbrw) `(#t "db read/write submitted" ,cookie)) +;;; ((osshort) `(#t "os short submitted" ,cookie)) +;;; ((oslong) `(#t "os long submitted" ,cookie)) +;;; (else `(#f "unrecognised action" ,ctype))))))) +;;; +;;; ;; Call this to start the actual server +;;; ;; +;;; ;; start_server +;;; ;; +;;; ;; mode: ' +;;; ;; handler: proc which takes pktrecieved as argument +;;; ;; +;;; +;;; (define (start-server acfg) +;;; (let* ((conn (find-free-port-and-open acfg)) +;;; (port (area-port acfg))) +;;; (rpc:publish-procedure! +;;; 'delist-db +;;; (lambda (fname) +;;; (hash-table-delete! (area-dbs acfg) fname))) +;;; (rpc:publish-procedure! +;;; 'calling-addr +;;; (lambda () +;;; (rpc:current-peer))) +;;; (rpc:publish-procedure! +;;; 'ping +;;; (lambda ()(real-ping acfg))) +;;; (rpc:publish-procedure! +;;; 'request +;;; (lambda (from-addr from-port servkey action cookie dbname params) +;;; (request acfg from-addr from-port servkey action cookie dbname params))) +;;; (rpc:publish-procedure! +;;; 'response +;;; (lambda (cookie res-dat) +;;; (deliver-response acfg cookie res-dat))) +;;; (area-ready-set! acfg #t) +;;; (area-conn-set! acfg conn) +;;; ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t) +;;; +;;; +;;; (define (launch acfg) ;; #!optional (proc std-peer-handler)) +;;; (print "starting launch") +;;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) +;;; #;(let ((original-handler (current-exception-handler))) ;; is th +;;; (lambda (exception) +;;; (server-exit-procedure) +;;; (original-handler exception))) +;;; (on-exit (lambda () +;;; (shutdown acfg))) ;; (finalize-all-db-handles acfg))) +;;; ;; set up the rpc handler +;;; (let* ((th1 (make-thread +;;; (lambda ()(start-server acfg)) +;;; "server thread")) +;;; (th2 (make-thread +;;; (lambda () +;;; (print "th2 starting") +;;; (let loop () +;;; (work-queue-processor acfg) +;;; (print "work-queue-processor crashed!") +;;; (loop))) +;;; "work queue thread"))) +;;; (thread-start! th1) +;;; (thread-start! th2) +;;; (let loop () +;;; (thread-sleep! 0.025) +;;; (if (area-ready acfg) +;;; #t +;;; (loop))) +;;; ;; attempt to fix my address +;;; (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)? +;;; (let loop ((rem-addrs all-addr)) +;;; (if (null? rem-addrs) +;;; (begin +;;; (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.") +;;; (exit 1)) ;; BUG Changeme to raising an exception +;;; +;;; (let* ((addr (car rem-addrs)) +;;; (good-addr (handle-exceptions +;;; exn +;;; #f +;;; ((rpc:procedure 'calling-addr addr (area-port acfg)))))) +;;; (if good-addr +;;; (begin +;;; (print "Got good-addr of " good-addr) +;;; (area-myaddr-set! acfg good-addr)) +;;; (loop (cdr rem-addrs))))))) +;;; (register-node acfg (area-myaddr acfg)(area-port acfg)) +;;; (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg)) +;;; ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) +;;; )) +;;; +;;; (define (clear-server-pkt acfg) +;;; (let ((pktf (area-pktfile acfg))) +;;; (if pktf (delete-file* pktf)))) +;;; +;;; (define (shutdown acfg) +;;; (let (;;(conn (area-conn acfg)) +;;; (pktf (area-pktfile acfg)) +;;; (port (area-port acfg))) +;;; (if pktf (delete-file* pktf)) +;;; (send-all "imshuttingdown") +;;; ;; (rpc:close-all-connections!) ;; don't know if this is actually needed +;;; (finalize-all-db-handles acfg))) +;;; +;;; (define (send-all msg) +;;; #f) +;;; +;;; ;; given a area record look up all the packets +;;; ;; +;;; (define (get-all-server-pkts acfg) +;;; (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt")))) +;;; (map (lambda (pkt-file) +;;; (read-pkt->alist pkt-file pktspec: *pktspec*)) +;;; all-pkt-files))) +;;; +;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9") +;;; (port . "34827") +;;; (pid . "28748") +;;; (hostname . "zeus") +;;; (T . "server") +;;; (D . "1549427032.0")) +;;; +;;; #;(define (get-my-best-address) +;;; (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))) +;;; (cond +;;; ((null? all-my-addresses) +;;; (get-host-name)) ;; no interfaces? +;;; ((eq? (length all-my-addresses) 1) +;;; (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it +;;; (else +;;; (ip->string (car (filter (lambda (x) ;; take any but 127. +;;; (not (eq? (u8vector-ref x 0) 127))) +;;; all-my-addresses))))))) +;;; +;;; ;; whoami? I am my pkt +;;; ;; +;;; (define (whoami? acfg) +;;; (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f)) +;;; +;;; ;;====================================================================== +;;; ;; "Client side" operations +;;; ;;====================================================================== +;;; +;;; (define (safe-call call-key host port . params) +;;; (handle-exceptions +;;; exn +;;; (begin +;;; (print "Call " call-key " to " host ":" port " failed") +;;; #f) +;;; (apply (rpc:procedure call-key host port) params))) +;;; +;;; ;; ;; convert to/from string / sexpr +;;; ;; +;;; ;; (define (string->sexpr str) +;;; ;; (if (string? str) +;;; ;; (with-input-from-string str read) +;;; ;; str)) +;;; ;; +;;; ;; (define (sexpr->string s) +;;; ;; (with-output-to-string (lambda ()(write s)))) +;;; +;;; ;; is the server alive? +;;; ;; +;;; (define (ping acfg host port) +;;; (let* ((myaddr (area-myaddr acfg)) +;;; (myport (area-port acfg)) +;;; (start-time (current-milliseconds)) +;;; (res (if (and (equal? myaddr host) +;;; (equal? myport port)) +;;; (real-ping acfg) +;;; ((rpc:procedure 'ping host port))))) +;;; (cons (- (current-milliseconds) start-time) +;;; res))) +;;; +;;; ;; returns ( ipaddr port alist-fname=>randnum ) +;;; (define (real-ping acfg) +;;; `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg))) +;;; +;;; ;; is the server alive AND the queues processing? +;;; ;; +;;; #;(define (full-ping acfg servpkt) +;;; (let* ((start-time (current-milliseconds)) +;;; (res (send-message acfg servpkt '(full-ping) 'full-ping))) +;;; (cons (- (current-milliseconds) start-time) +;;; res))) ;; (equal? res "got ping")))) +;;; +;;; +;;; ;; look up all pkts and get the server id (the hash), port, host/ip +;;; ;; store this info in acfg +;;; ;; return the number of responsive servers found +;;; ;; +;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself +;;; ;; +;;; (define (update-known-servers acfg) +;;; ;; readll all pkts +;;; ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt +;;; (let* ((start-time (current-milliseconds)) +;;; (all-pkts (delete-duplicates +;;; (append (get-all-server-pkts acfg) +;;; (hash-table-values (area-hosts acfg))))) +;;; (hostshash (area-hosts acfg)) +;;; (my-id (area-pktid acfg)) +;;; (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers +;;; (numsrvs 0) +;;; (delpkt (lambda (pktsdir sid) +;;; (print "clearing out server " sid) +;;; (delete-file* (conc pktsdir "/" sid ".pkt")) +;;; (hash-table-delete! hostshash sid)))) +;;; (area-last-srvup-set! acfg (current-seconds)) +;;; (for-each +;;; (lambda (servpkt) +;;; (if (list? servpkt) +;;; ;; (pp servpkt) +;;; (let* ((shost (alist-ref 'ipaddr servpkt)) +;;; (sport (any->number (alist-ref 'port servpkt))) +;;; (res (handle-exceptions +;;; exn +;;; (begin +;;; ;; (print "INFO: bad server on " shost ":" sport) +;;; #f) +;;; (ping acfg shost sport))) +;;; (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server +;;; (url (conc shost ":" sport)) +;;; ) +;;; #;(if (or (not res) +;;; (null? res)) +;;; (begin +;;; (print "STRANGE: ping of " url " gave " res))) +;;; +;;; ;; (print "Got " res " from " shost ":" sport) +;;; (match res +;;; ((qduration . payload) +;;; ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt) +;;; ;; (if payload +;;; ;; "Success" "Fail")) +;;; (match payload +;;; ((host port stats) +;;; ;; (print "From " host ":" port " got stats: " stats) +;;; (if (and host port stats) +;;; (let ((url (conc host ":" port))) +;;; (hash-table-set! hostshash sid servpkt) +;;; ;; store based on host:port +;;; (hash-table-set! (area-hoststats acfg) sid stats)) +;;; (print "missing data from the server, not sure what that means!")) +;;; (set! numsrvs (+ numsrvs 1))) +;;; (#f +;;; (print "Removing pkt " sid " due to #f from server or failed ping") +;;; (delpkt pktsdir sid)) +;;; (else +;;; (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)"))) +;;; (else +;;; ;; here we delete the pkt - can't reach the server, remove it +;;; ;; however this logic is inadequate. we should mark the server as checked +;;; ;; and not good, if it happens a second time - then remove the pkt +;;; ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead +;;; ;; could be it is simply too busy to reply +;;; (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0))) +;;; (if (> bad-pings 1) ;; two bad pings - remove pkt +;;; (begin +;;; (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid) +;;; (delpkt pktsdir sid)) +;;; (begin +;;; (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet") +;;; (hash-table-set! (area-health acfg) +;;; url +;;; (+ (hash-table-ref/default (area-health acfg) url 0) 1)) +;;; )) +;;; )))) +;;; ;; servpkt is not actually a pkt? +;;; (begin +;;; (print "Bad pkt " servpkt)))) +;;; all-pkts) +;;; (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs +;;; " servers, pkts: " (map (lambda (p) +;;; (alist-ref 'Z p)) +;;; all-pkts)) +;;; numsrvs)) +;;; +;;; (defstruct srvstat +;;; (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at +;;; (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration +;;; (pkt #f)) ;; the server pkt +;;; +;;; ;;(define (srv->srvstat srvpkt) +;;; +;;; ;; Get the server best for given dbname and key +;;; ;; +;;; ;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries. +;;; ;; +;;; (define (get-best-server acfg dbname key) +;;; (let* (;; (servers (hash-table-values (area-hosts acfg))) +;;; (servers (area-hosts acfg)) +;;; (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing +;;; (start-time (current-milliseconds)) +;;; (srvstats (make-hash-table)) ;; srvid => srvstat +;;; (url (conc (area-myaddr acfg) ":" (area-port acfg)))) +;;; ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys)) +;;; (if (null? skeys) +;;; (if (> (update-known-servers acfg) 0) +;;; (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter +;;; (begin +;;; (print "ERROR: no server found!") ;; since this process is also a server this should never happen +;;; #f)) +;;; (begin +;;; ;; (print "in get-best-server with skeys=" skeys) +;;; (if (> (- (current-seconds) (area-last-srvup acfg)) 10) +;;; (begin +;;; (update-known-servers acfg) +;;; (sdbg> "get-best-server" "update-known-servers" start-time #f #f))) +;;; +;;; ;; for each server look at the list of dbfiles, total number of dbs being handled +;;; ;; and the rand number, save the best host +;;; ;; also do a delist-db for each server dbfile not used +;;; (let* ((best-server #f) +;;; (servers-to-delist (make-hash-table))) +;;; (for-each +;;; (lambda (srvid) +;;; (let* ((server (hash-table-ref/default servers srvid #f)) +;;; (stats (hash-table-ref/default (area-hoststats acfg) srvid '(())))) +;;; ;; (print "stats: " stats) +;;; (if server +;;; (let* ((dbweights (car stats)) +;;; (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights))) +;;; (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore +;;; (randnum (if dbrec +;;; dbrec ;; (cdr dbrec) +;;; 0))) +;;; (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server)))))) +;;; skeys) +;;; +;;; (let* ((sorted (sort (hash-table-values srvstats) +;;; (lambda (a b) +;;; (let ((numfiles-a (srvstat-numfiles a)) +;;; (numfiles-b (srvstat-numfiles b)) +;;; (randnum-a (srvstat-randnum a)) +;;; (randnum-b (srvstat-randnum b))) +;;; (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less +;;; #t +;;; (if (and (equal? numfiles-a numfiles-b) +;;; (< randnum-a randnum-b)) +;;; #t +;;; #f)))))) +;;; (best (if (null? sorted) +;;; (begin +;;; (print "ERROR: should never be null due to self as server.") +;;; #f) +;;; (srvstat-pkt (car sorted))))) +;;; #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv) +;;; (let ((p (srvstat-pkt srv))) +;;; (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p) +;;; "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")"))) +;;; sorted)) +;;; best)))))) +;;; +;;; ;; send out an "I'm about to exit notice to all known servers" +;;; ;; +;;; (define (death-imminent acfg) +;;; '()) +;;; +;;; ;;====================================================================== +;;; ;; U L E X - T H E I N T E R E S T I N G S T U F F ! ! +;;; ;;====================================================================== +;;; +;;; ;; register a handler +;;; ;; NOTES: +;;; ;; dbinitsql is reserved for a list of sql statements for initializing the db +;;; ;; dbinitfn is reserved for a db init function, if exists called after dbinitsql +;;; ;; +;;; (define (register acfg key obj #!optional (ctype 'dbwrite)) +;;; (let ((ht (area-rtable acfg))) +;;; (if (hash-table-exists? ht key) +;;; (print "WARNING: redefinition of entry " key)) +;;; (hash-table-set! ht key (make-calldat obj: obj ctype: ctype)))) +;;; +;;; ;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... ) +;;; ;; NB// obj is often an sql query +;;; ;; +;;; (define (register-batch acfg ctype data) +;;; (let ((ht (area-rtable acfg))) +;;; (map (lambda (dat) +;;; (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype))) +;;; data))) +;;; +;;; (define (initialize-area-calls-from-specfile area specfile) +;;; (let* ((callspec (with-input-from-file specfile read ))) +;;; (for-each (lambda (group) +;;; (register-batch +;;; area +;;; (car group) +;;; (cdr group))) +;;; callspec))) +;;; +;;; ;; get-rentry +;;; ;; +;;; (define (get-rentry acfg key) +;;; (hash-table-ref/default (area-rtable acfg) key #f)) +;;; +;;; (define (get-rsql acfg key) +;;; (let ((cdat (get-rentry acfg key))) +;;; (if cdat +;;; (calldat-obj cdat) +;;; #f))) +;;; +;;; +;;; +;;; ;; blocking call: +;;; ;; client server +;;; ;; ------ ------ +;;; ;; call() +;;; ;; send-message() +;;; ;; nmsg-send() +;;; ;; nmsg-receive() +;;; ;; nmsg-respond(ack,cookie) +;;; ;; ack, cookie +;;; ;; mbox-thread-wait(cookie) +;;; ;; nmsg-send(client,cookie,result) +;;; ;; nmsg-respond(ack) +;;; ;; return result +;;; ;; +;;; ;; reserved action: +;;; ;; 'immediate +;;; ;; 'dbinitsql +;;; ;; +;;; (define (call acfg dbname action params #!optional (count 0)) +;;; (let* ((call-start-time (current-milliseconds)) +;;; (srv (get-best-server acfg dbname action)) +;;; (post-get-start-time (current-milliseconds)) +;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f)) +;;; (myid (trim-pktid (area-pktid acfg))) +;;; (srvid (trim-pktid (alist-ref 'Z srv))) +;;; (cookie (make-cookie myid))) +;;; (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat) +;;; (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname) +;;; (if (and srv rdat) ;; need both to dispatch a request +;;; (let* ((ripaddr (alist-ref 'ipaddr srv)) +;;; (rsrvid (alist-ref 'Z srv)) +;;; (rport (any->number (alist-ref 'port srv))) +;;; (res-full (if (and (equal? ripaddr (area-myaddr acfg)) +;;; (equal? rport (area-port acfg))) +;;; (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params) +;;; (safe-call 'request ripaddr rport +;;; (area-myaddr acfg) +;;; (area-port acfg) +;;; #;(area-pktid acfg) +;;; rsrvid +;;; action cookie dbname params)))) +;;; ;; (print "res-full: " res-full) +;;; (match res-full +;;; ((response-ok response-msg rem ...) +;;; (let* ((send-message-time (current-milliseconds)) +;;; ;; (match res-full +;;; ;; ((response-ok response-msg) +;;; ;; (response-ok (car res-full)) +;;; ;; (response-msg (cadr res-full) +;;; ) +;;; ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG +;;; ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params) +;;; (sdbg> "call" "send-message" post-get-start-time #f call-start-time) +;;; (cond +;;; ((not response-ok) #f) +;;; ((member response-msg '("db read submitted" "db write submitted")) +;;; (let* ((cookie-id (cadddr res-full)) +;;; (mbox (make-mailbox)) +;;; (mbox-time (current-milliseconds))) +;;; (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox) +;;; (let* ((mbox-timeout-secs 20) +;;; (mbox-timeout-result 'MBOX_TIMEOUT) +;;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) +;;; (mbox-receive-time (current-milliseconds))) +;;; (hash-table-delete! (area-cookie2mbox acfg) cookie-id) +;;; (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname) +;;; ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params) +;;; res))) +;;; (else +;;; (print "Unhandled response \""response-msg"\"") +;;; #f)) +;;; ;; depending on what action (i.e. ctype) is we will block here waiting for +;;; ;; all the data (mechanism to be determined) +;;; ;; +;;; ;; if res is a "working on it" then wait +;;; ;; wait for result +;;; ;; mailbox thread wait on +;;; +;;; ;; if res is a "can't help you" then try a different server +;;; ;; if res is a "ack" (e.g. for one-shot requests) then return res +;;; )) +;;; (else +;;; (if (< count 10) +;;; (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv)))) +;;; (thread-sleep! 1) +;;; (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.") +;;; (call acfg dbname action params (+ count 1))) +;;; (begin +;;; (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))) +;;; (begin +;;; (if (not rdat) +;;; (print "ERROR: action " action " not registered.") +;;; (if (< count 10) +;;; (begin +;;; (thread-sleep! 1) +;;; (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts +;;; (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds") +;;; (call acfg dbname action params (+ count 1))) +;;; (begin +;;; (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")) +;;; #;(error "No server available")))))))) +;;; +;;; +;;; ;;====================================================================== +;;; ;; U T I L I T I E S +;;; ;;====================================================================== +;;; +;;; ;; get a signature for identifing this process +;;; ;; +;;; (define (get-process-signature) +;;; (cons (get-host-name)(current-process-id))) +;;; +;;; ;;====================================================================== +;;; ;; S Y S T E M S T U F F +;;; ;;====================================================================== +;;; +;;; ;; get normalized cpu load by reading from /proc/loadavg and +;;; ;; /proc/cpuinfo return all three values and the number of real cpus +;;; ;; and the number of threads returns alist '((adj-cpu-load +;;; ;; . normalized-proc-load) ... etc. keys: adj-proc-load, +;;; ;; adj-core-load, 1m-load, 5m-load, 15m-load +;;; ;; +;;; (define (get-normalized-cpu-load) +;;; (let ((res (get-normalized-cpu-load-raw)) +;;; (default `((adj-proc-load . 2) ;; there is no right answer +;;; (adj-core-load . 2) +;;; (1m-load . 2) +;;; (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong +;;; (15m-load . 0) +;;; (proc . 1) +;;; (core . 1) +;;; (phys . 1) +;;; (error . #t)))) +;;; (cond +;;; ((and (list? res) +;;; (> (length res) 2)) +;;; res) +;;; ((eq? res #f) default) ;; add messages? +;;; ((eq? res #f) default) ;; this would be the #eof +;;; (else default)))) +;;; +;;; (define (get-normalized-cpu-load-raw) +;;; (let* ((actual-host (get-host-name))) ;; #f is localhost +;;; (let ((data (append +;;; (with-input-from-file "/proc/loadavg" read-lines) +;;; (with-input-from-file "/proc/cpuinfo" read-lines) +;;; (list "end"))) +;;; (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$")) +;;; (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$")) +;;; (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$")) +;;; (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$")) +;;; (max-num (lambda (p n)(max (string->number p) n)))) +;;; ;; (print "data=" data) +;;; (if (null? data) ;; something went wrong +;;; #f +;;; (let loop ((hed (car data)) +;;; (tal (cdr data)) +;;; (loads #f) +;;; (proc-num 0) ;; processor includes threads +;;; (phys-num 0) ;; physical chip on motherboard +;;; (core-num 0)) ;; core +;;; ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num) +;;; (if (null? tal) ;; have all our data, calculate normalized load and return result +;;; (let* ((act-proc (+ proc-num 1)) +;;; (act-phys (+ phys-num 1)) +;;; (act-core (+ core-num 1)) +;;; (adj-proc-load (/ (car loads) act-proc)) +;;; (adj-core-load (/ (car loads) act-core)) +;;; (result +;;; (append (list (cons 'adj-proc-load adj-proc-load) +;;; (cons 'adj-core-load adj-core-load)) +;;; (list (cons '1m-load (car loads)) +;;; (cons '5m-load (cadr loads)) +;;; (cons '15m-load (caddr loads))) +;;; (list (cons 'proc act-proc) +;;; (cons 'core act-core) +;;; (cons 'phys act-phys))))) +;;; result) +;;; (regex-case +;;; hed +;;; (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num)) +;;; (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num)) +;;; (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num)) +;;; (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num))) +;;; (else +;;; (begin +;;; ;; (print "NO MATCH: " hed) +;;; (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))) +;;; +;;; (define (get-host-stats acfg) +;;; (let ((stats-hash (area-stats acfg))) +;;; ;; use this opportunity to remove references to dbfiles which have not been accessed in a while +;;; (for-each +;;; (lambda (dbname) +;;; (let* ((stats (hash-table-ref stats-hash dbname)) +;;; (last-access (stat-when stats))) +;;; (if (and (> last-access 0) ;; if zero then there has been no access +;;; (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds +;;; (begin +;;; (print "Removing " dbname " from stats list") +;;; (hash-table-delete! stats-hash dbname) ;; remove from stats hash +;;; (stat-dbs-set! stats (hash-table-keys stats)))))) +;;; (hash-table-keys stats-hash)) +;;; +;;; `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum +;;; ,(map (lambda (dbname) ;; dbname is the db name +;;; (cons dbname (stat-when (hash-table-ref stats-hash dbname)))) +;;; (hash-table-keys stats-hash)) +;;; (cpuload . ,(get-normalized-cpu-load))))) +;;; #;(stats . ,(map (lambda (k) ;; create an alist from the stats data +;;; (cons k (stat->alist (hash-table-ref (area-stats acfg) k)))) +;;; (hash-table-keys (area-stats acfg)))) +;;; +;;; #;(trace +;;; ;; assv +;;; ;; cdr +;;; ;; caar +;;; ;; ;; cdr +;;; ;; call +;;; ;; finalize-all-db-handles +;;; ;; get-all-server-pkts +;;; ;; get-normalized-cpu-load +;;; ;; get-normalized-cpu-load-raw +;;; ;; launch +;;; ;; nmsg-send +;;; ;; process-db-queries +;;; ;; receive-message +;;; ;; std-peer-handler +;;; ;; update-known-servers +;;; ;; work-queue-processor +;;; ) +;;; +;;; ;;====================================================================== +;;; ;; netutil +;;; ;; move this back to ulex-netutil.scm someday? +;;; ;;====================================================================== +;;; +;;; ;; #include +;;; ;; #include +;;; ;; #include +;;; ;; #include +;;; +;;; (foreign-declare "#include \"sys/types.h\"") +;;; (foreign-declare "#include \"sys/socket.h\"") +;;; (foreign-declare "#include \"ifaddrs.h\"") +;;; (foreign-declare "#include \"arpa/inet.h\"") +;;; +;;; ;; get IP addresses from ALL interfaces +;;; (define get-all-ips +;;; (foreign-safe-lambda* scheme-object () +;;; " +;;; +;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : +;;; +;;; +;;; C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; +;;; // struct ifaddrs *ifa, *i; +;;; // struct sockaddr *sa; +;;; +;;; struct ifaddrs * ifAddrStruct = NULL; +;;; struct ifaddrs * ifa = NULL; +;;; void * tmpAddrPtr = NULL; +;;; +;;; if ( getifaddrs(&ifAddrStruct) != 0) +;;; C_return(C_SCHEME_FALSE); +;;; +;;; // for (i = ifa; i != NULL; i = i->ifa_next) { +;;; for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { +;;; if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is +;;; // a valid IPv4 address +;;; tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; +;;; char addressBuffer[INET_ADDRSTRLEN]; +;;; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); +;;; // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +;;; len = strlen(addressBuffer); +;;; a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +;;; str = C_string(&a, len, addressBuffer); +;;; lst = C_a_pair(&a, str, lst); +;;; } +;;; +;;; // else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is +;;; // // a valid IPv6 address +;;; // tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; +;;; // char addressBuffer[INET6_ADDRSTRLEN]; +;;; // inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); +;;; //// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); +;;; // len = strlen(addressBuffer); +;;; // a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); +;;; // str = C_string(&a, len, addressBuffer); +;;; // lst = C_a_pair(&a, str, lst); +;;; // } +;;; +;;; // else { +;;; // printf(\" not an IPv4 address\\n\"); +;;; // } +;;; +;;; } +;;; +;;; freeifaddrs(ifa); +;;; C_return(lst); +;;; +;;; ")) +;;; +;;; ;; Change this to bias for addresses with a reasonable broadcast value? +;;; ;; +;;; (define (ip-pref-less? a b) +;;; (let* ((rate (lambda (ipstr) +;;; (regex-case ipstr +;;; ( "^127\\." _ 0 ) +;;; ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) +;;; ( else 2 ) )))) +;;; (< (rate a) (rate b)))) +;;; +;;; +;;; (define (get-my-best-address) +;;; (let ((all-my-addresses (get-all-ips)) +;;; ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) +;;; ) +;;; (cond +;;; ((null? all-my-addresses) +;;; (get-host-name)) ;; no interfaces? +;;; ((eq? (length all-my-addresses) 1) +;;; (car all-my-addresses)) ;; only one to choose from, just go with it +;;; +;;; (else +;;; (car (sort all-my-addresses ip-pref-less?))) +;;; ;; (else +;;; ;; (ip->string (car (filter (lambda (x) ;; take any but 127. +;;; ;; (not (eq? (u8vector-ref x 0) 127))) +;;; ;; all-my-addresses)))) +;;; +;;; ))) +;;; +;;; (define (get-all-ips-sorted) +;;; (sort (get-all-ips) ip-pref-less?)) +;;; +;;; + Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -28,569 +28,76 @@ (use mailbox) (module ulex * -(import scheme posix chicken data-structures ports extras files mailbox) -(import srfi-18 pkts matchable regex - typed-records srfi-69 srfi-1 - srfi-4 regex-case - (prefix sqlite3 sqlite3:) - foreign - tcp6 - ;; ulex-netutil - hostinfo - ) - -;; make it a global? Well, it is local to area module - -(define *captain-pktspec* - `((captain (host . h) - (port . p) - (pid . i) - (ipaddr . a) - ) - #;(data (hostname . h) ;; sender hostname - (port . p) ;; sender port - (ipaddr . a) ;; sender ip - (hostkey . k) ;; sending host key - store info at server under this key - (servkey . s) ;; server key - this needs to match at server end or reject the msg - (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json - (data . d) ;; base64 encoded slln data - ))) - -;; struct for keeping track of our world - -(defstruct udat - ;; captain info - (captain-address #f) - (captain-host #f) - (captain-port #f) - (captain-pid #f) - (captain-lease 0) ;; time (unix epoc) seconds when the lease is up - (ulex-dir (conc (get-environment-variable "HOME") "/.ulex")) - (cpkts-dir (conc (get-environment-variable "HOME") "/.ulex/pkts")) - (cpkt-spec *captain-pktspec*) - ;; this processes info - (my-cpkt-key #f) ;; put Z card here when I create a pkt for myself as captain - (my-address #f) - (my-hostname #f) - (my-port #f) - (my-pid (current-process-id)) - (my-dbs '()) - ;; server and handler thread - (serv-listener #f) ;; this processes server info - (handler-thread #f) - (mboxes (make-hash-table)) ;; key => mbox - ;; other servers - (peers (make-hash-table)) ;; host-port => peer record - (dbowners (make-hash-table)) ;; dbfile => host-port - (handlers (make-hash-table)) ;; dbfile => proc - ;; (outgoing-conns (make-hash-table)) ;; host:port -> conn - (work-queue (make-queue)) ;; most stuff goes here - ;; (fast-queue (make-queue)) ;; super quick stuff goes here (e.g. ping) - (busy #f) ;; is either of the queues busy, use to switch between queuing tasks or doing immediately - ;; app info - (appname #f) - (dbtypes (make-hash-table)) ;; this should be an alist but hash is easier. dbtype => [ initproc syncproc ] - ;; cookies - (cnum 0) ;; cookie num - ) - -;;====================================================================== -;; NEW APPROACH -;;====================================================================== - -;; start-server-find-port ;; gotta have a server port ready from the very begining - -;; udata - all the connection info, captain, server, ulex db etc. MUST BE PASSED IN -;; dbpath - full path and filename of the db to talk to or a symbol naming the db? -;; callname - the remote call to execute -;; params - parameters to pass to the remote call -;; -(define (remote-call udata dbpath dbtype callname . params) - (start-server-find-port udata) ;; ensure we have a local server - (find-or-setup-captain udata) - ;; look at connect, process-request, send, send-receive - (let-values (((cookie-key host-port)(get-db-owner udata dbpath dbtype))) - (send-receive udata host-port callname cookie-key params))) - -;;====================================================================== -;; KEY FUNCTIONS - THESE ARE TOO BE EXPOSED AND USED -;;====================================================================== - -;; connection setup and management functions - -;; This is the basic setup command. Must always be -;; called before connecting to a db using connect. -;; -;; find or become the captain -;; setup and return a ulex object -;; -(define (find-or-setup-captain udata) - ;; see if we already have a captain and if the lease is ok - (if (and (udat-captain-address udata) - (udat-captain-port udata) - (< (current-seconds) (udat-captain-lease udata))) - udata - (let* ((cpkts (get-all-captain-pkts udata)) ;; read captain pkts - (captn (get-winning-pkt cpkts))) - (if captn - (let* ((port (alist-ref 'port captn)) - (host (alist-ref 'host captn)) - (ipaddr (alist-ref 'ipaddr captn)) - (pid (alist-ref 'pid captn)) - (Z (alist-ref 'Z captn))) - (udat-captain-address-set! udata ipaddr) - (udat-captain-host-set! udata host) - (udat-captain-port-set! udata port) - (udat-captain-pid-set! udata pid) - (udat-captain-lease-set! udata (+ (current-seconds) 10)) - (let-values (((success pingtime)(ping udata (conc ipaddr ":" port)))) - (if success - udata - (begin - (print "Found unreachable captain at " ipaddr ":" port ", removing pkt") - (remove-captain-pkt udata captn) - (find-or-setup-captain udata)))) - (begin - (setup-as-captain udata) ;; this saves the thread to captain-thread and starts the thread - (find-or-setup-captain udata))))))) - -;; connect to a specific dbfile -;; - if already connected - return the dbowner host-port -;; - ask the captain who to talk to for this db -;; - put the entry in the dbowners hash as dbfile => host-port -;; -(define (connect udata dbfname dbtype) - (or (hash-table-ref/default (udat-dbowners udata) dbfname #f) - (let-values (((success dbowner-host-port)(get-db-owner udata dbfname dbtype))) - (if success - (begin - ;; just clobber the record, this is the new data no matter what - (hash-table-set! (udat-dbowners udata) dbfname dbowner-host-port) - dbowner-host-port) - #f)))) - -;; returns: success pingtime -;; -;; NOTE: causes the callee to store the info on this host along with the dbs this host currently owns -;; -(define (ping udata host-port) - (let* ((start (current-milliseconds)) - (cookie (make-cookie udata)) - (dbs (udat-my-dbs udata)) - (msg (string-intersperse dbs " ")) - (res (send udata host-port 'ping cookie msg retval: #t)) - (delta (- (current-milliseconds) start))) - (values (equal? res cookie) delta))) - -;; returns: success pingtime -;; -;; NOTE: causes all references to this worker to be wiped out in the -;; callee (ususally the captain) -;; -(define (goodbye-ping udata host-port) - (let* ((start (current-milliseconds)) - (cookie (make-cookie udata)) - (dbs (udat-my-dbs udata)) - (res (send udata host-port 'goodbye cookie "nomsg" retval: #t)) - (delta (- (current-milliseconds) start))) - (values (equal? res cookie) delta))) - -(define (goodbye-captain udata) - (let* ((host-port (udat-captain-host-port udata))) - (if host-port - (goodbye-ping udata host-port) - (values #f -1)))) - -(define (get-db-owner udata dbname dbtype) - (let* ((host-port (udat-captain-host-port udata))) - (if host-port - (let* ((cookie (make-cookie udata)) - (msg #f) ;; (conc dbname " " dbtype)) - (params `(,dbname ,dbtype)) - (res (send udata host-port 'db-owner cookie msg - params: params retval: #t))) - (match (string-split res) - ((retcookie owner-host-port) - (values (equal? retcookie cookie) owner-host-port)))) - (values #f -1)))) - -;; called in ulex-handler to dispatch work, called on the workers side -;; calls (proc params data) -;; returns result with cookie -;; -;; pdat is the info of the caller, used to send the result data -;; prockey is key into udat-handlers hash dereferencing a proc -;; procparam is a first param handed to proc - often to do further derefrencing -;; NOTE: params is intended to be a list of strings, encoding on data -;; is up to the user but data must be a single line -;; -(define (process-request udata pdat dbname cookie prockey procparam data) - (let* ((dbrec (ulex-open-db udata dbname)) ;; this will be a dbconn record, looks for in udata first - (proc (hash-table-ref udata prockey))) - (let* ((result (proc dbrec procparam data))) - result))) - -;; remote-request - send to remote to process in process-request -;; uconn comes from a call to connect and can be used instead of calling connect again -;; uconn is the host-port to call -;; we send dbname to the worker so they know which file to open -;; data must be a string with no newlines, it will be handed to the proc -;; at the remote site unchanged. It is up to the user to encode/decode it's contents -;; -;; rtype: immediate, read-only, normal, low-priority -;; -(define (remote-request udata uconn rtype dbname prockey procparam data) - (let* ((cookie (make-cookie udata))) - (send-receive udata uconn rtype cookie data `(,prockey procparam)))) - -(define (ulex-open-db udata dbname) - #f) - - -;;====================================================================== -;; Ulex db -;; -;; - track who is captain, lease expire time -;; - track who owns what db, lease -;; -;;====================================================================== - -;; -;; -(define (ulex-dbfname) - (let ((dbdir (conc (get-environment-variable "HOME") "/.ulex"))) - (if (not (file-exists? dbdir)) - (create-directory dbdir #t)) - (conc dbdir "/network.db"))) - -;; always goes in ~/.ulex/network.db -;; role is captain, adjutant, node -;; -(define (ulexdb-setup) - (let* ((dbfname (ulex-dbfname)) - (have-db (file-exists? dbfname)) - (db (sqlite3:open-database dbfname))) - (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) - (sqlite3:execute db "PRAGMA synchronous = 0;") - (if (not have-db) - (sqlite3:with-transaction - db - (lambda () - (for-each - (lambda (stmt) - (if stmt (sqlite3:execute db stmt))) - `("CREATE TABLE IF NOT EXISTS nodes - (id INTEGER PRIMARY KEY, - role TEXT NOT NULL, - host TEXT NOT NULL, - port TEXT NOT NULL, - ipadr TEXT NOT NULL, - pid INTEGER NOT NULL, - zcard TEXT NOT NULL, - regtime INTEGER DEFAULT (strftime('%s','now')), - lease_thru INTEGER DEFAULT (strftime('%s','now')), - last_update INTEGER DEFAULT (strftime('%s','now')));" - "CREATE TRIGGER IF NOT EXISTS update_nodes_trigger AFTER UPDATE ON nodes - FOR EACH ROW - BEGIN - UPDATE nodes SET last_update=(strftime('%s','now')) - WHERE id=old.id; - END;" - "CREATE TABLE IF NOT EXISTS dbs - (id INTEGER PRIMARY KEY, - dbname TEXT NOT NULL, - dbfile TEXT NOT NULL, - dbtype TEXT NOT NULL, - host_port TEXT NOT NULL, - regtime INTEGER DEFAULT (strftime('%s','now')), - lease_thru INTEGER DEFAULT (strftime('%s','now')), - last_update INTEGER DEFAULT (strftime('%s','now')));" - "CREATE TRIGGER IF NOT EXISTS update_dbs_trigger AFTER UPDATE ON dbs - FOR EACH ROW - BEGIN - UPDATE dbs SET last_update=(strftime('%s','now')) - WHERE id=old.id; - END;"))))) - db)) - -(define (get-host-port-lease db dbfname) - (sqlite3:fold-row - (lambda (rem host-port lease-thru) - (list host-port lease-thru)) - #f db "SELECT host_port,lease_thru FROM dbs WHERE dbfile = ?" dbfname)) - -(define (register-captain db host ipadr port pid zcard #!key (lease 20)) - (let* ((dbfname (ulex-dbfname)) - (host-port (conc host ":" port))) - (sqlite3:with-transaction - db - (lambda () - (match (get-host-port-lease db dbfname) - ((host-port lease-thru) - (if (> (current-seconds) lease-thru) - (begin - (sqlite3:execute db "UPDATE dbs SET host_port=?,lease_thru=? WHERE dbname=?" - (conc host ":" port) - (+ (current-seconds) lease) - dbfname) - #t) - #f)) - (#f (sqlite3:execute db "INSERT INTO dbs (dbname,dbfile,dbtype,host_port,lease_thru) VALUES (?,?,?,?,?)" - "captain" dbfname "captain" host-port (+ (current-seconds) lease))) - (else (print "ERROR: Unrecognised result from fold-row") - (exit 1))))))) - -;;====================================================================== -;; network utilities -;;====================================================================== - -(define (rate-ip ipaddr) - (regex-case ipaddr - ( "^127\\..*" _ 0 ) - ( "^(10\\.0|192\\.168)\\..*" _ 1 ) - ( else 2 ) )) - -;; Change this to bias for addresses with a reasonable broadcast value? -;; -(define (ip-pref-less? a b) - (> (rate-ip a) (rate-ip b))) - - -(define (get-my-best-address) - (let ((all-my-addresses (get-all-ips)) - ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) - ) - (cond - ((null? all-my-addresses) - (get-host-name)) ;; no interfaces? - ((eq? (length all-my-addresses) 1) - (car all-my-addresses)) ;; only one to choose from, just go with it - - (else - (car (sort all-my-addresses ip-pref-less?))) - ;; (else - ;; (ip->string (car (filter (lambda (x) ;; take any but 127. - ;; (not (eq? (u8vector-ref x 0) 127))) - ;; all-my-addresses)))) - - ))) - -(define (get-all-ips-sorted) - (sort (get-all-ips) ip-pref-less?)) - -(define (get-all-ips) - (map ip->string (vector->list - (hostinfo-addresses - (host-information (current-hostname)))))) - -(define (udat-my-host-port udata) - (if (and (udat-my-address udata)(udat-my-port udata)) - (conc (udat-my-address udata) ":" (udat-my-port udata)) - #f)) - -(define (udat-captain-host-port udata) - (if (and (udat-captain-address udata)(udat-captain-port udata)) - (conc (udat-captain-address udata) ":" (udat-captain-port udata)) - #f)) - -(define (udat-get-peer udata host-port) - (hash-table-ref/default (udat-peers udata) host-port #f)) - -;; struct for keeping track of others we are talking to - -(defstruct peer - (addr-port #f) - (hostname #f) - (pid #f) - ;; (inp #f) - ;; (oup #f) - (dbs '()) ;; list of databases this peer is currently handling - ) - -(defstruct work - (peer-dat #f) - (handlerkey #f) - (qrykey #f) - (data #f) - (start (current-milliseconds))) - -#;(defstruct dbowner - (pdat #f) - (last-update (current-seconds))) - -;;====================================================================== -;; Captain functions -;;====================================================================== - -;; NB// This needs to be started in a thread -;; -;; setup to be a captain -;; - local server MUST be started already -;; - create pkt -;; - start server port handler -;; -(define (setup-as-captain udata) - (if (create-captain-pkt udata) - (let* ((my-addr (udat-my-address udata)) - (my-port (udat-my-port udata)) - (th (make-thread (lambda () - (ulex-handler-loop udata)) "Captain handler"))) - (udat-handler-thread-set! udata th) - (udat-captain-address-set! udata my-addr) - (udat-captain-port-set! udata my-port) - (thread-start! th)) - (begin - (print "ERROR: failed to create captain pkt") - #f))) - -;; given a pkts dir read -;; -(define (get-all-captain-pkts udata) - (let* ((pktsdir (let ((d (udat-cpkts-dir udata))) - (if (file-exists? d) - d - (begin - (create-directory d #t) - d)))) - (all-pkt-files (glob (conc pktsdir "/*.pkt"))) - (pkt-spec (udat-cpkt-spec udata))) - (map (lambda (pkt-file) - (read-pkt->alist pkt-file pktspec: pkt-spec)) - all-pkt-files))) - -;; sort by D then Z, return one, choose the oldest then -;; differentiate if needed using the Z key -;;l -(define (get-winning-pkt pkts) - (if (null? pkts) - #f - (car (sort pkts (lambda (a b) - (let ((ad (string->number (alist-ref 'D a))) - (bd (string->number (alist-ref 'D b)))) - (if (eq? a b) - (let ((az (alist-ref 'Z a)) - (bz (alist-ref 'Z b))) - (string>=? az bz)) - (> ad bd)))))))) - -;; put the host, ip, port and pid into a pkt in -;; the captain pkts dir -;; - assumes user has already fired up a server -;; which will be in the udata struct -;; -(define (create-captain-pkt udata) - (if (not (udat-serv-listener udata)) - (begin - (print "ERROR: create-captain-pkt called with out a listener") - #f) - (let* ((pktdat `((port . ,(udat-my-port udata)) - (host . ,(udat-my-hostname udata)) - (ipaddr . ,(udat-my-address udata)) - (pid . ,(udat-my-pid udata)))) - (pktdir (udat-cpkts-dir udata)) - (pktspec (udat-cpkt-spec udata)) - ) - (udat-my-cpkt-key-set! - udata - (write-alist->pkt - pktdir - pktdat - pktspec: pktspec - ptype: 'captain)) - (udat-my-cpkt-key udata)))) - -;; remove pkt associated with captn (the Z key .pkt) -;; -(define (remove-captain-pkt udata captn) - (let ((Z (alist-ref 'Z captn)) - (cpktdir (udat-cpkts-dir udata))) - (delete-file* (conc cpktdir "/" Z ".pkt")))) - -;; call all known peers and tell them to delete their info on the captain -;; thus forcing them to re-read pkts and connect to a new captain -;; call this when the captain needs to exit and if an older captain is -;; detected. Due to delays in sending file meta data in NFS multiple -;; captains can be initiated in a "Storm of Captains", book soon to be -;; on Amazon -;; -(define (drop-captain udata) - (let* ((peers (hash-table-keys (udat-peers udata))) - (cookie (make-cookie udata))) - (for-each - (lambda (host-port) - (send udata host-port 'dropcaptain cookie "nomsg" retval: #t)) - peers))) - -;;====================================================================== -;; server primitives -;;====================================================================== - -(define (make-cookie udata) - (let ((newcnum (+ (udat-cnum udata) 1))) - (udat-cnum-set! udata newcnum) - (conc (udat-my-address udata) ":" - (udat-my-port udata) "-" - (udat-my-pid udata) "-" - newcnum))) - -;; create a tcp listener and return a populated udat struct with -;; my port, address, hostname, pid etc. -;; return #f if fail to find a port to allocate. -;; -;; if udata-in is #f create the record -;; if there is already a serv-listener return the udata -;; -(define (start-server-find-port udata-in #!optional (port 4242)) - (let ((udata (or udata-in (make-udat)))) - (if (udat-serv-listener udata) ;; TODO - add check that the listener is alive and ready? - udata - (handle-exceptions - exn - (if (< port 65535) - (start-server-find-port udata (+ port 1)) - #f) - (connect-server udata port))))) - -(define (connect-server udata port) - ;; (tcp-listener-socket LISTENER)(socket-name so) - ;; sockaddr-address, sockaddr-port, sockaddr->string - (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) - (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) - (udat-my-address-set! udata addr) - (udat-my-port-set! udata port) - (udat-my-hostname-set! udata (get-host-name)) - (udat-serv-listener-set! udata tlsn) - udata)) - -(define (get-peer-dat udata host-port #!optional (hostname #f)(pid #f)) - (let* ((pdat (or (udat-get-peer udata host-port) - (handle-exceptions ;; ERROR - MAKE THIS EXCEPTION HANDLER MORE SPECIFIC - exn - #f - (let ((npdat (make-peer addr-port: host-port))) - (if hostname (peer-hostname-set! npdat hostname)) - (if pid (peer-pid-set! npdat pid)) - npdat))))) - pdat)) +(import scheme + + chicken + data-structures + files + foreign + hostinfo + mailbox + matchable + ports extras + posix + regex + regex-case + srfi-1 + srfi-18 + srfi-4 + srfi-69 + tcp6 + typed-records + ) + +;; udat struct, used by both caller and callee +;; instantiated as uconn by convention +;; +(defstruct udat + ;; the listener side + (host-port #f) + (socket #f) + ;; the peers + (peers (make-hash-table)) ;; host:port->peer + ;; work handling + (work-queue (make-queue)) + (cnum 0) ;; cookie number + (mboxes (make-hash-table)) + ) + +;; struct for keeping track of others we are talking to +;; +(defstruct pdat + (host-port #f) + (conns '()) ;; list of pcon structs, pop one off when calling the peer + ) + +;; struct for peer connections, keep track of expiration etc. +;; +(defstruct pcon + (inp #f) + (oup #f) + (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) + (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes + ) ;; send structured data to recipient ;; ;; NOTE: qrykey is what was called the "cookie" previously ;; ;; retval tells send to expect and wait for return data (one line) and return it or time out ;; this is for ping where we don't want to necessarily have set up our own server yet. ;; -(define (send udata host-port handler qrykey data +(define (send udata host-port cmd qrykey data #!key (hostname #f)(pid #f)(params '())(retval #f)) (let* ((my-host-port (udat-my-host-port udata)) (isme (equal? host-port my-host-port)) ;; am I calling ;; myself? (dat (list - handler ;; " " + cmd ;; " " my-host-port ;; " " (udat-my-pid udata) ;; " " qrykey params ;;(if (null? params) "" (conc " " ;;(string-intersperse params " "))) @@ -604,11 +111,11 @@ exn #f (let-values (((inp oup)(tcp-connect host-port))) ;; ;; CONTROL LINE: - ;; handlerkey host:port pid qrykey params ... + ;; cmdkey host:port pid qrykey params ... ;; (let ((res (if (and inp oup) (let* () (if my-host-port @@ -627,20 +134,109 @@ ) #f))) ;; #f means failed to connect and send (close-input-port inp) (close-output-port oup) res)))))) + +;;====================================================================== +;; listener +;;====================================================================== +;; create a tcp listener and return a populated udat struct with +;; my port, address, hostname, pid etc. +;; return #f if fail to find a port to allocate. +;; +;; if udata-in is #f create the record +;; if there is already a serv-listener return the udata +;; +(define (setup-listener uconn #!optional (port 4242)) + (handle-exceptions + exn + (if (< port 65535) + (setup-listener uconn (+ port 1)) + #f) + (connect-listener uconn port))) + +(define (connect-listener uconn port) + ;; (tcp-listener-socket LISTENER)(socket-name so) + ;; sockaddr-address, sockaddr-port, sockaddr->string + (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) + (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) + (udat-my-address-set! uconn addr) + (udat-my-port-set! uconn port) + (udat-my-hostname-set! uconn (get-host-name)) + (udat-serv-listener-set! uconn tlsn) + uconn)) + +(define (udat-my-host-port uconn) + (if (and (udat-my-address uconn)(udat-my-port uconn)) + (conc (udat-my-address uconn) ":" (udat-my-port uconn)) + #f)) + +;;====================================================================== +;; peers and connections +;;====================================================================== + +;; send structured data to recipient +;; +;; NOTE: qrykey is what was called the "cookie" previously +;; +;; retval tells send to expect and wait for return data (one line) and return it or time out +;; this is for ping where we don't want to necessarily have set up our own server yet. +;; +;; NOTE: see below for beginnings of code to allow re-use of tcp connections +;; - I believe (without substantial evidence) that re-using connections will +;; be beneficial ... +;; +(define (send udata host-port cmd qrykey data + #!key (hostname #f)(pid #f)(params '())(retval #f)) + (let* ((my-host-port (udat-host-port udata)) + (isme (equal? host-port my-host-port)) ;; am I calling + ;; myself? + (dat (list + cmd ;; " " + my-host-port ;; " " + qrykey + params + ))) + (if isme + (ulex-handler udata dat data) + (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC + exn + #f + (let-values (((inp oup)(tcp-connect host-port))) + ;; + ;; CONTROL LINE: + ;; cmdkey host:port pid qrykey params ... + ;; + (let ((res (if (and inp oup) + (if my-host-port + (begin + (write dat oup) + (write data oup) ;; send as sexpr + ;; (print "Sent dat: " dat " data: " data) + (if retval + (read inp) + #t)) + (begin + (print "ERROR: send called but no receiver has been setup. Please call setup first!") + #f)) + ;; NOTE: DO NOT BE TEMPTED TO LOOK AT ANY DATA ON INP HERE! + ;; (there is a listener for handling that) + #f))) ;; #f means failed to connect and send + (close-input-port inp) + (close-output-port oup) + res)))))) ;; send a request to the given host-port and register a mailbox in udata ;; wait for the mailbox data and return it ;; -(define (send-receive udata host-port handler qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20)) +(define (send-receive udata host-port cmd qrykey data #!key (hostname #f)(pid #f)(params '())(timeout 20)) (let ((mbox (make-mailbox)) (mbox-time (current-milliseconds)) (mboxes (udat-mboxes udata))) (hash-table-set! mboxes qrykey mbox) - (if (send udata host-port handler qrykey data hostname: hostname pid: pid params: params) + (if (send udata host-port cmd qrykey data hostname: hostname pid: pid params: params) (let* ((mbox-timeout-secs timeout) (mbox-timeout-result 'MBOX_TIMEOUT) (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) (mbox-receive-time (current-milliseconds))) (hash-table-delete! mboxes qrykey) @@ -651,13 +247,13 @@ ;; (define (ulex-handler udata controldat data) (print "controldat: " controldat " data: " data) (match controldat ;; (string-split controldat) - ((handlerkey host-port pid qrykey params ...) - ;; (print "handlerkey: " handlerkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) - (case handlerkey ;; (string->symbol handlerkey) + ((cmdkey host-port pid qrykey params ...) + ;; (print "cmdkey: " cmdkey " host-port: " host-port " pid: " pid " qrykey: " qrykey " params: " params) + (case cmdkey ;; (string->symbol cmdkey) ((ack)(print "Got ack!")) ((ping) ;; special case - return result immediately on the same connection (let* ((proc (hash-table-ref/default (udat-handlers udata) 'ping #f)) (val (if proc (proc) "gotping")) (peer (make-peer addr-port: host-port pid: pid)) @@ -675,66 +271,39 @@ (dbs (if peer (peer-dbs peer) '())) (dbshash (udat-dbowners udata))) (for-each (lambda (dbfile)(hash-table-delete! dbshash dbfile)) dbs) (hash-table-delete! (udat-peers udata) host-port) qrykey)) - ((dropcaptain) - ;; remove all traces of the captain - (udat-captain-address-set! udata #f) - (udat-captain-host-set! udata #f) - (udat-captain-port-set! udata #f) - (udat-captain-pid-set! udata #f) - qrykey) - ((rucaptain) ;; remote is asking if I'm the captain - (if (udat-my-cpkt-key udata) "yes" "no")) - ((db-owner) ;; given a db name who do I send my queries to - ;; look up the file in handlers, if have an entry ping them to be sure - ;; they are still alive and then return that host:port. - ;; if no handler found or if the ping fails pick from peers the oldest that - ;; is managing the fewest dbs - (match params - ((dbfile dbtype) - (let* ((owner-host-port (hash-table-ref/default (udat-dbowners udata) dbfile #f))) - (if owner-host-port - (conc qrykey " " owner-host-port) - (let* ((pdat (or (hash-table-ref/default (udat-peers udata) host-port #f) ;; no owner - caller gets to own it! - (make-peer addr-port: host-port pid: pid dbs: `(,dbfile))))) - (hash-table-set! (udat-peers udata) host-port pdat) - (hash-table-set! (udat-dbowners udata) dbfile host-port) - (conc qrykey " " host-port))))) - (else (conc qrykey " BADDATA")))) - ;; for work items: - ;; handler is one of; immediate, read-only, read-write, high-priority ((immediate read-only normal low-priority) ;; do this work immediately ;; host-port (caller), pid (caller), qrykey (cookie), params <= all from first line ;; data => a single line encoded however you want, or should I build json into it? - (print "handlerkey=" handlerkey) + (print "cmdkey=" cmdkey) (let* ((pdat (get-peer-dat udata host-port))) (match params ;; dbfile prockey procparam ((dbfile prockey procparam) - (case handlerkey + (case cmdkey ((immediate read-only) (process-request udata pdat dbfile qrykey prockey procparam data)) ((normal low-priority) ;; split off later and add logic to support low priority (add-to-work-queue udata pdat dbfile qrykey prockey procparam data)) (else #f))) (else - (print "INFO: params=" params " handlerkey=" handlerkey " controldat=" controldat) + (print "INFO: params=" params " cmdkey=" cmdkey " controldat=" controldat) #f)))) (else - ;; (add-to-work-queue udata (get-peer-dat udata host-port) handlerkey qrykey data) + (add-to-work-queue udata (get-peer-dat udata host-port) cmdkey qrykey data) #f))) (else (print "BAD DATA? controldat=" controldat " data=" data) #f)));; handles the incoming messages and dispatches to queues ;; -(define (ulex-handler-loop udata) +(define (ulex-cmd-loop udata) (let* ((serv-listener (udat-serv-listener udata))) ;; data comes as two lines - ;; handlerkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] + ;; cmdkey resp-addr:resp-port hostname pid qrykey [dbpath/dbfile.db] ;; data (let loop ((state 'start)) (let-values (((inp oup)(tcp-accept serv-listener))) (let* ((controldat (read inp)) (data (read inp)) @@ -742,23 +311,22 @@ (if resp (write resp oup)) (close-input-port inp) (close-output-port oup)) (loop state))))) -;; add a proc to the handler list, these are done symetrically (i.e. in all instances) +;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) ;; so that the proc can be dereferenced remotely ;; -(define (register-handler udata key proc) - (hash-table-set! (udat-handlers udata) key proc)) - +(define (register-cmd udata key proc) + (hash-table-set! (udat-cmds udata) key proc)) ;;====================================================================== ;; work queues ;;====================================================================== -(define (add-to-work-queue udata peer-dat handlerkey qrykey data) - (let ((wdat (make-work peer-dat: peer-dat handlerkey: handlerkey qrykey: qrykey data: data))) +(define (add-to-work-queue udata peer-dat cmdkey qrykey data) + (let ((wdat (make-work peer-dat: peer-dat cmdkey: cmdkey qrykey: qrykey data: data))) (if (udat-busy udata) (queue-add! (udat-work-queue udata) wdat) (process-work udata wdat)) ;; passing in wdat tells process-work to first process the passed in wdat )) @@ -772,1481 +340,84 @@ (let loop ((wd (queue-remove! wqueue))) (do-work udata wd) (if (not (queue-empty? wqueue)) (loop (queue-remove! wqueue))))))) -;;====================================================================== -;; Generic db handling -;; setup a inmem db instance -;; open connection to on-disk db -;; sync on-disk db to inmem -;; get lock in on-disk db for dbowner of this db -;; put sync-proc, init-proc, on-disk handle, inmem handle in dbconn stuct -;; return the stuct -;;====================================================================== - -(defstruct dbconn - (fname #f) - (inmem #f) - (conn #f) - (sync #f) ;; sync proc - (init #f) ;; init proc - (lastsync (current-seconds)) - ) - -(defstruct dbinfo - (initproc #f) - (syncproc #f)) - -;; open inmem and disk database -;; init with initproc -;; return db struct -;; -;; appname; megatest, ulex or something else. -;; -(define (setup-db-connection udata fname-in appname dbtype) - (let* ((is-ulex (eq? appname 'ulex)) - (dbinf (if is-ulex ;; ulex is a built-in special case - (make-dbinfo initproc: ulexdb-init syncproc: ulexdb-sync) - (hash-table-ref/default (udat-dbtypes udata) dbtype #f))) - (initproc (dbinfo-initproc dbinf)) - (syncproc (dbinfo-syncproc dbinf)) - (fname (if is-ulex - (conc (udat-ulex-dir udata) "/ulex.db") - fname-in)) - (inmem-db (open-and-initdb udata #f 'inmem (dbinfo-initproc dbinf))) - (disk-db (open-and-initdb udata fname 'disk (dbinfo-initproc dbinf)))) - (make-dbconn inmem: inmem-db conn: disk-db sync: syncproc init: initproc))) - -;; dest='inmem or 'disk -;; -(define (open-and-initdb udata filename dest init-proc) - (let* ((inmem (eq? dest 'inmem)) - (dbfile (if inmem - ":INMEM:" - filename)) - (dbexists (if inmem #t (file-exists? dbfile))) - (db (sqlite3:open-database dbfile))) - (sqlite3:set-busy-handler! db (sqlite3:make-busy-timeout 136000)) - (if (not dbexists) - (init-proc db)) - db)) - - -;;====================================================================== -;; Previous Ulex db stuff -;;====================================================================== - -(define (ulexdb-init db inmem) - (sqlite3:with-transaction - db - (lambda () - (for-each - (lambda (stmt) - (if stmt (sqlite3:execute db stmt))) - `("CREATE TABLE IF NOT EXISTS processes - (id INTEGER PRIMARY KEY, - host TEXT NOT NULL, - ipadr TEXT NOT NULL, - port INTEGER NOT NULL, - pid INTEGER NOT NULL, - regtime INTEGER DEFAULT (strftime('%s','now')), - last_update INTEGER DEFAULT (strftime('%s','now')));" - (if inmem - "CREATE TRIGGER IF NOT EXISTS update_proces_trigger AFTER UPDATE ON processes - FOR EACH ROW - BEGIN - UPDATE processes SET last_update=(strftime('%s','now')) - WHERE id=old.id; - END;" - #f)))))) - -;; open databases, do initial sync -(define (ulexdb-sync dbconndat udata) - #f) - - -) ;; END OF ULEX - - -;;; ;;====================================================================== -;;; ;; D E B U G H E L P E R S -;;; ;;====================================================================== -;;; -;;; (define (dbg> . args) -;;; (with-output-to-port (current-error-port) -;;; (lambda () -;;; (apply print "dbg> " args)))) -;;; -;;; (define (debug-pp . args) -;;; (if (get-environment-variable "ULEX_DEBUG") -;;; (with-output-to-port (current-error-port) -;;; (lambda () -;;; (apply pp args))))) -;;; -;;; (define *default-debug-port* (current-error-port)) -;;; -;;; (define (sdbg> fn stage-name stage-start stage-end start-time . message) -;;; (if (get-environment-variable "ULEX_DEBUG") -;;; (with-output-to-port *default-debug-port* -;;; (lambda () -;;; (apply print "ulex:" fn " " stage-name " took " (- (if stage-end stage-end (current-milliseconds)) stage-start) " ms. " -;;; (if start-time -;;; (conc "total time " (- (current-milliseconds) start-time) -;;; " ms.") -;;; "") -;;; message -;;; ))))) - -;;====================================================================== -;; M A C R O S -;;====================================================================== -;; iup callbacks are not dumping the stack, this is a work-around -;; - -;; Some of these routines use: -;; -;; http://www.cs.toronto.edu/~gfb/scheme/simple-macros.html -;; -;; Syntax for defining macros in a simple style similar to function definiton, -;; when there is a single pattern for the argument list and there are no keywords. -;; -;; (define-simple-syntax (name arg ...) body ...) -;; -;; -;; (define-syntax define-simple-syntax -;; (syntax-rules () -;; ((_ (name arg ...) body ...) -;; (define-syntax name (syntax-rules () ((name arg ...) (begin body ...))))))) -;; -;; (define-simple-syntax (catch-and-dump proc procname) -;; (handle-exceptions -;; exn -;; (begin -;; (print-call-chain (current-error-port)) -;; (with-output-to-port (current-error-port) -;; (lambda () -;; (print ((condition-property-accessor 'exn 'message) exn)) -;; (print "Callback error in " procname) -;; (print "Full condition info:\n" (condition->list exn))))) -;; (proc))) -;; -;; -;;====================================================================== -;; R E C O R D S -;;====================================================================== - -;;; ;; information about me as a server -;;; ;; -;;; (defstruct area -;;; ;; about this area -;;; (useportlogger #f) -;;; (lowport 32768) -;;; (server-type 'auto) ;; auto=create up to five servers/pkts, main=create pkts, passive=no pkt (unless there are no pkts at all) -;;; (conn #f) -;;; (port #f) -;;; (myaddr (get-my-best-address)) -;;; pktid ;; get pkt from hosts table if needed -;;; pktfile -;;; pktsdir -;;; dbdir -;;; (dbhandles (make-hash-table)) ;; fname => list-of-dbh, NOTE: Should really never need more than one? -;;; (mutex (make-mutex)) -;;; (rtable (make-hash-table)) ;; registration table of available actions -;;; (dbs (make-hash-table)) ;; filename => random number, used for choosing what dbs I serve -;;; ;; about other servers -;;; (hosts (make-hash-table)) ;; key => hostdat -;;; (hoststats (make-hash-table)) ;; key => alist of fname => ( qcount . qtime ) -;;; (reqs (make-hash-table)) ;; uri => queue -;;; ;; work queues -;;; (wqueues (make-hash-table)) ;; fname => qdat -;;; (stats (make-hash-table)) ;; fname => totalqueries -;;; (last-srvup (current-seconds)) ;; last time we updated the known servers -;;; (cookie2mbox (make-hash-table)) ;; map cookie for outstanding request to mailbox of awaiting call -;;; (ready #f) -;;; (health (make-hash-table)) ;; ipaddr:port => num failed pings since last good ping -;;; ) -;;; -;;; ;; host stats -;;; ;; -;;; (defstruct hostdat -;;; (pkt #f) -;;; (dbload (make-hash-table)) ;; "dbfile.db" => queries/min -;;; (hostload #f) ;; normalized load ( 5min load / numcpus ) -;;; ) -;;; -;;; ;; dbdat -;;; ;; -;;; (defstruct dbdat -;;; (dbh #f) -;;; (fname #f) -;;; (write-access #f) -;;; (sths (make-hash-table)) ;; hash mapping query strings to handles -;;; ) -;;; -;;; ;; qdat -;;; ;; -;;; (defstruct qdat -;;; (writeq (make-queue)) -;;; (readq (make-queue)) -;;; (rwq (make-queue)) -;;; (logq (make-queue)) ;; do we need a queue for logging? yes, if we use sqlite3 db for logging -;;; (osshort (make-queue)) -;;; (oslong (make-queue)) -;;; (misc (make-queue)) ;; used for things like ping-full -;;; ) -;;; -;;; ;; calldat -;;; ;; -;;; (defstruct calldat -;;; (ctype 'dbwrite) -;;; (obj #f) ;; this would normally be an SQL statement e.g. SELECT, INSERT etc. -;;; (rtime (current-milliseconds))) -;;; -;;; ;; make it a global? Well, it is local to area module -;;; -;;; (define *pktspec* -;;; `((server (hostname . h) -;;; (port . p) -;;; (pid . i) -;;; (ipaddr . a) -;;; ) -;;; (data (hostname . h) ;; sender hostname -;;; (port . p) ;; sender port -;;; (ipaddr . a) ;; sender ip -;;; (hostkey . k) ;; sending host key - store info at server under this key -;;; (servkey . s) ;; server key - this needs to match at server end or reject the msg -;;; (format . f) ;; sb=serialized-base64, t=text, sx=sexpr, j=json -;;; (data . d) ;; base64 encoded slln data -;;; ))) -;;; -;;; ;; work item -;;; ;; -;;; (defstruct witem -;;; (rhost #f) ;; return host -;;; (ripaddr #f) ;; return ipaddr -;;; (rport #f) ;; return port -;;; (servkey #f) ;; the packet representing the client of this workitem, used by final send-message -;;; (rdat #f) ;; the request - usually an sql query, type is rdat -;;; (action #f) ;; the action: immediate, dbwrite, dbread,oslong, osshort -;;; (cookie #f) ;; cookie id for response -;;; (data #f) ;; the data payload, i.e. parameters -;;; (result #f) ;; the result from processing the data -;;; (caller #f)) ;; the calling peer according to rpc itself -;;; -;;; (define (trim-pktid pktid) -;;; (if (string? pktid) -;;; (substring pktid 0 4) -;;; "nopkt")) -;;; -;;; (define (any->number num) -;;; (cond -;;; ((number? num) num) -;;; ((string? num) (string->number num)) -;;; (else num))) -;;; -;;; (use trace) -;;; (trace-call-sites #t) -;;; -;;; ;;====================================================================== -;;; ;; D A T A B A S E H A N D L I N G -;;; ;;====================================================================== -;;; -;;; ;; look in dbhandles for a db, return it, else return #f -;;; ;; -;;; (define (get-dbh acfg fname) -;;; (let ((dbh-lst (hash-table-ref/default (area-dbhandles acfg) fname '()))) -;;; (if (null? dbh-lst) -;;; (begin -;;; ;; (print "opening db for " fname) -;;; (open-db acfg fname)) ;; Note that the handles get put back in the queue in the save-dbh calls -;;; (let ((rem-lst (cdr dbh-lst))) -;;; ;; (print "re-using saved connection for " fname) -;;; (hash-table-set! (area-dbhandles acfg) fname rem-lst) -;;; (car dbh-lst))))) -;;; -;;; (define (save-dbh acfg fname dbdat) -;;; ;; (print "saving dbh for " fname) -;;; (hash-table-set! (area-dbhandles acfg) fname (cons dbdat (hash-table-ref/default (area-dbhandles acfg) fname '())))) -;;; -;;; ;; open the database, if never before opened init it. put the handle in the -;;; ;; open db's hash table -;;; ;; returns: the dbdat -;;; ;; -;;; (define (open-db acfg fname) -;;; (let* ((fullname (conc (area-dbdir acfg) "/" fname)) -;;; (exists (file-exists? fullname)) -;;; (write-access (if exists -;;; (file-write-access? fullname) -;;; (file-write-access? (area-dbdir acfg)))) -;;; (db (sqlite3:open-database fullname)) -;;; (handler (sqlite3:make-busy-timeout 136000)) -;;; ) -;;; (sqlite3:set-busy-handler! db handler) -;;; (sqlite3:execute db "PRAGMA synchronous = 0;") -;;; (if (not exists) ;; need to init the db -;;; (if write-access -;;; (let ((isql (get-rsql acfg 'dbinitsql))) ;; get the init sql statements -;;; ;; (sqlite3:with-transaction -;;; ;; db -;;; ;; (lambda () -;;; (if isql -;;; (for-each -;;; (lambda (sql) -;;; (sqlite3:execute db sql)) -;;; isql))) -;;; (print "ERROR: no write access to " (area-dbdir acfg)))) -;;; (make-dbdat dbh: db fname: fname write-access: write-access))) -;;; -;;; ;; This is a low-level command to retrieve or to prepare, save and return a prepared statment -;;; ;; you must extract the db handle -;;; ;; -;;; (define (get-sth db cache stmt) -;;; (if (hash-table-exists? cache stmt) -;;; (begin -;;; ;; (print "Reusing cached stmt for " stmt) -;;; (hash-table-ref/default cache stmt #f)) -;;; (let ((sth (sqlite3:prepare db stmt))) -;;; (hash-table-set! cache stmt sth) -;;; ;; (print "prepared stmt for " stmt) -;;; sth))) -;;; -;;; ;; a little more expensive but does all the tedious deferencing - only use if you don't already -;;; ;; have dbdat and db sitting around -;;; ;; -;;; (define (full-get-sth acfg fname stmt) -;;; (let* ((dbdat (get-dbh acfg fname)) -;;; (db (dbdat-dbh dbdat)) -;;; (sths (dbdat-sths dbdat))) -;;; (get-sth db sths stmt))) -;;; -;;; ;; write to a db -;;; ;; acfg: area data -;;; ;; rdat: request data -;;; ;; hdat: (host . port) -;;; ;; -;;; ;; (define (dbwrite acfg rdat hdat data-in) -;;; ;; (let* ((dbname (car data-in)) -;;; ;; (dbdat (get-dbh acfg dbname)) -;;; ;; (db (dbdat-dbh dbdat)) -;;; ;; (sths (dbdat-sths dbdat)) -;;; ;; (stmt (calldat-obj rdat)) -;;; ;; (sth (get-sth db sths stmt)) -;;; ;; (data (cdr data-in))) -;;; ;; (print "dbname: " dbname " acfg: " acfg " rdat: " (calldat->alist rdat) " hdat: " hdat " data: " data) -;;; ;; (print "dbdat: " (dbdat->alist dbdat)) -;;; ;; (apply sqlite3:execute sth data) -;;; ;; (save-dbh acfg dbname dbdat) -;;; ;; #t -;;; ;; )) -;;; -;;; (define (finalize-all-db-handles acfg) -;;; (let* ((dbhandles (area-dbhandles acfg)) ;; dbhandles is hash of fname ==> dbdat -;;; (num 0)) -;;; (for-each -;;; (lambda (area-name) -;;; (print "Closing handles for " area-name) -;;; (let ((dbdats (hash-table-ref/default dbhandles area-name '()))) -;;; (for-each -;;; (lambda (dbdat) -;;; ;; first close all statement handles -;;; (for-each -;;; (lambda (sth) -;;; (sqlite3:finalize! sth) -;;; (set! num (+ num 1))) -;;; (hash-table-values (dbdat-sths dbdat))) -;;; ;; now close the dbh -;;; (set! num (+ num 1)) -;;; (sqlite3:finalize! (dbdat-dbh dbdat))) -;;; dbdats))) -;;; (hash-table-keys dbhandles)) -;;; (print "FINALIZED " num " dbhandles"))) -;;; -;;; ;;====================================================================== -;;; ;; W O R K Q U E U E H A N D L I N G -;;; ;;====================================================================== -;;; -;;; (define (register-db-as-mine acfg dbname) -;;; (let ((ht (area-dbs acfg))) -;;; (if (not (hash-table-ref/default ht dbname #f)) -;;; (hash-table-set! ht dbname (random 10000))))) -;;; -;;; (define (work-queue-add acfg fname witem) -;;; (let* ((work-queue-start (current-milliseconds)) -;;; (action (witem-action witem)) ;; NB the action is the index into the rdat actions -;;; (qdat (or (hash-table-ref/default (area-wqueues acfg) fname #f) -;;; (let ((newqdat (make-qdat))) -;;; (hash-table-set! (area-wqueues acfg) fname newqdat) -;;; newqdat))) -;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f))) -;;; (if rdat -;;; (queue-add! -;;; (case (calldat-ctype rdat) -;;; ((dbwrite) (register-db-as-mine acfg fname)(qdat-writeq qdat)) -;;; ((dbread) (register-db-as-mine acfg fname)(qdat-readq qdat)) -;;; ((dbrw) (register-db-as-mine acfg fname)(qdat-rwq qdat)) -;;; ((oslong) (qdat-oslong qdat)) -;;; ((osshort) (qdat-osshort qdat)) -;;; ((full-ping) (qdat-misc qdat)) -;;; (else -;;; (print "ERROR: no queue for " action ". Adding to dbwrite queue.") -;;; (qdat-writeq qdat))) -;;; witem) -;;; (case action -;;; ((full-ping)(qdat-misc qdat)) -;;; (else -;;; (print "ERROR: No action " action " was registered")))) -;;; (sdbg> "work-queue-add" "queue-add" work-queue-start #f #f) -;;; #t)) ;; for now, simply return #t to indicate request got to the queue -;;; -;;; (define (doqueue acfg q fname dbdat dbh) -;;; ;; (print "doqueue: " fname) -;;; (let* ((start-time (current-milliseconds)) -;;; (qlen (queue-length q))) -;;; (if (> qlen 1) -;;; (print "Processing queue of length " qlen)) -;;; (let loop ((count 0) -;;; (responses '())) -;;; (let ((delta (- (current-milliseconds) start-time))) -;;; (if (or (queue-empty? q) -;;; (> delta 400)) ;; stop working on this queue after 400ms have passed -;;; (list count delta responses) ;; return count, delta and responses list -;;; (let* ((witem (queue-remove! q)) -;;; (action (witem-action witem)) -;;; (rdat (witem-rdat witem)) -;;; (stmt (calldat-obj rdat)) -;;; (sth (full-get-sth acfg fname stmt)) -;;; (ctype (calldat-ctype rdat)) -;;; (data (witem-data witem)) -;;; (cookie (witem-cookie witem))) -;;; ;; do the processing and save the result in witem-result -;;; (witem-result-set! -;;; witem -;;; (case ctype ;; action -;;; ((noblockwrite) ;; blind write, no ack of success returned -;;; (apply sqlite3:execute sth data) -;;; (sqlite3:last-insert-rowid dbh)) -;;; ((dbwrite) ;; blocking write -;;; (apply sqlite3:execute sth data) -;;; #t) -;;; ((dbread) ;; TODO: consider breaking this up and shipping in pieces for large query -;;; (apply sqlite3:map-row (lambda x x) sth data)) -;;; ((full-ping) 'full-ping) -;;; (else (print "Not ready for action " action) #f))) -;;; (loop (add1 count) -;;; (if cookie -;;; (cons witem responses) -;;; responses)))))))) -;;; -;;; ;; do up to 400ms of processing on each queue -;;; ;; - the work-queue-processor will allow the max 1200ms of work to complete but it will flag as overloaded -;;; ;; -;;; (define (process-db-queries acfg fname) -;;; (if (hash-table-exists? (area-wqueues acfg) fname) -;;; (let* ((process-db-queries-start-time (current-milliseconds)) -;;; (qdat (hash-table-ref/default (area-wqueues acfg) fname #f)) -;;; (queue-sym->queue (lambda (queue-sym) -;;; (case queue-sym ;; lookup the queue from qdat given a name (symbol) -;;; ((wqueue) (qdat-writeq qdat)) -;;; ((rqueue) (qdat-readq qdat)) -;;; ((rwqueue) (qdat-rwq qdat)) -;;; ((misc) (qdat-misc qdat)) -;;; (else #f)))) -;;; (dbdat (get-dbh acfg fname)) -;;; (dbh (if (dbdat? dbdat)(dbdat-dbh dbdat) #f)) -;;; (nowtime (current-seconds))) -;;; ;; handle the queues that require a transaction -;;; ;; -;;; (map ;; -;;; (lambda (queue-sym) -;;; ;; (print "processing queue " queue-sym) -;;; (let* ((queue (queue-sym->queue queue-sym))) -;;; (if (not (queue-empty? queue)) -;;; (let ((responses -;;; (sqlite3:with-transaction ;; todo - catch exceptions... -;;; dbh -;;; (lambda () -;;; (let* ((res (doqueue acfg queue fname dbdat dbh))) ;; this does the work! -;;; ;; (print "res=" res) -;;; (match res -;;; ((count delta responses) -;;; (update-stats acfg fname queue-sym delta count) -;;; (sdbg> "process-db-queries" "sqlite3-transaction" process-db-queries-start-time #f #f) -;;; responses) ;; return responses -;;; (else -;;; (print "ERROR: bad return data from doqueue " res))) -;;; ))))) -;;; ;; having completed the transaction, send the responses. -;;; ;; (print "INFO: sending " (length responses) " responses.") -;;; (let loop ((responses-left responses)) -;;; (cond -;;; ((null? responses-left) #t) -;;; (else -;;; (let* ((witem (car responses-left)) -;;; (response (cdr responses-left))) -;;; (call-deliver-response acfg (witem-ripaddr witem)(witem-rport witem) -;;; (witem-cookie witem)(witem-result witem))) -;;; (loop (cdr responses-left)))))) -;;; ))) -;;; '(wqueue rwqueue rqueue)) -;;; -;;; ;; handle misc queue -;;; ;; -;;; ;; (print "processing misc queue") -;;; (let ((queue (queue-sym->queue 'misc))) -;;; (doqueue acfg queue fname dbdat dbh)) -;;; ;; .... -;;; (save-dbh acfg fname dbdat) -;;; #t ;; just to let the tests know we got here -;;; ) -;;; #f ;; nothing processed -;;; )) -;;; -;;; ;; run all queues in parallel per db but sequentially per queue for that db. -;;; ;; - process the queues every 500 or so ms -;;; ;; - allow for long running queries to continue but all other activities for that -;;; ;; db will be blocked. -;;; ;; -;;; (define (work-queue-processor acfg) -;;; (let* ((threads (make-hash-table))) ;; fname => thread -;;; (let loop ((fnames (hash-table-keys (area-wqueues acfg))) -;;; (target-time (+ (current-milliseconds) 50))) -;;; ;;(if (not (null? fnames))(print "Processing for these databases: " fnames)) -;;; (for-each -;;; (lambda (fname) -;;; ;; (print "processing for " fname) -;;; ;;(process-db-queries acfg fname)) -;;; (let ((th (hash-table-ref/default threads fname #f))) -;;; (if (and th (not (member (thread-state th) '(dead terminated)))) -;;; (begin -;;; (print "WARNING: worker thread for " fname " is taking a long time.") -;;; (print "Thread is in state " (thread-state th))) -;;; (let ((th1 (make-thread (lambda () -;;; ;; (catch-and-dump -;;; ;; (lambda () -;;; ;; (print "Process queries for " fname) -;;; (let ((start-time (current-milliseconds))) -;;; (process-db-queries acfg fname) -;;; ;; (thread-sleep! 0.01) ;; need the thread to take at least some time -;;; (hash-table-delete! threads fname)) ;; no mutexes? -;;; fname) -;;; "th1"))) ;; )) -;;; (hash-table-set! threads fname th1) -;;; (thread-start! th1))))) -;;; fnames) -;;; ;; (thread-sleep! 0.1) ;; give the threads some time to process requests -;;; ;; burn time until 400ms is up -;;; (let ((now-time (current-milliseconds))) -;;; (if (< now-time target-time) -;;; (let ((delta (- target-time now-time))) -;;; (thread-sleep! (/ delta 1000))))) -;;; (loop (hash-table-keys (area-wqueues acfg)) -;;; (+ (current-milliseconds) 50))))) -;;; -;;; ;;====================================================================== -;;; ;; S T A T S G A T H E R I N G -;;; ;;====================================================================== -;;; -;;; (defstruct stat -;;; (qcount-avg 0) ;; coarse running average -;;; (qtime-avg 0) ;; coarse running average -;;; (qcount 0) ;; total -;;; (qtime 0) ;; total -;;; (last-qcount 0) ;; last -;;; (last-qtime 0) ;; last -;;; (dbs '()) ;; list of db files handled by this node -;;; (when 0)) ;; when the last query happened - seconds -;;; -;;; -;;; (define (update-stats acfg fname bucket duration numqueries) -;;; (let* ((key fname) ;; for now do not use bucket. Was: (conc fname "-" bucket)) ;; lazy but good enough -;;; (stats (or (hash-table-ref/default (area-stats acfg) key #f) -;;; (let ((newstats (make-stat))) -;;; (hash-table-set! (area-stats acfg) key newstats) -;;; newstats)))) -;;; ;; when the last query happended (used to remove the fname from the active list) -;;; (stat-when-set! stats (current-seconds)) -;;; ;; last values -;;; (stat-last-qcount-set! stats numqueries) -;;; (stat-last-qtime-set! stats duration) -;;; ;; total over process lifetime -;;; (stat-qcount-set! stats (+ (stat-qcount stats) numqueries)) -;;; (stat-qtime-set! stats (+ (stat-qtime stats) duration)) -;;; ;; coarse average -;;; (stat-qcount-avg-set! stats (/ (+ (stat-qcount-avg stats) numqueries) 2)) -;;; (stat-qtime-avg-set! stats (/ (+ (stat-qtime-avg stats) duration) 2)) -;;; -;;; ;; here is where we add the stats for a given dbfile -;;; (if (not (member fname (stat-dbs stats))) -;;; (stat-dbs-set! stats (cons fname (stat-dbs stats)))) -;;; -;;; )) -;;; -;;; ;;====================================================================== -;;; ;; S E R V E R S T U F F -;;; ;;====================================================================== -;;; -;;; ;; this does NOT return! -;;; ;; -;;; (define (find-free-port-and-open acfg) -;;; (let ((port (or (area-port acfg) 3200))) -;;; (handle-exceptions -;;; exn -;;; (begin -;;; (print "INFO: cannot bind to port " (rpc:default-server-port) ", trying next port") -;;; (area-port-set! acfg (+ port 1)) -;;; (find-free-port-and-open acfg)) -;;; (rpc:default-server-port port) -;;; (area-port-set! acfg port) -;;; (tcp-read-timeout 120000) -;;; ;; ((rpc:make-server (tcp-listen port)) #t) -;;; (tcp-listen (rpc:default-server-port) -;;; )))) -;;; -;;; ;; register this node by putting a packet into the pkts dir. -;;; ;; look for other servers -;;; ;; contact other servers and compile list of servers -;;; ;; there are two types of server -;;; ;; main servers - dashboards, runners and dedicated servers - need pkt -;;; ;; passive servers - test executers, step calls, list-runs - no pkt -;;; ;; -;;; (define (register-node acfg hostip port-num) -;;; ;;(mutex-lock! (area-mutex acfg)) -;;; (let* ((server-type (area-server-type acfg)) ;; auto, main, passive (no pkt created) -;;; (best-ip (or hostip (get-my-best-address))) -;;; (mtdir (area-dbdir acfg)) -;;; (pktdir (area-pktsdir acfg))) ;; conc mtdir "/.server-pkts"))) -;;; (print "Registering node " best-ip ":" port-num) -;;; (if (not mtdir) ;; require a home for this node to put or find databases -;;; #f -;;; (begin -;;; (if (not (directory? pktdir))(create-directory pktdir)) -;;; ;; server is started, now create pkt if needed -;;; (print "Starting server in " server-type " mode with port " port-num) -;;; (if (member server-type '(auto main)) ;; TODO: if auto, count number of servers registers, if > 3 then don't put out a pkt -;;; (begin -;;; (area-pktid-set! acfg -;;; (write-alist->pkt -;;; pktdir -;;; `((hostname . ,(get-host-name)) -;;; (ipaddr . ,best-ip) -;;; (port . ,port-num) -;;; (pid . ,(current-process-id))) -;;; pktspec: *pktspec* -;;; ptype: 'server)) -;;; (area-pktfile-set! acfg (conc pktdir "/" (area-pktid acfg) ".pkt")))) -;;; (area-port-set! acfg port-num) -;;; #;(mutex-unlock! (area-mutex acfg)))))) -;;; -;;; (define *cookie-seqnum* 0) -;;; (define (make-cookie key) -;;; (set! *cookie-seqnum* (add1 *cookie-seqnum*)) -;;; ;;(print "MAKE COOKIE CALLED -- on "servkey"-"*cookie-seqnum*) -;;; (conc key "-" *cookie-seqnum*) -;;; ) -;;; -;;; ;; dispatch locally if possible -;;; ;; -;;; (define (call-deliver-response acfg ipaddr port cookie data) -;;; (if (and (equal? (area-myaddr acfg) ipaddr) -;;; (equal? (area-port acfg) port)) -;;; (deliver-response acfg cookie data) -;;; ((rpc:procedure 'response ipaddr port) cookie data))) -;;; -;;; (define (deliver-response acfg cookie data) -;;; (let ((deliver-response-start (current-milliseconds))) -;;; (thread-start! (make-thread -;;; (lambda () -;;; (let loop ((tries-left 5)) -;;; ;;(print "TOP OF DELIVER_RESPONSE LOOP; triesleft="tries-left) -;;; ;;(pp (hash-table->alist (area-cookie2mbox acfg))) -;;; (let* ((mbox (hash-table-ref/default (area-cookie2mbox acfg) cookie #f))) -;;; (cond -;;; ((eq? 0 tries-left) -;;; (print "ulex:deliver-response: I give up. Mailbox never appeared. cookie="cookie) -;;; ) -;;; (mbox -;;; ;;(print "got mbox="mbox" got data="data" send.") -;;; (mailbox-send! mbox data)) -;;; (else -;;; ;;(print "no mbox yet. look for "cookie) -;;; (thread-sleep! (/ (- 6 tries-left) 10)) -;;; (loop (sub1 tries-left)))))) -;;; ;; (debug-pp (list (conc "ulex:deliver-response took " (- (current-milliseconds) deliver-response-start) " ms, cookie=" cookie " data=") data)) -;;; (sdbg> "deliver-response" "mailbox-send" deliver-response-start #f #f cookie) -;;; ) -;;; (conc "deliver-response thread for cookie="cookie)))) -;;; #t) -;;; -;;; ;; action: -;;; ;; immediate - quick actions, no need to put in queues -;;; ;; dbwrite - put in dbwrite queue -;;; ;; dbread - put in dbread queue -;;; ;; oslong - os actions, e.g. du, that could take a long time -;;; ;; osshort - os actions that should be quick, e.g. df -;;; ;; -;;; (define (request acfg from-ipaddr from-port servkey action cookie fname params) ;; std-peer-handler -;;; ;; NOTE: Use rpc:current-peer for getting return address -;;; (let* ((std-peer-handler-start (current-milliseconds)) -;;; ;; (raw-data (alist-ref 'data dat)) -;;; (rdat (hash-table-ref/default -;;; (area-rtable acfg) action #f)) ;; this looks up the sql query or other details indexed by the action -;;; (witem (make-witem ripaddr: from-ipaddr ;; rhost: from-host -;;; rport: from-port action: action -;;; rdat: rdat cookie: cookie -;;; servkey: servkey data: params ;; TODO - rename data to params -;;; caller: (rpc:current-peer)))) -;;; (if (not (equal? servkey (area-pktid acfg))) -;;; `(#f . ,(conc "I don't know you servkey=" servkey ", pktid=" (area-pktid acfg))) ;; immediately return this -;;; (let* ((ctype (if rdat -;;; (calldat-ctype rdat) ;; is this necessary? these should be identical -;;; action))) -;;; (sdbg> "std-peer-handler" "immediate" std-peer-handler-start #f #f) -;;; (case ctype -;;; ;; (dbwrite acfg rdat (cons from-ipaddr from-port) data))) -;;; ((full-ping) `(#t "ack to full ping" ,(work-queue-add acfg fname witem) ,cookie)) -;;; ((response) `(#t "ack from requestor" ,(deliver-response acfg fname params))) -;;; ((dbwrite) `(#t "db write submitted" ,(work-queue-add acfg fname witem) ,cookie)) -;;; ((dbread) `(#t "db read submitted" ,(work-queue-add acfg fname witem) ,cookie )) -;;; ((dbrw) `(#t "db read/write submitted" ,cookie)) -;;; ((osshort) `(#t "os short submitted" ,cookie)) -;;; ((oslong) `(#t "os long submitted" ,cookie)) -;;; (else `(#f "unrecognised action" ,ctype))))))) -;;; -;;; ;; Call this to start the actual server -;;; ;; -;;; ;; start_server -;;; ;; -;;; ;; mode: ' -;;; ;; handler: proc which takes pktrecieved as argument -;;; ;; -;;; -;;; (define (start-server acfg) -;;; (let* ((conn (find-free-port-and-open acfg)) -;;; (port (area-port acfg))) -;;; (rpc:publish-procedure! -;;; 'delist-db -;;; (lambda (fname) -;;; (hash-table-delete! (area-dbs acfg) fname))) -;;; (rpc:publish-procedure! -;;; 'calling-addr -;;; (lambda () -;;; (rpc:current-peer))) -;;; (rpc:publish-procedure! -;;; 'ping -;;; (lambda ()(real-ping acfg))) -;;; (rpc:publish-procedure! -;;; 'request -;;; (lambda (from-addr from-port servkey action cookie dbname params) -;;; (request acfg from-addr from-port servkey action cookie dbname params))) -;;; (rpc:publish-procedure! -;;; 'response -;;; (lambda (cookie res-dat) -;;; (deliver-response acfg cookie res-dat))) -;;; (area-ready-set! acfg #t) -;;; (area-conn-set! acfg conn) -;;; ((rpc:make-server conn) #f)));; ((tcp-listen (rpc:default-server-port)) #t) -;;; -;;; -;;; (define (launch acfg) ;; #!optional (proc std-peer-handler)) -;;; (print "starting launch") -;;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) -;;; #;(let ((original-handler (current-exception-handler))) ;; is th -;;; (lambda (exception) -;;; (server-exit-procedure) -;;; (original-handler exception))) -;;; (on-exit (lambda () -;;; (shutdown acfg))) ;; (finalize-all-db-handles acfg))) -;;; ;; set up the rpc handler -;;; (let* ((th1 (make-thread -;;; (lambda ()(start-server acfg)) -;;; "server thread")) -;;; (th2 (make-thread -;;; (lambda () -;;; (print "th2 starting") -;;; (let loop () -;;; (work-queue-processor acfg) -;;; (print "work-queue-processor crashed!") -;;; (loop))) -;;; "work queue thread"))) -;;; (thread-start! th1) -;;; (thread-start! th2) -;;; (let loop () -;;; (thread-sleep! 0.025) -;;; (if (area-ready acfg) -;;; #t -;;; (loop))) -;;; ;; attempt to fix my address -;;; (let* ((all-addr (get-all-ips-sorted))) ;; could use (tcp-addresses conn)? -;;; (let loop ((rem-addrs all-addr)) -;;; (if (null? rem-addrs) -;;; (begin -;;; (print "ERROR: Failed to figure out the ip address of myself as a server. Giving up.") -;;; (exit 1)) ;; BUG Changeme to raising an exception -;;; -;;; (let* ((addr (car rem-addrs)) -;;; (good-addr (handle-exceptions -;;; exn -;;; #f -;;; ((rpc:procedure 'calling-addr addr (area-port acfg)))))) -;;; (if good-addr -;;; (begin -;;; (print "Got good-addr of " good-addr) -;;; (area-myaddr-set! acfg good-addr)) -;;; (loop (cdr rem-addrs))))))) -;;; (register-node acfg (area-myaddr acfg)(area-port acfg)) -;;; (print "INFO: Server started on " (area-myaddr acfg) ":" (area-port acfg)) -;;; ;; (update-known-servers acfg) ;; gotta do this on every start (thus why limit number of publicised servers) -;;; )) -;;; -;;; (define (clear-server-pkt acfg) -;;; (let ((pktf (area-pktfile acfg))) -;;; (if pktf (delete-file* pktf)))) -;;; -;;; (define (shutdown acfg) -;;; (let (;;(conn (area-conn acfg)) -;;; (pktf (area-pktfile acfg)) -;;; (port (area-port acfg))) -;;; (if pktf (delete-file* pktf)) -;;; (send-all "imshuttingdown") -;;; ;; (rpc:close-all-connections!) ;; don't know if this is actually needed -;;; (finalize-all-db-handles acfg))) -;;; -;;; (define (send-all msg) -;;; #f) -;;; -;;; ;; given a area record look up all the packets -;;; ;; -;;; (define (get-all-server-pkts acfg) -;;; (let ((all-pkt-files (glob (conc (area-pktsdir acfg) "/*.pkt")))) -;;; (map (lambda (pkt-file) -;;; (read-pkt->alist pkt-file pktspec: *pktspec*)) -;;; all-pkt-files))) -;;; -;;; #;((Z . "9a0212302295a19610d5796fce0370fa130758e9") -;;; (port . "34827") -;;; (pid . "28748") -;;; (hostname . "zeus") -;;; (T . "server") -;;; (D . "1549427032.0")) -;;; -;;; #;(define (get-my-best-address) -;;; (let ((all-my-addresses (get-all-ips))) ;; (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name)))))) -;;; (cond -;;; ((null? all-my-addresses) -;;; (get-host-name)) ;; no interfaces? -;;; ((eq? (length all-my-addresses) 1) -;;; (ip->string (car all-my-addresses))) ;; only one to choose from, just go with it -;;; (else -;;; (ip->string (car (filter (lambda (x) ;; take any but 127. -;;; (not (eq? (u8vector-ref x 0) 127))) -;;; all-my-addresses))))))) -;;; -;;; ;; whoami? I am my pkt -;;; ;; -;;; (define (whoami? acfg) -;;; (hash-table-ref/default (area-hosts acfg)(area-pktid acfg) #f)) -;;; -;;; ;;====================================================================== -;;; ;; "Client side" operations -;;; ;;====================================================================== -;;; -;;; (define (safe-call call-key host port . params) -;;; (handle-exceptions -;;; exn -;;; (begin -;;; (print "Call " call-key " to " host ":" port " failed") -;;; #f) -;;; (apply (rpc:procedure call-key host port) params))) -;;; -;;; ;; ;; convert to/from string / sexpr -;;; ;; -;;; ;; (define (string->sexpr str) -;;; ;; (if (string? str) -;;; ;; (with-input-from-string str read) -;;; ;; str)) -;;; ;; -;;; ;; (define (sexpr->string s) -;;; ;; (with-output-to-string (lambda ()(write s)))) -;;; -;;; ;; is the server alive? -;;; ;; -;;; (define (ping acfg host port) -;;; (let* ((myaddr (area-myaddr acfg)) -;;; (myport (area-port acfg)) -;;; (start-time (current-milliseconds)) -;;; (res (if (and (equal? myaddr host) -;;; (equal? myport port)) -;;; (real-ping acfg) -;;; ((rpc:procedure 'ping host port))))) -;;; (cons (- (current-milliseconds) start-time) -;;; res))) -;;; -;;; ;; returns ( ipaddr port alist-fname=>randnum ) -;;; (define (real-ping acfg) -;;; `(,(area-myaddr acfg) ,(area-port acfg) ,(get-host-stats acfg))) -;;; -;;; ;; is the server alive AND the queues processing? -;;; ;; -;;; #;(define (full-ping acfg servpkt) -;;; (let* ((start-time (current-milliseconds)) -;;; (res (send-message acfg servpkt '(full-ping) 'full-ping))) -;;; (cons (- (current-milliseconds) start-time) -;;; res))) ;; (equal? res "got ping")))) -;;; -;;; -;;; ;; look up all pkts and get the server id (the hash), port, host/ip -;;; ;; store this info in acfg -;;; ;; return the number of responsive servers found -;;; ;; -;;; ;; DO NOT VERIFY THAT THE SERVER IS ALIVE HERE. This is called at times where the current server is not yet alive and cannot ping itself -;;; ;; -;;; (define (update-known-servers acfg) -;;; ;; readll all pkts -;;; ;; foreach pkt; if it isn't me ping the server; if alive, add to hosts hash, else rm the pkt -;;; (let* ((start-time (current-milliseconds)) -;;; (all-pkts (delete-duplicates -;;; (append (get-all-server-pkts acfg) -;;; (hash-table-values (area-hosts acfg))))) -;;; (hostshash (area-hosts acfg)) -;;; (my-id (area-pktid acfg)) -;;; (pktsdir (area-pktsdir acfg)) ;; needed to remove pkts from non-responsive servers -;;; (numsrvs 0) -;;; (delpkt (lambda (pktsdir sid) -;;; (print "clearing out server " sid) -;;; (delete-file* (conc pktsdir "/" sid ".pkt")) -;;; (hash-table-delete! hostshash sid)))) -;;; (area-last-srvup-set! acfg (current-seconds)) -;;; (for-each -;;; (lambda (servpkt) -;;; (if (list? servpkt) -;;; ;; (pp servpkt) -;;; (let* ((shost (alist-ref 'ipaddr servpkt)) -;;; (sport (any->number (alist-ref 'port servpkt))) -;;; (res (handle-exceptions -;;; exn -;;; (begin -;;; ;; (print "INFO: bad server on " shost ":" sport) -;;; #f) -;;; (ping acfg shost sport))) -;;; (sid (alist-ref 'Z servpkt)) ;; Z code is our name for the server -;;; (url (conc shost ":" sport)) -;;; ) -;;; #;(if (or (not res) -;;; (null? res)) -;;; (begin -;;; (print "STRANGE: ping of " url " gave " res))) -;;; -;;; ;; (print "Got " res " from " shost ":" sport) -;;; (match res -;;; ((qduration . payload) -;;; ;; (print "Server pkt:" (alist-ref 'ipaddr servpkt) ":" (alist-ref 'port servpkt) -;;; ;; (if payload -;;; ;; "Success" "Fail")) -;;; (match payload -;;; ((host port stats) -;;; ;; (print "From " host ":" port " got stats: " stats) -;;; (if (and host port stats) -;;; (let ((url (conc host ":" port))) -;;; (hash-table-set! hostshash sid servpkt) -;;; ;; store based on host:port -;;; (hash-table-set! (area-hoststats acfg) sid stats)) -;;; (print "missing data from the server, not sure what that means!")) -;;; (set! numsrvs (+ numsrvs 1))) -;;; (#f -;;; (print "Removing pkt " sid " due to #f from server or failed ping") -;;; (delpkt pktsdir sid)) -;;; (else -;;; (print "Got ")(pp res)(print " from server ")(pp servpkt) " but response did not match (#f/#t . msg)"))) -;;; (else -;;; ;; here we delete the pkt - can't reach the server, remove it -;;; ;; however this logic is inadequate. we should mark the server as checked -;;; ;; and not good, if it happens a second time - then remove the pkt -;;; ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead -;;; ;; could be it is simply too busy to reply -;;; (let ((bad-pings (hash-table-ref/default (area-health acfg) url 0))) -;;; (if (> bad-pings 1) ;; two bad pings - remove pkt -;;; (begin -;;; (print "INFO: " bad-pings " bad responses from " url ", deleting pkt " sid) -;;; (delpkt pktsdir sid)) -;;; (begin -;;; (print "INFO: " bad-pings " bad responses from " shost ":" sport " not deleting pkt yet") -;;; (hash-table-set! (area-health acfg) -;;; url -;;; (+ (hash-table-ref/default (area-health acfg) url 0) 1)) -;;; )) -;;; )))) -;;; ;; servpkt is not actually a pkt? -;;; (begin -;;; (print "Bad pkt " servpkt)))) -;;; all-pkts) -;;; (sdbg> "update-known-servers" "end" start-time #f #f " found " numsrvs -;;; " servers, pkts: " (map (lambda (p) -;;; (alist-ref 'Z p)) -;;; all-pkts)) -;;; numsrvs)) -;;; -;;; (defstruct srvstat -;;; (numfiles 0) ;; number of db files handled by this server - subtract 1 for the db being currently looked at -;;; (randnum #f) ;; tie breaker number assigned to by the server itself - applies only to the db under consideration -;;; (pkt #f)) ;; the server pkt -;;; -;;; ;;(define (srv->srvstat srvpkt) -;;; -;;; ;; Get the server best for given dbname and key -;;; ;; -;;; ;; NOTE: key is not currently used. The key points to the kind of query, this may be useful for directing read-only queries. -;;; ;; -;;; (define (get-best-server acfg dbname key) -;;; (let* (;; (servers (hash-table-values (area-hosts acfg))) -;;; (servers (area-hosts acfg)) -;;; (skeys (sort (hash-table-keys servers) string>=?)) ;; a stable listing -;;; (start-time (current-milliseconds)) -;;; (srvstats (make-hash-table)) ;; srvid => srvstat -;;; (url (conc (area-myaddr acfg) ":" (area-port acfg)))) -;;; ;; (print "scores for " dbname ": " (map (lambda (k)(cons k (calc-server-score acfg dbname k))) skeys)) -;;; (if (null? skeys) -;;; (if (> (update-known-servers acfg) 0) -;;; (get-best-server acfg dbname key) ;; some risk of infinite loop here, TODO add try counter -;;; (begin -;;; (print "ERROR: no server found!") ;; since this process is also a server this should never happen -;;; #f)) -;;; (begin -;;; ;; (print "in get-best-server with skeys=" skeys) -;;; (if (> (- (current-seconds) (area-last-srvup acfg)) 10) -;;; (begin -;;; (update-known-servers acfg) -;;; (sdbg> "get-best-server" "update-known-servers" start-time #f #f))) -;;; -;;; ;; for each server look at the list of dbfiles, total number of dbs being handled -;;; ;; and the rand number, save the best host -;;; ;; also do a delist-db for each server dbfile not used -;;; (let* ((best-server #f) -;;; (servers-to-delist (make-hash-table))) -;;; (for-each -;;; (lambda (srvid) -;;; (let* ((server (hash-table-ref/default servers srvid #f)) -;;; (stats (hash-table-ref/default (area-hoststats acfg) srvid '(())))) -;;; ;; (print "stats: " stats) -;;; (if server -;;; (let* ((dbweights (car stats)) -;;; (srvload (length (filter (lambda (x)(not (equal? dbname (car x)))) dbweights))) -;;; (dbrec (alist-ref dbname dbweights equal?)) ;; get the pair with fname . randscore -;;; (randnum (if dbrec -;;; dbrec ;; (cdr dbrec) -;;; 0))) -;;; (hash-table-set! srvstats srvid (make-srvstat numfiles: srvload randnum: randnum pkt: server)))))) -;;; skeys) -;;; -;;; (let* ((sorted (sort (hash-table-values srvstats) -;;; (lambda (a b) -;;; (let ((numfiles-a (srvstat-numfiles a)) -;;; (numfiles-b (srvstat-numfiles b)) -;;; (randnum-a (srvstat-randnum a)) -;;; (randnum-b (srvstat-randnum b))) -;;; (if (< numfiles-a numfiles-b) ;; Note, I don't think adding an offset works here. Goal was only move file handling to a different server if it has 2 less -;;; #t -;;; (if (and (equal? numfiles-a numfiles-b) -;;; (< randnum-a randnum-b)) -;;; #t -;;; #f)))))) -;;; (best (if (null? sorted) -;;; (begin -;;; (print "ERROR: should never be null due to self as server.") -;;; #f) -;;; (srvstat-pkt (car sorted))))) -;;; #;(print "SERVER(" url "): " dbname ": " (map (lambda (srv) -;;; (let ((p (srvstat-pkt srv))) -;;; (conc (alist-ref 'ipaddr p) ":" (alist-ref 'port p) -;;; "(" (srvstat-numfiles srv)","(srvstat-randnum srv)")"))) -;;; sorted)) -;;; best)))))) -;;; -;;; ;; send out an "I'm about to exit notice to all known servers" -;;; ;; -;;; (define (death-imminent acfg) -;;; '()) -;;; -;;; ;;====================================================================== -;;; ;; U L E X - T H E I N T E R E S T I N G S T U F F ! ! -;;; ;;====================================================================== -;;; -;;; ;; register a handler -;;; ;; NOTES: -;;; ;; dbinitsql is reserved for a list of sql statements for initializing the db -;;; ;; dbinitfn is reserved for a db init function, if exists called after dbinitsql -;;; ;; -;;; (define (register acfg key obj #!optional (ctype 'dbwrite)) -;;; (let ((ht (area-rtable acfg))) -;;; (if (hash-table-exists? ht key) -;;; (print "WARNING: redefinition of entry " key)) -;;; (hash-table-set! ht key (make-calldat obj: obj ctype: ctype)))) -;;; -;;; ;; usage: register-batch acfg '((key1 . sql1) (key2 . sql2) ... ) -;;; ;; NB// obj is often an sql query -;;; ;; -;;; (define (register-batch acfg ctype data) -;;; (let ((ht (area-rtable acfg))) -;;; (map (lambda (dat) -;;; (hash-table-set! ht (car dat)(make-calldat obj: (cdr dat) ctype: ctype))) -;;; data))) -;;; -;;; (define (initialize-area-calls-from-specfile area specfile) -;;; (let* ((callspec (with-input-from-file specfile read ))) -;;; (for-each (lambda (group) -;;; (register-batch -;;; area -;;; (car group) -;;; (cdr group))) -;;; callspec))) -;;; -;;; ;; get-rentry -;;; ;; -;;; (define (get-rentry acfg key) -;;; (hash-table-ref/default (area-rtable acfg) key #f)) -;;; -;;; (define (get-rsql acfg key) -;;; (let ((cdat (get-rentry acfg key))) -;;; (if cdat -;;; (calldat-obj cdat) -;;; #f))) -;;; -;;; -;;; -;;; ;; blocking call: -;;; ;; client server -;;; ;; ------ ------ -;;; ;; call() -;;; ;; send-message() -;;; ;; nmsg-send() -;;; ;; nmsg-receive() -;;; ;; nmsg-respond(ack,cookie) -;;; ;; ack, cookie -;;; ;; mbox-thread-wait(cookie) -;;; ;; nmsg-send(client,cookie,result) -;;; ;; nmsg-respond(ack) -;;; ;; return result -;;; ;; -;;; ;; reserved action: -;;; ;; 'immediate -;;; ;; 'dbinitsql -;;; ;; -;;; (define (call acfg dbname action params #!optional (count 0)) -;;; (let* ((call-start-time (current-milliseconds)) -;;; (srv (get-best-server acfg dbname action)) -;;; (post-get-start-time (current-milliseconds)) -;;; (rdat (hash-table-ref/default (area-rtable acfg) action #f)) -;;; (myid (trim-pktid (area-pktid acfg))) -;;; (srvid (trim-pktid (alist-ref 'Z srv))) -;;; (cookie (make-cookie myid))) -;;; (sdbg> "call" "get-best-server" call-start-time #f call-start-time " from: " myid " to server: " srvid " for " dbname " action: " action " params: " params " rdat: " rdat) -;;; (print "INFO: call to " (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv) " from " (area-myaddr acfg) ":" (area-port acfg) " for " dbname) -;;; (if (and srv rdat) ;; need both to dispatch a request -;;; (let* ((ripaddr (alist-ref 'ipaddr srv)) -;;; (rsrvid (alist-ref 'Z srv)) -;;; (rport (any->number (alist-ref 'port srv))) -;;; (res-full (if (and (equal? ripaddr (area-myaddr acfg)) -;;; (equal? rport (area-port acfg))) -;;; (request acfg ripaddr rport (area-pktid acfg) action cookie dbname params) -;;; (safe-call 'request ripaddr rport -;;; (area-myaddr acfg) -;;; (area-port acfg) -;;; #;(area-pktid acfg) -;;; rsrvid -;;; action cookie dbname params)))) -;;; ;; (print "res-full: " res-full) -;;; (match res-full -;;; ((response-ok response-msg rem ...) -;;; (let* ((send-message-time (current-milliseconds)) -;;; ;; (match res-full -;;; ;; ((response-ok response-msg) -;;; ;; (response-ok (car res-full)) -;;; ;; (response-msg (cadr res-full) -;;; ) -;;; ;; (res (take res-full 3))) ;; ctype == action, TODO: converge on one term <<=== what was this? BUG -;;; ;; (print "ulex:call: send-message took " (- send-message-time post-get-start-time) " ms params=" params) -;;; (sdbg> "call" "send-message" post-get-start-time #f call-start-time) -;;; (cond -;;; ((not response-ok) #f) -;;; ((member response-msg '("db read submitted" "db write submitted")) -;;; (let* ((cookie-id (cadddr res-full)) -;;; (mbox (make-mailbox)) -;;; (mbox-time (current-milliseconds))) -;;; (hash-table-set! (area-cookie2mbox acfg) cookie-id mbox) -;;; (let* ((mbox-timeout-secs 20) -;;; (mbox-timeout-result 'MBOX_TIMEOUT) -;;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) -;;; (mbox-receive-time (current-milliseconds))) -;;; (hash-table-delete! (area-cookie2mbox acfg) cookie-id) -;;; (sdbg> "call" "mailbox-receive" mbox-time #f call-start-time " from: " myid " to server: " srvid " for " dbname) -;;; ;; (print "ulex:call mailbox-receive took " (- mbox-receive-time mbox-time) "ms params=" params) -;;; res))) -;;; (else -;;; (print "Unhandled response \""response-msg"\"") -;;; #f)) -;;; ;; depending on what action (i.e. ctype) is we will block here waiting for -;;; ;; all the data (mechanism to be determined) -;;; ;; -;;; ;; if res is a "working on it" then wait -;;; ;; wait for result -;;; ;; mailbox thread wait on -;;; -;;; ;; if res is a "can't help you" then try a different server -;;; ;; if res is a "ack" (e.g. for one-shot requests) then return res -;;; )) -;;; (else -;;; (if (< count 10) -;;; (let* ((url (conc (alist-ref 'ipaddr srv) ":" (alist-ref 'port srv)))) -;;; (thread-sleep! 1) -;;; (print "ERROR: Bad result from " url ", dbname: " dbname ", action: " action ", params: " params ". Trying again in 1 second.") -;;; (call acfg dbname action params (+ count 1))) -;;; (begin -;;; (error (conc "ERROR: " count " tries, still have improper response res-full=" res-full))))))) -;;; (begin -;;; (if (not rdat) -;;; (print "ERROR: action " action " not registered.") -;;; (if (< count 10) -;;; (begin -;;; (thread-sleep! 1) -;;; (area-hosts-set! acfg (make-hash-table)) ;; clear out all known hosts -;;; (print "ERROR: no server found, srv=" srv ", trying again in 1 seconds") -;;; (call acfg dbname action params (+ count 1))) -;;; (begin -;;; (error (conc "ERROR: no server found after 10 tries, srv=" srv ", giving up.")) -;;; #;(error "No server available")))))))) -;;; -;;; -;;; ;;====================================================================== -;;; ;; U T I L I T I E S -;;; ;;====================================================================== -;;; -;;; ;; get a signature for identifing this process -;;; ;; -;;; (define (get-process-signature) -;;; (cons (get-host-name)(current-process-id))) -;;; -;;; ;;====================================================================== -;;; ;; S Y S T E M S T U F F -;;; ;;====================================================================== -;;; -;;; ;; get normalized cpu load by reading from /proc/loadavg and -;;; ;; /proc/cpuinfo return all three values and the number of real cpus -;;; ;; and the number of threads returns alist '((adj-cpu-load -;;; ;; . normalized-proc-load) ... etc. keys: adj-proc-load, -;;; ;; adj-core-load, 1m-load, 5m-load, 15m-load -;;; ;; -;;; (define (get-normalized-cpu-load) -;;; (let ((res (get-normalized-cpu-load-raw)) -;;; (default `((adj-proc-load . 2) ;; there is no right answer -;;; (adj-core-load . 2) -;;; (1m-load . 2) -;;; (5m-load . 0) ;; causes a large delta - thus causing default of throttling if stuff goes wrong -;;; (15m-load . 0) -;;; (proc . 1) -;;; (core . 1) -;;; (phys . 1) -;;; (error . #t)))) -;;; (cond -;;; ((and (list? res) -;;; (> (length res) 2)) -;;; res) -;;; ((eq? res #f) default) ;; add messages? -;;; ((eq? res #f) default) ;; this would be the #eof -;;; (else default)))) -;;; -;;; (define (get-normalized-cpu-load-raw) -;;; (let* ((actual-host (get-host-name))) ;; #f is localhost -;;; (let ((data (append -;;; (with-input-from-file "/proc/loadavg" read-lines) -;;; (with-input-from-file "/proc/cpuinfo" read-lines) -;;; (list "end"))) -;;; (load-rx (regexp "^([\\d\\.]+)\\s+([\\d\\.]+)\\s+([\\d\\.]+)\\s+.*$")) -;;; (proc-rx (regexp "^processor\\s+:\\s+(\\d+)\\s*$")) -;;; (core-rx (regexp "^core id\\s+:\\s+(\\d+)\\s*$")) -;;; (phys-rx (regexp "^physical id\\s+:\\s+(\\d+)\\s*$")) -;;; (max-num (lambda (p n)(max (string->number p) n)))) -;;; ;; (print "data=" data) -;;; (if (null? data) ;; something went wrong -;;; #f -;;; (let loop ((hed (car data)) -;;; (tal (cdr data)) -;;; (loads #f) -;;; (proc-num 0) ;; processor includes threads -;;; (phys-num 0) ;; physical chip on motherboard -;;; (core-num 0)) ;; core -;;; ;; (print hed ", " loads ", " proc-num ", " phys-num ", " core-num) -;;; (if (null? tal) ;; have all our data, calculate normalized load and return result -;;; (let* ((act-proc (+ proc-num 1)) -;;; (act-phys (+ phys-num 1)) -;;; (act-core (+ core-num 1)) -;;; (adj-proc-load (/ (car loads) act-proc)) -;;; (adj-core-load (/ (car loads) act-core)) -;;; (result -;;; (append (list (cons 'adj-proc-load adj-proc-load) -;;; (cons 'adj-core-load adj-core-load)) -;;; (list (cons '1m-load (car loads)) -;;; (cons '5m-load (cadr loads)) -;;; (cons '15m-load (caddr loads))) -;;; (list (cons 'proc act-proc) -;;; (cons 'core act-core) -;;; (cons 'phys act-phys))))) -;;; result) -;;; (regex-case -;;; hed -;;; (load-rx ( x l1 l5 l15 ) (loop (car tal)(cdr tal)(map string->number (list l1 l5 l15)) proc-num phys-num core-num)) -;;; (proc-rx ( x p ) (loop (car tal)(cdr tal) loads (max-num p proc-num) phys-num core-num)) -;;; (phys-rx ( x p ) (loop (car tal)(cdr tal) loads proc-num (max-num p phys-num) core-num)) -;;; (core-rx ( x c ) (loop (car tal)(cdr tal) loads proc-num phys-num (max-num c core-num))) -;;; (else -;;; (begin -;;; ;; (print "NO MATCH: " hed) -;;; (loop (car tal)(cdr tal) loads proc-num phys-num core-num)))))))))) -;;; -;;; (define (get-host-stats acfg) -;;; (let ((stats-hash (area-stats acfg))) -;;; ;; use this opportunity to remove references to dbfiles which have not been accessed in a while -;;; (for-each -;;; (lambda (dbname) -;;; (let* ((stats (hash-table-ref stats-hash dbname)) -;;; (last-access (stat-when stats))) -;;; (if (and (> last-access 0) ;; if zero then there has been no access -;;; (> (- (current-seconds) last-access) 10)) ;; not used in ten seconds -;;; (begin -;;; (print "Removing " dbname " from stats list") -;;; (hash-table-delete! stats-hash dbname) ;; remove from stats hash -;;; (stat-dbs-set! stats (hash-table-keys stats)))))) -;;; (hash-table-keys stats-hash)) -;;; -;;; `(,(hash-table->alist (area-dbs acfg)) ;; dbname => randnum -;;; ,(map (lambda (dbname) ;; dbname is the db name -;;; (cons dbname (stat-when (hash-table-ref stats-hash dbname)))) -;;; (hash-table-keys stats-hash)) -;;; (cpuload . ,(get-normalized-cpu-load))))) -;;; #;(stats . ,(map (lambda (k) ;; create an alist from the stats data -;;; (cons k (stat->alist (hash-table-ref (area-stats acfg) k)))) -;;; (hash-table-keys (area-stats acfg)))) -;;; -;;; #;(trace -;;; ;; assv -;;; ;; cdr -;;; ;; caar -;;; ;; ;; cdr -;;; ;; call -;;; ;; finalize-all-db-handles -;;; ;; get-all-server-pkts -;;; ;; get-normalized-cpu-load -;;; ;; get-normalized-cpu-load-raw -;;; ;; launch -;;; ;; nmsg-send -;;; ;; process-db-queries -;;; ;; receive-message -;;; ;; std-peer-handler -;;; ;; update-known-servers -;;; ;; work-queue-processor -;;; ) -;;; -;;; ;;====================================================================== -;;; ;; netutil -;;; ;; move this back to ulex-netutil.scm someday? -;;; ;;====================================================================== -;;; -;;; ;; #include -;;; ;; #include -;;; ;; #include -;;; ;; #include -;;; -;;; (foreign-declare "#include \"sys/types.h\"") -;;; (foreign-declare "#include \"sys/socket.h\"") -;;; (foreign-declare "#include \"ifaddrs.h\"") -;;; (foreign-declare "#include \"arpa/inet.h\"") -;;; -;;; ;; get IP addresses from ALL interfaces -;;; (define get-all-ips -;;; (foreign-safe-lambda* scheme-object () -;;; " -;;; -;;; // from https://stackoverflow.com/questions/17909401/linux-c-get-default-interfaces-ip-address : -;;; -;;; -;;; C_word lst = C_SCHEME_END_OF_LIST, len, str, *a; -;;; // struct ifaddrs *ifa, *i; -;;; // struct sockaddr *sa; -;;; -;;; struct ifaddrs * ifAddrStruct = NULL; -;;; struct ifaddrs * ifa = NULL; -;;; void * tmpAddrPtr = NULL; -;;; -;;; if ( getifaddrs(&ifAddrStruct) != 0) -;;; C_return(C_SCHEME_FALSE); -;;; -;;; // for (i = ifa; i != NULL; i = i->ifa_next) { -;;; for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { -;;; if (ifa->ifa_addr->sa_family==AF_INET) { // Check it is -;;; // a valid IPv4 address -;;; tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; -;;; char addressBuffer[INET_ADDRSTRLEN]; -;;; inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN); -;;; // printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); -;;; len = strlen(addressBuffer); -;;; a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); -;;; str = C_string(&a, len, addressBuffer); -;;; lst = C_a_pair(&a, str, lst); -;;; } -;;; -;;; // else if (ifa->ifa_addr->sa_family==AF_INET6) { // Check it is -;;; // // a valid IPv6 address -;;; // tmpAddrPtr = &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; -;;; // char addressBuffer[INET6_ADDRSTRLEN]; -;;; // inet_ntop(AF_INET6, tmpAddrPtr, addressBuffer, INET6_ADDRSTRLEN); -;;; //// printf(\"%s IP Address %s\\n\", ifa->ifa_name, addressBuffer); -;;; // len = strlen(addressBuffer); -;;; // a = C_alloc(C_SIZEOF_PAIR + C_SIZEOF_STRING(len)); -;;; // str = C_string(&a, len, addressBuffer); -;;; // lst = C_a_pair(&a, str, lst); -;;; // } -;;; -;;; // else { -;;; // printf(\" not an IPv4 address\\n\"); -;;; // } -;;; -;;; } -;;; -;;; freeifaddrs(ifa); -;;; C_return(lst); -;;; -;;; ")) -;;; -;;; ;; Change this to bias for addresses with a reasonable broadcast value? -;;; ;; -;;; (define (ip-pref-less? a b) -;;; (let* ((rate (lambda (ipstr) -;;; (regex-case ipstr -;;; ( "^127\\." _ 0 ) -;;; ( "^(10\\.0|192\\.168\\.)\\..*" _ 1 ) -;;; ( else 2 ) )))) -;;; (< (rate a) (rate b)))) -;;; -;;; -;;; (define (get-my-best-address) -;;; (let ((all-my-addresses (get-all-ips)) -;;; ;;(all-my-addresses-old (vector->list (hostinfo-addresses (hostname->hostinfo (get-host-name))))) -;;; ) -;;; (cond -;;; ((null? all-my-addresses) -;;; (get-host-name)) ;; no interfaces? -;;; ((eq? (length all-my-addresses) 1) -;;; (car all-my-addresses)) ;; only one to choose from, just go with it -;;; -;;; (else -;;; (car (sort all-my-addresses ip-pref-less?))) -;;; ;; (else -;;; ;; (ip->string (car (filter (lambda (x) ;; take any but 127. -;;; ;; (not (eq? (u8vector-ref x 0) 127))) -;;; ;; all-my-addresses)))) -;;; -;;; ))) -;;; -;;; (define (get-all-ips-sorted) -;;; (sort (get-all-ips) ip-pref-less?)) -;;; -;;; - + + +;; below was to enable re-use of connections. This seems non-trivial so for +;; now lets open on each call +;; +;; ;; given host-port get or create peer struct +;; ;; +;; (define (udat-get-peer uconn host-port) +;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) +;; ;; no peer, so create pdat and init it +;; +;; ;; NEED stack of connections, pop and use; inp, oup, +;; ;; creation_time (remove and create new if over 24hrs old +;; ;; +;; (let ((pdat (make-pdat host-port: host-port))) +;; (hash-table-set! (udat-peers uconn) host-port pdat) +;; pdat))) +;; +;; ;; is pcon alive +;; +;; ;; given host-port and pdat get a pcon +;; ;; +;; (define (pdat-get-pcon pdat host-port) +;; (let loop ((conns (pdat-conns pdat))) +;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later +;; (init-pcon (make-pcon)) +;; (let* ((conn (pop conns))) +;; +;; ;; given host-port get a pcon struct +;; ;; +;; (define (udat-get-pcon + +;;====================================================================== +;; misc utils +;;====================================================================== + +(define (make-cookie uconn) + (let ((newcnum (+ (udat-cnum uconn) 1))) + (udat-cnum-set! uconn newcnum) + (conc (udat-my-address uconn) ":" + (udat-my-port uconn) "-" + newcnum))) + +;;====================================================================== +;; network utilities +;;====================================================================== + +;; NOTE: Look at address-info egg as alternative to some of this + +(define (rate-ip ipaddr) + (regex-case ipaddr + ( "^127\\..*" _ 0 ) + ( "^(10\\.0|192\\.168)\\..*" _ 1 ) + ( else 2 ) )) + +;; Change this to bias for addresses with a reasonable broadcast value? +;; +(define (ip-pref-less? a b) + (> (rate-ip a) (rate-ip b))) + +(define (get-my-best-address) + (let ((all-my-addresses (get-all-ips))) + (cond + ((null? all-my-addresses) + (get-host-name)) ;; no interfaces? + ((eq? (length all-my-addresses) 1) + (car all-my-addresses)) ;; only one to choose from, just go with it + (else + (car (sort all-my-addresses ip-pref-less?)))))) + +(define (get-all-ips-sorted) + (sort (get-all-ips) ip-pref-less?)) + +(define (get-all-ips) + (map ip->string (vector->list + (hostinfo-addresses + (host-information (current-hostname)))))) + +)