︙ | | | ︙ | |
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
(port #f)
(uuid #f)
(rep #f)
(dbfile #f)
(api-url #f)
(api-uri #f)
(api-req #f)
(status 'starting)
(trynum 0) ;; count the number of ports we've tried
)
;; Build the "host:port" string for the server record sdat, using its
;; host and port fields.
(define (servdat->url sdat)
  (let* ((host (servdat-host sdat))
         (port (servdat-port sdat)))
    (conc host ":" port)))
|
>
>
|
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
(port #f)
(uuid #f)
(rep #f)
(dbfile #f)
(api-url #f)
(api-uri #f)
(api-req #f)
(uconn #f)
(mode #f)
(status 'starting)
(trynum 0) ;; count the number of ports we've tried
)
;; Build the "host:port" string for the server record sdat, using its
;; host and port fields.
(define (servdat->url sdat)
  (let* ((host (servdat-host sdat))
         (port (servdat-port sdat)))
    (conc host ":" port)))
|
︙ | | | ︙ | |
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
;; -> http://abc.com:900/<entrypoint>
;;
;; Compose "http://<ipaddr>:<port>/<entrypoint>" from the ipaddr and port
;; fields of the conndat record conn.
(define (conndat->uri conn entrypoint)
  (let ((ipaddr (conndat-ipaddr conn))
        (port   (conndat-port conn)))
    (conc "http://" ipaddr ":" port "/" entrypoint)))
;; set up the api proc, seems like there should be a better place for this?
;; api-proc is a parameter object created with conc as its initial value.
;; It is then called with one argument, which in CHICKEN assigns the
;; parameter's value, so api-proc holds api:process-request afterwards.
(define api-proc (make-parameter conc))
(api-proc api:process-request)
;; do we have a connection to apath dbname and
;; is it not expired? then return it
;;
;; else setup a connection
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
|
>
>
>
|
|
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
;; -> http://abc.com:900/<entrypoint>
;;
;; Compose "http://<ipaddr>:<port>/<entrypoint>" from the ipaddr and port
;; fields of the conndat record conn.
(define (conndat->uri conn entrypoint)
  (let ((ipaddr (conndat-ipaddr conn))
        (port   (conndat-port conn)))
    (conc "http://" ipaddr ":" port "/" entrypoint)))
;; set up the api proc, seems like there should be a better place for this?
;;
;; IS THIS NEEDED ANYMORE? TODO - REMOVE IF POSSIBLE
;;
;; api-proc is a parameter object created with conc as its initial value.
;; It is then called with one argument, which in CHICKEN assigns the
;; parameter's value, so api-proc holds api:execute-requests afterwards.
(define api-proc (make-parameter conc))
(api-proc api:execute-requests)
;; do we have a connection to apath dbname and
;; is it not expired? then return it
;;
;; else setup a connection
;;
;; if that fails, return '(#f "some reason") ;; NB// convert to raising an exception
|
︙ | | | ︙ | |
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
(assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
(if (not *server-info*)
(begin
(thread-sleep! 1)
(loop))
(begin
(servdat-mode-set! *server-info* 'non-db)
(server-uconn *server-info*))))))))
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
(if (conndat-socket conn)
(nng-close! (conndat-socket conn)))
(hash-table-set! conns fullpath #f) ;; clean up
(rmt:open-main-connection remdat apath))
(else
;; Below we will find or create and connect to main
(let* ((dbname (db:run-id->dbname #f))
(the-srv (rmt:find-main-server apath dbname))
(start-main-srv (lambda () ;; call IF there is no the-srv found
|
|
|
|
|
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
|
(assert (< count 30) "FATAL: responder failed to initialize in rmt:open-main-connection")
(if (not *server-info*)
(begin
(thread-sleep! 1)
(loop))
(begin
(servdat-mode-set! *server-info* 'non-db)
(servdat-uconn *server-info*))))))))
(cond
((and conn ;; conn is NOT a socket, just saying ...
(< (current-seconds) (conndat-expires conn)))
#t) ;; we are current and good to go - we'll deal elsewhere with a server that was killed or died
((and conn
(>= (current-seconds)(conndat-expires conn)))
(debug:print-info 0 *default-log-port* "connection to "fullpath" server expired. Reconnecting.")
#;(if (conndat-socket conn)
(nng-close! (conndat-socket conn))) ;; TODO - close the ulex server here?
(hash-table-set! conns fullpath #f) ;; clean up
(rmt:open-main-connection remdat apath))
(else
;; Below we will find or create and connect to main
(let* ((dbname (db:run-id->dbname #f))
(the-srv (rmt:find-main-server apath dbname))
(start-main-srv (lambda () ;; call IF there is no the-srv found
|
︙ | | | ︙ | |
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
|
(fullpath (db:dbname->path apath dbname))
(new-the-srv (make-conndat
apath: apath
dbname: dbname
fullname: fullpath
hostport: srv-addr
socket: (open-nn-connection srv-addr)
ipaddr: ipaddr
port: port
srvpkt: the-srv
srvkey: srvkey ;; generated by rmt:get-signature on the server side
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
)))
|
|
|
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
|
(fullpath (db:dbname->path apath dbname))
(new-the-srv (make-conndat
apath: apath
dbname: dbname
fullname: fullpath
hostport: srv-addr
;; socket: (open-nn-connection srv-addr) - TODO - open ulex connection?
ipaddr: ipaddr
port: port
srvpkt: the-srv
srvkey: srvkey ;; generated by rmt:get-signature on the server side
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60) ;; this needs to be gathered during the ping
)))
|
︙ | | | ︙ | |
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
|
(debug:print-logger rmt:log-to-main)))
(cond
((or (not mconn) ;; no channel open to main?
(< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
(if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above
(begin
(debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.")
(nng-close! (conndat-socket mconn))
(hash-table-set! conns fullname #f)))
(rmt:open-main-connection remdat apath)
(rmt:general-open-connection remdat apath mdbname))
((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname?
(let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname))))
(case res
((server-started)
|
|
|
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
|
(debug:print-logger rmt:log-to-main)))
(cond
((or (not mconn) ;; no channel open to main?
(< (conndat-expires mconn)(+ (current-seconds) 2))) ;; restablish connection if less than 2 seconds on the lease
(if mconn ;; previously opened - clean up NB// consolidate this with the similar code in open main above
(begin
(debug:print-info 0 *default-log-port* "Clearing out connection to main that has expired.")
;; (nng-close! (conndat-socket mconn)) ;; TODO - close the ulex server/listener here?
(hash-table-set! conns fullname #f)))
(rmt:open-main-connection remdat apath)
(rmt:general-open-connection remdat apath mdbname))
((not (rmt:get-conn remdat apath dbname)) ;; no channel open to dbname?
(let* ((res (rmt:send-receive-real remdat apath mdbname 'get-server `(,apath ,dbname))))
(case res
((server-started)
|
︙ | | | ︙ | |
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
|
(debug:print-info 0 *default-log-port* "got "res)
(hash-table-set! conns
fullname
(make-conndat
apath: apath
dbname: dbname
hostport: (conc host":"port)
socket: (open-nn-connection (conc host":"port))
ipaddr: ipaddr
port: port
srvkey: servkey
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60))))
(else
(debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
|
|
|
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
|
(debug:print-info 0 *default-log-port* "got "res)
(hash-table-set! conns
fullname
(make-conndat
apath: apath
dbname: dbname
hostport: (conc host":"port)
;; socket: (open-nn-connection (conc host":"port)) ;; TODO - open ulex connection?
ipaddr: ipaddr
port: port
srvkey: servkey
lastmsg: (current-seconds)
expires: (+ (current-seconds) 60))))
(else
(debug:print-info 0 *default-log-port* "return data from starting server did not match host port servkey pid ipaddr apath dbname " res)))
|
︙ | | | ︙ | |
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
|
(let* ((apath *toppath*)
(remdat *remotedat*)
(conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
(dbname (db:run-id->dbname rid)))
(if *localmode*
(let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
(indat `((cmd . ,cmd)(params . ,params))))
(api:process-request *dbstruct* indat)
;; (api:process-request dbdat indat)
)
(begin
(rmt:open-main-connection remdat apath)
(if rid (rmt:general-open-connection remdat apath dbname))
(rmt:send-receive-real remdat apath dbname cmd params)))))
|
>
|
|
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
(let* ((apath *toppath*)
(remdat *remotedat*)
(conns (remotedat-conns remdat)) ;; just checking that remdat is a remotedat
(dbname (db:run-id->dbname rid)))
(if *localmode*
(let* ((dbdat (dbr:dbstruct-get-dbdat *dbstruct* dbname))
(indat `((cmd . ,cmd)(params . ,params))))
(api:execute-requests *dbstruct* cmd params)
;; (api:process-request *dbstruct* indat)
;; (api:process-request dbdat indat)
)
(begin
(rmt:open-main-connection remdat apath)
(if rid (rmt:general-open-connection remdat apath dbname))
(rmt:send-receive-real remdat apath dbname cmd params)))))
|
︙ | | | ︙ | |
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
|
(let* ((soc (conndat-socket conn))
(key #f)
(host (conndat-ipaddr conn))
(port (conndat-port conn))
(payload `((cmd . ,cmd)
(key . ,(conndat-srvkey conn))
(params . ,params)))
(res (send-receive-nn soc ;; (open-send-receive-nn (conc host":"port)
(sexpr->string payload))))
(if (member res '("#<unspecified>")) ;; TODO - fix this in string->sexpr
#f
(string->sexpr res)))))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
|
|
<
|
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
|
(let* ((soc (conndat-socket conn))
(key #f)
(host (conndat-ipaddr conn))
(port (conndat-port conn))
(payload `((cmd . ,cmd)
(key . ,(conndat-srvkey conn))
(params . ,params)))
(res (send-receive soc payload)))
(if (member res '("#<unspecified>")) ;; TODO - fix this in string->sexpr
#f
(string->sexpr res)))))
;; db is at apath/.db/dbname, rid is an intermediary solution and will be removed
;; sometime in the future.
;;
|
︙ | | | ︙ | |
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
|
;;
;; conn is a conndat record
;;
;; Send a ping message, carrying the server key, over the socket stored in
;; conn and return whatever send-receive-nn returns.
;;
;; conn    - conndat record; its socket and srvkey fields are read
;; do-exit - accepted for interface compatibility; not used in this body
;;
(define (server:ping conn #!key (do-exit #f))
  (let* ((req    (conndat-socket conn))
         (srvkey (conndat-srvkey conn))
         ;; Quasiquote is required here: with a plain quote the ,srvkey was
         ;; never substituted and the literal form (ping (unquote srvkey))
         ;; was sent instead of the actual key.
         (msg    (sexpr->string `(ping ,srvkey))))
    (send-receive-nn req msg))) ;; (server-ready? host port server-id))
;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
(define (http-transport:make-server-url hostport)
(if (not hostport)
|
|
|
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
|
;;
;; conn is a conndat record
;;
;; Send a ping message, carrying the server key, via the socket stored in
;; conn and return whatever send-receive returns.
;;
;; conn    - conndat record; its socket and srvkey fields are read
;; do-exit - accepted for interface compatibility; not used in this body
;;
(define (server:ping conn #!key (do-exit #f))
  (let* ((req    (conndat-socket conn))
         (srvkey (conndat-srvkey conn))
         ;; Quasiquote is required here: with a plain quote the ,srvkey was
         ;; never substituted and the literal form (ping (unquote srvkey))
         ;; was sent instead of the actual key.
         (msg    (sexpr->string `(ping ,srvkey))))
    (send-receive req msg))) ;; (server-ready? host port server-id))
;;======================================================================
;; http-transportmod.scm contents moved here
;;======================================================================
(define (http-transport:make-server-url hostport)
(if (not hostport)
|
︙ | | | ︙ | |
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
|
(if *server-info*
(let* ((uconn (servdat-uconn *server-info*)))
(wait-and-close uconn))
(let* ((port (portlogger:open-run-close portlogger:find-port))
(handler-proc (lambda (rem-host-port qrykey cmd params) ;;
(api:execute-requests *dbstruct-db* cmd params))))
;; (api:process-request *dbstuct-db*
(set! *server-info* (make-servdat host: ipaddrstr port: port))
(let* ((uconn (run-listener handler-proc suggested-port: port))
(rport (udat-port uconn))) ;; the real port
(servdat-host-set! *server-info* hostn)
(servdat-port-set! *server-info* rport)
(servdat-uconn-set! *server-info* uconn)
(wait-and-close uconn)
(db:print-current-query-stats)
)))
(let* ((host (servdat-host *servdat-info*))
(port (servdat-port *servdat-info*))
(mode (or (servdat-mode *servdat-mode*)
"non-db")))
;; server exit stuff here
;; (rmt:server-shutdown host port) - always do in on-exit
;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
(debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
))
|
|
|
|
|
|
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
|
(if *server-info*
(let* ((uconn (servdat-uconn *server-info*)))
(wait-and-close uconn))
(let* ((port (portlogger:open-run-close portlogger:find-port))
(handler-proc (lambda (rem-host-port qrykey cmd params) ;;
(api:execute-requests *dbstruct-db* cmd params))))
;; (api:process-request *dbstuct-db*
(set! *server-info* (make-servdat host: hostn port: port))
(let* ((uconn (run-listener handler-proc suggested-port: port))
(rport (udat-port uconn))) ;; the real port
(servdat-host-set! *server-info* hostn)
(servdat-port-set! *server-info* rport)
(servdat-uconn-set! *server-info* uconn)
(wait-and-close uconn)
(db:print-current-query-stats)
)))
(let* ((host (servdat-host *server-info*))
(port (servdat-port *server-info*))
(mode (or (servdat-mode *server-info*)
"non-db")))
;; server exit stuff here
;; (rmt:server-shutdown host port) - always do in on-exit
;; (portlogger:open-run-close portlogger:set-port port "released") ;; moved to on-exit
(debug:print-info 0 *default-log-port* "Server "host":"port" mode "mode"shutdown complete. Exiting")
))
|
︙ | | | ︙ | |
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
|
(conc (alist-ref 'host srv-pkt) ":"
(alist-ref 'port srv-pkt)))
;; Ping the server at host:port with the given key.
;; Returns the parsed reply, or the false value unchanged when
;; open-send-receive-nn yields #f.
(define (server-ready? host port key) ;; server-address is host:port
  (let* ((request (list (cons 'cmd 'ping)
                        (cons 'key key)
                        (cons 'params '())))
         (wire    (sexpr->string request))
         (reply   (open-send-receive-nn (conc host ":" port) wire)))
    (and reply (string->sexpr reply))))
; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;; in the list of pkts returned
|
|
|
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
|
(conc (alist-ref 'host srv-pkt) ":"
(alist-ref 'port srv-pkt)))
;; Ping the server at host:port with the given key.
;; Returns the parsed reply, or the false value unchanged when
;; send-receive yields #f.
;;
;; NOTE(review): other call sites pass a socket/connection object as the
;; first argument of send-receive; here it receives a "host:port" string.
;; Confirm that send-receive accepts both.
(define (server-ready? host port key) ;; server-address is host:port
  (let* ((request (list (cons 'cmd 'ping)
                        (cons 'key key)
                        (cons 'params '())))
         (wire    (sexpr->string request))
         (reply   (send-receive (conc host ":" port) wire)))
    (and reply (string->sexpr reply))))
; from the pkts return servers associated with dbpath
;; NOTE: Only one can be alive - have to check on each
;; in the list of pkts returned
|
︙ | | | ︙ | |
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
;; Create a req socket, dial "tcp://<host-port>" on it, set its
;; 'nng/recvtimeo option to 2000 and return the socket.
;; NOTE(review): 2000 is presumably milliseconds - confirm against nng docs.
(define (open-nn-connection host-port)
  (let ((sock (make-req-socket)))
    (nng-dial sock (conc "tcp://" host-port))
    (socket-set! sock 'nng/recvtimeo 2000)
    sock))
;; Send msg on the socket req with nng-send, then return the result of
;; nng-recv on the same socket.
(define (send-receive-nn req msg)
(nng-send req msg)
(nng-recv req))
;; Close the socket req by calling nng-close! on it.
(define (close-nn-connection req)
(nng-close! req))
;; ;; open connection to server, send message, close connection
;; ;;
;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
;; (let ((req (make-req-socket 'req))
;; (uri (conc "tcp://" host-port))
|
|
|
|
|
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
|
(if (string-search (regexp (conc ":" port-num)) inl)
(begin
;(print "Output: " inl)
(set! ret #t))
(loop (read-line inp)))))))
ret))
#;(define (open-nn-connection host-port)
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port)))
(nng-dial req uri)
(socket-set! req 'nng/recvtimeo 2000)
req))
#;(define (send-receive-nn req msg)
(nng-send req msg)
(nng-recv req))
#;(define (close-nn-connection req)
(nng-close! req))
;; ;; open connection to server, send message, close connection
;; ;;
;; (define (open-send-close-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
;; (let ((req (make-req-socket 'req))
;; (uri (conc "tcp://" host-port))
|
︙ | | | ︙ | |
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
|
;; (thread-terminate! th1))
;; "timer thread")))
;; (thread-start! th1)
;; (thread-start! th2)
;; (thread-join! th1)
;; res))))
;;
(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port))
(res #f))
(handle-exceptions
exn
(let ((emsg ((condition-property-accessor 'exn 'message) exn)))
;; Send notification
|
|
|
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
|
;; (thread-terminate! th1))
;; "timer thread")))
;; (thread-start! th1)
;; (thread-start! th2)
;; (thread-join! th1)
;; res))))
;;
#;(define (open-send-receive-nn host-port msg #!key (timeout 3) ) ;; default timeout is 3 seconds
(let ((req (make-req-socket))
(uri (conc "tcp://" host-port))
(res #f))
(handle-exceptions
exn
(let ((emsg ((condition-property-accessor 'exn 'message) exn)))
;; Send notification
|
︙ | | | ︙ | |