Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -14,14 +14,13 @@ ;;====================================================================== (require-extension (srfi 18) extras tcp) ;; rpc) ;; (import (prefix rpc rpc:)) -(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest) +(use sqlite3 srfi-1 posix regex regex-case srfi-69 csv-xml s11n md5 message-digest base64) (import (prefix sqlite3 sqlite3:)) - -(use zmq) +(import (prefix base64 base64:)) (declare (unit db)) (declare (uses common)) (declare (uses keys)) (declare (uses ods)) @@ -99,11 +98,12 @@ (thread-sleep! (random 120)) (debug:print-info 0 "trying db call one more time....") (apply open-run-close-no-exception-handling proc idb params)) (apply open-run-close-no-exception-handling proc idb params))) -(define open-run-close open-run-close-exception-handling) +;; (define open-run-close open-run-close-exception-handling) +(define open-run-close open-run-close-no-exception-handling) (define *global-delta* 0) (define *last-global-delta-printed* 0) (define (open-run-close-measure proc idb . params) @@ -789,12 +789,12 @@ " run_id=? AND testname=? AND NOT (item_path='' AND testname in (SELECT DISTINCT testname FROM tests WHERE testname=? AND item_path != ''));"))) ;;(debug:print 0 "QRY: " qry) (sqlite3:execute db qry run-id newstate newstatus testname testname))) testnames)) -(define (cdb:delete-tests-in-state zmqsocket run-id state) - (cdb:client-call zmqsocket 'delete-tests-in-state #t *default-numtries* run-id state)) +(define (cdb:delete-tests-in-state serverdat run-id state) + (cdb:client-call serverdat 'delete-tests-in-state #t *default-numtries* run-id state)) ;; speed up for common cases with a little logic (define (db:test-set-state-status-by-id db test-id newstate newstatus newcomment) (cond ((and newstate newstatus newcomment) @@ -968,15 +968,15 @@ (sqlite3:execute db "UPDATE tests SET comment=? WHERE id=?;" comment test-id)) -(define (cdb:test-set-rundir! zmqsocket run-id test-name item-path rundir) - (cdb:client-call zmqsocket 'test-set-rundir #t *default-numtries* rundir run-id test-name item-path)) +(define (cdb:test-set-rundir! serverdat run-id test-name item-path rundir) + (cdb:client-call serverdat 'test-set-rundir #t *default-numtries* rundir run-id test-name item-path)) -(define (cdb:test-set-rundir-by-test-id zmqsocket test-id rundir) - (cdb:client-call zmqsocket 'test-set-rundir-by-test-id #t *default-numtries* rundir test-id)) +(define (cdb:test-set-rundir-by-test-id serverdat test-id rundir) + (cdb:client-call serverdat 'test-set-rundir-by-test-id #t *default-numtries* rundir test-id)) (define (db:test-get-rundir-from-test-id db test-id) (let ((res #f)) ;; (hash-table-ref/default *test-paths* test-id #f))) ;; (if res ;; res @@ -988,12 +988,12 @@ "SELECT rundir FROM tests WHERE id=?;" test-id) ;; (hash-table-set! *test-paths* test-id res) res)) ;; )) -(define (cdb:test-set-log! zmqsocket test-id logf) - (if (string? logf)(cdb:client-call zmqsocket 'test-set-log #f *default-numtries* logf test-id))) +(define (cdb:test-set-log! serverdat test-id logf) + (if (string? logf)(cdb:client-call serverdat 'test-set-log #f *default-numtries* logf test-id))) ;;====================================================================== ;; Misc. test related queries ;;====================================================================== @@ -1104,12 +1104,22 @@ ;; (let loop () ;; (thread-sleep! 10) ;; move save time around to minimize regular collisions? ;; (db:write-cached-data) ;; (loop))) -(define (db:obj->string obj)(with-output-to-string (lambda ()(serialize obj)))) -(define (db:string->obj msg)(with-input-from-string msg (lambda ()(deserialize)))) +(define (db:obj->string obj) + (string-substitute + (regexp "=") "_" + (base64:base64-encode (with-output-to-string (lambda ()(serialize obj)))) + #t)) + +(define (db:string->obj msg) + (with-input-from-string + (base64:base64-decode + (string-substitute + (regexp "_") "=" msg #t)) + (lambda ()(deserialize)))) (define (cdb:use-non-blocking-mode proc) (set! *client-non-blocking-mode* #t) (let ((res (proc))) (set! *client-non-blocking-mode* #f) @@ -1117,104 +1127,101 @@ ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; -(define (cdb:client-call zmq-sockets qtype immediate numretries . params) - (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) - (handle-exceptions - exn - (begin - (thread-sleep! 5) - (if (> numretries 0)(apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params))) - (let* ((push-socket (vector-ref zmq-sockets 0)) - (sub-socket (vector-ref zmq-sockets 1)) - (client-sig (server:get-client-signature)) +(define (cdb:client-call serverdat qtype immediate numretries . params) + (debug:print-info 11 "cdb:client-call serverdat=" serverdat ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) + ;; (handle-exceptions + ;; exn + ;; (begin + ;; (thread-sleep! 5) + ;; (if (> numretries 0)(apply cdb:client-call serverdat qtype immediate (- numretries 1) params))) + (let* ((client-sig (server:get-client-signature)) (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) - (res #f) - (send-receive (lambda () - (debug:print-info 11 "sending message") - (send-message push-socket zdat) - (debug:print-info 11 "message sent") - (let loop () - ;; get the sender info - ;; this should match (server:get-client-signature) - ;; we will need to process "all" messages here some day - (receive-message* sub-socket) - ;; now get the actual message - (let ((myres (db:string->obj (receive-message* sub-socket)))) - (if (equal? query-sig (vector-ref myres 1)) - (set! res (vector-ref myres 2)) - (loop)))))) - (timeout (lambda () - (let loop ((n numretries)) - (thread-sleep! 15) - (if (not res) - (if (> numretries 0) - (begin - (debug:print 2 "WARNING: no reply to query " params ", trying resend") - (debug:print-info 11 "re-sending message") - (send-message push-socket zdat) - (debug:print-info 11 "message re-sent") - (loop (- n 1))) - ;; (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) - (begin - (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") - (exit 5)))))))) - (debug:print-info 11 "Starting threads") - (let ((th1 (make-thread send-receive "send receive")) - (th2 (make-thread timeout "timeout"))) - (thread-start! th1) - (thread-start! th2) - (thread-join! th1) - (debug:print-info 11 "cdb:client-call returning res=" res) - res)))) - -(define (cdb:set-verbosity zmq-socket val) - (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val)) - -(define (cdb:login zmq-sockets keyval signature) - (cdb:client-call zmq-sockets 'login #t *default-numtries* keyval megatest-version signature)) - -(define (cdb:logout zmq-socket keyval signature) - (cdb:client-call zmq-socket 'logout #t *default-numtries* keyval signature)) - -(define (cdb:num-clients zmq-socket) - (cdb:client-call zmq-socket 'numclients #t *default-numtries*)) - -(define (cdb:test-set-status-state zmqsocket test-id status state msg) - (if msg - (cdb:client-call zmqsocket 'state-status-msg #t *default-numtries* state status msg test-id) - (cdb:client-call zmqsocket 'state-status #t *default-numtries* state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) - -(define (cdb:test-rollup-test_data-pass-fail zmqsocket test-id) - (cdb:client-call zmqsocket 'test_data-pf-rollup #t *default-numtries* test-id test-id test-id test-id)) - -(define (cdb:pass-fail-counts zmqsocket test-id fail-count pass-count) - (cdb:client-call zmqsocket 'pass-fail-counts #t *default-numtries* fail-count pass-count test-id)) - -(define (cdb:tests-register-test zmqsocket run-id test-name item-path) + ) + (debug:print-info 11 "zdat=" zdat) + (let* ( + (res #f) + (rawdat (server:client-send-receive serverdat zdat)) + (tmp #f)) + (debug:print-info 11 "Sent " zdat ", received " rawdat) + (set! tmp (db:string->obj rawdat)) + ;; (if (equal? query-sig (vector-ref myres 1)) + ;; (set! res + (vector-ref tmp 2) + ;; (loop (server:client-send-receive serverdat zdat))))))) + ;; (timeout (lambda () + ;; (let loop ((n numretries)) + ;; (thread-sleep! 15) + ;; (if (not res) + ;; (if (> numretries 0) + ;; (begin + ;; (debug:print 2 "WARNING: no reply to query " params ", trying resend") + ;; (debug:print-info 11 "re-sending message") + ;; (apply cdb:client-call serverdat qtype immediate numretries params) + ;; (debug:print-info 11 "message re-sent") + ;; (loop (- n 1))) + ;; ;; (apply cdb:client-call serverdats qtype immediate (- numretries 1) params)) + ;; (begin + ;; (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") + ;; (exit 5)))))))) + ;; (send-receive) + ))) + ;; (debug:print-info 11 "Starting threads") + ;; (let ((th1 (make-thread send-receive "send receive")) + ;; (th2 (make-thread timeout "timeout"))) + ;; (thread-start! th1) + ;; (thread-start! th2) + ;; (thread-join! th1) + ;; (debug:print-info 11 "cdb:client-call returning res=" res) + ;; res)))) + +(define (cdb:set-verbosity serverdat val) + (cdb:client-call serverdat 'set-verbosity #f *default-numtries* val)) + +(define (cdb:login serverdat keyval signature) + (cdb:client-call serverdat 'login #t *default-numtries* keyval megatest-version signature)) + +(define (cdb:logout serverdat keyval signature) + (cdb:client-call serverdat 'logout #t *default-numtries* keyval signature)) + +(define (cdb:num-clients serverdat) + (cdb:client-call serverdat 'numclients #t *default-numtries*)) + +(define (cdb:test-set-status-state serverdat test-id status state msg) + (if msg + (cdb:client-call serverdat 'state-status-msg #t *default-numtries* state status msg test-id) + (cdb:client-call serverdat 'state-status #t *default-numtries* state status test-id))) ;; run-id test-name item-path minutes cpuload diskfree tmpfree) + +(define (cdb:test-rollup-test_data-pass-fail serverdat test-id) + (cdb:client-call serverdat 'test_data-pf-rollup #t *default-numtries* test-id test-id test-id test-id)) + +(define (cdb:pass-fail-counts serverdat test-id fail-count pass-count) + (cdb:client-call serverdat 'pass-fail-counts #t *default-numtries* fail-count pass-count test-id)) + +(define (cdb:tests-register-test serverdat run-id test-name item-path) (let ((item-paths (if (equal? item-path "") (list item-path) (list item-path "")))) - (cdb:client-call zmqsocket 'register-test #t *default-numtries* run-id test-name item-path))) - -(define (cdb:flush-queue zmqsocket) - (cdb:client-call zmqsocket 'flush #f *default-numtries*)) - -(define (cdb:kill-server zmqsocket) - (cdb:client-call zmqsocket 'killserver #f *default-numtries*)) - -(define (cdb:roll-up-pass-fail-counts zmqsocket run-id test-name item-path status) - (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status)) - -(define (cdb:get-test-info zmqsocket run-id test-name item-path) - (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info #f run-id test-name item-path)) - -(define (cdb:get-test-info-by-id zmqsocket test-id) - (cdb:client-call zmqsocket 'immediate #f *default-numtries* open-run-close db:get-test-info-by-id #f test-id)) + (cdb:client-call serverdat 'register-test #t *default-numtries* run-id test-name item-path))) + +(define (cdb:flush-queue serverdat) + (cdb:client-call serverdat 'flush #f *default-numtries*)) + +(define (cdb:kill-server serverdat) + (cdb:client-call serverdat 'killserver #f *default-numtries*)) + +(define (cdb:roll-up-pass-fail-counts serverdat run-id test-name item-path status) + (cdb:client-call serverdat 'immediate #f *default-numtries* open-run-close db:roll-up-pass-fail-counts #f run-id test-name item-path status)) + +(define (cdb:get-test-info serverdat run-id test-name item-path) + (cdb:client-call serverdat 'immediate #f *default-numtries* open-run-close db:get-test-info #f run-id test-name item-path)) + +(define (cdb:get-test-info-by-id serverdat test-id) + (cdb:client-call serverdat 'immediate #f *default-numtries* open-run-close db:get-test-info-by-id #f test-id)) ;; db should be db open proc or #f (define (cdb:remote-run proc db . params) (apply cdb:client-call *runremote* 'immediate #f *default-numtries* open-run-close proc #f params)) @@ -1276,62 +1283,62 @@ (for-each (lambda (item) (db:process-queue-item db pubsock item)) data))) -(define (db:process-queue-item db pubsock item) +(define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) (params (cdb:packet-get-params item)) (query (let ((q (alist-ref stmt-key db:queries))) (if q (car q) #f)))) - (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", qrery=" query ", params=" params) + (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params) (cond (query (apply sqlite3:execute db query params) - (server:reply pubsock return-address qry-sig #t #t)) + (server:reply return-address qry-sig #t #t)) ((member stmt-key db:special-queries) (debug:print-info 11 "Handling special statement " stmt-key) (case stmt-key ((immediate) (let ((proc (car params)) (remparams (cdr params))) ;; we are being handed a procedure so call it (debug:print-info 11 "Running (apply " proc " " remparams ")") - (server:reply pubsock return-address qry-sig #t (apply proc remparams)))) + (server:reply return-address qry-sig #t (apply proc remparams)))) ((login) (if (< (length params) 3) ;; should get toppath, version and signature - '(#f "login failed due to missing params") ;; missing params + (server:reply return-address qry-sig '(#f "login failed due to missing params")) ;; missing params (let ((calling-path (car params)) (calling-vers (cadr params)) (client-key (caddr params))) (if (and (equal? calling-path *toppath*) (equal? megatest-version calling-vers)) (begin (hash-table-set! *logged-in-clients* client-key (current-seconds)) - (server:reply pubsock return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... - (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) + (server:reply return-address qry-sig #t '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... + (server:reply return-address qry-sig #f (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*))))))) ((flush sync) - (server:reply pubsock return-address qry-sig #t 1)) ;; (length data))) + (server:reply return-address qry-sig #t 1)) ;; (length data))) ((set-verbosity) (set! *verbosity* (car params)) - (server:reply pubsock return-address qry-sig #t '(#t *verbosity*))) + (server:reply return-address qry-sig #t '(#t *verbosity*))) ((killserver) (debug:print 0 "WARNING: Server going down in 15 seconds by user request!") (open-run-close tasks:server-deregister tasks:open-db - (cadr *server-info*) - pullport: (caddr *server-info*)) + (car *runremote*) + pullport: (cadr *runremote*)) (thread-start! (make-thread (lambda ()(thread-sleep! 15)(exit)))) - (server:reply pubsock return-address qry-sig #t '(#t "exit process started"))) + (server:reply return-address qry-sig #t '(#t "exit process started"))) (else ;; not a command, i.e. is a query (debug:print 0 "ERROR: Unrecognised query/command " stmt-key) (server:reply pubsock return-address qry-sig #f 'failed)))) (else (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) - (server:reply pubsock return-address qry-sig #t #t))))) + (server:reply return-address qry-sig #t #t))))) (define (db:test-get-records-for-index-file db run-id test-name) (let ((res '())) (sqlite3:for-each-row (lambda (id itempath state status run_duration logf comment) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -11,11 +11,13 @@ (require-extension (srfi 18) extras tcp s11n) (use sqlite3 srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest) (import (prefix sqlite3 sqlite3:)) -(use zmq) +(use spiffy uri-common intarweb http-client spiffy-request-vars) + +(tcp-buffer-size 2048) (declare (unit server)) (declare (uses common)) (declare (uses db)) @@ -23,210 +25,151 @@ (declare (uses tasks)) ;; tasks are where stuff is maintained about what is running. (include "common_records.scm") (include "db_records.scm") -;; Transition to pub --> sub with pull <-- push -;; -;; 1. client sends request to server via push to the pull port -;; 2. server puts request in queue or processes immediately as appropriate -;; 3. server puts responses from completed requests into pub port -;; -;; TODO -;; -;; Done Tested -;; [x] [ ] 1. Add columns pullport pubport to servers table -;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 -;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports -;; [x] [ ] 4. Add client compose of request -;; [x] [ ] - name of client: testname/itempath-test_id-hostname -;; [x] [ ] - name of request: callname, params -;; [x] [ ] - request key: f(clientname, callname, params) -;; [x] [ ] 5. Add processing of subscription hits -;; [x] [ ] - done when get key -;; [x] [ ] - return results -;; [x] [ ] 6. Add timeout processing -;; [x] [ ] - after 60 seconds -;; [ ] [ ] i. check server alive, connect to new if necessary -;; [ ] [ ] ii. resend request -;; [ ] [ ] 7. Turn self ping back on - (define (server:make-server-url hostport) (if (not hostport) #f - (conc "tcp://" (car hostport) ":" (cadr hostport)))) + (conc "http://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) +;;====================================================================== +;; S E R V E R +;;====================================================================== + +;; Call this to start the actual server +;; -(define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) -(define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) -(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) -(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) +(define *db:process-queue-mutex* (make-mutex)) (define (server:run hostn) (debug:print 2 "Attempting to start the server ...") (if (not *toppath*) (if (not (setup-for-run)) (begin (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting") (exit)))) - (let* ((zmq-sdat1 #f) - (zmq-sdat2 #f) - (pull-socket #f) - (pub-socket #f) - (p1 #f) - (p2 #f) - (zmq-sockets-dat #f) - (iface (if (string=? "-" hostn) - "*" ;; (get-host-name) - hostn)) + (let* (;; (iface (if (string=? "-" hostn) + ;; #f ;; (get-host-name) + ;; hostn)) + (db #f) ;; (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily (hostname (get-host-name)) (ipaddrstr (let ((ipstr (if (string=? "-" hostn) (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".") #f))) - (if ipstr ipstr hostname))) - (last-run 0)) - (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port") - (string->number (args:get-arg "-port")) - (+ 5000 (random 1001))))) - - (set! zmq-sdat1 (car zmq-sockets-dat)) - (set! pull-socket (cadr zmq-sdat1)) ;; (iface s port) - (set! p1 (caddr zmq-sdat1)) - - (set! zmq-sdat2 (cadr zmq-sockets-dat)) - (set! pub-socket (cadr zmq-sdat2)) - (set! p2 (caddr zmq-sdat2)) - - (set! *cache-on* #t) - - ;; what to do when we quit - ;; -;; (on-exit (lambda () -;; (if (and *toppath* *server-info*) -;; (open-run-close tasks:server-deregister-self tasks:open-db (car *server-info*)) -;; (let loop () -;; (let ((queue-len 0)) -;; (thread-sleep! (random 5)) -;; (mutex-lock! *incoming-mutex*) -;; (set! queue-len (length *incoming-data*)) -;; (mutex-unlock! *incoming-mutex*) -;; (if (> queue-len 0) -;; (begin -;; (debug:print-info 0 "Queue not flushed, waiting ...") -;; (loop)))))))) - - ;; The heavy lifting - ;; - ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime - ;; - (let loop ((queue-lst '())) - (let* ((rawmsg (receive-message* pull-socket)) - (packet (db:string->obj rawmsg)) - (qtype (cdb:packet-get-qtype packet))) - (debug:print-info 12 "server=> received packet=" packet) - (if (not (member qtype '(sync ping))) - (begin - (mutex-lock! *heartbeat-mutex*) - (set! *last-db-access* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*))) - (if #t ;; (cdb:packet-get-immediate packet) ;; process immediately or put in queue - (begin - (open-run-close db:process-queue #f pub-socket (cons packet queue-lst)) - (loop '())) - (loop (cons packet queue-lst))))))) - -;; run server:keep-running in a parallel thread to monitor that the db is being -;; used and to shutdown after sometime if it is not. -;; -(define (server:keep-running) - ;; if none running or if > 20 seconds since - ;; server last used then start shutdown - ;; This thread waits for the server to come alive - (let* ((server-info (let loop () - (let ((sdat #f)) - (mutex-lock! *heartbeat-mutex*) - (set! sdat *server-info*) - (mutex-unlock! *heartbeat-mutex*) - (if sdat sdat - (begin - (sleep 4) - (loop)))))) - (iface (cadr server-info)) - (pullport (caddr server-info)) - (pubport (cadddr server-info)) ;; id interface pullport pubport) - (zmq-sockets (server:client-connect iface pullport pubport)) - (last-access 0)) - (let loop ((count 0)) - (thread-sleep! 4) ;; no need to do this very often - ;; NB// sync currently does NOT return queue-length - (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) - ;; (print "Server running, count is " count) - (if (< count 1) ;; 3x3 = 9 secs aprox - (loop (+ count 1))) - - ;; NOTE: Get rid of this mechanism! It really is not needed... - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) - - ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access - (mutex-lock! *heartbeat-mutex*) - (set! last-access *last-db-access*) - (mutex-unlock! *heartbeat-mutex*) - (if (> (+ last-access - ;; (* 50 60 60) ;; 48 hrs - ;; 60 ;; one minute - ;; (* 60 60) ;; one hour - (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. - ) - (current-seconds)) - (begin - (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) - (loop 0)) - (begin - (debug:print-info 0 "Starting to shutdown the server.") - ;; need to delete only *my* server entry (future use) - (set! *time-to-exit* #t) - (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name)) - (thread-sleep! 1) - (debug:print-info 0 "Max cached queries was " *max-cache-size*) - (debug:print-info 0 "Server shutdown complete. Exiting") - (exit))))))) - -(define (server:find-free-port-and-open iface s port stype #!key (trynum 50)) - (let ((s (if s s (make-socket stype))) - (p (if (number? port) port 5555)) - (old-handler (current-exception-handler))) - (handle-exceptions - exn - (begin - (debug:print 0 "Failed to bind to port " p ", trying next port") - (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - ;; (old-handler) - ;; (print-call-chain) - (if (> trynum 0) - (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1)) - (debug:print-info 0 "Tried ports up to " p - " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")) - (exit)) ;; To exit or not? That is the question. - (let ((zmq-url (conc "tcp://" iface ":" p))) - (debug:print 2 "Trying to start server on " zmq-url) - (bind-socket s zmq-url) - (list iface s port))))) - -(define (server:setup-ports ipaddrstr startport) - (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull)) - (p1 (caddr s1)) - (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) - (p2 (caddr s2))) - (set! *runremote* #f) - (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) - (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live)) - (mutex-unlock! *heartbeat-mutex*) - (list s1 s2))) + (if ipstr ipstr hostn))) ;; hostname))) + (start-port (if (args:get-arg "-port") + (string->number (args:get-arg "-port")) + (+ 5000 (random 1001)))) + (link-tree-path (config-lookup *configdat* "setup" "linktree"))) + (set! *cache-on* #t) + (root-path (if link-tree-path + link-tree-path + (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP! + + ;; Setup the web server and a /ctrl interface + ;; + (vhost-map `(((* any) . ,(lambda (continue) + ;; open the db on the first call + (if (not db)(set! db (open-db))) + (let* (($ (request-vars source: 'both)) + (dat ($ 'dat)) + (res #f)) + (cond + ((equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain)))) + ;; This is the /ctrl path where data is handed to the server and + ;; responses + ((equal? (uri-path (request-uri (current-request))) + '(/ "ctrl")) + (let* ((packet (db:string->obj dat)) + (qtype (cdb:packet-get-qtype packet))) + (debug:print-info 12 "server=> received packet=" packet) + (if (not (member qtype '(sync ping))) + (begin + (mutex-lock! *heartbeat-mutex*) + (set! *last-db-access* (current-seconds)) + (mutex-unlock! *heartbeat-mutex*))) + ;; (mutex-lock! *db:process-queue-mutex*) ;; trying a mutex + ;; (set! res (open-run-close db:process-queue-item open-db packet)) + (set! res (db:process-queue-item db packet)) + ;; (mutex-unlock! *db:process-queue-mutex*) + (debug:print-info 11 "Return value from db:process-queue-item is " res) + (send-response body: (conc "ctrl data\n" + res + "") + headers: '((content-type text/plain))))) + (else (continue)))))))) + (server:try-start-server ipaddrstr start-port) + ;; lite3:finalize! db))) + )) + + + +;; (define (server:main-loop) +;; (print "INFO: Exectuing main server loop") +;; (access-log "megatest-http.log") +;; (server-bind-address #f) +;; (define-page (main-page-path) +;; (lambda () +;; (let ((dat ($ "dat"))) +;; ;; (with-request-variables (dat) +;; (debug:print-info 12 "Got dat=" dat) +;; (let* ((packet (db:string->obj dat)) +;; (qtype (cdb:packet-get-qtype packet))) +;; (debug:print-info 12 "server=> received packet=" packet) +;; (if (not (member qtype '(sync ping))) +;; (begin +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *last-db-access* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*))) +;; (let ((res (open-run-close db:process-queue-item open-db packet))) +;; (debug:print-info 11 "Return value from db:process-queue-item is " res) +;; res)))))) + +;;; (use spiffy uri-common intarweb) +;;; +;;; (root-path "/var/www") +;;; +;;; (vhost-map `(((* any) . ,(lambda (continue) +;;; (if (equal? (uri-path (request-uri (current-request))) +;;; '(/ "hey")) +;;; (send-response body: "hey there!\n" +;;; headers: '((content-type text/plain))) +;;; (continue)))))) +;;; +;;; (start-server port: 12345) + +;; This is recursively run by server:run until sucessful +;; +(define (server:try-start-server ipaddrstr portnum) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (thread-sleep! 0.1) + (open-run-close tasks:remove-server-records tasks:open-db) + (server:try-start-server ipaddrstr (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (set! *runremote* (list ipaddrstr portnum)) + (open-run-close tasks:remove-server-records tasks:open-db) + (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr portnum 0 'live) + (print "INFO: Trying to start server on " ipaddrstr ":" portnum) + ;; This starts the spiffy server + (start-server port: portnum) + (print "INFO: server has been stopped"))) (define (server:mk-signature) (message-digest-string (md5-primitive) (with-output-to-string (lambda () @@ -235,68 +178,87 @@ ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== +;; When using zmq this would send the message back (two step process) +;; with spiffy or rpc this simply returns the return data to be returned +;; +(define (server:reply return-addr query-sig success/fail result) + (debug:print-info 11 "server:reply return-addr=" return-addr ", result=" result) + ;; (send-message pubsock target send-more: #t) + ;; (send-message pubsock + (db:obj->string (vector success/fail query-sig result))) ;;====================================================================== -;; C L I E N T S +;; C L I E N T S ;;====================================================================== (define (server:get-client-signature) (if *my-client-signature* *my-client-signature* (let ((sig (server:mk-signature))) (set! *my-client-signature* sig) *my-client-signature*))) -;; -(define (server:client-socket-connect iface port #!key (context #f)(type 'req)(subscriptions '())) - (debug:print-info 3 "client-connect " iface ":" port ", type=" type ", subscriptions=" subscriptions) - (let ((connect-ok #f) - (zmq-socket (if context - (make-socket type context) - (make-socket type))) - (conurl (server:make-server-url (list iface port)))) - (if (socket? zmq-socket) - (begin - ;; first apply subscriptions - (for-each (lambda (subscription) - (debug:print 2 "Subscribing to " subscription) - (socket-option-set! zmq-socket 'subscribe subscription)) - subscriptions) - (connect-socket zmq-socket conurl) - zmq-socket) - (begin - (debug:print 0 "ERROR: Failed to open socket to " conurl) - #f)))) - -(define (server:client-login zmq-sockets) - (cdb:login zmq-sockets *toppath* (server:get-client-signature))) +;; +;; +;; 1 Hello, world! Goodbye Dolly +;; Send msg to serverdat and receive result +(define (server:client-send-receive serverdat msg) + (let* ((url (server:make-server-url serverdat)) + (fullurl (conc url "/ctrl")) ;; (conc url "/?dat=" msg))) + (numretries 0)) + (handle-exceptions + exn + (if (< numretries 200) + (server:client-send-receive serverdat msg)) + (begin + (debug:print-info 11 "fullurl=" fullurl "\n") + ;; set up the http-client here + (max-retry-attempts 100) + (retry-request? (lambda (request) + (thread-sleep! (/ (if (> numretries 100) 100 numretries) 10)) + (set! numretries (+ numretries 1)) + #t)) + ;; send the data and get the response + ;; extract the needed info from the http data and + ;; process and return it. + (let* ((res (with-input-from-request fullurl + ;; #f + ;; msg + (list (cons 'dat msg)) + read-string))) + (debug:print-info 11 "got res=" res) + (let ((match (string-search (regexp "(.*)<.body>") res))) + (debug:print-info 11 "match=" match) + (let ((final (cadr match))) + (debug:print-info 11 "final=" final) + final))))))) + +(define (server:client-login serverdat) + (max-retry-attempts 100) + (cdb:login serverdat *toppath* (server:get-client-signature))) ;; Not currently used! But, I think it *should* be used!!! -(define (server:client-logout zmq-socket) - (let ((ok (and (socket? zmq-socket) - (cdb:logout zmq-socket *toppath* (server:get-client-signature))))) - ;; (close-socket zmq-socket) +(define (server:client-logout serverdat) + (let ((ok (and (socket? serverdat) + (cdb:logout serverdat *toppath* (server:get-client-signature))))) + ;; (close-socket serverdat) ok)) -(define (server:client-connect iface pullport pubport) - (let* ((push-socket (server:client-socket-connect iface pullport type: 'push)) - (sub-socket (server:client-socket-connect iface pubport - type: 'sub - subscriptions: (list (server:get-client-signature) "all"))) - (zmq-sockets (vector push-socket sub-socket)) - (login-res #f)) - (set! login-res (server:client-login zmq-sockets)) +(define (server:client-connect iface port) + (let* ((login-res #f) + (serverdat (list iface port))) + (set! login-res (server:client-login serverdat)) (if (and (not (null? login-res)) (car login-res)) (begin - (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".") - (set! *runremote* zmq-sockets) - zmq-sockets) + (debug:print-info 2 "Logged in and connected to " iface ":" port) + (set! *runremote* serverdat) + serverdat) (begin - (debug:print-info 2 "Failed to login or connect to " conurl) + (debug:print-info 2 "Failed to login or connect to " iface ":" port) (set! *runremote* #f) #f)))) ;; Do all the connection work, start a server if not already running (define (server:client-setup #!key (numtries 50)) @@ -307,37 +269,26 @@ (exit)))) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) - (pullport (list-ref hostinfo 2)) - (pubport (list-ref hostinfo 3))) + (port (list-ref hostinfo 2)) + (pid (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) - ;; (handle-exceptions - ;; exn - ;; (begin - ;; ;; something went wrong in connecting to the server. In this scenario it is ok - ;; ;; to try again - ;; (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) - ;; (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) - ;; (debug:print 0 " perhaps jobs killed with -9? Removing server records") - ;; (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) - ;; (server:client-setup (- numtries 1)) - ;; #f) - (server:client-connect iface pullport pubport)) ;; ) + (server:client-connect iface port)) ;; ) (if (> numtries 0) (let ((exe (car (argv))) (pid #f)) (debug:print-info 0 "No server available, attempting to start one...") - ;; (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) - ;; (string-intersperse *verbosity* ",") - ;; (conc *verbosity*))))) - (set! pid (process-fork (lambda () - ;; (current-input-port (open-input-file "/dev/null")) - ;; (current-output-port (open-output-file "/dev/null")) - ;; (current-error-port (open-output-file "/dev/null")) - (server:launch)))) ;; should never get here .... + (set! pid (process-run exe (list "-server" "-" "-debug" (if (list? *verbosity*) + (string-intersperse *verbosity* ",") + (conc *verbosity*))))) + ;; (set! pid (process-fork (lambda () + ;; (current-input-port (open-input-file "/dev/null")) + ;; (current-output-port (open-output-file "/dev/null")) + ;; (current-error-port (open-output-file "/dev/null")) + ;; (server:launch)))) (let loop ((count 0)) (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if (not hostinfo) (begin (debug:print-info 0 "Waiting for server pid=" pid " to start") @@ -346,10 +297,67 @@ (loop (+ count 1))))))) ;; we are starting a server, do not try again! That can lead to ;; recursively starting many processes!!! (server:client-setup numtries: 0)) (debug:print-info 1 "Too many attempts, giving up"))))) + +;; run server:keep-running in a parallel thread to monitor that the db is being +;; used and to shutdown after sometime if it is not. +;; +(define (server:keep-running) + ;; if none running or if > 20 seconds since + ;; server last used then start shutdown + ;; This thread waits for the server to come alive + (let* ((server-info (let loop () + (let ((sdat #f)) + (mutex-lock! *heartbeat-mutex*) + (set! sdat *runremote*) + (mutex-unlock! *heartbeat-mutex*) + (if sdat sdat + (begin + (sleep 4) + (loop)))))) + (iface (car server-info)) + (port (cadr server-info)) + (last-access 0) + (tdb (tasks:open-db)) + (spid (tasks:server-get-server-id tdb #f iface port #f))) + (print "Keep-running got server pid " spid ", using iface " iface " and port " port) + (let loop ((count 0)) + (thread-sleep! 4) ;; no need to do this very often + ;; NB// sync currently does NOT return queue-length + (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1))) + ;; (print "Server running, count is " count) + (if (< count 1) ;; 3x3 = 9 secs aprox + (loop (+ count 1))) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (tasks:server-update-heartbeat tdb spid) + + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access + (mutex-lock! *heartbeat-mutex*) + (set! last-access *last-db-access*) + (mutex-unlock! *heartbeat-mutex*) + (if (> (+ last-access + ;; (* 50 60 60) ;; 48 hrs + ;; 60 ;; one minute + ;; (* 60 60) ;; one hour + (* 45 60) ;; 45 minutes, until the db deletion bug is fixed. + ) + (current-seconds)) + (begin + (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access)) + (loop 0)) + (begin + (debug:print-info 0 "Starting to shutdown the server.") + ;; need to delete only *my* server entry (future use) + (set! *time-to-exit* #t) + (tasks:server-deregister-self tdb (get-host-name)) + (thread-sleep! 1) + (debug:print-info 0 "Max cached queries was " *max-cache-size*) + (debug:print-info 0 "Server shutdown complete. Exiting") + (exit))))))) ;; all routes though here end in exit ... (define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) @@ -356,40 +364,24 @@ (begin (debug:print 0 "ERROR: cannot find megatest.config, exiting") (exit)))) (debug:print-info 2 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) + (debug:print 11 "server:launch hostinfo=" hostinfo) (if hostinfo (debug:print-info 2 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* (;; (th1 (make-thread (lambda () - ;; (let ((server-info #f)) - ;; ;; wait for the server to be online and available - ;; (let loop () - ;; (debug:print-info 2 "Waiting for the server to come online before starting heartbeat") - ;; (thread-sleep! 2) - ;; (mutex-lock! *heartbeat-mutex*) - ;; (set! server-info *server-info* ) - ;; (mutex-unlock! *heartbeat-mutex*) - ;; (if (not server-info)(loop))) - ;; (debug:print 2 "Server alive, starting self-ping") - ;; (server:self-ping server-info) - ;; )) - ;; "Self ping")) - (th2 (make-thread (lambda () + (let* ((th2 (make-thread (lambda () (server:run (if (args:get-arg "-server") (args:get-arg "-server") "-"))) "Server run")) (th3 (make-thread (lambda ()(server:keep-running)) "Keep running")) ) - (set! *client-non-blocking-mode* #t) - ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) - ;; (thread-join! th3) (thread-join! th2) ) (debug:print 0 "ERROR: Failed to setup for megatest"))) (exit))) @@ -396,16 +388,15 @@ (define (server:client-signal-handler signum) (handle-exceptions exn (debug:print " ... exiting ...") (let ((th1 (make-thread (lambda () - (if (not *received-response*) - (receive-message* *runremote*))) ;; flush out last call if applicable + "") ;; do nothing for now (was flush out last call if applicable) "eat response")) (th2 (make-thread (lambda () (debug:print 0 "ERROR: Received ^C, attempting clean exit. Please be patient and wait a few seconds before hitting ^C again.") - (thread-sleep! 3) ;; give the flush three seconds to do it's stuff + (thread-sleep! 1) ;; give the flush one second to do it's stuff (debug:print 0 " Done.") (exit 4)) "exit on ^C timer"))) (thread-start! th2) (thread-start! th1) @@ -417,70 +408,5 @@ (debug:print-info 2 "connected as client") (begin (debug:print 0 "ERROR: Failed to connect as client") (exit)))) -;;====================================================================== -;; Defunct functions -;;====================================================================== - -;; ping a server and return number of clients or #f (if no response) -;; NOT IN USE! -(define (server:ping host port #!key (secs 10)(return-socket #f)) - (cdb:use-non-blocking-mode - (lambda () - (let* ((res #f) - (th1 (make-thread - (lambda () - (let* ((zmq-context (make-context 1)) - (zmq-socket (server:client-connect host port context: zmq-context))) - (if zmq-socket - (if (server:client-login zmq-socket) - (let ((numclients (cdb:num-clients zmq-socket))) - (if (not return-socket) - (begin - (server:client-logout zmq-socket) - (close-socket zmq-socket))) - (set! res (list #t numclients (if return-socket zmq-socket #f)))) - (begin - ;; (close-socket zmq-socket) - (set! res (list #f "CAN'T LOGIN" #f)))) - (set! res (list #f "CAN'T CONNECT" #f))))) - "Ping: th1")) - (th2 (make-thread - (lambda () - (let loop ((count 1)) - (debug:print-info 1 "Ping " count " server on " host " at port " port) - (thread-sleep! 2) - (if (< count (/ secs 2)) - (loop (+ count 1)))) - ;; (thread-terminate! th1) - (set! res (list #f "TIMED OUT" #f))) - "Ping: th2"))) - (thread-start! th2) - (thread-start! th1) - (handle-exceptions - exn - (set! res (list #f "TIMED OUT" #f)) - (thread-join! th1 secs)) - res)))) - -;; (define (server:self-ping server-info) -;; ;; server-info: server-id interface pullport pubport -;; (let ((iface (list-ref server-info 1)) -;; (pullport (list-ref server-info 2)) -;; (pubport (list-ref server-info 3))) -;; (server:client-connect iface pullport pubport) -;; (let loop () -;; (thread-sleep! 2) -;; (cdb:client-call *runremote* 'ping #t) -;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") -;; (mutex-lock! *heartbeat-mutex*) -;; (set! *server-loop-heart-beat* (current-seconds)) -;; (mutex-unlock! *heartbeat-mutex*) -;; (loop)))) - -(define (server:reply pubsock target query-sig success/fail result) - (debug:print-info 11 "server:reply target=" target ", result=" result) - (send-message pubsock target send-more: #t) - (send-message pubsock (db:obj->string (vector success/fail query-sig result)))) - Index: tasks.scm ================================================================== --- tasks.scm +++ tasks.scm @@ -60,18 +60,17 @@ CONSTRAINT monitors_constraint UNIQUE (pid,hostname));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS servers (id INTEGER PRIMARY KEY, pid INTEGER, interface TEXT, hostname TEXT, - pullport INTEGER, - pubport INTEGER, + port INTEGER, start_time TIMESTAMP, priority INTEGER, state TEXT, mt_version TEXT, heartbeat TIMESTAMP, - CONSTRAINT servers_constraint UNIQUE (pid,hostname,pullport,pubport));") + CONSTRAINT servers_constraint UNIQUE (pid,hostname,port));") (sqlite3:execute mdb "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY, server_id INTEGER, pid INTEGER, hostname TEXT, cmdline TEXT, @@ -85,58 +84,64 @@ ;;====================================================================== ;; Server and client management ;;====================================================================== ;; state: 'live, 'shutting-down, 'dead -(define (tasks:server-register mdb pid interface pullport pubport priority state) +(define (tasks:server-register mdb pid interface port priority state) + (debug:print-info 11 "tasks:server-register " pid " " interface " " port " " priority " " state) (sqlite3:execute mdb - "INSERT OR REPLACE INTO servers (pid,hostname,pullport,pubport,start_time,priority,state,mt_version,heartbeat,interface) - VALUES(?, ?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" - pid (get-host-name) pullport pubport priority (conc state) megatest-version interface) + "INSERT OR REPLACE INTO servers (pid,hostname,port,start_time,priority,state,mt_version,heartbeat,interface) + VALUES(?, ?, ?, strftime('%s','now'), ?, ?, ?, strftime('%s','now'),?);" + pid (get-host-name) port priority (conc state) megatest-version interface) (list - (tasks:server-get-server-id mdb (get-host-name) pullport pid) + (tasks:server-get-server-id mdb (get-host-name) interface port pid) interface - pullport - pubport)) + port + )) ;; NB// two servers with same pid on different hosts will be removed from the list if pid: is used! -(define (tasks:server-deregister mdb hostname #!key (pullport #f)(pid #f)(action 'markdead)) - (debug:print-info 11 "server-deregister " hostname ", pullport " pullport ", pid " pid) +(define (tasks:server-deregister mdb hostname #!key (port #f)(pid #f)(action 'markdead)) + (debug:print-info 11 "server-deregister " hostname ", port " port ", pid " pid) (if pid (case action ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE pid=?;" pid)) (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE pid=?;" pid))) - (if pullport + (if port (case action - ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pullport=?;" hostname port)) - (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND pullport=?;" hostname pullport))) + ((delete)(sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND port=?;" hostname port)) + (else (sqlite3:execute mdb "UPDATE servers SET state='dead' WHERE hostname=? AND port=?;" hostname port))) (debug:print 0 "ERROR: tasks:server-deregister called with neither pid nor port specified")))) (define (tasks:server-deregister-self mdb hostname) (tasks:server-deregister mdb hostname pid: (current-process-id))) -(define (tasks:server-get-server-id mdb hostname pullport pid) +(define (tasks:server-get-server-id mdb hostname iface port pid) (let ((res #f)) (sqlite3:for-each-row (lambda (id) (set! res id)) mdb - (if (and hostname pid) - "SELECT id FROM servers WHERE hostname=? AND pid=?;" - "SELECT id FROM servers WHERE hostname=? AND pullport=?;") - hostname (if pid pid pullport)) + (cond + ((and hostname pid) "SELECT id FROM servers WHERE hostname=? AND pid=?;") + ((and iface port) "SELECT id FROM servers WHERE interface=? AND port=?;") + ((and hostname port) "SELECT id FROM servers WHERE hostname=? AND port=?;") + (else + (begin + (debug:print 0 "ERROR: tasks:server-get-server-id needs (hostname and pid) OR (iface and port) OR (hostname and port)") + "SELECT id FROM servers WHERE pid=-999;"))) + (if hostname hostname iface)(if pid pid port)) res)) (define (tasks:server-update-heartbeat mdb server-id) (sqlite3:execute mdb "UPDATE servers SET heartbeat=strftime('%s','now') WHERE id=?;" server-id)) ;; alive servers keep the heartbeat field upto date with seconds every 6 or so seconds -(define (tasks:server-alive? mdb server-id #!key (hostname #f)(pullport #f)(pid #f)) +(define (tasks:server-alive? mdb server-id #!key (iface #f)(hostname #f)(port #f)(pid #f)) (let* ((server-id (if server-id server-id - (tasks:server-get-server-id mdb hostname pullport pid))) + (tasks:server-get-server-id mdb hostname iface port pid))) (heartbeat-delta 99e9)) (sqlite3:for-each-row (lambda (delta) (set! heartbeat-delta delta)) mdb "SELECT strftime('%s','now')-heartbeat FROM servers WHERE id=?;" server-id) @@ -144,11 +149,11 @@ (define (tasks:client-register mdb pid hostname cmdline) (sqlite3:execute mdb "INSERT OR REPLACE INTO clients (server_id,pid,hostname,cmdline,login_time) VALUES(?,?,?,?,strftime('%s','now'));") - (tasks:server-get-server-id mdb hostname #f pid) + (tasks:server-get-server-id mdb hostname #f #f pid) pid hostname cmdline) (define (tasks:client-logout mdb pid hostname cmdline) (sqlite3:execute mdb @@ -171,47 +176,53 @@ ;; remove any others. will not necessarily remove all! (define (tasks:get-best-server mdb) (let ((res '()) (best #f)) (sqlite3:for-each-row - (lambda (id hostname interface pullport pubport pid) - (set! res (cons (list hostname interface pullport pubport pid) res)) - (debug:print-info 2 "Found existing server " hostname ":" pullport " registered in db")) + (lambda (id hostname interface port pid) + (set! res (cons (list hostname interface port pid id) res)) + (debug:print-info 2 "Found existing server " hostname ":" port " registered in db")) mdb - "SELECT id,hostname,interface,pullport,pubport,pid FROM servers + "SELECT id,hostname,interface,port,pid FROM servers WHERE strftime('%s','now')-heartbeat < 10 AND mt_version=? ORDER BY start_time ASC LIMIT 1;" megatest-version) - (if (null? res) #f - (let loop ((hed (car res)) - (tal (cdr res))) - ;; (print "hed=" hed ", tal=" tal) - (let* ((host (list-ref hed 0)) - (iface (list-ref hed 1)) - (pullport (list-ref hed 2)) - (pubport (list-ref hed 3)) - (pid (list-ref hed 4)) - (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host pullport: pullport))) - (if alive - (begin - (debug:print-info 2 "Found an existing, alive, server " host ", " pullport " and " pubport ".") - (list host iface pullport pubport)) - (begin - (debug:print-info 1 "Marking " host ":" pullport " as dead in server registry.") - (if pullport - (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) - (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) - (if (null? tal) - #f - (loop (car tal)(cdr tal)))))))))) - -(define (tasks:mark-server hostname pullport pid state) + ;; for now we are keeping only one server registered in the db, return #f or first server found + (if (null? res) #f (car res)))) + +;; BUG: This logic is probably needed unless methodology changes completely... +;; +;; (if (null? res) #f +;; (let loop ((hed (car res)) +;; (tal (cdr res))) +;; ;; (print "hed=" hed ", tal=" tal) +;; (let* ((host (list-ref hed 0)) +;; (iface (list-ref hed 1)) +;; (port (list-ref hed 2)) +;; (pid (list-ref hed 4)) +;; (alive (open-run-close tasks:server-alive? tasks:open-db #f hostname: host port: port))) +;; (if alive +;; (begin +;; (debug:print-info 2 "Found an existing, alive, server " host ", " port ".") +;; (list host iface port)) +;; (begin +;; (debug:print-info 1 "Marking " host ":" port " as dead in server registry.") +;; (if port +;; (open-run-close tasks:server-deregister tasks:open-db host port: port) +;; (open-run-close tasks:server-deregister tasks:open-db host pid: pid)) +;; (if (null? tal) +;; #f +;; (loop (car tal)(cdr tal)))))))))) + +(define (tasks:remove-server-records mdb) + (sqlite3:execute mdb "DELETE FROM servers;")) + +(define (tasks:mark-server hostname port pid state) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid))) -;; NOTE: NOT PORTED TO WORK WITH pullport/pubport (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port (open-run-close tasks:server-deregister tasks:open-db hostname port: port) (open-run-close tasks:server-deregister tasks:open-db hostname pid: pid)) @@ -242,14 +253,14 @@ (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface pullport pubport start-time priority state mt-version last-update) - (set! res (cons (vector id pid hostname interface pullport pubport start-time priority state mt-version last-update) res))) + (lambda (id pid hostname interface port start-time priority state mt-version last-update) + (set! res (cons (vector id pid hostname interface port start-time priority state mt-version last-update) res))) mdb - "SELECT id,pid,hostname,interface,pullport,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update FROM servers ORDER BY start_time DESC;") res)) ;;====================================================================== ;; Tasks and Task monitors ADDED testhttp/example-client.scm Index: testhttp/example-client.scm ================================================================== --- /dev/null +++ testhttp/example-client.scm @@ -0,0 +1,6 @@ +(use regex http-client) + +(print (with-input-from-request "http://localhost:8083/?foo=1" #f + (lambda () + (let ((match (string-search (regexp "(.*)<.body>") (caddr (string-split (read-string) "\n"))))) + (cadr match))))) ADDED testhttp/example-server.scm Index: testhttp/example-server.scm ================================================================== --- /dev/null +++ testhttp/example-server.scm @@ -0,0 +1,26 @@ +(use spiffy awful) + +(tcp-buffer-size 2048) +(enable-sxml #t) + +(define (hello-world) + (define-page (main-page-path) + (lambda () + (with-request-variables (foo) + foo)))) + +(define (start-server #!key (portnum 8080)) + (handle-exceptions + exn + (begin + (print-error-message exn) + (if (< portnum 9000) + (begin + (print "WARNING: failed to start on portnum: " portnum ", trying next port") + (sleep 1) + (start-server portnum: (+ portnum 1))) + (print "ERROR: Tried and tried but could not start the server"))) + (print "INFO: Trying to start server on portnum: " portnum) + (awful-start hello-world port: portnum))) + +(start-server) ADDED testhttp/mockupclient.scm Index: testhttp/mockupclient.scm ================================================================== --- /dev/null +++ testhttp/mockupclient.scm @@ -0,0 +1,35 @@ +(use posix) + +(define cname "Bob") +(define runtime 10) +(let ((args (argv))) + (if (< (length args) 3) + (begin + (print "Usage: mockupclient clientname runtime") + (exit)) + (begin + (set! cname (cadr args)) + (set! runtime (string->number (caddr args)))))) + +;; (define start-delay (/ (random 100) 9)) +;; (define runtime (+ 1 (/ (random 200) 2))) + +(print "Starting client " cname " with runtime " runtime) + +(include "mockupclientlib.scm") + +(set! endtime (+ (current-seconds) runtime)) + +(let loop () + (let ((x (random 15)) + (varname (list-ref (list "hello" "goodbye" "saluton" "kiaorana")(random 4)))) + (case x + ;; ((1)(dbaccess cname 'sync "nodat" #f)) + ((2 3 4 5)(dbaccess cname 'set varname (random 999))) + ((6 7 8 9 10)(print cname ": Get \"" varname "\" " (dbaccess cname 'get varname #f))) + (else + (thread-sleep! 0.011))) + (if (< (current-seconds) endtime) + (loop)))) + +(print "Client " cname " all done!!") ADDED testhttp/mockupclientlib.scm Index: testhttp/mockupclientlib.scm ================================================================== --- /dev/null +++ testhttp/mockupclientlib.scm @@ -0,0 +1,33 @@ +(define sub (make-socket 'sub)) +(define push (make-socket 'push)) +(socket-option-set! sub 'subscribe cname) +(connect-socket sub "tcp://localhost:5563") +(connect-socket push "tcp://localhost:5564") + +(define (dbaccess cname cmd var val #!key (numtries 1)) + (let* ((msg (conc cname ":" cmd ":" (if val (conc var " " val) var))) + (res #f) + (do-access (lambda () + (print "Sending msg: " msg) + (send-message push msg) + (print "Message " msg " sent") + (print "Client " cname " waiting for response to " msg) + (print "Client " cname " received address " (receive-message* sub)) + (set! res (receive-message* sub))))) + (let ((th1 (make-thread do-access "do access")) + (th2 (make-thread (lambda () + (thread-sleep! 5) + (if (not res) + (if (> numtries 0) + (begin + (print "WARNING: access timed out for " cname ", trying again. Trys remaining=" numtries) + (dbaccess cname cmd var val numtries: (- numtries 1))) + (begin + (print "ERROR: dbaccess timed out. Exiting") + (exit))))) + "timeout thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res))) + ADDED testhttp/mockupserver.scm Index: testhttp/mockupserver.scm ================================================================== --- /dev/null +++ testhttp/mockupserver.scm @@ -0,0 +1,140 @@ +;; pub/sub with envelope address +;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon +;; as a client disconnects. Also a remaining client may receive tons of +;; messages afterward. + +(use srfi-18 sqlite3 spiffy) + +(define cname "server") +(define total-db-accesses 0) +(define start-time (current-seconds)) + +;; setup the server here +(tcp-buffer-size 2048) +(server-port 5563) + +(define (open-db) + (let* ((dbpath "mockup.db") + (dbexists (file-exists? dbpath)) + (db (open-database dbpath)) ;; (never-give-up-open-db dbpath)) + (handler (make-busy-timeout 10))) + (set-busy-handler! db handler) + (if (not dbexists) + (for-each + (lambda (stmt) + (execute db stmt)) + (list + "PRAGMA SYNCHRONOUS=0;" + "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);" + "CREATE TABLE vars (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));"))) + db)) + +(define cid-cache (make-hash-table)) + +(define (get-client-id db cname) + (let ((cid (hash-table-ref/default cid-cache cname #f))) + (if cid + cid + (begin + (execute db "INSERT OR REPLACE INTO clients (name) VALUES(?);" cname) + (for-each-row + (lambda (id) + (set! cid id)) + db + "SELECT id FROM clients WHERE name=?;" cname) + (hash-table-set! cid-cache cname cid) + (set! total-db-accesses (+ total-db-accesses 2)) + cid)))) + +(define (count-client db cname) + (let ((cid (get-client-id db cname))) + (execute db "UPDATE clients SET num_accesses=num_accesses+1 WHERE id=?;" cid) + (set! total-db-accesses (+ total-db-accesses 1)) + )) + +(define db (open-db)) +;; (define queuelst '()) +;; (define mx1 (make-mutex)) + +(define max-queue-len 0) + +(define (process-queue queuelst) + (let ((queuelen (length queuelst))) + (if (> queuelen max-queue-len) + (set! max-queue-len queuelen)) + (for-each + (lambda (item) + (let ((cname (vector-ref item 1)) + (clcmd (vector-ref item 2)) + (cdata (vector-ref item 3))) + (send-message pub cname send-more: #t) + (send-message pub (case clcmd + ((sync) + (conc queuelen)) + ((set) + (set! total-db-accesses (+ total-db-accesses 1)) + (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata)) + "ok") + ((get) + (set! total-db-accesses (+ total-db-accesses 1)) + (let ((res "noval")) + (for-each-row + (lambda (val) + (set! res val)) + db + "SELECT val FROM vars WHERE var=?;" cdata) + res)) + (else (conc "unk cmd: " clcmd)))))) + queuelst))) + +(define th1 (make-thread + (lambda () + (let ((last-run 0)) ;; current-seconds when run last + (let loop ((queuelst '())) + (let* ((indat (receive-message* pull)) + (parts (string-split indat ":")) + (cname (car parts)) ;; client name + (clcmd (string->symbol (cadr parts))) ;; client cmd + (cdata (caddr parts)) ;; client data + (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue + (count-client db cname) + (case clcmd + ((sync) ;; just process the queue + (print "Got sync from " cname) + (process-queue (cons svect queuelst)) + (loop '())) + ((get) + (process-queue (cons svect queuelst)) + (loop '())) + (else + (loop (cons svect queuelst)))))))) + "server thread")) + +(include "mockupclientlib.scm") + +;; ;; send a sync to the pull port +;; (define th2 (make-thread +;; (lambda () +;; (let ((last-action-time (current-seconds))) +;; (let loop () +;; (thread-sleep! 5) +;; (let ((queuelen (string->number (dbaccess "server" 'sync "nada" #f))) +;; (last-action-delta #f)) +;; (if (> queuelen 1)(set! last-action-time (current-seconds))) +;; (set! last-action-delta (- (current-seconds) last-action-time)) +;; (print "Server: Got queuelen=" queuelen ", last-action-delta=" last-action-delta) +;; (if (< last-action-delta 60) +;; (loop) +;; (print "Server exiting, 25 seconds since last access")))))) +;; "sync thread")) + +(handle-not-found + + +(thread-start! th1) +(thread-start! th2) +(thread-join! th2) + +(let* ((run-time (- (current-seconds) start-time)) + (queries/second (/ total-db-accesses run-time))) + (print "Server exited! Total db accesses=" total-db-accesses " in " run-time " seconds for " queries/second " queries/second with max queue length of: " max-queue-len)) ADDED testhttp/testclient.scm Index: testhttp/testclient.scm ================================================================== --- /dev/null +++ testhttp/testclient.scm @@ -0,0 +1,8 @@ +(use http-client) + +(with-input-from-request "http://localhost:12345/hey" + ;; #f + ;; msg + (list (cons 'dat "Testing eh")) + read-string) + ADDED testhttp/testserver.scm Index: testhttp/testserver.scm ================================================================== --- /dev/null +++ testhttp/testserver.scm @@ -0,0 +1,16 @@ +(use spiffy uri-common intarweb spiffy-request-vars) + +(root-path "/var/www") + +(vhost-map `(((* any) . ,(lambda (continue) + (let (($ (request-vars source: 'both))) + (print ($ 'dat)) + (if (equal? (uri-path (request-uri (current-request))) + '(/ "hey")) + (send-response body: "hey there!\n" + headers: '((content-type text/plain))) + (continue))))))) + +(start-server port: 12345) + + Index: tests/tests.scm ================================================================== --- tests/tests.scm +++ tests/tests.scm @@ -79,11 +79,11 @@ (test "setup for run" #t (begin (setup-for-run) (string? (getenv "MT_RUN_AREA_HOME")))) (test "server-register, get-best-server" #t (let ((res #f)) - (open-run-close tasks:server-register tasks:open-db 1 "bob" 1234 1235 100 'live) + (open-run-close tasks:server-register tasks:open-db 1 "bob" 1234 100 'live) (set! res (open-run-close tasks:get-best-server tasks:open-db)) (number? (cadddr res)))) (test "de-register server" #t (let ((res #f)) (open-run-close tasks:server-deregister tasks:open-db "bob" pullport: 1234) @@ -96,21 +96,18 @@ (number? (caddr dat))))) (test #f #t (let ((zmq-socket (server:client-connect (cadr hostinfo) (caddr hostinfo) - (cadddr hostinfo)))) + ;; (cadddr hostinfo) + ))) (set! *runremote* zmq-socket) - (socket? (vector-ref *runremote* 0)))) + (string? (car *runremote*)))) (test #f #t (let ((res (server:client-login *runremote*))) (car res))) -(test #f #t (socket? (vector-ref *runremote* 0))) - -;; (test #f #t (server:client-setup)) - (test #f #t (car (cdb:login *runremote* *toppath* *my-client-signature*))) ;;====================================================================== ;; C O N F I G F I L E S ;;======================================================================