Megatest

Check-in [4980ca9f98]
Login
Overview
Comment:Interleaving query enablement, server side coded
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | interleaved-queries
Files: files | file ages | folders
SHA1: 4980ca9f981af54ed0468ee01cb316a88ea50d49
User & Date: matt on 2012-11-12 00:04:43
Other Links: branch diff | manifest | tags
Context
2012-11-12
17:19
Backed out static only build of zmq libs in installall.sh check-in: a74be59f83 user: matt tags: interleaved-queries
00:04
Interleaving query enablement, server side coded check-in: 4980ca9f98 user: matt tags: interleaved-queries
2012-11-06
09:44
Testing ordering of loading zmq, fixes to deploy script check-in: 8c476e8627 user: mrwellan tags: trunk
Changes

Modified server.scm from [c2beddecc4] to [8e93e2dd1b].

41
42
43
44
45
46
47





48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64















65
66
67
68
69
70
71
72












73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111
112
113
114
115
116
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59










60
61
62
63
64
65
66
67
68
69
70
71
72
73
74








75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

98
99
100
101
102
103
104
105
106
107
108
109
110
111
112





113
114
115




116
117
118
119
120
121
122







+
+
+
+
+







-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+











-
+














-
-
-
-
-
+


-
-
-
-







      (cdb:client-call zsocket 'ping #t)
      (debug:print 4 "server:self-ping - I'm alive on " iface ":" port "!")
      (mutex-lock! *heartbeat-mutex*)
      (set! *server-loop-heart-beat* (current-seconds))
      (mutex-unlock! *heartbeat-mutex*)
      (loop))))
    
(define-inline (zmqsock:get-pub  dat)(vector-ref dat 0))
(define-inline (zmqsock:get-pull dat)(vector-ref dat 1))
(define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0))
(define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0))

