@@ -181,10 +181,76 @@
 ;;======================================================================
 ;; C L I E N T S
 ;;======================================================================
 
 (define *http-mutex* (make-mutex))
+(define *http-requests-in-progress* 0)
+(define *http-connections-next-cleanup* (current-seconds))
+
+;; Return #t when the current time is later than
+;; *http-connections-next-cleanup*; the value is read under *http-mutex*.
+(define (http-transport:get-time-to-cleanup)
+  (let ((res #f))
+    (mutex-lock! *http-mutex*)
+    (set! res (> (current-seconds) *http-connections-next-cleanup*))
+    (mutex-unlock! *http-mutex*)
+    res))
+
+;; Count one more request in flight. When more than five requests are in
+;; flight the calling thread is delayed by one second.
+(define (http-transport:inc-requests-count)
+  (let ((too-many #f))
+    (mutex-lock! *http-mutex*)
+    (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*))
+    (set! too-many (> *http-requests-in-progress* 5))
+    (mutex-unlock! *http-mutex*)
+    ;; Use this opportunity to slow things down iff there are too many requests in flight.
+    ;; Sleep only after releasing the mutex, otherwise finished requests could not
+    ;; decrement the counter while this thread is being throttled.
+    (if too-many
+        (begin
+          (debug:print-info 0 "Whoa there buddy, ease up...")
+          (thread-sleep! 1)))))
+
+;; Call PROC and then decrement the in-flight counter, both while holding
+;; *http-mutex*.
+(define (http-transport:dec-requests-count proc)
+  (mutex-lock! *http-mutex*)
+  (proc)
+  (set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
+  (mutex-unlock! *http-mutex*))
+
+;; Counterpart of http-transport:inc-requests-and-prep-to-close-all-connections.
+;; Expects *http-mutex* to be locked on entry and unlocks it before returning.
+;; Decrements the in-flight counter, waits up to five seconds for it to reach
+;; zero, closes all connections if it did, and schedules the next cleanup.
+(define (http-transport:dec-requests-count-and-close-all-connections)
+  (set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
+  ;; Push the next cleanup out before the mutex is first released so that no
+  ;; other thread enters cleanup mode while this one is still waiting.
+  (set! *http-connections-next-cleanup* (+ (current-seconds) 10))
+  (let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds
+    (if (> *http-requests-in-progress* 0)
+        (if (> etime (current-seconds))
+            (begin
+              ;; Release the mutex while sleeping: the other requests must lock
+              ;; it in http-transport:dec-requests-count to report completion.
+              (mutex-unlock! *http-mutex*)
+              (thread-sleep! 0.05)
+              (mutex-lock! *http-mutex*)
+              (loop etime))
+            (debug:print 0 "ERROR: requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections"))
+        (close-all-connections!)))
+  (set! *http-connections-next-cleanup* (+ (current-seconds) 10))
+  (mutex-unlock! *http-mutex*))
+
+;; Lock *http-mutex* and count one more request in flight. The mutex is left
+;; locked on return; http-transport:dec-requests-count-and-close-all-connections
+;; is the call that unlocks it.
+(define (http-transport:inc-requests-and-prep-to-close-all-connections)
+  (mutex-lock! *http-mutex*)
+  (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)))
 
 ;; (system "megatest -list-servers | grep alive || megatest -server - -daemonize && sleep 4")
 ;;
 ;;
 
@@ -216,17 +282,30 @@
     ;; #t))
     ;; send the data and get the response
     ;; extract the needed info from the http data and
     ;; process and return it.
     (let* ((send-recieve (lambda ()
-                           (mutex-lock! *http-mutex*)
-                           (set! res (with-input-from-request
-                                      fullurl
-                                      (list (cons 'dat msg))
-                                      read-string))
-                           (close-all-connections!)
-                           (mutex-unlock! *http-mutex*)))
+                           (let ((dat #f)
+                                 (cleanup (http-transport:get-time-to-cleanup)))
+                             (if cleanup
+                                 (begin
+                                   (debug:print-info 0 "Running cleanup mode")
+                                   (http-transport:inc-requests-and-prep-to-close-all-connections))
+                                 (http-transport:inc-requests-count))
+                             ;; Do the actual data transfer
+                             (set! dat (with-input-from-request
+                                        fullurl
+                                        (list (cons 'dat msg))
+                                        read-string))
+                             (if cleanup
+                                 ;; mutex already set
+                                 (begin
+                                   (set! res dat)
+                                   (http-transport:dec-requests-count-and-close-all-connections))
+                                 (http-transport:dec-requests-count
+                                  (lambda ()
+                                    (set! res dat)))))))
            (time-out     (lambda ()
                            (thread-sleep! 45)
                            (if (not res)
                                (begin
                                  (debug:print 0 "WARNING: communication with the server timed out.")
@@ -281,19 +360,30 @@
 
     ;; (with-input-from-request "http://localhost/echo-service"
     ;; '((test . "value")) read-string)
 
     (let* ((send-recieve (lambda ()
-                           (mutex-lock! *http-mutex*)
-                           (set! res (with-input-from-request
-                                      fullurl
-                                      (list (cons 'key "thekey")
-                                            (cons 'cmd cmd)
-                                            (cons 'params params))
-                                      read-string))
-                           (close-all-connections!)
-                           (mutex-unlock! *http-mutex*)))
+                           (let ((dat #f)
+                                 (cleanup (http-transport:get-time-to-cleanup)))
+                             (if cleanup
+                                 (http-transport:inc-requests-and-prep-to-close-all-connections)
+                                 (http-transport:inc-requests-count))
+                             ;; Do the actual data transfer NB// KEEP THIS IN SYNC WITH http-transport:client-send-receive
+                             (set! dat (with-input-from-request
+                                        fullurl
+                                        (list (cons 'key "thekey")
+                                              (cons 'cmd cmd)
+                                              (cons 'params params))
+                                        read-string))
+                             (if cleanup
+                                 ;; mutex already set
+                                 (begin
+                                   (set! res dat)
+                                   (http-transport:dec-requests-count-and-close-all-connections))
+                                 (http-transport:dec-requests-count
+                                  (lambda ()
+                                    (set! res dat)))))))
            (time-out     (lambda ()
                            (thread-sleep! 45)
                            (if (not res)
                                (begin
                                  (debug:print 0 "WARNING: communication with the server timed out.")