Megatest

Diff
Login

Differences From Artifact [be351e0e07]:

To Artifact [cab71edb67]:


1
2
3
4
5
6
7
8
9

10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

25
26
27
28
29
30
31
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use zmq srfi-18 sqlite3)

;; ZeroMQ sockets owned by the server: `pub` is used to publish messages
;; and `pull` is the inbound side that push sockets connect to.
(define pub (make-socket 'pub))
(define pull (make-socket 'pull))


;; Bind both sockets to fixed TCP ports: 5563 for the pub socket,
;; 5564 for the pull socket.
(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

;; Open the sqlite3 database file "mockup.db" and return its handle.
;; A busy handler built with (make-busy-timeout 10) is installed on the
;; handle.  When the file was not present before opening, the `clients`
;; and `vars` tables are created.
(define (open-db)
  (let* ((path     "mockup.db")
         ;; Checked before open-database runs (presumably because opening
         ;; creates the file -- confirm against the sqlite3 egg).
         (existed? (file-exists? path))
         (dbh      (open-database path))
         (on-busy  (make-busy-timeout 10)))
    (set-busy-handler! dbh on-busy)
    (unless existed?
      (for-each
       (lambda (sql) (execute dbh sql))
       (list
        "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
        "CREATE TABLE vars    (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    dbh))

(define cid-cache (make-hash-table))

(define (get-client-id db cname)









>















>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
;; pub/sub with envelope address
;; Note that if you don't insert a sleep, the server will crash with SIGPIPE as soon
;; as a client disconnects.  Also a remaining client may receive tons of
;; messages afterward.

(use zmq srfi-18 sqlite3)

;; ZeroMQ sockets owned by the server: `pub` is used to publish messages
;; and `pull` is the inbound side that push sockets connect to.
(define pub (make-socket 'pub))
(define pull (make-socket 'pull))
;; Name identifying this process as a client; presumably consumed by the
;; included mockupclientlib.scm -- TODO confirm.
(define cname "server")

;; Bind both sockets to fixed TCP ports: 5563 for the pub socket,
;; 5564 for the pull socket.
(bind-socket pub "tcp://*:5563")
(bind-socket pull "tcp://*:5564")

;; Open the sqlite3 database file "mockup.db" and return its handle.
;;
;; A busy handler built with (make-busy-timeout 10) is installed, then
;; synchronous writes are turned off for this connection.  When the file
;; was not present before opening, the `clients` and `vars` tables are
;; created.
(define (open-db)
  (let* ((dbpath    "mockup.db")
         ;; Must be evaluated before open-database, which is expected to
         ;; create the file when it is missing.
         (dbexists  (file-exists? dbpath))
         (db        (open-database dbpath)) ;; (never-give-up-open-db dbpath))
         (handler   (make-busy-timeout 10)))
    (set-busy-handler! db handler)
    ;; PRAGMA synchronous is a per-connection setting that is not stored
    ;; in the database file, so it has to be issued on every open rather
    ;; than only when the schema is first created.
    (execute db "PRAGMA SYNCHRONOUS=0;")
    (if (not dbexists)
        (for-each
         (lambda (stmt)
           (execute db stmt))
         (list
          "CREATE TABLE clients (id INTEGER PRIMARY KEY,name TEXT,num_accesses INTEGER DEFAULT 0);"
          "CREATE TABLE vars    (var TEXT,val TEXT,CONSTRAINT vars_constraint UNIQUE (var));")))
    db))

(define cid-cache (make-hash-table))

(define (get-client-id db cname)
54
55
56
57
58
59
60


61
62
63
64
65
66
67
  (for-each
   (lambda (item)
     (let ((cname (vector-ref item 1))
	   (clcmd (vector-ref item 2))
	   (cdata (vector-ref item 3)))
       (send-message pub cname send-more: #t)
       (send-message pub (case clcmd


			   ((set)
			    (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			    "ok")
			   ((get)
			    (let ((res "noval"))
			      (for-each-row
			       (lambda (val)







>
>







56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  (for-each
   (lambda (item)
     (let ((cname (vector-ref item 1))
	   (clcmd (vector-ref item 2))
	   (cdata (vector-ref item 3)))
       (send-message pub cname send-more: #t)
       (send-message pub (case clcmd
			   ((sync)
			    "ok")
			   ((set)
			    (apply execute db "INSERT OR REPLACE INTO vars (var,val) VALUES (?,?);" (string-split cdata))
			    "ok")
			   ((get)
			    (let ((res "noval"))
			      (for-each-row
			       (lambda (val)
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
			  (clcmd (string->symbol (cadr parts))) ;; client cmd
			  (cdata (caddr parts))                 ;; client data
			  (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
		     (count-client db cname)
		     (case clcmd
		       ((sync) ;; just process the queue
			(print "Got sync from " cname)
			(process-queue queuelst)
			(loop '()))
		       ((imediate)
			(process-queue (cons svect queuelst))
			(loop '()))
		       (else
			(loop (cons svect queuelst))))))))
	     "server thread"))

;; Push socket connected to local port 5564, the port the server's own
;; pull socket is bound to, so the server can queue messages to itself.
(define push (make-socket 'push))
(connect-socket push "tcp://localhost:5564")

;; Background "sync thread": forever sleeps for 5 seconds and then sends
;; the message "server:sync:nodat" on the push socket, which feeds the
;; server's pull port.
(define th2
  (make-thread
   (lambda ()
     ;; Endless loop: the termination test is never true.
     (do () (#f)
       (thread-sleep! 5)
       ;; (print "Sending sync from server")
       (send-message push "server:sync:nodat")))
   "sync thread"))

;; Start th1 and the sync thread (th2), then block the main thread until
;; th1 finishes.  th2 is never joined.
(thread-start! th1)
(thread-start! th2)
(thread-join! th1)







|

|






|
<







|






86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

103
104
105
106
107
108
109
110
111
112
113
114
115
116
			  (clcmd (string->symbol (cadr parts))) ;; client cmd
			  (cdata (caddr parts))                 ;; client data
			  (svect (vector (current-seconds) cname clcmd cdata))) ;; record for the queue
		     (count-client db cname)
		     (case clcmd
		       ((sync) ;; just process the queue
			(print "Got sync from " cname)
			(process-queue (cons svect queuelst))
			(loop '()))
		       ((get)
			(process-queue (cons svect queuelst))
			(loop '()))
		       (else
			(loop (cons svect queuelst))))))))
	     "server thread"))

(include "mockupclientlib.scm")


;; Background "sync thread": forever sleeps for 5 seconds and then issues
;; a 'sync request under the name "server" through dbaccess (dbaccess is
;; not defined in the visible code; presumably it comes from the included
;; mockupclientlib.scm).
(define th2
  (make-thread
   (lambda ()
     ;; Endless loop: the termination test is never true.
     (do () (#f)
       (thread-sleep! 5)
       ;; (print "Sending sync from server")
       (dbaccess "server" 'sync "nada" #f)))
   "sync thread"))

;; Start th1 and the sync thread (th2), then block the main thread until
;; th1 finishes.  th2 is never joined.
(thread-start! th1)
(thread-start! th2)
(thread-join! th1)