ADDED attic/configure
Index: attic/configure
==================================================================
--- /dev/null
+++ attic/configure
@@ -0,0 +1,101 @@
+#!/bin/bash
+
+# Copyright 2006-2017, Matthew Welland.
+#
+# This file is part of Megatest.
+#
+# Megatest is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Megatest is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Megatest. If not, see .
+
+# Configure the build
+
+if [[ "$1"x == "x" ]];then
+ PREFIX=$PWD
+else
+ PREFIX=$1
+fi
+
+
+#======================================================================
+# Configure stuff needed for eggs
+#======================================================================
+
+function configure_dependencies () {
+
+ #======================================================================
+ # libnanomsg
+ #======================================================================
+
+ if [[ ! $(ls /usr/lib/*/libnanomsg*) ]];then
+ echo "libnanomsg build needed."
+ echo "BUILD_NANOMSG=yes" >> makefile.inc
+ fi
+
+ #======================================================================
+ # postgresql libraries
+ #======================================================================
+
+ if [[ ! $(ls /usr/lib/*/libpq.*) ]];then
+ echo "Postgresql build needed."
+ echo "BUILD_POSTGRES=yes" >> makefile.inc
+ fi
+
+ if [[ ! $(ls /usr/lib/*/libsqlite3.*) ]];then
+ echo "Sqlite3 build needed."
+ echo "BUILD_SQLITE3=yes" >> makefile.inc
+ fi
+
+}
+
+#======================================================================
+# Initialize makefile.inc
+#======================================================================
+
+echo "" > makefile.inc
+
+#======================================================================
+# Do we need Chicken?
+#======================================================================
+
+if [[ -e /usr/bin/sw_vers ]]; then
+ ARCHSTR=$(/usr/bin/sw_vers -productVersion)
+else
+ ARCHSTR=$(lsb_release -sr)
+fi
+
+echo "CHICKEN_PREFIX=$PREFIX/.$ARCHSTR" >> makefile.inc
+CHICKEN_PREFIX=$PREFIX/bin/.$ARCHSTR
+
+if [[ ! $(type csi) ]];then
+ echo "Chicken build needed."
+ echo "BUILD_CHICKEN=yes" >> makefile.inc
+ configure_dependencies
+ echo "include chicken.makefile" >> makefile.inc
+else
+ echo "CSIPATH=$(which csi)" >> makefile.inc
+ CSIPATH=$(which csi)
+ echo "CKPATH=$(dirname $(dirname $CSIPATH))" >> makefile.inc
+fi
+
+# Make setup scripts
+echo "#!/bin/bash" > setup.sh
+echo "export PATH=$CHICKEN_PREFIX/bin:\$PATH" >> setup.sh
+echo "export LD_LIBRARY_PATH=$CHICKEN_PREFIX/lib" >> setup.sh
+echo 'exec "$@"' >> setup.sh
+chmod a+x setup.sh
+
+echo "setenv PATH $CHICKEN_PREFIX/bin:\$PATH" > setup.csh
+echo "setenv LD_LIBRARY_PATH $CHICKEN_PREFIX/lib" >> setup.csh
+
+echo "All done creating makefile.inc, feel free to edit it!"
+echo "run \"setup.sh bash\" or source setup.csh to get PATH and LD_LIBRARY_PATH adjusted"
ADDED ulex/dbmgr.scm
Index: ulex/dbmgr.scm
==================================================================
--- /dev/null
+++ ulex/dbmgr.scm
@@ -0,0 +1,1131 @@
+;;======================================================================
+;; Copyright 2022, Matthew Welland.
+;;
+;; This file is part of Megatest.
+;;
+;; Megatest is free software: you can redistribute it and/or modify
+;; it under the terms of the GNU General Public License as published by
+;; the Free Software Foundation, either version 3 of the License, or
+;; (at your option) any later version.
+;;
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest. If not, see .
+
+;;======================================================================
+
+(declare (unit dbmgrmod))
+
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+
+(module dbmgrmod
+ *
+
+(import scheme
+ chicken.base
+ chicken.condition
+ chicken.file
+ chicken.format
+ chicken.port
+ chicken.process
+ chicken.process-context
+ chicken.process-context.posix
+ chicken.sort
+ chicken.string
+ chicken.time
+
+ (prefix sqlite3 sqlite3:)
+ matchable
+ md5
+ message-digest
+ regex
+ s11n
+ srfi-1
+ srfi-18
+ srfi-69
+ system-information
+ typed-records
+
+ pkts
+ ulex
+
+ commonmod
+ apimod
+ dbmod
+ debugprint
+ (prefix mtargs args:)
+ portloggermod
+ )
+
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+;;
+(defstruct servdat
+ (host #f)
+ (port #f)
+ (uuid #f)
+ (dbfile #f)
+ (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+ (mode #f)
+ (status 'starting)
+ (trynum 0) ;; count the number of ports we've tried
+ (conns (make-hash-table)) ;; apath/dbname => conndat
+ )
+
+(define *db-serv-info* (make-servdat))
+
+(define (servdat->url sdat)
+ (conc (servdat-host sdat)":"(servdat-port sdat)))
+
+;; db servers contact info
+;;
+(defstruct conndat
+ (apath #f)
+ (dbname #f)
+ (fullname #f)
+ (hostport #f)
+ (ipaddr #f)
+ (port #f)
+ (srvpkt #f)
+ (srvkey #f)
+ (lastmsg 0)
+ (expires 0))
+
+(define *srvpktspec*
+ `((server (host . h)
+ (port . p)
+ (servkey . k)
+ (pid . i)
+ (ipaddr . a)
+ (dbpath . d))))
+
+;;======================================================================
+;; S U P P O R T F U N C T I O N S
+;;======================================================================
+
+;; set up the api proc, seems like there should be a better place for this?
+;;
+;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
+;;
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+
+;; do we have a connection to apath dbname and
+;; is it not expired? then return it
+;;
+;; else setup a connection
+;;
+;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
+;;
+(define (rmt:get-conn remdat apath dbname)
+ (let* ((fullname (db:dbname->path apath dbname)))
+ (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+
+(define (rmt:drop-conn remdat apath dbname)
+ (let* ((fullname (db:dbname->path apath dbname)))
+ (hash-table-delete! (servdat-conns remdat) fullname)))
+
+(define (rmt:find-main-server uconn apath dbname)
+ (let* ((pktsdir (get-pkts-dir apath))
+ (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*))
+ (viable-srvs (get-viable-servers all-srvpkts dbname)))
+ (get-the-server uconn apath viable-srvs)))
+
+
+(define *connstart-mutex* (make-mutex))
+(define *last-main-start* 0)
+
+;; looks for a connection to main, returns if have and not exired
+;; creates new otherwise
+;;
+;; connections for other servers happens by requesting from main
+;;
+;; TODO: This is unnecessarily re-creating the record in the hash table
+;;
+(define (rmt:open-main-connection remdat apath)
+ (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+ (conns (servdat-conns remdat))
+ (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+ (start-rmt:run (lambda ()
+ (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+ (thread-start! th1)
+ (thread-sleep! 1)
+ (let loop ((count 0))
+ (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+ (if (or (not *db-serv-info*)
+ (not (servdat-uconn *db-serv-info*)))
+ (begin
+ (thread-sleep! 1)
+ (loop (+ count 1)))
+ (begin
+ (servdat-mode-set! *db-serv-info* 'non-db)
+ (servdat-uconn *db-serv-info*)))))))
+ (myconn (servdat-uconn *db-serv-info*)))
+ (cond
+ ((not myconn)
+ (start-rmt:run)
+ (rmt:open-main-connection remdat apath))
+ ((and conn ;; conn is NOT a socket, just saying ...
+ (< (current-seconds) (conndat-expires conn)))
+ #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+ ((and conn
+ (>= (current-seconds)(conndat-expires conn)))
+ (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+ (rmt:drop-conn remdat apath ".db/main.db") ;;
+ (rmt:open-main-connection remdat apath))
+ (else
+ ;; Below we will find or create and connect to main
+ (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+ (let* ((dbname (db:run-id->dbname #f))
+ (the-srv (rmt:find-main-server myconn apath dbname))
+ (start-main-srv (lambda () ;; call IF there is no the-srv found
+ (mutex-lock! *connstart-mutex*)
+ (if (> (- (current-seconds) *last-main-start*) 5) ;; at least four seconds since last attempt to start main server
+ (begin
+ (api:run-server-process apath dbname)
+ (set! *last-main-start* (current-seconds))
+ (thread-sleep! 1))
+ (thread-sleep! 0.25))
+ (mutex-unlock! *connstart-mutex*)
+ (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+ )))
+ (if (not the-srv) ;; have server, try connecting to it
+ (start-main-srv)
+ (let* ((srv-addr (server-address the-srv)) ;; need serv
+ (ipaddr (alist-ref 'ipaddr the-srv))
+ (port (alist-ref 'port the-srv))
+ (srvkey (alist-ref 'servkey the-srv))
+ (fullpath (db:dbname->path apath dbname))
+
+ (new-the-srv (make-conndat
+ apath: apath
+ dbname: dbname
+ fullname: fullpath
+ hostport: srv-addr
+ ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvpkt: the-srv
+ srvkey: srvkey ;; generated by rmt:get-signature on the server side
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2) ;; this needs to be gathered during the ping
+ )))
+ (hash-table-set! conns fullpath new-the-srv)))
+ #t)))))
+
+;; NB// sinfo is a servdat struct
+;;
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+ (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+ (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+ (fullname (db:dbname->path apath dbname))
+ (conns (servdat-conns sinfo))
+ (mconn (rmt:get-conn sinfo apath ".db/main.db"))
+ (dconn (rmt:get-conn sinfo apath dbname)))
+ #;(if (and mconn
+ (not (debug:print-logger)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+ (debug:print-logger rmt:log-to-main)))
+ (cond
+ ((and mconn
+ dconn
+ (< (current-seconds)(conndat-expires dconn)))
+ #t) ;; good to go
+ ((not mconn) ;; no channel open to main? open it...
+ (rmt:open-main-connection sinfo apath)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ ((not dconn) ;; no channel open to dbname?
+ (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+ (case res
+ ((server-started)
+ (if (> num-tries 0)
+ (begin
+ (thread-sleep! 2)
+ (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+ (begin
+ (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+ (exit 1))))
+ (else
+ (if (list? res) ;; server has been registered and the info was returned. pass it on.
+ (begin ;; ("192.168.0.9" 53817
+ ;; "5e34239f48e8973b3813221e54701a01" "24310"
+ ;; "192.168.0.9"
+ ;; "/home/matt/data/megatest/tests/simplerun"
+ ;; ".db/1.db")
+ (match
+ res
+ ((host port servkey pid ipaddr apath dbname)
+ (debug:print-info 0 *default-log-port* "got "res)
+ (hash-table-set! conns
+ fullname
+ (make-conndat
+ apath: apath
+ dbname: dbname
+ hostport: (conc host":"port)
+ ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+ ipaddr: ipaddr
+ port: port
+ srvkey: servkey
+ lastmsg: (current-seconds)
+ expires: (+ (current-seconds)
+ (server:expiration-timeout)
+ -2))))
+ (else
+ (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+ res)
+ (begin
+ (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+ res)))))))
+ #t))
+
+;;======================================================================
+
+;; FOR DEBUGGING SET TO #t
+;; (define *localmode* #t)
+(define *localmode* #f)
+(define *dbstruct* (make-dbr:dbstruct))
+
+;; Defaults to current area
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+ (let* ((apath *toppath*)
+ (sinfo *db-serv-info*)
+ (dbname (db:run-id->dbname rid)))
+ (if *localmode*
+ (api:execute-requests *dbstruct* cmd params)
+ (begin
+ (rmt:open-main-connection sinfo apath)
+ (if rid (rmt:general-open-connection sinfo apath dbname))
+ #;(if (not (member cmd '(log-to-main)))
+ (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params))
+ (rmt:send-receive-real sinfo apath dbname cmd params)))))
+
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future
+;;
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+ (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+ (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+ (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+ (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+ ;; then send-receive using the ulex layer to host-port stored in cdat
+ (res (send-receive uconn (conndat-hostport cdat) cmd params))
+ #;(th1 (make-thread (lambda ()
+ (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+ "send-receive thread")))
+ ;; (thread-start! th1)
+ ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead
+ ;; since we accessed the server we can bump the expires time up
+ (conndat-expires-set! cdat (+ (current-seconds)
+ (server:expiration-timeout)
+ -2)) ;; two second margin for network time misalignments etc.
+ res)))
+
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;;
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;;
+
+(define (rmt:print-db-stats)
+ (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+ (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+ (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+ (for-each (lambda (cmd)
+ (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+ (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+ (sort (hash-table-keys *db-stats*)
+ (lambda (a b)
+ (> (vector-ref (hash-table-ref *db-stats* a) 0)
+ (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+
+(define (rmt:get-max-query-average run-id)
+ (mutex-lock! *db-stats-mutex*)
+ (let* ((runkey (conc "run-id=" run-id " "))
+ (cmds (filter (lambda (x)
+ (substring-index runkey x))
+ (hash-table-keys *db-stats*)))
+ (res (if (null? cmds)
+ (cons 'none 0)
+ (let loop ((cmd (car cmds))
+ (tal (cdr cmds))
+ (max-cmd (car cmds))
+ (res 0))
+ (let* ((cmd-dat (hash-table-ref *db-stats* cmd))
+ (tot (vector-ref cmd-dat 0))
+ (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+ (currmax (max res curravg))
+ (newmax-cmd (if (> curravg res) cmd max-cmd)))
+ (if (null? tal)
+ (if (> tot 10)
+ (cons newmax-cmd currmax)
+ (cons 'none 0))
+ (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+ (mutex-unlock! *db-stats-mutex*)
+ res))
+
+;; host and port are used to ensure we are remove proper records
+(define (rmt:server-shutdown host port)
+ (let ((dbfile (servdat-dbfile *db-serv-info*)))
+ (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+ (if dbfile
+ (let* ((am-server (args:get-arg "-server"))
+ (dbfile (args:get-arg "-db"))
+ (apath *toppath*)
+ #;(sinfo *remotedat*)) ;; foundation for future fix
+ (if *dbstruct-db*
+ (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+ (db (dbr:dbdat-db dbdat))
+ (inmem (dbr:dbdat-db dbdat)) ;; WRONG
+ )
+ ;; do a final sync here
+ (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+ (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+ ;; let's finalize here
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+ (if (sqlite3:database? db)
+ (sqlite3:finalize! db)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+ (if (sqlite3:database? inmem)
+ (sqlite3:finalize! inmem)
+ (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+ (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+ (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+ (if (not am-server)
+ (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+ (if (string-match ".*/main.db$" dbfile)
+ (let ((pkt-file (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+ (delete-file* pkt-file)
+ (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+ (db:with-lock-db
+ (servdat-dbfile *db-serv-info*)
+ (lambda (dbh dbfile)
+ (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+ (let* ((sdat *db-serv-info*) ;; we have a run-id server
+ (host (servdat-host sdat))
+ (port (servdat-port sdat))
+ (uuid (servdat-uuid sdat))
+ (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+ (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+ (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+ )))))))
+
+
+(define (common:run-sync?)
+ ;; (and (common:on-homehost?)
+ (args:get-arg "-server"))
+
+(define *rmt:run-mutex* (make-mutex))
+(define *rmt:run-flag* #f)
+
+;; Main entry point to start a server. was start-server
+(define (rmt:run hostn)
+ (mutex-lock! *rmt:run-mutex*)
+ (if *rmt:run-flag*
+ (begin
+ (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+ (mutex-unlock! *rmt:run-mutex*))
+ (begin
+ (set! *rmt:run-flag* #t)
+ (mutex-unlock! *rmt:run-mutex*)
+ ;; ;; Configurations for server
+ ;; (tcp-buffer-size 2048)
+ ;; (max-connections 2048)
+ (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+ (if (and *db-serv-info*
+ (servdat-uconn *db-serv-info*))
+ (let* ((uconn (servdat-uconn *db-serv-info*)))
+ (wait-and-close uconn))
+ (let* ((port (portlogger:open-run-close portlogger:find-port))
+ (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+ (set! *db-last-access* (current-seconds))
+ (assert (list? params) "FATAL: handler called with non-list params")
+ (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+ (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+ (api:execute-requests *dbstruct-db* cmd params))))
+ ;; (api:process-request *dbstuct-db*
+ (if (not *db-serv-info*)
+ (set! *db-serv-info* (make-servdat host: hostn port: port)))
+ (let* ((uconn (run-listener handler-proc port))
+ (rport (udat-port uconn))) ;; the real port
+ (servdat-host-set! *db-serv-info* hostn)
+ (servdat-port-set! *db-serv-info* rport)
+ (servdat-uconn-set! *db-serv-info* uconn)
+ (wait-and-close uconn)
+ (db:print-current-query-stats)
+ )))
+ (let* ((host (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (mode (or (servdat-mode *db-serv-info*)
+ "non-db")))
+ ;; server exit stuff here
+ ;; (rmt:server-shutdown host port) - always do in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+ (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
+ ))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+
+;;======================================================================
+;; NEW SERVER METHOD
+;;======================================================================
+
+;; only use for main.db - need to re-write some of this :(
+;;
+(define (get-lock-db sdat dbfile host port)
+ (assert host "FATAL: get-lock-db called with host not set.")
+ (assert port "FATAL: get-lock-db called with port not set.")
+ (let* ((dbh (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+ (res (db:get-iam-server-lock dbh dbfile host port))
+ (uconn (servdat-uconn sdat)))
+ ;; res => list then already locked, check server is responsive
+ ;; => #t then sucessfully got the lock
+ ;; => #f reserved for future use as to indicate something went wrong
+ (match res
+ ((owner_pid owner_host owner_port event_time)
+ (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+ #f ;; locked by someone else
+ (begin ;; locked by someone dead and gone
+ (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+ (db:steal-lock-db dbh dbfile port))))
+ (#t #t) ;; placeholder so that we don't touch res if it is #t
+ (else (set! res #f)))
+ (sqlite3:finalize! dbh)
+ res))
+
+
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+ (let* ((pkt-dat `((host . ,host)
+ (port . ,port)
+ (servkey . ,servkey)
+ (pid . ,(current-process-id))
+ (ipaddr . ,ipaddr)
+ (dbpath . ,dbpath)))
+ (uuid (write-alist->pkt
+ pkts-dir
+ pkt-dat
+ pktspec: pkt-spec
+ ptype: 'server)))
+ (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+ uuid))
+
+(define (get-pkts-dir #!optional (apath #f))
+ (let* ((effective-toppath (or *toppath* apath)))
+ (assert effective-toppath
+ "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+ (let* ((pdir (conc effective-toppath "/.meta/srvpkts")))
+ (if (file-exists? pdir)
+ pdir
+ (begin
+ (handle-exceptions ;; this exception handler should NOT be needed but ...
+ exn
+ pdir
+ (create-directory pdir #t))
+ pdir)))))
+
+;; given a pkts dir read
+;;
+(define (get-all-server-pkts pktsdir-in pktspec)
+ (let* ((pktsdir (if (file-exists? pktsdir-in)
+ pktsdir-in
+ (begin
+ (create-directory pktsdir-in #t)
+ pktsdir-in)))
+ (all-pkt-files (glob (conc pktsdir "/*.pkt"))))
+ (map (lambda (pkt-file)
+ (read-pkt->alist pkt-file pktspec: pktspec))
+ all-pkt-files)))
+
+(define (server-address srv-pkt)
+ (conc (alist-ref 'host srv-pkt) ":"
+ (alist-ref 'port srv-pkt)))
+
+(define (server-ready? uconn host-port key) ;; server-address is host:port
+ (let* ((params `((cmd . ping)(key . ,key)))
+ (data `((cmd . ping)
+ (key . ,key)
+ (params . ,params))) ;; I don't get it.
+ (res (send-receive uconn host-port 'ping data)))
+ (if (eq? res 'ack) ;; yep, likely it is who we want on the other end
+ res
+ #f)))
+;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))
+
+; from the pkts return servers associated with dbpath
+;; NOTE: Only one can be alive - have to check on each
+;; in the list of pkts returned
+;;
+(define (get-viable-servers serv-pkts dbpath)
+ (let loop ((tail serv-pkts)
+ (res '()))
+ (if (null? tail)
+ res ;; NOTE: sort by age so oldest is considered first
+ (let* ((spkt (car tail)))
+ (loop (cdr tail)
+ (if (equal? dbpath (alist-ref 'dbpath spkt))
+ (cons spkt res)
+ res))))))
+
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+ (filter (lambda (pkt)
+ (let* ((host (alist-ref 'host pkt))
+ (port (alist-ref 'port pkt))
+ (host-port (conc host":"port))
+ (key (alist-ref 'servkey pkt))
+ (pktz (alist-ref 'Z pkt))
+ (res (server-ready? uconn host-port key)))
+ (if res
+ res
+ (let* ((pktsdir (get-pkts-dir *toppath*))
+ (pktpath (conc pktsdir"/"pktz".pkt")))
+ (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+ (delete-file* pktpath)
+ #f))))
+ serv-pkts))
+
+;; from viable servers get one that is alive and ready
+;;
+(define (get-the-server uconn apath serv-pkts)
+ (let loop ((tail serv-pkts))
+ (if (null? tail)
+ #f
+ (let* ((spkt (car tail))
+ (host (alist-ref 'ipaddr spkt))
+ (port (alist-ref 'port spkt))
+ (host-port (conc host":"port))
+ (dbpth (alist-ref 'dbpath spkt))
+ (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt))
+ (addr (server-address spkt)))
+ (if (server-ready? uconn host-port srvkey)
+ spkt
+ (loop (cdr tail)))))))
+
+;; am I the "first" in line server? I.e. my D card is smallest
+;; use Z card as tie breaker
+;;
+(define (get-best-candidate serv-pkts dbpath)
+ (if (null? serv-pkts)
+ #f
+ (let loop ((tail serv-pkts)
+ (best (car serv-pkts)))
+ (if (null? tail)
+ best
+ (let* ((candidate (car tail))
+ (candidate-bd (string->number (alist-ref 'D candidate)))
+ (best-bd (string->number (alist-ref 'D best)))
+ ;; bigger number is younger
+ (candidate-z (alist-ref 'Z candidate))
+ (best-z (alist-ref 'Z best))
+ (new-best (cond
+ ((> best-bd candidate-bd) ;; best is younger than candidate
+ candidate)
+ ((< best-bd candidate-bd) ;; candidate is younger than best
+ best)
+ (else
+ (if (string>=? best-z candidate-z)
+ best
+ candidate))))) ;; use Z card as tie breaker
+ (if (null? tail)
+ new-best
+ (loop (cdr tail) new-best)))))))
+
+
+;;======================================================================
+;; END NEW SERVER METHOD
+;;======================================================================
+
+;; if .db/main.db check the pkts
+;;
+(define (rmt:wait-for-server pkts-dir db-file server-key)
+ (let* ((sdat *db-serv-info*))
+ (let loop ((start-time (current-seconds))
+ (changed #t)
+ (last-sdat "not this"))
+ (begin ;; let ((sdat #f))
+ (thread-sleep! 0.01)
+ (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+ (mutex-lock! *heartbeat-mutex*)
+ (set! sdat *db-serv-info*)
+ (mutex-unlock! *heartbeat-mutex*)
+ (if (and sdat
+ (not changed)
+ (> (- (current-seconds) start-time) 2))
+ (let* ((uconn (servdat-uconn sdat)))
+ (servdat-status-set! sdat 'iface-stable)
+ (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+ ;; create a server pkt in *toppath*/.meta/srvpkts
+
+ ;; TODO:
+ ;; 1. change sdat to stuct
+ ;; 2. add uuid to struct
+ ;; 3. update uuid in sdat here
+ ;;
+ (servdat-uuid-set! sdat
+ (register-server
+ pkts-dir *srvpktspec*
+ (get-host-name)
+ (servdat-port sdat) server-key
+ (servdat-host sdat) db-file))
+ ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+ ;; now read pkts and see if we are a contender
+ (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*))
+ (viables (get-viable-servers all-pkts db-file))
+ (alive (remove-pkts-if-not-alive uconn viables))
+ (best-srv (get-best-candidate alive db-file))
+ (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+ (i-am-srv (equal? best-srv-key server-key))
+ (delete-pkt (lambda ()
+ (let* ((pktfile (conc (get-pkts-dir *toppath*)
+ "/" (servdat-uuid *db-serv-info*)
+ ".pkt")))
+ (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+ (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+ (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+ ;; am I the best-srv, compare server-keys to know
+ (if i-am-srv
+ (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm the server!")
+ (servdat-dbfile-set! sdat db-file)
+ (servdat-status-set! sdat 'db-locked))
+ (begin
+ (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ (begin
+ (debug:print-info 0 *default-log-port*
+ "Keys do not match "best-srv-key", "server-key", exiting.")
+ (bdat-time-to-exit-set! *bdat* #t)
+ (delete-pkt)
+ (thread-sleep! 0.2)
+ (exit)))
+ sdat))
+ (begin ;; sdat not yet contains server info
+ (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+ (sleep 4)
+ (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+ (begin
+ (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+ (exit))
+ (loop start-time
+ (equal? sdat last-sdat)
+ sdat))))))))
+
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+ (servdat-conns sinfo) ;; just checking types
+ (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+ (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+ (db:run-id->dbname #f)
+ 'register-server `(,iface
+ ,port
+ ,server-key
+ ,(current-process-id)
+ ,iface
+ ,apath
+ ,dbname)))
+
+(define (rmt:get-count-servers sinfo apath)
+ (servdat-conns sinfo) ;; just checking types
+ (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+ (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+ (db:run-id->dbname #f)
+ 'get-count-servers `(,apath)))
+
+(define (rmt:get-servers-info apath)
+ (rmt:send-receive 'get-servers-info #f `(,apath)))
+
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+ (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+ (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
+ (db:run-id->dbname #f)
+ 'deregister-server `(,iface
+ ,port
+ ,server-key
+ ,(current-process-id)
+ ,iface
+ ,apath
+ ,dbname)))
+
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+ ;; wait until *db-serv-info* stops changing
+ (let* ((stime (current-seconds)))
+ (let loop ((last-host #f)
+ (last-port #f)
+ (tries 0))
+ (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+ (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+ ;; first we verify port and interface, update *db-serv-info* in need be.
+ (cond
+ ((> tries num-tries-allowed)
+ (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+ (exit 1))
+ ((not *db-serv-info*)
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not last-host)(not last-port))
+ (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries)
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((or (not (equal? last-host curr-host))
+ (not (equal? last-port curr-port)))
+ (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+ (thread-sleep! 0.25)
+ (loop curr-host curr-port (+ tries 1)))
+ ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed
+ (thread-sleep! 0.5)
+ (loop curr-host curr-port (+ tries 1)))
+ (else
+ (rmt:get-signature) ;; sets *my-signature* as side effect
+ (servdat-status-set! *db-serv-info* 'interface-stable)
+ (debug:print 0 *default-log-port*
+ "SERVER STARTED: " curr-host
+ ":" curr-port
+ " AT " (current-seconds) " server signature: " *my-signature*
+ " with "(servdat-trynum *db-serv-info*)" port changes")
+ (flush-output *default-log-port*)
+ #t))))))
+
+;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;;
+(define (rmt:keep-running dbname)
+ ;; if none running or if > 20 seconds since
+ ;; server last used then start shutdown
+ ;; This thread waits for the server to come alive
+ (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+
+ (let* ((sinfo *db-serv-info*)
+ (server-start-time (current-seconds))
+ (pkts-dir (get-pkts-dir))
+ (server-key (rmt:get-signature)) ;; This servers key
+ (is-main (equal? (args:get-arg "-db") ".db/main.db"))
+ (last-access 0)
+ (server-timeout (server:expiration-timeout))
+ (shutdown-server-sequence (lambda (host port)
+ (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+ (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
+ ;; (rmt:server-shutdown host port) -- called in on-exit
+ ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
+ (exit)))
+ (timed-out? (lambda ()
+ (<= (+ last-access server-timeout)
+ (current-seconds)))))
+ (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
+ ;; main and run db servers have both got wait logic (could/should merge it)
+ (if is-main
+ (rmt:wait-for-server pkts-dir dbname server-key)
+ (rmt:wait-for-stable-interface))
+ ;; this is our forever loop
+ (let* ((iface (servdat-host *db-serv-info*))
+ (port (servdat-port *db-serv-info*))
+ (uconn (servdat-uconn *db-serv-info*)))
+ (let loop ((count 0)
+ (bad-sync-count 0)
+ (start-time (current-milliseconds)))
+ (if (and (not is-main)
+ (common:low-noise-print 60 "servdat-status"))
+ (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))
+
+ (mutex-lock! *heartbeat-mutex*)
+ ;; set up the database handle
+ (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
+ (let ((watchdog (bdat-watchdog *bdat*)))
+ (debug:print 0 *default-log-port* "SERVER: dbprep")
+ (db:setup dbname) ;; sets *dbstruct-db* as side effect
+ (servdat-status-set! *db-serv-info* 'db-opened)
+ ;; IFF I'm not main, call into main and register self
+ (if (not is-main)
+ (let ((res (rmt:register-server sinfo
+ *toppath* iface port
+ server-key dbname)))
+ (if res ;; we are the server
+ (servdat-status-set! *db-serv-info* 'have-interface-and-db)
+ ;; now check that the db locker is alive, clear it out if not
+ (let* ((serv-info (rmt:server-info *toppath* dbname)))
+ (match serv-info
+ ((host port servkey pid ipaddr apath dbpath)
+ (if (not (server-ready? uconn (conc host":"port) servkey))
+ (begin
+ (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
+ (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
+ (loop (+ count 1) bad-sync-count start-time))))
+ (else
+ (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
+ (exit)))))))
+ (debug:print 0 *default-log-port*
+ "SERVER: running, db "dbname" opened, megatest version: "
+ (common:get-full-version))
+ ;; start the watchdog
+
+ ;; is this really needed?
+
+ #;(if watchdog
+ (if (not (member (thread-state watchdog)
+ '(ready running blocked
+ sleeping dead)))
+ (begin
+ (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
+ (thread-start! watchdog))
+ (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
+ (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
+ #;(loop (+ count 1) bad-sync-count start-time)
+ ))
+
+ (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)
+
+ (mutex-unlock! *heartbeat-mutex*)
+
+ ;; when things go wrong we don't want to be doing the various
+ ;; queries too often so we strive to run this stuff only every
+ ;; four seconds or so.
+ (let* ((sync-time (- (current-milliseconds) start-time))
+ (rem-time (quotient (- 4000 sync-time) 1000)))
+ (if (and (<= rem-time 4)
+ (> rem-time 0))
+ (thread-sleep! rem-time)))
+
+ ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
+ (set! last-access *db-last-access*)
+
+ (if (< count 1) ;; 3x3 = 9 secs aprox
+ (loop (+ count 1) bad-sync-count (current-milliseconds)))
+
+ (if (common:low-noise-print 60 "dbstats")
+ (begin
+ (debug:print 0 *default-log-port* "Server stats:")
+ (db:print-current-query-stats)))
+ (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600)))
+ (cond
+ ((not *server-run*)
+ (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
+ (shutdown-server-sequence (get-host-name) port))
+ ((timed-out?)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port))
+ ((and *server-run*
+ (or (not (timed-out?))
+ (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
+ (> (rmt:get-count-servers sinfo *toppath*) 1)
+ #f)))
+ (if (common:low-noise-print 120 "server continuing")
+ (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
+ (loop 0 bad-sync-count (current-milliseconds)))
+ (else
+ (set! *unclean-shutdown* #f)
+ (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
+ (shutdown-server-sequence (get-host-name) port)
+ #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
+ (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown
+ (sexpr->string 'quit))))))))))
+
+(define (rmt:get-reasonable-hostname)
+ (let* ((inhost (or (args:get-arg "-server") "-")))
+ (if (equal? inhost "-")
+ (get-host-name)
+ inhost)))
+
+;; Call this to start the actual server
+;;
+;; all routes though here end in exit ...
+;;
+;; This is the point at which servers are started
+;;
+(define (rmt:server-launch dbname)
+ (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+ (let* ((th2 (make-thread (lambda ()
+ (debug:print-info 0 *default-log-port* "Server run thread started")
+ (rmt:run (rmt:get-reasonable-hostname)))
+ "Server run"))
+ (th3 (make-thread (lambda ()
+ (debug:print-info 0 *default-log-port* "Server monitor thread started")
+ (if (args:get-arg "-server")
+ (rmt:keep-running dbname)))
+ "Keep running")))
+ (thread-start! th2)
+ (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor.
+ (thread-start! th3)
+ (set! *didsomething* #t)
+ (thread-join! th2)
+ (thread-join! th3))
+ #f)
+
+;;======================================================================
+;; S E R V E R - D I R E C T C A L L S
+;;======================================================================
+
+(define (rmt:kill-server run-id)
+ (rmt:send-receive 'kill-server #f (list run-id)))
+
+(define (rmt:start-server run-id)
+ (rmt:send-receive 'start-server #f (list run-id)))
+
+(define (rmt:server-info apath dbname)
+ (rmt:send-receive 'get-server-info #f (list apath dbname)))
+
+;;======================================================================
+;; Nanomsg transport
+;;======================================================================
+
+#;(define (is-port-in-use port-num)
+ (let* ((ret #f))
+ (let-values (((inp oup pid)
+ (process "netstat" (list "-tulpn" ))))
+ (let loop ((inl (read-line inp)))
+ (if (not (eof-object? inl))
+ (begin
+ (if (string-search (regexp (conc ":" port-num)) inl)
+ (begin
+ ;(print "Output: " inl)
+ (set! ret #t))
+ (loop (read-line inp)))))))
+ ret))
+
+#;(define (open-nn-connection host-port)
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port)))
+ (nng-dial req uri)
+ (socket-set! req 'nng/recvtimeo 2000)
+ req))
+
+#;(define (send-receive-nn req msg)
+ (nng-send req msg)
+ (nng-recv req))
+
+#;(define (close-nn-connection req)
+ (nng-close! req))
+
+;; ;; open connection to server, send message, close connection
+;; ;;
+;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+;; (let ((req (make-req-socket 'req))
+;; (uri (conc "tcp://" host-port))
+;; (res #f)
+;; ;; (contacts (alist-ref 'contact attrib))
+;; ;; (mode (alist-ref 'mode attrib))
+;; )
+;; (socket-set! req 'nng/recvtimeo 2000)
+;; (handle-exceptions
+;; exn
+;; (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+;; ;; Send notification
+;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
+;; #f)
+;; (nng-dial req uri)
+;; ;; (print "Connected to the server " )
+;; (nng-send req msg)
+;; ;; (print "Request Sent")
+;; (let* ((th1 (make-thread (lambda ()
+;; (let ((resp (nng-recv req)))
+;; (nng-close! req)
+;; (set! res (if (equal? resp "ok")
+;; #t
+;; #f))))
+;; "recv thread"))
+;; (th2 (make-thread (lambda ()
+;; (thread-sleep! timeout)
+;; (thread-terminate! th1))
+;; "timer thread")))
+;; (thread-start! th1)
+;; (thread-start! th2)
+;; (thread-join! th1)
+;; res))))
+;;
+#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
+ (let ((req (make-req-socket))
+ (uri (conc "tcp://" host-port))
+ (res #f))
+ (handle-exceptions
+ exn
+ (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
+ ;; Send notification
+ (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
+ #f)
+ (nng-dial req uri)
+ (nng-send req msg)
+ (let* ((th1 (make-thread (lambda ()
+ (let ((resp (nng-recv req)))
+ (nng-close! req)
+ ;; (print resp)
+ (set! res resp)))
+ "recv thread"))
+ (th2 (make-thread (lambda ()
+ (thread-sleep! timeout)
+ (thread-terminate! th1))
+ "timer thread")))
+ (thread-start! th1)
+ (thread-start! th2)
+ (thread-join! th1)
+ res))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+;; run ping in separate process, safest way in some cases
+;;
+#;(define (server:ping-server ifaceport)
+ (with-input-from-pipe
+ (conc (common:get-megatest-exe) " -ping " ifaceport)
+ (lambda ()
+ (let loop ((inl (read-line))
+ (res "NOREPLY"))
+ (if (eof-object? inl)
+ (case (string->symbol res)
+ ((NOREPLY) #f)
+ ((LOGIN_OK) #t)
+ (else #f))
+ (loop (read-line) inl))))))
+
+;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
+;;
+#;(define (server:login toppath)
+ (lambda (toppath)
+ (set! *db-last-access* (current-seconds)) ;; might not be needed.
+ (if (equal? *toppath* toppath)
+ #t
+ #f)))
+
+;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
+;; (define (server:release-sync-lock)
+;; (db:no-sync-del! *no-sync-db* server:sync-lock-token))
+;; (define (server:have-sync-lock?)
+;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
+;; (have-lock? (car have-lock-pair))
+;; (lock-time (cdr have-lock-pair))
+;; (lock-age (- (current-seconds) lock-time)))
+;; (cond
+;; (have-lock? #t)
+;; ((>lock-age
+;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
+;; (server:release-sync-lock)
+;; (server:have-sync-lock?))
+;; (else #f))))
+
+)