Megatest

server.scm at [adbeb66c05]
Login

File server.scm artifact 9fd201bbc1 part of check-in adbeb66c05



;; Copyright 2006-2017, Matthew Welland.
;; 
;; This file is part of Megatest.
;; 
;;     Megatest is free software: you can redistribute it and/or modify
;;     it under the terms of the GNU General Public License as published by
;;     the Free Software Foundation, either version 3 of the License, or
;;     (at your option) any later version.
;; 
;;     Megatest is distributed in the hope that it will be useful,
;;     but WITHOUT ANY WARRANTY; without even the implied warranty of
;;     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;     GNU General Public License for more details.
;; 
;;     You should have received a copy of the GNU General Public License
;;     along with Megatest.  If not, see <http://www.gnu.org/licenses/>.
;;

;;======================================================================
;;
;; This is the Megatest specific stuff for starting and maintaining a
;; server. Anything that talks to the server should go in client.scm (maybe - might get rid of client.scm)
;; General nanomsg stuff (not Megatest specific) should go in the
;; nmsg-transport.scm file.
;;
;;======================================================================

(require-extension (srfi 18) extras tcp s11n)

(use srfi-1 posix regex regex-case srfi-69 hostinfo md5 message-digest
     directory-utils posix-extras matchable typed-records
     pkts)

(use spiffy uri-common intarweb http-client spiffy-request-vars)

(declare (unit server))

(declare (uses common))

(declare (uses db))
(import db)

;; Basic stuff for safely kicking off a server
(declare (uses portlogger))
(import portlogger)

(declare (uses nmsg-transport))
(import nmsg-transport)


;; Might want to bring the daemonizing back
;; (declare (uses daemon))

(include "common_records.scm")
(include "db_records.scm")

;;======================================================================
;; P K T S   S T U F F 
;;======================================================================

;;======================================================================
;;  N A N O M S G   B A S E D   S E R V E R
;;======================================================================

