Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -14,12 +14,13 @@ ;;====================================================================== (require-extension (srfi 18) extras tcp) ;; rpc) ;; (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64) (import (prefix sqlite3 sqlite3:)) +(import (prefix base64 base64:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) @@ -1096,12 +1097,22 @@ ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) -(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) -(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) +(define (db:obj->string obj) + (string-substitute + (regexp "=") "_" + (base64:base64-encode (with-output-to-string (lambda ()(serialize obj)))) + #t)) + +(define (db:string->obj msg) + (with-input-from-string + (base64:base64-decode + (string-substitute + (regexp "_") "=" msg #t)) + (lambda ()(deserialize)))) (define (cdb:use-non-blocking-mode proc) (set! *client-non-blocking-mode* #t) (let ((res (proc))) (set! *client-non-blocking-mode* #f) @@ -1111,52 +1122,55 @@ ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (define (cdb:client-call serverdat qtype immediate numretries . params) (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) - (handle-exceptions - exn - (begin - (thread-sleep! 5) - (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) + ;; (handle-exceptions + ;; exn + ;; (begin + ;; (thread-sleep! 
5) + ;; (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) (let* ((client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) + ) + ;; (print "zdat=" zdat) + (let* ( (res #f) - (send-receive (lambda () - (let loop ((res (server:client-send-receive serverdat zdat))) - ;; get the sender info - ;; this should match (server:get-client-signature) - ;; we will need to process "all" messages here some day - ;; now get the actual message - (let ((myres (db:string->obj res))) - (if (equal? query-sig (vector-ref myres 1)) - (set! res (vector-ref myres 2)) - (loop (server:client-send-receive serverdat zdat))))))) - (timeout (lambda () - (let loop ((n numretries)) - (thread-sleep! 15) - (if (not res) - (if (> numretries 0) - (begin - (debug:print 2 "WARNING: no reply to query " params ", trying resend") - (debug:print-info 11 "re-sending message") - (send-message push-socket zdat) - (debug:print-info 11 "message re-sent") - (loop (- n 1))) - ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params)) - (begin - (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") - (exit 5)))))))) - (debug:print-info 11 "Starting threads") - (let ((th1 (make-thread send-receive "send receive")) - (th2 (make-thread timeout "timeout"))) - (thread-start! th1) - (thread-start! th2) - (thread-join! th1) - (debug:print-info 11 "cdb:client-call returning res=" res) - res)))) + (rawdat (server:client-send-receive serverdat zdat)) + (tmp #f)) + (print "Sent " zdat ", received " rawdat) + (set! tmp (db:string->obj rawdat)) ;; decode the raw reply; slot 2 of the reply vector (built by server:reply) is the result returned below + ;; (if (equal? query-sig (vector-ref myres 1)) + ;; (set! 
res + (vector-ref tmp 2) + ;; (loop (server:client-send-receive serverdat zdat))))))) + ;; (timeout (lambda () + ;; (let loop ((n numretries)) + ;; (thread-sleep! 15) + ;; (if (not res) + ;; (if (> numretries 0) + ;; (begin + ;; (debug:print 2 "WARNING: no reply to query " params ", trying resend") + ;; (debug:print-info 11 "re-sending message") + ;; (apply cdb:client-call serverdat qtype immediate numretries params) + ;; (debug:print-info 11 "message re-sent") + ;; (loop (- n 1))) + ;; ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params)) + ;; (begin + ;; (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") + ;; (exit 5)))))))) + ;; (send-receive) + ))) + ;; (debug:print-info 11 "Starting threads") + ;; (let ((th1 (make-thread send-receive "send receive")) + ;; (th2 (make-thread timeout "timeout"))) + ;; (thread-start! th1) + ;; (thread-start! th2) + ;; (thread-join! th1) + ;; (debug:print-info 11 "cdb:client-call returning res=" res) + ;; res)))) (define (cdb:set-verbosity serverdat val) (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val)) (define (cdb:login serverdat keyval signature) @@ -1249,35 +1263,10 @@ set-verbosity killserver)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) - - - - - - - - -UPDATE DB:PROCESS_QUEUE@@@@ - - - - - - - - - - - - - - - - ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -46,38 +46,42 @@ (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* ((iface (if (string=? 
"-" hostn) - "*" ;; (get-host-name) - hostn)) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostname))) + (if ipstr ipstr hostn))) ;; hostname))) (start-port (if (args:get-arg "-port") (string->number (args:get-arg "-port")) (+ 5000 (random 1001))))) (set! *cache-on* #t) (server:try-start-server ipaddrstr start-port))) (define (server:main-loop) + (print "INFO: Exectuing main server loop") + (access-log "megatest-http.log") + (server-bind-address #f) (define-page (main-page-path) (lambda () (with-request-variables (dat) + (print "Got dat=" dat) (let* ((packet (db:string->obj dat)) (qtype (cdb:packet-get-qtype packet))) (debug:print-info 12 "server=> received packet=" packet) (if (not (member qtype '(sync ping))) (begin (mutex-lock! *heartbeat-mutex*) (set! *last-db-access* (current-seconds)) (mutex-unlock! *heartbeat-mutex*))) - (open-run-close db:process-queue #f pub-socket (cons packet queue-lst))))))) + (open-run-close db:process-queue-item packet)))))) ;; This is recursively run by server:run until sucessful ;; (define (server:try-start-server ipaddrstr portnum) @@ -86,21 +90,23 @@ (begin (print-error-message exn) (if (< portnum 9000) (begin (print "WARNING: failed to start on portnum: " portnum ", trying next port") - (sleep 1) + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) (server:try-start-server ipaddrstr (+ portnum 1))) (print "ERROR: Tried and tried but could not start the server"))) - (print "INFO: Trying to start server on portnum: " portnum) - (set! 
*runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr portnum 0 'live) - (awful-start server:main-loop ip-address: ipaddrstr port: portnum))) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + (awful-start server:main-loop port: portnum) ;; ip-address: ipaddrstr + (print "INFO: server has been stopped"))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -109,10 +115,15 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== +(define (server:reply pubsock target query-sig success/fail result) + (debug:print-info 11 "server:reply target=" target ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) ;;====================================================================== ;; C L I E N T S ;;====================================================================== @@ -125,13 +136,17 @@ ;; ;; ;; 1 Hello, world! 
Goodbye Dolly ;; Send msg to serverdat and receive result (define (server:client-send-receive serverdat msg) - (let* ((res (with-input-from-request (conc (server:make-server-url serverdat) "/?dat=" msg) #f read-string)) - (match (string-search (regexp "(.*)<.body>") (caddr (string-split res "\n"))))) - (cadr match))) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/?dat=" msg))) + (print "url=" url ", fullurl=" fullurl) + (let* ((res (with-input-from-request fullurl #f read-string))) + (print "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") (caddr (string-split res "\n"))))) + (cadr match))))) (define (server:client-login serverdat) (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! @@ -191,10 +206,67 @@ (loop (+ count 1))))))) ;; we are starting a server, do not try again! That can lead to ;; recursively starting many processes!!! (server:client-setup numtries: 0)) (debug:print-info 1 "Too many attempts, giving up"))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0)) + ;; (print "Keep-running got server-info " server-info) + (let loop ((count 0)) + (thread-sleep! 
4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let ((queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) + + ;; all routes though here end in exit ... (define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) @@ -209,16 +281,16 @@ (let* ((th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) - (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) + ;; (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) (set! *client-non-blocking-mode* #t) ;; (thread-start! th1) (thread-start! th2) - (thread-start! th3) + ;; (thread-start! th3) (set! *didsomething* #t) ;; (thread-join! th3) (thread-join! 
th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -199,10 +199,13 @@ (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) (if (null? tal) #f (loop (car tal)(cdr tal)))))))))) +(define (tasks:remove-server-records mdb) + (sqlite3:exec mdb "DELETE FROM servers;")) + (define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)))