ADDED example/q/threaded-queue.scm
Index: example/q/threaded-queue.scm
==================================================================
--- /dev/null
+++ example/q/threaded-queue.scm
@@ -0,0 +1,385 @@
+#!/opt/chicken/bin/csi -s
+
+(use srfi-1 srfi-18 srfi-69 data-structures
+     mailbox-threads typed-records matchable mailbox posix)
+
+;;; create a threaded job queue
+;;; submit job
+;;;   - command line
+;;;   - working dir / default pwd
+;;;   - env alist / default current env (entries are added to it)
+;;;   - callback on exit 0 / default noop
+;;;   - callback on nonzero exit / default noop
+;; tjq == threaded job queue; a job is a unix command
+(define getenv get-environment-variable)
+
+;; One record per submitted job; owned by the dispatcher's job hash.
+;; state moves 'waiting -> 'ready -> 'running -> 'done ('killed if
+;; cancelled by the kill method before it was launched).
+(defstruct tjq:job
+  id                          ;; assigned at construction time when added to waiting q
+  state                       ;; assigned at construction time when added to waiting q
+  (pid #f)                    ;; set by the job thread when the process is launched
+  (threadobj #f)              ;; the job's thread; set by tjq:job-thread
+  (normal-exit #f)            ;; 'normal, 'signal, 'lost (wait failed) or 'launch-failed
+  (exit-code #f)              ;; exit code or signal number; #f when unknown
+  (time-entered-waiting #f)   ;; (current-seconds) at each state change
+  (time-entered-ready #f)
+  (time-entered-running #f)
+  (time-entered-done #f)
+  ;; following are key options to submit method
+  (work-dir (getenv "PWD"))   ;; where to execute job
+  (setenvs '())               ;; alist of (name . value) envvars to set when running job
+  (cmdline "/bin/true")       ;; job command line; if string, run in subshell, if list, exec.
+  (success-cb (lambda () #t)) ;; thunk; fires when exitcode is 0
+  (fail-cb (lambda () #t)))   ;; thunk; fires when exitcode is not 0 or the job was lost
+
+;; Report a fatal queue error and terminate the whole program.
+(define (tjq:exception e)
+  (print "Exception: "e)
+  ;;(print-call-chain)
+  (exit 1))
+
+;; /bin/sh script used when a job needs its own directory or environment:
+;; $1 is the directory to cd into; the remaining arguments are NAME=VALUE
+;; assignments followed by the argv to run, all handed to env(1).  Doing
+;; this in the shell means no Scheme code runs in the forked child.
+(define tjq:launch-wrapper "cd \"$1\" || exit 127; shift; exec env \"$@\"")
+
+;; Start JOB's command line and return the child's pid.  A string cmdline
+;; runs through the shell, a list (program arg ...) is exec'd directly.
+;; work-dir and setenvs are applied only when they would change anything.
+(define (tjq:job-launch job)
+  (let ((cmdline (tjq:job-cmdline job))
+        (work-dir (tjq:job-work-dir job))
+        (setenvs (tjq:job-setenvs job)))
+    (if (and (null? setenvs)
+             (or (not work-dir) (equal? work-dir (current-directory))))
+        (if (list? cmdline)
+            (process-run (car cmdline) (cdr cmdline))
+            (process-run cmdline))
+        (process-run
+         "/bin/sh"
+         (append (list "-c" tjq:launch-wrapper "tjq-job" (or work-dir "."))
+                 (map (lambda (pr) (conc (car pr) "=" (cdr pr))) setenvs)
+                 (if (list? cmdline)
+                     cmdline
+                     (list (or (getenv "SHELL") "/bin/sh") "-c" cmdline)))))))
+
+;; Non-blocking wait for PID.  Returns (list pid-or-zero normal-exit? code),
+;; or #f if process-wait signalled an error.  With many parallel jobs
+;; (~ >= 42) process-wait has been seen to fail with "No child processes";
+;; uncaught, that ended the job thread without reporting, leaving the job
+;; in the dispatcher's running list forever.
+(define (tjq:job-reap-nohang job-id pid)
+  (handle-exceptions exn
+    (begin
+      (print "tjq: job "job-id": process-wait on pid "pid" failed:")
+      (print-error-message exn)
+      #f)
+    (call-with-values (lambda () (process-wait pid #t)) list)))
+
+;; Run a job's success/fail thunk.  Errors are reported and otherwise
+;; ignored so a bad callback cannot kill the dispatcher thread.
+(define (tjq:run-callback job-id cb)
+  (handle-exceptions exn
+    (begin
+      (print "tjq: callback for job "job-id" raised an error:")
+      (print-error-message exn)
+      #f)
+    (cb)))
+
+;; Create (but do not start) the thread that drives job JOB-ID.  It idles
+;; until it receives 'run, launches the process and sends
+;; (job-now-running JOB-ID) to DISPATCH-THREAD, then polls the child every
+;; TIMEOUT-SECONDS and sends (job-now-done JOB-ID) once it has exited.
+;; 'cancel makes a job thread that has not launched anything finish.
+(define (tjq:job-thread job-id job dispatch-thread timeout-seconds)
+  (define (finish! how code)
+    (tjq:job-normal-exit-set! job how)
+    (tjq:job-exit-code-set! job code)
+    (thread-send dispatch-thread (list 'job-now-done job-id))
+    #f)
+  (let ((this-thread
+         (make-thread
+          (lambda ()
+            (let loop ((pid #f))
+              ;; block indefinitely until launched; afterwards wake up
+              ;; every timeout-seconds to poll the child
+              (match (if pid
+                         (thread-receive timeout-seconds 'timeout)
+                         (thread-receive))
+                ('timeout
+                 (if (not pid)
+                     (loop pid)
+                     (match (tjq:job-reap-nohang job-id pid)
+                       ((0 _ _) (loop pid)) ;; still running
+                       ((_ normal-exit code)
+                        (finish! (if normal-exit 'normal 'signal) code))
+                       (#f (finish! 'lost #f)))))
+                ('run
+                 (let ((newpid (handle-exceptions exn
+                                 (begin
+                                   (print "tjq: job "job-id" failed to launch:")
+                                   (print-error-message exn)
+                                   #f)
+                                 (tjq:job-launch job))))
+                   (cond
+                    (newpid
+                     (tjq:job-pid-set! job newpid)
+                     (thread-send dispatch-thread (list 'job-now-running job-id))
+                     (loop newpid))
+                    (else (finish! 'launch-failed #f)))))
+                ('cancel #f)
+                (e
+                 (print "tjq:job-thread("job-id") illegal message received:")
+                 (pp e)
+                 (exit 1))))))))
+    ;; record the thread before returning so the dispatcher can never read
+    ;; #f here, even if this thread has not been scheduled yet
+    (tjq:job-threadobj-set! job this-thread)
+    this-thread))
+
+;; Create (but do not start) the dispatcher thread for queue QNAME.  Jobs
+;; live in JOB-HASH (id -> tjq:job); at most SYNC-JOB-CAP run at once.
+;; Messages: (method OP ARGS REPLY-MBOX) from the tjq:new object, and
+;; (job-now-running ID) / (job-now-done ID) from job threads.  Every
+;; TIMEOUT-SECONDS without a message, queued jobs are advanced one step.
+;; QNAME and OBJ are accepted but not used here.
+(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj #!key
+                               (job-thread-timeout-seconds 0.1)
+                               (timeout-seconds 0.1))
+  (define (job-ref job-id) (hash-table-ref job-hash job-id))
+  (define (stamp! job state set-time!)
+    (tjq:job-state-set! job state)
+    (set-time! job (current-seconds)))
+  (letrec
+      ((this-thread
+        (make-thread
+         (lambda ()
+           ;; next-job-id becomes #f once draining (no new submissions).
+           ;; waiting is newest-first (cheap cons on submit) and is promoted
+           ;; from its tail so jobs start in submission order.
+           ;; The match below is the tail expression, so loop never nests.
+           (let loop ((next-job-id 0)
+                      (waiting '()) ;; stay here until count(running) < sync-job-cap
+                      (ready '())   ;; promoted; launched on the next tick
+                      (running '()) ;; wait for the job thread to report done
+                      (done '()))
+             (match (thread-receive timeout-seconds 'timeout)
+               ('timeout
+                (if (and (not next-job-id) ;; we're draining jobs
+                         (null? waiting)
+                         (null? ready)
+                         (null? running))
+                    (begin
+                      (print "Drained. Done.")
+                      #f) ;; returning ends the dispatcher thread
+                    (begin
+                      ;; launch the jobs promoted on the previous tick
+                      (for-each (lambda (job-id)
+                                  (thread-send (tjq:job-threadobj (job-ref job-id)) 'run))
+                                ready)
+                      ;; then promote the oldest waiting jobs into free slots
+                      (let* ((new-running (append ready running))
+                             (n-waiting (length waiting))
+                             (avail-slots (max 0 (- sync-job-cap (length new-running))))
+                             (queueable-count (min avail-slots n-waiting)))
+                        (let-values (((new-waiting oldest)
+                                      (split-at waiting (- n-waiting queueable-count))))
+                          (let ((new-ready (reverse oldest)))
+                            (for-each (lambda (job-id)
+                                        (stamp! (job-ref job-id) 'ready
+                                                tjq:job-time-entered-ready-set!))
+                                      new-ready)
+                            (loop next-job-id new-waiting new-ready new-running done)))))))
+               (('job-now-running job-id)
+                (stamp! (job-ref job-id) 'running tjq:job-time-entered-running-set!)
+                (loop next-job-id waiting ready running done))
+               (('job-now-done job-id)
+                (let* ((job (job-ref job-id))
+                       (successful
+                        (and (eq? 'normal (tjq:job-normal-exit job))
+                             (eqv? 0 (tjq:job-exit-code job)))))
+                  (stamp! job 'done tjq:job-time-entered-done-set!)
+                  (tjq:run-callback job-id (if successful
+                                               (tjq:job-success-cb job)
+                                               (tjq:job-fail-cb job)))
+                  (loop next-job-id waiting ready
+                        (delete job-id running =)
+                        (cons job-id done))))
+               (('method 'ping '() return-mbox)
+                (print "got ping")
+                (mailbox-send! return-mbox 'pong)
+                (loop next-job-id waiting ready running done))
+               (('method 'submit args return-mbox)
+                (if (not next-job-id)
+                    (begin
+                      (print "refuse to submit new job -- draining jobs now.")
+                      (mailbox-send! return-mbox #f)
+                      (loop #f waiting ready running done))
+                    (let* ((job-id next-job-id)
+                           (job (apply
+                                 make-tjq:job
+                                 id: job-id
+                                 time-entered-waiting: (current-seconds)
+                                 state: 'waiting
+                                 args))
+                           (job-thread (tjq:job-thread job-id job this-thread
+                                                       job-thread-timeout-seconds)))
+                      (hash-table-set! job-hash job-id job)
+                      (thread-start! job-thread)
+                      (mailbox-send! return-mbox job-id)
+                      (loop (add1 next-job-id)
+                            (cons job-id waiting)
+                            ready running done))))
+               (('method 'kill args return-mbox)
+                ;; best effort: SIGTERM every running job that has a pid and has
+                ;; not been reaped yet, cancel everything not yet launched, then
+                ;; drain.  A job sent 'run whose pid is not set yet keeps running.
+                (for-each (lambda (job-id)
+                            (let* ((job (job-ref job-id))
+                                   (pid (tjq:job-pid job)))
+                              (when (and pid (not (tjq:job-normal-exit job)))
+                                (handle-exceptions exn #f
+                                  (process-signal pid signal/term)))))
+                          running)
+                (for-each (lambda (job-id)
+                            (let ((job (job-ref job-id)))
+                              (tjq:job-state-set! job 'killed)
+                              (thread-send (tjq:job-threadobj job) 'cancel)))
+                          (append ready waiting))
+                (mailbox-send! return-mbox 'kill)
+                (loop #f '() '() running done))
+               (('method 'drain args return-mbox)
+                (mailbox-send! return-mbox 'drain)
+                (loop #f waiting ready running done))
+               ;; any other method: tell the caller instead of dying
+               (('method x args return-mbox)
+                (mailbox-send! return-mbox 'missing-method)
+                (loop next-job-id waiting ready running done))
+               (e
+                (print "tjq:dispatcher-thread> no matching pattern. dispatcher received: ")
+                (pp e)
+                (exit 1))))))))
+    this-thread))
+
+;; Create a queue and return a closure used as (q 'method arg ...):
+;;   (q 'submit key: val ...)  job id, or #f when draining
+;;   (q 'ping)                 'pong
+;;   (q 'kill)                 best-effort stop of all jobs; the queue then drains
+;;   (q 'drain)                refuse new jobs, wait for all jobs to finish, #t
+;;   (q 'event-loop)           block until the dispatcher thread exits
+;; An unknown method is reported via tjq:exception.
+(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
+  ;; a cap below 1 would leave every job waiting forever
+  (unless (and (integer? sync-job-cap) (positive? sync-job-cap))
+    (error 'tjq:new "sync-job-cap must be a positive integer" sync-job-cap))
+  (let ((job-hash (make-hash-table))
+        (obj-mbox (make-mailbox))
+        (dispatch-thread #f))
+    (define (stopped?)
+      (and (memq (thread-state dispatch-thread) '(terminated dead)) #t))
+    (define (call-dispatcher op args)
+      (thread-send dispatch-thread (list 'method op args obj-mbox))
+      (mailbox-receive! obj-mbox))
+    (define (obj op . args)
+      (cond
+       ((eq? op 'event-loop)
+        (print "got event-loop")
+        (thread-join! dispatch-thread)
+        (print "after thread-join dispatch-thread"))
+       ((eq? op 'drain)
+        ;; read the dispatcher's ack so it is not mistaken for the reply to
+        ;; a later call, then wait for the queue to empty
+        (unless (stopped?)
+          (call-dispatcher op args)
+          (thread-join! dispatch-thread)
+          (print "Done with queue "qname))
+        #t)
+       ((stopped?)
+        ;; the dispatcher would never answer: don't block forever
+        (print "queue "qname" has stopped; ignoring "op)
+        #f)
+       (else
+        (let ((res (call-dispatcher op args)))
+          (if (eq? res 'missing-method)
+              (begin
+                (print "missing method "op" called.")
+                (tjq:exception 'missing-method))
+              res)))))
+    ;; positional arguments must come before the #!key arguments
+    (set! dispatch-thread
+          (tjq:dispatcher-thread qname job-hash sync-job-cap obj
+                                 job-thread-timeout-seconds: 0.01
+                                 timeout-seconds: 0.01))
+    (thread-start! dispatch-thread)
+    obj))
+
+(define (test-tjq-simple)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
+    ;(q 'submit "ls -l")
+    ;(q 'drain)
+
+    (pp (q 'ping))
+    (pp (q 'ping))
+    (thread-sleep! 0.1)
+    ;(q 'event-loop)
+    ))
+
+(define (test-submit)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
+    (q 'submit cmdline: "sleep 2; echo job well done")
+    (thread-sleep! 4)))
+
+
+(define (test-drain-simple)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
+    ;(q 'submit cmdline: "sleep 2; echo job well done")
+    (thread-sleep! 1)
+    (q 'drain)))
+
+;; exercises work-dir, setenvs and the success/fail callbacks
+(define (test-submit-options)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
+    (q 'submit
+       cmdline: "echo \"$GREETING from $(pwd)\""
+       work-dir: "/tmp"
+       setenvs: '(("GREETING" . "hello"))
+       success-cb: (lambda () (print "job succeeded")))
+    (q 'submit
+       cmdline: '("/bin/false")
+       fail-cb: (lambda () (print "/bin/false failed, as expected")))
+    (q 'drain)))
+
+(define (test-submit-bunch)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
+    (for-each (lambda (x)
+                (let* ((cmd (conc "echo did job "x)))
+                  (print "submit it--"x)
+                  (q 'submit cmdline: cmd)))
+              (iota 6))
+    ;;(thread-sleep! 10)
+    (q 'drain)
+    ;;(q 'event-loop)
+    ))
+
+(define (test-submit-bunch2)
+  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 20)))
+    (for-each (lambda (x)
+                ;;(let* ((cmd (conc "echo did job "x)))
+                (let* ((cmd "/bin/true"))
+                  ;;(print "submit it--"x)
+                  (q 'submit cmdline: cmd)))
+              (iota 6000))
+    ;;(thread-sleep! 10)
+    (q 'drain)
+    ;;(q 'event-loop)
+    ))
+
+;(test-tjq-simple)
+;;(test-submit)
+;;(test-drain-simple)
+;;(test-submit-options)
+(print "top")
+(test-submit-bunch2)