@@ -36,14 +36,14 @@ ;; ;; Done Tested ;; [x] [ ] 1. Add columns pullport pubport to servers table ;; [x] [ ] 2. Add rm of monitor.db if older than 11/12/2012 ;; [x] [ ] 3. Add create of pullport and pubport with finding of available ports -;; [ ] [ ] 4. Add client compose of request -;; [ ] [ ] - name of client: testname/itempath-test_id-hostname -;; [ ] [ ] - name of request: callname, params -;; [ ] [ ] - request key: f(clientname, callname, params) +;; [x] [ ] 4. Add client compose of request +;; [x] [ ] - name of client: testname/itempath-test_id-hostname +;; [x] [ ] - name of request: callname, params +;; [x] [ ] - request key: f(clientname, callname, params) ;; [ ] [ ] 5. Add processing of subscription hits ;; [ ] [ ] - done when get key ;; [ ] [ ] - return results ;; [ ] [ ] 6. Add timeout processing ;; [ ] [ ] - after 60 seconds @@ -57,24 +57,24 @@ (conc "tcp://" (car hostport) ":" (cadr hostport)))) (define *server-loop-heart-beat* (current-seconds)) (define *heartbeat-mutex* (make-mutex)) -(define (server:self-ping server-info) - ;; server-info: server-id interface pullport pubport - (let ((iface (list-ref server-info 1)) - (pullport (list-ref server-info 2)) - (pubport (list-ref server-info 3))) - (server:client-connect iface pullport pubport) - (let loop () - (thread-sleep! 2) - (cdb:client-call *runremote* 'ping #t) - (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") - (mutex-lock! *heartbeat-mutex*) - (set! *server-loop-heart-beat* (current-seconds)) - (mutex-unlock! *heartbeat-mutex*) - (loop)))) +;; (define (server:self-ping server-info) +;; ;; server-info: server-id interface pullport pubport +;; (let ((iface (list-ref server-info 1)) +;; (pullport (list-ref server-info 2)) +;; (pubport (list-ref server-info 3))) +;; (server:client-connect iface pullport pubport) +;; (let loop () +;; (thread-sleep! 2) +;; (cdb:client-call *runremote* 'ping #t) +;; (debug:print 4 "server:self-ping - I'm alive on " iface ":" pullport "/" pubport "!") +;; (mutex-lock! *heartbeat-mutex*) +;; (set! *server-loop-heart-beat* (current-seconds)) +;; (mutex-unlock! *heartbeat-mutex*) +;; (loop)))) (define-inline (zmqsock:get-pub dat)(vector-ref dat 0)) (define-inline (zmqsock:get-pull dat)(vector-ref dat 1)) (define-inline (zmqsock:set-pub! dat s)(vector-set! dat s 0)) (define-inline (zmqsock:set-pull! dat s)(vector-set! dat s 0)) @@ -137,11 +137,11 @@ (loop)))))))) ;; The heavy lifting ;; (let loop () - (print "GOT HERE EH?") + ;; (print "GOT HERE EH?") (let* ((rawmsg (receive-message* pull-socket)) (params (db:string->obj rawmsg)) ;; (with-input-from-string rawmsg (lambda ()(deserialize)))) (res #f)) (debug:print-info 12 "server=> received params=" params) (set! res (cdb:cached-access params)) @@ -176,24 +176,27 @@ (server-loop-heartbeat #f) (server-info #f) (pulse 0)) ;; BUG add a wait on server alive here!! ;; ;; Ugly yuk. - (mutex-lock! *heartbeat-mutex*) - (set! server-loop-heartbeat *server-loop-heart-beat*) - (set! server-info *server-info*) - (mutex-unlock! *heartbeat-mutex*) + ;; == (mutex-lock! *heartbeat-mutex*) + ;; == (set! server-loop-heartbeat *server-loop-heart-beat*) + ;; == (set! server-info *server-info*) + ;; == (mutex-unlock! *heartbeat-mutex*) ;; The logic here is that if the server loop gets stuck blocked in working ;; we don't want to update our heartbeat - (set! pulse (- (current-seconds) server-loop-heartbeat)) - (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") - (if (> pulse 15) ;; must stay less than 10 seconds - (begin - (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info)) - (debug:print 0 "ERROR: Heartbeat failed, committing servercide") - (exit)) - (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info))) + ;; == (set! pulse (- (current-seconds) server-loop-heartbeat)) + ;; == (debug:print-info 2 "Heartbeat period is " pulse " seconds on " (cadr server-info) ":" (caddr server-info) ", last db access is " (- (current-seconds) *last-db-access*) " seconds ago") + ;; == (if (> pulse 15) ;; must stay less than 10 seconds + ;; == (begin + ;; == (open-run-close tasks:server-deregister tasks:open-db (cadr server-info) pullport: (caddr server-info)) + ;; == (debug:print 0 "ERROR: Heartbeat failed, committing servercide") + ;; == (exit)) + + ;; NOTE: Get rid of this mechanism! It really is not needed... + (open-run-close tasks:server-update-heartbeat tasks:open-db (car server-info)) + ;; (if ;; (or (> numrunning 0) ;; stay alive for two days after last access (if (> (+ *last-db-access* ;; (* 48 60 60) ;; 48 hrs ;; 60 ;; one minute (* 60 60) ;; one hour @@ -316,31 +319,31 @@ (let ((host (list-ref hostinfo 0)) (iface (list-ref hostinfo 1)) (pullport (list-ref hostinfo 2)) (pubport (list-ref hostinfo 3))) (debug:print-info 2 "Setting up to connect to " hostinfo) - (handle-exceptions - exn + ;;(handle-exceptions + ;; exn (begin ;; something went wrong in connecting to the server. In this scenario it is ok ;; to try again (debug:print 0 "ERROR: Failed to open a connection to the server at: " hostinfo) (debug:print 0 " EXCEPTION: " ((condition-property-accessor 'exn 'message) exn)) (debug:print 0 " perhaps jobs killed with -9? Removing server records") (open-run-close tasks:server-deregister tasks:open-db host pullport: pullport) (server:client-setup (- numtries 1)) #f) - (server:client-connect iface pullport pubport))) - (if (> numtries 0) - (let ((exe (car (argv)))) - (debug:print-info 1 "No server available, attempting to start one...") - (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) - (sleep 5) ;; give server time to start - ;; we are starting a server, do not try again! That can lead to - ;; recursively starting many processes!!! - (server:client-setup numtries: 0)) - (debug:print-info 1 "Too many attempts, giving up"))))) + (server:client-connect iface pullport pubport))))) + ;; (if (> numtries 0) + ;; (let ((exe (car (argv)))) + ;; (debug:print-info 1 "No server available, attempting to start one...") + ;; (process-run exe (list "-server" "-" "-debug" (conc *verbosity*))) + ;; (sleep 5) ;; give server time to start + ;; ;; we are starting a server, do not try again! That can lead to + ;; ;; recursively starting many processes!!! + ;; (server:client-setup numtries: 0)) + ;; (debug:print-info 1 "Too many attempts, giving up"))))) ;; all routes though here end in exit ... (define (server:launch) (if (not *toppath*) (if (not (setup-for-run)) @@ -350,30 +353,30 @@ (debug:print-info 1 "Starting the standalone server") (let ((hostinfo (open-run-close tasks:get-best-server tasks:open-db))) (if hostinfo (debug:print-info 1 "NOT starting new server, one is already running on " (car hostinfo) ":" (cadr hostinfo)) (if *toppath* - (let* ((th1 (make-thread (lambda () - (let ((server-info #f)) - ;; wait for the server to be online and available - (let loop () - (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") - (thread-sleep! 2) - (mutex-lock! *heartbeat-mutex*) - (set! server-info *server-info* ) - (mutex-unlock! *heartbeat-mutex*) - (if (not server-info)(loop))) - (debug:print 1 "Server alive, starting self-ping") - ;; (server:self-ping server-info) - )) - "Self ping")) + (let* (;; (th1 (make-thread (lambda () + ;; (let ((server-info #f)) + ;; ;; wait for the server to be online and available + ;; (let loop () + ;; (debug:print-info 1 "Waiting for the server to come online before starting heartbeat") + ;; (thread-sleep! 2) + ;; (mutex-lock! *heartbeat-mutex*) + ;; (set! server-info *server-info* ) + ;; (mutex-unlock! *heartbeat-mutex*) + ;; (if (not server-info)(loop))) + ;; (debug:print 1 "Server alive, starting self-ping") + ;; (server:self-ping server-info) + ;; )) + ;; "Self ping")) (th2 (make-thread (lambda () (server:run (args:get-arg "-server"))) "Server run")) (th3 (make-thread (lambda () (server:keep-running)) "Keep running"))) (set! *client-non-blocking-mode* #t) - (thread-start! th1) + ;; (thread-start! th1) (thread-start! th2) (thread-start! th3) (set! *didsomething* #t) (thread-join! th3)) (debug:print 0 "ERROR: Failed to setup for megatest")))