;; NOTE(review): This file is a unified diff (an "Index:" header followed by
;; "@@" hunks) against example/q/threaded-queue.scm, a CHICKEN Scheme script.
;; These leading ";;" lines are reviewer commentary placed ahead of the
;; "Index:" header; everything from "Index:" onward is unchanged.
;; NOTE(review): the diff's own line breaks appear to have been lost (many
;; hunk lines run together on one physical line) -- confirm against the
;; original patch before trying to apply it.
;;
;; What the "+" lines add, as far as the hunks show:
;;  * `posix` is appended to the `use` form (presumably for the process-run /
;;    process-wait calls introduced below -- confirm).
;;  * The tjq:job record gains two fields, threadobj and normal-exit, both
;;    defaulting to #f.
;;  * tjq:job-thread now takes (job-id job dispatch-thread timeout-seconds).
;;    The thread it builds stores itself in the job record and then loops on
;;    (thread-receive timeout-seconds 'timeout):
;;      'run      starts the job's cmdline with process-run (for a list, the
;;                car is the command and the cdr its arguments), records the
;;                pid in the job, sends (job-now-running job-id) to the
;;                dispatcher and loops with that pid.
;;      'timeout  with a numeric pid, calls (process-wait pid #t) -- the #t is
;;                presumably the no-hang flag; confirm.  A zero first result
;;                loops again; otherwise it records 'normal or 'signal plus
;;                the exit code/signal, sends (job-now-done job-id) and does
;;                not loop again, so the thread body ends.  With no pid yet it
;;                sleeps timeout-seconds and loops.
;;      other     prints the message and calls (exit 1).
;;  * tjq:dispatcher-thread gains keyword arguments job-thread-timeout-seconds
;;    and timeout-seconds (both default 0.1), replacing the letrec-bound 0.01.
;;    On 'timeout it either stops (next-job-id is #f and waiting, ready and
;;    running are all empty: prints "Drained. Done." and does not loop), or
;;    marks every job in `ready` as 'ready, sends it 'run, merges `ready` into
;;    `running`, and refills `ready` from the front of `waiting` with up to
;;    sync-job-cap minus the new running count.
;;    New message clauses: (job-now-running id), (job-now-done id),
;;    ('method 'drain ...) which replies 'drain and continues with next-job-id
;;    set to #f, and a 'submit guard that replies #f once next-job-id is #f.
;;  * The `obj` closure in tjq:new gains a 'drain operation: it sends the
;;    method message, joins the dispatcher thread, prints "Done with queue "
;;    and returns #t.
;;  * New drivers test-drain-simple, test-submit-bunch (6 jobs, cap 3) and
;;    test-submit-bunch2 (600 jobs, cap 42); the script's top level now prints
;;    "top" and runs test-submit-bunch2 instead of test-submit.
;;
;; NOTE(review): points to confirm or fix in the patched file (not changed here):
;;  * In the dispatcher's match, the catch-all clause `(e ...)` appears before
;;    `(('method x args return-mbox) ...)`; with first-match semantics the
;;    latter looks unreachable, so unknown methods hit (exit 1) instead.
;;  * That clause sends (list 'missing-method) while the caller in tjq:new
;;    tests (eq? res 'missing-method) against the bare symbol.
;;  * The clause `(('method 'kill) )` has only commented-out code as its body,
;;    and `obj` always sends four-element (list 'method op args obj-mbox)
;;    messages, which this two-element pattern does not match.
;;  * `successful` is bound in the job-now-done clause but never used;
;;    `stop` is set to #t on drain but is only read in commented-out code.
;;  * New job ids are consed onto the front of `waiting` and split-at takes
;;    from the front, so the most recently submitted jobs are started first
;;    -- confirm whether LIFO order is intended.
;;  * In the job thread's 'timeout branch without a pid, thread-sleep! adds a
;;    second timeout-seconds delay on top of the thread-receive timeout.
;;  * The 'drain reply sent to return-mbox is not read by the 'drain branch of
;;    `obj`, which joins the dispatcher without calling mailbox-receive!.
;;  * `dispatch-thread` is initialised with `obj` inside the same letrec that
;;    binds `obj` later; the start of tjq:new is outside this diff -- verify.
Index: example/q/threaded-queue.scm ================================================================== --- example/q/threaded-queue.scm +++ example/q/threaded-queue.scm @@ -1,8 +1,8 @@ #!/opt/chicken/bin/csi -s -(use mailbox-threads typed-records matchable mailbox) +(use mailbox-threads typed-records matchable mailbox posix) ;;; create a threaded job queue ;;; submit job ;;; - command line ;;; - working dir / default pwd @@ -13,10 +13,12 @@ (define getenv get-environment-variable) (defstruct tjq:job id ;; assigned at construction time when added to waiting q state ;; assigned at construction time when added to waiting q (pid #f) ;; assigned when moved from ready q to running q + (threadobj #f) + (normal-exit #f) (exit-code #f) ;; assigned when moved from running to done (time-entered-waiting #f) (time-entered-ready #f) (time-entered-running #f) (time-entered-done #f) @@ -33,23 +35,76 @@ (define (tjq:exception e) (print "Exception: "e) ;;(print-call-chain) (exit 1)) -(define (tjq:job-thread job-id job this-thread) - (print "job-thread setup for jobid "job-id) - (make-thread - (lambda () - (print "job-thread started for jobid "job-id) - (print "job-thread finished for jobid "job-id) - #f))) - - -(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj) +(define (tjq:job-thread job-id job dispatch-thread timeout-seconds) + ;;(print "job-thread setup for jobid "job-id) + (letrec + ((this-thread + (make-thread + (lambda () + (tjq:job-threadobj-set! job this-thread) + ;;(print "job-thread started for jobid "job-id) + (let loop ((pid #f)) + ;;(print "job-thr("job-id")> loop top.") + (match (thread-receive timeout-seconds 'timeout) + ('timeout + ;;(print "job-thr("job-id")> timeout; pid="pid" before cond1") + (cond + ((number? 
pid) ;; check if still running + ;;(print "job-thr("job-id")> timeout; pid="pid" cond1 pid-is-number branch") + (let-values (((pid-or-zero normal-exit exitcode-or-signal) + (process-wait pid #t))) + ;;(print "job-thr("job-id")> pid-or-zero="pid-or-zero) + (cond + ((zero? pid-or-zero) + ;;(print "job-thr("job-id")> timeout; pid="pid" cond2 pid-or-zero is zero branch") + ;;(print "job-thr("job-id")> zero; loop.") + (loop pid)) + (else + ;;(print "job-thr("job-id")> timeout; pid="pid" cond2 else branch") + (tjq:job-normal-exit-set! job + (if normal-exit 'normal 'signal)) + (tjq:job-exit-code-set! job exitcode-or-signal) + + (thread-send dispatch-thread (list 'job-now-done job-id)))) + ;;(print "job-thr("job-id")> after cond2") + )) + (else + ;;(print "job-thr("job-id")> timeout; pid="pid" cond1 else branch") + ;;(print "job-thr("job-id")> no action; loop") + (thread-sleep! timeout-seconds) + (loop pid)))) + ('run + ;;(print "job-thr("job-id")> run called") + (let* ((cmdline (tjq:job-cmdline job)) + (newpid (if (list? cmdline) + (process-run (car cmdline) (cdr cmdline)) + (process-run cmdline)))) + (tjq:job-pid-set! 
job newpid) + (thread-send dispatch-thread + (list 'job-now-running job-id)) + (loop newpid))) + (e + (print "tjq:job-thread("job-id") illegal message received:") + (pp e) + (exit 1)))) + ;;(print "job-thread finished for jobid "job-id) + #f)))) + this-thread)) + + +(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj #!key + (job-thread-timeout-seconds 0.1) + (timeout-seconds 0.1) + ) (letrec (;; options to configure behavior of dispatcher - (timeout-seconds 0.01) + (stop #f) + ;;(timeout-seconds 0.5) + ;;(job-thread-timeout-seconds 0.5) ;; define long-running thread which receives requests and coordinates ;; job execution (this-thread (make-thread (lambda () @@ -61,53 +116,122 @@ (ready '()) ;; launch jobs in here (running '()) ;; wait for pid to complete, then move do done (done '())) ;; ;;(print "loop top") (match (thread-receive timeout-seconds 'timeout) + ;; (let ((res (thread-receive timeout-seconds 'timeout) + ;; (if (not stop) + + ;; 'done))) + ;; res) ('timeout - ;;*** when timeout happens, examine job queues - ;; and move jobs thru their lifecycle - ;;** scan for dones - ;; count dones, subtract from running total - ;;** count waitings - ;; move min(sync-job-cap - running total, waiting total) to ready - ;; foreach ready, thread & fork - - - (loop next-job-id waiting ready running done)) + ;;(print "to: "stop" ; next-job-id="next-job-id) + (if (and + (not next-job-id) ;; we're draining jobs + (null? waiting) + (null? ready) + (null? running) + ) + (begin + (print "Drained. Done.") + (set! stop #t) + #f) + ;;*** when timeout happens, examine job queues + ;; and move jobs thru their lifecycle + ;;** count waitings + ;; move min(sync-job-cap - running total, waiting total) to ready + ;; foreach ready, run it + ;;(print "disp: ready="ready) + (begin + (for-each (lambda (job-id) + (let* ((job (hash-table-ref job-hash job-id)) + (job-thread (tjq:job-threadobj job))) + (tjq:job-state-set! 
job 'ready) + (thread-send job-thread 'run))) + ready) + (let* ((new-running (flatten ready running)) + (avail-slots (- sync-job-cap (length new-running))) + (queueable-count (min avail-slots (length waiting)))) + (let-values (((new-ready new-waiting) + (split-at waiting queueable-count))) + (loop next-job-id new-waiting new-ready new-running done)))))) + + (('job-now-running job-id) + (let ((job (hash-table-ref job-hash job-id))) + (tjq:job-state-set! job 'running) + (loop next-job-id waiting ready running done))) + (('job-now-done job-id) + (let* ((job (hash-table-ref job-hash job-id)) + (successful + (and + (eq? 'normal (tjq:job-normal-exit job)) + (zero? (tjq:job-exit-code job)))) + (new-running (filter + (lambda (x) (not (eq? x job-id))) + running)) + (new-done (cons job-id done))) + (tjq:job-state-set! job 'done) + (loop next-job-id waiting ready new-running new-done))) (('method 'ping '() return-mbox) (print "got ping") (mailbox-send! return-mbox 'pong) (loop next-job-id waiting ready running done)) (('method 'submit args return-mbox) - (let* ((job-id next-job-id) - (job (apply - make-tjq:job - id: job-id - time-entered-waiting: (current-seconds) - state: 'waiting - args)) - (job-thread (tjq:job-thread job-id job this-thread))) - (hash-table-set! job-hash job-id job) - (thread-start! job-thread) - (mailbox-send! return-mbox job-id) - (loop - (add1 next-job-id) - (cons job-thread waiting) - ready running done))) - + (if (not next-job-id) + (begin + (print "refuse to submit new job -- draining jobs now.") + (mailbox-send! return-mbox #f) + (loop #f waiting ready running done)) + (let* ((job-id next-job-id) + (job (apply + make-tjq:job + id: job-id + time-entered-waiting: (current-seconds) + state: 'waiting + args)) + (job-thread (tjq:job-thread job-id job this-thread job-thread-timeout-seconds))) + (hash-table-set! job-hash job-id job) + (thread-start! job-thread) + (mailbox-send! 
return-mbox job-id) + (loop + (add1 next-job-id) + (cons job-id waiting) + ready running done)))) + + (('method 'kill) + ;; (for-each (job-id) + ;; (lambda (job-id) + ;; (let* ((job (hash-table-ref job-hash job-id)) + ;; (job-thread (tjq:job-threadobj job))) + ;; (thread-send job-thread 'abort)) + ) + + (('method 'drain args return-mbox) + (mailbox-send! return-mbox 'drain) + (loop #f waiting ready running done)) + + + ;; (for-each + ;; (lambda (job-id) + ;; (let* ((job (hash-table-ref job-hash job-id)) + ;; (job-thread (tjq:job-threadobj job))) + ;; (thread-join! job-thread))) + ;; '() ;; FIXME + ;; ) + (e (print "tjq:dispatcher-thread> no matching pattern. dispatcher received: ") (pp e) (exit 1)) (('method x args return-mbox) (mailbox-send! return-mbox (list 'missing-method)) (loop next-job-id waiting ready running done)) ) ;; end match - - qname))))) + ;;(print "Done dispatch thread") + #f + ))))) this-thread)) (define (tjq:new #!key (qname (gensym)) (sync-job-cap 100)) @@ -117,19 +241,31 @@ ((dispatch-thread (tjq:dispatcher-thread qname job-hash sync-job-cap obj)) (obj (lambda (op . args) (cond ((eq? op 'event-loop) - (thread-join! dispatch-thread)) + (print "got event-loop") + (thread-join! dispatch-thread) + (print "after thread-join dispatch-thread") + ) + ((eq? op 'drain) + (thread-send dispatch-thread (list 'method op args obj-mbox)) + (thread-join! dispatch-thread) + (print "Done with queue "qname) + #t) (else + ;;(print "send method op="op) (thread-send dispatch-thread (list 'method op args obj-mbox)) (let* ((res (mailbox-receive! obj-mbox))) (if (eq? res 'missing-method) (begin (print "missing method "op" called.") (tjq:exception 'missing-method)) - res))))))) + res))))) + + ) ;; end obj binding + ); end letrec bindings (thread-start! dispatch-thread) obj))) (define (test-tjq-simple) @@ -147,7 +283,44 @@ (define (test-submit) (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3))) (q 'submit cmdline: "sleep 2; echo job well done") (thread-sleep! 
4))) + +(define (test-drain-simple) + (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3))) + ;(q 'submit cmdline: "sleep 2; echo job well done") + (thread-sleep! 1) + (q 'drain))) + + +(define (test-submit-bunch) + (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3))) + (for-each (lambda (x) + (let* ((cmd (conc "echo did job "x))) + (print "submit it--"x) + (q 'submit cmdline: cmd)) + ) + (iota 6)) + ;;(thread-sleep! 10) + (q 'drain) + ;;(q 'event-loop) + )) + +(define (test-submit-bunch2) + (let* ((q (tjq:new qname: 'test-q sync-job-cap: 42))) + (for-each (lambda (x) + (let* ((cmd (conc "echo did job "x))) + (print "submit it--"x) + (q 'submit cmdline: cmd)) + ) + (iota 600)) + ;;(thread-sleep! 10) + (q 'drain) + ;;(q 'event-loop) + )) + ;(test-tjq-simple) -(test-submit) +;;(test-submit) +;;(test-drain-simple) +(print "top") +(test-submit-bunch2)