WorkSpace.java
package emissary.pickup;
import emissary.client.EmissaryResponse;
import emissary.command.BaseCommand;
import emissary.command.FeedCommand;
import emissary.command.ServerCommand;
import emissary.core.EmissaryException;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.directory.DirectoryAdapter;
import emissary.directory.DirectoryEntry;
import emissary.directory.DirectoryPlace;
import emissary.directory.EmissaryNode;
import emissary.directory.IDirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.pool.AgentPool;
import emissary.server.EmissaryServer;
import emissary.server.mvc.adapters.WorkSpaceAdapter;
import emissary.util.Version;
import emissary.util.io.FileFind;
import org.apache.hc.core5.http.HttpStatus;
import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;
/**
* Recursively process input and distribute files to one or more remote PickUp client instances when they ask for a
* bundle of work to do.
* <p>
* The work bundles, emissary.pickup.WorkBundle objects, are placed on a queue for sending to consumers by the producer
* here. Once a WorkBundle has been requested and sent to a consumer (i.e. FilePickUpClient) it is placed on a pending
* queue tagged with the consumer id until the consumer notifies this WorkSpace that the work has been completed. If the
* consumer goes away without notification, then the work is moved back to the outbound queue and given to another
* consumer.
*/
public class WorkSpace implements Runnable {
/** Our logger */
protected static final Logger logger = LoggerFactory.getLogger(WorkSpace.class);
protected FeedCommand feedCommand;
/**
* Pickup places we will send to, loaded and modified by directory observation during runtime
*/
protected List<String> pups = new CopyOnWriteArrayList<>();
/**
* Initial pattern for finding pickup places
*/
protected String pattern = System.getProperty(CLZ + ".clientPattern", "*.FILE_PICK_UP_CLIENT.INPUT.*");
/** The directory observer for the pattern */
protected WorkSpaceDirectoryWatcher watcher;
// Process control
protected static final String CLZ = WorkSpace.class.getName();
protected boolean wantDirectories = Boolean.getBoolean(CLZ + ".includeDirectories");
protected boolean debug = Boolean.getBoolean(CLZ + ".debug");
protected boolean simpleMode = false;
protected String outputRootPath = System.getProperty("outputRoot", null);
protected String eatPrefix = System.getProperty("eatPrefix", null);
protected int numberOfBundlesToSkip = Integer.getInteger(CLZ + ".skip", 0);
protected boolean skipDotFiles = Boolean.getBoolean(CLZ + ".skipDotFiles");
protected boolean loop = false;
protected boolean useRetryStrategy = false;
protected static final int MAX_BUNDLE_RETRIES = 5;
protected PriorityQueue<PriorityDirectory> myDirectories = new PriorityQueue<>();
// Stats tracking, map of stats per remote pick up place
protected WorkSpaceStats stats = new WorkSpaceStats();
// Thread to notify clients that there is work to do
@Nullable
protected ClientNotifier notifier = null;
// Process control for collector thread
protected boolean timeToQuit = false;
protected boolean collectorThreadHasQuit = false;
protected boolean jettyStartedHere = false;
protected static final float MEM_THRESHOLD = 0.80f;
protected long loopPauseTime = 60000L;
protected long pendingHangTime = 600000L;
protected static final long NOTIFIER_PAUSE_TIME = 1000L;
protected int retryCount = 0;
protected boolean useFileTimestamps = false;
@Nullable
protected String projectBase = null;
/**
* How many file names to send per remote message, should be 10% or less of the size of the PickUpPlace.MAX_QUE Helps
* prevent blocking if it's not a factor of the PickUpPlace.MAX_QUE size
*/
protected int filesPerMessage = Integer.getInteger(CLZ + ".filesPerBundle", 5);
protected long maxBundleSize = Long.getLong(CLZ + ".maxSizePerBundle", -1);
// Metrics collection
protected long filesProcessed = 0;
protected long bundlesProcessed = 0;
protected long bytesProcessed = 0;
// Data tracking
protected String dataCaseId = System.getProperty("caseId", null);
protected boolean caseClosed = false;
// List of WorkBundle objects we are going to distribute
protected PriorityQueue<WorkBundle> outbound = new PriorityQueue<>();
// List of WorkBundle objects that are pending completion notice
// Keyed by bundleId to quickly remove items that are processed
// normally (the expected case)
protected Map<String, WorkBundle> pending = new HashMap<>();
// Keep track of files we have seen that are either outbound or pending
// so that we can avoid using file timestamps in the collector loop
protected Map<String, Long> filesSeen = new HashMap<>();
protected Map<String, Long> filesDone = new HashMap<>();
// Used to synchronize access to the pending and outbound queues
// One lock to rule them all
@SuppressWarnings("ConstantField")
protected final Object QLOCK = new Object(); // NOSONAR
// How we register in the namespace and advertise ourselves
protected static final String DEFAULT_WORK_SPACE_NAME = "WorkSpace";
protected String workSpaceName = DEFAULT_WORK_SPACE_NAME;
protected String workSpaceUrl;
protected String workSpaceKey;
/**
* Command line entry point for sending files to a list of remote TreePickUpPlaces
*/
public static void main(final String[] args) {
try {
final WorkSpace ws = new WorkSpace(BaseCommand.parse(FeedCommand.class, args));
ws.run();
logger.info("Workspace has completed the mission [ +1 health ].");
ws.shutDown();
} catch (Exception e) {
logger.error("Bad commandline arguments, check the FeedCommand help", e);
}
System.exit(0);
}
/**
* Construct the space
*/
@SuppressWarnings("CheckedExceptionNotThrown")
public WorkSpace() throws Exception {
}
public WorkSpace(FeedCommand feedCommand) {
this.feedCommand = feedCommand;
// TODO make setting of all parameters use setters
this.loop = this.feedCommand.isLoop();
this.setRetryStrategy(this.feedCommand.isRetry());
this.setFileTimestampUsage(this.feedCommand.isFileTimestamp());
this.workSpaceName = this.feedCommand.getWorkspaceName();
this.simpleMode = this.feedCommand.isSimple();
this.projectBase = this.feedCommand.getProjectBase().toAbsolutePath().toString();
this.pattern = this.feedCommand.getClientPattern();
this.outputRootPath = this.feedCommand.getOutputRoot();
this.eatPrefix = this.feedCommand.getEatPrefix();
this.filesPerMessage = this.feedCommand.getBundleSize();
this.dataCaseId = this.feedCommand.getCaseId();
this.setSkipDotFiles(this.feedCommand.isSkipDotFile());
this.wantDirectories = this.feedCommand.isIncludeDirs();
this.setSimpleMode(this.feedCommand.isSimple());
this.myDirectories.addAll(this.feedCommand.getPriorityDirectories());
if (null != this.feedCommand.getSort()) {
this.outbound = new PriorityQueue<>(11, this.feedCommand.getSort());
}
startJetty();
register();
initializeService();
}
protected void startJetty() {
if (!EmissaryServer.isInitialized() || !EmissaryServer.getInstance().isServerRunning()) {
// TODO investigate passing the feedCommand object directly to the serverCommand
List<String> args = new ArrayList<>();
args.add("-b");
args.add(projectBase);
args.add("--agents");
args.add("1"); // feed don't need agents
args.add("-h");
args.add(this.feedCommand.getHost());
args.add("-p");
args.add(String.valueOf(this.feedCommand.getPort()));
// feed doesn't make sense in standalone
args.add("-m");
args.add("cluster");
args.add("--flavor");
args.add(this.feedCommand.getFlavor());
if (this.feedCommand.isSslEnabled()) {
args.add("--ssl");
}
if (this.feedCommand.isSniHostCheckDisabled()) {
args.add("--disableSniHostCheck");
}
try {
// To ensure the feed command starts correctly, depends on a node-{feedCommand.getPort}.cfg file
ServerCommand cmd = BaseCommand.parse(ServerCommand.class, args);
Server server = EmissaryServer.init(cmd).startServer();
final boolean jettyStatus = server.isStarted();
if (!jettyStatus) {
logger.error("Cannot start the Workspace due to EmissaryServer not starting!");
} else {
logger.info("Workspace is up and running");
this.jettyStartedHere = true;
}
} catch (EmissaryException e) {
logger.error("Error starting EmissaryServer! WorkSpace will not start!", e);
}
} else {
logger.info("EmissaryServer is already running, Workspace should be up.");
}
}
protected void initializeService() {
// Load existing pickup client list
try {
this.pups.addAll(getPickUpClients(this.pattern));
logger.info("Found {} initial clients using {} in {}", this.pups.size(), this.pattern, getKey());
logger.debug("Initial pickups : {}", this.pups);
} catch (EmissaryException ex) {
logger.error("Cannot lookup pickup places using pattern {} in {}", this.pattern, getKey(), ex);
}
// Hook our observer onto the local directory,
// so we keep in sync with any changes to the clients
this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
try {
DirectoryAdapter.register(this.watcher);
} catch (EmissaryException ex) {
logger.error("Cannot register directory observer", ex);
}
// Must be before we start processing files and directories
initializeCase();
}
/**
* Start collection of files and monitoring system progress
*/
@Override
public void run() {
// Start the collection of files
startCollector();
// Start client notifier
startNotifier();
// Start monitoring the system until all work is done
monitorProgress();
logger.debug("Ending the WorkSpace run method");
}
/**
* Stop the work space
*/
public void stop() {
this.timeToQuit = true;
}
/**
* Shut down services that were started here
*/
@SuppressWarnings("CatchingUnchecked")
public void shutDown() {
stop();
if (this.jettyStartedHere) {
final EmissaryNode node = EmissaryServer.getInstance().getNode();
if (node.isValid()) {
try {
EmissaryServer.getInstance().stop();
} catch (Exception ex) {
logger.error("Jetty cannot be shutdown", ex);
}
}
try {
AgentPool.lookup().close();
} catch (NamespaceException ex) {
logger.debug("Agent pool namespace lookup failed", ex);
}
}
}
/**
* Set the pending hang time, how long to wait after outbound queue is empty
*
* @param pendingHangTime in millis
*/
public void setPendingHangTime(final long pendingHangTime) {
this.pendingHangTime = pendingHangTime;
}
/**
* Set the loop pause time when loop is true
*
* @param pauseTimeMillis pause interval in millis
*/
public void setPauseTime(final long pauseTimeMillis) {
this.loopPauseTime = pauseTimeMillis;
}
/**
* Set or unset looping
*/
public void setLoop(final boolean on) {
this.loop = on;
}
/**
* Get the value of the loop indicator
*/
public boolean getLoop() {
return this.loop;
}
/**
* Set the use of file timestamps to control whether a file is new enough to be added to the queue
*/
public void setFileTimestampUsage(final boolean value) {
this.useFileTimestamps = value;
}
/**
* Return whether fileTimestamps can be used for collector queue control
*/
public boolean getFileTimestampUsage() {
return this.useFileTimestamps;
}
/**
* Set Retry strategy on or off
*/
public void setRetryStrategy(final boolean on) {
this.useRetryStrategy = on;
}
/**
* Get value of the retry strategy indicator
*
* @return true if retry strategy in use
*/
public boolean getRetryStrategy() {
return this.useRetryStrategy;
}
/**
* Add directory at specified priority to be monitored
*/
public void addDirectory(final String dir, final int priority) {
addDirectory(new PriorityDirectory(dir, priority));
}
/**
* Add specified PriorityDirectory object to be monitored
*/
public void addDirectory(final PriorityDirectory dir) {
this.myDirectories.add(dir);
logger.debug("Adding input directory {}", dir);
}
public List<String> getDirectories() {
final List<String> l = new ArrayList<>();
final PriorityDirectory[] pds = this.myDirectories.toArray(new PriorityDirectory[0]);
Arrays.sort(pds);
for (final PriorityDirectory pd : pds) {
l.add(pd.toString());
}
return l;
}
/**
* Set directory processing flag. When true directory entries are retrieved from the input area just like normal files.
*
* @see emissary.util.io.FileFind
* @param on the new value for directory retrieval
*/
public void setDirectoryProcessing(final boolean on) {
this.wantDirectories = on;
}
/**
* Reset the eatprefix for this workspace
*
* @param prefix the new prefix
*/
public void setEatPrefix(final String prefix) {
logger.debug("Reset eatPrefix to {}", prefix);
this.eatPrefix = prefix;
normalizeEatPrefix();
}
/**
* Make sure the eatPrefix is in canonical form
*/
protected void normalizeEatPrefix() {
if (this.eatPrefix != null && this.eatPrefix.contains("//")) {
this.eatPrefix = this.eatPrefix.replaceAll("/+", "/");
}
}
/**
* Reset the outputRoot
*
* @param value the new outputRoot value
*/
public void setOutputRoot(final String value) {
logger.debug("Reset outputRoot to {}", value);
this.outputRootPath = value;
}
/**
* Get the value of the configured outputRoot
*/
public String getOutputRoot() {
return this.outputRootPath;
}
/**
* Reset the case id
*
* @param value the new value for caseId
*/
public void setCaseId(final String value) {
logger.debug("Reset caseId to {}", value);
this.dataCaseId = value;
}
/**
* Reset the skipDotFiles flag
*
* @param value the new value for the skipDotFiles flag
*/
public void setSkipDotFiles(final boolean value) {
this.skipDotFiles = value;
}
/**
* Set the debug flag
*
* @param value the new value for the debug flag
*/
public void setDebugFlag(final boolean value) {
this.debug = value;
}
/**
* Set the simple mode flag
*
* @param value the new value for the flag
*/
public void setSimpleMode(final boolean value) {
this.simpleMode = value;
}
/**
* Get the value of the simple mode flag
*/
public boolean getSimpleMode() {
return this.simpleMode;
}
/**
* Set the pattern for finding pickup clients
*
* @param thePattern the new pattern
* @see emissary.directory.KeyManipulator#gmatch(String,String)
*/
public void setPattern(@Nullable final String thePattern) throws Exception {
if ((this.pattern != null) && this.pattern.equals(thePattern)) {
logger.debug("The pattern is already set to {}", thePattern);
return;
}
this.pattern = thePattern;
// Clear out old pick up clients
logger.warn("Clearing client list so we can look for new pattern {} in {}", thePattern, getKey());
this.pups.clear();
// Find new ones
this.pups.addAll(getPickUpClients(this.pattern));
// Set up a new observer on the directory
if (this.watcher != null) {
DirectoryAdapter.remove(this.watcher);
}
this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
DirectoryAdapter.register(this.watcher);
}
/**
* Configure the Processor. The *.cfg file is optional
*/
protected void register() {
final EmissaryNode node = EmissaryServer.getInstance().getNode();
if (node.isValid()) {
this.workSpaceUrl = node.getNodeScheme() + "://" + node.getNodeName() + ":" + node.getNodePort() + "/" + this.workSpaceName;
} else {
this.workSpaceUrl = "http://localhost:8001/" + this.workSpaceName;
logger.warn("WorkSpace is not running in a valid emissary node. Using URL {}", this.workSpaceUrl);
}
this.workSpaceKey = "WORKSPACE.WORK_SPACE.INPUT." + this.workSpaceUrl;
normalizeEatPrefix();
// Need to bind so WorkSpaceTakeWorker can find us on the callback
// The url we use to bind is in the advertisement to clients
Namespace.bind(this.workSpaceUrl, this);
}
/**
* Get the initial list of pickup client places from the local directory. Our observer will keep us in sync after this
* initial pull. This method does not cause clients to be notified.
*
* @param thePattern the key pattern to match for places of interest
*/
protected Set<String> getPickUpClients(final String thePattern) throws EmissaryException {
final Set<String> thePups = new HashSet<>();
final IDirectoryPlace dir = DirectoryPlace.lookup();
final List<DirectoryEntry> list = dir.getMatchingEntries(thePattern);
for (final DirectoryEntry d : list) {
thePups.add(d.getKey());
logger.info("Adding pickup client {}", d.getKey());
}
logger.debug("Found {} initial pickup client entries", thePups.size());
return thePups;
}
/**
* Start the file collector threads, one per directory
*/
public void startCollector() {
for (final PriorityDirectory pd : this.myDirectories) {
final WorkSpaceCollector collector = new WorkSpaceCollector(pd);
final Thread collectorThread = new Thread(collector, "WorkSpace Collector " + pd);
collectorThread.setDaemon(true);
collectorThread.start();
logger.debug("Started WorkSpace Collector thread on {}", pd);
}
}
/**
* Start the client notification Thread*
*/
public void startNotifier() {
this.notifier = new ClientNotifier();
final Thread notifierThread = new Thread(this.notifier, "WorkSpace Client Notifier");
notifierThread.setDaemon(true);
notifierThread.start();
logger.debug("Started Client Notifier thread");
}
/**
* Rotate the list of pickups so that the same old one isn't always first on the list.
*/
protected void rotatePickUps() {
// Move element(0) to the tail and shift all to the left
Collections.rotate(this.pups, -1);
}
/**
* Notify pick up place that data is available
*
* @return number of successful notices
*/
protected int notifyPickUps() {
int successCount = 0;
for (final String pup : this.pups) {
final boolean status = notifyPickUp(pup);
if (status) {
successCount++;
}
if (getOutboundQueueSize() == 0) {
break;
}
}
logger.debug("Notified {} of {} pickup places", successCount, this.pups.size());
return successCount;
}
/**
* Add one pickup and notify of work to be done
*/
protected void addPickUp(final String pup) {
if (!this.pups.contains(pup)) {
this.pups.add(pup);
if (logger.isDebugEnabled()) {
logger.debug("Adding pickup {}, new size={}: {}", pup, this.pups.size(), this.pups);
}
} else {
logger.debug("Not adding {} already on list size {}", pup, this.pups.size());
}
}
/**
* Notify one pickup
*
* @param pup the key of the one to notify
*/
protected boolean notifyPickUp(final String pup) {
final WorkSpaceAdapter tpa = new WorkSpaceAdapter();
logger.debug("Sending notice to {}", pup);
boolean notified = false;
int tryCount = 0;
while (!notified && tryCount < 5) {
final EmissaryResponse status = tpa.outboundOpenWorkSpace(pup, this.workSpaceKey);
// TODO Consider putting this method in the response
if (status.getStatus() != HttpStatus.SC_OK) {
logger.warn("Failed to notify {} on try {}: {}", pup, tryCount, status.getContentString());
try {
Thread.sleep((tryCount + 1) * 100L);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
} else {
notified = true;
}
tryCount++;
}
if (logger.isInfoEnabled()) {
logger.info("Notified {} in {} attempts: {}", pup, tryCount, (notified ? "SUCCESS" : "FAILED"));
}
return notified;
}
/**
* Return the registration key for this work space
*/
public String getKey() {
return this.workSpaceKey;
}
/**
* Return the workspace name
*/
public String getNamespaceName() {
return this.workSpaceName;
}
/**
* Remove a pickup, if it had work bundles pending completion transfer them back to the outbound queue
*
* @param remoteKey the directory observer string key that was removed
*/
protected void removePickUp(final String remoteKey) {
this.pups.remove(remoteKey);
if (logger.isDebugEnabled()) {
logger.debug("Removed pickup {}, size={}: {}", remoteKey, this.pups.size(), this.pups);
}
int pendCount = 0;
final String remoteName = KeyManipulator.getServiceHost(remoteKey);
synchronized (this.QLOCK) {
// NB: no enhanced for loop with Iterator.remove()
for (Iterator<String> i = this.pending.keySet().iterator(); i.hasNext();) {
final String id = i.next();
final WorkBundle wb = this.pending.get(id);
if (remoteName.equals(wb.getSentTo())) {
i.remove(); // remove from pending
wb.setSentTo(null); // clear in progress indicator
this.retryCount++;
if (wb.incrementErrorCount() <= MAX_BUNDLE_RETRIES) {
logger.debug("Removing pending bundle {} from pending pool, re-adding to outbound with errorCount={}", wb.getBundleId(),
wb.getErrorCount());
addOutboundBundle(wb); // send to outbound again
pendCount++;
// Set overall counts back to normal
this.bundlesProcessed--;
} else {
logger.error("Bundle {} associated with too many failures, permanently discarding", wb);
}
}
}
}
if (pendCount > 0) {
logger.info("Moved {} items back to outbound queue from {}", pendCount, remoteName);
}
}
/**
* Method called by remote PickUp client instances when they are ready to receive data from this WorkSpace Access via
* emissary.comms.http.WorkSpaceApapter
*
* @param remoteKey key of the requesting PickUp place
* @return WorkBundle at the head of the list or null if empty
*/
public WorkBundle take(final String remoteKey) {
final String remoteName = KeyManipulator.getServiceHost(remoteKey);
WorkBundle item;
synchronized (this.QLOCK) {
if (getOutboundQueueSize() == 0) {
// Empty WorkBundle will let them know to stop asking us
logger.info("Sent shutdown msg to {}", remoteName);
this.stats.shutDownSent(remoteName);
item = new WorkBundle();
} else {
// transfer from outbound to pending list and
// record who the work was given to track
// completion status
this.stats.bump(remoteName);
item = this.outbound.poll();
item.setSentTo(remoteName);
this.pending.put(item.getBundleId(), item);
logger.info("Gave bundle {} to {}", item, remoteName);
final WorkBundle nextItem = this.outbound.peek();
if (nextItem != null && logger.isInfoEnabled()) {
logger.info("After take: new top differs to prior by [oldest/youngest/size]=[{}/{}/{}]",
nextItem.getOldestFileModificationTime() - item.getOldestFileModificationTime(),
nextItem.getYoungestFileModificationTime() - item.getYoungestFileModificationTime(),
nextItem.getTotalFileSize() - item.getTotalFileSize());
}
}
}
return item;
}
/**
* Add a new bundle of work to the pending queue
*
* @param wb the new bundle
*/
protected void addOutboundBundle(final WorkBundle wb) {
int sz;
synchronized (this.QLOCK) {
this.bundlesProcessed++;
sz = this.outbound.size();
this.outbound.add(wb);
addFilesSeen(wb.getFileNameList());
}
if (logger.isInfoEnabled()) {
logger.info("Adding workbundle {} size {} filesSeen {}", wb, (sz + 1), this.filesSeen.size());
}
}
/**
* Show items that are pending completion (debug)
*
* @deprecated use {@link #showPendingItemsList()}
*/
@Deprecated
@SuppressWarnings("AvoidObjectArrays")
public String[] showPendingItems() {
return showPendingItemsList().toArray(new String[0]);
}
/**
* Show items that are pending completion (debug)
*/
public List<String> showPendingItemsList() {
final List<String> list = new ArrayList<>();
synchronized (this.QLOCK) {
for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
list.add(entry.getValue().toString());
}
}
return list;
}
/**
* Clear the pending queue
*
* @return number of items removed
*/
public int clearPendingQueue() {
final int size = getPendingQueueSize();
if (size > 0) {
synchronized (this.QLOCK) {
logger.debug("Clearing pending queue of {} items", size);
for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
removeFilesSeen(entry.getValue().getFileNameList());
}
this.pending.clear();
logger.debug("Cleared filesSeen leaving {} items", this.filesSeen.size());
}
}
return size;
}
/**
* Receive notice that a bundle was completed Normally called from emissary.server.mvc.adapters.WorkSpaceAdapter when a
* bundle completion message is received from the remote client doing the processing.
*
* @param remoteName the name of the place that did the processing
* @param bundleId the unique id of the bundle that was completed
* @param itWorked true if processed normally
* @return true if the item was removed from the pending list
*/
public boolean workCompleted(final String remoteName, final String bundleId, final boolean itWorked) {
WorkBundle item;
synchronized (this.QLOCK) {
item = this.pending.remove(bundleId);
if (item != null) {
addFilesDone(item.getFileNameList());
removeFilesSeen(item.getFileNameList());
logger.debug("Removed {} from filesSeen leaving {}", item.size(), this.filesSeen.size());
}
}
if (item == null) {
logger.info("Unknown bundle completed: {}", bundleId);
} else if (!itWorked) {
item.setSentTo(null); // clear in progress indicator
if (item.incrementErrorCount() > MAX_BUNDLE_RETRIES) {
logger.error("Bundle {} has too many errors, permanently discarded", item);
} else {
addOutboundBundle(item); // send to outbound again
}
}
if (logger.isDebugEnabled()) {
logger.debug("Bundle {} completed by {}{}", bundleId, remoteName,
(itWorked ? "" : (" but failed for the " + (item != null ? item.getErrorCount() : -1) + " time")));
}
return item != null;
}
/**
* begin the case processing, does nothing in this implementation
*/
protected void initializeCase() {
// Don't care in this implementation
logger.debug("In base initializeCase implementation (do nothing)");
}
/**
* end the case processing does nothing in this implementation
*/
protected void closeCase() {
// Don't care in this implementation
this.caseClosed = true;
logger.debug("In base closeCase implementation (do nothing)");
}
/**
* handle getting a directory in the recursive descent
*
* @param dir File for which isDirectory returns true
*/
protected void processDirectory(final File dir) {
// We don't care in this implementation
logger.debug("got a directory processDirectory({})", dir);
}
/**
* Add each fileName and its respective lastModifiedDate to the filesSeen list
*
* @param fileNames the collection of file name strings to add
*/
protected void addFilesSeen(final Collection<String> fileNames) {
for (final String fn : fileNames) {
this.filesSeen.put(fn, getFileModificationDate(fn));
}
}
/**
* Add each fileName and its respective lastModifiedDate to the filesDone list
*
* @param fileNames the collection of file name strings to add
*/
protected void addFilesDone(final Collection<String> fileNames) {
for (final String fn : fileNames) {
this.filesDone.put(fn, getFileModificationDate(fn));
}
}
/**
* Remove each fileName from the filesSeen list without regard to the timestamp
*
* @param fileNames the collection of file name strings to remove
*/
protected void removeFilesSeen(final Collection<String> fileNames) {
for (final String fn : fileNames) {
this.filesSeen.remove(fn);
}
}
/**
* Lookup a lastModified date for a file
*
* @param fn the filename
* @return the long representing the date of last modification or 0L if an error, or it does not exist
*/
protected long getFileModificationDate(final String fn) {
return new File(fn).lastModified();
}
protected long getFileSize(final String fn) {
return new File(fn).length();
}
/**
* Monitoring progress of the WorkSpace. Indicate some stats once in a while and do not let the foreground thread
* terminate while there is still work on the outbound queue or the pending lists.
*/
protected void monitorProgress() {
long outboundEmptyTimestamp = -1L;
// Do while outbound or pending work exists or collector is
// still running
while (true) {
final int outboundSize = getOutboundQueueSize();
int pendingSize = getPendingQueueSize();
final boolean reallyQuit = this.timeToQuit && (outboundSize == 0) && (pendingSize == 0);
// Rmember when outbound becomes empty
if (outboundSize == 0 && outboundEmptyTimestamp == -1L) {
outboundEmptyTimestamp = System.currentTimeMillis();
} else if (outboundSize > 0 && outboundEmptyTimestamp > 0L) {
outboundEmptyTimestamp = -1L;
}
// See if it is time to give up on pending items
if ((outboundSize == 0) && !this.loop && ((outboundEmptyTimestamp + this.pendingHangTime) < System.currentTimeMillis())) {
if (logger.isInfoEnabled()) {
logger.info("Giving up on {} items due to timeout", pendingSize);
for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
logger.info("Pending item {}: {}", entry.getKey(), entry.getValue());
}
}
clearPendingQueue();
pendingSize = 0;
}
// All work is done and collector has finished
if (outboundSize + pendingSize == 0) {
if (reallyQuit) {
break;
}
publishStats();
}
// Else sleep a while
try {
for (int si = 0; si < 3000; si++) {
Thread.sleep(10L);
if (reallyQuit) {
break;
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (!this.timeToQuit) {
publishStats();
}
}
// Case closing actions
closeCase();
}
/**
* Output some information to the logger on what we have been doing lately
*/
public void publishStats() {
logger.info(getStatsMessage());
for (Iterator<String> i = this.stats.machinesUsed(); i.hasNext();) {
final String machine = i.next();
logger.info("Machine {} took {} bundles", machine, this.stats.getCountUsed(machine));
}
}
/**
* Return the current stats to the caller
*/
public String getStatsMessage() {
final int outboundSize = getOutboundQueueSize();
final int pendingSize = getPendingQueueSize();
return "WorkSpace has outbound=" + outboundSize + ", pending=" + pendingSize + ", total bundles / files / bytes = " + this.bundlesProcessed
+ " / " + this.filesProcessed + " / " + this.bytesProcessed + " , #clients=" + getPickUpPlaceCount();
}
/**
* Return how many files processed so far
*/
public long getFilesProcessed() {
return this.filesProcessed;
}
/**
* Return how many bytes processed so far
*/
public long getBytesProcessed() {
return this.bytesProcessed;
}
/**
* Return how many pickup places are being fed
*/
public int getPickUpPlaceCount() {
return this.pups.size();
}
/**
* Return how many bundles processed so far
*/
public long getBundlesProcessed() {
return this.bundlesProcessed;
}
/**
* Return size of outbound queue
*/
public int getOutboundQueueSize() {
synchronized (this.QLOCK) {
return this.outbound.size();
}
}
public int getRetriedCount() {
return this.retryCount;
}
/**
* Return size of pending completion queue
*/
public int getPendingQueueSize() {
synchronized (this.QLOCK) {
return this.pending.size();
}
}
/**
* Overridable point to get the version string for output periodically when looping
*
* @return the version info
*/
protected String getVersionString() {
return "Emissary version: " + new Version();
}
public class ClientNotifier implements Runnable {
/**
* Create the notifier Runnable
*/
public ClientNotifier() {}
@Override
public void run() {
while (true) {
final int qsize = getOutboundQueueSize();
if (qsize > 0) {
final long start = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("ClientNotification starting with #clients={} outbound={}", getPickUpPlaceCount(), qsize);
}
notifyPickUps();
if (logger.isDebugEnabled()) {
final long end = System.currentTimeMillis();
logger.debug("ClientNotification took {}s for #clients={}", (end - start) / 1000.0, getPickUpPlaceCount());
}
}
try {
Thread.sleep(NOTIFIER_PAUSE_TIME);
rotatePickUps();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
final int outboundSize = getOutboundQueueSize();
final int pendingSize = getPendingQueueSize();
if (WorkSpace.this.timeToQuit && (outboundSize == 0) && (pendingSize == 0) && WorkSpace.this.collectorThreadHasQuit) {
break;
}
}
logger.debug("Off the end of the ClientNotifier run loop");
}
}
/**
* A runnable to collect files into WorkBundles and put them on the outbound queue
*/
public class WorkSpaceCollector implements Runnable {
protected PriorityDirectory myDirectory;
/**
* Create the collector runnable
*/
public WorkSpaceCollector(final PriorityDirectory myDirectory) {
this.myDirectory = myDirectory;
}
/**
* Pull all the files into bundles, emit some stats, and notify the PickUp client instances to start work. When the list
* of file bundles is empty we can quit or loop around again.
*/
@Override
public void run() {
long versionOutputTime = System.currentTimeMillis();
long start;
long stop;
long minFileTime = 0L;
// Run the processing
long lastFileCollect = 0L;
int loopCount = 0;
logger.info("Running Workspace from {}", getVersionString());
do {
start = System.currentTimeMillis();
// every hour
if (start - versionOutputTime > 3600000) {
logger.info("Continuing Workspace from {}", getVersionString());
versionOutputTime = start;
}
final WorkBundle paths = new WorkBundle(WorkSpace.this.outputRootPath, WorkSpace.this.eatPrefix);
paths.setCaseId(WorkSpace.this.dataCaseId);
paths.setSimpleMode(getSimpleMode());
logger.debug("Processing files in {}", this.myDirectory.getDirectoryName());
final int collectCount =
collectFiles(this.myDirectory, WorkSpace.this.wantDirectories, paths, WorkSpace.this.numberOfBundlesToSkip, minFileTime,
WorkSpace.this.skipDotFiles);
// Set times, so we don't redistribute files next loop
// if configured to use timestamps
if (WorkSpace.this.useFileTimestamps) {
lastFileCollect = System.currentTimeMillis();
}
stop = System.currentTimeMillis();
loopCount++;
// We can only skip bundles on the first time through
WorkSpace.this.numberOfBundlesToSkip = 0;
logger.info("Collected {} file bundles in {}s in loop iteration {}, {} items in outbound queue", collectCount,
((stop - start) / 1000.0), loopCount, WorkSpace.this.outbound.size());
if ((collectCount == 0) && WorkSpace.this.loop) {
// Wait pause time seconds and try again if looping
try {
Thread.sleep(WorkSpace.this.loopPauseTime);
} catch (InterruptedException ioex) {
Thread.currentThread().interrupt();
}
continue;
}
// time shift for next loop if configured to use tstamps
if (WorkSpace.this.useFileTimestamps) {
minFileTime = lastFileCollect;
}
} while (WorkSpace.this.loop && !WorkSpace.this.timeToQuit);
logger.debug("Off the end of the WorkSpaceCollector run method");
WorkSpace.this.collectorThreadHasQuit = true;
}
/**
* Load WorkBundle objects into our linked list of bundles Also process all directories if so instructed
*
* @return count of how many bundles collected for outbound queue
*/
protected int collectFiles(final PriorityDirectory dir, final boolean wantDirectories, final WorkBundle basePath,
final int numberOfBundlesToSkipArg, final long minFileTime, final boolean skipDotFilesArg) {
int skipped = 0;
int collected = 0;
int fileCount = 0;
long bytesInBundle = 0;
try {
int ffOptions = FileFind.FILES_FLAG;
if (wantDirectories) {
ffOptions |= FileFind.DIRECTORIES_FLAG;
}
final FileFind ff = new FileFind(ffOptions);
final Iterator<?> f = ff.find(dir.getDirectoryName());
WorkBundle paths = new WorkBundle(basePath);
paths.setPriority(dir.getPriority());
paths.setSimpleMode(getSimpleMode());
while (f.hasNext()) {
// If the outbound queue has a lot of stuff pending
// and memory is getting tight, just to sleep until
// the situation eases
pauseCollector();
final File next = (File) f.next();
final String fileName = next.getPath();
// We should only be getting these if we asked for them.
// We should only use them if we are not resuming a previous run.
if (next.isDirectory() && numberOfBundlesToSkipArg == 0) {
logger.debug("Doing directory {}", fileName);
processDirectory(next);
continue;
}
// Can we read the file?
if (!next.isFile() && !next.canRead()) {
logger.debug("Cannot access file: {}", fileName);
continue;
}
// Skip dot files possibly
// TODO Maybe we want to change this to explicitly look for "." instead of isHidden
if (skipDotFilesArg && Files.isHidden(Paths.get(fileName))) {
logger.debug("Skipping dot file {}", fileName);
continue;
}
// Is file too old? (If we aren't configured to use
// tstamps minFileTime will always be 0L
if (next.lastModified() < minFileTime) {
continue;
}
synchronized (WorkSpace.this.QLOCK) {
if (WorkSpace.this.filesDone.containsKey(fileName)) {
WorkSpace.this.filesDone.remove(fileName);
continue;
} else if (WorkSpace.this.filesSeen.containsKey(fileName)
&& WorkSpace.this.filesSeen.get(fileName) == next.lastModified()) {
logger.debug("Skipping file already seen {}, touch file to force add", fileName);
continue;
}
}
logger.debug("Adding filename to bundle {}", fileName);
// add file to workbundle (at least 1)
if (workbundleHasRoom(paths, bytesInBundle)) {
logger.debug("Added file to workbundle: {}", fileName);
paths.addFileName(fileName, getFileModificationDate(fileName), getFileSize(fileName));
bytesInBundle += next.length();
WorkSpace.this.filesProcessed++; // overall
fileCount++; // this loop
WorkSpace.this.bytesProcessed += next.length(); // overall
}
// if bundle is full, create a new empty and
// move it to the outbound queue.
if (!workbundleHasRoom(paths, bytesInBundle)) {
logger.debug("Workbundle full, adding it to outbound queue");
if (skipped < numberOfBundlesToSkipArg) {
skipped++;
} else {
addOutboundBundle(paths);
collected++;
}
// create new empty work bundle
paths = new WorkBundle(basePath);
paths.setPriority(dir.getPriority());
paths.setSimpleMode(getSimpleMode());
bytesInBundle = 0;
}
} // end while f.hasNext()
// Send residual files, not a complete set perhaps
if (paths.size() > 0) {
if (skipped < numberOfBundlesToSkipArg) {
logger.info("Skipping last bundle");
} else {
addOutboundBundle(paths);
collected++;
}
}
// clear the files done list
synchronized (WorkSpace.this.QLOCK) {
WorkSpace.this.filesDone.clear();
}
} catch (Exception e) {
logger.error("System error", e);
return collected;
}
if (!WorkSpace.this.outbound.isEmpty()) {
logger.info("Processed {} files into {} bundles, skipping {} bundles.", fileCount, collected, skipped);
}
return collected;
}
/**
* Convenience method to check if there is room in the work bundle to add more files.
*
* @param bundle the bundle to check
* @param bytesInBundle the current count of bytes in the bundle.
* @return true if bundle does not exceed max byte size, or max file count.
*/
private boolean workbundleHasRoom(final WorkBundle bundle, final long bytesInBundle) {
// must have a min size of 1 file, but cannot be over the
// max byte size, or max file count
boolean bReturn = (bundle.size() <= 0)
|| (((WorkSpace.this.maxBundleSize <= -1) || (bytesInBundle < WorkSpace.this.maxBundleSize))
&& ((WorkSpace.this.filesPerMessage <= -1) || (bundle
.size() < WorkSpace.this.filesPerMessage)));
logger.debug("workbundle has room = {}", bReturn);
return bReturn;
}
/**
* Check memory (heap) usage and wait for it to go below the threshold. We must be able to collect at least 500 file
* bundles to trigger this mechanism.
*/
protected void pauseCollector() {
final int initialQueueSize = getOutboundQueueSize();
if (initialQueueSize < 500) {
return;
}
final long intv = 30000;
final MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
MemoryUsage heap = mbean.getHeapMemoryUsage();
int count = 0;
while ((((double) heap.getUsed() / (double) heap.getCommitted()) > MEM_THRESHOLD) && (getOutboundQueueSize() > 500)) {
logger.debug("Collection memory threshold exceeded {}", heap);
try {
Thread.sleep(intv);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
count++;
heap = mbean.getHeapMemoryUsage();
}
if (count > 0 && logger.isDebugEnabled()) {
logger.debug(
"Paused collector {} times for {}s waiting for memory usage to go below threshold {} resuming at {}, queueSize was/is={}/{}",
count, (intv / 1000), MEM_THRESHOLD, heap, initialQueueSize, getOutboundQueueSize());
}
}
}
/**
* Collect per pickup statistics for this run
*/
public static class WorkSpaceStats {
final Map<String, Integer> remoteMap = new HashMap<>();
final Set<String> shutDownSent = new HashSet<>();
/**
* Increment the bundle count for the machine when it takes one
*
* @param machine the remote pickup
*/
public void bump(final String machine) {
Integer count = this.remoteMap.get(machine);
if (count == null) {
count = 1;
} else {
count = count + 1;
}
this.remoteMap.put(machine, count);
}
/**
* Indicate that shutdown msg was sent to machine
*
* @param machine the remote name
*/
public void shutDownSent(final String machine) {
this.shutDownSent.add(machine);
}
/**
* Count how many machines got shut down msg
*/
public int getShutDownCount() {
return this.shutDownSent.size();
}
/**
* Iterate over set of machines used
*/
public Iterator<String> machinesUsed() {
return this.remoteMap.keySet().iterator();
}
/**
* Count of machines used
*/
public int getCountUsed(final String machine) {
final Integer count = this.remoteMap.get(machine);
return (count == null) ? 0 : count;
}
}
/**
* Watch the directory for changes to pickup up client places
*/
public class WorkSpaceDirectoryWatcher extends DirectoryAdapter {
/**
* Watch the directory for registrations that match pattern
*
* @param pattern the pattern to match
*/
public WorkSpaceDirectoryWatcher(final String pattern) {
super(pattern);
logger.debug("PickupClient pattern is {}", pattern);
}
/**
* Accept registration notices that match our pattern
*
* @param observableKey the reporting directory
* @param placeKey the key of the matching registered place
*/
@Override
public void placeRegistered(final String observableKey, final String placeKey) {
final String k = KeyManipulator.removeExpense(placeKey);
logger.debug("Registration message from {}", k);
if (WorkSpace.this.pups.contains(k) && WorkSpace.this.useRetryStrategy) {
// This covers the case where the pickup dies and restarts
// before the Heartbeat mechanism figures out there was
// a problem.
logger.info("Already known pickup {} must be reinitialized to clear pending work.", k);
removePickUp(k);
}
if (!WorkSpace.this.pups.contains(k)) {
logger.info("New pickup place {}", k);
}
// add to list and maybe send open msg. Dup places
// will not be added but might be re-notified
addPickUp(k);
}
/**
* Accept deregistration notices that match our pattern
*
* @param observableKey the reporting directory
* @param placeKey the key of the matching deregistered place
*/
@Override
public void placeDeregistered(final String observableKey, final String placeKey) {
final String k = KeyManipulator.removeExpense(placeKey);
logger.debug("DeRegistration message from {}", k);
if (!WorkSpace.this.pups.contains(k)) {
logger.info("Unknown pickup deregistered {}", k);
} else {
logger.info("Pickup place {} is gone", k);
if (WorkSpace.this.useRetryStrategy) {
removePickUp(k);
}
}
}
}
}