Overview
Comment: | Cleaned up server starting. Should be no run-away starting of too many servers now. |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | v1.62-no-rpc |
Files: | files | file ages | folders |
SHA1: |
dc09eb179bb5ff963c2a223e9f2846b8 |
User & Date: | mrwellan on 2016-12-01 08:43:50 |
Other Links: | branch diff | manifest | tags |
Context
2016-12-01
| ||
15:58 | server fixes check-in: f0c98a8cd8 user: mrwellan tags: v1.62-no-rpc | |
08:43 | Cleaned up server starting. Should be no run-away starting of too many servers now. check-in: dc09eb179b user: mrwellan tags: v1.62-no-rpc | |
2016-11-30
| ||
23:10 | Basic server code, removed some junk and corrected couple typos check-in: bb626804c7 user: matt tags: v1.62-no-rpc | |
Changes
Modified db.scm from [77088c1205] to [fef1f3f2e3].
︙ | ︙ | |||
37 38 39 40 41 42 43 44 45 46 | (define *number-non-write-queries* 0) ;;====================================================================== ;; R E C O R D S ;;====================================================================== ;; each db entry is a pair ( db . dbfilepath ) (defstruct dbr:dbstruct (tmpdb #f) (mtdb #f) | > > | > > > > | 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | (define *number-non-write-queries* 0) ;;====================================================================== ;; R E C O R D S ;;====================================================================== ;; each db entry is a pair ( db . dbfilepath ) ;; I propose this record evolves into the area record ;; (defstruct dbr:dbstruct (tmpdb #f) (mtdb #f) (refndb #f) (homehost #f) ;; not used yet (on-homehost #f) ;; not used yet ) ;; goal is to converge on one struct for an area but for now it is too confusing ;; record for keeping state,status and count for doing roll-ups in ;; iterated tests ;; (defstruct dbr:counts (state #f) (status #f) |
︙ | ︙ |
Modified remotediff-nmsg.scm from [90308a45f2] to [50100144d4].
︙ | ︙ | |||
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | (mutex-unlock! mtx) (car (string-split result))) #f) (loop (read-line inp))))))) (define *max-running* 40) (define (gather-dir-info path) (let ((mtx1 (make-mutex)) (threads (make-hash-table)) (last-num 0) (req (nn-socket 'req))) (print "starting client with pid " (current-process-id)) (nn-connect req ;; "tcp://localhost:5559") "ipc:///tmp/test-ipc") (find-files path ;; test: #t action: (lambda (p res) (let ((info (cond ((not (file-read-access? p)) '(cant-read)) ((directory? p) '(dir)) ((symbolic-link? p) (list 'symlink (read-symbolic-link p))) (else '(data))))) (if (eq? (car info) 'data) (let loop ((start-time (current-seconds))) | > > > > > | | | | | | | | | 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | (mutex-unlock! mtx) (car (string-split result))) #f) (loop (read-line inp))))))) (define *max-running* 40) (define my-mutex-lock! conc) (define my-mutex-unlock! conc) ;; (define my-mutex-lock! mutex-lock!) ;; (define my-mutex-unlock! mutex-unlock!) (define (gather-dir-info path) (let ((mtx1 (make-mutex)) (threads (make-hash-table)) (last-num 0) (req (nn-socket 'req))) (print "starting client with pid " (current-process-id)) (nn-connect req ;; "tcp://localhost:5559") "ipc:///tmp/test-ipc") (find-files path ;; test: #t action: (lambda (p res) (let ((info (cond ((not (file-read-access? p)) '(cant-read)) ((directory? p) '(dir)) ((symbolic-link? p) (list 'symlink (read-symbolic-link p))) (else '(data))))) (if (eq? (car info) 'data) (let loop ((start-time (current-seconds))) (my-mutex-lock! mtx1) (let* ((num-threads (hash-table-size threads)) (ok-to-run (> *max-running* num-threads))) ;; (if (> (abs (- num-threads last-num)) 2) ;; (begin ;; ;; (print "num-threads:" num-threads) ;; (set! last-num num-threads))) (my-mutex-unlock! mtx1) (if ok-to-run (let ((run-time-start (current-seconds))) ;; (print "num threads: " num-threads) (let ((th1 (make-thread (lambda () (let ((cksum (checksum mtx1 p cmd: "md5sum")) (run-time (- (current-seconds) run-time-start))) (my-mutex-lock! mtx1) (client-send-receive req (conc p " " cksum)) (my-mutex-unlock! mtx1)) (let loop2 () (my-mutex-lock! mtx1) (let ((registered (hash-table-exists? threads p))) (if registered (begin ;; (print "deleting thread reference for " p) (hash-table-delete! threads p))) ;; delete myself (my-mutex-unlock! mtx1) (if (not registered) (begin (thread-sleep! 0.5) (loop2)))))) p))) (thread-start! th1) ;; (thread-sleep! 0.05) ;; give things a little time to get going ;; (thread-join! th1) ;; (my-mutex-lock! mtx1) (hash-table-set! threads p th1) (my-mutex-unlock! mtx1) )) ;; thread is launched (let ((run-time (- (current-seconds) start-time))) ;; couldn't launch yet (cond ((< run-time 5)) ;; blast on through ((< run-time 30)(thread-sleep! 0.1)) ((< run-time 60)(thread-sleep! 2)) ((< run-time 120)(thread-sleep! 3)) |
︙ | ︙ |
Modified rmt.scm from [f6fac119b0] to [8fc869d3b5].
︙ | ︙ | |||
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.") (exit 1)) ;; ensure we have a record for our connection for given area ((not *runremote*) (set! *runremote* (make-remote)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; no server contact made and this is a write, try starting a server ((and (not (remote-server-url *runremote*)) (not (member cmd api:read-only-queries))) (let ((serverconn (server:check-if-running *toppath*))) (if serverconn (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed (if (not (server:start-attempted? *toppath*)) (server:kind-run *toppath*)))) (mutex-unlock! *rmt-mutex*) | > > > > > > > > > > > > > > > > > > > > > > | | 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | (debug:print 0 *default-log-port* "ERROR: 15 tries to start/connect to server. Giving up.") (exit 1)) ;; ensure we have a record for our connection for given area ((not *runremote*) (set! *runremote* (make-remote)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; ensure we have a homehost record ((not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? (thread-sleep! 0.1) ;; since we shouldn't get here, delay a little (remote-hh-dat-set! *runremote* (common:get-homehost)) (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)) ;; on homehost and this is a read ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost (member cmd api:read-only-queries)) ;; this is a read (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) ;; on homehost and this is a write, we already have a server ((and (cdr (remote-hh-dat *runremote*)) ;; on homehost (not (member cmd api:read-only-queries)) ;; this is a write (remote-server-url *runremote*)) ;; have a server (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) ;; no server contact made and this is a write, try starting a server ((and (not (remote-server-url *runremote*)) (not (member cmd api:read-only-queries))) (let ((serverconn (server:check-if-running *toppath*))) (if serverconn (remote-server-url-set! *runremote* serverconn) ;; the string can be consumed by the client setup if needed (if (not (server:start-attempted? *toppath*)) (server:kind-run *toppath*)))) (if (cdr (remote-hh-dat *runremote*)) ;; we are on the homehost, just do the call (begin (mutex-unlock! *rmt-mutex*) (rmt:open-qry-close-locally cmd 0 params)) (begin (mutex-unlock! *rmt-mutex*) (rmt:send-receive cmd rid params attemptnum: attemptnum)))) ;; if not on homehost ensure we have a connection to a live server ((or (not (pair? (remote-hh-dat *runremote*))) ;; have a homehost record? (not (cdr (remote-hh-dat *runremote*))) ;; have record, are we on a homehost? (not (remote-conndat *runremote*))) ;; do we not have a connection? (remote-hh-dat-set! *runremote* (common:get-homehost)) (remote-conndat-set! *runremote* (rmt:get-connection-info 0)) (mutex-unlock! *rmt-mutex*) |
︙ | ︙ | |||
129 130 131 132 133 134 135 | (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) (server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) | < | 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | (exit 1))) (begin (debug:print 0 *default-log-port* "WARNING: communication failed. Trying again, try num: " attemptnum) (remote-conndat-set! *runremote* #f) (server-url-set! *runremote* #f) (tasks:start-and-wait-for-server (tasks:open-db) 0 15) (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1))))))))) (define (rmt:update-db-stats run-id rawcmd params duration) (mutex-lock! *db-stats-mutex*) (handle-exceptions exn (begin (debug:print 0 *default-log-port* "WARNING: stats collection failed in update-db-stats") |
︙ | ︙ |