Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1109,27 +1109,28 @@ ;; params = 'target cached remparams ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (define (cdb:client-call zmq-sockets qtype immediate numretries . params) - (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets " params=" params) + (debug:print-info 11 "cdb:client-call zmq-sockets=" zmq-sockets ", qtype=" qtype ", immediate=" immediate ", numretries=" numretries ", params=" params) (let* ((push-socket (vector-ref zmq-sockets 0)) (sub-socket (vector-ref zmq-sockets 1)) - (client-sig (server:get-client-signature)) - (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) - (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) + (client-sig (server:get-client-signature)) + (query-sig (message-digest-string (md5-primitive) (conc qtype immediate params))) + (zdat (db:obj->string (vector client-sig qtype immediate query-sig params (current-seconds)))) ;; (with-output-to-string (lambda ()(serialize params)))) (res #f) (send-receive (lambda () + (debug:print-info 11 "sending message") (send-message push-socket zdat) - (db:string->obj - (let ((rmsg (if *client-non-blocking-mode* receive-message* receive-message))) - ;; get the sender info - ;; this should match (server:get-client-signature) - ;; we will need to process "all" messages here some day - (rmsg sub-socket) - ;; now get the actual message - (rmsg sub-socket))))) + (debug:print-info 11 "message sent") + (let ((rmsg receive-message*)) ;; (if *client-non-blocking-mode* receive-message* receive-message))) + ;; get the sender info + ;; this should match (server:get-client-signature) + ;; we will need to process "all" messages here some day + (rmsg sub-socket) + ;; now get the actual message + (set! res (db:string->obj (rmsg sub-socket)))))) (timeout (lambda () (thread-sleep! 5) (if (not res) (if (> numretries 0) (begin @@ -1136,15 +1137,17 @@ (debug:print 0 "WARNING: no reply to query " params ", trying again") (apply cdb:client-call zmq-sockets qtype immediate (- numretries 1) params)) (begin (debug:print 0 "ERROR: cdb:client-call timed out " params ", exiting.") (exit 5))))))) + (debug:print-info 11 "Starting threads") (let ((th1 (make-thread send-receive "send receive")) (th2 (make-thread timeout "timeout"))) (thread-start! th1) (thread-start! th2) - (thread-join! th1) + (thread-join! th1) + (debug:print-info 11 "cdb:client-call returning res=" res) res))) (define (cdb:set-verbosity zmq-socket val) (cdb:client-call zmq-socket 'set-verbosity #f *default-numtries* val)) @@ -1228,11 +1231,12 @@ '(tests:test-set-toplog "UPDATE tests SET final_logf=? WHERE run_id=? AND testname=? AND item_path='';") )) ;; do not run these as part of the transaction (define db:special-queries '(rollup-tests-pass-fail - db:roll-up-pass-fail-counts)) + db:roll-up-pass-fail-counts + login)) ;; not used, intended to indicate to run in calling process (define db:run-local-queries '()) ;; rollup-tests-pass-fail)) ;; The queue is a list of vectors where the zeroth slot indicates the type of query to @@ -1248,12 +1252,13 @@ (if (> (length data) 0) (debug:print-info 4 "Writing cached data " data)) ;; prepare the needed statements, do each only once (for-each (lambda (request-item) - (let ((stmt-key (cdb:get-qtype request-item))) - (if (not (hash-table-ref/default queries stmt-key #f)) + (let ((stmt-key (cdb:packet-get-qtype request-item))) + (if (and (not (hash-table-ref/default queries stmt-key #f)) + (not (member stmt-key db:special-queries))) (let ((stmt (alist-ref stmt-key db:queries))) (if stmt (hash-table-set! queries stmt-key (sqlite3:prepare db (car stmt))) (if (procedure? stmt-key) (hash-table-set! queries stmt-key #f) @@ -1265,24 +1270,38 @@ (let outerloop ((special-qry #f) (stmts data)) (if special-qry ;; handle a query that cannot be part of the grouped queries - (let* ((stmt-key (cdb:get-qtype special-qry)) - (return-address (cdb:get-client-sig special-qry)) - (qry (hash-table-ref queries stmt-key)) - (params (cdb:get-params special-qry))) - (if (string? qry) - (begin - (apply sqlite3:execute db qry params) - (server:reply return-address #t)) - (if (procedure? stmt-key) - (begin - ;; we are being handed a procedure so call it - (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")") - (server:reply return-address (apply stmt-key db params))) - (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))) + (let* ((stmt-key (cdb:packet-get-qtype special-qry)) + (return-address (cdb:packet-get-client-sig special-qry)) + (qry (hash-table-ref/default queries stmt-key #f)) + (params (cdb:packet-get-params special-qry))) + (cond + ((string? qry) + (apply sqlite3:execute db qry params) + (server:reply pubsock return-address #t)) + ((procedure? stmt-key) + ;; we are being handed a procedure so call it + (debug:print-info 11 "Running (apply " stmt-key " " db " " params ")") + (server:reply pubsock return-address (apply stmt-key db params))) + (else + (case stmt-key + ((login) + (if (< (length params) 3) ;; should get toppath, version and signature + '(#f "login failed due to missing params") ;; missing params + (let ((calling-path (car params)) + (calling-vers (cadr params)) + (client-key (caddr params))) + (if (and (equal? calling-path *toppath*) + (equal? megatest-version calling-vers)) + (begin + (hash-table-set! *logged-in-clients* client-key (current-seconds)) + (server:reply pubsock return-address '(#t "successful login"))) ;; path matches - pass! Should vet the caller at this time ... + (list #f (conc "Login failed due to mismatch paths: " calling-path ", " *toppath*)))))) + (else + (debug:print 0 "ERROR: Unrecognised queued call " qry " " params))))) (if (not (null? stmts)) (outerloop #f stmts))) ;; handle normal queries (let ((rem (sqlite3:with-transaction @@ -1302,11 +1321,11 @@ (debug:print-info 11 "Handling special statement " stmt-key) (cons hed tal)) (begin (debug:print-info 11 "Executing " stmt-key " for " params) (apply sqlite3:execute (hash-table-ref queries stmt-key) params) - (server:reply return-address #t) + (server:reply pubsock return-address #t) (if (not (null? tal)) (innerloop (car tal)(cdr tal)) '())) )))))))) (if (not (null? rem)) Index: server.scm ================================================================== --- server.scm +++ server.scm @@ -136,23 +136,24 @@ ;; The heavy lifting ;; ;; make-vector-record cdb packet client-sig qtype immediate query-sig params qtime ;; (let loop ((queue-lst '())) - ;; (print "GOT HERE EH?") + (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (packet (db:string->obj rawmsg))) (debug:print-info 12 "server=> received packet=" packet) (if (cdb:packet-get-immediate packet) ;; process immediately or put in queue (begin - (db:process-queue pubsock (cons packet queue)) + (db:process-queue pub-socket (cons packet queue-lst)) (loop '())) - (loop (cons packet queue))))))) + (loop (cons packet queue-lst))))))) (define (server:reply pubsock target result) + (debug:print-info 11 "server:reply target=" target ", result=" result) (send-message pubsock target send-more: #t) - (send-message pubsock result)) + (send-message pubsock (db:obj->string result))) ;; run server:keep-running in a parallel thread to monitor that the db is being ;; used and to shutdown after sometime if it is not. ;; (define (server:keep-running) @@ -169,15 +170,15 @@ (sleep 4) (loop)))))) (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) - ;; (zmq-sockets (server:client-connect iface pullport pubport))) + ;; (zmq-sockets (server:client-connect iface pullport pubport)) ) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often - ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) + ;; (let ((queue-len (string->number (cdb:client-call zmq-sockets 'sync #t 1)))) ;; (print "Server running, count is " count) (if (< count 1) ;; 3x3 = 9 secs aprox (loop (+ count 1))) ;; NOTE: Get rid of this mechanism! It really is not needed... @@ -223,13 +224,13 @@ (debug:print 0 "Trying to start server on " zmq-url) (bind-socket s zmq-url) (list iface s port))))) (define (server:setup-ports ipaddrstr startport) - (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pub)) + (let* ((s1 (server:find-free-port-and-open ipaddrstr #f startport 'pull)) (p1 (caddr s1)) - (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull)) + (s2 (server:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) (p2 (caddr s2))) (set! *runremote* #f) (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) (mutex-lock! *heartbeat-mutex*) (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live))