(define (server:run hostn)
  (debug:print 0 "Attempting to start the server ...")
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, cannot start server, exiting")
	    (exit))))
  (let* ((zmq-socket     #f)
	 (zmq-socket-dat #f)
	 (iface          (if (string=? "-" hostn)
			     "*" ;; (get-host-name) 
			     hostn))
	 (hostname       (get-host-name))
	 (ipaddrstr      (let ((ipstr (if (string=? "-" hostn)
					  (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					  #f)))
			   (if ipstr ipstr hostname)))
  (let* ((zmq-sdat1       #f)
	 (zmq-sdat2       #f)
	 (zmq-socket1     #f)
	 (zmq-socket2     #f)
	 (p1              #f)
	 (p2              #f)
	 (zmq-sockets-dat #f)
	 (iface           (if (string=? "-" hostn)
			      "*" ;; (get-host-name) 
			      hostn))
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   #f)))
			    (if ipstr ipstr hostname))))
	 (actual-port    #f))
    ;; (set! zmq-socket (server:find-free-port-and-open iface zmq-socket 5555 0))
    (set! zmq-socket-dat (server:find-free-port-and-open ipaddrstr zmq-socket (if (args:get-arg "-port")
									      (string->number (args:get-arg "-port"))
									      (+ 5000 (random 1001)))
						     0))
    (set! zmq-socket  (cadr  zmq-socket-dat))
    (set! actual-port (caddr zmq-socket-dat))
    (set! zmq-sockets-dat (server:setup-ports ipaddrstr (if (args:get-arg "-port")
							    (string->number (args:get-arg "-port"))
							    (+ 5000 (random 1001)))))

    (set! zmq-sdat1    (car   zmq-socket-dat))
    (set! zmq-socket1  (car   zmq-sdat1))
    (set! p1           (caddr zmq-sdat1))
    
    (set! zmq-sdat2    (cadr  zmq-socket-dat))
    (set! zmq-socket2  (car   zmq-sdat2))
    (set! p2           (caddr zmq-sdat2))

    (set! *cache-on* #t)

    ;; (set! th1 (make-thread (lambda ()
    ;;     		     (server:self-ping ipaddrstr actual-port))))
    ;; (thread-start! th1)
    
    ;; what to do when we quit
    ;;
    (on-exit (lambda ()
	       (if (and *toppath* *server-info*)
		   (begin
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr))
		     (open-run-close tasks:server-deregister-self tasks:open-db ipaddrstr p1 p2))
		   (let loop () 
		     (let ((queue-len 0))
		       (thread-sleep! (random 5))
		       (mutex-lock! *incoming-mutex*)
		       (set! queue-len (length *incoming-data*))
		       (mutex-unlock! *incoming-mutex*)
		       (if (> queue-len 0)
			   (begin
			     (debug:print-info 0 "Queue not flushed, waiting ...")
			     (loop))))))))

    ;; The heavy lifting
    ;;
    (let loop ()
      ;; ;; Ugly yuk. 
      ;; (mutex-lock! *incoming-mutex*)
      ;; (set! *server-loop-heart-beat* (list 'waiting (current-seconds)))
      ;; (mutex-unlock! *incoming-mutex*)
      (let* ((rawmsg (receive-message* zmq-socket))
      (let* ((rawmsg (receive-message* zmq-socket1))
	     (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize))))
	     (res    #f))
	;;; Ugly yuk. 
	;; (mutex-lock! *incoming-mutex*)
	;; (set! *server-loop-heart-beat* (list 'working (current-seconds)))
	;; (mutex-unlock! *incoming-mutex*)
	(debug:print-info 12 "server=> received params=" params)
	(set! res (cdb:cached-access params))
	(debug:print-info 12 "server=> processed res=" res)
	(send-message zmq-socket (db:obj->string res))
	(if (not *time-to-exit*)
	    (loop)
	    (begin
170
171
172
173
174
175
176
177
178


179
180
181
182
183
184
185
186
187
188
189
190
191


192
193

194







195
196
197
198
199
200






201
202
203
204
205
206
207
176
177
178
179
180
181
182


183
184
185
186
187
188
189
190
191
192
193
194
195
196

197
198
199

200
201
202
203
204
205
206
207
208






209
210
211
212
213
214
215
216
217
218
219
220
221







-
-
+
+












-
+
+

-
+

+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+







		(set! *time-to-exit* #t)
		(open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
		(thread-sleep! 1)
		(debug:print-info 0 "Max cached queries was " *max-cache-size*)
		(debug:print-info 0 "Server shutdown complete. Exiting")
		(exit)))))))

(define (server:find-free-port-and-open iface s port #!key (trynum 50))
  (let ((s (if s s (make-socket 'rep)))
(define (server:find-free-port-and-open iface s port stype #!key (trynum 50))
  (let ((s (if s s (make-socket stype)))
	(p (if (number? port) port 5555))
 	(old-handler (current-exception-handler)))
    (handle-exceptions
     exn
     (begin
       (debug:print 0 "Failed to bind to port " p ", trying next port")
       (debug:print 0 "   EXCEPTION: " ((condition-property-accessor 'exn 'message) exn))
       ;; (old-handler)
       ;; (print-call-chain)
       (if (> trynum 0)
	   (server:find-free-port-and-open iface s (+ p 1) trynum: (- trynum 1))
	   (debug:print-info 0 "Tried ports up to " p 
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use")))
			     " but all were in use. Please try a different port range by starting the server with parameter \" -port N\" where N is the starting port number to use"))
       (exit)) ;; To exit or not? That is the question.
     (let ((zmq-url (conc "tcp://" iface ":" p)))
       (print "Trying to start server on " zmq-url)
       (debug:print 0 "Trying to start server on " zmq-url)
       (bind-socket s zmq-url)
       (list iface s port)))))

(define (server:setup-ports ipadrstr startport)
  (let* ((s1 (server:find-free-port-and-open ipadrstr #f startport 'pub))
	 (p1 (caddr s1))
	 (s2 (server:find-free-port-and-open ipadrstr #f (+ 1 (if p1 p1 (+ startport 1))) 'pull))
	 (p2 (caddr s2)))
       (set! *runremote* #f)
       (debug:print 0 "Server started on " zmq-url)
       (mutex-lock! *heartbeat-mutex*)
       (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
       (mutex-unlock! *heartbeat-mutex*)
       (list iface s port)))))
    (set! *runremote* #f)
    (debug:print 0 "Server started on " ipaddrstr " ports " p1 " and p2")
    (mutex-lock! *heartbeat-mutex*)
    (set! *server-info* (open-run-close tasks:server-register tasks:open-db (current-process-id) iface p 0 'live))
    (mutex-unlock! *heartbeat-mutex*)
    (list s1 s2)))

(define (server:mk-signature)
  (message-digest-string (md5-primitive) 
			 (with-output-to-string
			   (lambda ()
			     (write (list (current-directory)
					  (argv)))))))