Megatest

Diff
Login

Differences From Artifact [ee9b75177b]:

To Artifact [57ed21c7fb]:


72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   (server:get-best-guess-address hostname)
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname)))
	 (start-port    (if (and (args:get-arg "-port")
				 (string->number (args:get-arg "-port")))
			    (string->number (args:get-arg "-port"))
			    (if (and (config-lookup  *configdat* "server" "port")
				     (string->number (config-lookup  *configdat* "server" "port")))
				(string->number (config-lookup  *configdat* "server" "port"))
				(+ 5000 (random 1001)))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! *cache-on* #t)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
    (handle-directory spiffy-directory-listing)
    ;; http-transport:handle-directory) ;; simple-directory-handler)
    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
			       (if (not db)(set! db (open-db)))
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "api"))
				   (send-response body:    (api:process-request db $) ;; the $ is the request vars proc







|








|









|







72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
	 ;;        	      hostn))
	 (db              #f) ;;        (open-db)) ;; we don't want the server to be opening and closing the db unnecesarily
	 (hostname        (get-host-name))
	 (ipaddrstr       (let ((ipstr (if (string=? "-" hostn)
					   ;; (string-intersperse (map number->string (u8vector->list (hostname->ip hostname))) ".")
					   (server:get-best-guess-address hostname)
					   #f)))
			    (if ipstr ipstr hostn))) ;; hostname))) 
	 (start-port    (if (and (args:get-arg "-port")
				 (string->number (args:get-arg "-port")))
			    (string->number (args:get-arg "-port"))
			    (if (and (config-lookup  *configdat* "server" "port")
				     (string->number (config-lookup  *configdat* "server" "port")))
				(string->number (config-lookup  *configdat* "server" "port"))
				(+ 5000 (random 1001)))))
	 (link-tree-path (config-lookup *configdat* "setup" "linktree")))
    (set! db *inmemdb*)
    (root-path     (if link-tree-path 
		       link-tree-path
		       (current-directory))) ;; WARNING: SECURITY HOLE. FIX ASAP!
    (handle-directory spiffy-directory-listing)
    ;; http-transport:handle-directory) ;; simple-directory-handler)
    ;; Setup the web server and a /ctrl interface
    ;;
    (vhost-map `(((* any) . ,(lambda (continue)
			       ;; open the db on the first call 
				 ;; This is were we set up the database connections
			       (let* (($   (request-vars source: 'both))
				      (dat ($ 'dat))
				      (res #f))
				 (cond
				  ((equal? (uri-path (request-uri (current-request)))
					   '(/ "api"))
				   (send-response body:    (api:process-request db $) ;; the $ is the request vars proc
182
183
184
185
186
187
188












































189
190
191
192
193
194
195
;; C L I E N T S
;;======================================================================

(define *http-mutex* (make-mutex))
(define *http-requests-in-progress* 0)
(define *http-connections-next-cleanup* (current-seconds))













































(define (http-transport:get-time-to-cleanup)
  (let ((res #f))
    (mutex-lock! *http-mutex*)
    (set! res (> (current-seconds) *http-connections-next-cleanup*))
    (mutex-unlock! *http-mutex*)
    res))








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
;; C L I E N T S
;;======================================================================

(define *http-mutex* (make-mutex))
(define *http-requests-in-progress* 0)
(define *http-connections-next-cleanup* (current-seconds))

(define (http-transport:get-time-to-cleanup)
  (let ((res #f))
    (mutex-lock! *http-mutex*)
    (set! res (> (current-seconds) *http-connections-next-cleanup*))
    (mutex-unlock! *http-mutex*)
    res))

(define (http-transport:inc-requests-count)
  (mutex-lock! *http-mutex*)
  (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*))
  ;; Use this opportunity to slow things down iff there are too many requests in flight
  (if (> *http-requests-in-progress* 5)
      (begin
	(debug:print-info 0 "Whoa there buddy, ease up...")
	(thread-sleep! 1)))
  (mutex-unlock! *http-mutex*))

(define (http-transport:dec-requests-count proc) 
  (mutex-lock! *http-mutex*)
  (proc)
  (set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
  (mutex-unlock! *http-mutex*))

(define (http-transport:dec-requests-count-and-close-all-connections)
  (set! *http-requests-in-progress* (- *http-requests-in-progress* 1))
  (let loop ((etime (+ (current-seconds) 5))) ;; give up in five seconds
    (if (> *http-requests-in-progress* 0)
	(if (> etime (current-seconds))
	    (begin
	      (thread-sleep! 0.05)
	      (loop etime))
	    (debug:print 0 "ERROR: requests still in progress after 5 seconds of waiting. I'm going to pass on cleaning up http connections"))
	(close-all-connections!)))
  (set! *http-connections-next-cleanup* (+ (current-seconds) 10))
  (mutex-unlock! *http-mutex*))

(define (http-transport:inc-requests-and-prep-to-close-all-connections)
  (mutex-lock! *http-mutex*)
  (set! *http-requests-in-progress* (+ 1 *http-requests-in-progress*)))

;; This next block all imported en-mass from the api branch
(define *http-requests-in-progress* 0)
(define *http-connections-next-cleanup* (current-seconds))

(define (http-transport:get-time-to-cleanup)
  (let ((res #f))
    (mutex-lock! *http-mutex*)
    (set! res (> (current-seconds) *http-connections-next-cleanup*))
    (mutex-unlock! *http-mutex*)
    res))

388
389
390
391
392
393
394

395
396
397
398
399
400
401
402
403
	 res)))))

(define (http-transport:client-connect iface port)
  (let* ((login-res   #f)
	 (uri-dat     (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl"))))
	 (uri-api-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/api"))))
	 (serverdat   (list iface port uri-dat uri-api-dat)))

    (set! login-res (client:login serverdat))
    (if (and (not (null? login-res))
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 0 "ERROR: Failed to login or connect to " iface ":" port)







>
|
|







432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
	 res)))))

(define (http-transport:client-connect iface port)
  (let* ((login-res   #f)
	 (uri-dat     (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/ctrl"))))
	 (uri-api-dat (make-request method: 'POST uri: (uri-reference (conc "http://" iface ":" port "/api"))))
	 (serverdat   (list iface port uri-dat uri-api-dat)))
    (set! *runremote* serverdat) ;; may or may not be good ...
    (set! login-res (rmt:login))
    (if (and (list? login-res)
	     (car login-res))
	(begin
	  (debug:print-info 2 "Logged in and connected to " iface ":" port)
	  (set! *runremote* serverdat)
	  serverdat)
	(begin
	  (debug:print-info 0 "ERROR: Failed to login or connect to " iface ":" port)
434
435
436
437
438
439
440












441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479

480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
			   (if (and (string? tmo)
				    (string->number tmo))
			       (* 60 60 (string->number tmo))
			       ;; default to three days
			       (* 3 24 60 60)))))
    (debug:print-info 2 "server-timeout: " server-timeout ", server pid: " spid " on " iface ":" port)
    (let loop ((count 0))












      (thread-sleep! 4) ;; no need to do this very often
      ;; NB// sync currently does NOT return queue-length
      (let () ;; (queue-len (cdb:client-call server-info 'sync #t 1)))
      ;; (print "Server running, count is " count)
        (if (< count 1) ;; 3x3 = 9 secs aprox
            (loop (+ count 1)))
        
	;; Check that iface and port have not changed (can happen if server port collides)
	(mutex-lock! *heartbeat-mutex*)
	(set! sdat *runremote*)
	(mutex-unlock! *heartbeat-mutex*)

	(if (or (not (equal? sdat (list iface port)))
		(not spid))
	    (begin 
	      (debug:print-info 0 "interface changed, refreshing iface and port info")
	      (set! iface (car sdat))
	      (set! port  (cadr sdat))
	      (set! spid  (tasks:server-get-server-id tdb #f iface port #f))))

        ;; NOTE: Get rid of this mechanism! It really is not needed...
        ;; (open-run-close tasks:server-update-heartbeat tasks:open-db spid)
        (tasks:server-update-heartbeat tdb spid)
      
        ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
        (mutex-lock! *heartbeat-mutex*)
        (set! last-access *last-db-access*)
        (mutex-unlock! *heartbeat-mutex*)
	;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout)
        (if (and *server-run*
		 (> (+ last-access server-timeout)
		    (current-seconds)))
            (begin
              (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
              (loop 0))
            (begin
              (debug:print-info 0 "Starting to shutdown the server.")
              ;; need to delete only *my* server entry (future use)
              (set! *time-to-exit* #t)

              (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
              (thread-sleep! 1)
              (debug:print-info 0 "Max cached queries was    " *max-cache-size*)
	      (debug:print-info 0 "Number of cached writes   " *number-of-writes*)
	      (debug:print-info 0 "Average cached write time "
				(if (eq? *number-of-writes* 0)
				    "n/a (no writes)"
				    (/ *writes-total-delay*
				       *number-of-writes*))
				" ms")
	      (debug:print-info 0 "Number non-cached queries "  *number-non-write-queries*)
	      (debug:print-info 0 "Average non-cached time   "
				(if (eq? *number-non-write-queries* 0)
				    "n/a (no queries)"
				    (/ *total-non-write-delay* 
				       *number-non-write-queries*))
				" ms")
              (debug:print-info 0 "Server shutdown complete. Exiting")
              (exit)))))))

;; all routes though here end in exit ...
(define (http-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")







>
>
>
>
>
>
>
>
>
>
>
>
|
|
<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|







479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499


500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
			   (if (and (string? tmo)
				    (string->number tmo))
			       (* 60 60 (string->number tmo))
			       ;; default to three days
			       (* 3 24 60 60)))))
    (debug:print-info 2 "server-timeout: " server-timeout ", server pid: " spid " on " iface ":" port)
    (let loop ((count 0))
      ;; Use this opportunity to sync the inmemdb to db
      (let ((start-time (current-milliseconds))
	    (sync-time  #f)
	    (rem-time   #f))
	(if *inmemdb* (db:sync-tables (db:tbls *inmemdb*) *inmemdb* *db*)) ;; (db:sync-to *inmemdb* *db*))
	(set! sync-time  (- (current-milliseconds) start-time))
	(debug:print 0 "SYNC: time= " sync-time)
	(set! rem-time (quotient (- 4000 sync-time) 1000))
	(if (and (< rem-time 4)
		 (> rem-time 0))
	    (thread-sleep! rem-time)))

      ;; (thread-sleep! 4) ;; no need to do this very often



      (if (< count 1) ;; 3x3 = 9 secs aprox
	  (loop (+ count 1)))
      
      ;; Check that iface and port have not changed (can happen if server port collides)
      (mutex-lock! *heartbeat-mutex*)
      (set! sdat *runremote*)
      (mutex-unlock! *heartbeat-mutex*)
      
      (if (or (not (equal? sdat (list iface port)))
	      (not spid))
	  (begin 
	    (debug:print-info 0 "interface changed, refreshing iface and port info")
	    (set! iface (car sdat))
	    (set! port  (cadr sdat))
	    (set! spid  (tasks:server-get-server-id tdb #f iface port #f))))
      
      ;; NOTE: Get rid of this mechanism! It really is not needed...
      ;; (open-run-close tasks:server-update-heartbeat tasks:open-db spid)
      (tasks:server-update-heartbeat tdb spid)
      
      ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access
      (mutex-lock! *heartbeat-mutex*)
      (set! last-access *last-db-access*)
      (mutex-unlock! *heartbeat-mutex*)
      ;; (debug:print 11 "last-access=" last-access ", server-timeout=" server-timeout)
      (if (and *server-run*
	       (> (+ last-access server-timeout)
		  (current-seconds)))
	  (begin
	    (debug:print-info 0 "Server continuing, seconds since last db access: " (- (current-seconds) last-access))
	    (loop 0))
	  (begin
	    (debug:print-info 0 "Starting to shutdown the server.")
	    ;; need to delete only *my* server entry (future use)
	    (set! *time-to-exit* #t)
	    (if *inmemdb* (db:sync-tables (db:tbls *inmemdb*) *inmemdb* *db*)) ;; (db:sync-to *inmemdb* *db*))
	    (open-run-close tasks:server-deregister-self tasks:open-db (get-host-name))
	    (thread-sleep! 1)
	    (debug:print-info 0 "Max cached queries was    " *max-cache-size*)
	    (debug:print-info 0 "Number of cached writes   " *number-of-writes*)
	    (debug:print-info 0 "Average cached write time "
			      (if (eq? *number-of-writes* 0)
				  "n/a (no writes)"
				  (/ *writes-total-delay*
				     *number-of-writes*))
			      " ms")
	    (debug:print-info 0 "Number non-cached queries "  *number-non-write-queries*)
	    (debug:print-info 0 "Average non-cached time   "
			      (if (eq? *number-non-write-queries* 0)
				  "n/a (no queries)"
				  (/ *total-non-write-delay* 
				     *number-non-write-queries*))
			      " ms")
	    (debug:print-info 0 "Server shutdown complete. Exiting")
	    (exit))))))

;; all routes though here end in exit ...
(define (http-transport:launch)
  (if (not *toppath*)
      (if (not (setup-for-run))
	  (begin
	    (debug:print 0 "ERROR: cannot find megatest.config, exiting")
514
515
516
517
518
519
520
521
522





523
524
525
526
527
528
529
530
531
532
	(debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread http-transport:keep-running "Keep running"))
		   (th1 (make-thread server:write-queue-handler  "write queue")))





	      (thread-start! th2)
	      (thread-start! th3)
	      (thread-start! th1)
	      (set! *didsomething* #t)
	      (thread-join! th2))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

;; (use trace)
;; (trace http-transport:keep-running 







|
|
>
>
>
>
>


|







570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
	(debug:print-info 2 "NOT starting new server, one is already running on " (vector-ref hostinfo 1) ":" (vector-ref hostinfo 2))
	(if *toppath* 
	    (let* ((th2 (make-thread (lambda ()
				       (http-transport:run 
					(if (args:get-arg "-server")
					    (args:get-arg "-server")
					    "-"))) "Server run"))
		   (th3 (make-thread http-transport:keep-running "Keep running")))
;;		   (th1 (make-thread server:write-queue-handler  "write queue")))
	      (set! *cache-on* #t)
	      (set! *db*       (open-db))
	      (set! *inmemdb*  (open-in-mem-db))
	      (db:sync-tables (db:tbls *db*) *db* *inmemdb*) ;; (db:sync-to *db* *inmemdb*)

	      (thread-start! th2)
	      (thread-start! th3)
	      ;; (thread-start! th1)
	      (set! *didsomething* #t)
	      (thread-join! th2))
	    (debug:print 0 "ERROR: Failed to setup for megatest")))
    (exit)))

;; (use trace)
;; (trace http-transport:keep-running