FilePickUpClient.java

package emissary.pickup.file;

import emissary.core.IBaseDataObject;
import emissary.parser.SessionParser;
import emissary.pickup.IPickUp;
import emissary.pickup.IPickUpSpace;
import emissary.pickup.PickUpSpace;
import emissary.pickup.PickupQueue;
import emissary.pickup.QueServer;
import emissary.pickup.WorkBundle;
import emissary.pickup.WorkUnit;
import emissary.util.Hexl;
import emissary.util.TimeUtil;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;

import static emissary.core.constants.Parameters.INPUT_FILEDATE;
import static emissary.core.constants.Parameters.INPUT_FILENAME;
import static emissary.core.constants.Parameters.ORIGINAL_FILENAME;

/**
 * Pull bundles of file info from a WorkSpace and process as a normal FilePickUp. Monitors a queue rather than a
 * directory, but reads files from disk as specified in the received WorkBundle objects. Whether workBundles are
 * processed in simpleMode or not is controlled by the bundle settings not by the inherited configuration of this
 * client.
 */
public class FilePickUpClient extends PickUpSpace implements IPickUp {
    /**
     * These parameters determine the enqueuing behavior. The desire is to minimize the number of remote calls from
     * WorkSpace or Distributor to an instance of this class with the getQueSize method, and at the same time keep all of
     * the places busy. We do this by making the MAX_QUE_SIZE large enough to hold enough files to be processed in
     * pollingInterval. BUT we don't just make the MAX_QUE_SIZE huge because then we use too much memory. Some feeds put
     * stuff on the Que in blocks. If our que is a prime numbered size they cannot fill it completely, which will help
     * prevent blocking maybe.
     * <p>
     * This value is overridden by the POLLING_INTERVAL config entry and handed to the queue server when it is created.
     * NOTE(review): the unit is not visible in this file, presumably milliseconds; confirm against QueServer.
     */
    protected int pollingInterval = 500;
    // Size handed to the PickupQueue constructor; overridden by the MAX_QUE_SIZE config entry
    protected int maxQueSize = 5;
    // Queue server created and started by configureQueueServer(); all queue operations delegate to it
    protected QueServer queServer;

    // Work bundle currently being processed; set by processBundle and read by dataObjectCreated
    @Nullable
    protected WorkBundle currentBundle = null;
    // Not assigned anywhere in this file; presumably available for subclasses (TODO confirm)
    @Nullable
    protected WorkUnit currentWorkUnit = null;

    // Values of the UNIX_IN_ROOT / UNIX_OUT_ROOT config entries (null when absent); not referenced elsewhere in this file
    protected String unixInRoot;
    protected String unixOutRoot;
    // Algorithm name used to create the message digest; DIGEST_HASH_TYPE config entry, default "SHA-256"
    protected String digestHashType;

    // Digest instance created in configurePlace(); remains null when the algorithm could not be instantiated
    @Nullable
    protected MessageDigest digest = null;

    /**
     * Create using default configuration, then read this place's settings and start the queue server.
     *
     * @throws IOException if the superclass constructor fails
     */
    public FilePickUpClient() throws IOException {
        super();
        // Order matters: configureQueueServer uses maxQueSize and pollingInterval read by configurePlace
        configurePlace();
        configureQueueServer();
    }

    /**
     * Create, configure, and register
     *
     * @param configInfo configuration reference, passed through to the superclass constructor
     * @param dir passed through to the superclass constructor
     * @param placeLoc passed through to the superclass constructor
     * @throws IOException if the superclass constructor fails
     */
    public FilePickUpClient(String configInfo, String dir, String placeLoc) throws IOException {
        super(configInfo, dir, placeLoc);
        // Order matters: configureQueueServer uses maxQueSize and pollingInterval read by configurePlace
        configurePlace();
        configureQueueServer();
    }

