Megatest

Check-in [a095ada3d1]
Login
Overview
Comment:Added queuefeeder
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | v1.60
Files: files | file ages | folders
SHA1: a095ada3d17b330c5775a2dd023946427b7de538
User & Date: matt on 2015-06-18 23:12:18
Other Links: branch diff | manifest | tags
Context
2015-06-19
21:17
Missing changes check-in: 0ee351862a user: matt tags: v1.60
2015-06-18
23:12
Added queuefeeder check-in: a095ada3d1 user: matt tags: v1.60
18:00
Tweaks to loadwatch check-in: d0797d3ec2 user: mrwellan tags: v1.60
Changes

Added loadwatch/bjob-count.sh version [0c8ad639ee].







>
>
>
1
2
3
#!/bin/bash

# Print the eighth whitespace-separated field of every `bqueues` output line
# that contains the text "normal".
bqueues | awk '/normal/ { print $8 }'

Added loadwatch/launch-many.scm version [72e97c4511].















>
>
>
>
>
>
>
1
2
3
4
5
6
7
;; Submit jobs numbered 0 through 500000, one after another, through
;; ./queuefeeder. Each job runs ./testopenlava.sh with the job number and the
;; result of (random 30) as its two arguments. Prints "DONE" when finished.
(do ((job-num 0 (+ job-num 1)))
    ((> job-num 500000)
     (print "DONE"))
  (let ((command (conc "./queuefeeder xena:22022 bsub ./testopenlava.sh " job-num " " (random 30))))
    (print "Running: " command)
    (system command)))

Added loadwatch/queuefeeder-server.scm version [befbabbb2c].













































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
;;======================================================================
;; Copyright 2015-2015, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.
;;======================================================================

;; Queue Feeder. Use a crude droop curve to limit feeding jobs into a queue
;;               to prevent slamming the queue

;;======================================================================
;; Methodology
;;
;;   The client connects to the server; the server delays its reply by the
;;   appropriate time (if any), and the client then launches the task.
;;

(use nanomsg posix regex)

;; (use trace)
;; (trace nn-bind nn-socket nn-assert nn-recv nn-send thread-terminate! nn-close )

;; TCP port to listen on. Overwritten below with the first command-line
;; argument (kept as a string; it is only ever spliced into endpoint strings).
(define port  22022)

