FileDataServer.java

package emissary.pickup.file;

import emissary.core.Pausable;
import emissary.log.MDCConstants;

import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.File;
import java.io.FilenameFilter;

/**
 * Thread to monitor a directory for files
 */
public class FileDataServer extends Pausable {

    // Logger
    protected static final Logger logger = LoggerFactory.getLogger(FileDataServer.class);

    // Directory we will monitor
    protected String theDataDir;
    protected File theDirectory;

    // Ref to my owner
    @Nullable
    protected FilePickUpPlace myParent = null;

    // protected int filesInList;

    // Thread safe termination
    protected boolean timeToShutdown = false;

    // How often to check in millis
    protected long pollingInterval = 1000;

    // How many files to group together
    protected int bundleSize = 20;

    /**
     * Create the directory monitor
     * 
     * @param inputDataDirectory directory path to monitor
     * @param parent the FPP that created me
     * @param pollingInterval how often to check for new files in millis
     */
    @SuppressWarnings("ThreadPriorityCheck")
    public FileDataServer(String inputDataDirectory, FilePickUpPlace parent, long pollingInterval) {

        // Name the thread
        super("FileInput-" + inputDataDirectory);

        myParent = parent;
        this.pollingInterval = pollingInterval;
        theDataDir = inputDataDirectory;
        theDirectory = new File(inputDataDirectory);

        if (!theDirectory.isDirectory()) {
            logger.warn(theDirectory.getName() + " is not a directory");
        }

        // Set our priority to below agent processing priority
        this.setPriority(Thread.NORM_PRIORITY - 2);

        this.setDaemon(true);
    }

    /**
     * Set the number of files to group when polling
     * 
     * @param sz the new value for bundleSize
     */
    public void setBundleSize(int sz) {
        bundleSize = sz;
    }

    /**
     * Implement the run method from Thread to start monitoring Runs until the shutdown() method is called
     */
    @Override
    public void run() {

        // Loop can be terminated by calling the shutdown() method
        while (!timeToShutdown) {

            if (checkPaused()) {
                continue;
            }

            String holdDir = myParent.getInProcessArea();
            String errDir = myParent.getErrorArea();

            // Process files currently in the pickup directory, list
            // the first bundleSize in a batch
            String[] fileList = theDirectory.list(new FilenameFilter() {
                final int maxFileToList = bundleSize;
                int filesInList = 0;

                @Override
                public boolean accept(File dir, String name) {
                    return !name.startsWith(".") && ++filesInList <= maxFileToList;
                }
            });

            // Rename all the selected files out of the polling area
            for (int i = 0; fileList != null && i < fileList.length; i++) {
                File f = new File(theDataDir, fileList[i]);

                if (!f.exists() || !f.isFile() || !f.canRead()) {
                    reportProblem(f, errDir);
                    continue;
                }

                // Move to in process area
                File newFile = new File(holdDir, fileList[i]);
                if (!f.renameTo(newFile)) {
                    // This is normal when many FileDataServers
                    // on multiple machines are looking at the
                    // same underlying filesystem space
                    logger.warn("FileDataServer - file: " + f.getPath() + " Could not be renamed to: " + newFile.getPath());
                    fileList[i] = null;
                }
            }

            int processedCount = 0;

            // Process the batch of files just collected, if any
            for (int i = 0; fileList != null && i < fileList.length; i++) {

                if (fileList[i] == null) {
                    continue;
                }

                // Notify parent to process file
                File newFile = new File(holdDir, fileList[i]);
                try {
                    MDC.put(MDCConstants.SHORT_NAME, fileList[i]);
                    myParent.processDataFile(newFile);
                    processedCount++;
                } catch (Exception e) {
                    logger.warn("***Cannot process {}", newFile, e);
                    boolean renamed = newFile.renameTo(new File(errDir, newFile.getName()));
                    if (!renamed) {
                        logger.warn("***Cannot move {} to the error directory {}", newFile, errDir);
                    }
                } finally {
                    MDC.remove(MDCConstants.SHORT_NAME);
                }
            }


            // Delay for the polling interval if there was
            // nothing to do for this last round, otherwise
            // get right back in there...
            if (processedCount == 0) {
                try {
                    Thread.sleep(pollingInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

        } // end while
    }

    /**
     * Report the problem file and move it to the error location
     * 
     * @param f the file having the problem
     */
    protected void reportProblem(File f, String errDir) {
        MDC.put(MDCConstants.SHORT_NAME, f.getPath());
        try {
            String n = f.getName();
            boolean renamed = false;

            if (f.exists()) {
                if (!f.canRead()) {
                    logger.warn("FileDataServer: cannot read file");
                    renamed = f.renameTo(new File(errDir, n));
                }

                else if (!f.isFile()) {
                    logger.warn("FileDataServer: file is not a normal file");
                    renamed = f.renameTo(new File(errDir, n));
                }

                else if (f.length() <= 0) {
                    logger.warn("FileDataServer: file has zero size");
                    renamed = f.renameTo(new File(errDir, n));
                }

                if (!renamed) {
                    logger.warn("File could not be moved");
                }
            } else {
                logger.warn("File does not exist");
            }
        } finally {
            MDC.remove(MDCConstants.SHORT_NAME);
        }
    }

    /**
     * Shutdown the thread
     */
    public void shutdown() {
        timeToShutdown = true;
    }

}