Megatest

Check-in [befb5004e8]
Login
Overview
Comment: Corrected part of broken sync
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | try-nanomsg
Files: files | file ages | folders
SHA1: befb5004e8aa75e3fe01cbb6e8ebf2d5b703b889
User & Date: matt on 2014-11-26 23:49:37
Other Links: branch diff | manifest | tags
Context
2014-11-28
16:01
Switched to using exceptions to pass back client/server communication issues check-in: b8180b058c user: matt tags: try-nanomsg
2014-11-26
23:49
Corrected part of broken sync check-in: befb5004e8 user: matt tags: try-nanomsg
23:09
nanomsg transport fixed. check-in: 675b89e392 user: matt tags: try-nanomsg
Changes

Modified api.scm from [52a89446cf] to [8d21d552d9].

122
123
124
125
126
127
128

129
130
131
132
133
134
135
136
	   ((login)                        (apply db:login dbstruct params))
	   ((general-call)                 (let ((stmtname   (car params))
						 (run-id     (cadr params))
						 (realparams (cddr params)))
					     (db:with-db dbstruct run-id #t ;; these are all for modifying the db
							 (lambda (db)
							   (db:general-call db stmtname realparams)))))

	   ((sync-inmem->db)               (db:sync-touched dbstruct run-id force-sync: #t))
	   ((sdb-qry)                      (apply sdb:qry params))
	   ((ping)                         (current-process-id))

	   ;; TESTMETA
	   ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
	   ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
	   ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params)))))







>
|







122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
	   ((login)                        (apply db:login dbstruct params))
	   ((general-call)                 (let ((stmtname   (car params))
						 (run-id     (cadr params))
						 (realparams (cddr params)))
					     (db:with-db dbstruct run-id #t ;; these are all for modifying the db
							 (lambda (db)
							   (db:general-call db stmtname realparams)))))
	   ((sync-inmem->db)               (let ((run-id (car params)))
					     (db:sync-touched dbstruct run-id force-sync: #t)))
	   ((sdb-qry)                      (apply sdb:qry params))
	   ((ping)                         (current-process-id))

	   ;; TESTMETA
	   ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
	   ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
	   ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params)))))

Modified db.scm from [75e6e604f1] to [3d1cef4541].

323
324
325
326
327
328
329
330
331

332
333
334
335
336
337
338
	(if (or (not (number? mtime))
		(not (number? stime))
		(> mtime stime)
		force-sync)
	    (begin
	      (db:delay-if-busy rundb)
	      (db:delay-if-busy olddb)
	      (let ((num-synced (db:sync-tables db:sync-tests-only inmem refdb rundb olddb)))
		(dbr:dbstruct-set-stime! dbstruct (current-milliseconds))

		(mutex-unlock! *http-mutex*)
		num-synced)
	      (begin
		(mutex-unlock! *http-mutex*)
		0))))))

(define (db:close-main dbstruct)







<
|
>







323
324
325
326
327
328
329

330
331
332
333
334
335
336
337
338
	(if (or (not (number? mtime))
		(not (number? stime))
		(> mtime stime)
		force-sync)
	    (begin
	      (db:delay-if-busy rundb)
	      (db:delay-if-busy olddb)

	      (dbr:dbstruct-set-stime! dbstruct (current-milliseconds))
	      (let ((num-synced (db:sync-tables db:sync-tests-only inmem refdb rundb olddb)))
		(mutex-unlock! *http-mutex*)
		num-synced)
	      (begin
		(mutex-unlock! *http-mutex*)
		0))))))

(define (db:close-main dbstruct)

Modified nmsg-transport.scm from [a65dcd51b0] to [fe6d9123a8].

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)







|







76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
	  (tasks:server-set-interface-port (db:delay-if-busy tdbdat) server-id interface start-port)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id run-id))
			  "keep running"))
	  (thread-join! server-thread))
	(if (> retrynum 0)
	    (begin
	      (debug:print 0 "WARNING: Failed to connect to server (self) on host " hostn ":" start-port ", trying again.")
	      (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	      (portlogger:open-run-close portlogger:set-failed start-port)
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
     (thread-join! send-recv)
     (if success (thread-terminate! timeout)))
    (vector success result)))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)







|







239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
     (thread-join! send-recv)
     (if success (thread-terminate! timeout)))
    (vector success result)))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shutdown after sometime if it is not.
