Megatest

Changes On Branch 2a3c07bb4e190c5a
Login

Changes In Branch v1.64-synclaunch-threaded-q Through [2a3c07bb4e] Excluding Merge-Ins

This is equivalent to a diff from c57a166878 to 2a3c07bb4e

2018-02-09
15:17
fix bug affecting redhat check-in: b6ea4f981a user: bjbarcla tags: v1.65
00:22
wip check-in: 9d523bb50a user: bb tags: v1.64-synclaunch-threaded-q
2018-02-08
15:38
wip check-in: 2a3c07bb4e user: bb tags: v1.64-synclaunch-threaded-q
2018-02-06
06:00
Merged v1.65 to trunk check-in: e035c43096 user: matt tags: trunk
05:59
Rebuilt manual check-in: c57a166878 user: matt tags: v1.65
2018-02-05
23:33
job rate, load limit working for MTLOWESTLOAD check-in: f1ae188566 user: matt tags: v1.65

Added example/q/threaded-queue.scm version [32eb4584d7].



















































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/opt/chicken/bin/csi -s

(use mailbox-threads typed-records matchable mailbox)

;;; create a threaded job queue
;;; submit job
;;;   - command line
;;;   - working dir / default pwd
;;;   - env hash    / default current env
;;;   - callback on exit 0 / default noop
;;;   - callback on nonzero exit / default noop
;; tjq == threaded job queue; a job is a unix command
(define getenv get-environment-variable)
(defstruct tjq:job
    id  ;; assigned at construction time when added to waiting q
  state ;; assigned at construction time when added to waiting q
  (pid #f) ;; assigned when moved from ready q to running q
  (exit-code #f) ;; assigned when moved from running to done
  (time-entered-waiting #f)
  (time-entered-ready   #f)
  (time-entered-running #f)
  (time-entered-done    #f)
  ;; following are key options to submit method
  (work-dir     (getenv "PWD")) ;; where to execute job
  (setenvs     '())            ;; alist of envvars to set when running job
  (cmdline     "/bin/true")    ;; job command line; if string, run in subshell, if list, exec.
  (success-cb  (lambda () #t)) ;; fires when exitcode is 0
  (fail-cb     (lambda () #t)));; fires when exitcode is not 0

 
             

(define (tjq:exception e)
  (print "Exception: "e)
  ;;(print-call-chain)
  (exit 1))

(define (tjq:job-thread job-id job this-thread)
  (print "job-thread setup for jobid "job-id)
  (make-thread
   (lambda ()
     (print "job-thread started for jobid "job-id)
     (print "job-thread finished for jobid "job-id)
     #f)))


(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec
   (;; options to configure behavior of dispatcher
    (timeout-seconds 0.01)
    ;; define long-running thread which receives requests and coordinates
    ;; job execution
    (this-thread
     (make-thread
      (lambda ()
        (let loop ((next-job-id 0)
                   ;; four job queues holding job mbox-type threads
                   ;; they advance from one to the next
                   ;; once in done, the thread has completed.
                   (waiting '()) ;; stay here until count(running) < sync-job-cap
                   (ready   '()) ;; launch jobs in here
                   (running '()) ;; wait for pid to complete, then move do done
                   (done    '())) ;; 
          ;;(print "loop top")
          (match (thread-receive timeout-seconds 'timeout)
            ('timeout
             ;;*** when timeout happens, examine job queues
             ;;    and move jobs thru their lifecycle
             ;;** scan for dones
             ;; count dones, subtract from running total
             ;;** count waitings
             ;; move min(sync-job-cap - running total, waiting total) to ready
             ;; foreach ready, thread & fork
             
             
             (loop next-job-id waiting ready running done))
            (('method 'ping '() return-mbox)
             (print "got ping")
             (mailbox-send! return-mbox 'pong)
             (loop next-job-id waiting ready running done))
            
            (('method 'submit args return-mbox)
             (let* ((job-id next-job-id)
                    (job (apply
                          make-tjq:job
                          id: job-id
                          time-entered-waiting: (current-seconds)
                          state: 'waiting
                          args))
                    (job-thread (tjq:job-thread job-id job this-thread)))
               (hash-table-set! job-hash job-id job)
               (thread-start! job-thread)
               (mailbox-send! return-mbox job-id)
               (loop
                (add1 next-job-id)
                (cons job-thread waiting)
                ready running done)))
                    
            (e
             (print "tjq:dispatcher-thread> no matching pattern.  dispatcher received: ")
             (pp e)
             (exit 1))
            (('method x args return-mbox)
             (mailbox-send! return-mbox (list 'missing-method))
             (loop next-job-id waiting ready running done))
            ) ;; end match
          
          qname)))))
   this-thread))
  

       
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox)))
    (letrec
        ((dispatch-thread (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let* ((res (mailbox-receive! obj-mbox)))
                (if (eq? res 'missing-method)
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
      (thread-start! dispatch-thread)
      obj)))
         
                            
(define (test-tjq-simple)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;(q 'submit "ls -l")
    ;(q 'drain)

    (pp (q 'ping))
    (pp (q 'ping))
    (thread-sleep! 0.1)
    ;(q 'event-loop)
    )
  )

(define (test-submit)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    (q 'submit cmdline: "sleep 2; echo job well done")
    (thread-sleep! 4)))

;(test-tjq-simple)
(test-submit)