;; Record holding what this process knows about itself as a server and
;; about the other servers it has seen for the area.
;;
(defstruct area
  (conn  #f)                ;; connection returned by nmsg:start-server (set in server:start-nmsg)
  (port  #f)                ;; port number the nmsg server was started on
  (myaddr #f)               ;; read by server:send and sent as the 'hostname field of a message
  (hosts (make-hash-table)) ;; known servers: pkt id (the Z field) -> server pkt alist
  pktid  ;; get pkt from hosts table if needed
  pktfile                   ;; path of this server's own .pkt file, set only in 'main mode
  pktsdir                   ;; directory that holds the server pkts
  mtrah                     ;; run area home as found by server:get-mtrah
  (mutex    (make-mutex))   ;; locked by server:start-nmsg while it fills in this record
  )

;; The single area record for this process. The procedures in this file
;; read and update it directly.

(define *area-info* (make-area))

;; pkt specification passed to write-alist->pkt and read-pkt->alist.
;; For each pkt type it lists (field-name . one-letter-key) pairs.
(define *pktspec*
  '((server
     (hostname . h)
     (port     . p)
     (pid      . i)
     (ipaddr   . a))
    (data
     (hostname . h)   ;; sender hostname
     (port     . p)   ;; sender port
     (ipaddr   . a)   ;; sender ip
     (hostkey  . k)   ;; sending host key - store info at server under this key
     (servkey  . s)   ;; server key - this needs to match at server end or reject the msg
     (format   . f)   ;; sb=serialized-base64, t=text, sx=sexpr, j=json
     (data     . d)   ;; base64 encoded s11n data
     )))

;; Find the run area home.
;;
;; Returns the value of MT_RUN_AREA_HOME when it is set; otherwise the
;; current directory when it contains a megatest.config; otherwise #f.
;;
(define (server:get-mtrah)
  (cond
   ;; a clause with only a test yields the value of that test
   ((get-environment-variable "MT_RUN_AREA_HOME"))
   ((file-exists? "megatest.config") (current-directory))
   (else #f)))

;; Start the nmsg server for this process and record it in *area-info*.
;;
;; Steps:
;;   - find the run area home; without one nothing is started
;;   - get a port from portlogger and start the nmsg server on it
;;   - in 'main mode write a server pkt into <area>/.server-pkts so other
;;     processes can find this server
;;
;; there are two types of server
;;     main servers - dashboards, runners and dedicated servers - need pkt
;;     passive servers - test executers, step calls, list-runs - no pkt
;;
;; force-server-type: 'main or 'passive; when #f the type is 'main if
;;                    -run or -server was given on the command line,
;;                    else 'passive.
;;
;; Returns the connection from nmsg:start-server, or #f (after printing
;; an error) when no run area could be found.
;;
(define (server:start-nmsg #!optional (force-server-type #f))
  ;; Check for the run area before taking the lock or reserving a port so
  ;; the failure path leaves nothing behind to clean up.
  (let ((mtdir (server:get-mtrah)))
    (if (not mtdir)
        (begin
          (print "ERROR: megatest.config not found and MT_RUN_AREA_HOME is not set.")
          #f)
        (let ((lock (area-mutex *area-info*)))
          ;; dynamic-wind guarantees the mutex is released even when one of
          ;; the steps below raises an exception.
          (dynamic-wind
           (lambda () (mutex-lock! lock))
           (lambda ()
             (let* ((server-type (or force-server-type
                                     (if (args:any? "-run" "-server")
                                         'main
                                         'passive)))
                    (port-num    (portlogger:open-run-close portlogger:find-port))
                    (best-ip     (server:get-my-best-address))
                    (area-conn   (nmsg:start-server port-num))
                    (pktdir      (conc mtdir "/.server-pkts")))
               (if (not (directory? pktdir)) (create-directory pktdir))
               ;; server is started, now create pkt if needed
               (print "Starting server in " server-type " mode")
               (if (eq? server-type 'main)
                   (begin
                     (area-pktid-set! *area-info*
                                      (write-alist->pkt
                                       pktdir
                                       `((hostname . ,(get-host-name))
                                         (ipaddr   . ,best-ip)
                                         (port     . ,port-num)
                                         (pid      . ,(current-process-id)))
                                       pktspec: *pktspec*
                                       ptype:   'server))
                     (area-pktfile-set! *area-info*
                                        (conc pktdir "/" (area-pktid *area-info*) ".pkt"))))
               ;; record everything we now know in the area record
               (area-pktsdir-set! *area-info* pktdir)
               (area-mtrah-set!   *area-info* mtdir)
               ;; server:send reads myaddr as the sender address
               (area-myaddr-set!  *area-info* best-ip)
               (area-conn-set!    *area-info* area-conn)
               (area-port-set!    *area-info* port-num)
               area-conn))
           (lambda () (mutex-unlock! lock)))))))

;; Default message handler: takes the alist received from a client and
;; returns the reply to send back.
;;
;; action:
;;   immediate - quick actions, no need to put in queues
;;   dbwrite   - put in dbwrite queue
;;   dbread    - put in dbread queue
;;   oslong    - os actions, e.g. du, that could take a long time
;;   osshort   - os actions that should be quick, e.g. df
;;
;; dat: alist; the keys read here are servkey, action and data.
;;
;; Returns a reply whose first element is #t or #f followed by a message
;; (and, for unrecognised input, the offending value).
;;
;; NOTE(review): the "I don't know you" reply is a dotted pair while all
;; other replies are lists; server:update-known-servers only accepts
;; two-element lists - confirm which shape is intended.
;;
(define (server:std-handler dat)
  (let ((servkey (alist-ref 'servkey dat))
        (data    (alist-ref 'data    dat))
        (action  (alist-ref 'action  dat)))
    ;; first, if you don't know who I am then I'm ignoring you
    (if (not (equal? servkey (area-pktid *area-info*)))
        `(#f . "I don't know you") ;; immediately return this
        (case action               ;; else carry on
          ((immediate)
           (case data
             ((ping) `(#t  "success"))
             ;; unquote so the caller sees what it actually sent
             (else   `(#t  "I didn't recognise " ,data))))
          ((dbwrite) `(#t  "db write submitted"))
          ((dbread)  `(#t  "db read submitted"))
          ((oslong)  `(#t  "os long submitted"))
          ((osshort) `(#t  "os short submitted"))
          (else      `(#f  "unrecognised action" ,action))))))

;; Call this to start the actual server
;;
;;   mode:    passed to server:start-nmsg as the forced server type
;;            ('main or 'passive, #f to let it decide)
;;   proc:    handler which takes the received message as its argument
;;            and returns the reply; defaults to server:std-handler
;;
;; Runs two threads: one that receives a message, calls proc and sends
;; the reply, and a watchdog that exits the process once no message has
;; arrived for more than 60 seconds (checked every 10 seconds). Blocks
;; until the message thread finishes, which happens when an eof object is
;; received.
;;
;; Returns #f without starting any thread when server:start-nmsg could
;; not start a server.
;;
(define (server:launch mode #!optional (proc server:std-handler))
  (let ((rep (server:start-nmsg mode)))
    (if (not rep)
        #f ;; server:start-nmsg has already reported why
        (let* ((last-msg-time (current-seconds))
               (th1           (make-thread
                               (lambda ()
                                 (let loop ()
                                   (let ((dat (server:receive rep)))
                                     (set! last-msg-time (current-seconds))
                                     (if (not (eof-object? dat))
                                         (let ((resdat (proc dat)))
                                           (print "Got " dat)
                                           (print "Responding with " resdat)
                                           (nmsg:send rep (sexpr->string resdat))
                                           (loop))))))
                               "message handler"))
               (th2           (make-thread
                               (lambda ()
                                 (let loop ()
                                   (thread-sleep! 10)
                                   (if (> (- (current-seconds) last-msg-time) 60) ;; timeout after 60 seconds
                                       (begin
                                         (print "Waited for 60 seconds and no messages, exiting now.")
                                         (exit))
                                       (loop))))
                               "idle watchdog")))
          (thread-start! th1)
          (thread-start! th2)
          (thread-join! th1)))))

;; Receive one message on rep.
;;
;; A string from nmsg:recv is read back into an s-expression; anything
;; else is returned as received (string->sexpr passes non-strings
;; through untouched).
;;
(define (server:receive rep)
  (string->sexpr (nmsg:recv rep)))

;; Shut down the server recorded in *area-info*, if there is one:
;; remove its pkt file (when it has one), notify the other servers,
;; close the connection and release the port.
;;
(define (server:shutdown)
  (let ((conn (area-conn *area-info*)))
    (when conn
      (let ((pkt-path (area-pktfile *area-info*))
            (my-port  (area-port    *area-info*)))
        (when pkt-path
          (delete-file* pkt-path))
        (server:send-all "imshuttingdown")
        (nmsg:close conn)
        (portlogger:open-run-close portlogger:release-port my-port)))))

;; Placeholder for sending msg to all servers: currently ignores msg
;; and always returns #f.
;;
(define (server:send-all msg) #f)

;; Given an area record, read every *.pkt file in its pkts directory.
;;
;; Returns a list with one alist (as produced by read-pkt->alist using
;; *pktspec*) per pkt file found.
;;
(define (server:get-all-server-pkts rec)
  (let* ((pattern   (conc (area-pktsdir rec) "/*.pkt"))
         (pkt-paths (glob pattern)))
    (map (lambda (path)
           (read-pkt->alist path pktspec: *pktspec*))
         pkt-paths)))

#;((Z . "9a0212302295a19610d5796fce0370fa130758e9")
  (port . "34827")
  (pid . "28748")
  (hostname . "zeus")
  (T . "server")
  (D . "1549427032.0"))

;; Send data to a server and return its reply.
;;
;;   servpkt: alist for the server we wish to send the message to; the
;;            keys read are port, hostname, ipaddr and Z (the server key)
;;   data:    payload, placed under the 'data key of the message
;;   action:  placed under the 'action key; the receiving handler
;;            dispatches on it
;;
;; The message also carries this process's address, port and pkt id taken
;; from *area-info*.
;;
;; Returns the reply read back as an s-expression (a non-string reply is
;; returned as is), or #f when servpkt has no port or has neither an ip
;; nor a hostname to connect to.
;;
(define (server:send servpkt data action)
  (let* ((port (alist-ref 'port     servpkt))
         (host (alist-ref 'hostname servpkt))
         (ip   (alist-ref 'ipaddr   servpkt))
         (hkey (alist-ref 'Z        servpkt))
         (dest (or ip host))) ;; fall back to host if ip not provided
    ;; the address is usable as soon as either ip or host is known
    (if (and port dest)
        (let ((msg (sexpr->string
                    `((hostname . ,(area-myaddr *area-info*))
                      (port     . ,(area-port   *area-info*))
                      (servkey  . ,hkey)     ;; server looks at this to ensure message is for them
                      (hostkey  . ,(area-pktid  *area-info*))
                      (action   . ,action)   ;; what the server should do with the data
                      (data     . ,data)))))
          (string->sexpr
           (nmsg:open-send-receive (conc dest ":" port) msg)))
        #f)))

;; Return, as a string, the first address of this host whose first octet
;; is not 127 (i.e. not a loopback address).
;;
;; The lookup uses this machine's own host name. Signals an error naming
;; the host when the lookup fails or only loopback addresses are found.
;;
(define (server:get-my-best-address)
  (let* ((hostname (get-host-name))
         (info     (hostname->hostinfo hostname)) ;; presumably #f when the lookup fails - confirm against the hostinfo docs
         (addrs    (if info
                       (vector->list (hostinfo-addresses info))
                       '()))
         (usable   (filter (lambda (addr)
                             (not (= (u8vector-ref addr 0) 127)))
                           addrs)))
    (if (null? usable)
        (error "server:get-my-best-address: no non-loopback address found for host" hostname)
        (ip->string (car usable)))))

;; Look up this server's own pkt: the entry in the area's hosts table
;; stored under the area's pkt id. Returns #f when there is no such entry.
;;
(define (server:whoami? area)
  (let ((known-hosts (area-hosts area))
        (own-id      (area-pktid area)))
    (hash-table-ref/default known-hosts own-id #f)))

;;======================================================================
;; "Client side" operations
;;======================================================================

;; convert to/from string / sexpr

;; Read the first datum out of str. Anything that is not a string is
;; returned unchanged.
(define (string->sexpr str)
  (cond
   ((not (string? str)) str)
   (else (with-input-from-string str read))))

;; Render s with `write` and return the text as a string.
(define (sexpr->string s)
  (let ((emit (lambda () (write s))))
    (with-output-to-string emit)))

;; Ping the server described by servpkt.
;;
;; Sends 'ping as an 'immediate action and returns a pair
;; (elapsed-milliseconds . reply), where reply is whatever server:send
;; returned.
;;
(define (server:ping servpkt)
  (let* ((t0      (current-milliseconds))
         (reply   (server:send servpkt 'ping 'immediate))
         (elapsed (- (current-milliseconds) t0)))
    (cons elapsed reply)))

;; Refresh the table of known servers in rec (pass in *area-info*).
;;
;; Collects every pkt from the pkts directory plus every pkt already in
;; the hosts table (duplicates removed) and pings each one. A server that
;; answers with a two-element reply whose first element is true is stored
;; in the hosts table under its Z code.
;;
;; Every pkt is pinged; this server's own pkt is not skipped.
;;
;; NOTE(review): server:ping always returns a pair, so the final
;; "clearing out server" branch does not appear to be reachable as the
;; code stands - confirm how a dead server is meant to be detected.
;;
(define (server:update-known-servers rec)
  (let ((all-pkts  (delete-duplicates
                    (append (server:get-all-server-pkts rec)
                            (hash-table-values (area-hosts rec)))))
        (hostshash (area-hosts rec))
        (pktsdir   (area-pktsdir rec))) ;; needed to remove pkts from non-responsive servers
    (for-each
     (lambda (servpkt)
       (let* ((res (server:ping servpkt))
              (sid (alist-ref 'Z servpkt))) ;; Z code is our name for the server
         (match res
           ((qduration . payload)
            (print "Server pkt:")(pp servpkt)
            (print "res: ")(pp res)
            (match payload
              ((code message)
               (print "code: " code " message: " message)
               (if code
                   (hash-table-set! hostshash sid servpkt)
                   (print "got #f from the server, not sure what that means!")))
              (else
               (print "Got ")(pp res)(print " from server ")(pp servpkt)
               (print " but response did not match (#f/#t . msg)"))))
           (else
            ;; here we delete the pkt - can't reach the server, remove it
            ;; however this logic is inadequate. we should mark the server as checked
            ;; and not good, if it happens a second time - then remove the pkt
            ;; or something similar. I.e. don't be too quick to assume the server is wedged or dead
            ;; could be it is simply too busy to reply
            (print "clearing out server " sid)
            (delete-file* (conc pktsdir "/" sid ".pkt"))
            (hash-table-delete! hostshash sid)))))
     all-pkts)))

;; Intended to send an "I'm about to exit" notice to all known servers.
;; Currently a placeholder: sends nothing and returns the empty list.
;;
(define (server:imminent-death) '())


;;======================================================================
;; S E R V E R   U T I L I T I E S 
;;======================================================================

;; Get a signature for identifying this process: the pair
;; (host-name . process-id).
(define (server:get-process-signature)
  (let ((host (get-host-name))
        (pid  (current-process-id)))
    (cons host pid)))