;;
(define (nmsg-transport:keep-running server-id run-id)
  ;; if none running or if > 20 seconds since 
  ;; server last used then start shutdown
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
276
277
278
279
280
281
282
283
284
285
286

287
288
289
290
291
292
293
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)

        (if (and *server-run*
	       (> (+ last-access server-timeout)
		  (current-seconds)))
            (begin
              (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin







<



>







276
277
278
279
280
281
282

283
284
285
286
287
288
289
290
291
292
293
      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        

        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
	(db:sync-touched *inmemdb* run-id force-sync: #t)
        (if (and *server-run*
	       (> (+ last-access server-timeout)
		  (current-seconds)))
            (begin
              (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin

Modified rmt.scm from [47078702f7] to [dbecc93316].

62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

(define (rmt:send-receive cmd rid params #!key (attemptnum 0))
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  ;; (let ((expire-time (- (current-seconds) 60)))
  ;;   (for-each 
  ;;    (lambda (run-id)
  ;;      (let ((connection (hash-table-ref/default *runremote* run-id #f)))
  ;;        (if (and connection 







|







62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
	cinfo
	;; NB// can cache the answer for server running for 10 seconds ...
	;;  ;; (and (not (rmt:write-frequency-over-limit? cmd run-id))
	(if (tasks:server-running-or-starting? (db:delay-if-busy (tasks:open-db)) run-id)
	    (client:setup run-id)
	    #f))))

(define (rmt:send-receive cmd rid params #!key (attemptnum 1)) ;; start attemptnum at 1 so the modulo below works as expected
  ;; clean out old connections
  (mutex-lock! *db-multi-sync-mutex*)
  ;; (let ((expire-time (- (current-seconds) 60)))
  ;;   (for-each 
  ;;    (lambda (run-id)
  ;;      (let ((connection (hash-table-ref/default *runremote* run-id #f)))
  ;;        (if (and connection 
99
100
101
102
103
104
105

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121

122
123
124
125
126
127
128
		((http)(db:string->obj res))
		((nmsg)(vector-ref res 1)))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		;; (case *transport-type*
		;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection

		(tasks:kill-server-run-id run-id tag: "api-send-receive-failed")
		(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
		;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		;; (thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
	;; no connection info? try to start a server
	(if (and (< attemptnum 10)
		 (tasks:need-server run-id))
	    (begin
	      (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	      (hash-table-delete! *runremote* run-id)

	      (client:setup run-id)
	      (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
	      (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
	    (begin
	      (debug:print 0 "ERROR: Communication failed!")
	      (exit)
	      ;; (rmt:open-qry-close-locally cmd run-id params))))







>
|










|


<

>







99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

121
122
123
124
125
126
127
128
129
		((http)(db:string->obj res))
		((nmsg)(vector-ref res 1)))
	      (begin ;; let ((new-connection-info (client:setup run-id)))
		(debug:print 0 "WARNING: Communication failed, trying call to http-transport:client-api-send-receive again.")
		;; (case *transport-type*
		;;   ((nmsg)(nn-close (http-transport:server-dat-get-socket connection-info))))
		(hash-table-delete! *runremote* run-id) ;; don't keep using the same connection
		(if (eq? (modulo attemptnum 5) 0)
		    (tasks:kill-server-run-id run-id tag: "api-send-receive-failed"))
		(tasks:start-and-wait-for-server (tasks:open-db) run-id 15)
		;; (nmsg-transport:client-api-send-receive run-id connection-info cmd param remtries: (- remtries 1))))))

		;; no longer killing the server in http-transport:client-api-send-receive
		;; may kill it here but what are the criteria?
		;; start with three calls then kill server
		;; (if (eq? attemptnum 3)(tasks:kill-server-run-id run-id))
		;; (thread-sleep! 2)
		(rmt:send-receive cmd run-id params attemptnum: (+ attemptnum 1)))))
	;; no connection info? try to start a server
	(if (and (< attemptnum 15)
		 (tasks:need-server run-id))
	    (begin

	      (hash-table-delete! *runremote* run-id)
	      (tasks:start-and-wait-for-server (db:delay-if-busy (tasks:open-db)) run-id 10)
	      (client:setup run-id)
	      (thread-sleep! (random 5)) ;; give some time to settle and minimize collison?
	      (rmt:send-receive cmd rid params attemptnum: (+ attemptnum 1)))
	    (begin
	      (debug:print 0 "ERROR: Communication failed!")
	      (exit)
	      ;; (rmt:open-qry-close-locally cmd run-id params))))