FilePickUpPlace.java
package emissary.pickup.file;
import emissary.pickup.IPickUp;
import emissary.pickup.PickUpPlace;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Monitor one or more directories and pickup files for the system
*/
public class FilePickUpPlace extends PickUpPlace implements IPickUp {
// How often to check directories in millis
protected int pollingInterval = 30000;
// List of FileDataServer objects
protected List<FileDataServer> theDataServer = new ArrayList<>();
// How many files the FileDataServer should group
protected static final int DEFAULT_BUNDLE_SIZE = 20;
protected int bundleSize;
// Input directories to poll
protected String[] inputDataDirs;
/**
* Create using default configuration
*/
public FilePickUpPlace() throws IOException {
super();
configurePlace();
startDataServer();
}
/**
* Create, configure, and register
*/
public FilePickUpPlace(String configInfo, String dir, String placeLoc) throws IOException {
super(configInfo, dir, placeLoc);
configurePlace();
startDataServer();
}
/**
* Configure this place
* <ul>
* <li>POLLING_INTERVAL: how long to sleep between directory polls</li>
* <li>BUNDLE_SIZE: how many files to group in a bundle</li>
* <li>INPUT_DATA: one or more directories to pull files from</li>
* </ul>
*/
protected void configurePlace() {
pollingInterval = configG.findIntEntry("POLLING_INTERVAL", pollingInterval);
bundleSize = configG.findIntEntry("BUNDLE_SIZE", DEFAULT_BUNDLE_SIZE);
List<String> params = configG.findEntries("INPUT_DATA");
inputDataDirs = params.toArray(new String[0]);
}
/**
* Shutdown the directory monitoring threads and close resources
*/
@Override
public void shutDown() {
for (FileDataServer fileDataServer : theDataServer) {
logger.info("*** Stopping FilePickUpPlace ");
fileDataServer.shutdown();
}
super.shutDown();
}
/**
* Pause the DataServers
*/
@Override
public void pause() {
for (FileDataServer i : theDataServer) {
logger.info("*** Pausing {} for {}", i.getClass().getName(), getClass().getName());
i.pause();
}
}
/**
* Unpause the DataServers
*/
@Override
public void unpause() {
for (FileDataServer i : theDataServer) {
logger.info("*** Unpausing {} for {}", i.getClass().getName(), getClass().getName());
i.unpause();
}
}
/**
* Check the status of the DataServers
*
* @return true if any data server is paused, false otherwise
*/
@Override
public boolean isPaused() {
for (FileDataServer i : theDataServer) {
if (i.isPaused()) {
return true;
}
}
return false;
}
/**
* For each input directory start a new server thread.
*/
@SuppressWarnings("ThreadPriorityCheck")
public void startDataServer() {
for (int i = 0; i < inputDataDirs.length; i++) {
FileDataServer fds = new FileDataServer(inputDataDirs[i], this, pollingInterval);
// Tell it how many files to pick up at a time
fds.setBundleSize(bundleSize);
// Set priority below agent processing
fds.setPriority(Thread.NORM_PRIORITY - 1);
// It's a thread so use the start method
fds.start();
// Add it to our list
theDataServer.add(fds);
}
}
}