Megatest

Check-in [eab23b866a]
Login
Overview
Comment:Basic server functioning, responding to ping and login
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | try-nanomsg
Files: files | file ages | folders
SHA1: eab23b866aa87d5f998b3d9ebae67bb5afbba3ec
User & Date: matt on 2014-11-23 20:56:49
Other Links: branch diff | manifest | tags
Context
2014-11-23
23:13
ping and login working check-in: 5f6b43d51b user: matt tags: try-nanomsg
20:56
Basic server functioning, responding to ping and login check-in: eab23b866a user: matt tags: try-nanomsg
18:14
Partial implementation. Not yet functional check-in: 120292c013 user: matt tags: try-nanomsg
Changes

Modified api.scm from [0d03b6cbef] to [cd180ba59c].

48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131


132
133
134
135
136
137
138
139
140
    login
    testmeta-get-record))

;; These are called by the server on receipt of /api calls
;;    - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
;; Dispatch a single API request to the matching server:/db:/sdb: procedure.
;;
;;   dbstruct - database structure handed as first argument to the db: calls
;;   cmd      - command name as a string; converted to a symbol for dispatch
;;   params   - list of arguments for the selected procedure
;;
;; Returns the bare result of the selected call, or (list "ERROR" 0) when
;; cmd matches none of the clauses.
;;
(define (api:execute-requests dbstruct cmd params)
  (case (string->symbol cmd)
    ;; SERVERS
    ((start-server)                 (apply server:kind-run params))
    ((kill-server)                  (set! *server-run* #f))

    ;; KEYS
    ((get-key-val-pairs)            (apply db:get-key-val-pairs dbstruct params))
    ((get-keys)                     (db:get-keys dbstruct))

    ;; TESTS
    ((test-toplevel-num-items)         (apply db:test-toplevel-num-items dbstruct params))
    ((get-test-info-by-id)             (apply db:get-test-info-by-id dbstruct params))
    ((test-get-rundir-from-test-id)    (apply db:test-get-rundir-from-test-id dbstruct params))
    ((test-set-state-status-by-id)     (apply db:test-set-state-status-by-id dbstruct params))
    ((get-count-tests-running)         (apply db:get-count-tests-running dbstruct params))
    ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params))
    ((delete-test-records)             (apply db:delete-test-records dbstruct params))
    ;; ((delete-test-step-records)        (apply db:delete-test-step-records dbstruct params))
    ((delete-old-deleted-test-records) (apply db:delete-old-deleted-test-records dbstruct params))
    ((test-set-status-state)           (apply db:test-set-status-state dbstruct params))
    ((get-previous-test-run-record)    (apply db:get-previous-test-run-record dbstruct params))
    ((get-matching-previous-test-run-records) (apply db:get-matching-previous-test-run-records dbstruct params))
    ((test-get-logfile-info)           (apply db:test-get-logfile-info dbstruct params))
    ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params))
    ((get-testinfo-state-status)       (apply db:get-testinfo-state-status dbstruct params))
    ((test-set-top-process-pid)        (apply db:test-set-top-process-pid dbstruct params))
    ((test-get-top-process-pid)        (apply db:test-get-top-process-pid dbstruct params))
    ((test-get-paths-matching-keynames-target-new) (apply db:test-get-paths-matching-keynames-target-new dbstruct params))
    ((get-prereqs-not-met)             (apply db:get-prereqs-not-met dbstruct params))
    ((roll-up-pass-fail-counts)        (apply db:roll-up-pass-fail-counts dbstruct params))
    ;; NOTE(review): the general-call clause below passes db:general-call a db
    ;; handle plus a params *list*; this clause passes dbstruct and spreads
    ;; params - confirm which calling convention is intended.
    ((update-fail-pass-counts)         (apply db:general-call dbstruct 'update-pass-fail-counts params))
    ((get-count-tests-running-for-run-id) (apply db:get-count-tests-running-for-run-id dbstruct params))

    ;; RUNS
    ((get-run-info)                 (apply db:get-run-info dbstruct params))
    ((get-run-status)               (apply db:get-run-status dbstruct params))
    ((set-run-status)               (apply db:set-run-status dbstruct params))
    ((register-run)                 (apply db:register-run dbstruct params))
    ((set-tests-state-status)       (apply db:set-tests-state-status dbstruct params))
    ((get-tests-for-run)            (apply db:get-tests-for-run dbstruct params))
    ((get-test-id)                  (apply db:get-test-id dbstruct params))
    ((get-tests-for-runs-mindata)   (apply db:get-tests-for-runs-mindata dbstruct params))
    ((get-run-name-from-id)         (apply db:get-run-name-from-id dbstruct params))
    ((delete-run)                   (apply db:delete-run dbstruct params))
    ((get-runs)                     (apply db:get-runs dbstruct params))
    ((get-all-run-ids)              (db:get-all-run-ids dbstruct))
    ((get-prev-run-ids)             (apply db:get-prev-run-ids dbstruct params))
    ((get-run-ids-matching-target)  (apply db:get-run-ids-matching-target dbstruct params))
    ((get-runs-by-patt)             (apply db:get-runs-by-patt dbstruct params))
    ((lock/unlock-run)              (apply db:lock/unlock-run dbstruct params))
    ((update-run-event_time)        (apply db:update-run-event_time dbstruct params))
    ((find-and-mark-incomplete)     (apply db:find-and-mark-incomplete dbstruct params))

    ;; STEPS
    ((teststep-set-status!)         (apply db:teststep-set-status! dbstruct params))

    ;; TEST DATA
    ((test-data-rollup)             (apply db:test-data-rollup dbstruct params))
    ((csv->test-data)               (apply db:csv->test-data dbstruct params))
    ((get-steps-data)               (apply db:get-steps-data dbstruct params))

    ;; MISC
    ((login)                        (apply db:login dbstruct params))
    ;; params for general-call are: stmtname run-id . realparams
    ((general-call)                 (let ((stmtname   (car params))
                                          (run-id     (cadr params))
                                          (realparams (cddr params)))
                                      (db:with-db dbstruct run-id #t ;; these are all for modifying the db
                                                  (lambda (db)
                                                    (db:general-call db stmtname realparams)))))
    ;; run-id is not a parameter of this procedure, so it must come from the request.
    ;; NOTE(review): assumes the caller sends run-id as the first element of params - confirm
    ((sync-inmem->db)               (let ((run-id (car params)))
                                      (db:sync-touched dbstruct run-id force-sync: #t)))
    ((sdb-qry)                      (apply sdb:qry params))
    ((ping)                         (current-process-id))

    ;; TESTMETA
    ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
    ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
    ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params))

    ;; unknown command
    (else
     (list "ERROR" 0))))

;; http-server  send-response
;;                 api:process-request
;;                    db:*
;;
;; NB// Runs on the server as part of the server loop
;;







>
|
|
|
|

|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|

|
|
|
|

|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
>
>
|
|







48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
    login
    testmeta-get-record))