    /**
     * Create, configure, and register
     *
     * @param configInfo configuration stream, passed through to the superclass constructor
     * @param dir passed through to the superclass constructor
     * @param placeLoc passed through to the superclass constructor
     * @throws IOException if the superclass constructor fails
     */
    public FilePickUpClient(InputStream configInfo, String dir, String placeLoc) throws IOException {
        super(configInfo, dir, placeLoc);
        // Order matters: configureQueueServer uses maxQueSize and pollingInterval read by configurePlace
        configurePlace();
        configureQueueServer();
    }

    /**
     * Create from a configuration stream only, then read this place's settings and start the queue server.
     *
     * @param configInfo configuration stream, passed through to the superclass constructor
     * @throws IOException if the superclass constructor fails
     */
    public FilePickUpClient(InputStream configInfo) throws IOException {
        super(configInfo);
        // Order matters: configureQueueServer uses maxQueSize and pollingInterval read by configurePlace
        configurePlace();
        configureQueueServer();
    }

    /**
     * Read this place's settings from the configuration and set up the message digest.
     * <p>
     * Entries consulted: UNIX_IN_ROOT, UNIX_OUT_ROOT, DIGEST_HASH_TYPE (default "SHA-256"), POLLING_INTERVAL and
     * MAX_QUE_SIZE. The two numeric entries fall back to the current field values when absent. If the digest algorithm
     * cannot be instantiated a warning is logged and the digest field is left as it was.
     */
    protected void configurePlace() {
        this.unixInRoot = configG.findStringEntry("UNIX_IN_ROOT", null);
        this.unixOutRoot = configG.findStringEntry("UNIX_OUT_ROOT", null);
        this.digestHashType = configG.findStringEntry("DIGEST_HASH_TYPE", "SHA-256");

        this.pollingInterval = configG.findIntEntry("POLLING_INTERVAL", this.pollingInterval);
        this.maxQueSize = configG.findIntEntry("MAX_QUE_SIZE", this.maxQueSize);

        try {
            final MessageDigest created = MessageDigest.getInstance(this.digestHashType);
            // Only replace the field once creation has succeeded
            this.digest = created;
        } catch (Exception ex) {
            logger.warn("Could not initialize message digest: ", ex);
        }
    }

    /**
     * Build the queue server for this place and start it. The queue is sized by maxQueSize and the server is given
     * pollingInterval, so configurePlace must already have run.
     */
    protected void configureQueueServer() {
        final PickupQueue workQueue = new PickupQueue(maxQueSize);
        final FileQueServer server = new FileQueServer(this, workQueue, pollingInterval);
        // Publish the server in the field before starting it, as callbacks go through this place
        queServer = server;
        queServer.start();
    }

    /**
     * Stop the queue server, if one was created, and then run the superclass shutdown.
     */
    @Override
    public void shutDown() {
        logger.info("*** Shutting Down: " + keys.get(0));
        final boolean haveServer = null != queServer;
        if (haveServer) {
            logger.info("*** Stopping queue monitor ");
            queServer.shutdown();
        }
        // Always hand off to the superclass, whether or not a server existed
        super.shutDown();
    }

    /**
     * Ask the QueServer to pause, logging which server and place types are involved.
     */
    @Override
    public void pause() {
        final String serverType = queServer.getClass().getName();
        final String placeType = getClass().getName();
        logger.info("*** Pausing {} for {}", serverType, placeType);
        queServer.pause();
    }

    /**
     * Ask the QueServer to unpause, logging which server and place types are involved.
     */
    @Override
    public void unpause() {
        final String serverType = queServer.getClass().getName();
        final String placeType = getClass().getName();
        logger.info("*** Unpausing {} for {}", serverType, placeType);
        queServer.unpause();
    }

    /**
     * Report the paused state of the QueServer.
     *
     * @return whatever the QueServer reports: true if it is paused, false otherwise
     */
    @Override
    public boolean isPaused() {
        final boolean serverPaused = queServer.isPaused();
        return serverPaused;
    }

