Megatest

Artifact [32eb4584d7]
Login

Artifact 32eb4584d73ef4fde6129f0ff0fd01ec185a42da:


#!/opt/chicken/bin/csi -s

(use mailbox-threads typed-records matchable mailbox)

;;; Create a threaded job queue (tjq).
;;; Submitting a job takes:
;;;   - command line
;;;   - working dir               / default: $PWD
;;;   - env alist                 / default: current env
;;;   - callback on exit 0        / default: no-op
;;;   - callback on nonzero exit  / default: no-op
;; tjq == threaded job queue; a job is a unix command
;; Short alias for get-environment-variable; used by the tjq:job work-dir default.
(define getenv get-environment-variable)
;; Record describing one job in a threaded job queue.
;; Instances are built with the keyword constructor make-tjq:job; the
;; dispatcher's 'submit handler supplies id:, state: and
;; time-entered-waiting: and forwards the caller's remaining keyword args.
;; NOTE(review): pid, exit-code and the ready/running/done timestamps are
;; not written by any code visible in this file; the per-field comments on
;; them describe the intended lifecycle.
(defstruct tjq:job
    id  ;; job id; assigned at construction time when added to waiting q
  state ;; lifecycle state symbol; set to 'waiting at construction time
  (pid #f) ;; intended: assigned when moved from ready q to running q
  (exit-code #f) ;; intended: assigned when moved from running to done
  (time-entered-waiting #f) ;; (current-seconds) at submit time
  (time-entered-ready   #f)
  (time-entered-running #f)
  (time-entered-done    #f)
  ;; following are key options to submit method
  (work-dir     (getenv "PWD")) ;; where to execute job
  (setenvs     '())            ;; alist of envvars to set when running job
  (cmdline     "/bin/true")    ;; job command line; if string, run in subshell, if list, exec.
  (success-cb  (lambda () #t)) ;; zero-argument callback; fires when exitcode is 0
  (fail-cb     (lambda () #t)));; zero-argument callback; fires when exitcode is not 0

 
             

;; Fatal-error handler for the job queue.
;;   e - any printable object describing the failure
;; Prints "Exception: " followed by e, then terminates the whole process
;; with exit status 1.  This procedure never returns to its caller.
(define tjq:exception
  (lambda (e)
    (print "Exception: " e)
    ;; a call-chain dump is deliberately left disabled: (print-call-chain)
    (exit 1)))

;; Create (but do not start) the thread associated with one job.
;;   job-id      - id of the job; only used in the log messages
;;   job         - the job record (currently unused by the thread body)
;;   this-thread - the dispatcher thread (currently unused by the thread body)
;; Prints a setup message immediately.  The returned thread, once started,
;; prints a "started" and a "finished" message and yields #f as its result.
(define (tjq:job-thread job-id job this-thread)
  ;; body executed inside the new thread
  (define (run-job)
    (print "job-thread started for jobid " job-id)
    (print "job-thread finished for jobid " job-id)
    #f)
  (print "job-thread setup for jobid " job-id)
  (make-thread run-job))


;; Build (but do not start) the dispatcher thread of a job queue.
;;
;;   qname        - name of the queue (currently unused by the dispatcher)
;;   job-hash     - hash table; 'submit stores each new job record under its id
;;   sync-job-cap - intended cap on concurrently running jobs (not yet enforced)
;;   obj          - the queue object (currently unused by the dispatcher)
;;
;; Returns the unstarted thread.  Once started it loops forever, receiving
;; messages of the form (method <op> <args> <return-mbox>) via thread-receive
;; and answering on <return-mbox>:
;;   'ping   with no args -> replies 'pong
;;   'submit with keyword args for make-tjq:job -> registers the job, starts
;;           its job thread and replies with the new job id
;;   any other op         -> replies the symbol 'missing-method
;; A message that is not a 4-element 'method list is fatal: it is printed
;; and the process exits with status 1.
(define (tjq:dispatcher-thread qname job-hash sync-job-cap obj)
  (letrec
   (;; options to configure behavior of dispatcher
    (timeout-seconds 0.01)
    ;; define long-running thread which receives requests and coordinates
    ;; job execution
    (this-thread
     (make-thread
      (lambda ()
        ;; Every clause below ends in a tail call to loop (or exits), so the
        ;; dispatcher runs in constant space; nothing may follow the match.
        (let loop ((next-job-id 0)
                   ;; four job queues holding job mbox-type threads
                   ;; they advance from one to the next
                   ;; once in done, the thread has completed.
                   (waiting '()) ;; stay here until count(running) < sync-job-cap
                   (ready   '()) ;; launch jobs in here
                   (running '()) ;; wait for pid to complete, then move to done
                   (done    '())) ;; finished jobs
          (match (thread-receive timeout-seconds 'timeout)
            ('timeout
             ;;*** when timeout happens, examine job queues
             ;;    and move jobs thru their lifecycle
             ;;** scan for dones
             ;; count dones, subtract from running total
             ;;** count waitings
             ;; move min(sync-job-cap - running total, waiting total) to ready
             ;; foreach ready, thread & fork
             ;; (lifecycle handling is not implemented yet)
             (loop next-job-id waiting ready running done))

            (('method 'ping '() return-mbox)
             (print "got ping")
             (mailbox-send! return-mbox 'pong)
             (loop next-job-id waiting ready running done))

            (('method 'submit args return-mbox)
             (let* ((job-id next-job-id)
                    (job (apply
                          make-tjq:job
                          id: job-id
                          time-entered-waiting: (current-seconds)
                          state: 'waiting
                          args))
                    (job-thread (tjq:job-thread job-id job this-thread)))
               (hash-table-set! job-hash job-id job)
               (thread-start! job-thread)
               (mailbox-send! return-mbox job-id)
               (loop
                (add1 next-job-id)
                (cons job-thread waiting)
                ready running done)))

            ;; Well-formed request for an unknown method.  This clause must
            ;; come before the catch-all below, otherwise it can never match.
            ;; The reply is the bare symbol because the queue object tests
            ;; the answer with (eq? res 'missing-method).
            (('method _ _ return-mbox)
             (mailbox-send! return-mbox 'missing-method)
             (loop next-job-id waiting ready running done))

            ;; Catch-all: malformed message, treated as fatal.
            (e
             (print "tjq:dispatcher-thread> no matching pattern.  dispatcher received: ")
             (pp e)
             (exit 1))))))))
   this-thread))
  

       
;; Create a new threaded job queue and start its dispatcher thread.
;;
;; Keyword arguments:
;;   qname:        name of the queue (default: a fresh gensym)
;;   sync-job-cap: value handed to the dispatcher as its job cap (default 100)
;;
;; Returns a procedure (obj op . args) acting as the queue object:
;;   (obj 'event-loop)    - joins the dispatcher thread
;;   (obj <op> <arg> ...) - sends (method <op> <args> <mailbox>) to the
;;                          dispatcher, waits for the reply on the mailbox and
;;                          returns it.  If the reply is the symbol
;;                          'missing-method, a message is printed and
;;                          tjq:exception is called (which exits the process).
(define (tjq:new #!key (qname (gensym)) (sync-job-cap 100))
  (let* ((job-hash (make-hash-table))
         (obj-mbox (make-mailbox))
         ;; Assigned below.  The dispatcher constructor takes obj as an
         ;; argument, and obj's body refers to the dispatcher; creating obj
         ;; first and filling this slot afterwards avoids reading an
         ;; uninitialised letrec binding.
         (dispatch-thread #f)
         (obj
          (lambda (op . args)
            (cond
             ((eq? op 'event-loop)
              (thread-join! dispatch-thread))
             (else
              (thread-send dispatch-thread (list 'method op args obj-mbox))
              (let ((res (mailbox-receive! obj-mbox)))
                (if (eq? res 'missing-method)
                    (begin
                      (print "missing method "op" called.")
                      (tjq:exception 'missing-method))
                    res)))))))
    (set! dispatch-thread
          (tjq:dispatcher-thread qname job-hash sync-job-cap obj))
    (thread-start! dispatch-thread)
    obj))
         
                            
;; Smoke test: build a queue named test-q with a job cap of 3, ping it
;; twice (pretty-printing each reply), then sleep for 0.1 seconds.
(define (test-tjq-simple)
  (let ((queue (tjq:new qname: 'test-q sync-job-cap: 3)))
    ;; still disabled: (queue 'submit "ls -l") and (queue 'drain)
    (let ping-again ((remaining 2))
      (when (> remaining 0)
        (pp (queue 'ping))
        (ping-again (sub1 remaining))))
    ;; still disabled: (queue 'event-loop)
    (thread-sleep! 0.1)))

;; Submit test: build a queue named test-q with a job cap of 3, submit a
;; single job through the 'submit method, then sleep for 4 seconds before
;; returning.
(define (test-submit)
  (define queue (tjq:new qname: 'test-q sync-job-cap: 3))
  (queue 'submit cmdline: "sleep 2; echo job well done")
  (thread-sleep! 4))

;; Script entry point: runs when the file is loaded.  Only the submit test
;; is enabled; uncomment the line below to run the ping smoke test as well.
;(test-tjq-simple)
(test-submit)