Megatest

dbmgr.scm at tip
Login

File ulex/dbmgr.scm from the latest check-in


;;======================================================================
;; Copyright 2022, Matthew Welland.
;; 
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.

;;======================================================================

(declare (unit dbmgrmod))

(declare (uses ulex))
(declare (uses apimod))
(declare (uses pkts))
(declare (uses commonmod))
(declare (uses dbmod))
(declare (uses mtargs))
(declare (uses portloggermod))
(declare (uses debugprint))

(module dbmgrmod
    *

(import scheme
	chicken.base
	chicken.condition
	chicken.file
	chicken.format
	chicken.port
	chicken.process
	chicken.process-context
	chicken.process-context.posix
	chicken.sort
	chicken.string
	chicken.time
	
	(prefix sqlite3 sqlite3:)
	matchable
	md5
	message-digest
	regex
	s11n
	srfi-1
	srfi-18
	srfi-69
	system-information
	typed-records
	
	pkts
	ulex

	commonmod
	apimod
	dbmod
	debugprint
	(prefix mtargs args:)
	portloggermod
	)

;; Configurations for server
;; (tcp-buffer-size 2048)
;; (max-connections 2048) 

;; info about me as a listener and my connections to db servers
;; stored (for now) in *db-serv-info*
;;
(defstruct servdat
  (host #f)                  ;; listener host; filled in by rmt:run
  (port #f)                  ;; listener port actually in use; filled in by rmt:run
  (uuid #f)                  ;; value returned by register-server (set in rmt:wait-for-server)
  (dbfile #f)                ;; db file this process serves; #f until assigned
  (uconn   #f) ;; this is the listener *FOR THIS PROCESS*
  (mode    #f)               ;; rmt:open-main-connection sets this to 'non-db
  (status 'starting)         ;; progress marker symbol, updated with servdat-status-set!
  (trynum 0) ;; count the number of ports we've tried
  (conns  (make-hash-table)) ;; apath/dbname => conndat
  ) 

;; The servdat record for this process: its own listener plus the table
;; of connections it holds to db servers.
(define *db-serv-info* (make-servdat))

;; Render the listener address stored in sdat as the string "host:port".
(define (servdat->url sdat)
  (let ((host (servdat-host sdat))
        (port (servdat-port sdat)))
    (conc host ":" port)))

;; db servers contact info
;;
(defstruct conndat
  (apath    #f)  ;; area path the db lives under
  (dbname   #f)  ;; db name, e.g. ".db/main.db"
  (fullname #f)  ;; (db:dbname->path apath dbname); also the key used in servdat-conns
  (hostport #f)  ;; "host:port" string handed to send-receive
  (ipaddr   #f)  ;; ip address reported for the server
  (port     #f)  ;; port reported for the server
  (srvpkt   #f)  ;; the server pkt alist this record was built from (when available)
  (srvkey   #f)  ;; server key reported for the server
  (lastmsg  0)   ;; (current-seconds) at the time the record was created
  (expires  0))  ;; epoch seconds after which the record is treated as expired

;; pkt spec for 'server pkts, passed to write-alist->pkt and read-pkt->alist.
;; Each entry pairs a field name with a single-letter symbol
;; (presumably the card name used inside the pkt file - confirm against pkts).
(define *srvpktspec*
  `((server (host    . h)
	    (port    . p)
	    (servkey . k)
	    (pid     . i)
	    (ipaddr  . a)
	    (dbpath  . d))))

;;======================================================================
;;  S U P P O R T   F U N C T I O N S
;;======================================================================

;; set up the api proc, seems like there should be a better place for this?
;;
;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
;;
;; (define api-proc (make-parameter conc))
;; (api-proc api:execute-requests)

;; do we have a connection to apath dbname and
;; is it not expired? then return it
;;
;; else setup a connection
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
;;
;; Look up the conndat stored in remdat for apath/dbname.
;; Returns the conndat, or #f when no record exists. No expiry check
;; and no connection setup is done here.
(define (rmt:get-conn remdat apath dbname)
  (hash-table-ref/default (servdat-conns remdat)
                          (db:dbname->path apath dbname)
                          #f))

;; Forget the conndat stored in remdat for apath/dbname (if any).
(define (rmt:drop-conn remdat apath dbname)
  (hash-table-delete! (servdat-conns remdat)
                      (db:dbname->path apath dbname)))
  
;; Read all server pkts for area apath, keep those whose dbpath equals
;; dbname and return the first one that answers a ping (via
;; get-the-server), or #f when none does.
(define (rmt:find-main-server uconn apath dbname)
  (let ((candidates (get-viable-servers
                     (get-all-server-pkts (get-pkts-dir apath) *srvpktspec*)
                     dbname)))
    (get-the-server uconn apath candidates)))


;; *connstart-mutex* is held by rmt:open-main-connection while it decides
;; whether to launch a main server process.
;; *last-main-start* is the (current-seconds) value of the last such launch.
(define *connstart-mutex* (make-mutex))
(define *last-main-start* 0)

;; looks for a connection to main; if we have one and it has not expired we are done,
;; otherwise a new one is created
;; 
;; connections for other servers happens by requesting from main
;;
;; TODO: This is unnecessarily re-creating the record in the hash table
;;
;; remdat - servdat whose conns table is consulted and updated
;; apath  - area path; the main db is found via (db:dbname->path apath ".db/main.db")
;;
;; Returns #t once a non-expired conndat for main is in the conns table.
;; On the way it may start a listener thread running rmt:run (when this
;; process has no uconn yet) and may launch a main server process via
;; api:run-server-process, recursing until a connection record exists.
(define (rmt:open-main-connection remdat apath)
  (let* ((fullpath (db:dbname->path apath ".db/main.db"))
	 (conns    (servdat-conns remdat))
	 (conn     (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
	 ;; start our own listener in a thread, then poll (up to 30 times, 1s apart)
	 ;; until *db-serv-info* has a uconn; returns that uconn
	 (start-rmt:run (lambda ()
			  (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
			    (thread-start! th1)
			    (thread-sleep! 1)
			    (let loop ((count 0))
			      (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
			      (if (or (not *db-serv-info*)
				      (not (servdat-uconn *db-serv-info*)))
				  (begin
				    (thread-sleep! 1)
				    (loop (+ count 1)))
				  (begin
				    (servdat-mode-set! *db-serv-info* 'non-db)
				    (servdat-uconn *db-serv-info*)))))))
	 (myconn    (servdat-uconn *db-serv-info*)))
    (cond
     ;; no listener of our own yet: start one, then start over
     ((not myconn)
      (start-rmt:run)
      (rmt:open-main-connection remdat apath))
     ((and conn                                             ;; conn is NOT a socket, just saying ...
	   (< (current-seconds) (conndat-expires conn)))
      #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died 
     ;; record exists but has expired: drop it and start over
     ((and conn
	   (>= (current-seconds)(conndat-expires conn)))
      (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
      (rmt:drop-conn remdat apath ".db/main.db") ;;
      (rmt:open-main-connection remdat apath))
     (else
      ;; Below we will find or create and connect to main
      (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
      (let* ((dbname         (db:run-id->dbname #f))
	     (the-srv        (rmt:find-main-server myconn apath dbname))
	     ;; NOTE(review): if api:run-server-process raises, *connstart-mutex*
	     ;; is not unlocked by this code.
	     (start-main-srv (lambda () ;; call IF there is no the-srv found
			       (mutex-lock! *connstart-mutex*)
			       (if (> (- (current-seconds) *last-main-start*) 5) ;; more than 5 seconds since the last attempt to start main server
				   (begin
				     (api:run-server-process apath dbname)
				     (set! *last-main-start* (current-seconds))
				     (thread-sleep! 1))
				   (thread-sleep! 0.25))
			       (mutex-unlock! *connstart-mutex*)
			       (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
			       )))
	(if (not the-srv) ;; no live main server found - start one and retry
	    (start-main-srv)
	    ;; have a live server - record its contact info in the conns table
	    (let* ((srv-addr (server-address the-srv)) ;; need serv
		   (ipaddr   (alist-ref 'ipaddr  the-srv))
		   (port     (alist-ref 'port    the-srv))
		   (srvkey   (alist-ref 'servkey the-srv))
		   (fullpath (db:dbname->path apath dbname)) ;; shadows the outer fullpath
		   
		   (new-the-srv (make-conndat
				 apath:   apath
				 dbname:  dbname
				 fullname: fullpath
				 hostport: srv-addr
				 ;; socket: (open-nn-connection srv-addr)  - TODO - open ulex connection?
				 ipaddr: ipaddr
				 port: port
				 srvpkt: the-srv
				 srvkey: srvkey ;; generated by rmt:get-signature on the server side
				 lastmsg: (current-seconds)
				 expires: (+ (current-seconds)
					     (server:expiration-timeout)
					     -2) ;; this needs to be gathered during the ping
				 )))
	      (hash-table-set! conns fullpath new-the-srv)))
	#t)))))

;; NB// sinfo is a servdat struct
;;
;; Make sure sinfo holds connection records for both main and dbname.
;;
;; sinfo     - servdat struct
;; apath     - area path
;; dbname    - db to connect to; must not be ".db/main.db"
;; num-tries - retries left while main keeps answering 'server-started
;;
;; Returns #t. Calls (exit 1) when main still answers 'server-started
;; after num-tries has run out.
;;
;; An expired record for dbname is dropped and re-requested from main,
;; the same way rmt:open-main-connection treats an expired main record.
(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
  (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
  (let* ((mdbname  ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
         (fullname (db:dbname->path apath dbname))
         (conns    (servdat-conns sinfo))
         (mconn    (rmt:get-conn sinfo apath mdbname))
         (dconn    (rmt:get-conn sinfo apath dbname)))
    #;(if (and mconn
	     (not (debug:print-logger)))
	(begin
	  (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
	  (debug:print-logger rmt:log-to-main)))
    (cond
     ((and mconn
           dconn
           (< (current-seconds) (conndat-expires dconn)))
      #t) ;; good to go
     ((not mconn) ;; no channel open to main? open it...
      (rmt:open-main-connection sinfo apath)
      (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
     ((not dconn) ;; no channel open to dbname? ask main who serves it
      (let ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
        (case res
          ((server-started)
           ;; main launched a server for us; give it time and ask again
           (if (> num-tries 0)
               (begin
                 (thread-sleep! 2)
                 (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
               (begin
                 (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
                 (exit 1))))
          (else
           (if (list? res) ;; server has been registered and the info was returned. pass it on.
               (begin ;;  ("192.168.0.9" 53817
                      ;;  "5e34239f48e8973b3813221e54701a01" "24310"
                      ;;  "192.168.0.9"
                      ;;  "/home/matt/data/megatest/tests/simplerun"
                      ;;  ".db/1.db")
                 (match
                  res
                  ((host port servkey pid ipaddr srv-apath srv-dbname)
                   (debug:print-info 0 *default-log-port* "got "res)
                   (hash-table-set! conns
                                    fullname
                                    (make-conndat
                                     apath: srv-apath
                                     dbname: srv-dbname
                                     fullname: fullname
                                     hostport: (conc host":"port)
                                     ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
                                     ipaddr: ipaddr
                                     port: port
                                     srvkey: servkey
                                     lastmsg: (current-seconds)
                                     expires: (+ (current-seconds)
                                                 (server:expiration-timeout)
                                                 -2))))
                  (else
                   (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
                 res)
               (begin
                 (debug:print-info 0 *default-log-port* "Unexpected result: " res)
                 res))))))
     (else
      ;; mconn and dconn both exist but dconn has expired: forget the
      ;; stale record and request the server info from main again
      (debug:print-info 0 *default-log-port* "connection to "fullname" server expired. Reconnecting.")
      (rmt:drop-conn sinfo apath dbname)
      (rmt:general-open-connection sinfo apath dbname num-tries: num-tries)))
    #t))

;;======================================================================

;; FOR DEBUGGING SET TO #t
;; (define *localmode* #t)
;; When *localmode* is true rmt:send-receive bypasses the servers and runs
;; api:execute-requests directly against *dbstruct*.
(define *localmode* #f)
(define *dbstruct* (make-dbr:dbstruct))

;; Defaults to current area
;;
;; Send cmd with params to the server responsible for run id rid in the
;; current area (*toppath*) and return its reply.
;;
;; cmd    - command symbol
;; rid    - run id used to pick the db (db:run-id->dbname); when rid is
;;          #f no per-run connection is opened
;; params - list of parameters for cmd
;; attemptnum, area-dat - accepted but not used in this procedure
;;
;; In *localmode* the request is executed in-process against *dbstruct*.
(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
  (let ((area   *toppath*)
        (srv    *db-serv-info*)
        (target (db:run-id->dbname rid)))
    (cond
     (*localmode*
      (api:execute-requests *dbstruct* cmd params))
     (else
      (rmt:open-main-connection srv area)
      (when rid
        (rmt:general-open-connection srv area target))
      (rmt:send-receive-real srv area target cmd params)))))

;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future
;;
;; Send cmd/params over ulex to the server recorded for apath/dbname and
;; return the reply.
;;
;; Must not be called from the primordial thread, and the connection
;; record for apath/dbname must already exist (both are asserted).
;; After the exchange the record's expiry is pushed out to now plus
;; (server:expiration-timeout) minus a ten second safety margin.
(define (rmt:send-receive-real sinfo apath dbname cmd params)
  (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
  (let ((chan (rmt:get-conn sinfo apath dbname)))
    (assert chan "FATAL: rmt:send-receive-real called without the needed channels opened")
    (let ((reply (send-receive (servdat-uconn sinfo)     ;; our interface to ulex
                               (conndat-hostport chan)   ;; where the server listens
                               cmd
                               params)))
      ;; we just reached the server, so the record is good for a while longer
      (conndat-expires-set! chan
                            (+ (current-seconds)
                               (server:expiration-timeout)
                               -10)) ;; ten second margin for network time misalignments etc.
      reply)))

;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
;; Purpose - call the main.db server and request a server be started
;; for the given area path and dbname
;;

;; Print one line per command found in *db-stats* (at debug level 18):
;; command, call count, total time and average, most-called first.
;; Each *db-stats* value is read as a vector: slot 0 = count, slot 1 = total time.
(define (rmt:print-db-stats)
  (define row-fmt "~40a~7-d~9-d~20,2-f") ;; "~20,2-f"
  (define (stats-of cmd) (hash-table-ref *db-stats* cmd))
  (define (calls-of cmd) (vector-ref (stats-of cmd) 0))
  (define (print-row cmd)
    (let* ((dat   (stats-of cmd))
           (calls (vector-ref dat 0))
           (total (vector-ref dat 1)))
      (debug:print 18 *default-log-port* (format #f row-fmt cmd calls total (/ total calls)))))
  (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
  (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
  (for-each print-row
            (sort (hash-table-keys *db-stats*)
                  (lambda (a b) (> (calls-of a) (calls-of b))))))

;; Among the *db-stats* entries whose key contains "run-id=<run-id> ",
;; find the command with the largest average time (total / count).
;;
;; Returns (cmd . average), or (none . 0) when no entry matches.
;; NOTE(review): the "more than 10 calls" threshold is applied only to
;; the count of the LAST matching command, not to the winner - this
;; mirrors the existing behaviour; confirm whether it is intended.
;; *db-stats-mutex* is held while the table is read.
(define (rmt:get-max-query-average run-id)
  (mutex-lock! *db-stats-mutex*)
  (let* ((runkey   (conc "run-id=" run-id " "))
         (matching (filter (lambda (key) (substring-index runkey key))
                           (hash-table-keys *db-stats*)))
         (answer
          (if (null? matching)
              (cons 'none 0)
              ;; summary = (best-cmd best-avg count-of-last-cmd-seen)
              (let ((summary
                     (foldl (lambda (acc cmd)
                              (let* ((dat   (hash-table-ref *db-stats* cmd))
                                     (count (vector-ref dat 0))
                                     (avg   (/ (vector-ref dat 1) count)) ;; count is never zero by construction
                                     (best-so-far (cadr acc)))
                                (list (if (> avg best-so-far) cmd (car acc))
                                      (max best-so-far avg)
                                      count)))
                            (list (car matching) 0 0)
                            matching)))
                (if (> (caddr summary) 10)
                    (cons (car summary) (cadr summary))
                    (cons 'none 0))))))
    (mutex-unlock! *db-stats-mutex*)
    answer))

;; host and port are used to ensure we remove the proper records
;; Clean up on server exit: final sync, finalize the db handle, then
;; either remove our pkt and release the main.db lock (main server) or
;; deregister from main (run-id server).
;;
;; host, port - used when releasing the main.db lock so that only our
;;              own lock record is removed
;;
;; Does nothing beyond a log line when this process never had a dbfile
;; assigned in *db-serv-info*.
;;
;; The db handle is finalized exactly once here; the former separate
;; "inmem" finalize operated on the very same handle.
(define (rmt:server-shutdown host port)
  (let ((dbfile   (servdat-dbfile *db-serv-info*)))
    (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
    (if dbfile
        (let* ((am-server  (args:get-arg "-server"))
               (dbfile     (args:get-arg "-db")) ;; shadows the servdat value from here on
               (apath      *toppath*))
          ;; (sinfo *remotedat*) - foundation for future fix
          (if *dbstruct-db*
              (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
                     (db    (dbr:dbdat-db dbdat)))
                ;; do a final sync here
                (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
                (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
                ;; let's finalize here
                (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
                (if (sqlite3:database? db)
                    (sqlite3:finalize! db)
                    (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
                (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
              (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
          (if (not am-server)
              (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
              (if (string-match ".*/main.db$" dbfile)
                  ;; main server: remove our pkt and give back the lock
                  (let ((pkt-file (conc (get-pkts-dir *toppath*)
                                        "/" (servdat-uuid *db-serv-info*)
                                        ".pkt")))
                    (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
                    (delete-file* pkt-file)
                    (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
                    (db:with-lock-db
                     (servdat-dbfile *db-serv-info*)
                     (lambda (dbh dbfile)
                       (db:release-lock dbh dbfile host port))))
                  ;; we have a run-id server: tell main we are going away
                  (let* ((sdat *db-serv-info*)
                         (host (servdat-host sdat))
                         (port (servdat-port sdat))
                         (uuid (servdat-uuid sdat)))
                    (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
                    (let ((res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
                      (debug:print-info 0 *default-log-port* "deregistered-server, res="res)))))))))


;; True (the "-server" argument value) when this process was started
;; with -server; #f otherwise.
(define common:run-sync?
  (lambda ()
    ;; (and (common:on-homehost?)
    (args:get-arg "-server")))

;; *rmt:run-flag* becomes #t the first time rmt:run gets past its guard;
;; *rmt:run-mutex* protects the test-and-set of that flag.
(define *rmt:run-mutex* (make-mutex))
(define *rmt:run-flag* #f)

;; Main entry point to start a server. was start-server
;; Main entry point to start a server. was start-server
;;
;; hostn - host name recorded in *db-serv-info* for the listener
;;
;; Only the first call does any work; later calls log a warning and
;; return. The first call starts a ulex listener (unless one is already
;; recorded in *db-serv-info*), records host/port/uconn, and then blocks
;; in wait-and-close until the listener finishes.
(define (rmt:run hostn)
  (mutex-lock! *rmt:run-mutex*)
  (if *rmt:run-flag*
      (begin
        (debug:print-warn 0 *default-log-port* "rmt:run already running.")
        (mutex-unlock! *rmt:run-mutex*))
      (begin
        (set! *rmt:run-flag* #t)
        (mutex-unlock! *rmt:run-mutex*)
        ;;  ;; Configurations for server
        ;;  (tcp-buffer-size 2048)
        ;;  (max-connections 2048)
        (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
        (if (and *db-serv-info*
                 (servdat-uconn *db-serv-info*))
            ;; a listener already exists, just wait on it
            (let* ((uconn (servdat-uconn *db-serv-info*)))
              (wait-and-close uconn))
            (let* ((port            (portlogger:open-run-close portlogger:find-port))
                   ;; called by the listener for every incoming request
                   (handler-proc    (lambda (rem-host-port qrykey cmd params) ;;
                                      (set! *db-last-access* (current-seconds))
                                      (assert (list? params) "FATAL: handler called with non-list params")
                                      (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
                                      (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
                                      (api:execute-requests *dbstruct-db* cmd params))))
              ;; (api:process-request *dbstuct-db*
              (if (not *db-serv-info*)
                  (set! *db-serv-info* (make-servdat host: hostn port: port)))
              (let* ((uconn (run-listener handler-proc port))
                     (rport (udat-port uconn))) ;; the real port
                (servdat-host-set! *db-serv-info* hostn)
                (servdat-port-set! *db-serv-info* rport)
                (servdat-uconn-set! *db-serv-info* uconn)
                (wait-and-close uconn)
                (db:print-current-query-stats)
                )))
        (let* ((host (servdat-host *db-serv-info*))
               (port (servdat-port *db-serv-info*))
               (mode (or (servdat-mode *db-serv-info*)
                         "non-db")))
          ;; server exit stuff here
          ;; (rmt:server-shutdown host port) - always do in on-exit
          ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
          ;; note the space before "shutdown": mode and message used to run together
          (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")
          ))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================


;;======================================================================
;; NEW SERVER METHOD
;;======================================================================

;; only use for main.db - need to re-write some of this :(
;;
;; Try to take the "I am the server" lock recorded in dbfile.
;;
;; sdat       - servdat; its uconn is used to ping a current lock owner
;; dbfile     - path of the db holding the lock
;; host, port - identity to record as the lock owner (both required)
;;
;; Returns:
;;   #t           - we obtained the lock
;;   #f           - a live server holds the lock, or the lock query
;;                  returned something unexpected
;;   owner record - (pid host port time) of a dead owner whose lock we
;;                  attempted to steal; truthy, so callers treat it as
;;                  "have the lock" (the steal itself may fail)
;;
;; The db handle opened here is finalized before returning.
(define (get-lock-db sdat dbfile host port)
  (assert host "FATAL: get-lock-db called with host not set.")
  (assert port "FATAL: get-lock-db called with port not set.")
  (let* ((dbh   (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
         (res   (db:get-iam-server-lock dbh dbfile host port))
         (uconn (servdat-uconn sdat))
         ;; res => list then already locked, check server is responsive
         ;;     => #t then sucessfully got the lock
         ;;     => anything else: something went wrong
         (outcome
          (match res
            ((owner_pid owner_host owner_port event_time)
             (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
                 #f      ;; locked by someone else who is alive
                 (begin  ;; locked by someone dead and gone
                   (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
                   (db:steal-lock-db dbh dbfile port)
                   res)))
            (#t #t)
            (_  #f))))
    (sqlite3:finalize! dbh)
    outcome))


;; Write a 'server pkt describing this server into pkts-dir and return
;; the value write-alist->pkt gives back (used by callers as the pkt uuid).
;; The pid recorded in the pkt is that of the current process.
(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
  (let ((uuid (write-alist->pkt
               pkts-dir
               (list (cons 'host    host)
                     (cons 'port    port)
                     (cons 'servkey servkey)
                     (cons 'pid     (current-process-id))
                     (cons 'ipaddr  ipaddr)
                     (cons 'dbpath  dbpath))
               pktspec: pkt-spec
               ptype: 'server)))
    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
    uuid))

;; Return the server pkts directory "<area>/.meta/srvpkts", creating it
;; (with parents) when it does not exist yet. Errors from the create
;; attempt are ignored and the path is returned regardless.
;;
;; The area is *toppath* when that is set, otherwise apath.
;; NOTE(review): *toppath* wins over an explicitly passed apath - confirm
;; that this precedence is intended.
(define (get-pkts-dir #!optional (apath #f))
  (let ((area (or *toppath* apath)))
    (assert area
            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
    (let ((pdir (conc area "/.meta/srvpkts")))
      (unless (file-exists? pdir)
        ;; this exception handler should NOT be needed but ...
        (handle-exceptions exn #f (create-directory pdir #t)))
      pdir)))

;; given a pkts dir read 
;;
;; Read every *.pkt file in pktsdir-in (creating the directory first when
;; it is missing) and return the list of alists produced by
;; read-pkt->alist with the given pktspec.
(define (get-all-server-pkts pktsdir-in pktspec)
  (if (not (file-exists? pktsdir-in))
      (create-directory pktsdir-in #t))
  (map (lambda (pkt-file)
         (read-pkt->alist pkt-file pktspec: pktspec))
       (glob (conc pktsdir-in "/*.pkt"))))

;; Build the "host:port" string from the host and port entries of a
;; server pkt alist.
(define (server-address srv-pkt)
  (let ((host (alist-ref 'host srv-pkt))
        (port (alist-ref 'port srv-pkt)))
    (conc host ":" port)))
	
;; Ping the server at host-port ("host:port") through uconn.
;; Returns 'ack when the server answers with 'ack, otherwise #f.
(define (server-ready? uconn host-port key) ;; server-address is host:port
  (let* ((ping-params (list (cons 'cmd 'ping)
                            (cons 'key key)))
         (payload     (list (cons 'cmd 'ping)
                            (cons 'key key)
                            (cons 'params ping-params))) ;; I don't get it.
         (reply       (send-receive uconn host-port 'ping payload)))
    ;; yep, likely it is who we want on the other end
    (and (eq? reply 'ack) reply)))
;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))

;; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;;       in the list of pkts returned
;;
;; Keep the pkts whose dbpath entry equals dbpath.
;; The result is in reverse order of serv-pkts (each match is consed
;; onto the front of the accumulator).
;; NOTE: sort by age so oldest is considered first
(define (get-viable-servers serv-pkts dbpath)
  (foldl (lambda (matches spkt)
           (if (equal? dbpath (alist-ref 'dbpath spkt))
               (cons spkt matches)
               matches))
         '()
         serv-pkts))

;; Return the pkts from serv-pkts whose server answers a ping.
;; For every pkt whose server does not answer, a warning is logged and
;; the pkt file "<pkts-dir>/<Z>.pkt" is deleted.
(define (remove-pkts-if-not-alive uconn serv-pkts)
  ;; ping the server described by pkt at its host:port with its servkey
  (define (alive? pkt)
    (server-ready? uconn
                   (conc (alist-ref 'host pkt) ":" (alist-ref 'port pkt))
                   (alist-ref 'servkey pkt)))
  ;; remove the pkt file of a server that is gone
  (define (discard-pkt! pkt)
    (let ((pktpath (conc (get-pkts-dir *toppath*) "/" (alist-ref 'Z pkt) ".pkt")))
      (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
      (delete-file* pktpath)))
  (filter (lambda (pkt)
            (or (alive? pkt)
                (begin
                  (discard-pkt! pkt)
                  #f)))
          serv-pkts))

;; from viable servers get one that is alive and ready
;;
;; Return the first pkt in serv-pkts whose server answers a ping, or #f
;; when none does. The ping goes to "<ipaddr>:<port>" from the pkt and
;; uses the pkt's Z entry as the key. apath is not used here.
(define (get-the-server uconn apath serv-pkts)
  (find (lambda (spkt)
          (server-ready? uconn
                         (conc (alist-ref 'ipaddr spkt) ":" (alist-ref 'port spkt))
                         (alist-ref 'Z spkt))) ;; (alist-ref 'srvkey spkt))
        serv-pkts))

;; am I the "first" in line server? I.e. my D card is smallest
;; use Z card as tie breaker
;;
;; Pick the pkt that is "first in line": the one with the smallest D
;; entry (read as a number; bigger number is younger). When two pkts
;; have equal D the one with the greater Z string is kept.
;; Returns #f for an empty list. dbpath is not used here.
(define (get-best-candidate serv-pkts dbpath)
  (define (birth pkt)
    (string->number (alist-ref 'D pkt)))
  ;; choose between the current best and the next candidate
  (define (pick best candidate)
    (let ((best-bd (birth best))
          (cand-bd (birth candidate)))
      (cond
       ((< cand-bd best-bd) candidate)   ;; best is younger than candidate
       ((< best-bd cand-bd) best)        ;; candidate is younger than best
       ((string>=? (alist-ref 'Z best) (alist-ref 'Z candidate)) best) ;; use Z card as tie breaker
       (else candidate))))
  (if (null? serv-pkts)
      #f
      (foldl pick (car serv-pkts) serv-pkts)))
	  

;;======================================================================
;; END NEW SERVER METHOD
;;======================================================================

;; if .db/main.db check the pkts
;; 
;; Wait for this process's listener info to appear in *db-serv-info*, then
;; register a server pkt and try to become THE server for db-file.
;;
;; pkts-dir   - directory the server pkt is written to and pkts are read from
;; db-file    - db this server wants to serve (also used as the pkt dbpath)
;; server-key - this server's key; compared with the best candidate's servkey
;;
;; Returns the servdat when this process obtained the db lock. In every
;; other outcome the process exits: another candidate is preferred, the
;; lock could not be obtained, or more than 120 seconds passed waiting.
(define (rmt:wait-for-server pkts-dir db-file server-key)
  (let* ((sdat *db-serv-info*))
    ;; NOTE: `changed` is bound to (equal? sdat last-sdat) on each pass, i.e.
    ;; it is #t when sdat is the SAME as on the previous pass.
    (let loop ((start-time (current-seconds))
	       (changed    #t)
	       (last-sdat  "not this"))
      (begin ;; let ((sdat #f))
	(thread-sleep! 0.01)
	(debug:print-info 0 *default-log-port* "Waiting for server alive signature")
	(mutex-lock! *heartbeat-mutex*)
	(set! sdat *db-serv-info*)
	(mutex-unlock! *heartbeat-mutex*)
	(if (and sdat
		 (not changed)
		 (> (- (current-seconds) start-time) 2))
	    (let* ((uconn (servdat-uconn sdat)))
	      (servdat-status-set! sdat 'iface-stable)
	      (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
	      ;; create a server pkt in *toppath*/.meta/srvpkts
	      
	      ;; TODO:
	      ;;   1. change sdat to stuct
	      ;;   2. add uuid to struct
	      ;;   3. update uuid in sdat here
	      ;;
	      ;; the pkt records (get-host-name) as host and (servdat-host sdat) as ipaddr
	      (servdat-uuid-set! sdat
				 (register-server
				  pkts-dir *srvpktspec*
				  (get-host-name)
				  (servdat-port sdat) server-key
				  (servdat-host sdat) db-file))
	      ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
	      ;; now read pkts and see if we are a contender
	      (let* ((all-pkts     (get-all-server-pkts pkts-dir *srvpktspec*))
		     (viables      (get-viable-servers all-pkts db-file))
		     (alive        (remove-pkts-if-not-alive uconn viables))
		     (best-srv     (get-best-candidate alive db-file))
		     (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
		     (i-am-srv     (equal? best-srv-key server-key))
		     (delete-pkt   (lambda ()
				     (let* ((pktfile (conc (get-pkts-dir *toppath*)
							 "/" (servdat-uuid *db-serv-info*)
							 ".pkt")))
				       (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
				       (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
		(debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
		;; am I the best-srv, compare server-keys to know
		(if i-am-srv
		    (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
			(begin
			  (debug:print-info 0 *default-log-port* "I'm the server!")
			  (servdat-dbfile-set! sdat db-file)
			  (servdat-status-set! sdat 'db-locked))
			;; best candidate, but the lock could not be obtained: clean up and exit
			(begin
			  (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
			  (bdat-time-to-exit-set! *bdat* #t)
			  (delete-pkt)
			  (thread-sleep! 0.2)
			  (exit)))
		    ;; another candidate is preferred: clean up and exit
		    (begin
		      (debug:print-info 0 *default-log-port*
				   "Keys do not match "best-srv-key", "server-key", exiting.")
		      (bdat-time-to-exit-set! *bdat* #t)
		      (delete-pkt)
		      (thread-sleep! 0.2)
		      (exit)))
		sdat))
	    (begin ;; sdat does not yet contain server info (or the wait conditions are not met)
	      (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
	      (sleep 4)
	      (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
		  (begin
		    (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
		    (exit))
		  (loop start-time
			(equal? sdat last-sdat)
			sdat))))))))

;; Ask the main server to register a server for dbname in area apath.
;; iface is sent both as host and as ipaddr; the pid sent is that of the
;; current process. Returns main's reply.
(define (rmt:register-server sinfo apath iface port server-key dbname)
  (servdat-conns sinfo) ;; just checking types
  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
  (let ((main-db (db:run-id->dbname #f))
        ;; params: host port servkey pid ipaddr apath dbname
        (params  (list iface
                       port
                       server-key
                       (current-process-id)
                       iface
                       apath
                       dbname)))
    (rmt:send-receive-real sinfo apath main-db 'register-server params)))

;; Send 'get-count-servers for area apath to the main server and return
;; its reply.
(define (rmt:get-count-servers sinfo apath)
  (servdat-conns sinfo) ;; just checking types
  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
  (let ((main-db (db:run-id->dbname #f)))
    (rmt:send-receive-real sinfo apath main-db 'get-count-servers (list apath))))

;; Send 'get-servers-info for area apath through rmt:send-receive (with
;; no run id) and return the reply.
(define (rmt:get-servers-info apath)
  (let ((params (list apath)))
    (rmt:send-receive 'get-servers-info #f params)))

;; Ask the main server to deregister the server for dbname in area apath.
;; iface is sent both as host and as ipaddr; the pid sent is that of the
;; current process. Returns main's reply.
(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
  (let ((main-db (db:run-id->dbname #f))
        ;; params: host port servkey pid ipaddr apath dbname
        (params  (list iface
                       port
                       server-key
                       (current-process-id)
                       iface
                       apath
                       dbname)))
    (rmt:send-receive-real db-serv-info apath main-db 'deregister-server params)))

;; Block until the host and port in *db-serv-info* are set and have been
;; the same on two consecutive checks, and at least one second has passed
;; since this procedure was entered.
;;
;; num-tries-allowed - maximum number of polling passes (0.25s - 0.5s
;;                     apart) before giving up with (exit 1)
;;
;; On success: calls rmt:get-signature, sets the servdat status to
;; 'interface-stable, logs the "SERVER STARTED" line and returns #t.
(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
  ;; wait until *db-serv-info* stops changing
  (let* ((stime (current-seconds)))
    (let loop ((last-host  #f)
               (last-port  #f)
               (tries 0))
      (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
             (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
        ;; first we verify port and interface, update *db-serv-info* in need be.
        (cond
         ((> tries num-tries-allowed)
          (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, giving up after "tries" tries.")
          (exit 1))
         ((not *db-serv-info*)
          (thread-sleep! 0.25)
          (loop curr-host curr-port (+ tries 1)))
         ((or (not last-host)(not last-port))
          (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, still no interface, tries="tries)
          (thread-sleep! 0.25)
          (loop curr-host curr-port (+ tries 1)))
         ((or (not (equal? last-host curr-host))
              (not (equal? last-port curr-port)))
          (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
          (thread-sleep! 0.25)
          (loop curr-host curr-port (+ tries 1)))
         ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 1 second has passed
          (thread-sleep! 0.5)
          (loop curr-host curr-port (+ tries 1)))
         (else
          (rmt:get-signature) ;; sets *my-signature* as side effect
          (servdat-status-set! *db-serv-info* 'interface-stable)
          (debug:print 0 *default-log-port*
                       "SERVER STARTED: " curr-host
                       ":" curr-port
                       " AT " (current-seconds) " server signature: " *my-signature*
                       " with "(servdat-trynum *db-serv-info*)" port changes")
          (flush-output *default-log-port*)
          #t))))))

;; Run rmt:keep-running in a parallel thread to monitor that the db is being
;; used and to shut the server down after some time if it is not.
;;
;;   dbname - the db this server fronts; handed to db:setup,
;;            rmt:register-server, rmt:server-info and db:sync-inmem->disk
;;
;; Every path through the monitor loop either re-enters the loop or ends
;; in (exit), so this procedure is not expected to return.
;;
(define (rmt:keep-running dbname)
  ;; Shutdown is started when *server-run* is cleared or when
  ;; *db-last-access* is older than (server:expiration-timeout) seconds.
  ;; This thread first waits for the server to come alive.
  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")

  (let* ((sinfo             *db-serv-info*)
         (pkts-dir          (get-pkts-dir))
         (server-key        (rmt:get-signature)) ;; This servers key
         (is-main           (equal? (args:get-arg "-db") ".db/main.db"))
         (last-access       0) ;; refreshed from *db-last-access* on every pass
         (server-timeout    (server:expiration-timeout))
         ;; host and port are accepted but unused; the release work is done in on-exit
         (shutdown-server-sequence
          (lambda (host port)
            (set! *unclean-shutdown* #f) ;; Should not be needed anymore
            (debug:print-info 0 *default-log-port* "Starting to shutdown the server. pid="(current-process-id))
            ;; (rmt:server-shutdown host port) -- called in on-exit
            ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit
            (exit)))
         (timed-out?        (lambda ()
                              (<= (+ last-access server-timeout)
                                  (current-seconds)))))
    (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db"))
    ;; main and run db servers have both got wait logic (could/should merge it)
    (if is-main
        (rmt:wait-for-server pkts-dir dbname server-key)
        (rmt:wait-for-stable-interface))
    ;; this is our forever loop
    (let* ((iface (servdat-host *db-serv-info*))
           (port  (servdat-port *db-serv-info*))
           (uconn (servdat-uconn *db-serv-info*)))
      (let loop ((count          0)
                 (bad-sync-count 0)
                 (start-time     (current-milliseconds)))
        (if (and (not is-main)
                 (common:low-noise-print 60 "servdat-status"))
            (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*)))

        (mutex-lock! *heartbeat-mutex*)
        ;; set up the database handle
        (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate
            (let ((watchdog (bdat-watchdog *bdat*)))
              (debug:print 0 *default-log-port* "SERVER: dbprep")
              (db:setup dbname) ;; sets *dbstruct-db* as side effect
              (servdat-status-set! *db-serv-info* 'db-opened)
              ;; IFF I'm not main, call into main and register self.
              ;;
              ;; The retry is done right here rather than by re-entering
              ;; `loop`: *heartbeat-mutex* is held at this point and `loop`
              ;; starts by locking it again, which would block this thread
              ;; on itself. Re-entering `loop` would also skip registration
              ;; because *dbstruct-db* is already set.
              ;; NOTE(review): the retry is unbounded; consider a cap or a
              ;; delay if rmt:deregister-server can fail repeatedly.
              (if (not is-main)
                  (let try-register ()
                    (let ((res (rmt:register-server sinfo
                                                    *toppath* iface port
                                                    server-key dbname)))
                      (if res ;; we are the server
                          (servdat-status-set! *db-serv-info* 'have-interface-and-db)
                          ;; now check that the db locker is alive, clear it out if not
                          (let* ((serv-info (rmt:server-info *toppath* dbname)))
                            (match serv-info
                              ((host port servkey pid ipaddr apath dbpath)
                               ;; NOTE(review): when the registered server IS alive
                               ;; nothing happens here and this process carries on
                               ;; unregistered -- confirm that is intended.
                               (if (not (server-ready? uconn (conc host":"port) servkey))
                                   (begin
                                     (debug:print-info 0 *default-log-port* "Server registered but not alive. Removing and trying again.")
                                     (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath)
                                     (try-register))))
                              (else
                               (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info)
                               (exit))))))))
              (debug:print 0 *default-log-port*
                           "SERVER: running, db "dbname" opened, megatest version: "
                           (common:get-full-version))
              ;; start the watchdog

              ;; is this really needed?

              #;(if watchdog
                  (if (not (member (thread-state watchdog)
                                   '(ready running blocked
                                           sleeping dead)))
                      (begin
                        (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")")
                        (thread-start! watchdog))
                      (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")"))
                  (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it."))
              #;(loop (+ count 1) bad-sync-count start-time)
              ))

        (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t)

        (mutex-unlock! *heartbeat-mutex*)

        ;; when things go wrong we don't want to be doing the various
        ;; queries too often so we strive to run this stuff only every
        ;; four seconds or so.
        (let* ((sync-time (- (current-milliseconds) start-time))
               (rem-time  (quotient (- 4000 sync-time) 1000)))
          (if (and (<= rem-time 4)
                   (>  rem-time 0))
              (thread-sleep! rem-time)))

        ;; Transfer *db-last-access* to last-access to use in checking that we are still alive
        (set! last-access *db-last-access*)

        ;; Only every second pass (count >= 1) runs the shutdown checks.
        ;; The checks sit in the else branch so that the re-entry below is
        ;; a proper tail call; as a bare one-armed `if` it piled up one
        ;; pending continuation per cycle for the lifetime of the server.
        (if (< count 1)
            (loop (+ count 1) bad-sync-count (current-milliseconds))
            (begin
              (if (common:low-noise-print 60 "dbstats")
                  (begin
                    (debug:print 0 *default-log-port* "Server stats:")
                    (db:print-current-query-stats)))
              (cond
               ((not *server-run*)
                (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.")
                (shutdown-server-sequence (get-host-name) port))
               ((timed-out?)
                (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
                (shutdown-server-sequence (get-host-name) port))
               ;; NOTE(review): the clause above already handles the timed
               ;; out case, so the is-main "other servers still registered"
               ;; test below is only consulted if the timeout elapses
               ;; between the two (timed-out?) calls. If main is meant to
               ;; outlive the other servers the clause order needs a rethink.
               ((and *server-run*
                     (or (not (timed-out?))
                         (if is-main ;; do not exit if there are other servers (keep main open until all others gone)
                             (> (rmt:get-count-servers sinfo *toppath*) 1)
                             #f)))
                (if (common:low-noise-print 120 "server continuing")
                    (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access)))
                (loop 0 bad-sync-count (current-milliseconds)))
               (else
                (set! *unclean-shutdown* #f)
                (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access))
                (shutdown-server-sequence (get-host-name) port)
                #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: "
                                  (open-send-receive-nn (conc iface":"port)      ;; do this here and not in server-shutdown
                                                        (sexpr->string 'quit)))
                ))))))))

;; Pick the host name to run the server on: the value given with
;; "-server" when there is one and it is not "-", otherwise the
;; result of (get-host-name).
;;
(define (rmt:get-reasonable-hostname)
  (let ((requested (args:get-arg "-server")))
    (cond
     ((not requested)        (get-host-name)) ;; no -server argument
     ((equal? requested "-") (get-host-name)) ;; "-" means "use my own host name"
     (else                   requested))))

;; Call this to start the actual server
;;
;; all routes through here end in exit ...
;;
;; This is the point at which servers are started
;;
;; Two threads are created and joined: one runs rmt:run on the host name
;; from rmt:get-reasonable-hostname, the other runs rmt:keep-running on
;; dbname (only when "-server" was given). Yields #f if both joins return.
;;
(define (rmt:server-launch dbname)
  (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
  (let* ((run-server     (lambda ()
                           (debug:print-info 0 *default-log-port* "Server run thread started")
                           (rmt:run (rmt:get-reasonable-hostname))))
         (monitor-server (lambda ()
                           (debug:print-info 0 *default-log-port* "Server monitor thread started")
                           (if (args:get-arg "-server")
                               (rmt:keep-running dbname))))
         (run-thread     (make-thread run-server "Server run"))
         (monitor-thread (make-thread monitor-server "Keep running")))
    (thread-start! run-thread)
    ;; give the server time to settle before starting the keep-running monitor.
    (thread-sleep! 0.252)
    (thread-start! monitor-thread)
    (set! *didsomething* #t)
    (thread-join! run-thread)
    (thread-join! monitor-thread))
  #f)
	    
;;======================================================================
;;  S E R V E R   -  D I R E C T   C A L L S
;;======================================================================

;; Send a 'kill-server request through rmt:send-receive with run-id as
;; its single parameter; the reply is returned as-is.
;;
(define (rmt:kill-server run-id)
  (let ((params (list run-id)))
    (rmt:send-receive 'kill-server #f params)))

;; Send a 'start-server request through rmt:send-receive with run-id as
;; its single parameter; the reply is returned as-is.
;;
(define (rmt:start-server run-id)
  (let ((params (list run-id)))
    (rmt:send-receive 'start-server #f params)))

;; Send a 'get-server-info request through rmt:send-receive with apath
;; and dbname as parameters; the reply is returned as-is.
;;
(define (rmt:server-info apath dbname)
  (let ((params (list apath dbname)))
    (rmt:send-receive 'get-server-info #f params)))

;;======================================================================
;; Nanomsg transport
;;======================================================================

#;(define (is-port-in-use port-num)
  (let* ((ret #f))
    (let-values (((inp oup pid)
		  (process "netstat" (list  "-tulpn" ))))
      (let loop ((inl (read-line inp)))
        (if (not (eof-object? inl))
            (begin 
	      (if (string-search (regexp (conc ":" port-num)) inl)
		  (begin
					;(print "Output: "  inl)
		    (set! ret  #t))
		  (loop (read-line inp)))))))
    ret))

#;(define (open-nn-connection host-port)
  (let ((req  (make-req-socket))
        (uri  (conc "tcp://" host-port)))
    (nng-dial req uri)
    (socket-set! req 'nng/recvtimeo 2000)
    req))

#;(define (send-receive-nn req msg)
  (nng-send req msg)
  (nng-recv req))

#;(define (close-nn-connection req)
  (nng-close! req))
  
;; ;; open connection to server, send message, close connection
;; ;;
;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
;;   (let ((req  (make-req-socket 'req))
;;         (uri  (conc "tcp://" host-port))
;;         (res  #f)
;;         ;; (contacts (alist-ref 'contact attrib))
;;         ;; (mode (alist-ref 'mode attrib))
;; 	)
;;     (socket-set! req 'nng/recvtimeo 2000)
;;     (handle-exceptions
;;      exn
;;      (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
;;        ;; Send notification       
;;        (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" )
;;        #f)
;;      (nng-dial req uri)
;;      ;; (print "Connected to the server " )
;;      (nng-send req msg)
;;      ;; (print "Request Sent")  
;;      (let* ((th1  (make-thread (lambda ()
;;                                  (let ((resp (nng-recv req)))
;;                                    (nng-close! req)
;;                                    (set! res (if (equal? resp "ok")
;;                                                  #t
;;                                                  #f))))
;;                                "recv thread"))
;;             (th2 (make-thread (lambda ()
;;                                 (thread-sleep! timeout)
;;                                 (thread-terminate! th1))
;; 			      "timer thread")))
;;        (thread-start! th1)
;;        (thread-start! th2)
;;        (thread-join! th1)
;;        res))))
;; 
#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
  (let ((req  (make-req-socket))
        (uri  (conc "tcp://" host-port))
        (res  #f)) 
    (handle-exceptions
     exn
     (let ((emsg ((condition-property-accessor 'exn 'message) exn)))
       ;; Send notification      
       (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn)
       #f)
     (nng-dial req uri)
     (nng-send req msg)
     (let* ((th1  (make-thread (lambda ()
                                 (let ((resp (nng-recv req)))
                                   (nng-close! req)
                                   ;; (print resp)
                                   (set! res resp)))
                               "recv thread"))
            (th2 (make-thread (lambda ()
                                (thread-sleep! timeout)
                                (thread-terminate! th1))
                             "timer thread")))
       (thread-start! th1)
       (thread-start! th2)
       (thread-join! th1)
       res))))

;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; run ping in separate process, safest way in some cases
;;
#;(define (server:ping-server ifaceport)
  (with-input-from-pipe 
   (conc (common:get-megatest-exe) " -ping " ifaceport)
   (lambda ()
     (let loop ((inl (read-line))
		(res "NOREPLY"))
       (if (eof-object? inl)
	   (case (string->symbol res)
	     ((NOREPLY)  #f)
	     ((LOGIN_OK) #t)
	     (else       #f))
	   (loop (read-line) inl))))))

;; NOT USED (well, ok, reference in rpc-transport but otherwise not used).
;;
#;(define (server:login toppath)
  (lambda (toppath)
    (set! *db-last-access* (current-seconds)) ;; might not be needed.
    (if (equal? *toppath* toppath)
	#t
	#f)))

;; (define server:sync-lock-token "SERVER_SYNC_LOCK")
;; (define (server:release-sync-lock)
;;   (db:no-sync-del! *no-sync-db* server:sync-lock-token))
;; (define (server:have-sync-lock?)
;;   (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token))
;;          (have-lock?     (car have-lock-pair))
;;          (lock-time      (cdr have-lock-pair))
;;          (lock-age       (- (current-seconds) lock-time)))
;;     (cond
;;      (have-lock? #t)
;;      ((>lock-age
;;        (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180)))
;;       (server:release-sync-lock)
;;       (server:have-sync-lock?))
;;      (else #f))))

)