Index: Makefile ================================================================== --- Makefile +++ Makefile @@ -24,10 +24,18 @@ SRCFILES = # all : $(PREFIX)/bin/.$(ARCHSTR) mtest dboard mtut ndboard all : $(PREFIX)/bin/.$(ARCHSTR) mtest # add dboard mtut and tcmt back later + +# Configuration stuff +transport-flavor : + @echo Creating transport-flavor with full as flavor. Options include: full, simple + echo full > transport-flavor + +ulex.scm dbmgrmod.scm : ulex.scm.template dbmgrmod.scm.template transport-flavor ulex-*/*scm + ./configure # module source files MSRCFILES = autoload.scm dbi.scm ducttape-lib.scm pkts.scm stml2.scm \ cookie.scm mutils.scm mtargs.scm apimod.scm ulex.scm \ configfmod.scm commonmod.scm dbmod.scm rmtmod.scm \ @@ -36,23 +44,17 @@ itemsmod.scm keysmod.scm mtmod.scm rmtmod.scm \ tasksmod.scm pgdb.scm launchmod.scm runsmod.scm \ portloggermod.scm archivemod.scm ezstepsmod.scm \ subrunmod.scm bigmod.scm testsmod.scm dbmgrmod.scm -# GUISRCF = - GUIMODFILES = tree.scm dashboard-tests.scm vgmod.scm \ dashboard-context-menu.scm dcommon.scm -# dashboard-guimonitor.scm - mofiles/dashboard-context-menu.o : mofiles/dcommon.o mofiles/dashboard-tests.o : mofiles/dcommon.o -# mofiles/dcommon.o mofiles/tree.o : mofiles/gutils.o OFILES = $(SRCFILES:%.scm=%.o) -# GOFILES = $(GUISRCF:%.scm=%.o) MOFILES = $(addprefix mofiles/,$(MSRCFILES:%.scm=%.o)) GMOFILES = $(addprefix mofiles/,$(GUIMODFILES:%.scm=%.o)) # compiled import files ADDED attic/configure Index: attic/configure ================================================================== --- /dev/null +++ attic/configure @@ -0,0 +1,101 @@ +#!/bin/bash + +# Copyright 2006-2017, Matthew Welland. +# +# This file is part of Megatest. +# +# Megatest is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# Megatest is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Megatest. If not, see . + +# Configure the build + +if [[ "$1"x == "x" ]];then + PREFIX=$PWD +else + PREFIX=$1 +fi + + +#====================================================================== +# Configure stuff needed for eggs +#====================================================================== + +function configure_dependencies () { + + #====================================================================== + # libnanomsg + #====================================================================== + + if [[ ! $(ls /usr/lib/*/libnanomsg*) ]];then + echo "libnanomsg build needed." + echo "BUILD_NANOMSG=yes" >> makefile.inc + fi + + #====================================================================== + # postgresql libraries + #====================================================================== + + if [[ ! $(ls /usr/lib/*/libpq.*) ]];then + echo "Postgresql build needed." + echo "BUILD_POSTGRES=yes" >> makefile.inc + fi + + if [[ ! $(ls /usr/lib/*/libsqlite3.*) ]];then + echo "Sqlite3 build needed." + echo "BUILD_SQLITE3=yes" >> makefile.inc + fi + +} + +#====================================================================== +# Initialize makefile.inc +#====================================================================== + +echo "" > makefile.inc + +#====================================================================== +# Do we need Chicken? 
+#====================================================================== + +if [[ -e /usr/bin/sw_vers ]]; then + ARCHSTR=$(/usr/bin/sw_vers -productVersion) +else + ARCHSTR=$(lsb_release -sr) +fi + +echo "CHICKEN_PREFIX=$PREFIX/.$ARCHSTR" >> makefile.inc +CHICKEN_PREFIX=$PREFIX/bin/.$ARCHSTR + +if [[ ! $(type csi) ]];then + echo "Chicken build needed." + echo "BUILD_CHICKEN=yes" >> makefile.inc + configure_dependencies + echo "include chicken.makefile" >> makefile.inc +else + echo "CSIPATH=$(which csi)" >> makefile.inc + CSIPATH=$(which csi) + echo "CKPATH=$(dirname $(dirname $CSIPATH))" >> makefile.inc +fi + +# Make setup scripts +echo "#!/bin/bash" > setup.sh +echo "export PATH=$CHICKEN_PREFIX/bin:\$PATH" >> setup.sh +echo "export LD_LIBRARY_PATH=$CHICKEN_PREFIX/lib" >> setup.sh +echo 'exec "$@"' >> setup.sh +chmod a+x setup.sh + +echo "setenv PATH $CHICKEN_PREFIX/bin:\$PATH" > setup.csh +echo "setenv LD_LIBRARY_PATH $CHICKEN_PREFIX/lib" >> setup.csh + +echo "All done creating makefile.inc, feel free to edit it!" 
+echo "run \"setup.sh bash\" or source setup.csh to get PATH and LD_LIBRARY_PATH adjusted" Index: commonmod.scm ================================================================== --- commonmod.scm +++ commonmod.scm @@ -184,10 +184,11 @@ make-and-init-bigdata call-with-environment-variables common:simple-file-lock common:simple-file-lock-and-wait common:simple-file-release-lock +common:with-simple-file-lock common:fail-safe get-file-descriptor-count common:get-this-exe-fullpath common:get-sync-lock-filepath common:find-local-megatest @@ -1242,10 +1243,17 @@ (define (common:simple-file-release-lock fname) (handle-exceptions exn #f ;; I don't really care why this failed (at least for now) (delete-file* fname))) + +(define (common:with-simple-file-lock fname proc) + (let* ((lkfname (conc fname ".lock"))) + (common:simple-file-lock-and-wait lkfname) + (let ((res (proc))) + (common:simple-file-release-lock lkfname) + res))) ;;====================================================================== ;; PUlled below from common.scm ;;====================================================================== Index: configfmod.scm ================================================================== --- configfmod.scm +++ configfmod.scm @@ -49,10 +49,11 @@ configf:write-alist configf:write-config find-config getenv mytarget + my-with-lock nice-path process:cmd-run->list runconfig:read runconfigs-get safe-setenv @@ -114,14 +115,19 @@ ;;====================================================================== ;; while targets are Megatest specific they are a useful concept (define mytarget (make-parameter #f)) +;; fake locker +(define (fake-locker fname proc)(proc)) + ;; locking is optional, many environments don't care (e.g. 
running on one machine) ;; NOTE: the locker must follow the same syntax as with-dot-lock* +;; with-dot-lock* has problems if /tmp and the file being +;; locked are not on the same filesystem ;; -(define my-with-lock (make-parameter with-dot-lock*)) +(define my-with-lock (make-parameter fake-locker)) ;; with-dot-lock*)) ;;====================================================================== ;; move debug stuff to separate module then put these back where they belong ;;====================================================================== ;;====================================================================== @@ -1190,11 +1196,11 @@ ;;====================================================================== ;; DO THE LOCKING AROUND THE CALL ;;====================================================================== ;; -(define (configf:write-alist cdat fname) +(define (configf:write-alist cdat fname #!optional (check-written #f)) ;; (if (not (common:faux-lock fname)) ;; (debug:print 0 *default-log-port* "INFO: NEED LOCKING ADDED HERE " fname) ((my-with-lock) fname (lambda () @@ -1202,26 +1208,27 @@ (res (begin (with-output-to-file fname ;; first write out the file (lambda () (pp dat))) - ;; I don't like this. It makes write-alist opaque and complicated. -mrw- - (if (file-exists? fname) ;; now verify it is readable - (if (configf:read-alist fname) - #t ;; data is good. - (begin - (handle-exceptions - exn - (begin - (debug:print 0 *default-log-port* "deleting " fname " failed, exn=" exn) - #f) - (debug:print 0 *default-log-port* "WARNING: content " dat " for cache " fname " is not readable. Deleting generated file.") - (delete-file fname)) - #f)) - #f)))) + ;; I don't like this. It makes write-alist complicated + ;; move to something like write-and-verify-alist. -mrw- + (if check-written + (if (file-exists? fname) ;; now verify it is readable + (if (configf:read-alist fname) + 'data-good ;; data is good. 
+ (handle-exceptions + exn + (begin + (debug:print 0 *default-log-port* "deleting " fname " failed, exn=" exn) + 'data-bad) + (debug:print 0 *default-log-port* "WARNING: content " dat " for cache " fname " is not readable. Deleting generated file.") + (delete-file fname))) + 'data-not-there) + 'data-not-checked)))) res)))) (define (common:get-fields cfgdat) (let ((fields (hash-table-ref/default cfgdat "fields" '()))) (map car fields))) ) Index: configure ================================================================== --- configure +++ configure @@ -15,87 +15,18 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Megatest. If not, see . -# Configure the build - -if [[ "$1"x == "x" ]];then - PREFIX=$PWD -else - PREFIX=$1 -fi - - -#====================================================================== -# Configure stuff needed for eggs -#====================================================================== - -function configure_dependencies () { - - #====================================================================== - # libnanomsg - #====================================================================== - - if [[ ! $(ls /usr/lib/*/libnanomsg*) ]];then - echo "libnanomsg build needed." - echo "BUILD_NANOMSG=yes" >> makefile.inc - fi - - #====================================================================== - # postgresql libraries - #====================================================================== - - if [[ ! $(ls /usr/lib/*/libpq.*) ]];then - echo "Postgresql build needed." - echo "BUILD_POSTGRES=yes" >> makefile.inc - fi - - if [[ ! $(ls /usr/lib/*/libsqlite3.*) ]];then - echo "Sqlite3 build needed." 
- echo "BUILD_SQLITE3=yes" >> makefile.inc - fi - -} - -#====================================================================== -# Initialize makefile.inc -#====================================================================== - -echo "" > makefile.inc - -#====================================================================== -# Do we need Chicken? -#====================================================================== - -if [[ -e /usr/bin/sw_vers ]]; then - ARCHSTR=$(/usr/bin/sw_vers -productVersion) -else - ARCHSTR=$(lsb_release -sr) -fi - -echo "CHICKEN_PREFIX=$PREFIX/.$ARCHSTR" >> makefile.inc -CHICKEN_PREFIX=$PREFIX/bin/.$ARCHSTR - -if [[ ! $(type csi) ]];then - echo "Chicken build needed." - echo "BUILD_CHICKEN=yes" >> makefile.inc - configure_dependencies - echo "include chicken.makefile" >> makefile.inc -else - echo "CSIPATH=$(which csi)" >> makefile.inc - CSIPATH=$(which csi) - echo "CKPATH=$(dirname $(dirname $CSIPATH))" >> makefile.inc -fi - -# Make setup scripts -echo "#!/bin/bash" > setup.sh -echo "export PATH=$CHICKEN_PREFIX/bin:\$PATH" >> setup.sh -echo "export LD_LIBRARY_PATH=$CHICKEN_PREFIX/lib" >> setup.sh -echo 'exec "$@"' >> setup.sh -chmod a+x setup.sh - -echo "setenv PATH $CHICKEN_PREFIX/bin:\$PATH" > setup.csh -echo "setenv LD_LIBRARY_PATH $CHICKEN_PREFIX/lib" >> setup.csh - -echo "All done creating makefile.inc, feel free to edit it!" 
-echo "run \"setup.sh bash\" or source setup.csh to get PATH and LD_LIBRARY_PATH adjusted" +# Flavors include: simple, full and none + +# look at build.config (not a version controlled file and +# create ulex.scm and dbmgr.scm + +if [[ -e transport-flavor ]];then + FLAVOR=$(cat transport-flavor) +else + FLAVOR=full +fi + +sed -e "s/FLAVOR/$FLAVOR/" ulex.scm.template > ulex.scm +sed -e "s/FLAVOR/$FLAVOR/" dbmgrmod.scm.template > dbmgrmod.scm Index: dbmod.scm ================================================================== --- dbmod.scm +++ dbmod.scm @@ -3454,12 +3454,14 @@ (db:with-db dbstruct run-id #f (lambda (db) - (let* ((stmth (db:get-cache-stmth dbstruct db qry))) - (sqlite3:first-result stmth run-id)))))) + (let* (#;(stmth (db:get-cache-stmth dbstruct db qry))) + #;(sqlite3:first-result stmth run-id) + (sqlite3:first-result db qry run-id) + ))))) ;; For a given testname how many items are running? Used to determine ;; probability for regenerating html ;; (define (db:get-count-tests-running-for-testname dbstruct run-id testname) Index: dcommon.scm ================================================================== --- dcommon.scm +++ dcommon.scm @@ -26,10 +26,11 @@ (declare (uses commonmod)) (declare (uses configfmod)) (declare (uses rmtmod)) (declare (uses mtargs)) (declare (uses testsmod)) +(declare (uses dbmgrmod)) (module dcommon * (import scheme @@ -62,10 +63,11 @@ srfi-1 ) (import mtver dbmod + dbmgrmod commonmod debugprint configfmod rmtmod ;; gutils Index: launchmod.scm ================================================================== --- launchmod.scm +++ launchmod.scm @@ -663,11 +663,12 @@ (let* ((tconfig-fname (conc work-area "/.testconfig")) (tconfig-tmpfile (conc tconfig-fname ".tmp")) (tconfig (tests:get-testconfig test-name item-path tconfigreg #t force-create: #t)) ;; 'return-procs))) (scripts (configf:get-section tconfig "scripts"))) ;; create .testconfig file - (configf:write-alist tconfig tconfig-tmpfile) + (configf:write-alist tconfig 
tconfig-tmpfile #t) ;; the #t forces a check of the written data + (assert (file-exists? tconfig-tmpfile) "FATAL: We just wrote the dang file, how can it not exist?") (move-file tconfig-tmpfile tconfig-fname #t) (delete-file* ".final-status") ;; extract scripts from testconfig and write them to files in test run dir (for-each Index: megatest.scm ================================================================== --- megatest.scm +++ megatest.scm @@ -167,12 +167,14 @@ ;; ;; ulex parameters ;; (work-method 'direct) ;; (return-method 'direct) ;; ulex parameters - (work-method 'mailbox) - (return-method 'mailbox) + ;; (work-method 'mailbox) + ;; (return-method 'mailbox) + +(my-with-lock common:with-simple-file-lock) ;; fake out readline usage of toplevel-command (define (toplevel-command . a) #f) (define *didsomething* #f) (define *db* #f) ;; this is only for the repl, do not use in general!!!! Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -1191,11 +1191,11 @@ (if (and no-hurry (debug:debug-mode 18)) (rmt:print-db-stats)) (let ((th1 (make-thread (lambda () ;; thread for cleaning up, give it five seconds (let* ((start-time (current-seconds))) - (if *db-serv-info* + #;(if *db-serv-info* (let* ((host (servdat-host *db-serv-info*)) (port (servdat-port *db-serv-info*))) (debug:print-info 0 *default-log-port* "Shutting down server/responder.") ;; ;; TODO - add flushing/waiting on the work queue Index: runsmod.scm ================================================================== --- runsmod.scm +++ runsmod.scm @@ -244,10 +244,23 @@ (begin (hash-table-set! 
*runs:denoise* key currtime) #t) #f))) +(define *too-soon-delays* (make-hash-table)) + +;; to-soon delay, when matching event happened in less than dseconds delay wseconds +;; +(define (runs:too-soon-delay key dseconds wseconds) + (let* ((last-time (hash-table-ref/default *too-soon-delays* key #f))) + (if (and last-time + (< (- (current-seconds) last-time) dseconds)) + (begin + (debug:print-info 0 *default-log-port* "Whoa, slow down there ... "key" has been too recently seen.") + (thread-sleep! wseconds))) + (hash-table-set! *too-soon-delays* key (current-seconds)))) + (define (runs:can-run-more-tests runsdat run-id jobgroup max-concurrent-jobs) ;; Take advantage of a good place to exit if running the one-pass methodology (if (and (> (runs:dat-can-run-more-tests-count runsdat) 20) (args:get-arg "-one-pass")) @@ -1467,11 +1480,13 @@ newtal: newtal itemmaps: itemmaps ;; prereqs-not-met: prereqs-not-met ))) (runs:dat-regfull-set! runsdat regfull) - + + (runs:too-soon-delay (conc "loop delay " hed) 1 1) + (if (> num-running 0) (set! last-time-some-running (current-seconds))) (if (> (current-seconds)(+ last-time-some-running (or (configf:lookup *configdat* "setup" "give-up-waiting") 36000))) (hash-table-set! *max-tries-hash* tfullname (+ (hash-table-ref/default *max-tries-hash* tfullname 0) 1))) @@ -1494,10 +1509,11 @@ (if (or (not (null? tal))(not (null? reg))) (loop (runs:queue-next-hed tal reg reglen regfull) (runs:queue-next-tal tal reg reglen regfull) (runs:queue-next-reg tal reg reglen regfull) reruns)))) + ;; (loop (car tal)(cdr tal) reg reruns)))) (runs:incremental-print-results run-id) (debug:print 4 *default-log-port* "TOP OF LOOP => " "test-name: " test-name @@ -1725,10 +1741,11 @@ (equal? (configf:lookup *configdat* "setup" "run-wait") "yes")) (> num-running 0)) (begin ;; Here we mark any old defunct tests as incomplete. Do this every fifteen minutes ;; (debug:print 0 *default-log-port* "Got here eh! 
num-running=" num-running " (> num-running 0) " (> num-running 0)) + (thread-sleep! 5) ;; let's always sleep, prevents abutting calls to rum:get-count-tests-running-for-run-id - didn't help (if (> (current-seconds)(+ last-time-incomplete 900)) (let ((actual-num-running (rmt:get-count-tests-running-for-run-id run-id))) (debug:print-info 0 *default-log-port* "Marking stuck tests as INCOMPLETE while waiting for run " run-id ". Running as pid " (current-process-id) " on " (get-host-name)) (set! last-time-incomplete (current-seconds)) ;; FIXME, this might be causing slow down - use of set! @@ -1735,11 +1752,10 @@ (runs:find-and-mark-incomplete-and-check-end-of-run run-id #f) (debug:print-info 0 *default-log-port* "run-wait specified, waiting on " actual-num-running " tests in RUNNING, REMOTEHOSTSTART or LAUNCHED state at " (time->string (seconds->local-time (current-seconds)))))) ;; (if (runs:dat-load-mgmt-function runsdat)((runs:dat-load-mgmt-function runsdat))) - (thread-sleep! 5) ;; (if (>= num-running max-concurrent-jobs) 5 1)) (wait-loop (rmt:get-count-tests-running-for-run-id run-id) num-running)))) ;; LET* ((test-record ;; we get here on "drop through". All done! ;; this is moved to runs:run-testes since this function is getting called twice to ensure everthing is completed. 
Index: tests/simplerun/tests/test1/testconfig ================================================================== --- tests/simplerun/tests/test1/testconfig +++ tests/simplerun/tests/test1/testconfig @@ -24,11 +24,11 @@ [requirements] # waiton setup priority 0 # Iteration for your tests are controlled by the items section -[items] +# [items] # PARTOFDAY morning noon afternoon evening night # test_meta is a section for storing additional data on your test [test_meta] author matt ADDED ulex-full/dbmgr.scm Index: ulex-full/dbmgr.scm ================================================================== --- /dev/null +++ ulex-full/dbmgr.scm @@ -0,0 +1,1131 @@ +;;====================================================================== +;; Copyright 2022, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . 
+ +;;====================================================================== + +(declare (unit dbmgrmod)) + +(declare (uses ulex)) +(declare (uses apimod)) +(declare (uses pkts)) +(declare (uses commonmod)) +(declare (uses dbmod)) +(declare (uses mtargs)) +(declare (uses portloggermod)) +(declare (uses debugprint)) + +(module dbmgrmod + * + +(import scheme + chicken.base + chicken.condition + chicken.file + chicken.format + chicken.port + chicken.process + chicken.process-context + chicken.process-context.posix + chicken.sort + chicken.string + chicken.time + + (prefix sqlite3 sqlite3:) + matchable + md5 + message-digest + regex + s11n + srfi-1 + srfi-18 + srfi-69 + system-information + typed-records + + pkts + ulex + + commonmod + apimod + dbmod + debugprint + (prefix mtargs args:) + portloggermod + ) + +;; Configurations for server +;; (tcp-buffer-size 2048) +;; (max-connections 2048) + +;; info about me as a listener and my connections to db servers +;; stored (for now) in *db-serv-info* +;; +(defstruct servdat + (host #f) + (port #f) + (uuid #f) + (dbfile #f) + (uconn #f) ;; this is the listener *FOR THIS PROCESS* + (mode #f) + (status 'starting) + (trynum 0) ;; count the number of ports we've tried + (conns (make-hash-table)) ;; apath/dbname => conndat + ) + +(define *db-serv-info* (make-servdat)) + +(define (servdat->url sdat) + (conc (servdat-host sdat)":"(servdat-port sdat))) + +;; db servers contact info +;; +(defstruct conndat + (apath #f) + (dbname #f) + (fullname #f) + (hostport #f) + (ipaddr #f) + (port #f) + (srvpkt #f) + (srvkey #f) + (lastmsg 0) + (expires 0)) + +(define *srvpktspec* + `((server (host . h) + (port . p) + (servkey . k) + (pid . i) + (ipaddr . a) + (dbpath . d)))) + +;;====================================================================== +;; S U P P O R T F U N C T I O N S +;;====================================================================== + +;; set up the api proc, seems like there should be a better place for this? 
+;; +;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE +;; +;; (define api-proc (make-parameter conc)) +;; (api-proc api:execute-requests) + +;; do we have a connection to apath dbname and +;; is it not expired? then return it +;; +;; else setup a connection +;; +;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception +;; +(define (rmt:get-conn remdat apath dbname) + (let* ((fullname (db:dbname->path apath dbname))) + (hash-table-ref/default (servdat-conns remdat) fullname #f))) + +(define (rmt:drop-conn remdat apath dbname) + (let* ((fullname (db:dbname->path apath dbname))) + (hash-table-delete! (servdat-conns remdat) fullname))) + +(define (rmt:find-main-server uconn apath dbname) + (let* ((pktsdir (get-pkts-dir apath)) + (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*)) + (viable-srvs (get-viable-servers all-srvpkts dbname))) + (get-the-server uconn apath viable-srvs))) + + +(define *connstart-mutex* (make-mutex)) +(define *last-main-start* 0) + +;; looks for a connection to main, returns if have and not exired +;; creates new otherwise +;; +;; connections for other servers happens by requesting from main +;; +;; TODO: This is unnecessarily re-creating the record in the hash table +;; +(define (rmt:open-main-connection remdat apath) + (let* ((fullpath (db:dbname->path apath ".db/main.db")) + (conns (servdat-conns remdat)) + (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this + (start-rmt:run (lambda () + (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) + (thread-start! th1) + (thread-sleep! 1) + (let loop ((count 0)) + (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") + (if (or (not *db-serv-info*) + (not (servdat-uconn *db-serv-info*))) + (begin + (thread-sleep! 1) + (loop (+ count 1))) + (begin + (servdat-mode-set! 
*db-serv-info* 'non-db) + (servdat-uconn *db-serv-info*))))))) + (myconn (servdat-uconn *db-serv-info*))) + (cond + ((not myconn) + (start-rmt:run) + (rmt:open-main-connection remdat apath)) + ((and conn ;; conn is NOT a socket, just saying ... + (< (current-seconds) (conndat-expires conn))) + #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died + ((and conn + (>= (current-seconds)(conndat-expires conn))) + (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.") + (rmt:drop-conn remdat apath ".db/main.db") ;; + (rmt:open-main-connection remdat apath)) + (else + ;; Below we will find or create and connect to main + (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch") + (let* ((dbname (db:run-id->dbname #f)) + (the-srv (rmt:find-main-server myconn apath dbname)) + (start-main-srv (lambda () ;; call IF there is no the-srv found + (mutex-lock! *connstart-mutex*) + (if (> (- (current-seconds) *last-main-start*) 5) ;; at least four seconds since last attempt to start main server + (begin + (api:run-server-process apath dbname) + (set! *last-main-start* (current-seconds)) + (thread-sleep! 1)) + (thread-sleep! 0.25)) + (mutex-unlock! *connstart-mutex*) + (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries + ))) + (if (not the-srv) ;; have server, try connecting to it + (start-main-srv) + (let* ((srv-addr (server-address the-srv)) ;; need serv + (ipaddr (alist-ref 'ipaddr the-srv)) + (port (alist-ref 'port the-srv)) + (srvkey (alist-ref 'servkey the-srv)) + (fullpath (db:dbname->path apath dbname)) + + (new-the-srv (make-conndat + apath: apath + dbname: dbname + fullname: fullpath + hostport: srv-addr + ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection? 
+ ipaddr: ipaddr + port: port + srvpkt: the-srv + srvkey: srvkey ;; generated by rmt:get-signature on the server side + lastmsg: (current-seconds) + expires: (+ (current-seconds) + (server:expiration-timeout) + -2) ;; this needs to be gathered during the ping + ))) + (hash-table-set! conns fullpath new-the-srv))) + #t))))) + +;; NB// sinfo is a servdat struct +;; +(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5)) + (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") + (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable + (fullname (db:dbname->path apath dbname)) + (conns (servdat-conns sinfo)) + (mconn (rmt:get-conn sinfo apath ".db/main.db")) + (dconn (rmt:get-conn sinfo apath dbname))) + #;(if (and mconn + (not (debug:print-logger))) + (begin + (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.") + (debug:print-logger rmt:log-to-main))) + (cond + ((and mconn + dconn + (< (current-seconds)(conndat-expires dconn))) + #t) ;; good to go + ((not mconn) ;; no channel open to main? open it... + (rmt:open-main-connection sinfo apath) + (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) + ((not dconn) ;; no channel open to dbname? + (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname)))) + (case res + ((server-started) + (if (> num-tries 0) + (begin + (thread-sleep! 2) + (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) + (begin + (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname) + (exit 1)))) + (else + (if (list? res) ;; server has been registered and the info was returned. pass it on. 
+ (begin ;; ("192.168.0.9" 53817 + ;; "5e34239f48e8973b3813221e54701a01" "24310" + ;; "192.168.0.9" + ;; "/home/matt/data/megatest/tests/simplerun" + ;; ".db/1.db") + (match + res + ((host port servkey pid ipaddr apath dbname) + (debug:print-info 0 *default-log-port* "got "res) + (hash-table-set! conns + fullname + (make-conndat + apath: apath + dbname: dbname + hostport: (conc host":"port) + ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection? + ipaddr: ipaddr + port: port + srvkey: servkey + lastmsg: (current-seconds) + expires: (+ (current-seconds) + (server:expiration-timeout) + -2)))) + (else + (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res))) + res) + (begin + (debug:print-info 0 *default-log-port* "Unexpected result: " res) + res))))))) + #t)) + +;;====================================================================== + +;; FOR DEBUGGING SET TO #t +;; (define *localmode* #t) +(define *localmode* #f) +(define *dbstruct* (make-dbr:dbstruct)) + +;; Defaults to current area +;; +(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f)) + (let* ((apath *toppath*) + (sinfo *db-serv-info*) + (dbname (db:run-id->dbname rid))) + (if *localmode* + (api:execute-requests *dbstruct* cmd params) + (begin + (rmt:open-main-connection sinfo apath) + (if rid (rmt:general-open-connection sinfo apath dbname)) + #;(if (not (member cmd '(log-to-main))) + (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params)) + (rmt:send-receive-real sinfo apath dbname cmd params))))) + +;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed +;; sometime in the future +;; +(define (rmt:send-receive-real sinfo apath dbname cmd params) + (assert (not (eq? 
'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.") + (let* ((cdat (rmt:get-conn sinfo apath dbname))) + (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened") + (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex + ;; then send-receive using the ulex layer to host-port stored in cdat + (res (send-receive uconn (conndat-hostport cdat) cmd params)) + #;(th1 (make-thread (lambda () + (set! res (send-receive uconn (conndat-hostport cdat) cmd params))) + "send-receive thread"))) + ;; (thread-start! th1) + ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead + ;; since we accessed the server we can bump the expires time up + (conndat-expires-set! cdat (+ (current-seconds) + (server:expiration-timeout) + -2)) ;; two second margin for network time misalignments etc. + res))) + +;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed +;; sometime in the future. +;; +;; Purpose - call the main.db server and request a server be started +;; for the given area path and dbname +;; + +(define (rmt:print-db-stats) + (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f" + (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================") + (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg")) + (for-each (lambda (cmd) + (let ((cmd-dat (hash-table-ref *db-stats* cmd))) + (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0)))))) + (sort (hash-table-keys *db-stats*) + (lambda (a b) + (> (vector-ref (hash-table-ref *db-stats* a) 0) + (vector-ref (hash-table-ref *db-stats* b) 0))))))) + +(define (rmt:get-max-query-average run-id) + (mutex-lock! 
*db-stats-mutex*) + (let* ((runkey (conc "run-id=" run-id " ")) + (cmds (filter (lambda (x) + (substring-index runkey x)) + (hash-table-keys *db-stats*))) + (res (if (null? cmds) + (cons 'none 0) + (let loop ((cmd (car cmds)) + (tal (cdr cmds)) + (max-cmd (car cmds)) + (res 0)) + (let* ((cmd-dat (hash-table-ref *db-stats* cmd)) + (tot (vector-ref cmd-dat 0)) + (curravg (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction + (currmax (max res curravg)) + (newmax-cmd (if (> curravg res) cmd max-cmd))) + (if (null? tal) + (if (> tot 10) + (cons newmax-cmd currmax) + (cons 'none 0)) + (loop (car tal)(cdr tal) newmax-cmd currmax))))))) + (mutex-unlock! *db-stats-mutex*) + res)) + +;; host and port are used to ensure we are remove proper records +(define (rmt:server-shutdown host port) + (let ((dbfile (servdat-dbfile *db-serv-info*))) + (debug:print-info 0 *default-log-port* "dbfile is "dbfile) + (if dbfile + (let* ((am-server (args:get-arg "-server")) + (dbfile (args:get-arg "-db")) + (apath *toppath*) + #;(sinfo *remotedat*)) ;; foundation for future fix + (if *dbstruct-db* + (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile)) + (db (dbr:dbdat-db dbdat)) + (inmem (dbr:dbdat-db dbdat)) ;; WRONG + ) + ;; do a final sync here + (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds)) + (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t) + ;; let's finalize here + (debug:print-info 0 *default-log-port* "Finalizing db and inmem") + (if (sqlite3:database? db) + (sqlite3:finalize! db) + (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing...")) + (if (sqlite3:database? inmem) + (sqlite3:finalize! 
inmem) + (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing...")) + (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete")) + (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do.")) + (if (not am-server) + (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!") + (if (string-match ".*/main.db$" dbfile) + (let ((pkt-file (conc (get-pkts-dir *toppath*) + "/" (servdat-uuid *db-serv-info*) + ".pkt"))) + (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) + (delete-file* pkt-file) + (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port) + (db:with-lock-db + (servdat-dbfile *db-serv-info*) + (lambda (dbh dbfile) + (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove + (let* ((sdat *db-serv-info*) ;; we have a run-id server + (host (servdat-host sdat)) + (port (servdat-port sdat)) + (uuid (servdat-uuid sdat)) + (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile))) + (debug:print-info 0 *default-log-port* "deregistered-server, res="res) + (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) + ))))))) + + +(define (common:run-sync?) + ;; (and (common:on-homehost?) + (args:get-arg "-server")) + +(define *rmt:run-mutex* (make-mutex)) +(define *rmt:run-flag* #f) + +;; Main entry point to start a server. was start-server +(define (rmt:run hostn) + (mutex-lock! *rmt:run-mutex*) + (if *rmt:run-flag* + (begin + (debug:print-warn 0 *default-log-port* "rmt:run already running.") + (mutex-unlock! *rmt:run-mutex*)) + (begin + (set! *rmt:run-flag* #t) + (mutex-unlock! *rmt:run-mutex*) + ;; ;; Configurations for server + ;; (tcp-buffer-size 2048) + ;; (max-connections 2048) + (debug:print 2 *default-log-port* "PID: "(current-process-id)". 
Attempting to start the server ...") + (if (and *db-serv-info* + (servdat-uconn *db-serv-info*)) + (let* ((uconn (servdat-uconn *db-serv-info*))) + (wait-and-close uconn)) + (let* ((port (portlogger:open-run-close portlogger:find-port)) + (handler-proc (lambda (rem-host-port qrykey cmd params) ;; + (set! *db-last-access* (current-seconds)) + (assert (list? params) "FATAL: handler called with non-list params") + (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params) + (debug:print 0 *default-log-port* "handler call: "cmd", params="params) + (api:execute-requests *dbstruct-db* cmd params)))) + ;; (api:process-request *dbstuct-db* + (if (not *db-serv-info*) + (set! *db-serv-info* (make-servdat host: hostn port: port))) + (let* ((uconn (run-listener handler-proc port)) + (rport (udat-port uconn))) ;; the real port + (servdat-host-set! *db-serv-info* hostn) + (servdat-port-set! *db-serv-info* rport) + (servdat-uconn-set! *db-serv-info* uconn) + (wait-and-close uconn) + (db:print-current-query-stats) + ))) + (let* ((host (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (mode (or (servdat-mode *db-serv-info*) + "non-db"))) + ;; server exit stuff here + ;; (rmt:server-shutdown host port) - always do in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit + (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. 
Exiting")
+      ))))
+
+;;======================================================================
+;; S E R V E R   U T I L I T I E S
+;;======================================================================
+
+
+;;======================================================================
+;; NEW SERVER METHOD
+;;======================================================================
+
+;; Try to take the server lock stored in dbfile on behalf of host:port.
+;; only use for main.db - need to re-write some of this :(
+;;
+;; dbfile is opened with db:open-run-db, db:get-iam-server-lock is asked
+;; for the lock and the db handle is finalized again before returning.
+;;
+;; Returns:
+;;   #t         - the lock was obtained
+;;   owner list - the lock was held by a server that does not answer a
+;;                ping; db:steal-lock-db has been called.
+;;                NOTE(review): the outcome of the steal is not checked.
+;;   #f         - the lock is held by a server that answers a ping, or
+;;                db:get-iam-server-lock returned something unexpected
+;;
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh      (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (lock-res (db:get-iam-server-lock dbh dbfile host port))
+         (uconn    (servdat-uconn sdat))
+         ;; lock-res => list then already locked, check server is responsive
+         ;;          => #t then successfully got the lock
+         ;;          => #f reserved for future use as to indicate something went wrong
+         ;;
+         ;; The value of the match IS the result. Previously the branch
+         ;; values were thrown away, so a lock held by a live server was
+         ;; still reported to the caller as the (truthy) owner list.
+         (res      (match lock-res
+                     ((owner_pid owner_host owner_port event_time)
+                      (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+                          #f ;; locked by someone else who is alive, we did not get the lock
+                          (begin ;; locked by someone dead and gone
+                            (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+                            (db:steal-lock-db dbh dbfile port)
+                            lock-res))) ;; unchanged behaviour: a stolen lock is reported via the old owner record
+                     (#t #t)
+                     (else #f))))
+    (sqlite3:finalize! dbh)
+    res))
+
+
+;; Write a server pkt describing this server into pkts-dir and return
+;; the uuid that write-alist->pkt produced for it.
+;;
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let* ((pkt-dat `((host . ,host)
+                    (port . ,port)
+                    (servkey . ,servkey)
+                    (pid . ,(current-process-id))
+                    (ipaddr . ,ipaddr)
+                    (dbpath . 
,dbpath)))
+         (uuid (write-alist->pkt
+                pkts-dir
+                pkt-dat
+                pktspec: pkt-spec
+                ptype: 'server)))
+    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+    uuid))
+
+;; Return the server pkts directory, <toppath>/.meta/srvpkts, creating
+;; it when it does not exist yet. *toppath* takes precedence, the
+;; optional apath is only consulted when *toppath* is #f.
+;;
+(define (get-pkts-dir #!optional (apath #f))
+  (let ((effective-toppath (or *toppath* apath)))
+    (assert effective-toppath
+            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+    (let ((pdir (conc effective-toppath "/.meta/srvpkts")))
+      (if (not (file-exists? pdir))
+          (handle-exceptions ;; this exception handler should NOT be needed but ...
+              exn
+            pdir
+            (create-directory pdir #t)))
+      pdir)))
+
+;; Read every *.pkt file in pktsdir-in (the directory is created first
+;; when it is missing) and return the list of alists produced by
+;; read-pkt->alist with pktspec.
+;;
+(define (get-all-server-pkts pktsdir-in pktspec)
+  (if (not (file-exists? pktsdir-in))
+      (create-directory pktsdir-in #t))
+  (map (lambda (pkt-file)
+         (read-pkt->alist pkt-file pktspec: pktspec))
+       (glob (conc pktsdir-in "/*.pkt"))))
+
+;; Build the "host:port" string for a server pkt alist.
+;;
+(define (server-address srv-pkt)
+  (let ((host (alist-ref 'host srv-pkt))
+        (port (alist-ref 'port srv-pkt)))
+    (conc host ":" port)))
+
+;; Ping the server at host-port (a "host:port" string) through uconn.
+;; Returns 'ack when the reply is 'ack and #f for any other reply.
+;;
+(define (server-ready? uconn host-port key)
+  (let* ((ping-params `((cmd . ping)(key . ,key)))
+         (payload     `((cmd . ping)
+                        (key . ,key)
+                        (params . ,ping-params))) ;; the ping fields are sent at top level and again under params
+         (reply       (send-receive uconn host-port 'ping payload)))
+    ;; 'ack => yep, likely it is who we want on the other end
+    (and (eq? reply 'ack) reply)))
+
+;; from the pkts return servers associated with dbpath
+;; NOTE: Only one can be alive - have to check on each
+;;       in the list of pkts returned
+;;
+(define (get-viable-servers serv-pkts dbpath)
+  (let loop ((tail serv-pkts)
+             (res  '()))
+    (if (null? tail)
+        res ;; NOTE: no sorting is done here; TODO sort by age so oldest is considered first
+        (let* ((spkt (car tail)))
+          (loop (cdr tail)
+                (if (equal? 
dbpath (alist-ref 'dbpath spkt))
+                    (cons spkt res)
+                    res))))))
+
+;; Ping the server described by each pkt in serv-pkts and return only
+;; the pkts whose server answered. As a side effect the pkt file
+;; <pkts dir>/<Z>.pkt of every server that did not answer is deleted.
+;;
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+  (filter (lambda (pkt)
+            (let* ((host      (alist-ref 'host pkt))
+                   (port      (alist-ref 'port pkt))
+                   (host-port (conc host":"port))
+                   (key       (alist-ref 'servkey pkt))
+                   (pktz      (alist-ref 'Z pkt))
+                   (res       (server-ready? uconn host-port key)))
+              (if res
+                  res
+                  (let* ((pktsdir (get-pkts-dir *toppath*))
+                         (pktpath (conc pktsdir"/"pktz".pkt")))
+                    (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+                    (delete-file* pktpath)
+                    #f))))
+          serv-pkts))
+
+;; from viable servers get one that is alive and ready
+;;
+;; Returns the first pkt in serv-pkts whose server (ipaddr:port) answers
+;; a ping, or #f when none does. apath is accepted but not used.
+;; NOTE(review): the Z card is sent as the ping key here while
+;; remove-pkts-if-not-alive sends the servkey card - confirm which one
+;; the receiving side expects.
+;;
+(define (get-the-server uconn apath serv-pkts)
+  (let loop ((tail serv-pkts))
+    (if (null? tail)
+        #f
+        (let* ((spkt      (car tail))
+               (host      (alist-ref 'ipaddr spkt))
+               (port      (alist-ref 'port spkt))
+               (host-port (conc host":"port))
+               (srvkey    (alist-ref 'Z spkt))) ;; (alist-ref 'srvkey spkt))
+          (if (server-ready? uconn host-port srvkey)
+              spkt
+              (loop (cdr tail)))))))
+
+;; am I the "first" in line server? I.e. my D card is smallest
+;; use Z card as tie breaker
+;;
+(define (get-best-candidate serv-pkts dbpath)
+  (if (null? serv-pkts)
+      #f
+      (let loop ((tail serv-pkts)
+                 (best (car serv-pkts)))
+        (if (null? tail)
+            best
+            (let* ((candidate    (car tail))
+                   (candidate-bd (string->number (alist-ref 'D candidate)))
+                   (best-bd      (string->number (alist-ref 'D best)))
+                   ;; bigger number is younger
+                   (candidate-z  (alist-ref 'Z candidate))
+                   (best-z       (alist-ref 'Z best))
+                   (new-best     (cond
+                                  ((> best-bd candidate-bd) ;; best is younger than candidate
+                                   candidate)
+                                  ((< best-bd candidate-bd) ;; candidate is younger than best
+                                   best)
+                                  (else
+                                   (if (string>=? best-z candidate-z)
+                                       best
+                                       candidate))))) ;; use Z card as tie breaker
+              (if (null? 
tail) + new-best + (loop (cdr tail) new-best))))))) + + +;;====================================================================== +;; END NEW SERVER METHOD +;;====================================================================== + +;; if .db/main.db check the pkts +;; +(define (rmt:wait-for-server pkts-dir db-file server-key) + (let* ((sdat *db-serv-info*)) + (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) + (begin ;; let ((sdat #f)) + (thread-sleep! 0.01) + (debug:print-info 0 *default-log-port* "Waiting for server alive signature") + (mutex-lock! *heartbeat-mutex*) + (set! sdat *db-serv-info*) + (mutex-unlock! *heartbeat-mutex*) + (if (and sdat + (not changed) + (> (- (current-seconds) start-time) 2)) + (let* ((uconn (servdat-uconn sdat))) + (servdat-status-set! sdat 'iface-stable) + (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") + ;; create a server pkt in *toppath*/.meta/srvpkts + + ;; TODO: + ;; 1. change sdat to stuct + ;; 2. add uuid to struct + ;; 3. update uuid in sdat here + ;; + (servdat-uuid-set! sdat + (register-server + pkts-dir *srvpktspec* + (get-host-name) + (servdat-port sdat) server-key + (servdat-host sdat) db-file)) + ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key + ;; now read pkts and see if we are a contender + (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) + (viables (get-viable-servers all-pkts db-file)) + (alive (remove-pkts-if-not-alive uconn viables)) + (best-srv (get-best-candidate alive db-file)) + (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)) + (i-am-srv (equal? 
best-srv-key server-key)) + (delete-pkt (lambda () + (let* ((pktfile (conc (get-pkts-dir *toppath*) + "/" (servdat-uuid *db-serv-info*) + ".pkt"))) + (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) + (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit + (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) + ;; am I the best-srv, compare server-keys to know + (if i-am-srv + (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) + (begin + (debug:print-info 0 *default-log-port* "I'm the server!") + (servdat-dbfile-set! sdat db-file) + (servdat-status-set! sdat 'db-locked)) + (begin + (debug:print-info 0 *default-log-port* "I'm not the server, exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (delete-pkt) + (thread-sleep! 0.2) + (exit))) + (begin + (debug:print-info 0 *default-log-port* + "Keys do not match "best-srv-key", "server-key", exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (delete-pkt) + (thread-sleep! 0.2) + (exit))) + sdat)) + (begin ;; sdat not yet contains server info + (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) + (sleep 4) + (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes + (begin + (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") + (exit)) + (loop start-time + (equal? 
sdat last-sdat)
+                          sdat))))))))
+
+;; Register the server at iface:port (key server-key, serving dbname)
+;; for area apath: a connection to main is opened first, then the
+;; 'register-server request is sent. Returns the reply of that request.
+;;
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (rmt:send-receive-real sinfo apath     ;; params: host port servkey pid ipaddr apath dbpath
+                         (db:run-id->dbname #f)
+                         'register-server `(,iface
+                                            ,port
+                                            ,server-key
+                                            ,(current-process-id)
+                                            ,iface
+                                            ,apath
+                                            ,dbname)))
+
+;; Send the 'get-count-servers request for area apath over the main
+;; connection and return its reply.
+;;
+(define (rmt:get-count-servers sinfo apath)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (rmt:send-receive-real sinfo apath     ;; params: apath
+                         (db:run-id->dbname #f)
+                         'get-count-servers `(,apath)))
+
+;; Send the 'get-servers-info request for area apath via rmt:send-receive.
+;;
+(define (rmt:get-servers-info apath)
+  (rmt:send-receive 'get-servers-info #f `(,apath)))
+
+;; Send the 'deregister-server request for the server at iface:port (key
+;; server-key, serving dbname) over the main connection.
+;; Note that the pid that is sent is the pid of the calling process.
+;;
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+  (rmt:send-receive-real db-serv-info apath     ;; params: host port servkey pid ipaddr apath dbpath
+                         (db:run-id->dbname #f)
+                         'deregister-server `(,iface
+                                              ,port
+                                              ,server-key
+                                              ,(current-process-id)
+                                              ,iface
+                                              ,apath
+                                              ,dbname)))
+
+;; Block until the host and port recorded in *db-serv-info* have stopped
+;; changing.
+;;
+;; *db-serv-info* is polled every 0.25 to 0.5 seconds. The interface is
+;; taken as stable once the host and port seen on the previous poll are
+;; set and equal to the current ones and at least one second has passed
+;; since this procedure was entered. The signature is then refreshed,
+;; the servdat status is set to 'interface-stable and #t is returned.
+;;
+;; num-tries-allowed - number of polls after which we give up and (exit 1)
+;;
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+  ;; wait until *db-serv-info* stops changing
+  (let* ((stime (current-seconds)))
+    (let loop ((last-host  #f)
+               (last-port  #f)
+               (tries      0))
+      (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+             (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+        ;; first we verify port and interface, update *db-serv-info* in need be.
+        (cond
+         ((> tries num-tries-allowed)
+          ;; the log label used to name rmt:keep-running and claimed "several minutes"
+          (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, giving up after "tries" tries.")
+          (exit 1))
+         ((not *db-serv-info*)
+          (thread-sleep! 0.25)
+          (loop curr-host curr-port (+ tries 1)))
+         ((or (not last-host)(not last-port))
+          (debug:print 0 *default-log-port* "rmt:wait-for-stable-interface, still no interface, tries="tries)
+          (thread-sleep! 0.25)
+          (loop curr-host curr-port (+ tries 1)))
+         ((or (not (equal? last-host curr-host))
+              (not (equal? last-port curr-port)))
+          (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info")
+          (thread-sleep! 0.25)
+          (loop curr-host curr-port (+ tries 1)))
+         ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 1 second has passed
+          (thread-sleep! 0.5)
+          (loop curr-host curr-port (+ tries 1)))
+         (else
+          (rmt:get-signature) ;; sets *my-signature* as side effect
+          (servdat-status-set! *db-serv-info* 'interface-stable)
+          (debug:print 0 *default-log-port*
+                       "SERVER STARTED: " curr-host
+                       ":" curr-port
+                       " AT " (current-seconds) " server signature: " *my-signature*
+                       " with "(servdat-trynum *db-serv-info*)" port changes")
+          (flush-output *default-log-port*)
+          #t))))))
+
+;; run rmt:keep-running in a parallel thread to monitor that the db is being
+;; used and to shutdown after sometime if it is not.
+;;
+(define (rmt:keep-running dbname)
+  ;; if none running or if > 20 seconds since
+  ;; server last used then start shutdown
+  ;; This thread waits for the server to come alive
+  (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server")
+
+  (let* ((sinfo             *db-serv-info*)
+         (server-start-time (current-seconds))
+         (pkts-dir          (get-pkts-dir))
+         (server-key        (rmt:get-signature)) ;; This servers key
+         (is-main           (equal? (args:get-arg "-db") ".db/main.db"))
+         (last-access       0)
+         (server-timeout    (server:expiration-timeout))
+         (shutdown-server-sequence (lambda (host port)
+                                     (set! *unclean-shutdown* #f) ;; Should not be needed anymore
+                                     (debug:print-info 0 *default-log-port* "Starting to shutdown the server. 
pid="(current-process-id)) + ;; (rmt:server-shutdown host port) -- called in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit + (exit))) + (timed-out? (lambda () + (<= (+ last-access server-timeout) + (current-seconds))))) + (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db")) + ;; main and run db servers have both got wait logic (could/should merge it) + (if is-main + (rmt:wait-for-server pkts-dir dbname server-key) + (rmt:wait-for-stable-interface)) + ;; this is our forever loop + (let* ((iface (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (uconn (servdat-uconn *db-serv-info*))) + (let loop ((count 0) + (bad-sync-count 0) + (start-time (current-milliseconds))) + (if (and (not is-main) + (common:low-noise-print 60 "servdat-status")) + (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*))) + + (mutex-lock! *heartbeat-mutex*) + ;; set up the database handle + (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate + (let ((watchdog (bdat-watchdog *bdat*))) + (debug:print 0 *default-log-port* "SERVER: dbprep") + (db:setup dbname) ;; sets *dbstruct-db* as side effect + (servdat-status-set! *db-serv-info* 'db-opened) + ;; IFF I'm not main, call into main and register self + (if (not is-main) + (let ((res (rmt:register-server sinfo + *toppath* iface port + server-key dbname))) + (if res ;; we are the server + (servdat-status-set! *db-serv-info* 'have-interface-and-db) + ;; now check that the db locker is alive, clear it out if not + (let* ((serv-info (rmt:server-info *toppath* dbname))) + (match serv-info + ((host port servkey pid ipaddr apath dbpath) + (if (not (server-ready? uconn (conc host":"port) servkey)) + (begin + (debug:print-info 0 *default-log-port* "Server registered but not alive. 
Removing and trying again.") + (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) + (loop (+ count 1) bad-sync-count start-time)))) + (else + (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) + (exit))))))) + (debug:print 0 *default-log-port* + "SERVER: running, db "dbname" opened, megatest version: " + (common:get-full-version)) + ;; start the watchdog + + ;; is this really needed? + + #;(if watchdog + (if (not (member (thread-state watchdog) + '(ready running blocked + sleeping dead))) + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") + (thread-start! watchdog)) + (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) + (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) + #;(loop (+ count 1) bad-sync-count start-time) + )) + + (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) + + (mutex-unlock! *heartbeat-mutex*) + + ;; when things go wrong we don't want to be doing the various + ;; queries too often so we strive to run this stuff only every + ;; four seconds or so. + (let* ((sync-time (- (current-milliseconds) start-time)) + (rem-time (quotient (- 4000 sync-time) 1000))) + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time))) + + ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + (set! last-access *db-last-access*) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) bad-sync-count (current-milliseconds))) + + (if (common:low-noise-print 60 "dbstats") + (begin + (debug:print 0 *default-log-port* "Server stats:") + (db:print-current-query-stats))) + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) + (cond + ((not *server-run*) + (debug:print-info 0 *default-log-port* "*server-run* set to #f. 
Shutting down.") + (shutdown-server-sequence (get-host-name) port)) + ((timed-out?) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port)) + ((and *server-run* + (or (not (timed-out?)) + (if is-main ;; do not exit if there are other servers (keep main open until all others gone) + (> (rmt:get-count-servers sinfo *toppath*) 1) + #f))) + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + (loop 0 bad-sync-count (current-milliseconds))) + (else + (set! *unclean-shutdown* #f) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port) + #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: " + (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown + (sexpr->string 'quit)))))))))) + +(define (rmt:get-reasonable-hostname) + (let* ((inhost (or (args:get-arg "-server") "-"))) + (if (equal? inhost "-") + (get-host-name) + inhost))) + +;; Call this to start the actual server +;; +;; all routes though here end in exit ... +;; +;; This is the point at which servers are started +;; +(define (rmt:server-launch dbname) + (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server run thread started") + (rmt:run (rmt:get-reasonable-hostname))) + "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server monitor thread started") + (if (args:get-arg "-server") + (rmt:keep-running dbname))) + "Keep running"))) + (thread-start! th2) + (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. 
+ (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (thread-join! th3)) + #f) + +;;====================================================================== +;; S E R V E R - D I R E C T C A L L S +;;====================================================================== + +(define (rmt:kill-server run-id) + (rmt:send-receive 'kill-server #f (list run-id))) + +(define (rmt:start-server run-id) + (rmt:send-receive 'start-server #f (list run-id))) + +(define (rmt:server-info apath dbname) + (rmt:send-receive 'get-server-info #f (list apath dbname))) + +;;====================================================================== +;; Nanomsg transport +;;====================================================================== + +#;(define (is-port-in-use port-num) + (let* ((ret #f)) + (let-values (((inp oup pid) + (process "netstat" (list "-tulpn" )))) + (let loop ((inl (read-line inp))) + (if (not (eof-object? inl)) + (begin + (if (string-search (regexp (conc ":" port-num)) inl) + (begin + ;(print "Output: " inl) + (set! ret #t)) + (loop (read-line inp))))))) + ret)) + +#;(define (open-nn-connection host-port) + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port))) + (nng-dial req uri) + (socket-set! req 'nng/recvtimeo 2000) + req)) + +#;(define (send-receive-nn req msg) + (nng-send req msg) + (nng-recv req)) + +#;(define (close-nn-connection req) + (nng-close! req)) + +;; ;; open connection to server, send message, close connection +;; ;; +;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds +;; (let ((req (make-req-socket 'req)) +;; (uri (conc "tcp://" host-port)) +;; (res #f) +;; ;; (contacts (alist-ref 'contact attrib)) +;; ;; (mode (alist-ref 'mode attrib)) +;; ) +;; (socket-set! 
req 'nng/recvtimeo 2000) +;; (handle-exceptions +;; exn +;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) +;; ;; Send notification +;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) +;; #f) +;; (nng-dial req uri) +;; ;; (print "Connected to the server " ) +;; (nng-send req msg) +;; ;; (print "Request Sent") +;; (let* ((th1 (make-thread (lambda () +;; (let ((resp (nng-recv req))) +;; (nng-close! req) +;; (set! res (if (equal? resp "ok") +;; #t +;; #f)))) +;; "recv thread")) +;; (th2 (make-thread (lambda () +;; (thread-sleep! timeout) +;; (thread-terminate! th1)) +;; "timer thread"))) +;; (thread-start! th1) +;; (thread-start! th2) +;; (thread-join! th1) +;; res)))) +;; +#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port)) + (res #f)) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + ;; Send notification + (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) + #f) + (nng-dial req uri) + (nng-send req msg) + (let* ((th1 (make-thread (lambda () + (let ((resp (nng-recv req))) + (nng-close! req) + ;; (print resp) + (set! res resp))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; run ping in separate process, safest way in some cases +;; +#;(define (server:ping-server ifaceport) + (with-input-from-pipe + (conc (common:get-megatest-exe) " -ping " ifaceport) + (lambda () + (let loop ((inl (read-line)) + (res "NOREPLY")) + (if (eof-object? 
inl) + (case (string->symbol res) + ((NOREPLY) #f) + ((LOGIN_OK) #t) + (else #f)) + (loop (read-line) inl)))))) + +;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; +#;(define (server:login toppath) + (lambda (toppath) + (set! *db-last-access* (current-seconds)) ;; might not be needed. + (if (equal? *toppath* toppath) + #t + #f))) + +;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; (define (server:release-sync-lock) +;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; (define (server:have-sync-lock?) +;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; (have-lock? (car have-lock-pair)) +;; (lock-time (cdr have-lock-pair)) +;; (lock-age (- (current-seconds) lock-time))) +;; (cond +;; (have-lock? #t) +;; ((>lock-age +;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; (server:release-sync-lock) +;; (server:have-sync-lock?)) +;; (else #f)))) + +) ADDED ulex-full/ulex.scm Index: ulex-full/ulex.scm ================================================================== --- /dev/null +++ ulex-full/ulex.scm @@ -0,0 +1,569 @@ +;; ulex: Distributed sqlite3 db +;;; +;; Copyright (C) 2018-2021 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. 
IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;;====================================================================== +;; ABOUT: +;; See README in the distribution at https://www.kiatoa.com/fossils/ulex +;; NOTES: +;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. +;; +;;====================================================================== + +(module ulex + * + #;( + + ;; NOTE: looking for the handler proc - find the run-listener :) + + run-listener ;; (run-listener handler-proc [port]) => uconn + + ;; NOTE: handler-proc params; + ;; (handler-proc rem-host-port qrykey cmd params) + + send-receive ;; (send-receive uconn host-port cmd data) + + ;; NOTE: cmd can be any plain text symbol except for these; + ;; 'ping 'ack 'goodbye 'response + + set-work-handler ;; (set-work-handler proc) + + wait-and-close ;; (wait-and-close uconn) + + ulex-listener? 
+ + ;; needed to get the interface:port that was automatically found + udat-port + udat-host-port + + ;; for testing only + ;; pp-uconn + + ;; parameters + work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct + return-method ;; parameter; 'mailbox, 'polling, 'direct + ) + +(import scheme + chicken.base + chicken.file + chicken.io + chicken.time + chicken.condition + chicken.string + chicken.sort + chicken.pretty-print + + address-info + mailbox + matchable + ;; queues + regex + regex-case + simple-exceptions + s11n + srfi-1 + srfi-18 + srfi-4 + srfi-69 + system-information + tcp6 + typed-records + ) + +;; udat struct, used by both caller and callee +;; instantiated as uconn by convention +;; +(defstruct udat + ;; the listener side + (port #f) + (host-port #f) + (socket #f) + ;; the peers + (peers (make-hash-table)) ;; host:port->peer + ;; work handling + (work-queue (make-mailbox)) + (work-proc #f) ;; set by user + (cnum 0) ;; cookie number + (mboxes (make-hash-table)) ;; for the replies + (avail-cmboxes '()) ;; list of ( . 
) for re-use + ;; threads + (numthreads 10) + (cmd-thread #f) + (work-queue-thread #f) + (num-threads-running 0) + ) + +;; Parameters + +;; work-method: +(define work-method (make-parameter 'mailbox)) +;; mailbox - all rdat goes through mailbox +;; threads - all rdat immediately executed in new thread +;; direct - no queuing +;; + +;; return-method, return the result to waiting send-receive: +(define return-method (make-parameter 'mailbox)) +;; mailbox - create a mailbox and use it for passing returning results to send-receive +;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result +;; direct - no queuing, result is passed back in single tcp connection +;; + +;; ;; struct for keeping track of others we are talking to +;; ;; +;; (defstruct pdat +;; (host-port #f) +;; (conns '()) ;; list of pcon structs, pop one off when calling the peer +;; ) +;; +;; ;; struct for peer connections, keep track of expiration etc. +;; ;; +;; (defstruct pcon +;; (inp #f) +;; (oup #f) +;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) +;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes +;; ) + +;;====================================================================== +;; listener +;;====================================================================== + +;; is uconn a ulex connector (listener) +;; +(define (ulex-listener? uconn) + (udat? uconn)) + +;; create a tcp listener and return a populated udat struct with +;; my port, address, hostname, pid etc. +;; return #f if fail to find a port to allocate. 
+;;
+;; uconn - udat struct to populate, it is updated in place and returned
+;; port  - first port to try. On ANY exception from connect-listener the
+;;         next higher port is tried, port 65535 being the last attempt
+;;
+(define (setup-listener uconn #!optional (port 4242))
+  (handle-exceptions
+      exn
+      (if (< port 65535)
+          (setup-listener uconn (+ port 1))
+          #f)
+    (connect-listener uconn port)))
+
+;; open a tcp listener on port (backlog 1000, no host restriction) and
+;; record the port, "addr:port" and the listener itself in uconn.
+;; Returns uconn. Exceptions from tcp-listen are passed on to the caller.
+;;
+(define (connect-listener uconn port)
+  ;; (tcp-listener-socket LISTENER)(socket-name so)
+  ;; sockaddr-address, sockaddr-port, sockaddr->string
+  (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]])
+         (addr (get-my-best-address)))    ;; (hostinfo-addresses (host-information (current-hostname)))
+    (udat-port-set! uconn port)
+    (udat-host-port-set! uconn (conc addr":"port))
+    (udat-socket-set! uconn tlsn)
+    uconn))
+
+;; run-listener does all the work of starting a listener in a thread
+;; it then returns control
+;;
+;; handler-proc    - called as (handler-proc rem-host-port qrykey cmd params)
+;;                   for every generic request (see do-work)
+;; port-suggestion - first port to try, see setup-listener
+;;
+;; Returns the new udat struct. The assert fires if no port could be allocated.
+;;
+(define (run-listener handler-proc #!optional (port-suggestion 4242))
+  (let* ((uconn (make-udat)))
+    (udat-work-proc-set! uconn handler-proc)
+    (if (setup-listener uconn port-suggestion)
+        (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop"))
+               (th2 (make-thread (lambda ()
+                                   ;; only the queued work methods need the worker pool
+                                   (case (work-method)
+                                     ((mailbox limited)
+                                      (process-work-queue uconn))))
+                                 "Ulex work queue processor")))
+          ;; (tcp-buffer-size 2048)
+          (thread-start! th1)
+          (thread-start! th2)
+          (udat-cmd-thread-set! uconn th1)
+          (udat-work-queue-thread-set! uconn th2)
+          (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".")
+          uconn)
+        (assert #f "ERROR: run-listener called without proper setup."))))
+
+;; block until the command loop thread of uconn ends, then close the listener
+;;
+(define (wait-and-close uconn)
+  (thread-join! (udat-cmd-thread uconn))
+  (tcp-close (udat-socket uconn)))
+
+;;======================================================================
+;; peers and connections
+;;======================================================================
+
+(define *send-mutex* (make-mutex))
+
+;; send structured data to recipient
+;;
+;;  NOTE: qrykey is what was called the "cookie" previously
+;;
+;;     retval tells send to expect and wait for return data (one line) and return it or time out
+;;       this is for ping where we don't want to necessarily have set up our own server yet.
+;;
+;; NOTE: see below for beginnings of code to allow re-use of tcp connections
+;;        - I believe (without substantial evidence) that re-using connections will
+;;          be beneficial ...
+;;
+;; udata     - our own udat struct, its host-port is sent along as the return address
+;; host-port - "host:port" string of the recipient
+;; Returns the single datum the recipient answers with, or the value of
+;; (message exn) if anything raises.
+;; NOTE(review): `message` is not defined in the visible part of this file,
+;;               confirm it is in scope.
+;;
+;; Both ports are now closed on every exit path. Previously the input port
+;; was left open when serialize or deserialize raised and close-input-port
+;; was called on #f when no connection was made.
+;;
+(define (send udata host-port qrykey cmd params)
+  (let* ((my-host-port (udat-host-port udata))          ;; remote will return to this
+         (isme         #f #;(equal? host-port my-host-port)) ;; calling myself?
+         ;; dat is a self-contained work block that can be sent or handled locally
+         (dat          (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds)))))
+    (cond
+     (isme (ulex-handler udata dat)) ;; no transmission needed
+     (else
+      (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC?
+          exn
+          (message exn)
+        (begin
+          ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+          (let-values (((inp oup)(tcp-connect host-port)))
+            (if (and inp oup)
+                (dynamic-wind
+                  (lambda () #f)
+                  (lambda ()
+                    (serialize dat oup)
+                    (close-output-port oup) ;; done writing, now wait for the answer
+                    (deserialize inp))      ;; will always be 'ack unless work-method is direct
+                  (lambda ()
+                    ;; runs on normal return AND on an exception escape.
+                    ;; Closing an already closed port is a no-op.
+                    (close-output-port oup)
+                    (close-input-port inp)
+                    ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+                    ))
+                (begin
+                  (print "ERROR: send called but no receiver has been setup. Please call setup first!")
+                  #f)))))))))
+
+;; send cmd/data to host-port, then poll the mboxes hash (every 10 ms, for at
+;; most ten seconds) for the '(status . result) pair that ulex-handler stores
+;; under our qrykey when the 'response comes back.
+;; Returns the result, or #f on timeout or when the send was not ack'ed.
+;;
+(define (send-via-polling uconn host-port cmd data)
+  (let* ((qrykey (make-cookie uconn))
+         (sres   (send uconn host-port qrykey cmd data)))
+    (case sres
+      ((ack)
+       (let loop ((start-time (current-milliseconds)))
+         (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout
+             (begin
+               (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data)
+               #f)
+             (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash
+               (if result ;; result is '(status . result-data) or #f for nothing yet
+                   (begin
+                     (hash-table-delete! (udat-mboxes uconn) qrykey)
+                     (cdr result))
+                   (begin
+                     (thread-sleep! 0.01)
+                     (loop start-time)))))))
+      (else
+       (print "ULEX ERROR: Communication failed? sres="sres)
+       #f))))
+
+;; send cmd/data to host-port and wait (at most 120 seconds) on a freshly
+;; registered mailbox for the 'response that ulex-handler delivers to it.
+;; Returns the result, or #f on timeout or when the send was not ack'ed.
+;;
+;; The cookie -> mailbox registration is removed on every path. Previously it
+;; was left in the mboxes hash whenever the send itself failed.
+;;
+(define (send-via-mailbox uconn host-port cmd data)
+  (let* ((cmbox  (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse?
+         (qrykey (car cmbox))
+         (mbox   (cdr cmbox))
+         (sres   (send uconn host-port qrykey cmd data))) ;; short res
+    (if (eq? sres 'ack)
+        (let* ((mbox-timeout-secs   120 #;(if (eq? 'primordial (thread-name (current-thread)))
+                                             #f
+                                             120)) ;; timeout)
+               (mbox-timeout-result 'MBOX_TIMEOUT)
+               (res                 (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)))
+          ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it?
+          (hash-table-delete! (udat-mboxes uconn) qrykey)
+          (if (eq? res 'MBOX_TIMEOUT)
+              (begin
+                (print "WARNING: mbox timed out for query "cmd", with data "data
+                       ", waiting for response from "host-port".")
+
+                ;; here it might make sense to clean up connection records and force clean start?
+                ;; NO. The program using ulex needs to do the reset. Right thing here is exception
+
+                #f) ;; convert to raising exception?
+              res))
+        (begin
+          ;; nobody will ever answer this cookie, drop the registration
+          (hash-table-delete! (udat-mboxes uconn) qrykey)
+          (print "ERROR: Communication failed? Got "sres)
+          #f))))
+
+;; send a request to the given host-port and register a mailbox in udata
+;; wait for the mailbox data and return it
+;;
+;; ping and goodbye are answered immediately by the remote handler. With
+;; work-method direct the answer to the single tcp exchange is the result.
+;; Otherwise return-method picks polling or mailbox delivery.
+;;
+(define (send-receive uconn host-port cmd data)
+  (let* ((start-time (current-milliseconds))
+         (result     (cond
+                      ((member cmd '(ping goodbye)) ;; these are immediate
+                       (send uconn host-port 'ping cmd data))
+                      ((eq? (work-method) 'direct)
+                       ;; the result from send will be the actual result, not an 'ack
+                       (send uconn host-port 'direct cmd data))
+                      (else
+                       (case (return-method)
+                         ((polling)
+                          (send-via-polling uconn host-port cmd data))
+                         ((mailbox)
+                          (send-via-mailbox uconn host-port cmd data))
+                         (else
+                          (print "ULEX ERROR: unrecognised return-method "(return-method)".")
+                          #f)))))
+         (duration   (- (current-milliseconds) start-time)))
+    ;; this is ONLY for development and debugging. It will be removed once Ulex is stable.
+    (if (< 5000 duration)
+        (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000)))
+               " seconds; "cmd", host-port="host-port", data="data))
+    result))
+
+
+;;======================================================================
+;; responder side
+;;======================================================================
+
+;; take a request, rdat, and if not immediate put it in the work queue
+;;
+;; Reserved cmds; ack ping goodbye response
+;;
+;; rdat is (rem-host-port qrykey cmd params). The return value is what gets
+;; serialized back to the caller: 'ack for accepted requests, an error
+;; symbol otherwise, or - with work-method direct - the result of the work
+;; itself. Previously the direct result was dropped and 'ack was sent even
+;; though send-receive hands that answer to its caller as the result.
+;;
+(define (ulex-handler uconn rdat)
+  (assert (list? rdat) "FATAL: ulex-handler give rdat as not list")
+  (match rdat ;; (string-split controldat)
+    ((rem-host-port qrykey cmd params);; timedata)
+     ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params)
+     (case cmd
+       ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack)
+       ((ping)
+        ;; (print "Got Ping!")
+        ;; (add-to-work-queue uconn rdat)
+        'ack)
+       ((goodbye)
+        ;; just clear out references to the caller. NOT COMPLETE
+        (add-to-work-queue uconn rdat)
+        'ack)
+       ((response) ;; this is a result from remote processing, send it as mail ...
+        (case (return-method)
+          ((polling)
+           (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params))
+           'ack)
+          ((mailbox)
+           (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f)))
+             (if mbox
+                 (begin
+                   (mailbox-send! mbox params) ;; params here is our result
+                   'ack)
+                 (begin
+                   (print "ERROR: received result but no associated mbox for cookie "qrykey)
+                   'no-mbox-found))))
+          (else (print "ULEX ERROR: unrecognised return-method "(return-method))
+                'bad-return-method)))
+       (else ;; generic request - hand it to the work queue
+        (let ((res (add-to-work-queue uconn rdat)))
+          (if (eq? (work-method) 'direct)
+              res     ;; work was done in-line, res is the real answer
+              'ack)))))
+    (_
+     (print "ULEX ERROR: bad rdat "rdat)
+     'bad-rdat)))
+
+;; given an already set up uconn start the cmd-loop
+;;
+;; Starts 100 threads that all accept on the listener of uconn and then joins
+;; them. Each accepted connection carries exactly one request and one answer.
+;;
+;; A failure while reading, handling or answering a request is now reported
+;; and the thread goes back to accepting, with both ports closed. Previously
+;; the exception ended the thread for good and left the ports open. A failing
+;; tcp-accept still ends the thread.
+;;
+(define (ulex-cmd-loop uconn)
+  (let* ((serv-listener (udat-socket uconn))
+         (listener      (lambda ()
+                          (let loop ((state 'start))
+                            (let-values (((inp oup)(tcp-accept serv-listener)))
+                              ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+                              (handle-exceptions
+                                  exn
+                                  (print "ULEX ERROR: failed to service a request, exn="exn)
+                                (dynamic-wind
+                                  (lambda () #f)
+                                  (lambda ()
+                                    (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params)
+                                           (resp (ulex-handler uconn rdat)))
+                                      (serialize resp oup)))
+                                  (lambda ()
+                                    (close-input-port inp)
+                                    (close-output-port oup))))
+                              ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP
+                              (loop state))))))
+    ;; start N of them
+    (let loop ((thnum   0)
+               (threads '()))
+      (if (< thnum 100)
+          (let* ((th (make-thread listener (conc "listener" thnum))))
+            (thread-start! th)
+            (loop (+ thnum 1)
+                  (cons th threads)))
+          (for-each thread-join! threads)))))
+
+;; add a proc to the cmd list, these are done symmetrically (i.e. in all instances)
+;; so that the proc can be dereferenced remotely
+;;
+(define (set-work-handler uconn proc)
+  (udat-work-proc-set! uconn proc))
+
+;;======================================================================
+;; work queues - this is all happening on the listener side
+;;======================================================================
+
+;; rdat is (rem-host-port qrykey cmd params)
+
+;; dispatch rdat according to work-method: a new thread per request
+;; (threads), the shared work queue mailbox (mailbox, also the fallback for
+;; unknown methods) or in-line (direct, returns the result of do-work)
+;;
+(define (add-to-work-queue uconn rdat)
+  #;(queue-add! (udat-work-queue uconn) rdat)
+  (case (work-method)
+    ((threads)
+     (thread-start! (make-thread (lambda ()
+                                   (do-work uconn rdat))
+                                 "worker thread")))
+    ((mailbox)
+     (mailbox-send! (udat-work-queue uconn) rdat))
+    ((direct)
+     (do-work uconn rdat))
+    (else
+     (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.")
+     (mailbox-send! (udat-work-queue uconn) rdat))))
+
+;; move the logic to return the result somewhere else?
+;;
+;; run the work proc of uconn on one rdat. With work-method direct the
+;; result is returned, otherwise it is sent back to rem-host-port as a
+;; 'response carrying the original qrykey.
+;;
+;; 'MBOX_TIMEOUT (what the idle workers in process-work-queue receive) gives
+;; 'do-work-timeout. The pattern is now quoted: unquoted it was a pattern
+;; variable that matched any rdat, which made the error clause below
+;; unreachable.
+;;
+(define (do-work uconn rdat)
+  (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivably it could change
+    ;; put this following into a do-work procedure
+    (match rdat
+      ((rem-host-port qrykey cmd params)
+       (let* ((start-time (current-milliseconds))
+              (result     (proc rem-host-port qrykey cmd params))
+              (end-time   (current-milliseconds))
+              (run-time   (- end-time start-time)))
+         (case (work-method)
+           ((direct) result)
+           (else
+            (print "ULEX: work "cmd", "params" done in "run-time" ms")
+            ;; send 'response as cmd and result as params
+            (send uconn rem-host-port qrykey 'response result) ;; could check for ack
+            (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time))))))
+      ('MBOX_TIMEOUT 'do-work-timeout)
+      (_
+       (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params")))))
+
+;; NEW APPROACH:
+;;
+;; create (udat-numthreads uconn) worker threads, at least one, that each
+;; loop forever taking rdat items from the work queue mailbox of uconn and
+;; running do-work on them, then start and join them all.
+;;
+;; An exception while processing one item is reported and the worker carries
+;; on with the next item instead of leaving the pool.
+;;
+(define (process-work-queue uconn)
+  (let ((wqueue (udat-work-queue uconn))
+        (numthr (udat-numthreads uconn)))
+    (let loop ((thnum   1)
+               (threads '()))
+      (let ((thlst (cons (make-thread (lambda ()
+                                        (let work-loop ()
+                                          (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT)))
+                                            (handle-exceptions
+                                                exn
+                                                (print "ULEX ERROR: worker failed on rdat "rdat", exn="exn)
+                                              (do-work uconn rdat)))
+                                          (work-loop)))
+                                      (conc "work thread " thnum))
+                         threads)))
+        (if (< thnum numthr)
+            (loop (+ thnum 1)
+                  thlst)
+            (begin
+              (print "ULEX: Starting "(length thlst)" worker threads.")
+              (for-each thread-start! thlst)
+              (print "ULEX: Threads started. Joining all.")
+              (for-each thread-join! thlst)))))))
+
+;; below was to enable re-use of connections. This seems non-trivial so for
+;; now let's open on each call
+;;
+;; ;; given host-port get or create peer struct
+;; ;;
+;; (define (udat-get-peer uconn host-port)
+;;   (or (hash-table-ref/default (udat-peers uconn) host-port #f)
+;;       ;; no peer, so create pdat and init it
+;;
+;;       ;; NEED stack of connections, pop and use; inp, oup,
+;;       ;; creation_time (remove and create new if over 24hrs old
+;;       ;;
+;;       (let ((pdat (make-pdat host-port: host-port)))
+;;         (hash-table-set! (udat-peers uconn) host-port pdat)
+;;         pdat)))
+;;
+;; ;; is pcon alive
+;;
+;; ;; given host-port and pdat get a pcon
+;; ;;
+;; (define (pdat-get-pcon pdat host-port)
+;;   (let loop ((conns (pdat-conns pdat)))
+;;     (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later
+;;         (init-pcon (make-pcon))
+;;         (let* ((conn (pop conns)))
+;;
+;; ;; given host-port get a pcon struct
+;; ;;
+;; (define (udat-get-pcon
+
+;;======================================================================
+;; misc utils
+;;======================================================================
+
+;; bump the cookie counter of uconn and return a new cookie string
+;; "host:port:N"
+;;
+(define (make-cookie uconn)
+  (let ((newcnum (+ (udat-cnum uconn) 1)))
+    (udat-cnum-set! uconn newcnum)
+    (conc (udat-host-port uconn) ":"
+          newcnum)))
+
+;; cookie/mboxes
+
+;; we store each mbox with a cookie (cookie . mbox)
+;;
+;; Returns a (cookie . mbox) pair that is registered in the mboxes hash of
+;; uconn, taken from the available list when there is one, else newly made.
+;; A recycled pair is registered again because send-via-mailbox removes the
+;; registration after every use.
+;;
+(define (get-cmbox uconn)
+  (if (null? (udat-avail-cmboxes uconn))
+      (let ((cookie (make-cookie uconn))
+            (mbox   (make-mailbox)))
+        (hash-table-set! (udat-mboxes uconn) cookie mbox)
+        `(,cookie . ,mbox))
+      (let ((cmbox (car (udat-avail-cmboxes uconn))))
+        (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn)))
+        (hash-table-set! (udat-mboxes uconn) (car cmbox) (cdr cmbox))
+        cmbox)))
+
+;; hand a (cookie . mbox) pair back for re-use by get-cmbox
+;;
+(define (put-cmbox uconn cmbox)
+  (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn))))
+
+;; pretty print all fields of uconn
+;;
+(define (pp-uconn uconn)
+  (pp (udat->alist uconn)))
+
+
+;;======================================================================
+;; network utilities
+;;======================================================================
+
+;; NOTE: Look at address-info egg as alternative to some of this
+
+;; rate an ip address string: 0 for 127.*, 1 for 10.0.* and 192.168.*,
+;; 2 for everything else. Higher is preferred, see ip-pref-less?
+;;
+(define (rate-ip ipaddr)
+  (regex-case ipaddr
+    ( "^127\\..*" _ 0 )
+    ( "^(10\\.0|192\\.168)\\..*" _ 1 )
+    ( else 2 ) ))
+
+;; Change this to bias for addresses with a reasonable broadcast value?
+;;
+;; sort predicate, puts the address with the higher rate-ip first
+;;
+(define (ip-pref-less? a b)
+  (> (rate-ip a) (rate-ip b)))
+
+;; the address to advertise to peers: the best rated of get-all-ips, or the
+;; host name when no address was found
+;;
+(define (get-my-best-address)
+  (let ((all-my-addresses (get-all-ips)))
+    (cond
+     ((null? all-my-addresses)
+      (get-host-name))                ;; no interfaces?
+     ((null? (cdr all-my-addresses))
+      (car all-my-addresses))         ;; only one to choose from, just go with it
+     (else
+      (car (sort all-my-addresses ip-pref-less?))))))
+
+;; all addresses of this host, best rated first
+;;
+(define (get-all-ips-sorted)
+  (sort (get-all-ips) ip-pref-less?))
+
+;; the host part of every address-info of our host name whose type is "tcp"
+;; NOTE(review): confirm address-info-type returns a string and not a
+;;               symbol, with a symbol this filter would never match.
+;;
+(define (get-all-ips)
+  (map address-info-host
+       (filter (lambda (x)
+                 (equal? (address-info-type x) "tcp"))
+               (address-infos (get-host-name)))))
+
+)
ADDED ulex-none/dbmgr.scm
Index: ulex-none/dbmgr.scm
==================================================================
--- /dev/null
+++ ulex-none/dbmgr.scm
@@ -0,0 +1,1123 @@
+;;======================================================================
+;; Copyright 2022, Matthew Welland.
+;;
+;; This file is part of Megatest.
+;;
+;;     Megatest is free software: you can redistribute it and/or modify
+;;     it under the terms of the GNU General Public License as published by
+;;     the Free Software Foundation, either version 3 of the License, or
+;;     (at your option) any later version.
+;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . + +;;====================================================================== + +(declare (unit dbmgrmod)) + +(declare (uses ulex)) +(declare (uses apimod)) +(declare (uses pkts)) +(declare (uses commonmod)) +(declare (uses dbmod)) +(declare (uses mtargs)) +(declare (uses portloggermod)) +(declare (uses debugprint)) + +(module dbmgrmod + * + +(import scheme + chicken.base + chicken.condition + chicken.file + chicken.format + chicken.port + chicken.process + chicken.process-context + chicken.process-context.posix + chicken.sort + chicken.string + chicken.time + + (prefix sqlite3 sqlite3:) + matchable + md5 + message-digest + regex + s11n + srfi-1 + srfi-18 + srfi-69 + system-information + typed-records + + pkts + ulex + + commonmod + apimod + dbmod + debugprint + (prefix mtargs args:) + portloggermod + ) + +;; ;; Configurations for server +;; ;; (tcp-buffer-size 2048) +;; ;; (max-connections 2048) +;; +;; ;; info about me as a listener and my connections to db servers +;; ;; stored (for now) in *db-serv-info* +;; ;; +;; (defstruct servdat +;; (host #f) +;; (port #f) +;; (uuid #f) +;; (dbfile #f) +;; (uconn #f) ;; this is the listener *FOR THIS PROCESS* +;; (mode #f) +;; (status 'starting) +;; (trynum 0) ;; count the number of ports we've tried +;; (conns (make-hash-table)) ;; apath/dbname => conndat +;; ) +;; +;; (define *db-serv-info* (make-servdat)) +;; +;; (define (servdat->url sdat) +;; (conc (servdat-host sdat)":"(servdat-port sdat))) +;; +;; ;; db servers contact info +;; ;; +;; (defstruct conndat +;; (apath #f) +;; (dbname #f) +;; (fullname #f) +;; (hostport #f) +;; (ipaddr #f) +;; (port #f) +;; (srvpkt 
#f) +;; (srvkey #f) +;; (lastmsg 0) +;; (expires 0)) +;; +;; (define *srvpktspec* +;; `((server (host . h) +;; (port . p) +;; (servkey . k) +;; (pid . i) +;; (ipaddr . a) +;; (dbpath . d)))) +;; +;; ;;====================================================================== +;; ;; S U P P O R T F U N C T I O N S +;; ;;====================================================================== +;; +;; ;; set up the api proc, seems like there should be a better place for this? +;; ;; +;; ;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE +;; ;; +;; ;; (define api-proc (make-parameter conc)) +;; ;; (api-proc api:execute-requests) +;; +;; ;; do we have a connection to apath dbname and +;; ;; is it not expired? then return it +;; ;; +;; ;; else setup a connection +;; ;; +;; ;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception +;; ;; +;; (define (rmt:get-conn remdat apath dbname) +;; (let* ((fullname (db:dbname->path apath dbname))) +;; (hash-table-ref/default (servdat-conns remdat) fullname #f))) +;; +;; (define (rmt:drop-conn remdat apath dbname) +;; (let* ((fullname (db:dbname->path apath dbname))) +;; (hash-table-delete! 
(servdat-conns remdat) fullname))) +;; +;; (define (rmt:find-main-server uconn apath dbname) +;; (let* ((pktsdir (get-pkts-dir apath)) +;; (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*)) +;; (viable-srvs (get-viable-servers all-srvpkts dbname))) +;; (get-the-server uconn apath viable-srvs))) +;; +;; +;; (define *connstart-mutex* (make-mutex)) +;; (define *last-main-start* 0) +;; +;; ;; looks for a connection to main, returns if have and not exired +;; ;; creates new otherwise +;; ;; +;; ;; connections for other servers happens by requesting from main +;; ;; +;; ;; TODO: This is unnecessarily re-creating the record in the hash table +;; ;; +;; (define (rmt:open-main-connection remdat apath) +;; (let* ((fullpath (db:dbname->path apath ".db/main.db")) +;; (conns (servdat-conns remdat)) +;; (conn (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this +;; (start-rmt:run (lambda () +;; (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server"))) +;; (thread-start! th1) +;; (thread-sleep! 1) +;; (let loop ((count 0)) +;; (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection") +;; (if (or (not *db-serv-info*) +;; (not (servdat-uconn *db-serv-info*))) +;; (begin +;; (thread-sleep! 1) +;; (loop (+ count 1))) +;; (begin +;; (servdat-mode-set! *db-serv-info* 'non-db) +;; (servdat-uconn *db-serv-info*))))))) +;; (myconn (servdat-uconn *db-serv-info*))) +;; (cond +;; ((not myconn) +;; (start-rmt:run) +;; (rmt:open-main-connection remdat apath)) +;; ((and conn ;; conn is NOT a socket, just saying ... +;; (< (current-seconds) (conndat-expires conn))) +;; #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died +;; ((and conn +;; (>= (current-seconds)(conndat-expires conn))) +;; (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. 
Reconnecting.") +;; (rmt:drop-conn remdat apath ".db/main.db") ;; +;; (rmt:open-main-connection remdat apath)) +;; (else +;; ;; Below we will find or create and connect to main +;; (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch") +;; (let* ((dbname (db:run-id->dbname #f)) +;; (the-srv (rmt:find-main-server myconn apath dbname)) +;; (start-main-srv (lambda () ;; call IF there is no the-srv found +;; (mutex-lock! *connstart-mutex*) +;; (if (> (- (current-seconds) *last-main-start*) 5) ;; at least four seconds since last attempt to start main server +;; (begin +;; (api:run-server-process apath dbname) +;; (set! *last-main-start* (current-seconds)) +;; (thread-sleep! 1)) +;; (thread-sleep! 0.25)) +;; (mutex-unlock! *connstart-mutex*) +;; (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries +;; ))) +;; (if (not the-srv) ;; have server, try connecting to it +;; (start-main-srv) +;; (let* ((srv-addr (server-address the-srv)) ;; need serv +;; (ipaddr (alist-ref 'ipaddr the-srv)) +;; (port (alist-ref 'port the-srv)) +;; (srvkey (alist-ref 'servkey the-srv)) +;; (fullpath (db:dbname->path apath dbname)) +;; +;; (new-the-srv (make-conndat +;; apath: apath +;; dbname: dbname +;; fullname: fullpath +;; hostport: srv-addr +;; ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection? +;; ipaddr: ipaddr +;; port: port +;; srvpkt: the-srv +;; srvkey: srvkey ;; generated by rmt:get-signature on the server side +;; lastmsg: (current-seconds) +;; expires: (+ (current-seconds) +;; (server:expiration-timeout) +;; -2) ;; this needs to be gathered during the ping +;; ))) +;; (hash-table-set! conns fullpath new-the-srv))) +;; #t))))) +;; +;; ;; NB// sinfo is a servdat struct +;; ;; +;; (define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5)) +;; (assert (not (equal? 
dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db") +;; (let* ((mdbname ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable +;; (fullname (db:dbname->path apath dbname)) +;; (conns (servdat-conns sinfo)) +;; (mconn (rmt:get-conn sinfo apath ".db/main.db")) +;; (dconn (rmt:get-conn sinfo apath dbname))) +;; #;(if (and mconn +;; (not (debug:print-logger))) +;; (begin +;; (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.") +;; (debug:print-logger rmt:log-to-main))) +;; (cond +;; ((and mconn +;; dconn +;; (< (current-seconds)(conndat-expires dconn))) +;; #t) ;; good to go +;; ((not mconn) ;; no channel open to main? open it... +;; (rmt:open-main-connection sinfo apath) +;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) +;; ((not dconn) ;; no channel open to dbname? +;; (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname)))) +;; (case res +;; ((server-started) +;; (if (> num-tries 0) +;; (begin +;; (thread-sleep! 2) +;; (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1))) +;; (begin +;; (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname) +;; (exit 1)))) +;; (else +;; (if (list? res) ;; server has been registered and the info was returned. pass it on. +;; (begin ;; ("192.168.0.9" 53817 +;; ;; "5e34239f48e8973b3813221e54701a01" "24310" +;; ;; "192.168.0.9" +;; ;; "/home/matt/data/megatest/tests/simplerun" +;; ;; ".db/1.db") +;; (match +;; res +;; ((host port servkey pid ipaddr apath dbname) +;; (debug:print-info 0 *default-log-port* "got "res) +;; (hash-table-set! conns +;; fullname +;; (make-conndat +;; apath: apath +;; dbname: dbname +;; hostport: (conc host":"port) +;; ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection? 
+;;                          ipaddr: ipaddr
+;;                          port: port
+;;                          srvkey: servkey
+;;                          lastmsg: (current-seconds)
+;;                          expires: (+ (current-seconds)
+;;                                      (server:expiration-timeout)
+;;                                      -2))))
+;;                    (else
+;;                     (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+;;                   res)
+;;                 (begin
+;;                   (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+;;                   res)))))))
+;;     #t))
+;;
+;; ;;======================================================================
+;;
+;; ;; FOR DEBUGGING SET TO #t
+;; ;; (define *localmode* #t)
+;; (define *localmode* #f)
+
+;; the one dbstruct every request of this process is executed against
+(define *dbstruct* (make-dbr:dbstruct))
+
+;; Defaults to current area
+;;
+;; In this variant no server is contacted: the request is executed in-process
+;; by api:execute-requests against *dbstruct* and its value is returned.
+;;
+;;   cmd, params - passed on to api:execute-requests unchanged
+;;   rid         - only used to compute dbname, which is not used further
+;;   attemptnum, area-dat - accepted but not referenced in the body
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (let* ((apath  *toppath*)                ;; not used below
+         (dbname (db:run-id->dbname rid))) ;; not used below
+    (api:execute-requests *dbstruct* cmd params)))
+
+;; ;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; ;; sometime in the future
+;; ;;
+;; (define (rmt:send-receive-real sinfo apath dbname cmd params)
+;;   (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+;;   (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+;;     (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+;;     (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+;;            ;; then send-receive using the ulex layer to host-port stored in cdat
+;;            (res (send-receive uconn (conndat-hostport cdat) cmd params))
+;;            #;(th1 (make-thread (lambda ()
+;;                                  (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+;;                                "send-receive thread")))
+;;       ;; (thread-start! th1)
+;;       ;; (thread-join! th1) ;; gratuitious thread stuff is so that mailbox is not used in primordial thead
+;;       ;; since we accessed the server we can bump the expires time up
+;;       (conndat-expires-set! 
cdat (+ (current-seconds)
+;;                                     (server:expiration-timeout)
+;;                                     -2)) ;; two second margin for network time misalignments etc.
+;;       res)))
+;;
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;;
+;; Purpose - call the main.db server and request a server be started
+;;           for the given area path and dbname
+;;
+
+;; write the *db-stats* table at debug level 18, one row per command with
+;; call count, total time and average, busiest command first.
+;; Each value of *db-stats* is a vector: slot 0 the count, slot 1 the total time.
+;;
+;; NOTE(review): directives such as ~40a and ~20,2-f are not understood by
+;;               the format of chicken.format - confirm that an extended
+;;               format implementation is the one in scope here.
+;;
+(define (rmt:print-db-stats)
+  (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+    (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+    (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+    (for-each (lambda (cmd)
+                (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+                  (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+              (sort (hash-table-keys *db-stats*)
+                    (lambda (a b)
+                      (> (vector-ref (hash-table-ref *db-stats* a) 0)
+                         (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+
+;; find the command with the highest average time among the *db-stats*
+;; entries whose key contains "run-id=<run-id> ".
+;;
+;; Returns (cmd . average), or (none . 0) when no entry qualifies.
+;;
+;; Only commands called more than 10 times are considered. Previously that
+;; threshold was tested against whichever key happened to be visited last,
+;; so the answer depended on the hash table ordering. Checking the count
+;; first also means the average is never computed for a zero count.
+;;
+;; The mutex is released in the exit thunk of dynamic-wind so that it cannot
+;; stay locked if anything in the scan raises.
+;;
+(define (rmt:get-max-query-average run-id)
+  (dynamic-wind
+    (lambda () (mutex-lock! *db-stats-mutex*))
+    (lambda ()
+      (let ((runkey (conc "run-id=" run-id " ")))
+        (let loop ((cmds    (hash-table-keys *db-stats*))
+                   (max-cmd 'none)
+                   (max-avg 0))
+          (if (null? cmds)
+              (cons max-cmd max-avg)
+              (let* ((cmd     (car cmds))
+                     (cmd-dat (and (substring-index runkey cmd)
+                                   (hash-table-ref *db-stats* cmd)))
+                     (count   (if cmd-dat (vector-ref cmd-dat 0) 0)))
+                (if (> count 10)
+                    (let ((curravg (/ (vector-ref cmd-dat 1) count)))
+                      ;; the first qualifying command is always taken, even with average 0
+                      (if (or (eq? max-cmd 'none)
+                              (> curravg max-avg))
+                          (loop (cdr cmds) cmd curravg)
+                          (loop (cdr cmds) max-cmd max-avg)))
+                    (loop (cdr cmds) max-cmd max-avg)))))))
+    (lambda () (mutex-unlock! *db-stats-mutex*))))
+
+;; ;; host and port are used to ensure we are remove proper records
+;; (define (rmt:server-shutdown host port)
+;;   (let ((dbfile (servdat-dbfile *db-serv-info*)))
+;;     (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+;;     (if dbfile
+;;         (let* ((am-server (args:get-arg "-server"))
+;;                (dbfile (args:get-arg "-db"))
+;;                (apath *toppath*)
+;;                #;(sinfo *remotedat*)) ;; foundation for future fix
+;;           (if *dbstruct-db*
+;;               (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+;;                      (db (dbr:dbdat-db dbdat))
+;;                      (inmem (dbr:dbdat-db dbdat)) ;; WRONG
+;;                      )
+;;                 ;; do a final sync here
+;;                 (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+;;                 (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+;;                 ;; let's finalize here
+;;                 (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+;;                 (if (sqlite3:database? db)
+;;                     (sqlite3:finalize! db)
+;;                     (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+;;                 (if (sqlite3:database? inmem)
+;;                     (sqlite3:finalize! 
inmem) +;; (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing...")) +;; (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete")) +;; (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do.")) +;; (if (not am-server) +;; (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!") +;; (if (string-match ".*/main.db$" dbfile) +;; (let ((pkt-file (conc (get-pkts-dir *toppath*) +;; "/" (servdat-uuid *db-serv-info*) +;; ".pkt"))) +;; (debug:print-info 0 *default-log-port* "removing pkt "pkt-file) +;; (delete-file* pkt-file) +;; (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port) +;; (db:with-lock-db +;; (servdat-dbfile *db-serv-info*) +;; (lambda (dbh dbfile) +;; (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove +;; (let* ((sdat *db-serv-info*) ;; we have a run-id server +;; (host (servdat-host sdat)) +;; (port (servdat-port sdat)) +;; (uuid (servdat-uuid sdat)) +;; (res (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile))) +;; (debug:print-info 0 *default-log-port* "deregistered-server, res="res) +;; (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid) +;; ))))))) +;; +;; +;; (define (common:run-sync?) +;; ;; (and (common:on-homehost?) +;; (args:get-arg "-server")) +;; +;; (define *rmt:run-mutex* (make-mutex)) +;; (define *rmt:run-flag* #f) +;; +;; ;; Main entry point to start a server. was start-server +;; (define (rmt:run hostn) +;; (mutex-lock! *rmt:run-mutex*) +;; (if *rmt:run-flag* +;; (begin +;; (debug:print-warn 0 *default-log-port* "rmt:run already running.") +;; (mutex-unlock! *rmt:run-mutex*)) +;; (begin +;; (set! *rmt:run-flag* #t) +;; (mutex-unlock! 
*rmt:run-mutex*) +;; ;; ;; Configurations for server +;; ;; (tcp-buffer-size 2048) +;; ;; (max-connections 2048) +;; (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") +;; (if (and *db-serv-info* +;; (servdat-uconn *db-serv-info*)) +;; (let* ((uconn (servdat-uconn *db-serv-info*))) +;; (wait-and-close uconn)) +;; (let* ((port (portlogger:open-run-close portlogger:find-port)) +;; (handler-proc (lambda (rem-host-port qrykey cmd params) ;; +;; (set! *db-last-access* (current-seconds)) +;; (assert (list? params) "FATAL: handler called with non-list params") +;; (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params) +;; (debug:print 0 *default-log-port* "handler call: "cmd", params="params) +;; (api:execute-requests *dbstruct-db* cmd params)))) +;; ;; (api:process-request *dbstuct-db* +;; (if (not *db-serv-info*) +;; (set! *db-serv-info* (make-servdat host: hostn port: port))) +;; (let* ((uconn (run-listener handler-proc port)) +;; (rport (udat-port uconn))) ;; the real port +;; (servdat-host-set! *db-serv-info* hostn) +;; (servdat-port-set! *db-serv-info* rport) +;; (servdat-uconn-set! *db-serv-info* uconn) +;; (wait-and-close uconn) +;; (db:print-current-query-stats) +;; ))) +;; (let* ((host (servdat-host *db-serv-info*)) +;; (port (servdat-port *db-serv-info*)) +;; (mode (or (servdat-mode *db-serv-info*) +;; "non-db"))) +;; ;; server exit stuff here +;; ;; (rmt:server-shutdown host port) - always do in on-exit +;; ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit +;; (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. 
Exiting") +;; )))) +;; +;; ;;====================================================================== +;; ;; S E R V E R U T I L I T I E S +;; ;;====================================================================== +;; +;; +;; ;;====================================================================== +;; ;; NEW SERVER METHOD +;; ;;====================================================================== +;; +;; ;; only use for main.db - need to re-write some of this :( +;; ;; +;; (define (get-lock-db sdat dbfile host port) +;; (assert host "FATAL: get-lock-db called with host not set.") +;; (assert port "FATAL: get-lock-db called with port not set.") +;; (let* ((dbh (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations +;; (res (db:get-iam-server-lock dbh dbfile host port)) +;; (uconn (servdat-uconn sdat))) +;; ;; res => list then already locked, check server is responsive +;; ;; => #t then sucessfully got the lock +;; ;; => #f reserved for future use as to indicate something went wrong +;; (match res +;; ((owner_pid owner_host owner_port event_time) +;; (if (server-ready? uconn (conc owner_host":"owner_port) "abc") +;; #f ;; locked by someone else +;; (begin ;; locked by someone dead and gone +;; (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.") +;; (db:steal-lock-db dbh dbfile port)))) +;; (#t #t) ;; placeholder so that we don't touch res if it is #t +;; (else (set! res #f))) +;; (sqlite3:finalize! dbh) +;; res)) +;; +;; +;; (define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath) +;; (let* ((pkt-dat `((host . ,host) +;; (port . ,port) +;; (servkey . ,servkey) +;; (pid . ,(current-process-id)) +;; (ipaddr . ,ipaddr) +;; (dbpath . 
,dbpath)))
+;;          (uuid (write-alist->pkt
+;;                 pkts-dir
+;;                 pkt-dat
+;;                 pktspec: pkt-spec
+;;                 ptype: 'server)))
+;;     (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+;;     uuid))
+;;
+;; (define (get-pkts-dir #!optional (apath #f))
+;;   (let* ((effective-toppath (or *toppath* apath)))
+;;     (assert effective-toppath
+;;             "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+;;     (let* ((pdir (conc effective-toppath "/.meta/srvpkts")))
+;;       (if (file-exists? pdir)
+;;           pdir
+;;           (begin
+;;             (handle-exceptions ;; this exception handler should NOT be needed but ...
+;;                 exn
+;;                 pdir
+;;               (create-directory pdir #t))
+;;             pdir)))))
+;;
+;; ;; given a pkts dir read
+;; ;;
+;; (define (get-all-server-pkts pktsdir-in pktspec)
+;;   (let* ((pktsdir (if (file-exists? pktsdir-in)
+;;                       pktsdir-in
+;;                       (begin
+;;                         (create-directory pktsdir-in #t)
+;;                         pktsdir-in)))
+;;          (all-pkt-files (glob (conc pktsdir "/*.pkt"))))
+;;     (map (lambda (pkt-file)
+;;            (read-pkt->alist pkt-file pktspec: pktspec))
+;;          all-pkt-files)))
+;;
+;; (define (server-address srv-pkt)
+;;   (conc (alist-ref 'host srv-pkt) ":"
+;;         (alist-ref 'port srv-pkt)))
+
+;; Liveness check for the server addressed by host-port (a "host:port"
+;; style address).
+;;
+;; As written this is a stub: uconn, host-port and key are accepted but
+;; never used by the live code, and the result is always #t.  The
+;; ping-based probe is kept below as a datum comment (#;) for reference
+;; only; if enabled it would send a 'ping via send-receive and yield the
+;; reply when it is 'ack, #f otherwise.
+;;
+(define (server-ready? uconn host-port key)
+  #;(let* ((ping-params `((cmd . ping) (key . ,key)))
+           (payload     `((cmd . ping)
+                          (key . ,key)
+                          (params . ,ping-params))) ;; params repeats cmd and key - reason unclear
+           (reply       (send-receive uconn host-port 'ping payload)))
+      ;; an 'ack most likely means the expected server is on the other end
+      (and (eq? reply 'ack) reply))
+  #t)
+
+;; ; from the pkts return servers associated with dbpath
+;; ;; NOTE: Only one can be alive - have to check on each
+;; ;; in the list of pkts returned
+;; ;;
+;; (define (get-viable-servers serv-pkts dbpath)
+;;   (let loop ((tail serv-pkts)
+;;              (res '()))
+;;     (if (null? tail)
+;;         res ;; NOTE: sort by age so oldest is considered first
+;;         (let* ((spkt (car tail)))
+;;           (loop (cdr tail)
+;;                 (if (equal?
dbpath (alist-ref 'dbpath spkt)) +;; (cons spkt res) +;; res)))))) +;; +;; (define (remove-pkts-if-not-alive uconn serv-pkts) +;; (filter (lambda (pkt) +;; (let* ((host (alist-ref 'host pkt)) +;; (port (alist-ref 'port pkt)) +;; (host-port (conc host":"port)) +;; (key (alist-ref 'servkey pkt)) +;; (pktz (alist-ref 'Z pkt)) +;; (res (server-ready? uconn host-port key))) +;; (if res +;; res +;; (let* ((pktsdir (get-pkts-dir *toppath*)) +;; (pktpath (conc pktsdir"/"pktz".pkt"))) +;; (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath) +;; (delete-file* pktpath) +;; #f)))) +;; serv-pkts)) +;; +;; ;; from viable servers get one that is alive and ready +;; ;; +;; (define (get-the-server uconn apath serv-pkts) +;; (let loop ((tail serv-pkts)) +;; (if (null? tail) +;; #f +;; (let* ((spkt (car tail)) +;; (host (alist-ref 'ipaddr spkt)) +;; (port (alist-ref 'port spkt)) +;; (host-port (conc host":"port)) +;; (dbpth (alist-ref 'dbpath spkt)) +;; (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt)) +;; (addr (server-address spkt))) +;; (if (server-ready? uconn host-port srvkey) +;; spkt +;; (loop (cdr tail))))))) +;; +;; ;; am I the "first" in line server? I.e. my D card is smallest +;; ;; use Z card as tie breaker +;; ;; +;; (define (get-best-candidate serv-pkts dbpath) +;; (if (null? serv-pkts) +;; #f +;; (let loop ((tail serv-pkts) +;; (best (car serv-pkts))) +;; (if (null? tail) +;; best +;; (let* ((candidate (car tail)) +;; (candidate-bd (string->number (alist-ref 'D candidate))) +;; (best-bd (string->number (alist-ref 'D best))) +;; ;; bigger number is younger +;; (candidate-z (alist-ref 'Z candidate)) +;; (best-z (alist-ref 'Z best)) +;; (new-best (cond +;; ((> best-bd candidate-bd) ;; best is younger than candidate +;; candidate) +;; ((< best-bd candidate-bd) ;; candidate is younger than best +;; best) +;; (else +;; (if (string>=? best-z candidate-z) +;; best +;; candidate))))) ;; use Z card as tie breaker +;; (if (null? 
tail) +;; new-best +;; (loop (cdr tail) new-best))))))) +;; +;; +;; ;;====================================================================== +;; ;; END NEW SERVER METHOD +;; ;;====================================================================== +;; +;; ;; if .db/main.db check the pkts +;; ;; +;; (define (rmt:wait-for-server pkts-dir db-file server-key) +;; (let* ((sdat *db-serv-info*)) +;; (let loop ((start-time (current-seconds)) +;; (changed #t) +;; (last-sdat "not this")) +;; (begin ;; let ((sdat #f)) +;; (thread-sleep! 0.01) +;; (debug:print-info 0 *default-log-port* "Waiting for server alive signature") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! sdat *db-serv-info*) +;; (mutex-unlock! *heartbeat-mutex*) +;; (if (and sdat +;; (not changed) +;; (> (- (current-seconds) start-time) 2)) +;; (let* ((uconn (servdat-uconn sdat))) +;; (servdat-status-set! sdat 'iface-stable) +;; (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") +;; ;; create a server pkt in *toppath*/.meta/srvpkts +;; +;; ;; TODO: +;; ;; 1. change sdat to stuct +;; ;; 2. add uuid to struct +;; ;; 3. update uuid in sdat here +;; ;; +;; (servdat-uuid-set! sdat +;; (register-server +;; pkts-dir *srvpktspec* +;; (get-host-name) +;; (servdat-port sdat) server-key +;; (servdat-host sdat) db-file)) +;; ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key +;; ;; now read pkts and see if we are a contender +;; (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) +;; (viables (get-viable-servers all-pkts db-file)) +;; (alive (remove-pkts-if-not-alive uconn viables)) +;; (best-srv (get-best-candidate alive db-file)) +;; (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)) +;; (i-am-srv (equal? 
best-srv-key server-key)) +;; (delete-pkt (lambda () +;; (let* ((pktfile (conc (get-pkts-dir *toppath*) +;; "/" (servdat-uuid *db-serv-info*) +;; ".pkt"))) +;; (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) +;; (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit +;; (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) +;; ;; am I the best-srv, compare server-keys to know +;; (if i-am-srv +;; (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) +;; (begin +;; (debug:print-info 0 *default-log-port* "I'm the server!") +;; (servdat-dbfile-set! sdat db-file) +;; (servdat-status-set! sdat 'db-locked)) +;; (begin +;; (debug:print-info 0 *default-log-port* "I'm not the server, exiting.") +;; (bdat-time-to-exit-set! *bdat* #t) +;; (delete-pkt) +;; (thread-sleep! 0.2) +;; (exit))) +;; (begin +;; (debug:print-info 0 *default-log-port* +;; "Keys do not match "best-srv-key", "server-key", exiting.") +;; (bdat-time-to-exit-set! *bdat* #t) +;; (delete-pkt) +;; (thread-sleep! 0.2) +;; (exit))) +;; sdat)) +;; (begin ;; sdat not yet contains server info +;; (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) +;; (sleep 4) +;; (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes +;; (begin +;; (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") +;; (exit)) +;; (loop start-time +;; (equal? 
sdat last-sdat)
+;;                           sdat))))))))
+;;
+;; (define (rmt:register-server sinfo apath iface port server-key dbname)
+;;   (servdat-conns sinfo) ;; just checking types
+;;   (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;;   (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;;                          (db:run-id->dbname #f)
+;;                          'register-server `(,iface
+;;                                             ,port
+;;                                             ,server-key
+;;                                             ,(current-process-id)
+;;                                             ,iface
+;;                                             ,apath
+;;                                             ,dbname)))
+;;
+;; (define (rmt:get-count-servers sinfo apath)
+;;   (servdat-conns sinfo) ;; just checking types
+;;   (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+;;   (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+;;                          (db:run-id->dbname #f)
+;;                          'get-count-servers `(,apath)))
+
+;; Issue the 'get-servers-info request for the area path apath.
+;;
+;; The request goes through rmt:send-receive with #f as its second
+;; argument (presumably the run-id slot - confirm against
+;; rmt:send-receive) and a one-element parameter list holding apath.
+;; Whatever rmt:send-receive returns is passed back unchanged.
+;;
+(define (rmt:get-servers-info apath)
+  (let ((params (list apath)))
+    (rmt:send-receive 'get-servers-info #f params)))
+
+;; (define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+;;   (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+;;   (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
+;;                          (db:run-id->dbname #f)
+;;                          'deregister-server `(,iface
+;;                                               ,port
+;;                                               ,server-key
+;;                                               ,(current-process-id)
+;;                                               ,iface
+;;                                               ,apath
+;;                                               ,dbname)))
+;;
+;; (define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100))
+;;   ;; wait until *db-serv-info* stops changing
+;;   (let* ((stime (current-seconds)))
+;;     (let loop ((last-host #f)
+;;                (last-port #f)
+;;                (tries 0))
+;;       (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*)))
+;;              (curr-port (and *db-serv-info* (servdat-port *db-serv-info*))))
+;;         ;; first we verify port and interface, update *db-serv-info* in need be.
+;;         (cond
+;;          ((> tries num-tries-allowed)
+;;           (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.")
+;;           (exit 1))
+;;          ((not *db-serv-info*)
+;;           (thread-sleep!
0.25) +;; (loop curr-host curr-port (+ tries 1))) +;; ((or (not last-host)(not last-port)) +;; (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries) +;; (thread-sleep! 0.25) +;; (loop curr-host curr-port (+ tries 1))) +;; ((or (not (equal? last-host curr-host)) +;; (not (equal? last-port curr-port))) +;; (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") +;; (thread-sleep! 0.25) +;; (loop curr-host curr-port (+ tries 1))) +;; ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed +;; (thread-sleep! 0.5) +;; (loop curr-host curr-port (+ tries 1))) +;; (else +;; (rmt:get-signature) ;; sets *my-signature* as side effect +;; (servdat-status-set! *db-serv-info* 'interface-stable) +;; (debug:print 0 *default-log-port* +;; "SERVER STARTED: " curr-host +;; ":" curr-port +;; " AT " (current-seconds) " server signature: " *my-signature* +;; " with "(servdat-trynum *db-serv-info*)" port changes") +;; (flush-output *default-log-port*) +;; #t)))))) +;; +;; ;; run rmt:keep-running in a parallel thread to monitor that the db is being +;; ;; used and to shutdown after sometime if it is not. +;; ;; +;; (define (rmt:keep-running dbname) +;; ;; if none running or if > 20 seconds since +;; ;; server last used then start shutdown +;; ;; This thread waits for the server to come alive +;; (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") +;; +;; (let* ((sinfo *db-serv-info*) +;; (server-start-time (current-seconds)) +;; (pkts-dir (get-pkts-dir)) +;; (server-key (rmt:get-signature)) ;; This servers key +;; (is-main (equal? (args:get-arg "-db") ".db/main.db")) +;; (last-access 0) +;; (server-timeout (server:expiration-timeout)) +;; (shutdown-server-sequence (lambda (host port) +;; (set! *unclean-shutdown* #f) ;; Should not be needed anymore +;; (debug:print-info 0 *default-log-port* "Starting to shutdown the server. 
pid="(current-process-id)) +;; ;; (rmt:server-shutdown host port) -- called in on-exit +;; ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit +;; (exit))) +;; (timed-out? (lambda () +;; (<= (+ last-access server-timeout) +;; (current-seconds))))) +;; (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db")) +;; ;; main and run db servers have both got wait logic (could/should merge it) +;; (if is-main +;; (rmt:wait-for-server pkts-dir dbname server-key) +;; (rmt:wait-for-stable-interface)) +;; ;; this is our forever loop +;; (let* ((iface (servdat-host *db-serv-info*)) +;; (port (servdat-port *db-serv-info*)) +;; (uconn (servdat-uconn *db-serv-info*))) +;; (let loop ((count 0) +;; (bad-sync-count 0) +;; (start-time (current-milliseconds))) +;; (if (and (not is-main) +;; (common:low-noise-print 60 "servdat-status")) +;; (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*))) +;; +;; (mutex-lock! *heartbeat-mutex*) +;; ;; set up the database handle +;; (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate +;; (let ((watchdog (bdat-watchdog *bdat*))) +;; (debug:print 0 *default-log-port* "SERVER: dbprep") +;; (db:setup dbname) ;; sets *dbstruct-db* as side effect +;; (servdat-status-set! *db-serv-info* 'db-opened) +;; ;; IFF I'm not main, call into main and register self +;; (if (not is-main) +;; (let ((res (rmt:register-server sinfo +;; *toppath* iface port +;; server-key dbname))) +;; (if res ;; we are the server +;; (servdat-status-set! *db-serv-info* 'have-interface-and-db) +;; ;; now check that the db locker is alive, clear it out if not +;; (let* ((serv-info (rmt:server-info *toppath* dbname))) +;; (match serv-info +;; ((host port servkey pid ipaddr apath dbpath) +;; (if (not (server-ready? uconn (conc host":"port) servkey)) +;; (begin +;; (debug:print-info 0 *default-log-port* "Server registered but not alive. 
Removing and trying again.") +;; (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) +;; (loop (+ count 1) bad-sync-count start-time)))) +;; (else +;; (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) +;; (exit))))))) +;; (debug:print 0 *default-log-port* +;; "SERVER: running, db "dbname" opened, megatest version: " +;; (common:get-full-version)) +;; ;; start the watchdog +;; +;; ;; is this really needed? +;; +;; #;(if watchdog +;; (if (not (member (thread-state watchdog) +;; '(ready running blocked +;; sleeping dead))) +;; (begin +;; (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") +;; (thread-start! watchdog)) +;; (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) +;; (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) +;; #;(loop (+ count 1) bad-sync-count start-time) +;; )) +;; +;; (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) +;; +;; (mutex-unlock! *heartbeat-mutex*) +;; +;; ;; when things go wrong we don't want to be doing the various +;; ;; queries too often so we strive to run this stuff only every +;; ;; four seconds or so. +;; (let* ((sync-time (- (current-milliseconds) start-time)) +;; (rem-time (quotient (- 4000 sync-time) 1000))) +;; (if (and (<= rem-time 4) +;; (> rem-time 0)) +;; (thread-sleep! rem-time))) +;; +;; ;; Transfer *db-last-access* to last-access to use in checking that we are still alive +;; (set! 
last-access *db-last-access*) +;; +;; (if (< count 1) ;; 3x3 = 9 secs aprox +;; (loop (+ count 1) bad-sync-count (current-milliseconds))) +;; +;; (if (common:low-noise-print 60 "dbstats") +;; (begin +;; (debug:print 0 *default-log-port* "Server stats:") +;; (db:print-current-query-stats))) +;; (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) +;; (cond +;; ((not *server-run*) +;; (debug:print-info 0 *default-log-port* "*server-run* set to #f. Shutting down.") +;; (shutdown-server-sequence (get-host-name) port)) +;; ((timed-out?) +;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) +;; (shutdown-server-sequence (get-host-name) port)) +;; ((and *server-run* +;; (or (not (timed-out?)) +;; (if is-main ;; do not exit if there are other servers (keep main open until all others gone) +;; (> (rmt:get-count-servers sinfo *toppath*) 1) +;; #f))) +;; (if (common:low-noise-print 120 "server continuing") +;; (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) +;; (loop 0 bad-sync-count (current-milliseconds))) +;; (else +;; (set! *unclean-shutdown* #f) +;; (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) +;; (shutdown-server-sequence (get-host-name) port) +;; #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: " +;; (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown +;; (sexpr->string 'quit)))))))))) +;; +;; (define (rmt:get-reasonable-hostname) +;; (let* ((inhost (or (args:get-arg "-server") "-"))) +;; (if (equal? inhost "-") +;; (get-host-name) +;; inhost))) +;; +;; Call this to start the actual server +;; +;; all routes though here end in exit ... 
+;;
+;; This is the point at which servers are started
+;;
+;; NOTE: as written here the launch itself is a stub.  The live code
+;; only logs that the procedure was entered and then returns #f; dbname
+;; is referenced solely by the disabled code.  The threaded launch
+;; sequence (a "Server run" thread plus a "Keep running" monitor thread)
+;; is kept below as a datum comment (#;) for reference.
+;;
+(define (rmt:server-launch dbname)
+  (debug:print-info 0 *default-log-port* "Entered rmt:server-launch")
+  #;(let* ((run-thread
+            (make-thread
+             (lambda ()
+               (debug:print-info 0 *default-log-port* "Server run thread started")
+               (rmt:run (rmt:get-reasonable-hostname)))
+             "Server run"))
+           (monitor-thread
+            (make-thread
+             (lambda ()
+               (debug:print-info 0 *default-log-port* "Server monitor thread started")
+               (if (args:get-arg "-server")
+                   (rmt:keep-running dbname)))
+             "Keep running")))
+      (thread-start! run-thread)
+      ;; give the server time to settle before starting the keep-running monitor
+      (thread-sleep! 0.252)
+      (thread-start! monitor-thread)
+      (set! *didsomething* #t)
+      (thread-join! run-thread)
+      (thread-join! monitor-thread))
+  #f)
+;;
+;; ;;======================================================================
+;; ;; S E R V E R - D I R E C T C A L L S
+;; ;;======================================================================
+;;
+;; (define (rmt:kill-server run-id)
+;;   (rmt:send-receive 'kill-server #f (list run-id)))
+;;
+;; (define (rmt:start-server run-id)
+;;   (rmt:send-receive 'start-server #f (list run-id)))
+;;
+;; (define (rmt:server-info apath dbname)
+;;   (rmt:send-receive 'get-server-info #f (list apath dbname)))
+;;
+;; ;;======================================================================
+;; ;; Nanomsg transport
+;; ;;======================================================================
+;;
+;; #;(define (is-port-in-use port-num)
+;;   (let* ((ret #f))
+;;     (let-values (((inp oup pid)
+;;                   (process "netstat" (list "-tulpn" ))))
+;;       (let loop ((inl (read-line inp)))
+;;         (if (not (eof-object? inl))
+;;             (begin
+;;               (if (string-search (regexp (conc ":" port-num)) inl)
+;;                   (begin
+;;                     ;(print "Output: " inl)
+;;                     (set! ret #t))
+;;                   (loop (read-line inp)))))))
+;;     ret))
+;;
+;; #;(define (open-nn-connection host-port)
+;;   (let ((req (make-req-socket))
+;;         (uri (conc "tcp://" host-port)))
+;;     (nng-dial req uri)
+;;     (socket-set!
req 'nng/recvtimeo 2000) +;; req)) +;; +;; #;(define (send-receive-nn req msg) +;; (nng-send req msg) +;; (nng-recv req)) +;; +;; #;(define (close-nn-connection req) +;; (nng-close! req)) +;; +;; ;; ;; open connection to server, send message, close connection +;; ;; ;; +;; ;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds +;; ;; (let ((req (make-req-socket 'req)) +;; ;; (uri (conc "tcp://" host-port)) +;; ;; (res #f) +;; ;; ;; (contacts (alist-ref 'contact attrib)) +;; ;; ;; (mode (alist-ref 'mode attrib)) +;; ;; ) +;; ;; (socket-set! req 'nng/recvtimeo 2000) +;; ;; (handle-exceptions +;; ;; exn +;; ;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) +;; ;; ;; Send notification +;; ;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) +;; ;; #f) +;; ;; (nng-dial req uri) +;; ;; ;; (print "Connected to the server " ) +;; ;; (nng-send req msg) +;; ;; ;; (print "Request Sent") +;; ;; (let* ((th1 (make-thread (lambda () +;; ;; (let ((resp (nng-recv req))) +;; ;; (nng-close! req) +;; ;; (set! res (if (equal? resp "ok") +;; ;; #t +;; ;; #f)))) +;; ;; "recv thread")) +;; ;; (th2 (make-thread (lambda () +;; ;; (thread-sleep! timeout) +;; ;; (thread-terminate! th1)) +;; ;; "timer thread"))) +;; ;; (thread-start! th1) +;; ;; (thread-start! th2) +;; ;; (thread-join! th1) +;; ;; res)))) +;; ;; +;; #;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds +;; (let ((req (make-req-socket)) +;; (uri (conc "tcp://" host-port)) +;; (res #f)) +;; (handle-exceptions +;; exn +;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) +;; ;; Send notification +;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) +;; #f) +;; (nng-dial req uri) +;; (nng-send req msg) +;; (let* ((th1 (make-thread (lambda () +;; (let ((resp (nng-recv req))) +;; (nng-close! 
req) +;; ;; (print resp) +;; (set! res resp))) +;; "recv thread")) +;; (th2 (make-thread (lambda () +;; (thread-sleep! timeout) +;; (thread-terminate! th1)) +;; "timer thread"))) +;; (thread-start! th1) +;; (thread-start! th2) +;; (thread-join! th1) +;; res)))) +;; +;; ;;====================================================================== +;; ;; S E R V E R U T I L I T I E S +;; ;;====================================================================== +;; +;; ;; run ping in separate process, safest way in some cases +;; ;; +;; #;(define (server:ping-server ifaceport) +;; (with-input-from-pipe +;; (conc (common:get-megatest-exe) " -ping " ifaceport) +;; (lambda () +;; (let loop ((inl (read-line)) +;; (res "NOREPLY")) +;; (if (eof-object? inl) +;; (case (string->symbol res) +;; ((NOREPLY) #f) +;; ((LOGIN_OK) #t) +;; (else #f)) +;; (loop (read-line) inl)))))) +;; +;; ;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; ;; +;; #;(define (server:login toppath) +;; (lambda (toppath) +;; (set! *db-last-access* (current-seconds)) ;; might not be needed. +;; (if (equal? *toppath* toppath) +;; #t +;; #f))) +;; +;; ;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; ;; (define (server:release-sync-lock) +;; ;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; ;; (define (server:have-sync-lock?) +;; ;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; ;; (have-lock? (car have-lock-pair)) +;; ;; (lock-time (cdr have-lock-pair)) +;; ;; (lock-age (- (current-seconds) lock-time))) +;; ;; (cond +;; ;; (have-lock? 
#t) +;; ;; ((>lock-age +;; ;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; ;; (server:release-sync-lock) +;; ;; (server:have-sync-lock?)) +;; ;; (else #f)))) + +) ADDED ulex-none/ulex.scm Index: ulex-none/ulex.scm ================================================================== --- /dev/null +++ ulex-none/ulex.scm @@ -0,0 +1,569 @@ +;; ulex: Distributed sqlite3 db +;;; +;; Copyright (C) 2018-2021 Matt Welland +;; Redistribution and use in source and binary forms, with or without +;; modification, is permitted. +;; +;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE +;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +;; DAMAGE. + +;;====================================================================== +;; ABOUT: +;; See README in the distribution at https://www.kiatoa.com/fossils/ulex +;; NOTES: +;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. 
+;; +;;====================================================================== + +(module ulex + * + #;( + + ;; NOTE: looking for the handler proc - find the run-listener :) + + run-listener ;; (run-listener handler-proc [port]) => uconn + + ;; NOTE: handler-proc params; + ;; (handler-proc rem-host-port qrykey cmd params) + + send-receive ;; (send-receive uconn host-port cmd data) + + ;; NOTE: cmd can be any plain text symbol except for these; + ;; 'ping 'ack 'goodbye 'response + + set-work-handler ;; (set-work-handler proc) + + wait-and-close ;; (wait-and-close uconn) + + ulex-listener? + + ;; needed to get the interface:port that was automatically found + udat-port + udat-host-port + + ;; for testing only + ;; pp-uconn + + ;; parameters + work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct + return-method ;; parameter; 'mailbox, 'polling, 'direct + ) + +(import scheme + chicken.base + chicken.file + chicken.io + chicken.time + chicken.condition + chicken.string + chicken.sort + chicken.pretty-print + + address-info + mailbox + matchable + ;; queues + regex + regex-case + simple-exceptions + s11n + srfi-1 + srfi-18 + srfi-4 + srfi-69 + system-information + tcp6 + typed-records + ) + +;; ;; udat struct, used by both caller and callee +;; ;; instantiated as uconn by convention +;; ;; +;; (defstruct udat +;; ;; the listener side +;; (port #f) +;; (host-port #f) +;; (socket #f) +;; ;; the peers +;; (peers (make-hash-table)) ;; host:port->peer +;; ;; work handling +;; (work-queue (make-mailbox)) +;; (work-proc #f) ;; set by user +;; (cnum 0) ;; cookie number +;; (mboxes (make-hash-table)) ;; for the replies +;; (avail-cmboxes '()) ;; list of ( . 
) for re-use +;; ;; threads +;; (numthreads 10) +;; (cmd-thread #f) +;; (work-queue-thread #f) +;; (num-threads-running 0) +;; ) +;; +;; ;; Parameters +;; +;; ;; work-method: +;; (define work-method (make-parameter 'mailbox)) +;; ;; mailbox - all rdat goes through mailbox +;; ;; threads - all rdat immediately executed in new thread +;; ;; direct - no queuing +;; ;; +;; +;; ;; return-method, return the result to waiting send-receive: +;; (define return-method (make-parameter 'mailbox)) +;; ;; mailbox - create a mailbox and use it for passing returning results to send-receive +;; ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result +;; ;; direct - no queuing, result is passed back in single tcp connection +;; ;; +;; +;; ;; ;; struct for keeping track of others we are talking to +;; ;; ;; +;; ;; (defstruct pdat +;; ;; (host-port #f) +;; ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer +;; ;; ) +;; ;; +;; ;; ;; struct for peer connections, keep track of expiration etc. +;; ;; ;; +;; ;; (defstruct pcon +;; ;; (inp #f) +;; ;; (oup #f) +;; ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) +;; ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes +;; ;; ) +;; +;; ;;====================================================================== +;; ;; listener +;; ;;====================================================================== +;; +;; ;; is uconn a ulex connector (listener) +;; ;; +;; (define (ulex-listener? uconn) +;; (udat? uconn)) +;; +;; ;; create a tcp listener and return a populated udat struct with +;; ;; my port, address, hostname, pid etc. +;; ;; return #f if fail to find a port to allocate. 
+;; ;; +;; ;; if udata-in is #f create the record +;; ;; if there is already a serv-listener return the udata +;; ;; +;; (define (setup-listener uconn #!optional (port 4242)) +;; (handle-exceptions +;; exn +;; (if (< port 65535) +;; (setup-listener uconn (+ port 1)) +;; #f) +;; (connect-listener uconn port))) +;; +;; (define (connect-listener uconn port) +;; ;; (tcp-listener-socket LISTENER)(socket-name so) +;; ;; sockaddr-address, sockaddr-port, sockaddr->string +;; (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) +;; (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) +;; (udat-port-set! uconn port) +;; (udat-host-port-set! uconn (conc addr":"port)) +;; (udat-socket-set! uconn tlsn) +;; uconn)) +;; +;; ;; run-listener does all the work of starting a listener in a thread +;; ;; it then returns control +;; ;; +;; (define (run-listener handler-proc #!optional (port-suggestion 4242)) +;; (let* ((uconn (make-udat))) +;; (udat-work-proc-set! uconn handler-proc) +;; (if (setup-listener uconn port-suggestion) +;; (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) +;; (th2 (make-thread (lambda () +;; (case (work-method) +;; ((mailbox limited) +;; (process-work-queue uconn)))) +;; "Ulex work queue processor"))) +;; ;; (tcp-buffer-size 2048) +;; (thread-start! th1) +;; (thread-start! th2) +;; (udat-cmd-thread-set! uconn th1) +;; (udat-work-queue-thread-set! uconn th2) +;; (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".") +;; uconn) +;; (assert #f "ERROR: run-listener called without proper setup.")))) +;; +;; (define (wait-and-close uconn) +;; (thread-join! 
(udat-cmd-thread uconn)) +;; (tcp-close (udat-socket uconn))) +;; +;; ;;====================================================================== +;; ;; peers and connections +;; ;;====================================================================== +;; +;; (define *send-mutex* (make-mutex)) +;; +;; ;; send structured data to recipient +;; ;; +;; ;; NOTE: qrykey is what was called the "cookie" previously +;; ;; +;; ;; retval tells send to expect and wait for return data (one line) and return it or time out +;; ;; this is for ping where we don't want to necessarily have set up our own server yet. +;; ;; +;; ;; NOTE: see below for beginnings of code to allow re-use of tcp connections +;; ;; - I believe (without substantial evidence) that re-using connections will +;; ;; be beneficial ... +;; ;; +;; (define (send udata host-port qrykey cmd params) +;; (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this +;; (isme #f #;(equal? host-port my-host-port)) ;; calling myself? +;; ;; dat is a self-contained work block that can be sent or handled locally +;; (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) +;; (cond +;; (isme (ulex-handler udata dat)) ;; no transmission needed +;; (else +;; (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? +;; exn +;; (message exn) +;; (begin +;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; (let-values (((inp oup)(tcp-connect host-port))) +;; (let ((res (if (and inp oup) +;; (begin +;; (serialize dat oup) +;; (close-output-port oup) +;; (deserialize inp) +;; ) +;; (begin +;; (print "ERROR: send called but no receiver has been setup. Please call setup first!") +;; #f)))) +;; (close-input-port inp) +;; ;; (mutex-unlock! 
*send-mutex*) ;; DOESN'T SEEM TO HELP +;; res)))))))) ;; res will always be 'ack unless return-method is direct +;; +;; (define (send-via-polling uconn host-port cmd data) +;; (let* ((qrykey (make-cookie uconn)) +;; (sres (send uconn host-port qrykey cmd data))) +;; (case sres +;; ((ack) +;; (let loop ((start-time (current-milliseconds))) +;; (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout +;; (begin +;; (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) +;; #f) +;; (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash +;; (if result ;; result is '(status . result-data) or #f for nothing yet +;; (begin +;; (hash-table-delete! (udat-mboxes uconn) qrykey) +;; (cdr result)) +;; (begin +;; (thread-sleep! 0.01) +;; (loop start-time))))))) +;; (else +;; (print "ULEX ERROR: Communication failed? sres="sres) +;; #f)))) +;; +;; (define (send-via-mailbox uconn host-port cmd data) +;; (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? +;; (qrykey (car cmbox)) +;; (mbox (cdr cmbox)) +;; (mbox-time (current-milliseconds)) +;; (sres (send uconn host-port qrykey cmd data))) ;; short res +;; (if (eq? sres 'ack) +;; (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) +;; #f +;; 120)) ;; timeout) +;; (mbox-timeout-result 'MBOX_TIMEOUT) +;; (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) +;; (mbox-receive-time (current-milliseconds))) +;; ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? +;; (hash-table-delete! (udat-mboxes uconn) qrykey) +;; (if (eq? res 'MBOX_TIMEOUT) +;; (begin +;; (print "WARNING: mbox timed out for query "cmd", with data "data +;; ", waiting for response from "host-port".") +;; +;; ;; here it might make sense to clean up connection records and force clean start? +;; ;; NO. The progam using ulex needs to do the reset. 
Right thing here is exception +;; +;; #f) ;; convert to raising exception? +;; res)) +;; (begin +;; (print "ERROR: Communication failed? Got "sres) +;; #f)))) +;; +;; ;; send a request to the given host-port and register a mailbox in udata +;; ;; wait for the mailbox data and return it +;; ;; +;; (define (send-receive uconn host-port cmd data) +;; (let* ((start-time (current-milliseconds)) +;; (result (cond +;; ((member cmd '(ping goodbye)) ;; these are immediate +;; (send uconn host-port 'ping cmd data)) +;; ((eq? (work-method) 'direct) +;; ;; the result from send will be the actual result, not an 'ack +;; (send uconn host-port 'direct cmd data)) +;; (else +;; (case (return-method) +;; ((polling) +;; (send-via-polling uconn host-port cmd data)) +;; ((mailbox) +;; (send-via-mailbox uconn host-port cmd data)) +;; (else +;; (print "ULEX ERROR: unrecognised return-method "(return-method)".") +;; #f))))) +;; (duration (- (current-milliseconds) start-time))) +;; ;; this is ONLY for development and debugging. It will be removed once Ulex is stable. +;; (if (< 5000 duration) +;; (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000))) +;; " seconds; "cmd", host-port="host-port", data="data)) +;; result)) +;; +;; +;; ;;====================================================================== +;; ;; responder side +;; ;;====================================================================== +;; +;; ;; take a request, rdat, and if not immediate put it in the work queue +;; ;; +;; ;; Reserved cmds; ack ping goodbye response +;; ;; +;; (define (ulex-handler uconn rdat) +;; (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") +;; (match rdat ;; (string-split controldat) +;; ((rem-host-port qrykey cmd params);; timedata) +;; ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) +;; (case cmd +;; ;; ((ack )(print "Got ack! But why? 
Should NOT get here.") 'ack) +;; ((ping) +;; ;; (print "Got Ping!") +;; ;; (add-to-work-queue uconn rdat) +;; 'ack) +;; ((goodbye) +;; ;; just clear out references to the caller. NOT COMPLETE +;; (add-to-work-queue uconn rdat) +;; 'ack) +;; ((response) ;; this is a result from remote processing, send it as mail ... +;; (case (return-method) +;; ((polling) +;; (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params)) +;; 'ack) +;; ((mailbox) +;; (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) +;; (if mbox +;; (begin +;; (mailbox-send! mbox params) ;; params here is our result +;; 'ack) +;; (begin +;; (print "ERROR: received result but no associated mbox for cookie "qrykey) +;; 'no-mbox-found)))) +;; (else (print "ULEX ERROR: unrecognised return-method "(return-method)) +;; 'bad-return-method))) +;; (else ;; generic request - hand it to the work queue +;; (add-to-work-queue uconn rdat) +;; 'ack))) +;; (else +;; (print "ULEX ERROR: bad rdat "rdat) +;; 'bad-rdat))) +;; +;; ;; given an already set up uconn start the cmd-loop +;; ;; +;; (define (ulex-cmd-loop uconn) +;; (let* ((serv-listener (udat-socket uconn)) +;; (listener (lambda () +;; (let loop ((state 'start)) +;; (let-values (((inp oup)(tcp-accept serv-listener))) +;; ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) +;; (resp (ulex-handler uconn rdat))) +;; (serialize resp oup) +;; (close-input-port inp) +;; (close-output-port oup) +;; ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; ) +;; (loop state)))))) +;; ;; start N of them +;; (let loop ((thnum 0) +;; (threads '())) +;; (if (< thnum 100) +;; (let* ((th (make-thread listener (conc "listener" thnum)))) +;; (thread-start! th) +;; (loop (+ thnum 1) +;; (cons th threads))) +;; (map thread-join! threads))))) +;; +;; ;; add a proc to the cmd list, these are done symetrically (i.e. 
in all instances) +;; ;; so that the proc can be dereferenced remotely +;; ;; +;; (define (set-work-handler uconn proc) +;; (udat-work-proc-set! uconn proc)) +;; +;; ;;====================================================================== +;; ;; work queues - this is all happening on the listener side +;; ;;====================================================================== +;; +;; ;; rdat is (rem-host-port qrykey cmd params) +;; +;; (define (add-to-work-queue uconn rdat) +;; #;(queue-add! (udat-work-queue uconn) rdat) +;; (case (work-method) +;; ((threads) +;; (thread-start! (make-thread (lambda () +;; (do-work uconn rdat)) +;; "worker thread"))) +;; ((mailbox) +;; (mailbox-send! (udat-work-queue uconn) rdat)) +;; ((direct) +;; (do-work uconn rdat)) +;; (else +;; (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") +;; (mailbox-send! (udat-work-queue uconn) rdat)))) +;; +;; ;; move the logic to return the result somewhere else? +;; ;; +;; (define (do-work uconn rdat) +;; (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change +;; ;; put this following into a do-work procedure +;; (match rdat +;; ((rem-host-port qrykey cmd params) +;; (let* ((start-time (current-milliseconds)) +;; (result (proc rem-host-port qrykey cmd params)) +;; (end-time (current-milliseconds)) +;; (run-time (- end-time start-time))) +;; (case (work-method) +;; ((direct) result) +;; (else +;; (print "ULEX: work "cmd", "params" done in "run-time" ms") +;; ;; send 'response as cmd and result as params +;; (send uconn rem-host-port qrykey 'response result) ;; could check for ack +;; (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))))) +;; (MBOX_TIMEOUT 'do-work-timeout) +;; (else +;; (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) +;; +;; ;; NEW APPROACH: +;; ;; +;; (define (process-work-queue uconn) +;; (let ((wqueue (udat-work-queue uconn)) +;; (proc 
(udat-work-proc uconn)) +;; (numthr (udat-numthreads uconn))) +;; (let loop ((thnum 1) +;; (threads '())) +;; (let ((thlst (cons (make-thread (lambda () +;; (let work-loop () +;; (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) +;; (do-work uconn rdat)) +;; (work-loop))) +;; (conc "work thread " thnum)) +;; threads))) +;; (if (< thnum numthr) +;; (loop (+ thnum 1) +;; thlst) +;; (begin +;; (print "ULEX: Starting "(length thlst)" worker threads.") +;; (map thread-start! thlst) +;; (print "ULEX: Threads started. Joining all.") +;; (map thread-join! thlst))))))) +;; +;; ;; below was to enable re-use of connections. This seems non-trivial so for +;; ;; now lets open on each call +;; ;; +;; ;; ;; given host-port get or create peer struct +;; ;; ;; +;; ;; (define (udat-get-peer uconn host-port) +;; ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) +;; ;; ;; no peer, so create pdat and init it +;; ;; +;; ;; ;; NEED stack of connections, pop and use; inp, oup, +;; ;; ;; creation_time (remove and create new if over 24hrs old +;; ;; ;; +;; ;; (let ((pdat (make-pdat host-port: host-port))) +;; ;; (hash-table-set! (udat-peers uconn) host-port pdat) +;; ;; pdat))) +;; ;; +;; ;; ;; is pcon alive +;; ;; +;; ;; ;; given host-port and pdat get a pcon +;; ;; ;; +;; ;; (define (pdat-get-pcon pdat host-port) +;; ;; (let loop ((conns (pdat-conns pdat))) +;; ;; (if (null? conns) ;; none? make and return - do NOT add - it will be pushed back on list later +;; ;; (init-pcon (make-pcon)) +;; ;; (let* ((conn (pop conns))) +;; ;; +;; ;; ;; given host-port get a pcon struct +;; ;; ;; +;; ;; (define (udat-get-pcon +;; +;; ;;====================================================================== +;; ;; misc utils +;; ;;====================================================================== +;; +;; (define (make-cookie uconn) +;; (let ((newcnum (+ (udat-cnum uconn) 1))) +;; (udat-cnum-set! 
uconn newcnum) +;; (conc (udat-host-port uconn) ":" +;; newcnum))) +;; +;; ;; cookie/mboxes +;; +;; ;; we store each mbox with a cookie ( . ) +;; ;; +;; (define (get-cmbox uconn) +;; (if (null? (udat-avail-cmboxes uconn)) +;; (let ((cookie (make-cookie uconn)) +;; (mbox (make-mailbox))) +;; (hash-table-set! (udat-mboxes uconn) cookie mbox) +;; `(,cookie . ,mbox)) +;; (let ((cmbox (car (udat-avail-cmboxes uconn)))) +;; (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn))) +;; cmbox))) +;; +;; (define (put-cmbox uconn cmbox) +;; (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn)))) +;; +;; (define (pp-uconn uconn) +;; (pp (udat->alist uconn))) +;; +;; +;; ;;====================================================================== +;; ;; network utilities +;; ;;====================================================================== +;; +;; ;; NOTE: Look at address-info egg as alternative to some of this +;; +;; (define (rate-ip ipaddr) +;; (regex-case ipaddr +;; ( "^127\\..*" _ 0 ) +;; ( "^(10\\.0|192\\.168)\\..*" _ 1 ) +;; ( else 2 ) )) +;; +;; ;; Change this to bias for addresses with a reasonable broadcast value? +;; ;; +;; (define (ip-pref-less? a b) +;; (> (rate-ip a) (rate-ip b))) +;; +;; (define (get-my-best-address) +;; (let ((all-my-addresses (get-all-ips))) +;; (cond +;; ((null? all-my-addresses) +;; (get-host-name)) ;; no interfaces? +;; ((eq? (length all-my-addresses) 1) +;; (car all-my-addresses)) ;; only one to choose from, just go with it +;; (else +;; (car (sort all-my-addresses ip-pref-less?)))))) +;; +;; (define (get-all-ips-sorted) +;; (sort (get-all-ips) ip-pref-less?)) +;; +;; (define (get-all-ips) +;; (map address-info-host +;; (filter (lambda (x) +;; (equal? 
(address-info-type x) "tcp")) +;; (address-infos (get-host-name))))) +;; +) ADDED ulex-simple/dbmgr.scm Index: ulex-simple/dbmgr.scm ================================================================== --- /dev/null +++ ulex-simple/dbmgr.scm @@ -0,0 +1,1101 @@ +;;====================================================================== +;; Copyright 2022, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . 
+
+;;======================================================================
+
+(declare (unit dbmgrmod))
+
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+
+(module dbmgrmod
+        *
+
+(import scheme
+        chicken.base
+        chicken.condition
+        chicken.file
+        chicken.format
+        chicken.port
+        chicken.process
+        chicken.process-context
+        chicken.process-context.posix
+        chicken.sort
+        chicken.string
+        chicken.time
+
+        (prefix sqlite3 sqlite3:)
+        matchable
+        md5
+        message-digest
+        regex
+        s11n
+        srfi-1
+        srfi-18
+        srfi-69
+        system-information
+        typed-records
+
+        pkts
+        ulex
+
+        commonmod
+        apimod
+        dbmod
+        debugprint
+        (prefix mtargs args:)
+        portloggermod
+        )
+
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+;;
+;; Field notes (as used elsewhere in this file):
+;;   host, port - filled in by rmt:run once the listener is up; port is
+;;                the port reported back by the listener
+;;   uuid       - set by rmt:wait-for-server to the value returned by
+;;                register-server; also used to build the "<uuid>.pkt"
+;;                file name when the pkt is removed
+;;   dbfile     - set by rmt:wait-for-server after get-lock-db succeeds
+;;   mode       - only read in this part of the file (rmt:run reports
+;;                "non-db" when it is #f)
+;;   status     - starts as 'starting; rmt:wait-for-server moves it to
+;;                'iface-stable and then 'db-locked
+;;
+(defstruct servdat
+  (host #f)
+  (port #f)
+  (uuid #f)
+  (dbfile #f)
+  (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+  (mode #f)
+  (status 'starting)
+  (trynum 0) ;; count the number of ports we've tried
+  (conns (make-hash-table)) ;; apath/dbname => conndat
+  )
+
+;; The servdat record describing this process.
+(define *db-serv-info* (make-servdat))
+
+;; Build the "host:port" string for the listener described by sdat.
+(define (servdat->url sdat)
+  (conc (servdat-host sdat)":"(servdat-port sdat)))
+
+;; db servers contact info
+;;
+;; One conndat is stored in (servdat-conns ...) per database we talk to,
+;; keyed by (db:dbname->path apath dbname).
+;;   hostport - "host:port" string handed to the ulex send-receive call
+;;   lastmsg  - initialised to (current-seconds) when the record is made
+;;   expires  - compared against (current-seconds); pushed forward by
+;;              (server:expiration-timeout) minus a two second margin
+;;
+(defstruct conndat
+  (apath #f)
+  (dbname #f)
+  (fullname #f)
+  (hostport #f)
+  (ipaddr #f)
+  (port #f)
+  (srvpkt #f)
+  (srvkey #f)
+  (lastmsg 0)
+  (expires 0))
+
+;; Spec passed as pktspec: when writing and reading server pkts.
+;; Maps each field of a 'server pkt to a one-letter key
+;; (presumably the card letter used by the pkts library - confirm).
+(define *srvpktspec*
+  `((server (host . h)
+            (port . p)
+            (servkey . k)
+            (pid . i)
+            (ipaddr . a)
+            (dbpath . d))))
+
+;;======================================================================
+;; S U P P O R T   F U N C T I O N S
+;;======================================================================
+
+;; set up the api proc, seems like there should be a better place for this?
+;;
+;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
+;;
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+
+;; Look up the connection record (conndat) for apath/dbname in the conns
+;; hash of remdat (a servdat).
+;;
+;; Returns the conndat, or #f when nothing is registered under that name.
+;; Expiry is NOT checked here and nothing is created here; callers compare
+;; conndat-expires with (current-seconds) themselves and use the
+;; rmt:open-main-connection / rmt:general-open-connection procedures to
+;; set up a missing connection.
+;;
+(define (rmt:get-conn remdat apath dbname)
+  (let* ((fullname (db:dbname->path apath dbname)))
+    (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+
+;; Forget the connection record for apath/dbname (no-op if none exists).
+;;
+(define (rmt:drop-conn remdat apath dbname)
+  (let* ((fullname (db:dbname->path apath dbname)))
+    (hash-table-delete! (servdat-conns remdat) fullname)))
+
+;; Read the server pkts for area apath, keep those registered for dbname
+;; and return the first one that answers a ping (see get-the-server), or
+;; #f when none does. uconn is the ulex listener of this process.
+;;
+(define (rmt:find-main-server uconn apath dbname)
+  (let* ((pktsdir     (get-pkts-dir apath))
+         (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*))
+         (viable-srvs (get-viable-servers all-srvpkts dbname)))
+    (get-the-server uconn apath viable-srvs)))
+
+
+;; *connstart-mutex* serialises attempts to launch a main server;
+;; *last-main-start* is the (current-seconds) of the last launch attempt.
+(define *connstart-mutex* (make-mutex))
+(define *last-main-start* 0)
+
+;; looks for a connection to main, returns if have and not expired
+;; creates new otherwise
+;;
+;; connections for other servers happens by requesting from main
+;;
+;; Always returns #t once a record for main is in place. When no main
+;; server can be found one is launched (at most once every five seconds)
+;; and the procedure calls itself again; there is currently no limit on
+;; the number of retries.
+;;
+;; TODO: This is unnecessarily re-creating the record in the hash table
+;;
+(define (rmt:open-main-connection remdat apath)
+  (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+         (conns    (servdat-conns remdat))
+         (conn     (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+         ;; FIX: myconn was referenced below but never bound; the ulex
+         ;; listener of this process is what rmt:find-main-server needs
+         ;; for pinging candidate servers.
+         (myconn   (servdat-uconn remdat)))
+    (cond
+     ((and conn ;; conn is NOT a socket, just saying ...
+           (< (current-seconds) (conndat-expires conn)))
+      #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+     ((and conn
+           (>= (current-seconds)(conndat-expires conn)))
+      (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+      (rmt:drop-conn remdat apath ".db/main.db") ;;
+      (rmt:open-main-connection remdat apath))
+     (else
+      ;; Below we will find or create and connect to main
+      (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+      (let* ((dbname         (db:run-id->dbname #f))
+             (the-srv        (rmt:find-main-server myconn apath dbname))
+             (start-main-srv (lambda () ;; call IF there is no the-srv found
+                               (mutex-lock! *connstart-mutex*)
+                               (if (> (- (current-seconds) *last-main-start*) 5) ;; more than five seconds since last attempt to start main server
+                                   (begin
+                                     (api:run-server-process apath dbname)
+                                     (set! *last-main-start* (current-seconds))
+                                     (thread-sleep! 1))
+                                   (thread-sleep! 0.25))
+                               (mutex-unlock! *connstart-mutex*)
+                               (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+                               )))
+        (if (not the-srv) ;; no server found, start one and retry
+            (start-main-srv)
+            ;; have server, record how to reach it
+            (let* ((srv-addr (server-address the-srv)) ;; need serv
+                   (ipaddr   (alist-ref 'ipaddr the-srv))
+                   (port     (alist-ref 'port the-srv))
+                   (srvkey   (alist-ref 'servkey the-srv))
+                   (fullpath (db:dbname->path apath dbname))
+
+                   (new-the-srv (make-conndat
+                                 apath: apath
+                                 dbname: dbname
+                                 fullname: fullpath
+                                 hostport: srv-addr
+                                 ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+                                 ipaddr: ipaddr
+                                 port: port
+                                 srvpkt: the-srv
+                                 srvkey: srvkey ;; generated by rmt:get-signature on the server side
+                                 lastmsg: (current-seconds)
+                                 expires: (+ (current-seconds)
+                                             (server:expiration-timeout)
+                                             -2) ;; this needs to be gathered during the ping
+                                 )))
+              (hash-table-set! conns fullpath new-the-srv)))
+        #t)))))
+
+;; NB// sinfo is a servdat struct
+;;
+;; Make sure there is a connection record for apath/dbname (never main).
+;; Opens the main connection first if needed, then asks the main server
+;; ('get-server) where the server for dbname lives. While main answers
+;; 'server-started the request is retried every two seconds, at most
+;; num-tries times, after which the process exits with status 1.
+;;
+;; Returns #t, or the raw answer from main when that answer was not
+;; 'server-started.
+;;
+;; NOTE(review): when dconn exists but has expired none of the cond
+;; clauses match and #t is returned with the stale record still in
+;; place (rmt:open-main-connection drops and re-opens in the same
+;; situation) - confirm whether that is intended.
+;;
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+  (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+  (let* ((mdbname  ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+         (fullname (db:dbname->path apath dbname))
+         (conns    (servdat-conns sinfo))
+         (mconn    (rmt:get-conn sinfo apath ".db/main.db"))
+         (dconn    (rmt:get-conn sinfo apath dbname)))
+    #;(if (and mconn
+             (not (debug:print-logger)))
+        (begin
+          (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+          (debug:print-logger rmt:log-to-main)))
+    (cond
+     ((and mconn
+           dconn
+           (< (current-seconds)(conndat-expires dconn)))
+      #t) ;; good to go
+     ((not mconn) ;; no channel open to main? open it...
+      (rmt:open-main-connection sinfo apath)
+      (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+     ((not dconn) ;; no channel open to dbname?
+      (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+        (case res
+          ((server-started)
+           (if (> num-tries 0)
+               (begin
+                 (thread-sleep! 2)
+                 (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+               (begin
+                 (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+                 (exit 1))))
+          (else
+           (if (list? res) ;; server has been registered and the info was returned. pass it on.
+               (begin ;; ("192.168.0.9" 53817
+                      ;;  "5e34239f48e8973b3813221e54701a01" "24310"
+                      ;;  "192.168.0.9"
+                      ;;  "/home/matt/data/megatest/tests/simplerun"
+                      ;;  ".db/1.db")
+                 (match
+                  res
+                  ((host port servkey pid ipaddr apath dbname)
+                   (debug:print-info 0 *default-log-port* "got "res)
+                   (hash-table-set! conns
+                                    fullname
+                                    (make-conndat
+                                     apath: apath
+                                     dbname: dbname
+                                     hostport: (conc host":"port)
+                                     ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+                                     ipaddr: ipaddr
+                                     port: port
+                                     srvkey: servkey
+                                     lastmsg: (current-seconds)
+                                     expires: (+ (current-seconds)
+                                                 (server:expiration-timeout)
+                                                 -2))))
+                  (else
+                   (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+                 res)
+               (begin
+                 (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+                 res)))))))
+    #t))
+
+;;======================================================================
+
+;; FOR DEBUGGING SET TO #t
+;; (define *localmode* #t)
+(define *localmode* #f)
+(define *dbstruct* (make-dbr:dbstruct))
+
+;; Defaults to current area
+;;
+;; Run cmd with params against the database selected by rid (#f selects
+;; main). In *localmode* the request is executed in-process; otherwise
+;; the needed connections are opened and the request is sent to the
+;; server via rmt:send-receive-real. attemptnum and area-dat are accepted
+;; for interface compatibility but are not used here.
+;;
+;; FIX: this definition was missing its closing parens and had the body
+;; of rmt:send-receive-real fused into it, leaving rmt:send-receive-real
+;; (called with five arguments throughout this file) without a
+;; definition. The two procedures are separated again.
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (let* ((apath  *toppath*)
+         (sinfo  *db-serv-info*)
+         (dbname (db:run-id->dbname rid)))
+    (if *localmode*
+        (api:execute-requests *dbstruct* cmd params)
+        (begin
+          (rmt:open-main-connection sinfo apath)
+          (if rid (rmt:general-open-connection sinfo apath dbname))
+          #;(if (not (member cmd '(log-to-main)))
+              (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params))
+          (rmt:send-receive-real sinfo apath dbname cmd params)))))
+
+;; Send cmd/params to the server recorded for apath/dbname in sinfo and
+;; return its answer. The connection record must already exist and the
+;; caller must not be the primordial thread (both are asserted). After
+;; the exchange the record's expiry is pushed forward.
+;;
+;; NOTE(review): reconstructed from the call sites and the assert
+;; messages; if another definition of rmt:send-receive-real exists
+;; further down the file, keep only one.
+;;
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+  (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+  (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+    (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+    (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+           ;; then send-receive using the ulex layer to host-port stored in cdat
+           (res   (send-receive uconn (conndat-hostport cdat) cmd params)))
+      (conndat-expires-set! cdat (+ (current-seconds)
+                                    (server:expiration-timeout)
+                                    -2)) ;; two second margin for network time misalignments etc.
+      res)))
+
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;;
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;;
+
+;; Print one line per entry of *db-stats* (count, total time, average),
+;; ordered by descending count, at debug level 18.
+;;
+(define (rmt:print-db-stats)
+  (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+    (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+    (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+    (for-each (lambda (cmd)
+                (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+                  (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+              (sort (hash-table-keys *db-stats*)
+                    (lambda (a b)
+                      (> (vector-ref (hash-table-ref *db-stats* a) 0)
+                         (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+
+;; Among the *db-stats* entries whose key contains "run-id=<run-id> ",
+;; find the one with the highest average (slot 1 / slot 0).
+;; Returns (cmd . average), or (none . 0) when there are no such entries
+;; or when the count of the last entry examined is not above 10.
+;; Holds *db-stats-mutex* while reading.
+;;
+(define (rmt:get-max-query-average run-id)
+  (mutex-lock! *db-stats-mutex*)
+  (let* ((runkey (conc "run-id=" run-id " "))
+         (cmds   (filter (lambda (x)
+                           (substring-index runkey x))
+                         (hash-table-keys *db-stats*)))
+         (res    (if (null? cmds)
+                     (cons 'none 0)
+                     (let loop ((cmd     (car cmds))
+                                (tal     (cdr cmds))
+                                (max-cmd (car cmds))
+                                (res     0))
+                       (let* ((cmd-dat    (hash-table-ref *db-stats* cmd))
+                              (tot        (vector-ref cmd-dat 0))
+                              (curravg    (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+                              (currmax    (max res curravg))
+                              (newmax-cmd (if (> curravg res) cmd max-cmd)))
+                         (if (null? tal)
+                             (if (> tot 10)
+                                 (cons newmax-cmd currmax)
+                                 (cons 'none 0))
+                             (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+    (mutex-unlock! *db-stats-mutex*)
+    res))
+
+;; host and port are used to ensure we remove the proper records
+;;
+;; Server-side shutdown: does nothing unless this process has recorded
+;; a dbfile in *db-serv-info*. Otherwise it does a final forced sync,
+;; finalizes the db handles and then either removes the server pkt and
+;; releases the lock (main.db) or deregisters with main (any other db).
+;;
+(define (rmt:server-shutdown host port)
+  (let ((dbfile (servdat-dbfile *db-serv-info*)))
+    (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+    (if dbfile
+        (let* ((am-server (args:get-arg "-server"))
+               (dbfile    (args:get-arg "-db")) ;; NB: shadows the servdat value from here on
+               (apath     *toppath*)
+               #;(sinfo     *remotedat*)) ;; foundation for future fix
+          (if *dbstruct-db*
+              (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+                     (db    (dbr:dbdat-db dbdat))
+                     (inmem (dbr:dbdat-db dbdat)) ;; WRONG - same handle as db, so it is finalized twice below
+                     )
+                ;; do a final sync here
+                (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+                (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+                ;; let's finalize here
+                (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+                (if (sqlite3:database? db)
+                    (sqlite3:finalize! db)
+                    (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+                (if (sqlite3:database? inmem)
+                    (sqlite3:finalize! inmem)
+                    (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing..."))
+                (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+              (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+          (if (not am-server)
+              (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+              (if (string-match ".*/main.db$" dbfile)
+                  ;; main server: remove our pkt and release the lock
+                  (let ((pkt-file (conc (get-pkts-dir *toppath*)
+                                        "/" (servdat-uuid *db-serv-info*)
+                                        ".pkt")))
+                    (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+                    (delete-file* pkt-file)
+                    (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+                    (db:with-lock-db
+                     (servdat-dbfile *db-serv-info*)
+                     (lambda (dbh dbfile)
+                       (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+                  (let* ((sdat *db-serv-info*) ;; we have a run-id server
+                         (host (servdat-host sdat))
+                         (port (servdat-port sdat))
+                         (uuid (servdat-uuid sdat))
+                         (res  (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+                    (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+                    (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+                    ))))))))
+
+
+;; True (the argument value) when this process was started with -server.
+(define (common:run-sync?)
+  ;; (and (common:on-homehost?)
+  (args:get-arg "-server"))
+
+;; *rmt:run-flag* is set (under *rmt:run-mutex*) by the first call to
+;; rmt:run so that later calls return immediately.
+(define *rmt:run-mutex* (make-mutex))
+(define *rmt:run-flag* #f)
+
+;; Main entry point to start a server. was start-server
+;;
+;; Starts the ulex listener for this process on a port obtained from
+;; portlogger (or waits on the existing listener when one is already
+;; recorded), stores host/port/uconn in *db-serv-info* and blocks in
+;; wait-and-close. A second concurrent call only logs a warning.
+;;
+(define (rmt:run hostn)
+  (mutex-lock! *rmt:run-mutex*)
+  (if *rmt:run-flag*
+      (begin
+        (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+        (mutex-unlock! *rmt:run-mutex*))
+      (begin
+        (set! *rmt:run-flag* #t)
+        (mutex-unlock! *rmt:run-mutex*)
+        ;; ;; Configurations for server
+        ;; (tcp-buffer-size 2048)
+        ;; (max-connections 2048)
+        (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+        (if (and *db-serv-info*
+                 (servdat-uconn *db-serv-info*))
+            (let* ((uconn (servdat-uconn *db-serv-info*)))
+              (wait-and-close uconn))
+            (let* ((port         (portlogger:open-run-close portlogger:find-port))
+                   ;; called by the listener for every incoming request
+                   (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+                                   (set! *db-last-access* (current-seconds))
+                                   (assert (list? params) "FATAL: handler called with non-list params")
+                                   (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+                                   (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+                                   (api:execute-requests *dbstruct-db* cmd params))))
+              ;; (api:process-request *dbstuct-db*
+              (if (not *db-serv-info*)
+                  (set! *db-serv-info* (make-servdat host: hostn port: port)))
+              (let* ((uconn (run-listener handler-proc port))
+                     (rport (udat-port uconn))) ;; the real port
+                (servdat-host-set! *db-serv-info* hostn)
+                (servdat-port-set! *db-serv-info* rport)
+                (servdat-uconn-set! *db-serv-info* uconn)
+                (wait-and-close uconn)
+                (db:print-current-query-stats)
+                )))
+        (let* ((host (servdat-host *db-serv-info*))
+               (port (servdat-port *db-serv-info*))
+               (mode (or (servdat-mode *db-serv-info*)
+                         "non-db")))
+          ;; server exit stuff here
+          ;; (rmt:server-shutdown host port) - always do in on-exit
+          ;; (portlogger:open-run-close portlogger:set-port port "released")  ;; moved to on-exit
+          ;; FIX: added the missing space before "shutdown"
+          (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")
+          ))))
+
+;;======================================================================
+;; S E R V E R   U T I L I T I E S
+;;======================================================================
+
+
+;;======================================================================
+;; NEW SERVER METHOD
+;;======================================================================
+
+;; only use for main.db - need to re-write some of this :(
+;;
+;; Try to take the "I am the server" lock recorded in dbfile for
+;; host:port. Returns
+;;   #t - we hold the lock (obtained directly, or after attempting to
+;;        steal a stale lock whose owner did not answer a ping)
+;;   #f - the lock is held by another server that is alive, or
+;;        db:get-iam-server-lock gave an unrecognised answer
+;;
+;; FIX: the value computed by the match was thrown away and res was
+;; returned instead, so a lock held by a live server (res is the owner
+;; list, i.e. true) was reported to the caller as success.
+;;
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh      (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (res      (db:get-iam-server-lock dbh dbfile host port))
+         (uconn    (servdat-uconn sdat))
+         ;; res => list then already locked, check server is responsive
+         ;;     => #t then sucessfully got the lock
+         ;;     => #f reserved for future use as to indicate something went wrong
+         (got-lock (match res
+                     ((owner_pid owner_host owner_port event_time)
+                      (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+                          #f ;; locked by someone else
+                          (begin ;; locked by someone dead and gone
+                            (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+                            (db:steal-lock-db dbh dbfile port)
+                            ;; NOTE(review): as before, the outcome of the
+                            ;; steal is not inspected; the caller proceeds
+                            ;; as lock holder.
+                            #t)))
+                     (#t #t)
+                     (else #f))))
+    (sqlite3:finalize! dbh)
+    got-lock))
+
+
+;; Write a 'server pkt describing this process into pkts-dir and return
+;; the uuid handed back by write-alist->pkt.
+;;
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let* ((pkt-dat `((host    . ,host)
+                    (port    . ,port)
+                    (servkey . ,servkey)
+                    (pid     . ,(current-process-id))
+                    (ipaddr  . ,ipaddr)
+                    (dbpath  . ,dbpath)))
+         (uuid    (write-alist->pkt
+                   pkts-dir
+                   pkt-dat
+                   pktspec: pkt-spec
+                   ptype:   'server)))
+    (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid)
+    uuid))
+
+;; Return <toppath>/.meta/srvpkts, creating it when missing. *toppath*
+;; takes precedence; apath is only used when *toppath* is not set.
+;;
+(define (get-pkts-dir #!optional (apath #f))
+  (let* ((effective-toppath (or *toppath* apath)))
+    (assert effective-toppath
+            "ERROR: get-pkts-dir called without *toppath* set. Exiting.")
+    (let* ((pdir (conc effective-toppath "/.meta/srvpkts")))
+      (if (file-exists? pdir)
+          pdir
+          (begin
+            (handle-exceptions ;; this exception handler should NOT be needed but ...
+                exn
+                pdir
+              (create-directory pdir #t))
+            pdir)))))
+
+;; given a pkts dir read all *.pkt files in it (creating the directory
+;; first if needed) and return them as a list of alists
+;;
+(define (get-all-server-pkts pktsdir-in pktspec)
+  (let* ((pktsdir       (if (file-exists? pktsdir-in)
+                            pktsdir-in
+                            (begin
+                              (create-directory pktsdir-in #t)
+                              pktsdir-in)))
+         (all-pkt-files (glob (conc pktsdir "/*.pkt"))))
+    (map (lambda (pkt-file)
+           (read-pkt->alist pkt-file pktspec: pktspec))
+         all-pkt-files)))
+
+;; "host:port" for the server described by srv-pkt.
+(define (server-address srv-pkt)
+  (conc (alist-ref 'host srv-pkt) ":"
+        (alist-ref 'port srv-pkt)))
+
+;; Ping the server at host-port. Returns 'ack when it answers with 'ack,
+;; #f for any other answer.
+;;
+(define (server-ready? uconn host-port key) ;; server-address is host:port
+  (let* ((params `((cmd . ping)(key . ,key)))
+         (data   `((cmd . ping)
+                   (key . ,key)
+                   (params . ,params))) ;; I don't get it.
+         (res    (send-receive uconn host-port 'ping data)))
+    (if (eq? res 'ack) ;; yep, likely it is who we want on the other end
+        res
+        #f)))
+;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f))))
+
+;; from the pkts return servers associated with dbpath
+;; NOTE: Only one can be alive - have to check on each
+;;       in the list of pkts returned
+;;
+;; The result is in reverse order of serv-pkts.
+;;
+(define (get-viable-servers serv-pkts dbpath)
+  (let loop ((tail serv-pkts)
+             (res  '()))
+    (if (null? tail)
+        res ;; NOTE: sort by age so oldest is considered first
+        (let* ((spkt (car tail)))
+          (loop (cdr tail)
+                (if (equal? dbpath (alist-ref 'dbpath spkt))
+                    (cons spkt res)
+                    res))))))
+
+;; Keep only the pkts whose server answers a ping; the pkt file of every
+;; server that does not answer is deleted.
+;;
+(define (remove-pkts-if-not-alive uconn serv-pkts)
+  (filter (lambda (pkt)
+            (let* ((host      (alist-ref 'host pkt))
+                   (port      (alist-ref 'port pkt))
+                   (host-port (conc host":"port))
+                   (key       (alist-ref 'servkey pkt))
+                   (pktz      (alist-ref 'Z pkt))
+                   (res       (server-ready? uconn host-port key)))
+              (if res
+                  res
+                  (let* ((pktsdir (get-pkts-dir *toppath*))
+                         (pktpath (conc pktsdir"/"pktz".pkt")))
+                    (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath)
+                    (delete-file* pktpath)
+                    #f))))
+          serv-pkts))
+
+;; from viable servers get one that is alive and ready
+;;
+;; Returns the first pkt whose ipaddr:port answers a ping, or #f.
+;; (apath is currently unused.)
+;;
+(define (get-the-server uconn apath serv-pkts)
+  (let loop ((tail serv-pkts))
+    (if (null? tail)
+        #f
+        (let* ((spkt      (car tail))
+               (host      (alist-ref 'ipaddr spkt))
+               (port      (alist-ref 'port spkt))
+               (host-port (conc host":"port))
+               (srvkey    (alist-ref 'Z spkt))) ;; (alist-ref 'srvkey spkt))
+          (if (server-ready? uconn host-port srvkey)
+              spkt
+              (loop (cdr tail)))))))
+
+;; am I the "first" in line server? I.e. my D card is smallest
+;; use Z card as tie breaker
+;;
+;; Returns the pkt with the smallest D value (on equal D the one with
+;; the greater-or-equal Z wins), or #f for an empty list.
+;; (dbpath is currently unused.)
+;;
+(define (get-best-candidate serv-pkts dbpath)
+  (if (null? serv-pkts)
+      #f
+      (let loop ((tail serv-pkts)
+                 (best (car serv-pkts)))
+        (if (null? tail)
+            best
+            (let* ((candidate    (car tail))
+                   (candidate-bd (string->number (alist-ref 'D candidate)))
+                   (best-bd      (string->number (alist-ref 'D best)))
+                   ;; bigger number is younger
+                   (candidate-z  (alist-ref 'Z candidate))
+                   (best-z       (alist-ref 'Z best))
+                   (new-best     (cond
+                                  ((> best-bd candidate-bd) ;; best is younger than candidate
+                                   candidate)
+                                  ((< best-bd candidate-bd) ;; candidate is younger than best
+                                   best)
+                                  (else
+                                   (if (string>=? best-z candidate-z)
+                                       best
+                                       candidate))))) ;; use Z card as tie breaker
+              ;; tail is known to be non-empty here, so just move on
+              (loop (cdr tail) new-best))))))
+
+
+;;======================================================================
+;; END NEW SERVER METHOD
+;;======================================================================
+
+;; if .db/main.db check the pkts
+;;
+;; Wait until *db-serv-info* has stopped changing for more than two
+;; seconds, then register this server in pkts-dir and decide whether it
+;; is the server for db-file: it must be the best candidate among the
+;; live servers AND obtain the lock via get-lock-db. On success sdat is
+;; returned with dbfile set and status 'db-locked; otherwise the pkt is
+;; removed again and the process exits. Gives up (exits) after waiting
+;; two minutes.
+;;
+(define (rmt:wait-for-server pkts-dir db-file server-key)
+  (let* ((sdat *db-serv-info*))
+    (let loop ((start-time (current-seconds))
+               (changed    #t)
+               (last-sdat  "not this"))
+      (begin ;; let ((sdat #f))
+        (thread-sleep! 0.01)
+        (debug:print-info 0 *default-log-port* "Waiting for server alive signature")
+        (mutex-lock! *heartbeat-mutex*)
+        (set! sdat *db-serv-info*)
+        (mutex-unlock! *heartbeat-mutex*)
+        (if (and sdat
+                 (not changed)
+                 (> (- (current-seconds) start-time) 2))
+            (let* ((uconn (servdat-uconn sdat)))
+              (servdat-status-set! sdat 'iface-stable)
+              (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server")
+              ;; create a server pkt in *toppath*/.meta/srvpkts
+
+              ;; TODO:
+              ;;   1. change sdat to stuct
+              ;;   2. add uuid to struct
+              ;;   3. update uuid in sdat here
+              ;;
+              (servdat-uuid-set! sdat
+                                 (register-server
+                                  pkts-dir *srvpktspec*
+                                  (get-host-name)
+                                  (servdat-port sdat) server-key
+                                  (servdat-host sdat) db-file))
+              ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key
+              ;; now read pkts and see if we are a contender
+              (let* ((all-pkts     (get-all-server-pkts pkts-dir *srvpktspec*))
+                     (viables      (get-viable-servers all-pkts db-file))
+                     (alive        (remove-pkts-if-not-alive uconn viables))
+                     (best-srv     (get-best-candidate alive db-file))
+                     (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f))
+                     (i-am-srv     (equal? best-srv-key server-key))
+                     (delete-pkt   (lambda ()
+                                     (let* ((pktfile (conc (get-pkts-dir *toppath*)
+                                                           "/" (servdat-uuid *db-serv-info*)
+                                                           ".pkt")))
+                                       (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile)
+                                       (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit
+                (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv)
+                ;; am I the best-srv, compare server-keys to know
+                (if i-am-srv
+                    (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id)
+                        (begin
+                          (debug:print-info 0 *default-log-port* "I'm the server!")
+                          (servdat-dbfile-set! sdat db-file)
+                          (servdat-status-set! sdat 'db-locked))
+                        (begin
+                          (debug:print-info 0 *default-log-port* "I'm not the server, exiting.")
+                          (bdat-time-to-exit-set! *bdat* #t)
+                          (delete-pkt)
+                          (thread-sleep! 0.2)
+                          (exit)))
+                    (begin
+                      (debug:print-info 0 *default-log-port*
+                                        "Keys do not match "best-srv-key", "server-key", exiting.")
+                      (bdat-time-to-exit-set! *bdat* #t)
+                      (delete-pkt)
+                      (thread-sleep! 0.2)
+                      (exit)))
+                sdat))
+            (begin ;; sdat not yet contains server info
+              (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat)
+              (sleep 4)
+              (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes
+                  (begin
+                    (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server")
+                    (exit))
+                  (loop start-time
+                        (equal? sdat last-sdat)
+                        sdat))))))))
+
+;; Tell the main server about the server at iface:port serving dbname.
+;; Returns whatever main answers to 'register-server.
+;;
+(define (rmt:register-server sinfo apath iface port server-key dbname)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+                         (db:run-id->dbname #f)
+                         'register-server `(,iface
+                                            ,port
+                                            ,server-key
+                                            ,(current-process-id)
+                                            ,iface
+                                            ,apath
+                                            ,dbname)))
+
+;; Ask the main server for area apath for its 'get-count-servers answer.
+;;
+(define (rmt:get-count-servers sinfo apath)
+  (servdat-conns sinfo) ;; just checking types
+  (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db
+  (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath
+                         (db:run-id->dbname #f)
+                         'get-count-servers `(,apath)))
+
+;; Ask main (rid #f) for its 'get-servers-info answer for apath.
+;;
+(define (rmt:get-servers-info apath)
+  (rmt:send-receive 'get-servers-info #f `(,apath)))
+
+;; Counterpart of rmt:register-server: tell the main server that the
+;; server at iface:port serving dbname is going away.
+;;
+(define (rmt:deregister-server db-serv-info apath iface port server-key dbname)
+  (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db
+  (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath
+                         (db:run-id->dbname #f)
+                         'deregister-server `(,iface
+                                              ,port
+                                              ,server-key
+                                              ,(current-process-id)
+                                              ,iface
+                                              ,apath
+                                              ,dbname)))
+
+(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100)) + ;; wait until *db-serv-info* stops changing + (let* ((stime (current-seconds))) + (let loop ((last-host #f) + (last-port #f) + (tries 0)) + (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*))) + (curr-port (and *db-serv-info* (servdat-port *db-serv-info*)))) + ;; first we verify port and interface, update *db-serv-info* in need be. + (cond + ((> tries num-tries-allowed) + (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.") + (exit 1)) + ((not *db-serv-info*) + (thread-sleep! 
0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not last-host)(not last-port)) + (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries) + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not (equal? last-host curr-host)) + (not (equal? last-port curr-port))) + (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed + (thread-sleep! 0.5) + (loop curr-host curr-port (+ tries 1))) + (else + (rmt:get-signature) ;; sets *my-signature* as side effect + (servdat-status-set! *db-serv-info* 'interface-stable) + (debug:print 0 *default-log-port* + "SERVER STARTED: " curr-host + ":" curr-port + " AT " (current-seconds) " server signature: " *my-signature* + " with "(servdat-trynum *db-serv-info*)" port changes") + (flush-output *default-log-port*) + #t)))))) + +;; run rmt:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (rmt:keep-running dbname) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") + + (let* ((sinfo *db-serv-info*) + (server-start-time (current-seconds)) + (pkts-dir (get-pkts-dir)) + (server-key (rmt:get-signature)) ;; This servers key + (is-main (equal? (args:get-arg "-db") ".db/main.db")) + (last-access 0) + (server-timeout (server:expiration-timeout)) + (shutdown-server-sequence (lambda (host port) + (set! *unclean-shutdown* #f) ;; Should not be needed anymore + (debug:print-info 0 *default-log-port* "Starting to shutdown the server. 
pid="(current-process-id)) + ;; (rmt:server-shutdown host port) -- called in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit + (exit))) + (timed-out? (lambda () + (<= (+ last-access server-timeout) + (current-seconds))))) + (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db")) + ;; main and run db servers have both got wait logic (could/should merge it) + (if is-main + (rmt:wait-for-server pkts-dir dbname server-key) + (rmt:wait-for-stable-interface)) + ;; this is our forever loop + (let* ((iface (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (uconn (servdat-uconn *db-serv-info*))) + (let loop ((count 0) + (bad-sync-count 0) + (start-time (current-milliseconds))) + (if (and (not is-main) + (common:low-noise-print 60 "servdat-status")) + (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*))) + + (mutex-lock! *heartbeat-mutex*) + ;; set up the database handle + (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate + (let ((watchdog (bdat-watchdog *bdat*))) + (debug:print 0 *default-log-port* "SERVER: dbprep") + (db:setup dbname) ;; sets *dbstruct-db* as side effect + (servdat-status-set! *db-serv-info* 'db-opened) + ;; IFF I'm not main, call into main and register self + (if (not is-main) + (let ((res (rmt:register-server sinfo + *toppath* iface port + server-key dbname))) + (if res ;; we are the server + (servdat-status-set! *db-serv-info* 'have-interface-and-db) + ;; now check that the db locker is alive, clear it out if not + (let* ((serv-info (rmt:server-info *toppath* dbname))) + (match serv-info + ((host port servkey pid ipaddr apath dbpath) + (if (not (server-ready? uconn (conc host":"port) servkey)) + (begin + (debug:print-info 0 *default-log-port* "Server registered but not alive. 
Removing and trying again.") + (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) + (loop (+ count 1) bad-sync-count start-time)))) + (else + (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) + (exit))))))) + (debug:print 0 *default-log-port* + "SERVER: running, db "dbname" opened, megatest version: " + (common:get-full-version)) + ;; start the watchdog + + ;; is this really needed? + + #;(if watchdog + (if (not (member (thread-state watchdog) + '(ready running blocked + sleeping dead))) + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") + (thread-start! watchdog)) + (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) + (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) + #;(loop (+ count 1) bad-sync-count start-time) + )) + + (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) + + (mutex-unlock! *heartbeat-mutex*) + + ;; when things go wrong we don't want to be doing the various + ;; queries too often so we strive to run this stuff only every + ;; four seconds or so. + (let* ((sync-time (- (current-milliseconds) start-time)) + (rem-time (quotient (- 4000 sync-time) 1000))) + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time))) + + ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + (set! last-access *db-last-access*) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) bad-sync-count (current-milliseconds))) + + (if (common:low-noise-print 60 "dbstats") + (begin + (debug:print 0 *default-log-port* "Server stats:") + (db:print-current-query-stats))) + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) + (cond + ((not *server-run*) + (debug:print-info 0 *default-log-port* "*server-run* set to #f. 
Shutting down.") + (shutdown-server-sequence (get-host-name) port)) + ((timed-out?) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port)) + ((and *server-run* + (or (not (timed-out?)) + (if is-main ;; do not exit if there are other servers (keep main open until all others gone) + (> (rmt:get-count-servers sinfo *toppath*) 1) + #f))) + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + (loop 0 bad-sync-count (current-milliseconds))) + (else + (set! *unclean-shutdown* #f) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port) + #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: " + (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown + (sexpr->string 'quit)))))))))) + +(define (rmt:get-reasonable-hostname) + (let* ((inhost (or (args:get-arg "-server") "-"))) + (if (equal? inhost "-") + (get-host-name) + inhost))) + +;; Call this to start the actual server +;; +;; all routes though here end in exit ... +;; +;; This is the point at which servers are started +;; +(define (rmt:server-launch dbname) + (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server run thread started") + (rmt:run (rmt:get-reasonable-hostname))) + "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server monitor thread started") + (if (args:get-arg "-server") + (rmt:keep-running dbname))) + "Keep running"))) + (thread-start! th2) + (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. 
+ (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (thread-join! th3)) + #f) + +;;====================================================================== +;; S E R V E R - D I R E C T C A L L S +;;====================================================================== + +(define (rmt:kill-server run-id) + (rmt:send-receive 'kill-server #f (list run-id))) + +(define (rmt:start-server run-id) + (rmt:send-receive 'start-server #f (list run-id))) + +(define (rmt:server-info apath dbname) + (rmt:send-receive 'get-server-info #f (list apath dbname))) + +;;====================================================================== +;; Nanomsg transport +;;====================================================================== + +#;(define (is-port-in-use port-num) + (let* ((ret #f)) + (let-values (((inp oup pid) + (process "netstat" (list "-tulpn" )))) + (let loop ((inl (read-line inp))) + (if (not (eof-object? inl)) + (begin + (if (string-search (regexp (conc ":" port-num)) inl) + (begin + ;(print "Output: " inl) + (set! ret #t)) + (loop (read-line inp))))))) + ret)) + +#;(define (open-nn-connection host-port) + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port))) + (nng-dial req uri) + (socket-set! req 'nng/recvtimeo 2000) + req)) + +#;(define (send-receive-nn req msg) + (nng-send req msg) + (nng-recv req)) + +#;(define (close-nn-connection req) + (nng-close! req)) + +;; ;; open connection to server, send message, close connection +;; ;; +;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds +;; (let ((req (make-req-socket 'req)) +;; (uri (conc "tcp://" host-port)) +;; (res #f) +;; ;; (contacts (alist-ref 'contact attrib)) +;; ;; (mode (alist-ref 'mode attrib)) +;; ) +;; (socket-set! 
req 'nng/recvtimeo 2000) +;; (handle-exceptions +;; exn +;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) +;; ;; Send notification +;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) +;; #f) +;; (nng-dial req uri) +;; ;; (print "Connected to the server " ) +;; (nng-send req msg) +;; ;; (print "Request Sent") +;; (let* ((th1 (make-thread (lambda () +;; (let ((resp (nng-recv req))) +;; (nng-close! req) +;; (set! res (if (equal? resp "ok") +;; #t +;; #f)))) +;; "recv thread")) +;; (th2 (make-thread (lambda () +;; (thread-sleep! timeout) +;; (thread-terminate! th1)) +;; "timer thread"))) +;; (thread-start! th1) +;; (thread-start! th2) +;; (thread-join! th1) +;; res)))) +;; +#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port)) + (res #f)) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + ;; Send notification + (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) + #f) + (nng-dial req uri) + (nng-send req msg) + (let* ((th1 (make-thread (lambda () + (let ((resp (nng-recv req))) + (nng-close! req) + ;; (print resp) + (set! res resp))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; run ping in separate process, safest way in some cases +;; +#;(define (server:ping-server ifaceport) + (with-input-from-pipe + (conc (common:get-megatest-exe) " -ping " ifaceport) + (lambda () + (let loop ((inl (read-line)) + (res "NOREPLY")) + (if (eof-object? 
inl) + (case (string->symbol res) + ((NOREPLY) #f) + ((LOGIN_OK) #t) + (else #f)) + (loop (read-line) inl)))))) + +;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; +#;(define (server:login toppath) + (lambda (toppath) + (set! *db-last-access* (current-seconds)) ;; might not be needed. + (if (equal? *toppath* toppath) + #t + #f))) + +;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; (define (server:release-sync-lock) +;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; (define (server:have-sync-lock?) +;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; (have-lock? (car have-lock-pair)) +;; (lock-time (cdr have-lock-pair)) +;; (lock-age (- (current-seconds) lock-time))) +;; (cond +;; (have-lock? #t) +;; ((>lock-age +;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; (server:release-sync-lock) +;; (server:have-sync-lock?)) +;; (else #f)))) + +) Index: ulex-simple/ulex.scm ================================================================== --- ulex-simple/ulex.scm +++ ulex-simple/ulex.scm @@ -24,11 +24,12 @@ ;; Why sql-de-lite and not say, dbi? - performance mostly, then simplicity. 
;; ;;====================================================================== (module ulex - ( + * + #;( ;; NOTE: looking for the handler proc - find the run-listener :) run-listener ;; (run-listener handler-proc [port]) => uconn @@ -50,15 +51,20 @@ udat-port udat-host-port ;; for testing only ;; pp-uconn + + ;; parameters + work-method ;; parameter; 'threads, 'mailbox, 'limited, 'direct + return-method ;; parameter; 'mailbox, 'polling, 'direct ) (import scheme chicken.base chicken.file + chicken.io chicken.time chicken.condition chicken.string chicken.sort chicken.pretty-print @@ -67,10 +73,11 @@ mailbox matchable ;; queues regex regex-case + simple-exceptions s11n srfi-1 srfi-18 srfi-4 srfi-69 @@ -94,347 +101,469 @@ (work-proc #f) ;; set by user (cnum 0) ;; cookie number (mboxes (make-hash-table)) ;; for the replies (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads - (numthreads 50) - (cmd-thread #f) - (work-queue-thread #f) - ) - -;; ;; struct for keeping track of others we are talking to -;; ;; -;; (defstruct pdat -;; (host-port #f) -;; (conns '()) ;; list of pcon structs, pop one off when calling the peer -;; ) -;; -;; ;; struct for peer connections, keep track of expiration etc. -;; ;; -;; (defstruct pcon -;; (inp #f) -;; (oup #f) -;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) -;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes -;; ) - -;;====================================================================== -;; listener -;;====================================================================== - -;; is uconn a ulex connector (listener) -;; -(define (ulex-listener? uconn) - (udat? uconn)) - -;; create a tcp listener and return a populated udat struct with -;; my port, address, hostname, pid etc. -;; return #f if fail to find a port to allocate. 
-;; -;; if udata-in is #f create the record -;; if there is already a serv-listener return the udata -;; -(define (setup-listener uconn #!optional (port 4242)) - (handle-exceptions - exn - (if (< port 65535) - (setup-listener uconn (+ port 1)) - #f) - (connect-listener uconn port))) - -(define (connect-listener uconn port) - ;; (tcp-listener-socket LISTENER)(socket-name so) - ;; sockaddr-address, sockaddr-port, sockaddr->string - (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) - (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) - (udat-port-set! uconn port) - (udat-host-port-set! uconn (conc addr":"port)) - (udat-socket-set! uconn tlsn) - uconn)) - -;; run-listener does all the work of starting a listener in a thread -;; it then returns control -;; -(define (run-listener handler-proc #!optional (port-suggestion 4242)) - (let* ((uconn (make-udat))) - (udat-work-proc-set! uconn handler-proc) - (if (setup-listener uconn port-suggestion) - (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) - #;(th2 (make-thread (lambda ()(process-work-queue uconn)) "Ulex work queue processor"))) - (tcp-buffer-size 2048) - ;; (max-connections 2048) - (thread-start! th1) - #;(thread-start! th2) - (udat-cmd-thread-set! uconn th1) - #;(udat-work-queue-thread-set! uconn th2) - (print "cmd loop and process workers started") - uconn) - (assert #f "ERROR: run-listener called without proper setup.")))) - -(define (wait-and-close uconn) - (thread-join! 
(udat-cmd-thread uconn)) - (tcp-close (udat-socket uconn))) - -;;====================================================================== -;; peers and connections -;;====================================================================== - -(define *send-mutex* (make-mutex)) - -;; send structured data to recipient -;; -;; NOTE: qrykey is what was called the "cookie" previously -;; -;; retval tells send to expect and wait for return data (one line) and return it or time out -;; this is for ping where we don't want to necessarily have set up our own server yet. -;; -;; NOTE: see below for beginnings of code to allow re-use of tcp connections -;; - I believe (without substantial evidence) that re-using connections will -;; be beneficial ... -;; -(define (send udata host-port qrykey cmd params) - (mutex-lock! *send-mutex*) - (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this - (isme #f #;(equal? host-port my-host-port)) ;; calling myself? - ;; dat is a self-contained work block that can be sent or handled locally - (dat (list my-host-port qrykey cmd params))) - (if isme - (ulex-handler udata dat) ;; no transmission needed - (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? - exn - #f - (let-values (((inp oup)(tcp-connect host-port))) - (let ((res (if (and inp oup) - (begin - (serialize dat oup) - (deserialize inp)) ;; yes, we always want an ack - (begin - (print "ERROR: send called but no receiver has been setup. Please call setup first!") - #f)))) - (close-input-port inp) - (close-output-port oup) - (mutex-unlock! 
*send-mutex*) - res)))))) ;; res will always be 'ack - -;; send a request to the given host-port and register a mailbox in udata -;; wait for the mailbox data and return it -;; -(define (send-receive uconn host-port cmd data) - (cond - ((member cmd '(ping goodbye)) ;; these are immediate - (send uconn host-port 'ping cmd data)) - (else - (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? - (qrykey (car cmbox)) - (mbox (cdr cmbox)) - (mbox-time (current-milliseconds)) - (sres (send uconn host-port qrykey cmd data))) ;; short res - sres)))) - -;;====================================================================== -;; responder side -;;====================================================================== - -;; take a request, rdat, and if not immediate put it in the work queue -;; -;; Reserved cmds; ack ping goodbye response -;; -(define (ulex-handler uconn rdat) - (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") - (match rdat ;; (string-split controldat) - ((rem-host-port qrykey cmd params) - ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) - (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) - (case cmd - ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) - ((ping) - ;; (print "Got Ping!") - ;; (add-to-work-queue uconn rdat) - 'ack) - (else - (do-work uconn rdat))))) - (else - (print "BAD DATA? controldat=" rdat) - 'ack) ;; send ack anyway? 
- )) - -;; given an already set up uconn start the cmd-loop -;; -(define (ulex-cmd-loop uconn) - (let* ((serv-listener (udat-socket uconn))) - (let loop ((state 'start)) - (let-values (((inp oup)(tcp-accept serv-listener))) - (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) - (resp (ulex-handler uconn rdat))) - (if resp (serialize resp oup)) - (close-input-port inp) - (close-output-port oup)) - (loop state))))) -;;(define (ulex-cmd-loop uconn) -;; (let* ((serv-listener (udat-socket uconn)) -;; ;; (old-listener (lambda () -;; ;; (let loop ((state 'start)) -;; ;; (let-values (((inp oup)(tcp-accept serv-listener))) -;; ;; (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) -;; ;; (resp (ulex-handler uconn rdat))) -;; ;; (if resp (serialize resp oup)) -;; ;; (close-input-port inp) -;; ;; (close-output-port oup)) -;; ;; (loop state))))) -;; (server (make-tcp-server -;; serv-listener -;; (lambda () -;; (let* ((rdat (deserialize )) ;; '(my-host-port qrykey cmd params) -;; (resp (ulex-handler uconn rdat))) -;; (if resp (serialize resp) resp)))))) -;; (server))) - -;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) -;; so that the proc can be dereferenced remotely -;; -(define (set-work-handler uconn proc) - (udat-work-proc-set! uconn proc)) - -;;====================================================================== -;; work queues - this is all happening on the listener side -;;====================================================================== - -;; rdat is (rem-host-port qrykey cmd params) - -(define (add-to-work-queue uconn rdat) - #;(queue-add! (udat-work-queue uconn) rdat) - (mailbox-send! 
(udat-work-queue uconn) rdat)) - -(define (do-work uconn rdat) - (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change - ;; put this following into a do-work procedure - (match rdat - ((rem-host-port qrykey cmd params) - (let* ((start-time (current-milliseconds)) - (result (proc rem-host-port qrykey cmd params)) - (end-time (current-milliseconds)) - (run-time (- end-time start-time))) - result)) - (else - (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params") - #f)))) - -(define (process-work-queue uconn) - (let ((wqueue (udat-work-queue uconn)) - (proc (udat-work-proc uconn)) - (numthr (udat-numthreads uconn))) - (let loop ((thnum 1) - (threads '())) - (let ((thlst (cons (make-thread (lambda () - (let work-loop () - (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) - (do-work uconn rdat)) - (work-loop))) - (conc "work thread " thnum)) - threads))) - (if (< thnum numthr) - (loop (+ thnum 1) - thlst) - (begin - (print "ULEX: Starting "(length thlst)" worker threads.") - (map thread-start! thlst) - (print "ULEX: Threads started. Joining all.") - (map thread-join! thlst))))))) - -;; below was to enable re-use of connections. This seems non-trivial so for -;; now lets open on each call -;; -;; ;; given host-port get or create peer struct -;; ;; -;; (define (udat-get-peer uconn host-port) -;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) -;; ;; no peer, so create pdat and init it -;; -;; ;; NEED stack of connections, pop and use; inp, oup, -;; ;; creation_time (remove and create new if over 24hrs old -;; ;; -;; (let ((pdat (make-pdat host-port: host-port))) -;; (hash-table-set! (udat-peers uconn) host-port pdat) -;; pdat))) -;; -;; ;; is pcon alive -;; -;; ;; given host-port and pdat get a pcon -;; ;; -;; (define (pdat-get-pcon pdat host-port) -;; (let loop ((conns (pdat-conns pdat))) -;; (if (null? conns) ;; none? 
make and return - do NOT add - it will be pushed back on list later -;; (init-pcon (make-pcon)) -;; (let* ((conn (pop conns))) -;; -;; ;; given host-port get a pcon struct -;; ;; -;; (define (udat-get-pcon - -;;====================================================================== -;; misc utils -;;====================================================================== - -(define (make-cookie uconn) - (let ((newcnum (+ (udat-cnum uconn) 1))) - (udat-cnum-set! uconn newcnum) - (conc (udat-host-port uconn) ":" - newcnum))) - -;; cookie/mboxes - -;; we store each mbox with a cookie ( . ) -;; -(define (get-cmbox uconn) - (if (null? (udat-avail-cmboxes uconn)) - (let ((cookie (make-cookie uconn)) - (mbox (make-mailbox))) - (hash-table-set! (udat-mboxes uconn) cookie mbox) - `(,cookie . ,mbox)) - (let ((cmbox (car (udat-avail-cmboxes uconn)))) - (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn))) - cmbox))) - -(define (put-cmbox uconn cmbox) - (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn)))) - -(define (pp-uconn uconn) - (pp (udat->alist uconn))) - - -;;====================================================================== -;; network utilities -;;====================================================================== - -;; NOTE: Look at address-info egg as alternative to some of this - -(define (rate-ip ipaddr) - (regex-case ipaddr - ( "^127\\..*" _ 0 ) - ( "^(10\\.0|192\\.168)\\..*" _ 1 ) - ( else 2 ) )) - -;; Change this to bias for addresses with a reasonable broadcast value? -;; -(define (ip-pref-less? a b) - (> (rate-ip a) (rate-ip b))) - -(define (get-my-best-address) - (let ((all-my-addresses (get-all-ips))) - (cond - ((null? all-my-addresses) - (get-host-name)) ;; no interfaces? - ((eq? 
(length all-my-addresses) 1) - (car all-my-addresses)) ;; only one to choose from, just go with it - (else - (car (sort all-my-addresses ip-pref-less?)))))) - -(define (get-all-ips-sorted) - (sort (get-all-ips) ip-pref-less?)) - -(define (get-all-ips) - (map address-info-host - (filter (lambda (x) - (equal? (address-info-type x) "tcp")) - (address-infos (get-host-name))))) - + (numthreads 10) + (cmd-thread #f) + (work-queue-thread #f) + (num-threads-running 0) + ) + +;; == << ;; Parameters +;; == << +;; == << ;; work-method: +;; == << (define work-method (make-parameter 'mailbox)) +;; == << ;; mailbox - all rdat goes through mailbox +;; == << ;; threads - all rdat immediately executed in new thread +;; == << ;; direct - no queuing +;; == << ;; +;; == << +;; == << ;; return-method, return the result to waiting send-receive: +;; == << (define return-method (make-parameter 'mailbox)) +;; == << ;; mailbox - create a mailbox and use it for passing returning results to send-receive +;; == << ;; polling - put the result in a hash table keyed by qrykey and send-receive can poll it for result +;; == << ;; direct - no queuing, result is passed back in single tcp connection +;; == << ;; +;; == << +;; == << ;; ;; struct for keeping track of others we are talking to +;; == << ;; ;; +;; == << ;; (defstruct pdat +;; == << ;; (host-port #f) +;; == << ;; (conns '()) ;; list of pcon structs, pop one off when calling the peer +;; == << ;; ) +;; == << ;; +;; == << ;; ;; struct for peer connections, keep track of expiration etc. 
+;; == << ;; ;; +;; == << ;; (defstruct pcon +;; == << ;; (inp #f) +;; == << ;; (oup #f) +;; == << ;; (exp (+ (current-seconds) 59)) ;; expires at this time, set to (+ (current-seconds) 59) +;; == << ;; (lifetime (+ (current-seconds) 600)) ;; throw away and create new after five minutes +;; == << ;; ) +;; == << +;; == << ;;====================================================================== +;; == << ;; listener +;; == << ;;====================================================================== +;; == << +;; == << ;; is uconn a ulex connector (listener) +;; == << ;; +;; == << (define (ulex-listener? uconn) +;; == << (udat? uconn)) +;; == << +;; == << ;; create a tcp listener and return a populated udat struct with +;; == << ;; my port, address, hostname, pid etc. +;; == << ;; return #f if fail to find a port to allocate. +;; == << ;; +;; == << ;; if udata-in is #f create the record +;; == << ;; if there is already a serv-listener return the udata +;; == << ;; +;; == << (define (setup-listener uconn #!optional (port 4242)) +;; == << (handle-exceptions +;; == << exn +;; == << (if (< port 65535) +;; == << (setup-listener uconn (+ port 1)) +;; == << #f) +;; == << (connect-listener uconn port))) +;; == << +;; == << (define (connect-listener uconn port) +;; == << ;; (tcp-listener-socket LISTENER)(socket-name so) +;; == << ;; sockaddr-address, sockaddr-port, sockaddr->string +;; == << (let* ((tlsn (tcp-listen port 1000 #f)) ;; (tcp-listen TCPPORT [BACKLOG [HOST]]) +;; == << (addr (get-my-best-address))) ;; (hostinfo-addresses (host-information (current-hostname))) +;; == << (udat-port-set! uconn port) +;; == << (udat-host-port-set! uconn (conc addr":"port)) +;; == << (udat-socket-set! 
uconn tlsn) +;; == << uconn)) +;; == << +;; == << ;; run-listener does all the work of starting a listener in a thread +;; == << ;; it then returns control +;; == << ;; +;; == << (define (run-listener handler-proc #!optional (port-suggestion 4242)) +;; == << (let* ((uconn (make-udat))) +;; == << (udat-work-proc-set! uconn handler-proc) +;; == << (if (setup-listener uconn port-suggestion) +;; == << (let* ((th1 (make-thread (lambda ()(ulex-cmd-loop uconn)) "Ulex command loop")) +;; == << (th2 (make-thread (lambda () +;; == << (case (work-method) +;; == << ((mailbox limited) +;; == << (process-work-queue uconn)))) +;; == << "Ulex work queue processor"))) +;; == << ;; (tcp-buffer-size 2048) +;; == << (thread-start! th1) +;; == << (thread-start! th2) +;; == << (udat-cmd-thread-set! uconn th1) +;; == << (udat-work-queue-thread-set! uconn th2) +;; == << (print "cmd loop and process workers started, listening on "(udat-host-port uconn)".") +;; == << uconn) +;; == << (assert #f "ERROR: run-listener called without proper setup.")))) +;; == << +;; == << (define (wait-and-close uconn) +;; == << (thread-join! (udat-cmd-thread uconn)) +;; == << (tcp-close (udat-socket uconn))) +;; == << +;; == << ;;====================================================================== +;; == << ;; peers and connections +;; == << ;;====================================================================== +;; == << +;; == << (define *send-mutex* (make-mutex)) +;; == << +;; == << ;; send structured data to recipient +;; == << ;; +;; == << ;; NOTE: qrykey is what was called the "cookie" previously +;; == << ;; +;; == << ;; retval tells send to expect and wait for return data (one line) and return it or time out +;; == << ;; this is for ping where we don't want to necessarily have set up our own server yet. 
+;; == << ;; +;; == << ;; NOTE: see below for beginnings of code to allow re-use of tcp connections +;; == << ;; - I believe (without substantial evidence) that re-using connections will +;; == << ;; be beneficial ... +;; == << ;; +;; == << (define (send udata host-port qrykey cmd params) +;; == << (let* ((my-host-port (udat-host-port udata)) ;; remote will return to this +;; == << (isme #f #;(equal? host-port my-host-port)) ;; calling myself? +;; == << ;; dat is a self-contained work block that can be sent or handled locally +;; == << (dat (list my-host-port qrykey cmd params #;(cons (current-seconds)(current-milliseconds))))) +;; == << (cond +;; == << (isme (ulex-handler udata dat)) ;; no transmission needed +;; == << (else +;; == << (handle-exceptions ;; TODO - MAKE THIS EXCEPTION CMD SPECIFIC? +;; == << exn +;; == << (message exn) +;; == << (begin +;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; == << (let-values (((inp oup)(tcp-connect host-port))) +;; == << (let ((res (if (and inp oup) +;; == << (begin +;; == << (serialize dat oup) +;; == << (close-output-port oup) +;; == << (deserialize inp) +;; == << ) +;; == << (begin +;; == << (print "ERROR: send called but no receiver has been setup. Please call setup first!") +;; == << #f)))) +;; == << (close-input-port inp) +;; == << ;; (mutex-unlock! 
*send-mutex*) ;; DOESN'T SEEM TO HELP +;; == << res)))))))) ;; res will always be 'ack unless return-method is direct +;; == << +;; == << (define (send-via-polling uconn host-port cmd data) +;; == << (let* ((qrykey (make-cookie uconn)) +;; == << (sres (send uconn host-port qrykey cmd data))) +;; == << (case sres +;; == << ((ack) +;; == << (let loop ((start-time (current-milliseconds))) +;; == << (if (> (current-milliseconds)(+ start-time 10000)) ;; ten seconds timeout +;; == << (begin +;; == << (print "ULEX ERROR: timed out waiting for response from "host-port", "cmd" "data) +;; == << #f) +;; == << (let* ((result (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) ;; NOTE: we are re-using mboxes hash +;; == << (if result ;; result is '(status . result-data) or #f for nothing yet +;; == << (begin +;; == << (hash-table-delete! (udat-mboxes uconn) qrykey) +;; == << (cdr result)) +;; == << (begin +;; == << (thread-sleep! 0.01) +;; == << (loop start-time))))))) +;; == << (else +;; == << (print "ULEX ERROR: Communication failed? sres="sres) +;; == << #f)))) +;; == << +;; == << (define (send-via-mailbox uconn host-port cmd data) +;; == << (let* ((cmbox (get-cmbox uconn)) ;; would it be better to keep a stack of mboxes to reuse? +;; == << (qrykey (car cmbox)) +;; == << (mbox (cdr cmbox)) +;; == << (mbox-time (current-milliseconds)) +;; == << (sres (send uconn host-port qrykey cmd data))) ;; short res +;; == << (if (eq? sres 'ack) +;; == << (let* ((mbox-timeout-secs 120 #;(if (eq? 'primordial (thread-name (current-thread))) +;; == << #f +;; == << 120)) ;; timeout) +;; == << (mbox-timeout-result 'MBOX_TIMEOUT) +;; == << (res (mailbox-receive! mbox mbox-timeout-secs mbox-timeout-result)) +;; == << (mbox-receive-time (current-milliseconds))) +;; == << ;; (put-cmbox uconn cmbox) ;; reuse mbox and cookie. is it worth it? +;; == << (hash-table-delete! (udat-mboxes uconn) qrykey) +;; == << (if (eq? 
res 'MBOX_TIMEOUT) +;; == << (begin +;; == << (print "WARNING: mbox timed out for query "cmd", with data "data +;; == << ", waiting for response from "host-port".") +;; == << +;; == << ;; here it might make sense to clean up connection records and force clean start? +;; == << ;; NO. The progam using ulex needs to do the reset. Right thing here is exception +;; == << +;; == << #f) ;; convert to raising exception? +;; == << res)) +;; == << (begin +;; == << (print "ERROR: Communication failed? Got "sres) +;; == << #f)))) +;; == << +;; == << ;; send a request to the given host-port and register a mailbox in udata +;; == << ;; wait for the mailbox data and return it +;; == << ;; +;; == << (define (send-receive uconn host-port cmd data) +;; == << (let* ((start-time (current-milliseconds)) +;; == << (result (cond +;; == << ((member cmd '(ping goodbye)) ;; these are immediate +;; == << (send uconn host-port 'ping cmd data)) +;; == << ((eq? (work-method) 'direct) +;; == << ;; the result from send will be the actual result, not an 'ack +;; == << (send uconn host-port 'direct cmd data)) +;; == << (else +;; == << (case (return-method) +;; == << ((polling) +;; == << (send-via-polling uconn host-port cmd data)) +;; == << ((mailbox) +;; == << (send-via-mailbox uconn host-port cmd data)) +;; == << (else +;; == << (print "ULEX ERROR: unrecognised return-method "(return-method)".") +;; == << #f))))) +;; == << (duration (- (current-milliseconds) start-time))) +;; == << ;; this is ONLY for development and debugging. It will be removed once Ulex is stable. 
+;; == << (if (< 5000 duration) +;; == << (print "ULEX WARNING: round-trip took "(inexact->exact (round (/ duration 1000))) +;; == << " seconds; "cmd", host-port="host-port", data="data)) +;; == << result)) +;; == << +;; == << +;; == << ;;====================================================================== +;; == << ;; responder side +;; == << ;;====================================================================== +;; == << +;; == << ;; take a request, rdat, and if not immediate put it in the work queue +;; == << ;; +;; == << ;; Reserved cmds; ack ping goodbye response +;; == << ;; +;; == << (define (ulex-handler uconn rdat) +;; == << (assert (list? rdat) "FATAL: ulex-handler give rdat as not list") +;; == << (match rdat ;; (string-split controldat) +;; == << ((rem-host-port qrykey cmd params);; timedata) +;; == << ;; (print "ulex-handler got: "rem-host-port" qrykey: "qrykey" cmd: "cmd" params: "params) +;; == << (case cmd +;; == << ;; ((ack )(print "Got ack! But why? Should NOT get here.") 'ack) +;; == << ((ping) +;; == << ;; (print "Got Ping!") +;; == << ;; (add-to-work-queue uconn rdat) +;; == << 'ack) +;; == << ((goodbye) +;; == << ;; just clear out references to the caller. NOT COMPLETE +;; == << (add-to-work-queue uconn rdat) +;; == << 'ack) +;; == << ((response) ;; this is a result from remote processing, send it as mail ... +;; == << (case (return-method) +;; == << ((polling) +;; == << (hash-table-set! (udat-mboxes uconn) qrykey (cons 'ok params)) +;; == << 'ack) +;; == << ((mailbox) +;; == << (let ((mbox (hash-table-ref/default (udat-mboxes uconn) qrykey #f))) +;; == << (if mbox +;; == << (begin +;; == << (mailbox-send! 
mbox params) ;; params here is our result +;; == << 'ack) +;; == << (begin +;; == << (print "ERROR: received result but no associated mbox for cookie "qrykey) +;; == << 'no-mbox-found)))) +;; == << (else (print "ULEX ERROR: unrecognised return-method "(return-method)) +;; == << 'bad-return-method))) +;; == << (else ;; generic request - hand it to the work queue +;; == << (add-to-work-queue uconn rdat) +;; == << 'ack))) +;; == << (else +;; == << (print "ULEX ERROR: bad rdat "rdat) +;; == << 'bad-rdat))) +;; == << +;; == << ;; given an already set up uconn start the cmd-loop +;; == << ;; +;; == << (define (ulex-cmd-loop uconn) +;; == << (let* ((serv-listener (udat-socket uconn)) +;; == << (listener (lambda () +;; == << (let loop ((state 'start)) +;; == << (let-values (((inp oup)(tcp-accept serv-listener))) +;; == << ;; (mutex-lock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; == << (let* ((rdat (deserialize inp)) ;; '(my-host-port qrykey cmd params) +;; == << (resp (ulex-handler uconn rdat))) +;; == << (serialize resp oup) +;; == << (close-input-port inp) +;; == << (close-output-port oup) +;; == << ;; (mutex-unlock! *send-mutex*) ;; DOESN'T SEEM TO HELP +;; == << ) +;; == << (loop state)))))) +;; == << ;; start N of them +;; == << (let loop ((thnum 0) +;; == << (threads '())) +;; == << (if (< thnum 100) +;; == << (let* ((th (make-thread listener (conc "listener" thnum)))) +;; == << (thread-start! th) +;; == << (loop (+ thnum 1) +;; == << (cons th threads))) +;; == << (map thread-join! threads))))) +;; == << +;; == << ;; add a proc to the cmd list, these are done symetrically (i.e. in all instances) +;; == << ;; so that the proc can be dereferenced remotely +;; == << ;; +;; == << (define (set-work-handler uconn proc) +;; == << (udat-work-proc-set! 
uconn proc)) +;; == << +;; == << ;;====================================================================== +;; == << ;; work queues - this is all happening on the listener side +;; == << ;;====================================================================== +;; == << +;; == << ;; rdat is (rem-host-port qrykey cmd params) +;; == << +;; == << (define (add-to-work-queue uconn rdat) +;; == << #;(queue-add! (udat-work-queue uconn) rdat) +;; == << (case (work-method) +;; == << ((threads) +;; == << (thread-start! (make-thread (lambda () +;; == << (do-work uconn rdat)) +;; == << "worker thread"))) +;; == << ((mailbox) +;; == << (mailbox-send! (udat-work-queue uconn) rdat)) +;; == << ((direct) +;; == << (do-work uconn rdat)) +;; == << (else +;; == << (print "ULEX ERROR: work-method "(work-method)" not recognised, using mailbox.") +;; == << (mailbox-send! (udat-work-queue uconn) rdat)))) +;; == << +;; == << ;; move the logic to return the result somewhere else? +;; == << ;; +;; == << (define (do-work uconn rdat) +;; == << (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change +;; == << ;; put this following into a do-work procedure +;; == << (match rdat +;; == << ((rem-host-port qrykey cmd params) +;; == << (let* ((start-time (current-milliseconds)) +;; == << (result (proc rem-host-port qrykey cmd params)) +;; == << (end-time (current-milliseconds)) +;; == << (run-time (- end-time start-time))) +;; == << (case (work-method) +;; == << ((direct) result) +;; == << (else +;; == << (print "ULEX: work "cmd", "params" done in "run-time" ms") +;; == << ;; send 'response as cmd and result as params +;; == << (send uconn rem-host-port qrykey 'response result) ;; could check for ack +;; == << (print "ULEX: response sent back to "rem-host-port" in "(- (current-milliseconds) end-time)))))) +;; == << (MBOX_TIMEOUT 'do-work-timeout) +;; == << (else +;; == << (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) +;; == << +;; == << 
;; NEW APPROACH: +;; == << ;; +;; == << (define (process-work-queue uconn) +;; == << (let ((wqueue (udat-work-queue uconn)) +;; == << (proc (udat-work-proc uconn)) +;; == << (numthr (udat-numthreads uconn))) +;; == << (let loop ((thnum 1) +;; == << (threads '())) +;; == << (let ((thlst (cons (make-thread (lambda () +;; == << (let work-loop () +;; == << (let ((rdat (mailbox-receive! wqueue 24000 'MBOX_TIMEOUT))) +;; == << (do-work uconn rdat)) +;; == << (work-loop))) +;; == << (conc "work thread " thnum)) +;; == << threads))) +;; == << (if (< thnum numthr) +;; == << (loop (+ thnum 1) +;; == << thlst) +;; == << (begin +;; == << (print "ULEX: Starting "(length thlst)" worker threads.") +;; == << (map thread-start! thlst) +;; == << (print "ULEX: Threads started. Joining all.") +;; == << (map thread-join! thlst))))))) +;; == << +;; == << ;; below was to enable re-use of connections. This seems non-trivial so for +;; == << ;; now lets open on each call +;; == << ;; +;; == << ;; ;; given host-port get or create peer struct +;; == << ;; ;; +;; == << ;; (define (udat-get-peer uconn host-port) +;; == << ;; (or (hash-table-ref/default (udat-peers uconn) host-port #f) +;; == << ;; ;; no peer, so create pdat and init it +;; == << ;; +;; == << ;; ;; NEED stack of connections, pop and use; inp, oup, +;; == << ;; ;; creation_time (remove and create new if over 24hrs old +;; == << ;; ;; +;; == << ;; (let ((pdat (make-pdat host-port: host-port))) +;; == << ;; (hash-table-set! (udat-peers uconn) host-port pdat) +;; == << ;; pdat))) +;; == << ;; +;; == << ;; ;; is pcon alive +;; == << ;; +;; == << ;; ;; given host-port and pdat get a pcon +;; == << ;; ;; +;; == << ;; (define (pdat-get-pcon pdat host-port) +;; == << ;; (let loop ((conns (pdat-conns pdat))) +;; == << ;; (if (null? conns) ;; none? 
make and return - do NOT add - it will be pushed back on list later +;; == << ;; (init-pcon (make-pcon)) +;; == << ;; (let* ((conn (pop conns))) +;; == << ;; +;; == << ;; ;; given host-port get a pcon struct +;; == << ;; ;; +;; == << ;; (define (udat-get-pcon +;; == << +;; == << ;;====================================================================== +;; == << ;; misc utils +;; == << ;;====================================================================== +;; == << +;; == << (define (make-cookie uconn) +;; == << (let ((newcnum (+ (udat-cnum uconn) 1))) +;; == << (udat-cnum-set! uconn newcnum) +;; == << (conc (udat-host-port uconn) ":" +;; == << newcnum))) +;; == << +;; == << ;; cookie/mboxes +;; == << +;; == << ;; we store each mbox with a cookie ( . ) +;; == << ;; +;; == << (define (get-cmbox uconn) +;; == << (if (null? (udat-avail-cmboxes uconn)) +;; == << (let ((cookie (make-cookie uconn)) +;; == << (mbox (make-mailbox))) +;; == << (hash-table-set! (udat-mboxes uconn) cookie mbox) +;; == << `(,cookie . ,mbox)) +;; == << (let ((cmbox (car (udat-avail-cmboxes uconn)))) +;; == << (udat-avail-cmboxes-set! uconn (cdr (udat-avail-cmboxes uconn))) +;; == << cmbox))) +;; == << +;; == << (define (put-cmbox uconn cmbox) +;; == << (udat-avail-cmboxes-set! uconn (cons cmbox (udat-avail-cmboxes uconn)))) +;; == << +;; == << (define (pp-uconn uconn) +;; == << (pp (udat->alist uconn))) +;; == << +;; == << +;; == << ;;====================================================================== +;; == << ;; network utilities +;; == << ;;====================================================================== +;; == << +;; == << ;; NOTE: Look at address-info egg as alternative to some of this +;; == << +;; == << (define (rate-ip ipaddr) +;; == << (regex-case ipaddr +;; == << ( "^127\\..*" _ 0 ) +;; == << ( "^(10\\.0|192\\.168)\\..*" _ 1 ) +;; == << ( else 2 ) )) +;; == << +;; == << ;; Change this to bias for addresses with a reasonable broadcast value? 
+;; == << ;; +;; == << (define (ip-pref-less? a b) +;; == << (> (rate-ip a) (rate-ip b))) +;; == << +;; == << (define (get-my-best-address) +;; == << (let ((all-my-addresses (get-all-ips))) +;; == << (cond +;; == << ((null? all-my-addresses) +;; == << (get-host-name)) ;; no interfaces? +;; == << ((eq? (length all-my-addresses) 1) +;; == << (car all-my-addresses)) ;; only one to choose from, just go with it +;; == << (else +;; == << (car (sort all-my-addresses ip-pref-less?)))))) +;; == << +;; == << (define (get-all-ips-sorted) +;; == << (sort (get-all-ips) ip-pref-less?)) +;; == << +;; == << (define (get-all-ips) +;; == << (map address-info-host +;; == << (filter (lambda (x) +;; == << (equal? (address-info-type x) "tcp")) +;; == << (address-infos (get-host-name))))) +;; == << ) DELETED ulex.scm Index: ulex.scm ================================================================== --- ulex.scm +++ /dev/null @@ -1,24 +0,0 @@ -;;====================================================================== -;; Copyright 2019, Matthew Welland. -;; -;; This file is part of Megatest. -;; -;; Megatest is free software: you can redistribute it and/or modify -;; it under the terms of the GNU General Public License as published by -;; the Free Software Foundation, either version 3 of the License, or -;; (at your option) any later version. -;; -;; Megatest is distributed in the hope that it will be useful, -;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -;; GNU General Public License for more details. -;; -;; You should have received a copy of the GNU General Public License -;; along with Megatest. If not, see . 
- -;;====================================================================== - -(declare (unit ulex)) - -(include "ulex/ulex.scm") -;; (include "ulex-simple/ulex.scm") ADDED ulex.scm.template Index: ulex.scm.template ================================================================== --- /dev/null +++ ulex.scm.template @@ -0,0 +1,23 @@ +;;====================================================================== +;; Copyright 2019, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. +;; +;; Megatest is distributed in the hope that it will be useful, +;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;; GNU General Public License for more details. +;; +;; You should have received a copy of the GNU General Public License +;; along with Megatest. If not, see . + +;;====================================================================== + +(declare (unit ulex)) + +(include "ulex-FLAVOR/ulex.scm") ADDED ulex/dbmgr.scm Index: ulex/dbmgr.scm ================================================================== --- /dev/null +++ ulex/dbmgr.scm @@ -0,0 +1,1131 @@ +;;====================================================================== +;; Copyright 2022, Matthew Welland. +;; +;; This file is part of Megatest. +;; +;; Megatest is free software: you can redistribute it and/or modify +;; it under the terms of the GNU General Public License as published by +;; the Free Software Foundation, either version 3 of the License, or +;; (at your option) any later version. 
+;;
+;; Megatest is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;; GNU General Public License for more details.
+;;
+;; You should have received a copy of the GNU General Public License
+;; along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
+
+;;======================================================================
+
+(declare (unit dbmgrmod))
+
+(declare (uses ulex))
+(declare (uses apimod))
+(declare (uses pkts))
+(declare (uses commonmod))
+(declare (uses dbmod))
+(declare (uses mtargs))
+(declare (uses portloggermod))
+(declare (uses debugprint))
+
+(module dbmgrmod
+        *
+
+(import scheme
+        chicken.base
+        chicken.condition
+        chicken.file
+        chicken.format
+        chicken.port
+        chicken.process
+        chicken.process-context
+        chicken.process-context.posix
+        chicken.sort
+        chicken.string
+        chicken.time
+
+        (prefix sqlite3 sqlite3:)
+        matchable
+        md5
+        message-digest
+        regex
+        s11n
+        srfi-1
+        srfi-18
+        srfi-69
+        system-information
+        typed-records
+
+        pkts
+        ulex
+
+        commonmod
+        apimod
+        dbmod
+        debugprint
+        (prefix mtargs args:)
+        portloggermod
+        )
+
+;; Configurations for server
+;; (tcp-buffer-size 2048)
+;; (max-connections 2048)
+
+;; info about me as a listener and my connections to db servers
+;; stored (for now) in *db-serv-info*
+;;
+(defstruct servdat
+  (host #f)
+  (port #f)
+  (uuid #f)
+  (dbfile #f)
+  (uconn #f) ;; this is the listener *FOR THIS PROCESS*
+  (mode #f)
+  (status 'starting)
+  (trynum 0) ;; count the number of ports we've tried
+  (conns (make-hash-table)) ;; apath/dbname => conndat
+  )
+
+;; the single servdat record for this process
+(define *db-serv-info* (make-servdat))
+
+;; "host:port" string built from the host and port fields of sdat
+;;
+(define (servdat->url sdat)
+  (conc (servdat-host sdat)":"(servdat-port sdat)))
+
+;; db servers contact info
+;;
+(defstruct conndat
+  (apath #f)
+  (dbname #f)
+  (fullname #f)
+  (hostport #f)
+  (ipaddr #f)
+  (port #f)
+  (srvpkt #f)
+  (srvkey #f)
+  (lastmsg 0)
+  (expires 0))
+
+;; pkt spec handed to get-all-server-pkts in rmt:find-main-server
+;;
+(define *srvpktspec*
+  `((server (host    . h)
+            (port    . p)
+            (servkey . k)
+            (pid     . i)
+            (ipaddr  . a)
+            (dbpath  . d))))
+
+;;======================================================================
+;; S U P P O R T F U N C T I O N S
+;;======================================================================
+
+;; set up the api proc, seems like there should be a better place for this?
+;;
+;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
+;;
+;; (define api-proc (make-parameter conc))
+;; (api-proc api:execute-requests)
+
+;; look up the conndat recorded for apath/dbname in the conns table of
+;; remdat; returns the conndat or #f if none is recorded.
+;;
+;; NOTE: this is a pure lookup. It does not check expiry and does not
+;; set up a connection; callers (rmt:open-main-connection and
+;; rmt:general-open-connection) do that.
+;;
+(define (rmt:get-conn remdat apath dbname)
+  (let* ((fullname (db:dbname->path apath dbname)))
+    (hash-table-ref/default (servdat-conns remdat) fullname #f)))
+
+;; remove the conndat recorded for apath/dbname (if any) from remdat
+;;
+(define (rmt:drop-conn remdat apath dbname)
+  (let* ((fullname (db:dbname->path apath dbname)))
+    (hash-table-delete! (servdat-conns remdat) fullname)))
+
+;; read the server pkts found in the pkts dir of apath, narrow them to
+;; the ones viable for dbname and hand those to get-the-server
+;;
+(define (rmt:find-main-server uconn apath dbname)
+  (let* ((pktsdir     (get-pkts-dir apath))
+         (all-srvpkts (get-all-server-pkts pktsdir *srvpktspec*))
+         (viable-srvs (get-viable-servers all-srvpkts dbname)))
+    (get-the-server uconn apath viable-srvs)))
+
+
+;; serialize attempts to launch a main server; *last-main-start* holds
+;; the (current-seconds) value of the most recent launch attempt
+(define *connstart-mutex* (make-mutex))
+(define *last-main-start* 0)
+
+;; looks for a connection to main, returns #t if we have one and it has
+;; not expired, creates a new one otherwise
+;;
+;; connections for other servers happens by requesting from main
+;;
+;; if this process has no ulex listener yet, rmt:run is started in a
+;; thread first (non-db mode) and we wait for it to come up
+;;
+;; TODO: This is unnecessarily re-creating the record in the hash table
+;;
+(define (rmt:open-main-connection remdat apath)
+  (let* ((fullpath (db:dbname->path apath ".db/main.db"))
+         (conns    (servdat-conns remdat))
+         (conn     (rmt:get-conn remdat apath ".db/main.db")) ;; (hash-table-ref/default conns fullpath #f)) ;; TODO - create call for this
+         (start-rmt:run (lambda ()
+                          (let* ((th1 (make-thread (lambda ()(rmt:run (get-host-name))) "non-db mode server")))
+                            (thread-start! th1)
+                            (thread-sleep! 1)
+                            ;; poll once a second until the listener is registered in *db-serv-info*
+                            (let loop ((count 0))
+                              (assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
+                              (if (or (not *db-serv-info*)
+                                      (not (servdat-uconn *db-serv-info*)))
+                                  (begin
+                                    (thread-sleep! 1)
+                                    (loop (+ count 1)))
+                                  (begin
+                                    (servdat-mode-set! *db-serv-info* 'non-db)
+                                    (servdat-uconn *db-serv-info*)))))))
+         (myconn    (servdat-uconn *db-serv-info*)))
+    (cond
+     ((not myconn)
+      (start-rmt:run)
+      (rmt:open-main-connection remdat apath))
+     ((and conn ;; conn is NOT a socket, just saying ...
+           (< (current-seconds) (conndat-expires conn)))
+      #t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
+     ((and conn
+           (>= (current-seconds)(conndat-expires conn)))
+      (debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
+      (rmt:drop-conn remdat apath ".db/main.db") ;;
+      (rmt:open-main-connection remdat apath))
+     (else
+      ;; Below we will find or create and connect to main
+      (debug:print-info 0 *default-log-port* "rmt:open-main-connection - starting from scratch")
+      (let* ((dbname         (db:run-id->dbname #f))
+             (the-srv        (rmt:find-main-server myconn apath dbname))
+             (start-main-srv (lambda () ;; call IF there is no the-srv found
+                               (mutex-lock! *connstart-mutex*)
+                               (if (> (- (current-seconds) *last-main-start*) 5) ;; more than five seconds since last attempt to start main server
+                                   (begin
+                                     (api:run-server-process apath dbname)
+                                     (set! *last-main-start* (current-seconds))
+                                     (thread-sleep! 1))
+                                   (thread-sleep! 0.25))
+                               (mutex-unlock! *connstart-mutex*)
+                               (rmt:open-main-connection remdat apath) ;; TODO: Add limit to number of tries
+                               )))
+        (if (not the-srv) ;; no server found, start one and retry
+            (start-main-srv)
+            (let* ((srv-addr (server-address the-srv)) ;; have server, record how to reach it
+                   (ipaddr   (alist-ref 'ipaddr the-srv))
+                   (port     (alist-ref 'port   the-srv))
+                   (srvkey   (alist-ref 'servkey the-srv))
+                   (fullpath (db:dbname->path apath dbname))
+
+                   (new-the-srv (make-conndat
+                                 apath:    apath
+                                 dbname:   dbname
+                                 fullname: fullpath
+                                 hostport: srv-addr
+                                 ;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
+                                 ipaddr:   ipaddr
+                                 port:     port
+                                 srvpkt:   the-srv
+                                 srvkey:   srvkey ;; generated by rmt:get-signature on the server side
+                                 lastmsg:  (current-seconds)
+                                 expires:  (+ (current-seconds)
+                                              (server:expiration-timeout)
+                                              -2) ;; this needs to be gathered during the ping
+                                 )))
+              (hash-table-set! conns fullpath new-the-srv)))
+        #t)))))
+
+;; make sure there is a conndat for apath/dbname (never main.db) in sinfo
+;;
+;; NB// sinfo is a servdat struct
+;;
+;; - opens the main connection first if it is missing
+;; - asks main ('get-server) for the server handling dbname and records
+;;   the returned contact info
+;; - an expired record for dbname is dropped and looked up again
+;;
+;; num-tries bounds the retries while main reports 'server-started; the
+;; process exits with status 1 when they are used up. Returns #t otherwise.
+;;
+(define (rmt:general-open-connection sinfo apath dbname #!key (num-tries 5))
+  (assert (not (equal? dbname ".db/main.db")) "ERROR: general-open-connection should never be called with main as the db")
+  (let* ((mdbname  ".db/main.db") ;; (db:run-id->dbname #f)) TODO: put this back to the lookup when stable
+         (fullname (db:dbname->path apath dbname))
+         (conns    (servdat-conns sinfo))
+         (mconn    (rmt:get-conn sinfo apath ".db/main.db"))
+         (dconn    (rmt:get-conn sinfo apath dbname)))
+    #;(if (and mconn
+             (not (debug:print-logger)))
+        (begin
+          (debug:print-info 0 *default-log-port* "Turning on logging to main, look in logs dir for main log.")
+          (debug:print-logger rmt:log-to-main)))
+    (cond
+     ((and mconn
+           dconn
+           (< (current-seconds)(conndat-expires dconn)))
+      #t) ;; good to go
+     ((not mconn) ;; no channel open to main? open it...
+      (rmt:open-main-connection sinfo apath)
+      (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+     ((not dconn) ;; no channel open to dbname?
+      (let* ((res (rmt:send-receive-real sinfo apath mdbname 'get-server `(,apath ,dbname))))
+        (case res
+          ((server-started)
+           (if (> num-tries 0)
+               (begin
+                 (thread-sleep! 2)
+                 (rmt:general-open-connection sinfo apath dbname num-tries: (- num-tries 1)))
+               (begin
+                 (debug:print-error 0 *default-log-port* "Failed to start servers needed or open channel to "apath", "dbname)
+                 (exit 1))))
+          (else
+           (if (list? res) ;; server has been registered and the info was returned. pass it on.
+               (begin ;; ("192.168.0.9" 53817
+                 ;;  "5e34239f48e8973b3813221e54701a01" "24310"
+                 ;;  "192.168.0.9"
+                 ;;  "/home/matt/data/megatest/tests/simplerun"
+                 ;;  ".db/1.db")
+                 (match
+                  res
+                  ;; NB// apath and dbname are rebound here to the values main returned
+                  ((host port servkey pid ipaddr apath dbname)
+                   (debug:print-info 0 *default-log-port* "got "res)
+                   (hash-table-set! conns
+                                    fullname
+                                    (make-conndat
+                                     apath:    apath
+                                     dbname:   dbname
+                                     fullname: fullname ;; same key the record is stored under
+                                     hostport: (conc host":"port)
+                                     ;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
+                                     ipaddr:   ipaddr
+                                     port:     port
+                                     srvkey:   servkey
+                                     lastmsg:  (current-seconds)
+                                     expires:  (+ (current-seconds)
+                                                  (server:expiration-timeout)
+                                                  -2))))
+                  (else
+                   (debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
+                 res)
+               (begin
+                 (debug:print-info 0 *default-log-port* "Unexpected result: " res)
+                 res))))))
+     (else
+      ;; mconn and dconn both exist but dconn has expired. Drop the stale
+      ;; record and go around again so the contact info is fetched from
+      ;; main afresh (same policy as rmt:open-main-connection)
+      (debug:print-info 0 *default-log-port* "connection to "fullname" server expired. Reconnecting.")
+      (rmt:drop-conn sinfo apath dbname)
+      (rmt:general-open-connection sinfo apath dbname num-tries: num-tries)))
+    #t))
+
+;;======================================================================
+
+;; FOR DEBUGGING SET TO #t
+;; (define *localmode* #t)
+(define *localmode* #f)
+(define *dbstruct* (make-dbr:dbstruct))
+
+;; run cmd with params against the db for run id rid
+;;
+;; Defaults to current area
+;;
+;; in *localmode* the request is executed in-process via
+;; api:execute-requests, otherwise the needed connections are opened and
+;; the request goes out through rmt:send-receive-real.
+;;
+;; attemptnum and area-dat are accepted but not used here
+;;
+(define (rmt:send-receive cmd rid params #!key (attemptnum 1)(area-dat #f))
+  (let* ((apath  *toppath*)
+         (sinfo  *db-serv-info*)
+         (dbname (db:run-id->dbname rid)))
+    (if *localmode*
+        (api:execute-requests *dbstruct* cmd params)
+        (begin
+          (rmt:open-main-connection sinfo apath)
+          (if rid (rmt:general-open-connection sinfo apath dbname))
+          #;(if (not (member cmd '(log-to-main)))
+              (debug:print-info 0 *default-log-port* "rmt:send-receive "cmd" params="params))
+          (rmt:send-receive-real sinfo apath dbname cmd params)))))
+
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future
+;;
+;; sends cmd/params through ulex to the host:port recorded for
+;; apath/dbname, pushes the expiry of that record forward and returns
+;; the reply. Must not be called from the primordial thread and requires
+;; the connection record to exist already (both are asserted).
+;;
+(define (rmt:send-receive-real sinfo apath dbname cmd params)
+  (assert (not (eq? 'primordial (thread-name (current-thread)))) "FATAL: Do not call rmt:send-receive-real in the primodial thread.")
+  (let* ((cdat (rmt:get-conn sinfo apath dbname)))
+    (assert cdat "FATAL: rmt:send-receive-real called without the needed channels opened")
+    (let* ((uconn (servdat-uconn sinfo)) ;; get the interface to ulex
+           ;; then send-receive using the ulex layer to host-port stored in cdat
+           (res (send-receive uconn (conndat-hostport cdat) cmd params))
+           #;(th1 (make-thread (lambda ()
+                                 (set! res (send-receive uconn (conndat-hostport cdat) cmd params)))
+                               "send-receive thread")))
+      ;; (thread-start! th1)
+      ;; (thread-join! th1) ;; gratuitous thread stuff is so that mailbox is not used in primordial thread
+      ;; since we accessed the server we can bump the expires time up
+      (conndat-expires-set! cdat (+ (current-seconds)
+                                    (server:expiration-timeout)
+                                    -2)) ;; two second margin for network time misalignments etc.
+      res)))
+
+;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
+;; sometime in the future.
+;;
+;; Purpose - call the main.db server and request a server be started
+;; for the given area path and dbname
+;;
+
+;; log (at debug level 18) one line per command in *db-stats* showing
+;; slot 0 and slot 1 of its stats vector and slot1/slot0, ordered by
+;; slot 0 descending. The column headings printed are Count, TotTime, Avg.
+;;
+(define (rmt:print-db-stats)
+  (let ((fmtstr "~40a~7-d~9-d~20,2-f")) ;; "~20,2-f"
+    (debug:print 18 *default-log-port* "DB Stats, "(seconds->year-week/day-time (current-seconds))"\n=====================")
+    (debug:print 18 *default-log-port* (format #f "~40a~8a~10a~10a" "Cmd" "Count" "TotTime" "Avg"))
+    (for-each (lambda (cmd)
+                (let ((cmd-dat (hash-table-ref *db-stats* cmd)))
+                  (debug:print 18 *default-log-port* (format #f fmtstr cmd (vector-ref cmd-dat 0) (vector-ref cmd-dat 1) (/ (vector-ref cmd-dat 1)(vector-ref cmd-dat 0))))))
+              (sort (hash-table-keys *db-stats*)
+                    (lambda (a b)
+                      (> (vector-ref (hash-table-ref *db-stats* a) 0)
+                         (vector-ref (hash-table-ref *db-stats* b) 0)))))))
+
+;; among the *db-stats* keys containing "run-id=<run-id> " find the one
+;; with the largest slot1/slot0 average; returns (cmd . average) or
+;; (none . 0) when there are no such keys.
+;;
+;; NOTE(review): the (> tot 10) test only looks at the count of the
+;; *last* key visited, not of the key with the maximum - confirm that
+;; is intended.
+;; NOTE(review): *db-stats-mutex* stays locked if anything in between
+;; raises.
+;;
+(define (rmt:get-max-query-average run-id)
+  (mutex-lock! *db-stats-mutex*)
+  (let* ((runkey (conc "run-id=" run-id " "))
+         (cmds   (filter (lambda (x)
+                           (substring-index runkey x))
+                         (hash-table-keys *db-stats*)))
+         (res    (if (null? cmds)
+                     (cons 'none 0)
+                     (let loop ((cmd     (car cmds))
+                                (tal     (cdr cmds))
+                                (max-cmd (car cmds))
+                                (res     0))
+                       (let* ((cmd-dat    (hash-table-ref *db-stats* cmd))
+                              (tot        (vector-ref cmd-dat 0))
+                              (curravg    (/ (vector-ref cmd-dat 1) (vector-ref cmd-dat 0))) ;; count is never zero by construction
+                              (currmax    (max res curravg))
+                              (newmax-cmd (if (> curravg res) cmd max-cmd)))
+                         (if (null? tal)
+                             (if (> tot 10)
+                                 (cons newmax-cmd currmax)
+                                 (cons 'none 0))
+                             (loop (car tal)(cdr tal) newmax-cmd currmax)))))))
+    (mutex-unlock! *db-stats-mutex*)
+    res))
+
+;; shut down the db side of this server: final sync, finalize the db
+;; handle, then either remove our pkt and release the lock (main.db) or
+;; deregister with main (any other db).
+;;
+;; host and port are used to ensure we remove the proper records
+;;
+;; NOTE(review): the inner dbfile comes from the -db command line arg and
+;; shadows the servdat value tested by (if dbfile ...) - it is not
+;; checked for #f before use.
+;;
+(define (rmt:server-shutdown host port)
+  (let ((dbfile (servdat-dbfile *db-serv-info*)))
+    (debug:print-info 0 *default-log-port* "dbfile is "dbfile)
+    (if dbfile
+        (let* ((am-server (args:get-arg "-server"))
+               (dbfile    (args:get-arg "-db"))
+               (apath     *toppath*)
+               #;(sinfo *remotedat*)) ;; foundation for future fix
+          (if *dbstruct-db*
+              (let* ((dbdat (db:get-dbdat *dbstruct-db* apath dbfile))
+                     (db    (dbr:dbdat-db dbdat))
+                     (inmem (dbr:dbdat-db dbdat)) ;; WRONG - same handle as db
+                     )
+                ;; do a final sync here
+                (debug:print-info 0 *default-log-port* "Doing final sync for "apath" "dbfile" at "(current-seconds))
+                (db:sync-inmem->disk *dbstruct-db* apath dbfile force-sync: #t)
+                ;; let's finalize here
+                (debug:print-info 0 *default-log-port* "Finalizing db and inmem")
+                (if (sqlite3:database? db)
+                    (sqlite3:finalize! db)
+                    (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, db is not a database, not finalizing..."))
+                (cond
+                 ((eq? inmem db) #f) ;; already dealt with just above, do not finalize the same handle twice
+                 ((sqlite3:database? inmem)
+                  (sqlite3:finalize! inmem))
+                 (else
+                  (debug:print-info 0 *default-log-port* "in rmt:server-shutdown, inmem is not a database, not finalizing...")))
+                (debug:print-info 0 *default-log-port* "Finalizing db and inmem complete"))
+              (debug:print-info 0 *default-log-port* "Db was never opened, no cleanup to do."))
+          (if (not am-server)
+              (debug:print-info 0 *default-log-port* "I am not a server, should NOT get here!")
+              (if (string-match ".*/main.db$" dbfile)
+                  (let ((pkt-file (conc (get-pkts-dir *toppath*)
+                                        "/" (servdat-uuid *db-serv-info*)
+                                        ".pkt")))
+                    (debug:print-info 0 *default-log-port* "removing pkt "pkt-file)
+                    (delete-file* pkt-file)
+                    (debug:print-info 0 *default-log-port* "Releasing lock (if any) for "dbfile ", host "host", port "port)
+                    (db:with-lock-db
+                     (servdat-dbfile *db-serv-info*)
+                     (lambda (dbh dbfile)
+                       (db:release-lock dbh dbfile host port)))) ;; I'm not the server - should not have a lock to remove
+                  (let* ((sdat *db-serv-info*) ;; we have a run-id server
+                         (host (servdat-host sdat))
+                         (port (servdat-port sdat))
+                         (uuid (servdat-uuid sdat))
+                         (res  (rmt:deregister-server *db-serv-info* *toppath* host port uuid dbfile)))
+                    (debug:print-info 0 *default-log-port* "deregistered-server, res="res)
+                    (debug:print-info 0 *default-log-port* "deregistering server "host":"port" with uuid "uuid)
+                    )))))))
+
+
+;; returns the value of the -server command line arg
+;;
+(define (common:run-sync?)
+  ;; (and (common:on-homehost?)
+  (args:get-arg "-server"))
+
+;; *rmt:run-flag* is set once rmt:run has been entered; the mutex guards it
+(define *rmt:run-mutex* (make-mutex))
+(define *rmt:run-flag* #f)
+
+;; Main entry point to start a server. was start-server
+;;
+;; Only the first call does anything, later calls log a warning and
+;; return. If this process already has a ulex listener we just wait on
+;; it, otherwise a port is picked, a listener is started with a handler
+;; that runs api:execute-requests, and *db-serv-info* is filled in with
+;; the host, real port and listener. Returns after wait-and-close
+;; returns.
+;;
+(define (rmt:run hostn)
+  (mutex-lock! *rmt:run-mutex*)
+  (if *rmt:run-flag*
+      (begin
+        (debug:print-warn 0 *default-log-port* "rmt:run already running.")
+        (mutex-unlock! *rmt:run-mutex*))
+      (begin
+        (set! *rmt:run-flag* #t)
+        (mutex-unlock! *rmt:run-mutex*)
+        ;; ;; Configurations for server
+        ;; (tcp-buffer-size 2048)
+        ;; (max-connections 2048)
+        (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...")
+        (if (and *db-serv-info*
+                 (servdat-uconn *db-serv-info*))
+            (let* ((uconn (servdat-uconn *db-serv-info*)))
+              (wait-and-close uconn))
+            (let* ((port         (portlogger:open-run-close portlogger:find-port))
+                   (handler-proc (lambda (rem-host-port qrykey cmd params) ;;
+                                   (set! *db-last-access* (current-seconds))
+                                   (assert (list? params) "FATAL: handler called with non-list params")
+                                   (assert (args:get-arg "-server") "FATAL: handler called on non-server side. cmd="cmd", params="params)
+                                   (debug:print 0 *default-log-port* "handler call: "cmd", params="params)
+                                   (api:execute-requests *dbstruct-db* cmd params))))
+              ;; (api:process-request *dbstuct-db*
+              (if (not *db-serv-info*)
+                  (set! *db-serv-info* (make-servdat host: hostn port: port)))
+              (let* ((uconn (run-listener handler-proc port))
+                     (rport (udat-port uconn))) ;; the real port
+                (servdat-host-set! *db-serv-info* hostn)
+                (servdat-port-set! *db-serv-info* rport)
+                (servdat-uconn-set! *db-serv-info* uconn)
+                (wait-and-close uconn)
+                (db:print-current-query-stats)
+                )))
+        (let* ((host (servdat-host *db-serv-info*))
+               (port (servdat-port *db-serv-info*))
+               (mode (or (servdat-mode *db-serv-info*)
+                         "non-db")))
+          ;; server exit stuff here
+          ;; (rmt:server-shutdown host port) - always do in on-exit
+          ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
+          (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode" shutdown complete. Exiting")
+          ))))
+
+;;======================================================================
+;; S E R V E R U T I L I T I E S
+;;======================================================================
+
+
+;;======================================================================
+;; NEW SERVER METHOD
+;;======================================================================
+
+;; only use for main.db - need to re-write some of this :(
+;;
+;; try to take the "I am the server" lock for dbfile on behalf of
+;; host:port. The db handle opened here is finalized before returning.
+;;
+;; NOTE(review): the value of the match form is discarded; what is
+;; returned is res as it came back from db:get-iam-server-lock (or #f if
+;; it had an unexpected shape). In particular a stolen stale lock still
+;; returns the old owner's list - confirm callers expect that.
+;;
+(define (get-lock-db sdat dbfile host port)
+  (assert host "FATAL: get-lock-db called with host not set.")
+  (assert port "FATAL: get-lock-db called with port not set.")
+  (let* ((dbh   (db:open-run-db dbfile db:initialize-db)) ;; open-run-db creates a standard db with schema used by all situations
+         (res   (db:get-iam-server-lock dbh dbfile host port))
+         (uconn (servdat-uconn sdat)))
+    ;; res => list then already locked, check server is responsive
+    ;;     => #t then successfully got the lock
+    ;;     => #f reserved for future use as to indicate something went wrong
+    (match res
+      ((owner_pid owner_host owner_port event_time)
+       (if (server-ready? uconn (conc owner_host":"owner_port) "abc")
+           #f ;; locked by someone else
+           (begin ;; locked by someone dead and gone
+             (debug:print 0 *default-log-port* "WARNING: stale lock - have to steal it. This may fail.")
+             (db:steal-lock-db dbh dbfile port))))
+      (#t #t) ;; placeholder so that we don't touch res if it is #t
+      (else (set! res #f)))
+    (sqlite3:finalize! dbh)
+    res))
+
+
+(define (register-server pkts-dir pkt-spec host port servkey ipaddr dbpath)
+  (let* ((pkt-dat `((host . ,host)
+                    (port . ,port)
+                    (servkey . ,servkey)
+                    (pid . ,(current-process-id))
+                    (ipaddr . ,ipaddr)
+                    (dbpath .
,dbpath))) + (uuid (write-alist->pkt + pkts-dir + pkt-dat + pktspec: pkt-spec + ptype: 'server))) + (debug:print 0 *default-log-port* "Server on "host":"port" registered in pkt "uuid) + uuid)) + +(define (get-pkts-dir #!optional (apath #f)) + (let* ((effective-toppath (or *toppath* apath))) + (assert effective-toppath + "ERROR: get-pkts-dir called without *toppath* set. Exiting.") + (let* ((pdir (conc effective-toppath "/.meta/srvpkts"))) + (if (file-exists? pdir) + pdir + (begin + (handle-exceptions ;; this exception handler should NOT be needed but ... + exn + pdir + (create-directory pdir #t)) + pdir))))) + +;; given a pkts dir read +;; +(define (get-all-server-pkts pktsdir-in pktspec) + (let* ((pktsdir (if (file-exists? pktsdir-in) + pktsdir-in + (begin + (create-directory pktsdir-in #t) + pktsdir-in))) + (all-pkt-files (glob (conc pktsdir "/*.pkt")))) + (map (lambda (pkt-file) + (read-pkt->alist pkt-file pktspec: pktspec)) + all-pkt-files))) + +(define (server-address srv-pkt) + (conc (alist-ref 'host srv-pkt) ":" + (alist-ref 'port srv-pkt))) + +(define (server-ready? uconn host-port key) ;; server-address is host:port + (let* ((params `((cmd . ping)(key . ,key))) + (data `((cmd . ping) + (key . ,key) + (params . ,params))) ;; I don't get it. + (res (send-receive uconn host-port 'ping data))) + (if (eq? res 'ack) ;; yep, likely it is who we want on the other end + res + #f))) +;; (begin (debug:print-info 0 *default-log-port* "server-ready? => "res) #f)))) + +; from the pkts return servers associated with dbpath +;; NOTE: Only one can be alive - have to check on each +;; in the list of pkts returned +;; +(define (get-viable-servers serv-pkts dbpath) + (let loop ((tail serv-pkts) + (res '())) + (if (null? tail) + res ;; NOTE: sort by age so oldest is considered first + (let* ((spkt (car tail))) + (loop (cdr tail) + (if (equal? 
dbpath (alist-ref 'dbpath spkt)) + (cons spkt res) + res)))))) + +(define (remove-pkts-if-not-alive uconn serv-pkts) + (filter (lambda (pkt) + (let* ((host (alist-ref 'host pkt)) + (port (alist-ref 'port pkt)) + (host-port (conc host":"port)) + (key (alist-ref 'servkey pkt)) + (pktz (alist-ref 'Z pkt)) + (res (server-ready? uconn host-port key))) + (if res + res + (let* ((pktsdir (get-pkts-dir *toppath*)) + (pktpath (conc pktsdir"/"pktz".pkt"))) + (debug:print 0 *default-log-port* "WARNING: pkt with no server "pktpath) + (delete-file* pktpath) + #f)))) + serv-pkts)) + +;; from viable servers get one that is alive and ready +;; +(define (get-the-server uconn apath serv-pkts) + (let loop ((tail serv-pkts)) + (if (null? tail) + #f + (let* ((spkt (car tail)) + (host (alist-ref 'ipaddr spkt)) + (port (alist-ref 'port spkt)) + (host-port (conc host":"port)) + (dbpth (alist-ref 'dbpath spkt)) + (srvkey (alist-ref 'Z spkt)) ;; (alist-ref 'srvkey spkt)) + (addr (server-address spkt))) + (if (server-ready? uconn host-port srvkey) + spkt + (loop (cdr tail))))))) + +;; am I the "first" in line server? I.e. my D card is smallest +;; use Z card as tie breaker +;; +(define (get-best-candidate serv-pkts dbpath) + (if (null? serv-pkts) + #f + (let loop ((tail serv-pkts) + (best (car serv-pkts))) + (if (null? tail) + best + (let* ((candidate (car tail)) + (candidate-bd (string->number (alist-ref 'D candidate))) + (best-bd (string->number (alist-ref 'D best))) + ;; bigger number is younger + (candidate-z (alist-ref 'Z candidate)) + (best-z (alist-ref 'Z best)) + (new-best (cond + ((> best-bd candidate-bd) ;; best is younger than candidate + candidate) + ((< best-bd candidate-bd) ;; candidate is younger than best + best) + (else + (if (string>=? best-z candidate-z) + best + candidate))))) ;; use Z card as tie breaker + (if (null? 
tail) + new-best + (loop (cdr tail) new-best))))))) + + +;;====================================================================== +;; END NEW SERVER METHOD +;;====================================================================== + +;; if .db/main.db check the pkts +;; +(define (rmt:wait-for-server pkts-dir db-file server-key) + (let* ((sdat *db-serv-info*)) + (let loop ((start-time (current-seconds)) + (changed #t) + (last-sdat "not this")) + (begin ;; let ((sdat #f)) + (thread-sleep! 0.01) + (debug:print-info 0 *default-log-port* "Waiting for server alive signature") + (mutex-lock! *heartbeat-mutex*) + (set! sdat *db-serv-info*) + (mutex-unlock! *heartbeat-mutex*) + (if (and sdat + (not changed) + (> (- (current-seconds) start-time) 2)) + (let* ((uconn (servdat-uconn sdat))) + (servdat-status-set! sdat 'iface-stable) + (debug:print-info 0 *default-log-port* "Received server alive signature, now attempting to lock in server") + ;; create a server pkt in *toppath*/.meta/srvpkts + + ;; TODO: + ;; 1. change sdat to stuct + ;; 2. add uuid to struct + ;; 3. update uuid in sdat here + ;; + (servdat-uuid-set! sdat + (register-server + pkts-dir *srvpktspec* + (get-host-name) + (servdat-port sdat) server-key + (servdat-host sdat) db-file)) + ;; (set! *my-signature* (servdat-uuid sdat)) ;; replace with Z, no, stick with proper key + ;; now read pkts and see if we are a contender + (let* ((all-pkts (get-all-server-pkts pkts-dir *srvpktspec*)) + (viables (get-viable-servers all-pkts db-file)) + (alive (remove-pkts-if-not-alive uconn viables)) + (best-srv (get-best-candidate alive db-file)) + (best-srv-key (if best-srv (alist-ref 'servkey best-srv) #f)) + (i-am-srv (equal? 
best-srv-key server-key)) + (delete-pkt (lambda () + (let* ((pktfile (conc (get-pkts-dir *toppath*) + "/" (servdat-uuid *db-serv-info*) + ".pkt"))) + (debug:print-info 0 *default-log-port* "Attempting to remove bogus pkt file "pktfile) + (delete-file* pktfile))))) ;; remove immediately instead of waiting for on-exit + (debug:print 0 *default-log-port* "best-srv-key: "best-srv-key", server-key: "server-key", i-am-srv: "i-am-srv) + ;; am I the best-srv, compare server-keys to know + (if i-am-srv + (if (get-lock-db sdat db-file (servdat-host sdat)(servdat-port sdat)) ;; (db:get-iam-server-lock *dbstruct-db* *toppath* run-id) + (begin + (debug:print-info 0 *default-log-port* "I'm the server!") + (servdat-dbfile-set! sdat db-file) + (servdat-status-set! sdat 'db-locked)) + (begin + (debug:print-info 0 *default-log-port* "I'm not the server, exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (delete-pkt) + (thread-sleep! 0.2) + (exit))) + (begin + (debug:print-info 0 *default-log-port* + "Keys do not match "best-srv-key", "server-key", exiting.") + (bdat-time-to-exit-set! *bdat* #t) + (delete-pkt) + (thread-sleep! 0.2) + (exit))) + sdat)) + (begin ;; sdat not yet contains server info + (debug:print-info 0 *default-log-port* "Still waiting, last-sdat=" last-sdat) + (sleep 4) + (if (> (- (current-seconds) start-time) 120) ;; been waiting for two minutes + (begin + (debug:print-error 0 *default-log-port* "transport appears to have died, exiting server") + (exit)) + (loop start-time + (equal? 
sdat last-sdat) + sdat)))))))) + +(define (rmt:register-server sinfo apath iface port server-key dbname) + (servdat-conns sinfo) ;; just checking types + (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db + (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath + (db:run-id->dbname #f) + 'register-server `(,iface + ,port + ,server-key + ,(current-process-id) + ,iface + ,apath + ,dbname))) + +(define (rmt:get-count-servers sinfo apath) + (servdat-conns sinfo) ;; just checking types + (rmt:open-main-connection sinfo apath) ;; we need a channel to main.db + (rmt:send-receive-real sinfo apath ;; params: host port servkey pid ipaddr dbpath + (db:run-id->dbname #f) + 'get-count-servers `(,apath))) + +(define (rmt:get-servers-info apath) + (rmt:send-receive 'get-servers-info #f `(,apath))) + +(define (rmt:deregister-server db-serv-info apath iface port server-key dbname) + (rmt:open-main-connection db-serv-info apath) ;; we need a channel to main.db + (rmt:send-receive-real db-serv-info apath ;; params: host port servkey pid ipaddr dbpath + (db:run-id->dbname #f) + 'deregister-server `(,iface + ,port + ,server-key + ,(current-process-id) + ,iface + ,apath + ,dbname))) + +(define (rmt:wait-for-stable-interface #!optional (num-tries-allowed 100)) + ;; wait until *db-serv-info* stops changing + (let* ((stime (current-seconds))) + (let loop ((last-host #f) + (last-port #f) + (tries 0)) + (let* ((curr-host (and *db-serv-info* (servdat-host *db-serv-info*))) + (curr-port (and *db-serv-info* (servdat-port *db-serv-info*)))) + ;; first we verify port and interface, update *db-serv-info* in need be. + (cond + ((> tries num-tries-allowed) + (debug:print 0 *default-log-port* "rmt:keep-running, giving up after trying for several minutes.") + (exit 1)) + ((not *db-serv-info*) + (thread-sleep! 
0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not last-host)(not last-port)) + (debug:print 0 *default-log-port* "rmt:keep-running, still no interface, tries="tries) + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((or (not (equal? last-host curr-host)) + (not (equal? last-port curr-port))) + (debug:print-info 0 *default-log-port* "WARNING: interface changed, refreshing iface and port info") + (thread-sleep! 0.25) + (loop curr-host curr-port (+ tries 1))) + ((< (- (current-seconds) stime) 1) ;; keep up the looping until at least 3 seconds have passed + (thread-sleep! 0.5) + (loop curr-host curr-port (+ tries 1))) + (else + (rmt:get-signature) ;; sets *my-signature* as side effect + (servdat-status-set! *db-serv-info* 'interface-stable) + (debug:print 0 *default-log-port* + "SERVER STARTED: " curr-host + ":" curr-port + " AT " (current-seconds) " server signature: " *my-signature* + " with "(servdat-trynum *db-serv-info*)" port changes") + (flush-output *default-log-port*) + #t)))))) + +;; run rmt:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (rmt:keep-running dbname) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (debug:print-info 0 *default-log-port* "Starting the sync-back, keep alive thread in server") + + (let* ((sinfo *db-serv-info*) + (server-start-time (current-seconds)) + (pkts-dir (get-pkts-dir)) + (server-key (rmt:get-signature)) ;; This servers key + (is-main (equal? (args:get-arg "-db") ".db/main.db")) + (last-access 0) + (server-timeout (server:expiration-timeout)) + (shutdown-server-sequence (lambda (host port) + (set! *unclean-shutdown* #f) ;; Should not be needed anymore + (debug:print-info 0 *default-log-port* "Starting to shutdown the server. 
pid="(current-process-id)) + ;; (rmt:server-shutdown host port) -- called in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") called in on-exit + (exit))) + (timed-out? (lambda () + (<= (+ last-access server-timeout) + (current-seconds))))) + (servdat-dbfile-set! *db-serv-info* (args:get-arg "-db")) + ;; main and run db servers have both got wait logic (could/should merge it) + (if is-main + (rmt:wait-for-server pkts-dir dbname server-key) + (rmt:wait-for-stable-interface)) + ;; this is our forever loop + (let* ((iface (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (uconn (servdat-uconn *db-serv-info*))) + (let loop ((count 0) + (bad-sync-count 0) + (start-time (current-milliseconds))) + (if (and (not is-main) + (common:low-noise-print 60 "servdat-status")) + (debug:print-info 0 *default-log-port* "servdat-status is " (servdat-status *db-serv-info*))) + + (mutex-lock! *heartbeat-mutex*) + ;; set up the database handle + (if (not *dbstruct-db*) ;; no db opened yet, open the db and register with main if appropriate + (let ((watchdog (bdat-watchdog *bdat*))) + (debug:print 0 *default-log-port* "SERVER: dbprep") + (db:setup dbname) ;; sets *dbstruct-db* as side effect + (servdat-status-set! *db-serv-info* 'db-opened) + ;; IFF I'm not main, call into main and register self + (if (not is-main) + (let ((res (rmt:register-server sinfo + *toppath* iface port + server-key dbname))) + (if res ;; we are the server + (servdat-status-set! *db-serv-info* 'have-interface-and-db) + ;; now check that the db locker is alive, clear it out if not + (let* ((serv-info (rmt:server-info *toppath* dbname))) + (match serv-info + ((host port servkey pid ipaddr apath dbpath) + (if (not (server-ready? uconn (conc host":"port) servkey)) + (begin + (debug:print-info 0 *default-log-port* "Server registered but not alive. 
Removing and trying again.") + (rmt:deregister-server sinfo apath host port servkey dbpath) ;; servkey pid ipaddr apath dbpath) + (loop (+ count 1) bad-sync-count start-time)))) + (else + (debug:print 0 *default-log-port* "We are not the server for "dbname", exiting. Server info is: "serv-info) + (exit))))))) + (debug:print 0 *default-log-port* + "SERVER: running, db "dbname" opened, megatest version: " + (common:get-full-version)) + ;; start the watchdog + + ;; is this really needed? + + #;(if watchdog + (if (not (member (thread-state watchdog) + '(ready running blocked + sleeping dead))) + (begin + (debug:print-info 0 *default-log-port* "Starting watchdog thread (in state "(thread-state watchdog)")") + (thread-start! watchdog)) + (debug:print-info 0 *default-log-port* "Not starting watchdog thread (in state "(thread-state watchdog)")")) + (debug:print 0 *default-log-port* "ERROR: *watchdog* not setup, cannot start it.")) + #;(loop (+ count 1) bad-sync-count start-time) + )) + + (db:sync-inmem->disk *dbstruct-db* *toppath* dbname force-sync: #t) + + (mutex-unlock! *heartbeat-mutex*) + + ;; when things go wrong we don't want to be doing the various + ;; queries too often so we strive to run this stuff only every + ;; four seconds or so. + (let* ((sync-time (- (current-milliseconds) start-time)) + (rem-time (quotient (- 4000 sync-time) 1000))) + (if (and (<= rem-time 4) + (> rem-time 0)) + (thread-sleep! rem-time))) + + ;; Transfer *db-last-access* to last-access to use in checking that we are still alive + (set! last-access *db-last-access*) + + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1) bad-sync-count (current-milliseconds))) + + (if (common:low-noise-print 60 "dbstats") + (begin + (debug:print 0 *default-log-port* "Server stats:") + (db:print-current-query-stats))) + (let* ((hrs-since-start (/ (- (current-seconds) server-start-time) 3600))) + (cond + ((not *server-run*) + (debug:print-info 0 *default-log-port* "*server-run* set to #f. 
Shutting down.") + (shutdown-server-sequence (get-host-name) port)) + ((timed-out?) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port)) + ((and *server-run* + (or (not (timed-out?)) + (if is-main ;; do not exit if there are other servers (keep main open until all others gone) + (> (rmt:get-count-servers sinfo *toppath*) 1) + #f))) + (if (common:low-noise-print 120 "server continuing") + (debug:print-info 0 *default-log-port* "Server continuing, seconds since last db access: " (- (current-seconds) last-access))) + (loop 0 bad-sync-count (current-milliseconds))) + (else + (set! *unclean-shutdown* #f) + (debug:print-info 0 *default-log-port* "Server timed out. seconds since last db access: " (- (current-seconds) last-access)) + (shutdown-server-sequence (get-host-name) port) + #;(debug:print-info 0 *default-log-port* "Sending 'quit to server, received: " + (open-send-receive-nn (conc iface":"port) ;; do this here and not in server-shutdown + (sexpr->string 'quit)))))))))) + +(define (rmt:get-reasonable-hostname) + (let* ((inhost (or (args:get-arg "-server") "-"))) + (if (equal? inhost "-") + (get-host-name) + inhost))) + +;; Call this to start the actual server +;; +;; all routes though here end in exit ... +;; +;; This is the point at which servers are started +;; +(define (rmt:server-launch dbname) + (debug:print-info 0 *default-log-port* "Entered rmt:server-launch") + (let* ((th2 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server run thread started") + (rmt:run (rmt:get-reasonable-hostname))) + "Server run")) + (th3 (make-thread (lambda () + (debug:print-info 0 *default-log-port* "Server monitor thread started") + (if (args:get-arg "-server") + (rmt:keep-running dbname))) + "Keep running"))) + (thread-start! th2) + (thread-sleep! 0.252) ;; give the server time to settle before starting the keep-running monitor. 
+ (thread-start! th3) + (set! *didsomething* #t) + (thread-join! th2) + (thread-join! th3)) + #f) + +;;====================================================================== +;; S E R V E R - D I R E C T C A L L S +;;====================================================================== + +(define (rmt:kill-server run-id) + (rmt:send-receive 'kill-server #f (list run-id))) + +(define (rmt:start-server run-id) + (rmt:send-receive 'start-server #f (list run-id))) + +(define (rmt:server-info apath dbname) + (rmt:send-receive 'get-server-info #f (list apath dbname))) + +;;====================================================================== +;; Nanomsg transport +;;====================================================================== + +#;(define (is-port-in-use port-num) + (let* ((ret #f)) + (let-values (((inp oup pid) + (process "netstat" (list "-tulpn" )))) + (let loop ((inl (read-line inp))) + (if (not (eof-object? inl)) + (begin + (if (string-search (regexp (conc ":" port-num)) inl) + (begin + ;(print "Output: " inl) + (set! ret #t)) + (loop (read-line inp))))))) + ret)) + +#;(define (open-nn-connection host-port) + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port))) + (nng-dial req uri) + (socket-set! req 'nng/recvtimeo 2000) + req)) + +#;(define (send-receive-nn req msg) + (nng-send req msg) + (nng-recv req)) + +#;(define (close-nn-connection req) + (nng-close! req)) + +;; ;; open connection to server, send message, close connection +;; ;; +;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds +;; (let ((req (make-req-socket 'req)) +;; (uri (conc "tcp://" host-port)) +;; (res #f) +;; ;; (contacts (alist-ref 'contact attrib)) +;; ;; (mode (alist-ref 'mode attrib)) +;; ) +;; (socket-set! 
req 'nng/recvtimeo 2000) +;; (handle-exceptions +;; exn +;; (let ((emsg ((condition-property-accessor 'exn 'message) exn))) +;; ;; Send notification +;; (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\"" ) +;; #f) +;; (nng-dial req uri) +;; ;; (print "Connected to the server " ) +;; (nng-send req msg) +;; ;; (print "Request Sent") +;; (let* ((th1 (make-thread (lambda () +;; (let ((resp (nng-recv req))) +;; (nng-close! req) +;; (set! res (if (equal? resp "ok") +;; #t +;; #f)))) +;; "recv thread")) +;; (th2 (make-thread (lambda () +;; (thread-sleep! timeout) +;; (thread-terminate! th1)) +;; "timer thread"))) +;; (thread-start! th1) +;; (thread-start! th2) +;; (thread-join! th1) +;; res)))) +;; +#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds + (let ((req (make-req-socket)) + (uri (conc "tcp://" host-port)) + (res #f)) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + ;; Send notification + (debug:print 0 *default-log-port* "ERROR: Failed to connect / send to " uri " message was \"" emsg "\", exn=" exn) + #f) + (nng-dial req uri) + (nng-send req msg) + (let* ((th1 (make-thread (lambda () + (let ((resp (nng-recv req))) + (nng-close! req) + ;; (print resp) + (set! res resp))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) + +;;====================================================================== +;; S E R V E R U T I L I T I E S +;;====================================================================== + +;; run ping in separate process, safest way in some cases +;; +#;(define (server:ping-server ifaceport) + (with-input-from-pipe + (conc (common:get-megatest-exe) " -ping " ifaceport) + (lambda () + (let loop ((inl (read-line)) + (res "NOREPLY")) + (if (eof-object? 
inl) + (case (string->symbol res) + ((NOREPLY) #f) + ((LOGIN_OK) #t) + (else #f)) + (loop (read-line) inl)))))) + +;; NOT USED (well, ok, reference in rpc-transport but otherwise not used). +;; +#;(define (server:login toppath) + (lambda (toppath) + (set! *db-last-access* (current-seconds)) ;; might not be needed. + (if (equal? *toppath* toppath) + #t + #f))) + +;; (define server:sync-lock-token "SERVER_SYNC_LOCK") +;; (define (server:release-sync-lock) +;; (db:no-sync-del! *no-sync-db* server:sync-lock-token)) +;; (define (server:have-sync-lock?) +;; (let* ((have-lock-pair (db:no-sync-get-lock *no-sync-db* server:sync-lock-token)) +;; (have-lock? (car have-lock-pair)) +;; (lock-time (cdr have-lock-pair)) +;; (lock-age (- (current-seconds) lock-time))) +;; (cond +;; (have-lock? #t) +;; ((>lock-age +;; (* 3 (configf:lookup-number *configdat* "server" "minimum-intersync-delay" default: 180))) +;; (server:release-sync-lock) +;; (server:have-sync-lock?)) +;; (else #f)))) + +)