PickUpSpace.java

package emissary.pickup;

import emissary.server.mvc.adapters.WorkSpaceAdapter;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Implementation of a pick up place that talks to a one or more WorkSpace instances for obtaining distributed work.
 */
public abstract class PickUpSpace extends PickUpPlace implements IPickUpSpace {
    // List of workspace instances to interact with
    protected List<String> openSpaceNames = new ArrayList<>();

    // Map of how many consecutive take errors by workspace name
    protected Map<String, Integer> numConsecutiveTakeErrors = new HashMap<>();

    // Comms adapter
    protected WorkSpaceAdapter tpa = new WorkSpaceAdapter();

    // Map of last bundle size by workspace name
    protected Map<String, Integer> lastBundleSize = new HashMap<>();

    // Map of pending bundles to workspace name to facilitate replying
    protected Map<String, String> pendingBundles = new HashMap<>();

    // Number of consecutive take errors that cause space to close
    protected static final int TAKE_ERROR_MAX = 10;

    /**
     * Create using default configuration
     */
    public PickUpSpace() throws IOException {
        super();
    }

    /**
     * Create one
     * 
     * @param configInfo path to config file
     * @param dir string key of the directory to register with
     * @param placeLocation string key of this place
     */
    public PickUpSpace(String configInfo, @Nullable String dir, String placeLocation) throws IOException {
        super(configInfo, dir, placeLocation);
    }

    /**
     * Create one, figuring out the directory automatically
     * 
     * @param configInfo path to config file
     * @param placeLocation string key of this place
     */
    public PickUpSpace(String configInfo, String placeLocation) throws IOException {
        this(configInfo, null, placeLocation);
    }

    public PickUpSpace(InputStream configInfo) throws IOException {
        super(configInfo);
    }

    /**
     * Create one with stream config
     * 
     * @param configStream path to config file
     * @param theDir string key of the directory to register with
     * @param thePlaceLocation string key of this place
     */
    public PickUpSpace(InputStream configStream, String theDir, String thePlaceLocation) throws IOException {
        super(configStream, theDir, thePlaceLocation);
    }

    /**
     * Open a TreeSpace when told and start asking it for data
     * 
     * @param spaceName the remote name of the space to open
     */
    @Override
    public void openSpace(String spaceName) {
        if (openSpaceNames.contains(spaceName)) {
            logger.debug("Open spaces already includes " + spaceName);
        } else {
            openSpaceNames.add(spaceName);
            numConsecutiveTakeErrors.put(spaceName, 0);
            lastBundleSize.put(spaceName, 0);
            logger.debug("Added space " + spaceName + " (" + openSpaceNames.size() + ")");
        }
    }

    /**
     * Close down the named workspace
     */
    @Override
    public void closeSpace(String spaceName) {
        logger.info("Closing down connection to " + spaceName);
        openSpaceNames.remove(spaceName);
        lastBundleSize.remove(spaceName);
        numConsecutiveTakeErrors.remove(spaceName);
    }

    /**
     * Return name of the first space on the list or null if none
     */

    @Override
    @Nullable
    public String getSpaceName() {
        if (!openSpaceNames.isEmpty()) {
            return openSpaceNames.get(0);
        } else {
            return null;
        }
    }

    /**
     * Return the names of all the spaces on the list
     */
    @Override
    public List<String> getSpaceNames() {
        return new ArrayList<>(openSpaceNames);
    }

    /**
     * Return the count of how many spaces are on the list
     */
    @Override
    public int getSpaceCount() {
        return openSpaceNames.size();
    }

    /**
     * Take up to one item from eacho space that is active This can result in workspace instances being removed from the
     * list if we get a close message from one or if the threshold of consecutive errors is crossed
     * 
     * @return true if we got at least one
     */
    @Override
    public boolean take() {
        if (openSpaceNames.isEmpty()) {
            logger.debug("Cannot perform 'take' when no spaces are available");
            return false;
        }

        // Keep track of space we may have to close
        List<String> closers = new ArrayList<>();

        // We will take up to one bundle per workspace
        int countTaken = 0;
        for (String openSpaceName : openSpaceNames) {
            WorkBundle path = null;
            try {
                path = tpa.outboundWorkSpaceTake(openSpaceName, myKey);
            } catch (RuntimeException ex) {
                logger.error("Failed to take work from " + openSpaceName, ex);
            }

            if (path == null) {
                // Error, record it, but might be transient
                logger.error("Got a null WorkBundle from " + openSpaceName);
                numConsecutiveTakeErrors.put(openSpaceName, numConsecutiveTakeErrors.get(openSpaceName) + 1);
            } else if (path.size() == 0) {
                // Close out message
                closers.add(openSpaceName);
            } else {
                logger.debug("Received bundle of " + path.size() + " from " + openSpaceName);
                lastBundleSize.put(openSpaceName, path.size());
                numConsecutiveTakeErrors.put(openSpaceName, 0);
                pendingBundles.put(path.getBundleId(), openSpaceName);
                if (!enque(path)) {
                    logger.error("Unable to enqueue bundle " + path.getBundleId() + " from " + openSpaceName + ", losing it.");
                }
                countTaken++;
            }
        }
        cleanupFailedSpaces(closers);
        return countTaken > 0;
    }

    /**
     * Clean up any spaces that have crosse the consecutive error message threshold and any that are specified in the
     * argument
     * 
     * @param forceClosers additional spaces to close
     */
    protected void cleanupFailedSpaces(List<String> forceClosers) {
        List<String> closers = new ArrayList<>(forceClosers);
        for (String s : openSpaceNames) {
            if (getNumConsecutiveTakeErrors(s) > TAKE_ERROR_MAX) {
                logger.error("Closing down space " + s + " due to repeated errors");
                closers.add(s);
            }
        }

        for (String s : closers) {
            closeSpace(s);
        }

        if (!closers.isEmpty()) {
            logger.debug("Cleaned up " + closers.size() + " workspace instances, " + openSpaceNames.size() + " remaining");
        }
    }

    /**
     * Notify controlling space that a bundle is completed
     * 
     * @param bundleId the bundle that was completed
     * @param itWorked true if bundle processed normally
     */
    @Override
    public void bundleCompleted(String bundleId, boolean itWorked) {
        String openSpaceName = pendingBundles.get(bundleId);
        if (openSpaceName == null) {
            logger.debug("Space is gone before we could notify " + " bundle completion for " + bundleId);
        } else {
            pendingBundles.remove(bundleId);
            tpa.outboundBundleCompletion(openSpaceName, myKey, bundleId, itWorked);
        }
    }

    /**
     * Count consecutive times a WorkSpace.take() made an error
     */
    @Override
    public int getNumConsecutiveTakeErrors(String spaceName) {
        return numConsecutiveTakeErrors.get(spaceName);
    }

    /**
     * The size of the last WorkBundle successfully received
     */
    @Override
    public int getBundleSize(String spaceName) {
        return lastBundleSize.get(spaceName);
    }

    /**
     * Put a new WorkBundle on the queue
     * 
     * @param path the newly arrived WorkBundle object
     */
    @Override
    public abstract boolean enque(WorkBundle path);

    /**
     * Get the available size of the queue
     */
    public abstract int getQueSize();
}