;; get needed stuff from commandline
;;
;; cmd is run to give a count of the queue length => prints a number on stdout
(define cmd '())

(let ((args (argv)))
  (if (> (length args) 2)
      (begin
	(set! port (cadr args))
	(set! cmd  (caddr args))) ;; no params supported
      (begin
	(print "Usage: queuefeeder-server port command")
	(print "       where the command gives an integer on stdout indicating the queue load")
	;; Exit non-zero so a calling script can tell the server never started.
	(exit 1))))

(print "Running queue feeder with port=" port ", command=" cmd)

;; Reply socket on which the server answers client requests.
(define rep   (nn-socket 'rep))

(print "connecting, got: " (nn-bind    rep  (conc "tcp://" "*" ":" port)))

;; Time, in seconds, the server sleeps before answering an ordinary request.
;; Access is guarded by *current-delay-mutex*.
(define *current-delay* 0)

;; Request loop for the reply socket `soc`.
;;
;;   "quit"               => reply "Ok, quitting" and return
;;   text starting "ping" => reply with this process id (as a string)
;;   anything else        => sleep for the current *current-delay* seconds,
;;                           then reply with a greeting reporting the wait
;;
;; Every request whose running count is a multiple of 1000 is logged.
(define (server soc)
  ;; true when the request text begins with "ping"
  (define (ping-request? text)
    (and (>= (string-length text) 4)
         (equal? (substring text 0 4) "ping")))
  ;; read *current-delay* while holding its mutex
  (define (read-delay)
    (mutex-lock! *current-delay-mutex*)
    (let ((snapshot *current-delay*))
      (mutex-unlock! *current-delay-mutex*)
      snapshot))
  (print "server starting")
  (let serve ((request (nn-recv soc))
              (seen    0))
    (when (zero? (modulo seen 1000))
      (print "server received: " request ", count=" seen))
    (cond
     ((equal? request "quit")
      (nn-send soc "Ok, quitting"))
     ((ping-request? request)
      (nn-send soc (conc (current-process-id)))
      (serve (nn-recv soc) (+ seen 1)))
     (else
      (let ((wait (read-delay)))
        (thread-sleep! wait)
        (nn-send soc (conc "hello " request " you waited " wait " seconds"))
        (serve (nn-recv soc)
               ;; restart the counter once it passes 20000000
               (if (> seen 20000000)
                   0
                   (+ seen 1))))))))

;; Check that the server listening at host:port is this very process.
;;
;; Connects a req socket to "tcp://host:port", sends the string "ping" and
;; expects the reply to be our own process id rendered as a string.
;;
;;   host, port     - joined into the endpoint string
;;   return-socket  - when true (the default) return the connected req socket
;;                    on success and #f on failure; when false close the
;;                    socket and return the success flag (#t / #f)
;;
;; A watchdog thread ticks once per second and terminates the ping thread if
;; it is still waiting after eleven ticks.
(define (ping-self host port #!key (return-socket #t))
  (let* ((req     (nn-socket 'req))
	 (key     "ping")
	 (success #f)
	 (keepwaiting #t)
	 (ping    (make-thread
		   (lambda ()
		     (print "ping: sending string \"" key "\", expecting " (current-process-id))
		     (nn-send req key)
		     (let ((result  (nn-recv req)))
		       (if (equal? (conc (current-process-id)) result)
			   (begin
			     (print "ping, success: received \"" result "\"")
			     (set! success #t))
			   (begin
			     (print "ping, failed: received key \"" result "\"")
			     ;; a wrong answer is final; tell the watchdog to stop
			     (set! keepwaiting #f)
			     (set! success #f)))))
		   "ping"))
	 (timeout (make-thread (lambda ()
				 (let loop ((count 0))
				   (thread-sleep! 1)
				   ;; count sleeps have completed before this one,
				   ;; so count+1 seconds have elapsed
				   (print "still waiting after " (+ count 1) " seconds...")
				   (if (and keepwaiting (< count 10))
				       (loop (+ count 1))))
				 (if keepwaiting
				     (begin
				       (print "timeout waiting for ping")
				       (thread-terminate! ping))))
			       "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    ;; Joining a terminated ping thread raises; report it here and carry on
    ;; with success still #f.
    (handle-exceptions
     exn
     (begin
       (print-call-chain)
       (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       (print "exn=" (condition->list exn))
       (print "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    (cond
     ((not return-socket)
      (nn-close req)
      success)
     (success req)
     (else
      ;; The caller only receives #f, so nobody else could ever close this
      ;; socket; release it here instead of leaking it.
      (nn-close req)
      #f))))

;; Guards *current-delay*, which is written by the updater thread below and
;; read by the server loop.
(define *current-delay-mutex* (make-mutex))

;; Background thread: every QUEUE_CHK_DELAY seconds (default 60, also used
;; when the variable is not a number) run cmd, read one datum from its
;; standard output and derive *current-delay* from it.
;;
;; The datum is taken as the number of jobs in the queue; the delay is a
;; linear droop of val/50 seconds. If the command does not yield a number the
;; delay falls back to 2 seconds.
(thread-start!
 (make-thread
  (lambda ()
    (let ((delay-time (or (string->number
                           (or (get-environment-variable "QUEUE_CHK_DELAY") "60"))
                          60)))
      (let loop ()
        ;; Read the value and let the pipe close before sleeping, so the pipe
        ;; is not held open for the whole interval.
        (let* ((val       (with-input-from-pipe cmd (lambda () (read))))
               (droop-val (if (number? val) (/ val 50) #f)))
          (mutex-lock! *current-delay-mutex*)
          ;; droop-val is already val/50; it must not be divided a second time
          (set! *current-delay* (or droop-val 2))
          (mutex-unlock! *current-delay-mutex*)
          (print "droop-val=" droop-val))
        (thread-sleep! delay-time)
        (loop))))))

;; Start the request loop in its own thread, then confirm via ping-self that
;; the listener on our port is really this process. On success wait for the
;; server thread to finish and close the reply socket; otherwise report the
;; failed ping. Either way the program then exits.
(let ((server-thread (make-thread (lambda () (server rep)) "server")))
  (thread-start! server-thread)
  (cond
   ((ping-self (get-host-name) port)
    (thread-join! server-thread)
    (nn-close rep))
   (else
    (print "ping failed"))))

(exit)

Added loadwatch/queuefeeder.scm version [175b252945].







































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
;;======================================================================
;; Copyright 2015-2015, Matthew Welland.
;; 
;;  This program is made available under the GNU GPL version 2.0 or
;;  greater. See the accompanying file COPYING for details.
;; 
;;  This program is distributed WITHOUT ANY WARRANTY; without even the
;;  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
;;  PURPOSE.
;;======================================================================

;; Queue Feeder. Use a crude droop curve to limit feeding jobs into a queue
;;               to prevent slamming the queue

;;======================================================================
;; Methodology
;;
;;   The client connects to the server; the server delays its reply by the
;;   appropriate time (if any), and the client then launches the task.
;;
(use nanomsg posix regex)

;; Request socket used to talk to the queuefeeder server.
(define req   (nn-socket 'req))

;; get needed stuff from commandline
;;
(define hostport #f) ;; first argument, "host:port" of the server
(define cmd '())     ;; remaining arguments: the command to run and its params

(let ((args (argv)))
  (if (> (length args) 2)
      (begin
	(set! hostport (cadr args))
	(set! cmd      (cddr args)))
      (begin
	(print "Usage: queuefeeder host:port command params ....")
	;; Exit non-zero so a calling script can tell nothing was launched.
	(exit 1))))

(nn-connect req  (conc "tcp://" hostport)) ;; e.g. tcp://xena:22022

;; Send msg on socket soc, then return whatever nn-recv yields for the reply.
(define client-send-receive
  (lambda (soc msg)
    (nn-send soc msg)
    (nn-recv soc)))

;; (define ((talk-to-server soc))
;;   (let loop ((cnt 200000))
;;     (let ((name (list-ref '("Matt" "Tom" "Bob" "Jill" "James" "Jane")(random 6))))
;;       ;; (print "Sending " name)
;;       ;; (print
;;       (client-send-receive req name) ;; )
;;       (if (> cnt 0)(loop (- cnt 1)))))
;;   (print (client-send-receive req "quit"))
;;   (nn-close req)
;;   (exit))
;; 

;; Watchdog: if the server has not answered within 20 seconds, give up.
(thread-start! (lambda ()
		 (thread-sleep! 20)
		 (print "Give up on waiting for the server")
		 (nn-close req)
		 ;; The command was never launched, so report failure rather
		 ;; than exiting with status 0.
		 (exit 1)))
;; Send "user@host" to the server, wait for its reply and print it.
(thread-join! (thread-start! (lambda ()
			       (print (client-send-receive req (conc (current-user-name) "@" (get-host-name)))))))

;; The server has answered: replace this process with the requested command.
(process-execute (car cmd) (cdr cmd))


Added loadwatch/testopenlava.sh version [1f61657fdf].



















>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
#!/bin/bash

# Usage: testopenlava.sh job_order job_length
#
# Writes a START marker to <job_order>.log, sleeps for job_length (passed
# straight to sleep), then appends an END marker to the same file.

job_order=$1
job_length=$2

# Expansions are quoted so a value containing whitespace or glob characters is
# used as a single word instead of being split (or causing an ambiguous
# redirect).
echo "START: $job_order" > "$job_order.log"
sleep "$job_length"
echo "END: $job_order" >> "$job_order.log"