    /**
     * Normalize a file path so that one set of configured paths works across operating systems. This implementation
     * turns every backslash into a forward slash; subclasses may override to apply other runtime adjustments.
     *
     * @param path file path to alter
     * @return the path with all backslashes replaced by forward slashes
     */
    protected String fixFilePath(String path) {
        final char windowsSeparator = '\\';
        final char unixSeparator = '/';
        if (path.indexOf(windowsSeparator) < 0) {
            // Nothing to translate
            return path;
        }
        return path.replace(windowsSeparator, unixSeparator);
    }

    /**
     * Find a file in the holding area that matches the given file.
     * <p>
     * The candidate is looked up first directly under the holding area and then one level down, in each subdirectory of
     * the holding area. The relative part appended to those locations is the file's path with the first
     * {@code eatPrefix.length()} characters removed, or just the file name when no prefix is given or the prefix is
     * longer than the path.
     *
     * @param f the file that was expected but is missing from its original location
     * @param eatPrefix leading portion of the path to drop before looking in the holding area, may be null
     * @return the matching file in the holding area, or null if there is no holding area, it cannot be listed, or no
     *         match exists
     */
    @Nullable
    protected File findFileInHoldingArea(File f, @Nullable String eatPrefix) {
        if (holdingArea == null) {
            return null;
        }

        String fpart = f.getName();
        // Guard the substring: a prefix longer than the path would otherwise throw
        if (eatPrefix != null && eatPrefix.length() <= f.getPath().length()) {
            fpart = f.getPath().substring(eatPrefix.length());
        }

        // See if it is sitting at the InProcess level
        File hf = new File(holdingArea + fpart);
        if (hf.exists()) {
            logger.debug("Data recovered from holding area " + hf);
            return hf;
        }
        logger.debug("File did not exist in InProcess area directoy as " + hf);

        // Or if it is one level down due to an emissary.node.name-emissary.node.port dir
        File[] subdirs = new File(holdingArea).listFiles(new FileFilter() {
            @Override
            public boolean accept(File d) {
                return d.isDirectory();
            }
        });
        if (subdirs == null) {
            // listFiles returns null when the holding area is not a directory or an I/O error occurs
            logger.debug("Holding area " + holdingArea + " could not be listed");
            return null;
        }

        for (File subdir : subdirs) {
            File hdf = new File(subdir + fpart);
            if (hdf.exists()) {
                logger.debug("Data recovered from holding subdir " + hdf);
                return hdf;
            }
            logger.debug("File did not exist in nested InProcess area as " + hdf);
        }
        return null;
    }

