36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
(mutex-unlock! mtx)
(car (string-split result)))
#f)
(loop (read-line inp)))))))
;; Threshold on the number of registered checksum threads: gather-dir-info
;; launches a new thread only while this value is greater than the number of
;; entries in its `threads` hash table; otherwise it takes the wait branch.
(define *max-running* 40)
(define (gather-dir-info path)
(let ((mtx1 (make-mutex))
(threads (make-hash-table))
(last-num 0)
(req (nn-socket 'req)))
(print "starting client with pid " (current-process-id))
(nn-connect req
;; "tcp://localhost:5559")
"ipc:///tmp/test-ipc")
(find-files
path
;; test: #t
action: (lambda (p res)
(let ((info (cond
((not (file-read-access? p)) '(cant-read))
((directory? p) '(dir))
((symbolic-link? p) (list 'symlink (read-symbolic-link p)))
(else '(data)))))
(if (eq? (car info) 'data)
(let loop ((start-time (current-seconds)))
(mutex-lock! mtx1)
(let* ((num-threads (hash-table-size threads))
(ok-to-run (> *max-running* num-threads)))
;; (if (> (abs (- num-threads last-num)) 2)
;; (begin
;; ;; (print "num-threads:" num-threads)
;; (set! last-num num-threads)))
(mutex-unlock! mtx1)
(if ok-to-run
(let ((run-time-start (current-seconds)))
;; (print "num threads: " num-threads)
(let ((th1 (make-thread
(lambda ()
(let ((cksum (checksum mtx1 p cmd: "md5sum"))
(run-time (- (current-seconds) run-time-start)))
(mutex-lock! mtx1)
(client-send-receive req (conc p " " cksum))
(mutex-unlock! mtx1))
(let loop2 ()
(mutex-lock! mtx1)
(let ((registered (hash-table-exists? threads p)))
(if registered
(begin
;; (print "deleting thread reference for " p)
(hash-table-delete! threads p))) ;; delete myself
(mutex-unlock! mtx1)
(if (not registered)
(begin
(thread-sleep! 0.5)
(loop2))))))
p)))
(thread-start! th1)
;; (thread-sleep! 0.05) ;; give things a little time to get going
;; (thread-join! th1) ;;
(mutex-lock! mtx1)
(hash-table-set! threads p th1)
(mutex-unlock! mtx1)
)) ;; thread is launched
(let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet
(cond
((< run-time 5)) ;; blast on through
((< run-time 30)(thread-sleep! 0.1))
((< run-time 60)(thread-sleep! 2))
((< run-time 120)(thread-sleep! 3))
|
>
>
>
>
>
|
|
|
|
|
|
|
|
|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
(mutex-unlock! mtx)
(car (string-split result)))
#f)
(loop (read-line inp)))))))
;; Threshold on the number of registered checksum threads: gather-dir-info
;; launches a new thread only while this value is greater than the number of
;; entries in its `threads` hash table; otherwise it takes the wait branch.
(define *max-running* 40)
;; Lock/unlock wrappers that gather-dir-info calls on mtx1 around its accesses
;; to the `threads` hash table and around its client-send-receive call.
;; Both names are currently bound to `conc` -- the same procedure this file
;; uses to build the (conc p " " cksum) message -- not to a mutex operation.
;; NOTE(review): this presumably turns every (my-mutex-lock! mtx1) and
;; (my-mutex-unlock! mtx1) into a call that performs no locking; confirm that
;; running without mutual exclusion here is intentional.
(define my-mutex-lock! conc)
(define my-mutex-unlock! conc)
;; Alternative bindings that route the wrappers to the real mutex procedures:
;; (define my-mutex-lock! mutex-lock!)
;; (define my-mutex-unlock! mutex-unlock!)
(define (gather-dir-info path)
(let ((mtx1 (make-mutex))
(threads (make-hash-table))
(last-num 0)
(req (nn-socket 'req)))
(print "starting client with pid " (current-process-id))
(nn-connect req
;; "tcp://localhost:5559")
"ipc:///tmp/test-ipc")
(find-files
path
;; test: #t
action: (lambda (p res)
(let ((info (cond
((not (file-read-access? p)) '(cant-read))
((directory? p) '(dir))
((symbolic-link? p) (list 'symlink (read-symbolic-link p)))
(else '(data)))))
(if (eq? (car info) 'data)
(let loop ((start-time (current-seconds)))
(my-mutex-lock! mtx1)
(let* ((num-threads (hash-table-size threads))
(ok-to-run (> *max-running* num-threads)))
;; (if (> (abs (- num-threads last-num)) 2)
;; (begin
;; ;; (print "num-threads:" num-threads)
;; (set! last-num num-threads)))
(my-mutex-unlock! mtx1)
(if ok-to-run
(let ((run-time-start (current-seconds)))
;; (print "num threads: " num-threads)
(let ((th1 (make-thread
(lambda ()
(let ((cksum (checksum mtx1 p cmd: "md5sum"))
(run-time (- (current-seconds) run-time-start)))
(my-mutex-lock! mtx1)
(client-send-receive req (conc p " " cksum))
(my-mutex-unlock! mtx1))
(let loop2 ()
(my-mutex-lock! mtx1)
(let ((registered (hash-table-exists? threads p)))
(if registered
(begin
;; (print "deleting thread reference for " p)
(hash-table-delete! threads p))) ;; delete myself
(my-mutex-unlock! mtx1)
(if (not registered)
(begin
(thread-sleep! 0.5)
(loop2))))))
p)))
(thread-start! th1)
;; (thread-sleep! 0.05) ;; give things a little time to get going
;; (thread-join! th1) ;;
(my-mutex-lock! mtx1)
(hash-table-set! threads p th1)
(my-mutex-unlock! mtx1)
)) ;; thread is launched
(let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet
(cond
((< run-time 5)) ;; blast on through
((< run-time 30)(thread-sleep! 0.1))
((< run-time 60)(thread-sleep! 2))
((< run-time 120)(thread-sleep! 3))
|