@@ -1,6 +1,6 @@ -;; Copyright 2006-2017, Matthew Welland. +; Copyright 2006-2017, Matthew Welland. ;; ;; This program is made available under the GNU GPL version 2.0 or ;; greater. See the accompanying file COPYING for details. ;; ;; This program is distributed WITHOUT ANY WARRANTY; without even the @@ -14,11 +14,11 @@ (define (toplevel-command . a) #f) (use srfi-1 posix srfi-69 readline ;; regex regex-case srfi-69 apropos json http-client directory-utils rpc typed-records;; (srfi 18) extras) srfi-18 extras format pkts regex regex-case (prefix dbi dbi:) - (prefix nanomsg nmsg:)) + nanomsg) (declare (uses common)) (declare (uses megatest-version)) (declare (uses margs)) (declare (uses configf)) @@ -120,10 +120,14 @@ gendot : generate a graphviz dot file from pkts. Contour actions: process : runs import, rungen and dispatch +Trigger propagation actions: + tsend a=b,c=d... : send trigger info to all recpients in the [listeners] section + tlisten -port N : listen for trigger info on port N + Selectors -immediate : apply this action immediately, default is to queue up actions -area areapatt1,area2... : apply this action only to the specified areas -target key1/key2/... : run for key1, key2, etc. -test-patt p1/p2,p3/... : % is wildcard @@ -182,10 +186,11 @@ ("-load" . #f) ;; load and exectute a scheme file ("-log" . #f) ("-msg" . M) ("-start-dir" . S) ("-set-vars" . v) + ("-config" . r) )) (define *switch-keys* '( ("-h" . #f) ("-help" . #f) @@ -368,11 +373,11 @@ (if (and (not (null? remargs)) (not (or (args:get-arg "-runstep") (args:get-arg "-envcap") (args:get-arg "-envdelta") - (member *action* '("db")) ;; very loose checks on db. + (member *action* '("db" "tsend" "tlisten")) ;; very loose checks on db and tsend/listen (equal? *action* "show") ;; just keep going if list ))) (debug:print-error 0 *default-log-port* "Unrecognised arguments: " (string-intersperse (if (list? remargs) remargs (argv)) " "))) (if (or (args:any? "-h" "help" "-help" "--help") @@ -380,12 +385,82 @@ (begin (print help) (exit 1))) ;;====================================================================== +;; Nanomsg transport +;;====================================================================== + +(define-inline (encode data) + (with-output-to-string + (lambda () + (write data)))) + +(define-inline (decode data) + (with-input-from-string + data + (lambda () + (read)))) + +(define (is-port-in-use port-num) + (let* ((ret #f)) + (let-values (((inp oup pid) + (process "netstat" (list "-tulpn" )))) + (let loop ((inl (read-line inp))) + (if (not (eof-object? inl)) + (begin + (if (string-search (regexp (conc ":" port-num)) inl) + (begin + ;(print "Output: " inl) + (set! ret #t)) + (loop (read-line inp))))))) +ret)) + +;;start a server, returns the connection +;; +(define (start-nn-server portnum ) + (let ((rep (nn-socket 'rep))) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + (print "ERROR: Failed to start server \"" emsg "\"") + (exit 1)) + + (nn-bind rep (conc "tcp://*:" portnum))) + rep)) +;; open connection to server, send message, close connection +;; +(define (open-send-close-nn host-port msg #!key (timeout 3)) ;; default timeout is 3 seconds + (let ((req (nn-socket 'req)) + (uri (conc "tcp://" host-port)) + (res #f)) + (handle-exceptions + exn + (let ((emsg ((condition-property-accessor 'exn 'message) exn))) + (print "ERROR: Failed to connect/send to " uri " message was \"" emsg "\"") + #f) + (nn-connect req uri) + (nn-send req msg) + ;; NEED timer here! + (let* ((th1 (make-thread (lambda () + (let ((resp (nn-recv req))) + (nn-close req) + (set! res (if (equal? resp "ok") + #t + #f)))) + "recv thread")) + (th2 (make-thread (lambda () + (thread-sleep! timeout) + (thread-terminate! th1)) + "timer thread"))) + (thread-start! th1) + (thread-start! th2) + (thread-join! th1) + res)))) +;;====================================================================== ;; Runs ;;====================================================================== ;; make a runname ;; @@ -964,11 +1039,11 @@ (if (and (> cpuload maxload) (member action '("run" "archive"))) ;; do not run archive or run if load is over the specified limit (print "WARNING: cpuload too high, skipping processing of " uuid) (begin (print "RUNNING: " fullcmd) - (system fullcmd) + (system fullcmd) ;; replace with process ... (mark-processed pdb (list (alist-ref 'id pktdat))) (let-values (((ack-uuid ack-pkt) (add-z-card (construct-sdat 'P uuid 'T (case (string->symbol action) @@ -1121,11 +1196,49 @@ (let* ((install-home (common:get-install-area)) (schema-file (conc install-home "/share/db/mt-sqlite3.sql"))) (if (common:file-exists? schema-file) (system (conc "/bin/cat " schema-file))))) ((junk) - (rmt:get-keys)))))))) + (rmt:get-keys)))))) + ((tsend) + (if (null? remargs) + (print "ERROR: missing data to send to trigger listeners") + (let* ((msg (car remargs)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (listeners (configf:get-section mtconf "listeners")) + (prev-seen (make-hash-table))) ;; catch duplicates + (for-each + (lambda (listener) + (let ((host-port (car listener)) + (remdat (cdr listener))) + (print "sending " msg " to " host-port) + (open-send-close-nn host-port msg timeout: 2))) + listeners)))) + ((tlisten) + (if (null? remargs) + (print "ERROR: useage for tlisten is \"mtutil tlisten portnum\"") + (let ((portnum (string->number (car remargs)))) + + (if (not portnum) + (print "ERROR: the portnumber parameter must be a number, you gave: " (car remargs)) + (begin + (if (not (is-port-in-use portnum)) + (let* ((rep (start-nn-server portnum)) + (mtconfdat (simple-setup (args:get-arg "-start-dir"))) + (mtconf (car mtconfdat)) + (script (configf:lookup mtconf "listener" "script"))) + (print "Listening on port " portnum " for messages") + (let loop ((instr (nn-recv rep))) + (print "received " instr ", running \"" script " " instr "\"") + (system (conc script " '" instr "'")) + (nn-send rep "ok") + (loop (nn-recv rep)))) + (print "ERROR: Port " portnum " already in use. Try another port"))))))) + + )) ;; the end + ;; If HTTP_HOST is defined then we must be in the cgi environment ;; so run stml and exit ;; (if (get-environment-variable "HTTP_HOST")