    /**
     * Call back from queue server when a new bundle is dequeued for processing.
     * <p>
     * Each file name in the bundle is normalized with {@link #fixFilePath(String)} and then checked in turn. A missing
     * file is looked for in the holding area when the bundle has a non-zero error count. Missing, unreadable and too-small
     * files are skipped, and directory entries go to the processDirectoryEntry hook. Oversize files are still processed
     * but flagged. When a holding area is configured the file is first renamed into it. An exception from processing a
     * file is passed to handleErrorInBundledFile, marks the bundle failed and stops the loop.
     *
     * @param paths the dequeued item
     * @return true if the files in the WorkBundle were handled
     */
    protected boolean processBundle(WorkBundle paths) {
        boolean success = true;
        currentBundle = paths; // for use by callbacks
        final String outputRoot = fixFilePath(paths.getOutputRoot());
        final String rawPrefix = paths.getEatPrefix();
        final String prefix = rawPrefix == null ? "" : fixFilePath(rawPrefix);
        // Simple mode is controlled by the bundle, not by the inherited configuration of this place
        final boolean bundleSimpleMode = paths.getSimpleMode();

        for (String path : paths.getFileNameList()) {
            boolean wasInHoldingArea = false;
            final String opath = path;
            path = fixFilePath(path);
            File f = new File(path);
            final String fixedName = fixFileName(f.getName());

            // Ensure it exists
            if (!f.exists()) {
                // If the errorCount is > 0 look in the holding area
                if (paths.getErrorCount() > 0) {
                    logger.debug("Looking for {} in holding area using eatPrefix of {}", f, prefix);
                    File holdFile = findFileInHoldingArea(f, prefix);
                    if (holdFile != null) {
                        logger.info("Switching to found holdArea file {}", holdFile);
                        wasInHoldingArea = true;
                        f = holdFile;
                    } else {
                        logger.debug("File was not found in holding area {} using eatPrefix of {}", f, prefix);
                    }
                } else {
                    logger.debug("File does not exist but had errorCount of 0 so not looking in holding area");
                }
            }

            if (!f.exists()) {
                logger.debug("Non-existent file {}", opath);
                continue;
            }

            // Ensure it can be read
            if (!f.canRead()) {
                logger.warn("Sorry, Cannot read file: {}", f.getPath());
                continue;
            }

            // Only process files here, but give a hook
            // for subclasses to handle other things
            if (!f.isFile()) {
                processDirectoryEntry(outputRoot, prefix, paths.getCaseId(), f, bundleSimpleMode);
                continue;
            }

            // Make sure it is big enough to process
            if (f.length() <= minimumContentLength) {
                logger.warn("Sorry, This file is too small ({} <{}): {}", f.length(), minimumContentLength, path);
                // No record is made of too small items
                continue;
            }

            // Make sure it is not too big to process
            boolean isOversize = false;
            if (maximumContentLength != -1 && f.length() > maximumContentLength) {
                logger.warn("Sorry, This file is too large ({} >{}): {}", f.length(), maximumContentLength, path);
                isOversize = true;
                // Let it continue on knowing it is too big
                // as we may need a record of the file
            }

            // Possibly rename the file to a holding area
            // if one is defined
            File toProcess = getInProcessFileNameFor(f, wasInHoldingArea ? holdingArea : prefix);
            if (holdingArea != null && toProcess != null && !wasInHoldingArea) {
                if (!renameToInProcessAreaAs(f, toProcess)) {
                    logger.error("File: {} Could not be renamed to: {}", f.getPath(), toProcess.getPath());
                    continue;
                }
            } else {
                toProcess = f;
            }

            // Start the processing. The file may be in the original
            // location or may be in the holding area
            try {
                // NOTE(review): the result of each file overwrites the previous one, so the bundle's
                // outcome reflects only the last file processed unless an exception occurs
                success = processDataFile(toProcess, fixedName, isOversize, bundleSimpleMode, outputRoot);
                logger.debug("Finished with processDataFile on {} as {}", toProcess, fixedName);
            } catch (Exception e) {
                // Return false and let another
                // processor have a try at this work bundle
                // TODO: What if some files work and some fail?
                handleErrorInBundledFile(toProcess, fixedName, isOversize, bundleSimpleMode, e);
                success = false;
                break;
            }
        }
        logger.debug("Finished processBundle {} {}", paths.getBundleId(), success ? "success" : "failure");
        return success;
    }

    /**
     * Report a failure to process one file of a bundle and try to move that file to the error area.
     * <p>
     * The failure is always logged at error level; when debug logging is enabled the message also carries the oversize
     * and simple-mode flags. If an error area is configured the file is renamed into it under its own name, and the
     * outcome of that rename is logged. Otherwise the absence of an error area is logged.
     *
     * @param toProcess the file that was being processed
     * @param fixedName the name the file was being processed as
     * @param isOversize whether the file had been flagged as too large
     * @param simpleMode the simple-mode flag to report
     * @param e the exception that interrupted processing
     */
    protected void handleErrorInBundledFile(File toProcess, String fixedName, boolean isOversize, boolean simpleMode, Exception e) {
        // Error either way but louder if debug is on
        String failure = "Cannot complete " + toProcess.getPath() + " as " + fixedName;
        if (logger.isDebugEnabled()) {
            failure = failure + " [isOversize=" + isOversize + ", simpleMode=" + simpleMode + "]";
        }
        logger.error(failure, e);

        final String errorDir = getErrorArea();
        if (errorDir == null) {
            logger.error("There is no configured errorArea in which to drop failed input files like " + toProcess);
            return;
        }

        // Move the problem file to the error area
        final File destination = new File(errorDir, toProcess.getName());
        if (toProcess.renameTo(destination)) {
            logger.error("Moved " + toProcess + " to the errorArea " + errorDir);
        } else {
            logger.error("Cannot rename " + toProcess.getName() + " to the error location " + errorDir);
        }
    }

