AgentPool.java

package emissary.pool;

import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import javax.annotation.Nullable;

/**
 * Extends the GenericObjectPool to hold MobileAgents, each on it's own thread.
 */
public class AgentPool extends GenericObjectPool<IMobileAgent> {

    private static final int MAX_CALCULATED_AGENT_COUNT = 50;
    private static final int BYTES_IN_GIGABYTES = 1073741824;

    /**
     * The default name by which we register into the namespace
     */
    protected static final String DEFAULT_NAMESPACE_NAME = "AgentPool";

    /**
     * Super class has private access on factory, so save here
     */
    protected MobileAgentFactory factory;

    /**
     * Our logger
     */
    protected static final Logger logger = LoggerFactory.getLogger(AgentPool.class);

    /**
     * The name used by this pool
     */
    protected String namespaceName;

    private final int initialPoolSize;

    /**
     * Compute the default size for the pool
     * 
     * @param maxMemoryInBytes System max memory used in calculating pool size
     * @param poolSizeOverride User set property for pool size
     */
    protected static int computePoolSize(final long maxMemoryInBytes, @Nullable final Integer poolSizeOverride) {

        // Override based on property
        if (poolSizeOverride != null && poolSizeOverride > 0) {
            logger.debug("Default pool size from properties {}", poolSizeOverride);
            return poolSizeOverride;
        }
        // Check that maxMemoryInBytes is a valid argument
        if (maxMemoryInBytes <= 0) {
            throw new IllegalArgumentException("Must be greater then zero.");
        }

        // 15 if less than 1 Gb
        // 20 for first Gb, +5 for each additional Gb, no more then 50 when calculated
        int size = (((int) (maxMemoryInBytes / BYTES_IN_GIGABYTES) - 1) * 5) + 20;
        size = Math.min(size, MAX_CALCULATED_AGENT_COUNT);
        logger.debug("Computed default pool size of {}", size);

        return size;
    }

    /**
     * Compute the default size for the pool
     */
    public static int computePoolSize() {
        final Integer poolSizeProperty = Integer.getInteger("agent.poolsize", null);
        final long maxMemoryInBytes = Runtime.getRuntime().maxMemory();
        return computePoolSize(maxMemoryInBytes, poolSizeProperty);
    }

    /**
     * Create and configure the pool using the default name and size
     * 
     * @param factory pool object producer
     */
    public AgentPool(MobileAgentFactory factory) {
        this(factory, AgentPool.computePoolSize(), DEFAULT_NAMESPACE_NAME);
    }

    /**
     * Create and configure the pool using the default name
     * 
     * @param maxActive max pool size
     * @param factory pool object producer
     */
    public AgentPool(MobileAgentFactory factory, int maxActive) {
        this(factory, maxActive, DEFAULT_NAMESPACE_NAME);
    }


    /**
     * Create and configure the pool using the specified name
     * 
     * @param factory pool object producer
     * @param maxActive max pool size
     * @param name name of the pool in the namespace
     */
    public AgentPool(MobileAgentFactory factory, int maxActive, String name) {
        super(factory);
        this.factory = factory;
        initialPoolSize = maxActive;
        configurePool(name);
    }

    /**
     * Configure the commons pool stuff based on our requirements
     * 
     * @param name name of the pool in the namespace
     */
    protected void configurePool(String name) {
        namespaceName = name;

        // Set blocking policy
        setBlockWhenExhausted(true);

        // Set maximum wait time when blocking on exhausted pool
        setMaxWait(Duration.ofMinutes(50));

        logger.debug("Configuring AgentPool to use {} agents", initialPoolSize);

        setMaxTotal(initialPoolSize);
        setMinIdle(initialPoolSize);
        setMaxIdle(initialPoolSize);

        bindPool();
        fillPool();
    }

    /**
     * Ensure the pool is full
     */
    protected void fillPool() {
        int level = getMaxTotal();
        // fill in the pool
        for (int i = 0; i < level; i++) {
            try {
                addObject();
            } catch (Exception e) {
                logger.error("Cannot fill AgentPool", e);
            }
        }
    }

    /**
     * Reset the factory. Pool will be emptied and refilled
     * 
     * @param factory the new factory
     */
    public void resetFactory(MobileAgentFactory factory) {
        // Ideally we will need to drop and recreate the entire pool
        // in order to get around this deprecated method, but that has
        // impact on the global namespace, most weirdly for the caller
        // of this method since the reference they hold is obsoleted by
        // making this call
        this.factory = factory;
        emptyPool();
        fillPool();
    }

