Index: db.scm ================================================================== --- db.scm +++ db.scm @@ -1375,17 +1375,17 @@ ;; The queue is a list of vectors where the zeroth slot indicates the type of query to ;; apply and the second slot is the time of the query and the third entry is a list of ;; values to be applied ;; -(define (db:process-queue db pubsock indata) - (let* ((data (sort indata (lambda (a b) - (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) - (for-each - (lambda (item) - (db:process-queue-item db pubsock item)) - data))) +;; (define (db:process-queue db pubsock indata) +;; (let* ((data (sort indata (lambda (a b) +;; (< (cdb:packet-get-qtime a)(cdb:packet-get-qtime b)))))) +;; (for-each +;; (lambda (item) +;; (db:process-queue-item db pubsock item)) +;; data))) (define (db:process-queue-item db item) (let* ((stmt-key (cdb:packet-get-qtype item)) (qry-sig (cdb:packet-get-query-sig item)) (return-address (cdb:packet-get-client-sig item)) @@ -1393,10 +1393,12 @@ (query (let ((q (alist-ref stmt-key db:queries))) (if q (car q) #f)))) (debug:print-info 11 "Special queries/requests stmt-key=" stmt-key ", return-address=" return-address ", query=" query ", params=" params) (cond (query + ;; transactionize needed here. + (apply sqlite3:execute db query params) (server:reply return-address qry-sig #t #t)) ((member stmt-key db:special-queries) (debug:print-info 11 "Handling special statement " stmt-key) (case stmt-key Index: zmq-transport.scm ================================================================== --- zmq-transport.scm +++ zmq-transport.scm @@ -160,10 +160,11 @@ (iface (cadr server-info)) (pullport (caddr server-info)) (pubport (cadddr server-info)) ;; id interface pullport pubport) (zmq-sockets (zmq-transport:client-connect iface pullport pubport)) (last-access 0)) + (debug:print-info 11 "heartbeat started for zmq server on " iface " " pullport " " pubport) (let loop ((count 0)) (thread-sleep! 4) ;; no need to do this very often ;; NB// sync currently does NOT return queue-length (let ((queue-len (cdb:client-call zmq-sockets 'sync #t 1))) ;; (print "Server running, count is " count) @@ -224,11 +225,18 @@ (s2 (zmq-transport:find-free-port-and-open ipaddrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pub)) (p2 (caddr s2))) (set! *runremote* #f) (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and " p2) (mutex-lock! *heartbeat-mutex*) - (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) ipaddrstr p1 p2 0 'live 'zmq)) + (set! *server-info* (open-run-close tasks:server-register + tasks:open-db + (current-process-id) + ipaddrstr p1 + 0 + 'live + 'zmq + pubport: p2)) (mutex-unlock! *heartbeat-mutex*) (list s1 s2))) (define (zmq-transport:mk-signature) (message-digest-string (md5-primitive) @@ -271,10 +279,11 @@ (sub-socket (zmq-transport:client-socket-connect iface pubport type: 'sub subscriptions: (list (zmq-transport:get-client-signature) "all"))) (zmq-sockets (vector push-socket sub-socket)) (login-res #f)) + (debug:print-info 11 "zmq-transport:client-connect started. Next is login") (set! login-res (zmq-transport:client-login zmq-sockets)) (if (and (not (null? login-res)) (car login-res)) (begin (debug:print-info 2 "Logged in and connected to " iface ":" pullport "/" pubport ".")