Index: dashboard.scm ================================================================== --- dashboard.scm +++ dashboard.scm @@ -98,10 +98,11 @@ typed-records sparse-vectors format srfi-4 srfi-14 + srfi-18 ) ;; (include "common_records.scm") ;; (include "db_records.scm") ;; (include "run_records.scm") @@ -3754,7 +3755,12 @@ ;; ease debugging by loading ~/.dashboardrc (let ((debugcontrolf (get-debugcontrolf))) (if debugcontrolf (load debugcontrolf))) -(main) +(import srfi-18) + +(thread-join! + (thread-start! + (make-thread main "main"))) + Index: rmtmod.scm ================================================================== --- rmtmod.scm +++ rmtmod.scm @@ -1642,46 +1642,57 @@ ;; S E R V E R ;; ====================================================================== (define (http-get-function fnkey) (hash-table-ref/default *http-functions* fnkey (lambda () "nothing here yet"))) + +(define *rmt:run-mutex* (make-mutex)) +(define *rmt:run-flag* #f) ;; Main entry point to start a server. was start-server (define (rmt:run hostn) - ;; ;; Configurations for server - ;; (tcp-buffer-size 2048) - ;; (max-connections 2048) - (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") - (if (and *db-serv-info* - (servdat-uconn *db-serv-info*)) - (let* ((uconn (servdat-uconn *db-serv-info*))) - (wait-and-close uconn)) - (let* ((port (portlogger:open-run-close portlogger:find-port)) - (handler-proc (lambda (rem-host-port qrykey cmd params) ;; - (set! *db-last-access* (current-seconds)) - (assert (list? params) "FATAL: handler called with non-list params") - (api:execute-requests *dbstruct-db* cmd params)))) - ;; (api:process-request *dbstuct-db* - (if (not *db-serv-info*) - (set! *db-serv-info* (make-servdat host: hostn port: port))) - (let* ((uconn (run-listener handler-proc port)) - (rport (udat-port uconn))) ;; the real port - (servdat-host-set! *db-serv-info* hostn) - (servdat-port-set! *db-serv-info* rport) - (servdat-uconn-set! *db-serv-info* uconn) - (wait-and-close uconn) - (db:print-current-query-stats) - ))) - (let* ((host (servdat-host *db-serv-info*)) - (port (servdat-port *db-serv-info*)) - (mode (or (servdat-mode *db-serv-info*) - "non-db"))) - ;; server exit stuff here - ;; (rmt:server-shutdown host port) - always do in on-exit - ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit - (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") - )) + (mutex-lock! *rmt:run-mutex*) + (if *rmt:run-flag* + (begin + (debug:print-warn 0 *default-log-port* "rmt:run already running.") + (mutex-unlock! *rmt:run-mutex*)) + (begin + (set! *rmt:run-flag* #t) + (mutex-unlock! *rmt:run-mutex*) + ;; ;; Configurations for server + ;; (tcp-buffer-size 2048) + ;; (max-connections 2048) + (debug:print 2 *default-log-port* "PID: "(current-process-id)". Attempting to start the server ...") + (if (and *db-serv-info* + (servdat-uconn *db-serv-info*)) + (let* ((uconn (servdat-uconn *db-serv-info*))) + (wait-and-close uconn)) + (let* ((port (portlogger:open-run-close portlogger:find-port)) + (handler-proc (lambda (rem-host-port qrykey cmd params) ;; + (set! *db-last-access* (current-seconds)) + (assert (list? params) "FATAL: handler called with non-list params") + (api:execute-requests *dbstruct-db* cmd params)))) + ;; (api:process-request *dbstuct-db* + (if (not *db-serv-info*) + (set! *db-serv-info* (make-servdat host: hostn port: port))) + (let* ((uconn (run-listener handler-proc port)) + (rport (udat-port uconn))) ;; the real port + (servdat-host-set! *db-serv-info* hostn) + (servdat-port-set! *db-serv-info* rport) + (servdat-uconn-set! *db-serv-info* uconn) + (wait-and-close uconn) + (db:print-current-query-stats) + ))) + (let* ((host (servdat-host *db-serv-info*)) + (port (servdat-port *db-serv-info*)) + (mode (or (servdat-mode *db-serv-info*) + "non-db"))) + ;; server exit stuff here + ;; (rmt:server-shutdown host port) - always do in on-exit + ;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit + (debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting") + )))) ;;====================================================================== ;; S E R V E R U T I L I T I E S ;;====================================================================== Index: tests/simplerun/debug.scm ================================================================== --- tests/simplerun/debug.scm +++ tests/simplerun/debug.scm @@ -14,11 +14,11 @@ (lambda () (let loop ((r 1) (i 1)) (print "register-test "r" test"i) (rmt:register-test r "test1" (conc "item_" i)) - (if (< i 1000) + (if (< i 100000) (loop r (+ i 1)) (if (< r 100) (begin (print "get-tests-for-run "r) (rmt:get-tests-for-run r "%" '() '() 0 #f #f #f #f #f 0 #f) Index: ulex/ulex.scm ================================================================== --- ulex/ulex.scm +++ ulex/ulex.scm @@ -64,11 +64,11 @@ chicken.pretty-print address-info mailbox matchable - queues + ;; queues regex regex-case s11n srfi-1 srfi-18 @@ -88,16 +88,17 @@ (host-port #f) (socket #f) ;; the peers (peers (make-hash-table)) ;; host:port->peer ;; work handling - (work-queue (make-queue)) - (work-proc #f) ;; set by user - (cnum 0) ;; cookie number - (mboxes (make-hash-table)) - (avail-cmboxes '()) ;; list of ( . ) for re-use + (work-queue (make-mailbox)) + (work-proc #f) ;; set by user + (cnum 0) ;; cookie number + (mboxes (make-hash-table)) ;; for the replies + (avail-cmboxes '()) ;; list of ( . ) for re-use ;; threads + (numthreads 10) (cmd-thread #f) (work-queue-thread #f) ) ;; ;; struct for keeping track of others we are talking to @@ -305,11 +306,12 @@ ;;====================================================================== ;; rdat is (rem-host-port qrykey cmd params) (define (add-to-work-queue uconn rdat) - (queue-add! (udat-work-queue uconn) rdat)) + #;(queue-add! (udat-work-queue uconn) rdat) + (mailbox-send! (udat-work-queue uconn) rdat)) (define (do-work uconn rdat) (let* ((proc (udat-work-proc uconn))) ;; get it each time - conceivebly it could change ;; put this following into a do-work procedure (match rdat @@ -318,20 +320,29 @@ ;; send 'response as cmd and result as params (send uconn rem-host-port qrykey 'response result))) ;; could check for ack (else (print "ERROR: rdat "rdat", did not match rem-host-port qrykey cmd params"))))) - (define (process-work-queue uconn) (let ((wqueue (udat-work-queue uconn)) - (proc (udat-work-proc uconn))) - (let loop () - (if (queue-empty? wqueue) - (thread-sleep! 0.1) - (let ((rdat (queue-remove! wqueue))) - (do-work uconn rdat))) - (loop)))) + (proc (udat-work-proc uconn)) + (numthr (udat-numthreads uconn))) + (let loop ((thnum 1) + (threads '())) + (let ((thlst (cons (make-thread (lambda () + (let ((rdat (mailbox-receive! wqueue #f 'MBOX_TIMEOUT))) + (do-work uconn rdat))) + (conc "work thread " thnum)) + threads))) + (if (< thnum numthr) + (loop (+ thnum 1) + thlst) + (begin + (print "ULEX: Starting "(length thlst)" worker threads.") + (map thread-start! thlst) + (print "ULEX: Threads started. Joining all.") + (map thread-join! thlst))))))) ;; below was to enable re-use of connections. This seems non-trivial so for ;; now lets open on each call ;; ;; ;; given host-port get or create peer struct