    /**
     * Bind the pool into the namespace
     */
    protected void bindPool() {
        // register this pool in the namespace
        Namespace.bind(namespaceName, this);
    }

    /**
     * Get the name used to register this pool
     */
    public String getPoolName() {
        return namespaceName;
    }

    /**
     * Get an agent from the pool
     */
    public IMobileAgent borrowAgent() throws Exception {
        try {
            IMobileAgent a = borrowObject();
            logger.trace("POOL borrow active={}", getNumActive());
            return a;
        } catch (Exception e) {
            logger.info("AgentPool.borrowAgent did not work, stats={}", this);
            throw e;
        }
    }

    /*
     * Get the total current agents in the pool
     */
    public synchronized int getCurrentPoolSize() {
        return getNumIdle() + getNumActive();
    }

    protected void emptyPool() {
        int numberKilled = 0;
        int numberToKill = getCurrentPoolSize();
        long waitTil = System.currentTimeMillis() + (30 * 60 * 1000); // 30 min
        logger.debug("Going to kill {} agents", numberToKill);
        try {
            while (getCurrentPoolSize() != 0) {
                if (System.currentTimeMillis() > waitTil) {
                    throw new InterruptedException("Too long, tired of waiting. Some MobileAgents are going to die poorly");
                }

                logger.debug("Emptying pool, {} active, {} idle", getNumActive(), getNumIdle());
                int currentIdle = getNumIdle();
                int killedThisRound = 0;
                setMaxIdle(0); // so the returnAgent call below destroys the agent
                for (int i = 0; i < currentIdle; i++) {
                    IMobileAgent a;
                    try {
                        a = borrowAgent();
                    } catch (Exception e) {
                        logger.error("Error trying to borrowAgent", e);
                        continue;
                    }

                    a.killAgent();
                    numberKilled++;
                    killedThisRound++;

                    try {
                        // destroys the object, needed to decrement the numIdle
                        returnAgent(a);
                    } catch (RuntimeException e) {
                        logger.error("Error trying to returnAgent: {}", a.getName(), e);
                    }
                }
                logger.debug("Killed {} agents this round, {} total killed", killedThisRound, numberKilled);
                // give some space for working agents to be returned
                setMaxIdle(numberToKill - numberKilled);
                Thread.sleep(5000);
            }
            logger.info("Pool is now empty");
        } catch (InterruptedException e) {
            logger.error("emptyPool interrupted", e);
            Thread.currentThread().interrupt();
        } finally {
            setMaxIdle(0); // just in case
        }
    }

    /**
     * Gracefully close down all agents and unbind the pool
     */
    @Override
    public void close() {
        logger.info("Closing the agent pool");
        setMaxTotal(0);
        emptyPool();
        super.close();
        Namespace.unbind(getPoolName());
        logger.info("Done closing the agent pool");
    }

    /**
     * Forcibly stop all agents and unbind the pool
     */
    public void kill() {
        logger.info("Killing the agent pool");
        super.close();
        Namespace.unbind(getPoolName());
        logger.info("Done killing the agent pool");
    }

    /**
     * Return an agent to the pool
     */
    public void returnAgent(IMobileAgent agent) {
        logger.trace("Returning {}", agent.getName());
        returnObject(agent);
        logger.trace("POOL return active={}", getNumActive());
    }

    /**
     * Return the default named agent pool instance from the namespace
     */
    public static AgentPool lookup() throws NamespaceException {
        return (AgentPool) Namespace.lookup(DEFAULT_NAMESPACE_NAME);
    }

    /**
     * Return the specified agent pool instance from the Namespace
     */
    public static AgentPool lookup(String name) throws NamespaceException {
        return (AgentPool) Namespace.lookup(name);
    }

    /**
     * To string for lightweight reporting
     */
    @Override
    public synchronized String toString() {
        return "Poolsize active/idle = " + getNumActive() + "/" + getNumIdle() + " - " + getPoolName();
    }

    /**
     * Get the name of the class being used from the factory
     * 
     * @return class name for the agents
     */
    public String getClassName() {
        return factory.getClassString();
    }

    /**
     * Try to predict whether a borrow will block/grow the pool
     */
    public boolean isAgentAvailable() {
        return getNumIdle() > 0;
    }
}