    /**
     * Hand an incoming bundle of file names to the queue server for later processing.
     *
     * @param paths the WorkBundle object containing files to queue up
     * @return the queue server's answer: true if it was enqueued, false if we are too busy to handle it
     */
    @Override
    public boolean enque(WorkBundle paths) {
        final boolean accepted = queServer.enque(paths);
        return accepted;
    }

    /**
     * Report the queue size as given by the queue server, so push mode doesn't send us too much.
     *
     * @return the size value reported by the queue server
     */
    @Override
    public int getQueSize() {
        final int reported = queServer.getQueSize();
        return reported;
    }

    /**
     * QueServer specialization that routes every dequeued bundle back into the enclosing place's
     * {@link #processBundle(WorkBundle)}. It is registered under the name "FileQueServer".
     */
    protected class FileQueServer extends QueServer {
        /**
         * @param space the pickup space this server works for
         * @param queue the queue to serve
         * @param pollingInterval polling interval handed to the QueServer
         */
        public FileQueServer(IPickUpSpace space, PickupQueue queue, long pollingInterval) {
            super(space, queue, pollingInterval, "FileQueServer");
        }

        /**
         * Process an item taken from the queue by delegating to the enclosing FilePickUpClient.
         *
         * @param path the bundle from the queue
         * @return the result of processBundle for that bundle
         */
        @Override
        public boolean processQueueItem(WorkBundle path) {
            final boolean handled = FilePickUpClient.this.processBundle(path);
            return handled;
        }
    }

    /**
     * Decorate a newly created data object with metadata taken from the file and the current work bundle. Override
     * point for subclasses.
     * <p>
     * Sets TARGETBIN (the file's parent directory with the bundle's eatPrefix removed), the original document size, the
     * bundle priority, INPUT_FILEDATE and INPUT_FILENAME. When the object's SIMPLE_MODE parameter is true,
     * ORIGINAL_FILENAME is also set. A case id of the form {@code KEY:VALUE} (e.g. PROJECT:GERONIMO22) is stored as
     * parameter KEY unless that parameter already exists. In simple mode with a digest available, the object's filename
     * then becomes a name generated from VALUE and a hash of the file path. Any other case id goes through
     * {@link #caseIdHook} and is stored as DATABASE_CASE_ID, defaulting to the current ordinal date.
     *
     * @param d the nascent data object from the SessionProducer
     * @param f the file it came from
     * @throws IllegalStateException if no work bundle is currently being processed
     */
    @Override
    protected void dataObjectCreated(IBaseDataObject d, File f) {
        super.dataObjectCreated(d, f);

        // Work from a snapshot of the field so every read below sees the same bundle
        final WorkBundle bundle = currentBundle;
        if (bundle == null) {
            throw new IllegalStateException("dataObjectCreated called for " + f + " with no work bundle in progress");
        }

        final String eatPrefix = bundle.getEatPrefix();
        final boolean simpleParam = Boolean.parseBoolean(d.getStringParameter("SIMPLE_MODE"));

        String fixedDirName = fixFileName(f.getParent()).replace('\\', '/');
        if (eatPrefix != null && !eatPrefix.isEmpty() && fixedDirName.startsWith(eatPrefix)) {
            fixedDirName = fixedDirName.substring(eatPrefix.length());
        }

        d.putParameter("TARGETBIN", fixedDirName);
        d.putParameter(SessionParser.ORIG_DOC_SIZE_KEY, Integer.valueOf(d.dataLength()));
        d.setPriority(bundle.getPriority());

        // Fix up the complete path
        String fn = f.getAbsolutePath();
        if (eatPrefix != null && fn.startsWith(eatPrefix)) {
            fn = fn.substring(eatPrefix.length());
        }

        if (simpleParam) {
            d.putParameter(ORIGINAL_FILENAME, fn);
        }

        d.putParameter(INPUT_FILEDATE, TimeUtil.getDateAsISO8601(f.lastModified()));
        d.putParameter(INPUT_FILENAME, f.getName());

        // Fix up the case/project metadata, e.g. PROJECT:GERONIMO22
        final String cid = bundle.getCaseId();
        if (cid != null && cid.indexOf(':') > 0) {
            final String[] parts = cid.split(":");
            if (parts.length < 2) {
                // split drops trailing empty strings, so "KEY:" yields a single element
                logger.warn("Ignoring malformed case id '{}', expected KEY:VALUE", cid);
            } else {
                if (d.getParameter(parts[0]) == null) {
                    d.putParameter(parts[0], parts[1]);
                }

                if (simpleParam && digest != null) {
                    d.setFilename(createFilename(fn, parts[1]));
                }
            }
        } else {
            // Take care of the caseid
            String fixedCaseId = caseIdHook(cid, d.shortName(), f.toString(), d.getParameters());
            if (fixedCaseId == null) {
                // current yyyyjjj
                fixedCaseId = TimeUtil.getCurrentDateOrdinal();
            }
            d.putParameter("DATABASE_CASE_ID", fixedCaseId);
        }
    }

