// Sentinel.java

package emissary.core.sentinel;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.core.sentinel.protocols.Protocol;
import emissary.pool.MobileAgentFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Track mobile agents and take action on suspicious behavior.
 * <p>
 * When enabled through its configuration, a Sentinel binds itself into the {@link Namespace} and runs on its own
 * daemon thread. Every {@link #pollingInterval} minutes it records what each mobile agent registered in the
 * {@link Namespace} is working on and then passes the collected {@link Tracker}s to every configured
 * {@link Protocol}.
 */
public class Sentinel implements Runnable {

    protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    public static final String DEFAULT_NAMESPACE_NAME = "Sentinel";

    // polling interval, in minutes, used when none (or an unusable one) is configured
    private static final int DEFAULT_POLLING_INTERVAL_MINUTES = 5;

    // key: agent name, value: how long Sentinel has observed the mobile agent
    protected final Map<String, Tracker> trackers = new ConcurrentHashMap<>();

    // protocols contain an action to perform when the set of rule conditions are met
    protected final Set<Protocol> protocols = new LinkedHashSet<>();

    // the default configuration Sentinel.cfg
    protected Configurator config;

    // how many minutes to sleep before checking the mobile agents
    protected long pollingInterval = DEFAULT_POLLING_INTERVAL_MINUTES;

    // Loop control; volatile because quit() sets it from a different thread than the one executing run()
    protected volatile boolean timeToQuit = false;

    // turn on/off sentinel
    protected boolean enabled = false;

    /**
     * Create a Sentinel - set it running and bind into the {@link Namespace}
     * <p>
     * The configuration is read first; the monitoring thread is only created, bound and started when the
     * configuration leaves the Sentinel enabled. Otherwise the instance stays idle and a message is logged.
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public Sentinel() {
        configure();
        if (this.enabled) {
            final Thread thread = new Thread(this, DEFAULT_NAMESPACE_NAME);
            thread.setPriority(Thread.NORM_PRIORITY);
            thread.setDaemon(true);
            Namespace.bind(DEFAULT_NAMESPACE_NAME, this);
            thread.start();
        } else {
            logger.info("Sentinel is disabled");
        }
    }

    /**
     * Start up the Sentinel thread
     */
    public static void start() {
        new Sentinel();
    }

    /**
     * Lookup the default Sentinel in the {@link Namespace}
     *
     * @return The registered Sentinel
     * @throws NamespaceException if the lookup of {@link #DEFAULT_NAMESPACE_NAME} fails
     */
    public static Sentinel lookup() throws NamespaceException {
        return (Sentinel) Namespace.lookup(DEFAULT_NAMESPACE_NAME);
    }

    /**
     * Safely stop the monitoring Thread
     * <p>
     * Sets the loop-control flag and then interrupts every thread named {@link #DEFAULT_NAMESPACE_NAME} so that a
     * sleeping monitor wakes up immediately instead of waiting out the polling interval.
     */
    public void quit() {
        logger.info("Stopping Sentinel...");
        this.timeToQuit = true;
        ThreadUtils.findThreadsByName(DEFAULT_NAMESPACE_NAME).forEach(Thread::interrupt);
    }

    /**
     * Runnable interface where we get to monitor stuff
     * <p>
     * Sleeps for the polling interval, then calls {@link #watch()}, until {@link #quit()} is called or the thread is
     * interrupted. The Sentinel is always unbound from the {@link Namespace} when this method exits, including when
     * it exits because of an unchecked exception.
     */
    @Override
    public void run() {
        logger.info("Sentinel is watching");
        try {
            while (!this.timeToQuit) {
                // Delay this loop
                try {
                    Thread.sleep(TimeUnit.MINUTES.toMillis(pollingInterval));
                    logger.debug("Sentinel is still watching");
                    watch();
                } catch (InterruptedException e) {
                    // Restore the flag for any code that inspects it and stop looping. Continuing with the flag set
                    // would make every following sleep() throw immediately, i.e. a busy spin.
                    Thread.currentThread().interrupt();
                    break;
                } catch (NamespaceException e) {
                    logger.error("There was an error in lookup", e);
                }
            }
        } finally {
            // never leave a binding to a monitor that is no longer running
            Namespace.unbind(DEFAULT_NAMESPACE_NAME);
            logger.info("Sentinel stopped");
        }
    }

    @Override
    public String toString() {
        return "Watching agents with " + protocols;
    }

    /**
     * Get the Configurator
     * <p>
     * Loads the configuration for this class and runs {@link #init()}. When the configuration cannot be read the
     * field defaults are kept, which leaves the Sentinel disabled.
     */
    protected void configure() {
        try {
            this.config = ConfigUtil.getConfigInfo(Sentinel.class);
            init();
        } catch (IOException e) {
            // keep the cause so a missing or unreadable configuration can be diagnosed
            logger.warn("Cannot read Sentinel.cfg, taking default values", e);
        }
    }

    /**
     * Initialize Protocols
     * <p>
     * Reads {@code ENABLED}, {@code POLLING_INTERVAL_MINUTES} and every {@code PROTOCOL} entry from the
     * configuration. A protocol that cannot be constructed is logged and skipped; a protocol that reports itself as
     * disabled is skipped. When no protocol remains, the Sentinel disables itself.
     */
    protected void init() {
        this.enabled = config.findBooleanEntry("ENABLED", false);
        if (this.enabled) {
            final int configuredInterval = this.config.findIntEntry("POLLING_INTERVAL_MINUTES", DEFAULT_POLLING_INTERVAL_MINUTES);
            if (configuredInterval > 0) {
                this.pollingInterval = configuredInterval;
            } else {
                // a negative value makes Thread.sleep throw and zero turns the monitor loop into a hot spin
                logger.warn("Invalid POLLING_INTERVAL_MINUTES [{}], using default of {} minutes", configuredInterval,
                        DEFAULT_POLLING_INTERVAL_MINUTES);
                this.pollingInterval = DEFAULT_POLLING_INTERVAL_MINUTES;
            }

            logger.trace("Sentinel protocols initializing...");
            for (String protocolConfig : this.config.findEntries("PROTOCOL")) {
                try {
                    Protocol protocol = new Protocol(protocolConfig);
                    if (protocol.isEnabled()) {
                        logger.debug("Sentinel protocol initialized {}", protocol);
                        this.protocols.add(protocol);
                    } else {
                        logger.debug("Sentinel protocol disabled {}", protocol);
                    }
                } catch (RuntimeException e) {
                    logger.warn("Unable to configure Sentinel Protocol[{}]: {}", protocolConfig, e.getMessage());
                }
            }
            if (this.protocols.isEmpty()) {
                logger.warn("Sentinel initialization failed due to no protocols found, disabling");
                this.enabled = false;
            } else {
                logger.info("Sentinel initialized protocols {}", protocols);
            }
        }
    }

    /**
     * Checks to see if the mobile agents are processing the same data since the last polling event
     * <p>
     * Every {@link Namespace} key that starts with {@link MobileAgentFactory#AGENT_NAME} is inspected in sorted
     * order; afterwards each protocol is run against the current trackers.
     *
     * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
     */
    protected void watch() throws NamespaceException {
        List<String> agentKeys = Namespace.keySet().stream()
                .filter(k -> k.startsWith(MobileAgentFactory.AGENT_NAME))
                .sorted()
                .collect(Collectors.toList());
        for (String agentKey : agentKeys) {
            watch(agentKey);
        }
        protocols.forEach(protocol -> protocol.run(trackers));
    }

    /**
     * Checks to see if the mobile agent is processing the same data since the last polling event
     * <p>
     * An agent that is in use and still reports the same agent id and last place as its tracker has its timer
     * advanced by the polling interval. If either value changed, the tracker is restarted with the new values. An
     * agent that is not in use has its tracker cleared.
     *
     * @param agentKey the agent key, i.e. MobileAgent-01
     * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
     */
    protected void watch(String agentKey) throws NamespaceException {
        logger.trace("Searching for agent [{}]", agentKey);
        IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentKey);
        Tracker trackedAgent = trackers.computeIfAbsent(mobileAgent.getName(), Tracker::new);
        if (mobileAgent.isInUse()) {
            if (!Objects.equals(mobileAgent.agentId(), trackedAgent.getAgentId())
                    || !Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getDirectoryEntryKey())) {
                // the agent moved on to other work since the last poll, start observing from scratch
                trackedAgent.clear();
                trackedAgent.setAgentId(mobileAgent.agentId());
                trackedAgent.setDirectoryEntryKey(mobileAgent.getLastPlaceProcessed());
            }
            trackedAgent.incrementTimer(pollingInterval);
            logger.trace("Agent acquired {}", trackedAgent);
        } else {
            trackedAgent.clear();
            logger.trace("Agent not in use [{}]", agentKey);
        }
    }

    /**
     * Observation record for one mobile agent: what it was last seen working on and for how long. Trackers are
     * ordered by agent name.
     */
    public static class Tracker implements Comparable<Tracker> {
        private final String agentName;
        private String agentId;
        private String shortName;
        private String directoryEntryKey;
        // -1 means "not observed yet"; the first observation moves it to 0
        private long timer = -1;

        /**
         * @param agentName the name of the mobile agent this tracker belongs to
         */
        public Tracker(String agentName) {
            this.agentName = agentName;
        }

        public String getAgentName() {
            return agentName;
        }

        public String getAgentId() {
            return agentId;
        }

        /**
         * Record the id the agent currently reports.
         * <p>
         * An id containing {@code No_AgentID_Set} clears the whole tracker instead of being stored. Otherwise the id
         * is stored and, when it contains {@code Agent-}, the short name is derived from it; an id without
         * {@code Agent-} leaves the previous short name untouched.
         *
         * @param agentId the agent id as reported by the mobile agent
         */
        public void setAgentId(String agentId) {
            if (StringUtils.contains(agentId, "No_AgentID_Set")) {
                clear();
            } else {
                this.agentId = agentId;
                if (StringUtils.contains(agentId, "Agent-")) {
                    this.shortName = getShortName(agentId);
                }
            }
        }

        public String getShortName() {
            return shortName;
        }

        /**
         * Derive the short name from an agent id: the text following the first {@code -} that comes after the first
         * {@code Agent-}.
         *
         * @param agentId the agent id
         * @return the short name part of the id
         */
        public static String getShortName(String agentId) {
            return StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-");
        }

        public String getDirectoryEntryKey() {
            return directoryEntryKey;
        }

        public void setDirectoryEntryKey(String directoryEntryKey) {
            this.directoryEntryKey = directoryEntryKey;
        }

        /**
         * @return the place name taken from the tracked directory entry key, see {@link #getPlaceName(String)}
         */
        public String getPlaceName() {
            return getPlaceName(this.directoryEntryKey);
        }

        /**
         * Extract the place name, i.e. the text after the last {@code /}, from a directory entry key.
         *
         * @param directoryEntryKey the directory entry key, may be null
         * @return the place name, never null
         */
        public static String getPlaceName(String directoryEntryKey) {
            return StringUtils.defaultString(StringUtils.substringAfterLast(directoryEntryKey, "/"));
        }

        public long getTimer() {
            return timer;
        }

        /**
         * Put the timer back into the "not observed yet" state.
         */
        public void resetTimer() {
            this.timer = -1;
        }

        /**
         * Advance the timer. The first call after a reset only moves the timer to 0, later calls add {@code time}.
         *
         * @param time the amount to add to the timer
         */
        public void incrementTimer(long time) {
            if (this.timer == -1) {
                this.timer = 0;
            } else {
                this.timer += time;
            }
        }

        /**
         * Forget everything observed about the agent: agent id, short name and directory entry key become empty
         * strings and the timer is reset.
         */
        public void clear() {
            this.agentId = "";
            this.shortName = "";
            this.directoryEntryKey = "";
            resetTimer();
        }

        @Override
        public int compareTo(Tracker o) {
            return this.agentName.compareTo(o.agentName);
        }

        @Override
        public String toString() {
            return new StringJoiner(", ", "{", "}")
                    .add("\"agentName\":\"" + agentName + "\"")
                    .add("\"directoryEntry\":\"" + directoryEntryKey + "\"")
                    .add("\"shortName\":\"" + shortName + "\"")
                    .add("\"timeInMinutes\":" + timer)
                    .toString();
        }
    }
}