;; These are called by the server on receipt of /api calls
;;    - keep it simple, only return the actual result of the call, i.e. no meta info here
;;
;; Dispatch a single API request to the matching server:/db:/sdb: procedure
;; and wrap the outcome for the transport.
;;
;;   dbstruct - database structure handed as first argument to the db: calls
;;   cmd      - command name as a string; converted to a symbol for dispatch
;;   params   - list of arguments for the selected procedure
;;
;; Returns (vector #t res) where res is the result of the selected call.
;; There is deliberately no else clause: for an unrecognised cmd, res is
;; whatever `case` yields when nothing matches.
;;
(define (api:execute-requests dbstruct cmd params)
  (let ((res
         (case (string->symbol cmd)
           ;; SERVERS
           ((start-server)                 (apply server:kind-run params))
           ((kill-server)                  (set! *server-run* #f))

           ;; KEYS
           ((get-key-val-pairs)            (apply db:get-key-val-pairs dbstruct params))
           ((get-keys)                     (db:get-keys dbstruct))

           ;; TESTS
           ((test-toplevel-num-items)         (apply db:test-toplevel-num-items dbstruct params))
           ((get-test-info-by-id)             (apply db:get-test-info-by-id dbstruct params))
           ((test-get-rundir-from-test-id)    (apply db:test-get-rundir-from-test-id dbstruct params))
           ((test-set-state-status-by-id)     (apply db:test-set-state-status-by-id dbstruct params))
           ((get-count-tests-running)         (apply db:get-count-tests-running dbstruct params))
           ((get-count-tests-running-in-jobgroup) (apply db:get-count-tests-running-in-jobgroup dbstruct params))
           ((delete-test-records)             (apply db:delete-test-records dbstruct params))
           ;; ((delete-test-step-records)        (apply db:delete-test-step-records dbstruct params))
           ((delete-old-deleted-test-records) (apply db:delete-old-deleted-test-records dbstruct params))
           ((test-set-status-state)           (apply db:test-set-status-state dbstruct params))
           ((get-previous-test-run-record)    (apply db:get-previous-test-run-record dbstruct params))
           ((get-matching-previous-test-run-records) (apply db:get-matching-previous-test-run-records dbstruct params))
           ((test-get-logfile-info)           (apply db:test-get-logfile-info dbstruct params))
           ((test-get-records-for-index-file) (apply db:test-get-records-for-index-file dbstruct params))
           ((get-testinfo-state-status)       (apply db:get-testinfo-state-status dbstruct params))
           ((test-set-top-process-pid)        (apply db:test-set-top-process-pid dbstruct params))
           ((test-get-top-process-pid)        (apply db:test-get-top-process-pid dbstruct params))
           ((test-get-paths-matching-keynames-target-new) (apply db:test-get-paths-matching-keynames-target-new dbstruct params))
           ((get-prereqs-not-met)             (apply db:get-prereqs-not-met dbstruct params))
           ((roll-up-pass-fail-counts)        (apply db:roll-up-pass-fail-counts dbstruct params))
           ;; NOTE(review): the general-call clause below passes db:general-call a db
           ;; handle plus a params *list*; this clause passes dbstruct and spreads
           ;; params - confirm which calling convention is intended.
           ((update-fail-pass-counts)         (apply db:general-call dbstruct 'update-pass-fail-counts params))
           ((get-count-tests-running-for-run-id) (apply db:get-count-tests-running-for-run-id dbstruct params))

           ;; RUNS
           ((get-run-info)                 (apply db:get-run-info dbstruct params))
           ((get-run-status)               (apply db:get-run-status dbstruct params))
           ((set-run-status)               (apply db:set-run-status dbstruct params))
           ((register-run)                 (apply db:register-run dbstruct params))
           ((set-tests-state-status)       (apply db:set-tests-state-status dbstruct params))
           ((get-tests-for-run)            (apply db:get-tests-for-run dbstruct params))
           ((get-test-id)                  (apply db:get-test-id dbstruct params))
           ((get-tests-for-runs-mindata)   (apply db:get-tests-for-runs-mindata dbstruct params))
           ((get-run-name-from-id)         (apply db:get-run-name-from-id dbstruct params))
           ((delete-run)                   (apply db:delete-run dbstruct params))
           ((get-runs)                     (apply db:get-runs dbstruct params))
           ((get-all-run-ids)              (db:get-all-run-ids dbstruct))
           ((get-prev-run-ids)             (apply db:get-prev-run-ids dbstruct params))
           ((get-run-ids-matching-target)  (apply db:get-run-ids-matching-target dbstruct params))
           ((get-runs-by-patt)             (apply db:get-runs-by-patt dbstruct params))
           ((lock/unlock-run)              (apply db:lock/unlock-run dbstruct params))
           ((update-run-event_time)        (apply db:update-run-event_time dbstruct params))
           ((find-and-mark-incomplete)     (apply db:find-and-mark-incomplete dbstruct params))

           ;; STEPS
           ((teststep-set-status!)         (apply db:teststep-set-status! dbstruct params))

           ;; TEST DATA
           ((test-data-rollup)             (apply db:test-data-rollup dbstruct params))
           ((csv->test-data)               (apply db:csv->test-data dbstruct params))
           ((get-steps-data)               (apply db:get-steps-data dbstruct params))

           ;; MISC
           ((login)                        (apply db:login dbstruct params))
           ;; params for general-call are: stmtname run-id . realparams
           ((general-call)                 (let ((stmtname   (car params))
                                                 (run-id     (cadr params))
                                                 (realparams (cddr params)))
                                             (db:with-db dbstruct run-id #t ;; these are all for modifying the db
                                                         (lambda (db)
                                                           (db:general-call db stmtname realparams)))))
           ;; run-id is not a parameter of this procedure, so it must come from the request.
           ;; NOTE(review): assumes the caller sends run-id as the first element of params - confirm
           ((sync-inmem->db)               (let ((run-id (car params)))
                                             (db:sync-touched dbstruct run-id force-sync: #t)))
           ((sdb-qry)                      (apply sdb:qry params))
           ((ping)                         (current-process-id))

           ;; TESTMETA
           ((testmeta-get-record)       (apply db:testmeta-get-record dbstruct params))
           ((testmeta-add-record)       (apply db:testmeta-add-record dbstruct params))
           ((testmeta-update-field)     (apply db:testmeta-update-field dbstruct params)))))
    (vector #t res)))
	 ;; NO ELSE - let it return undef
	 ;;(else
	 ;; (list "ERROR" 0))))

;; http-server  send-response
;;                 api:process-request
;;                    db:*
;;
;; NB// Runs on the server as part of the server loop
;;

Modified db.scm from [930b139284] to [f9ca78a55d].

2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
     (string-substitute
      (regexp "=") "_"
      (base64:base64-encode 
       (z3:encode-buffer
	(with-output-to-string
	  (lambda ()(serialize obj)))))
      #t))
    ((zmq nm)(with-output-to-string (lambda ()(serialize obj))))
    (else obj)))

;; Turn a transported message back into a Scheme object.
;;
;;   http, fs : msg must be a string. "_" is substituted with "=", the result
;;              is base64 decoded, run through z3:decode-buffer and then
;;              deserialized. Anything that is not a string is logged and
;;              #f is returned.
;;   zmq, nm  : msg is deserialized as-is.
;;   other    : msg is handed back unchanged.
;;
(define (db:string->obj msg #!key (transport 'http))
  (case transport
    ((http fs)
     (cond
      ((string? msg)
       (let* ((padded  (string-substitute (regexp "_") "=" msg #t))
              (packed  (base64:base64-decode padded))
              (payload (z3:decode-buffer packed)))
         (with-input-from-string payload
           (lambda () (deserialize)))))
      (else
       ;; crude reply for when things go awry
       (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.")
       #f)))
    ((zmq nm)
     (with-input-from-string msg
       (lambda () (deserialize))))
    (else msg)))

(define (db:test-set-status-state dbstruct run-id test-id status state msg)
  (let ((dbdat  (db:get-db dbstruct run-id)))
    (if (member state '("LAUNCHED" "REMOTEHOSTSTART"))
	(db:general-call dbdat 'set-test-start-time (list test-id)))
    (if msg







|
















|







2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
     (string-substitute
      (regexp "=") "_"
      (base64:base64-encode 
       (z3:encode-buffer
	(with-output-to-string
	  (lambda ()(serialize obj)))))
      #t))
    ((zmq nmsg)(with-output-to-string (lambda ()(serialize obj))))
    (else obj)))

;; Turn a transported message back into a Scheme object.
;;
;;   http, fs  : msg must be a string. "_" is substituted with "=", the result
;;               is base64 decoded, run through z3:decode-buffer and then
;;               deserialized. Anything that is not a string is logged and
;;               #f is returned.
;;   zmq, nmsg : msg is deserialized as-is.
;;   other     : msg is handed back unchanged.
;;
(define (db:string->obj msg #!key (transport 'http))
  (case transport
    ((http fs)
     (cond
      ((string? msg)
       (let* ((padded  (string-substitute (regexp "_") "=" msg #t))
              (packed  (base64:base64-decode padded))
              (payload (z3:decode-buffer packed)))
         (with-input-from-string payload
           (lambda () (deserialize)))))
      (else
       ;; crude reply for when things go awry
       (debug:print 0 "ERROR: reception failed. Received " msg " but cannot translate it.")
       #f)))
    ((zmq nmsg)
     (with-input-from-string msg
       (lambda () (deserialize))))
    (else msg)))

(define (db:test-set-status-state dbstruct run-id test-id status state msg)
  (let ((dbdat  (db:get-db dbstruct run-id)))
    (if (member state '("LAUNCHED" "REMOTEHOSTSTART"))
	(db:general-call dbdat 'set-test-start-time (list test-id)))
    (if msg

Modified nmsg-transport.scm from [2f563bf500] to [5a0952db48].

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
    (thread-start! server-thread)
    (if (nmsg-transport:ping hostn start-port)
	(begin
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore?
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! nmsg-transport:keep-running)


	  (thread-join! server-thread))
	(begin
	  (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	  (portlogger:open-run-close portlogger:set-failed start-port)
	  (nmsg-transport:run dbstruct hostn run-id server-id)))))

;; Serve requests on a nanomsg 'rep socket bound to tcp://*:<portnum>.
;;
;; Every received message is answered in turn:
;;   "quit"               - reply "Ok, quitting" and stop serving
;;   begins with "ping"   - reply with this process id rendered as a string
;;   anything else        - decode (transport 'nm) to a vector holding cmd
;;                          and params, run api:execute-requests and reply
;;                          with the encoded result
;;
;; run-id and server-id are accepted but not referenced in the body.
;;
(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let serve ((request (nn-recv repsoc)))
      (if (equal? request "quit")
          (nn-send repsoc "Ok, quitting")
          (begin
            (if (and (>= (string-length request) 4)
                     (equal? (substring request 0 4) "ping"))
                (nn-send repsoc (conc (current-process-id)))
                (let* ((decoded (db:string->obj request transport: 'nm))
                       (cmd     (vector-ref decoded 0))
                       (params  (vector-ref decoded 1))
                       (outcome (api:execute-requests dbstruct cmd params))
                       (reply   (db:obj->string outcome transport: 'nm)))
                  (nn-send repsoc reply)))
            ;; wait for the next request
            (serve (nn-recv repsoc)))))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shut down after some time if it is not.
;;
;; Watchdog for the nmsg server; intended to run in its own thread.
;;
;; Phase 1: poll *server-info* (under *heartbeat-mutex*) until it is set.
;; Phase 2: every second 4-second tick compare *last-db-access* with the
;;          current time. While the last access is less than 45 minutes
;;          old keep going; otherwise deregister this server and exit the
;;          process.
;;
;; Changes relative to the previous body:
;;  - the wait uses thread-sleep! instead of sleep, so only this thread is
;;    suspended while waiting for the server to come up
;;  - the tick counter loop is a proper tail call; previously every pass
;;    left a pending frame behind
;;  - *server-info* is read as (host port), the shape stored by
;;    nmsg-transport:run, instead of as a four element list
;;
(define (nmsg-transport:keep-running)
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              sdat
                              (begin
                                (debug:print 12 "WARNING: server not started yet, waiting few seconds before trying again")
                                (thread-sleep! 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0))
    (debug:print-info 11 "heartbeat started for nmsg server on " iface " " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      (if (< count 1)
          ;; skip the check on the first tick after a reset
          (loop (+ count 1))
          (begin
            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            (if (> (+ last-access
                      (* 45 60)) ;; 45 minutes, until the db deletion bug is fixed.
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))

;; all routes through here end in exit ...
;;
(define (nmsg-transport:launch run-id)
  (let* ((tdbdat   (tasks:open-db))
	 (dbstruct (db:setup run-id))
	 (hostn    (or (args:get-arg "-server") "-")))







|


|



|
>
>


















|



|



<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109


























































110
111
112
113
114
115
116
  (debug:print 2 "Attempting to start the server ...")
  (let* ((start-port      (portlogger:open-run-close portlogger:find-port))
	 (server-thread   (make-thread (lambda ()
					 (nmsg-transport:try-start-server dbstruct run-id start-port server-id))
				       "server thread"))
	 (tdbdat          (tasks:open-db)))
    (thread-start! server-thread)
    (if (nmsg-transport:ping hostn start-port timeout: 2 expected-key: (current-process-id))
	(begin
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "dbprep")
	  (set! *server-info* (list hostn start-port)) ;; probably not needed anymore? currently used by keep-running
	  (thread-sleep! 3) ;; give some margin for queries to complete before switching from file based access to server based access
	  (set! *inmemdb*  dbstruct)
	  (tasks:server-set-state! (db:delay-if-busy tdbdat) server-id "running")
	  (thread-start! (make-thread
			  (lambda ()(nmsg-transport:keep-running server-id))
			  "keep running"))
	  (thread-join! server-thread))
	(begin
	  (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id "failed to start, never received server alive signature")
	  (portlogger:open-run-close portlogger:set-failed start-port)
	  (nmsg-transport:run dbstruct hostn run-id server-id)))))

;; Serve requests on a nanomsg 'rep socket bound to tcp://*:<portnum>.
;;
;; Every received message is answered in turn:
;;   "quit"               - reply "Ok, quitting" and stop serving
;;   begins with "ping"   - reply with this process id rendered as a string
;;   anything else        - decode (transport 'nmsg) to a vector holding cmd
;;                          and params, run api:execute-requests and reply
;;                          with the encoded result
;;
;; run-id and server-id are accepted but not referenced in the body.
;;
(define (nmsg-transport:try-start-server dbstruct run-id portnum server-id)
  (let ((repsoc (nn-socket 'rep)))
    (nn-bind repsoc (conc "tcp://*:" portnum))
    (let serve ((request (nn-recv repsoc)))
      (if (equal? request "quit")
          (nn-send repsoc "Ok, quitting")
          (begin
            (if (and (>= (string-length request) 4)
                     (equal? (substring request 0 4) "ping"))
                (nn-send repsoc (conc (current-process-id)))
                (let* ((decoded (db:string->obj request transport: 'nmsg))
                       (cmd     (vector-ref decoded 0))
                       (params  (vector-ref decoded 1))
                       (outcome (api:execute-requests dbstruct cmd params))
                       (reply   (db:obj->string outcome transport: 'nmsg)))
                  (nn-send repsoc reply)))
            ;; wait for the next request
            (serve (nn-recv repsoc)))))))




























































;; all routes through here end in exit ...
;;
(define (nmsg-transport:launch run-id)
  (let* ((tdbdat   (tasks:open-db))
	 (dbstruct (db:setup run-id))
	 (hostn    (or (args:get-arg "-server") "-")))
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
;;======================================================================

;; ping the server at host:port
;;   return the open socket if successful (return-socket == #t)
;;   expect the key expected-key returned in payload
;;   send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (return-socket #t)(expected-key #f)(our-key #f))
  ;; send a random number along with pid and check that we get it back
  (let* ((req     (nn-socket 'req))
	 (host    (if (or (not hostn)
			  (equal? hostn "-")) ;; use localhost
		      (get-host-name)
		      hostn))
	 (success #f)
	 (keepwaiting #t)
	 (dat     (db:obj->string (vector "ping" our-key) transport: 'nm))
	 (ping    (make-thread
		   (lambda ()
		     (nn-send req dat)
		     (let* ((result  (nn-recv req))
			    (key     (vector-ref (db:string->obj result transport: 'nm) 1)))
		       (if (or (not expect-key) ;; just getting a reply is good enough then
			       (equal? (conc (current-process-id)) expected-key)) 
			   (begin
			     ;; (print "ping, success: received \"" result "\"")
			     (set! success #t))
			   (begin
			     ;; (print "ping, failed: received key \"" result "\"")
			     (set! keepwaiting #f)
			     (set! success #f)))))
		   "ping"))
	 (timeout (make-thread (lambda ()
				 (let loop ((count 0))
				   (thread-sleep! 1)
				   (print "still waiting after count seconds...")
				   (if (and keepwaiting (< count 10))
				       (loop (+ count 1))))
				 (if keepwaiting
				     (begin
				       (print "timeout waiting for ping")
				       (thread-terminate! ping))))
			       "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    (handle-exceptions
     exn
     (begin
       (print-call-chain)
       (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       (print "exn=" (condition->list exn))
       (print "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    (if return-socket
	(if success req #f)
	(begin







|








|




|
|
|












|










|
|
|
|







165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
;;======================================================================

;; ping the server at host:port
;;   return the open socket if successful (return-socket == #t)
;;   expect the key expected-key returned in payload
;;   send our-key or #f as payload
;;
(define (nmsg-transport:ping hostn port #!key (timeout 3)(return-socket #t)(expected-key #f)(our-key #f))
  ;; send a random number along with pid and check that we get it back
  (let* ((req     (nn-socket 'req))
	 (host    (if (or (not hostn)
			  (equal? hostn "-")) ;; use localhost
		      (get-host-name)
		      hostn))
	 (success #f)
	 (keepwaiting #t)
	 (dat     (db:obj->string (vector "ping" our-key) transport: 'nmsg))
	 (ping    (make-thread
		   (lambda ()
		     (nn-send req dat)
		     (let* ((result  (nn-recv req))
			    (key     (vector-ref (db:string->obj result transport: 'nmsg) 1)))
		       (if (or (not expected-key) ;; just getting a reply is good enough then
			       (equal? key expected-key)) 
			   (begin
			     ;; (print "ping, success: received \"" result "\"")
			     (set! success #t))
			   (begin
			     ;; (print "ping, failed: received key \"" result "\"")
			     (set! keepwaiting #f)
			     (set! success #f)))))
		   "ping"))
	 (timeout (make-thread (lambda ()
				 (let loop ((count 0))
				   (thread-sleep! 1)
				   (print "still waiting after count seconds...")
				   (if (and keepwaiting (< count timeout)) ;; yes, this is very aproximate
				       (loop (+ count 1))))
				 (if keepwaiting
				     (begin
				       (print "timeout waiting for ping")
				       (thread-terminate! ping))))
			       "timeout")))
    (nn-connect req (conc "tcp://" host ":" port))
    (handle-exceptions
     exn
     (begin
       ;; (print-call-chain)
       ;; (print 0 " message: " ((condition-property-accessor 'exn 'message) exn))
       ;; (print "exn=" (condition->list exn))
       (debug:print-info 1 "ping failed to connect to " host ":" port))
     (thread-start! timeout)
     (thread-start! ping)
     (thread-join! ping)
     (if success (thread-terminate! timeout)))
    (if return-socket
	(if success req #f)
	(begin
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353

354
355
356
357
358
359
360
361
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shut down after some time if it is not.
;;
;; Watchdog for the nmsg server; intended to run in its own thread.
;;
;; Phase 1: poll *runremote* (under *heartbeat-mutex*) until it is set, then
;;          take interface and port from it and look up the server id.
;; Phase 2: every second 4-second tick update the server heartbeat and
;;          compare *last-db-access* with the current time. While the last
;;          access is less than 45 minutes old keep going; otherwise
;;          deregister this server and exit the process.
;;
;; Changes relative to the previous body:
;;  - the wait uses thread-sleep! instead of sleep, so only this thread is
;;    suspended while waiting for the server to come up
;;  - the tick counter loop is a proper tail call; previously every pass
;;    left a pending frame behind
;;
(define (nmsg-transport:keep-running)
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *runremote*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              sdat
                              (begin
                                (thread-sleep! 4)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
         (tdb         (tasks:open-db))
         (spid        (tasks:server-get-server-id tdb #f iface port #f)))
    (print "Keep-running got server pid " spid ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      (if (< count 1)
          ;; skip the check on the first tick after a reset
          (loop (+ count 1))
          (begin
            ;; NOTE: Get rid of this mechanism! It really is not needed...
            (tasks:server-update-heartbeat tdb spid)

            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            (if (> (+ last-access
                      (* 45 60)) ;; 45 minutes, until the db deletion bug is fixed.
                   (current-seconds))
                (begin
                  (debug:print-info 2 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  ;; need to delete only *my* server entry (future use)
                  (set! *time-to-exit* #t)
                  (tasks:server-deregister-self tdb (get-host-name))
                  (thread-sleep! 1)
                  (debug:print-info 0 "Max cached queries was " *max-cache-size*)
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  (exit))))))))


(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()







|






|

|
|
>
>
>
|




|
<
|








<
<
<












|



<

|
<
<

>
|







239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265

266
267
268
269
270
271
272
273
274



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290

291
292


293
294
295
296
297
298
299
300
301
302
	  (debug:print-info 2 "Failed to login or connect to " conurl)
	  (set! *runremote* #f)
	  #f))))

;; run nmsg-transport:keep-running in a parallel thread to monitor that the db is being 
;; used and to shut down after some time if it is not.
;;
;; Watchdog for the nmsg server; intended to run in its own thread.
;;
;;   server-id - id of this server's record, used when deleting the record
;;               at shutdown
;;
;; Phase 1: poll *server-info* (under *heartbeat-mutex*) every 0.5 seconds
;;          until it is set; interface and port are taken from it.
;; Phase 2: every second 4-second tick compare *last-db-access* with the
;;          current time. While the last access is less than 45 minutes
;;          old keep going; otherwise flag *time-to-exit*, delete the
;;          server record and return.
;;
;; The tick counter loop is a proper tail call. Previously the counting
;; step called the loop in non-tail position; with (exit) disabled the
;; inner call returned after shutting down and the outer frame then ran
;; the shutdown branch a second time.
;;
(define (nmsg-transport:keep-running server-id)
  ;; This thread waits for the server to come alive
  (let* ((server-info (let loop ()
                        (let ((sdat #f))
                          (mutex-lock! *heartbeat-mutex*)
                          (set! sdat *server-info*)
                          (mutex-unlock! *heartbeat-mutex*)
                          (if sdat
                              (begin
                                (debug:print-info 0 "keep-running got sdat=" sdat)
                                sdat)
                              (begin
                                (thread-sleep! 0.5)
                                (loop))))))
         (iface       (car server-info))
         (port        (cadr server-info))
         (last-access 0)
         (tdbdat      (tasks:open-db)))

    (print "Keep-running got server pid " server-id ", using iface " iface " and port " port)
    (let loop ((count 0))
      (thread-sleep! 4) ;; no need to do this very often
      (if (< count 1)
          ;; skip the check on the first tick after a reset
          (loop (+ count 1))
          (begin
            (mutex-lock! *heartbeat-mutex*)
            (set! last-access *last-db-access*)
            (mutex-unlock! *heartbeat-mutex*)
            (if (> (+ last-access
                      (* 45 60)) ;; 45 minutes, until the db deletion bug is fixed.
                   (current-seconds))
                (begin
                  (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
                  (loop 0))
                (begin
                  (debug:print-info 0 "Starting to shutdown the server.")
                  (set! *time-to-exit* #t)
                  ;; NOTE(review): the reason string names http-transport although this is
                  ;; the nmsg transport - left unchanged, confirm whether it should be updated
                  (tasks:server-delete-record (db:delay-if-busy tdbdat) server-id " http-transport:keep-running")
                  (debug:print-info 0 "Server shutdown complete. Exiting")
                  ;; (exit)
                  )))))))


(define (nmsg-transport:client-signal-handler signum)
  (handle-exceptions
   exn
   (debug:print " ... exiting ...")
   (let ((th1 (make-thread (lambda ()

Modified rmt.scm from [a7d45cc611] to [db091f3ce4].

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (jparams         (db:obj->string params)))
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
			  ((nm)  (nm-transport:client-api-send-receive   run-id connection-info cmd jparams))
			  (else  (exit))))
	       (res     (if (vector? dat) (vector-ref dat 1) #f))
	       (success (if (vector? dat) (vector-ref dat 0) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)
	      (begin ;; let ((new-connection-info (client:setup run-id)))







|







84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
  (let* ((run-id          (if rid rid 0))
	 (connection-info (rmt:get-connection-info run-id))
	 (jparams         (db:obj->string params)))
    (if connection-info
	;; use the server if have connection info
	(let* ((dat     (case *transport-type*
			  ((http)(http-transport:client-api-send-receive run-id connection-info cmd jparams))
			  ((nmsg)(nmsg-transport:client-api-send-receive   run-id connection-info cmd jparams))
			  (else  (exit))))
	       (res     (if (vector? dat) (vector-ref dat 1) #f))
	       (success (if (vector? dat) (vector-ref dat 0) #f)))
	  (http-transport:server-dat-update-last-access connection-info)
	  (if success
	      (db:string->obj res)
	      (begin ;; let ((new-connection-info (client:setup run-id)))