FileDataServer.java

  1. package emissary.pickup.file;

  2. import emissary.core.Pausable;
  3. import emissary.log.MDCConstants;

  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.slf4j.MDC;

  7. import java.io.File;
  8. import java.io.FilenameFilter;
  9. import javax.annotation.Nullable;

  10. /**
  11.  * Thread to monitor a directory for files
  12.  */
  13. public class FileDataServer extends Pausable {

  14.     // Logger
  15.     protected static final Logger logger = LoggerFactory.getLogger(FileDataServer.class);

  16.     // Directory we will monitor
  17.     protected String theDataDir;
  18.     protected File theDirectory;

  19.     // Ref to my owner
  20.     @Nullable
  21.     protected FilePickUpPlace myParent = null;

  22.     // protected int filesInList;

  23.     // Thread safe termination
  24.     protected boolean timeToShutdown = false;

  25.     // How often to check in millis
  26.     protected long pollingInterval = 1000;

  27.     // How many files to group together
  28.     protected int bundleSize = 20;

  29.     /**
  30.      * Create the directory monitor
  31.      *
  32.      * @param inputDataDirectory directory path to monitor
  33.      * @param parent the FPP that created me
  34.      * @param pollingInterval how often to check for new files in millis
  35.      */
  36.     @SuppressWarnings("ThreadPriorityCheck")
  37.     public FileDataServer(String inputDataDirectory, FilePickUpPlace parent, long pollingInterval) {

  38.         // Name the thread
  39.         super("FileInput-" + inputDataDirectory);

  40.         myParent = parent;
  41.         this.pollingInterval = pollingInterval;
  42.         theDataDir = inputDataDirectory;
  43.         theDirectory = new File(inputDataDirectory);

  44.         if (!theDirectory.isDirectory()) {
  45.             logger.warn(theDirectory.getName() + " is not a directory");
  46.         }

  47.         // Set our priority to below agent processing priority
  48.         this.setPriority(Thread.NORM_PRIORITY - 2);

  49.         this.setDaemon(true);
  50.     }

  51.     /**
  52.      * Set the number of files to group when polling
  53.      *
  54.      * @param sz the new value for bundleSize
  55.      */
  56.     public void setBundleSize(int sz) {
  57.         bundleSize = sz;
  58.     }

  59.     /**
  60.      * Implement the run method from Thread to start monitoring Runs until the shutdown() method is called
  61.      */
  62.     @Override
  63.     public void run() {

  64.         // Loop can be terminated by calling the shutdown() method
  65.         while (!timeToShutdown) {

  66.             if (checkPaused()) {
  67.                 continue;
  68.             }

  69.             String holdDir = myParent.getInProcessArea();
  70.             String errDir = myParent.getErrorArea();

  71.             // Process files currently in the pickup directory, list
  72.             // the first bundleSize in a batch
  73.             String[] fileList = theDirectory.list(new FilenameFilter() {
  74.                 final int maxFileToList = bundleSize;
  75.                 int filesInList = 0;

  76.                 @Override
  77.                 public boolean accept(File dir, String name) {
  78.                     return !name.startsWith(".") && ++filesInList <= maxFileToList;
  79.                 }
  80.             });

  81.             // Rename all the selected files out of the polling area
  82.             for (int i = 0; fileList != null && i < fileList.length; i++) {
  83.                 File f = new File(theDataDir, fileList[i]);

  84.                 if (!f.exists() || !f.isFile() || !f.canRead()) {
  85.                     reportProblem(f, errDir);
  86.                     continue;
  87.                 }

  88.                 // Move to in process area
  89.                 File newFile = new File(holdDir, fileList[i]);
  90.                 if (!f.renameTo(newFile)) {
  91.                     // This is normal when many FileDataServers
  92.                     // on multiple machines are looking at the
  93.                     // same underlying filesystem space
  94.                     logger.warn("FileDataServer - file: " + f.getPath() + " Could not be renamed to: " + newFile.getPath());
  95.                     fileList[i] = null;
  96.                 }
  97.             }

  98.             int processedCount = 0;

  99.             // Process the batch of files just collected, if any
  100.             for (int i = 0; fileList != null && i < fileList.length; i++) {

  101.                 if (fileList[i] == null) {
  102.                     continue;
  103.                 }

  104.                 // Notify parent to process file
  105.                 File newFile = new File(holdDir, fileList[i]);
  106.                 try {
  107.                     MDC.put(MDCConstants.SHORT_NAME, fileList[i]);
  108.                     myParent.processDataFile(newFile);
  109.                     processedCount++;
  110.                 } catch (Exception e) {
  111.                     logger.warn("***Cannot process {}", newFile, e);
  112.                     boolean renamed = newFile.renameTo(new File(errDir, newFile.getName()));
  113.                     if (!renamed) {
  114.                         logger.warn("***Cannot move {} to the error directory {}", newFile, errDir);
  115.                     }
  116.                 } finally {
  117.                     MDC.remove(MDCConstants.SHORT_NAME);
  118.                 }
  119.             }


  120.             // Delay for the polling interval if there was
  121.             // nothing to do for this last round, otherwise
  122.             // get right back in there...
  123.             if (processedCount == 0) {
  124.                 try {
  125.                     Thread.sleep(pollingInterval);
  126.                 } catch (InterruptedException e) {
  127.                     Thread.currentThread().interrupt();
  128.                 }
  129.             }

  130.         } // end while
  131.     }

  132.     /**
  133.      * Report the problem file and move it to the error location
  134.      *
  135.      * @param f the file having the problem
  136.      */
  137.     protected void reportProblem(File f, String errDir) {
  138.         MDC.put(MDCConstants.SHORT_NAME, f.getPath());
  139.         try {
  140.             String n = f.getName();
  141.             boolean renamed = false;

  142.             if (f.exists()) {
  143.                 if (!f.canRead()) {
  144.                     logger.warn("FileDataServer: cannot read file");
  145.                     renamed = f.renameTo(new File(errDir, n));
  146.                 }

  147.                 else if (!f.isFile()) {
  148.                     logger.warn("FileDataServer: file is not a normal file");
  149.                     renamed = f.renameTo(new File(errDir, n));
  150.                 }

  151.                 else if (f.length() <= 0) {
  152.                     logger.warn("FileDataServer: file has zero size");
  153.                     renamed = f.renameTo(new File(errDir, n));
  154.                 }

  155.                 if (!renamed) {
  156.                     logger.warn("File could not be moved");
  157.                 }
  158.             } else {
  159.                 logger.warn("File does not exist");
  160.             }
  161.         } finally {
  162.             MDC.remove(MDCConstants.SHORT_NAME);
  163.         }
  164.     }

  165.     /**
  166.      * Shutdown the thread
  167.      */
  168.     public void shutdown() {
  169.         timeToShutdown = true;
  170.     }

  171. }