    /**
     * Generate a filename using the file's path and a prefix.
     * <p>
     * The result is {@code prefix-<hex digest of filePath>}, reduced to its last path component so that a prefix
     * containing directory separators cannot produce a name with a directory part. The path is hashed as UTF-8 bytes so
     * the same path gives the same name regardless of the platform default charset.
     *
     * @param filePath the path of the file
     * @param prefix a prefix to prepend to the resultant filename
     * @return the generated filename
     * @throws IllegalStateException if the message digest could not be initialized for this place
     */
    protected String createFilename(String filePath, String prefix) {
        final MessageDigest theDigest = this.digest;
        if (theDigest == null) {
            // Fail with a clear message instead of a NullPointerException from synchronized(null)
            throw new IllegalStateException("No message digest available for " + digestHashType + ", cannot generate a filename");
        }

        final byte[] hash;
        // The digest instance is shared and stateful, so reset and compute under its lock
        synchronized (theDigest) {
            theDigest.reset();
            hash = theDigest.digest(filePath.getBytes(StandardCharsets.UTF_8));
        }
        return new File(prefix + "-" + Hexl.toUnformattedHexString(hash)).getName();
    }

    /**
     * Extension point for derived classes that need to adjust the caseId. This do-nothing implementation ignores the
     * session name, file name and metadata and hands the initial case id straight back.
     *
     * @param initialCaseId the initial case id
     * @param sessionName name of the current session
     * @param fileName path and name of file from File.path()
     * @param metadata Map of data object metadata accumulated so far
     * @return the caseId to use; here always {@code initialCaseId}, unchanged
     */
    protected String caseIdHook(String initialCaseId, String sessionName, String fileName, Map<String, Collection<Object>> metadata) {
        final String unchangedCaseId = initialCaseId;
        return unchangedCaseId;
    }


    /**
     * Hook for subclasses that want to act on directory entries found in a work bundle. This would be highly unusual;
     * this implementation only logs a warning that the entry was ignored.
     *
     * @param root the outputRoot of the current work bundle
     * @param prefix the prefix of the current work bundle
     * @param caseid the caseid of the current work bundle
     * @param dir the directory entry encountered, nothing is logged when null
     * @param simpleMode true if the workBundle indicated simpleMode
     */
    protected void processDirectoryEntry(String root, String prefix, String caseid, @Nullable File dir, boolean simpleMode) {
        if (dir == null) {
            return;
        }
        final String entryName = dir.getName();
        logger.warn("Entry " + entryName + " ignored");
    }

}