#!/opt/chicken/bin/csi -s

;;; example/q/threaded-queue.scm

(use mailbox-threads typed-records matchable mailbox)

;;; create a threaded job queue
;;; submit job
;;;  - command line
;;;  - working dir / default pwd
;;;  - env hash / default current env
;;;  - callback on exit 0 / default noop
;;;  - callback on nonzero exit / default noop

;; tjq == threaded job queue; a job is a unix command
(define getenv get-environment-variable)

;; Record describing one job.  `id` and `state` have no default and are
;; supplied by the dispatcher's 'submit handler; every other field has a
;; default and may be overridden by keyword arguments passed to 'submit.
(defstruct tjq:job
  id                          ;; assigned at construction time when added to waiting q
  state                       ;; assigned at construction time when added to waiting q
  (pid #f)                    ;; assigned when moved from ready q to running q
  (exit-code #f)              ;; assigned when moved from running to done
  (time-entered-waiting #f)
  (time-entered-ready #f)
  (time-entered-running #f)
  (time-entered-done #f)
  ;; following are key options to submit method
  (work-dir (getenv "PWD"))   ;; where to execute job
  (setenvs '())               ;; alist of envvars to set when running job
  (cmdline "/bin/true")       ;; job command line; if string, run in subshell, if list, exec.
  (success-cb (lambda () #t)) ;; fires when exitcode is 0
  (fail-cb (lambda () #t)))   ;; fires when exitcode is not 0

;; Report a fatal condition `e` and terminate the process with status 1.
(define (tjq:exception e)
  (print "Exception: "e)
  ;;(print-call-chain)
  (exit 1))

;; Build (but do not start) the thread associated with one job.
;; Currently a placeholder: the thread only prints start/finish messages
;; and returns #f.  `job` and `this-thread` are accepted but not yet used.
(define (tjq:job-thread job-id job this-thread)
  (print "job-thread setup for jobid "job-id)
  (make-thread
   (lambda ()
     (print "job-thread started for jobid "job-id)
     (print "job-thread finished for jobid "job-id)
     #f)))

;; Build (but do not start) the dispatcher thread for a queue.
;;
;;   qname        - queue name (accepted for the caller's benefit; not used here)
;;   job-hash     - hash table, job-id -> tjq:job record, filled in on 'submit
;;   sync-job-cap - intended cap on concurrently running jobs (not yet enforced)
;;   obj          - the queue's front-end closure (not used here)
;;
;; The thread loops forever on its mailbox.  Messages have the shape
;;   (method <op> <args> <return-mbox>)
;; and every handled message posts exactly one reply to <return-mbox>.
;; A method that is not recognised is answered with the symbol
;; 'missing-method.  Anything that is not a method message at all is
;; treated as fatal.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec
      (;; options to configure behavior of dispatcher
       (timeout-seconds 0.01)
       ;; define long-running thread which receives requests and coordinates
       ;; job execution
       (this-thread
        (make-thread
         (lambda ()
           ;; Every (loop ...) call below is in tail position so the
           ;; dispatcher can spin indefinitely without accumulating
           ;; pending continuations.
           (let loop ((next-job-id 0)
                      ;; four job queues holding job mbox-type threads
                      ;; they advance from one to the next
                      ;; once in done, the thread has completed.
                      (waiting '())  ;; stay here until count(running) < sync-job-cap
                      (ready '())    ;; launch jobs in here
                      (running '())  ;; wait for pid to complete, then move to done
                      (done '()))
             ;;(print "loop top")
             (match (thread-receive timeout-seconds 'timeout)
               ('timeout
                ;;*** when timeout happens, examine job queues
                ;;    and move jobs thru their lifecycle
                ;;** scan for dones
                ;;   count dones, subtract from running total
                ;;** count waitings
                ;;   move min(sync-job-cap - running total, waiting total) to ready
                ;;   foreach ready, thread & fork
                (loop next-job-id waiting ready running done))

               (('method 'ping '() return-mbox)
                (print "got ping")
                (mailbox-send! return-mbox 'pong)
                (loop next-job-id waiting ready running done))

               (('method 'submit args return-mbox)
                (let* ((job-id next-job-id)
                       (job (apply
                             make-tjq:job
                             id: job-id
                             time-entered-waiting: (current-seconds)
                             state: 'waiting
                             args))
                       (job-thread (tjq:job-thread job-id job this-thread)))
                  (hash-table-set! job-hash job-id job)
                  (thread-start! job-thread)
                  (mailbox-send! return-mbox job-id)
                  (loop
                   (add1 next-job-id)
                   (cons job-thread waiting)
                   ready running done)))

               ;; Must come before the catch-all clause below: match
               ;; takes the first clause that fits.  The reply is the
               ;; bare symbol because the caller compares it with eq?.
               (('method x args return-mbox)
                (mailbox-send! return-mbox 'missing-method)
                (loop next-job-id waiting ready running done))

               ;; Catch-all: the message is not a method request, so there
               ;; is no return mailbox to answer on.
               (e
                (print "tjq:dispatcher-thread> no matching pattern.  dispatcher received: ")
                (pp e)
                (exit 1))))))))
    this-thread))

;; Create a threaded job queue and start its dispatcher thread.
;;
;;   qname:        queue name, defaults to a fresh gensym
;;   sync-job-cap: cap on concurrently running jobs, defaults to 100
;;
;; Returns a closure `(obj op . args)`:
;;   (obj 'event-loop)  joins the dispatcher thread
;;   (obj op arg ...)   sends (method op args obj-mbox) to the dispatcher
;;                      and blocks until a reply arrives on obj-mbox; the
;;                      reply is returned, except that a 'missing-method
;;                      reply is reported through tjq:exception.
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox))
         ;; Filled in below.  The closure and the thread refer to each
         ;; other, and letrec does not allow one binding's init expression
         ;; to use the value of a sibling binding, so the cycle is tied
         ;; with set! once the closure exists.
         (dispatch-thread #f)
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let* ((res (mailbox-receive! obj-mbox)))
                (if (eq? res 'missing-method)
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
    (set! dispatch-thread
          (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
    (thread-start! dispatch-thread)
    obj))

;; Smoke test: ping the dispatcher twice and print the replies.
(define (test-tjq-simple)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;(q 'submit "ls -l")
    ;(q 'drain)

    (pp (q 'ping))
    (pp (q 'ping))
    (thread-sleep! 0.1)
    ;(q 'event-loop)
    ))

;; Smoke test: submit one job, then sleep to give the threads time to run.
(define (test-submit)
  (let* ((q (tjq:new qname: 'test-q sync-job-cap: 3)))
    (q 'submit cmdline: "sleep 2; echo job well done")
    (thread-sleep! 4)))

;(test-tjq-simple)
(test-submit)