DropOffPlace.java
package emissary.output;
import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.DataObjectFactory;
import emissary.core.Form;
import emissary.core.IBaseDataObject;
import emissary.directory.DirectoryEntry;
import emissary.output.filter.IDropOffFilter;
import emissary.place.EmptyFormPlace;
import emissary.place.ServiceProviderPlace;
import emissary.util.DataUtil;
import emissary.util.DisposeHelper;
import emissary.util.ShortNameComparator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
/**
* DropOffPlace manages the output from the system. It has evolved into a controller of sorts, with way too many options,
* that controls which types of output are desired and calls the appropriate output helper for the desired output.
**/
public class DropOffPlace extends ServiceProviderPlace implements EmptyFormPlace {
/** When true, {@link #process} runs its per-payload work inside {@code synchronized (this)}. Read from SYNCHRONIZED_PROCESS. */
protected boolean doSynchronized = false;
/** Current forms whose presence causes the payload data to be replaced by a placeholder message. Read from ELIDE_CONTENT. */
protected Set<String> elideContentForms;
/** Current forms that are pushed back onto a payload after this place's proxies have been removed. Read from NO_NUKE_FORM. */
protected Set<String> noNukeForms;
/** The output filters, kept in the order they were added. */
protected List<IDropOffFilter> outputFilters = new ArrayList<>();
/** When true, the filter chain stops at the first filter that does not report success. Read from FAILURE_TERMINATES_CHAIN. */
protected boolean failurePolicyTerminate = true;
/** Helper built from this place's configuration; used here for ids, path specs and metadata preparation. */
protected DropOffUtil dropOffUtil;
/** When true, the completion log line also reports the payload size. Read from OUTPUT_COMPLETION_PAYLOAD_SIZE. */
private boolean outputCompletionPayloadSize = false;
/**
* Primary place constructor. Hands all three arguments to the superclass constructor and then runs
* {@link #configurePlace()}.
*
* @param configInfo our config stuff from the startup
* @param dir string name of the directory to register into
* @param placeLoc string form of our key
* @throws IOException declared by this constructor; presumably raised by the superclass setup (not visible here)
*/
public DropOffPlace(final String configInfo, final String dir, final String placeLoc) throws IOException {
super(configInfo, dir, placeLoc);
configurePlace();
}
/**
* Test form of constructor. Delegates to {@link #DropOffPlace(String, String)} with the fixed place location
* {@code DropOffPlace.example.com:8001}.
*
* @param configInfo our config stuff from the startup
* @throws IOException propagated from the delegated constructor
*/
public DropOffPlace(final String configInfo) throws IOException {
this(configInfo, "DropOffPlace.example.com:8001");
}
/**
* Test form of constructor. Hands both arguments to the superclass constructor and then runs
* {@link #configurePlace()}.
*
* @param configInfo our config stuff from the startup
* @param placeLocation the place location passed through to the superclass
* @throws IOException declared by this constructor; presumably raised by the superclass setup (not visible here)
*/
protected DropOffPlace(final String configInfo, final String placeLocation) throws IOException {
super(configInfo, placeLocation);
configurePlace();
}
/**
* Construct from an already-built configuration. The no-argument superclass constructor runs first (implicitly); the
* supplied configurator then replaces {@code configG} before {@link #configurePlace()} reads from it.
*
* @param configInfo the configuration this place should read its settings from
* @throws IOException declared by this constructor; presumably raised by the superclass setup (not visible here)
*/
public DropOffPlace(final Configurator configInfo) throws IOException {
this.configG = configInfo;
configurePlace();
}
/**
* Constructor for hooking in to all the defaults. Relies on the implicit no-argument superclass constructor and then
* runs {@link #configurePlace()}.
*
* @throws IOException declared by this constructor; presumably raised by the superclass setup (not visible here)
*/
public DropOffPlace() throws IOException {
configurePlace();
}
/**
 * Read this place's settings from the configuration and start the configured output filters.
 * <p>
 * Keys read: SYNCHRONIZED_PROCESS (default false), FAILURE_TERMINATES_CHAIN (default true),
 * OUTPUT_COMPLETION_PAYLOAD_SIZE (default false) and every OUTPUT_FILTER entry.
 */
protected void configurePlace() {
    // Helper that is built from the same configuration as this place
    this.dropOffUtil = new DropOffUtil(configG);

    // Behavioural switches
    this.doSynchronized = configG.findBooleanEntry("SYNCHRONIZED_PROCESS", false);
    this.failurePolicyTerminate = configG.findBooleanEntry("FAILURE_TERMINATES_CHAIN", true);
    this.outputCompletionPayloadSize = configG.findBooleanEntry("OUTPUT_COMPLETION_PAYLOAD_SIZE", false);

    // The filters are handed over exactly as listed so that the configured order is kept
    initializeFilters(configG.findEntries("OUTPUT_FILTER"));
}
/**
 * Start up the requested filters and load the ELIDE_CONTENT and NO_NUKE_FORM form sets.
 * <p>
 * Each entry is either {@code name:class} or just {@code class}. For the {@code name:class} form the name is looked up
 * in this place's configuration; when that yields a value it is treated as the name of a filter-specific configuration
 * to load. A filter whose specific configuration cannot be loaded is skipped. A class that cannot be created or
 * initialized, or that is not an {@link IDropOffFilter}, is logged and ignored.
 *
 * @param filterClasses the name:class values of the configured filters for this drop off
 */
protected void initializeFilters(final List<String> filterClasses) {
    for (final String entry : filterClasses) {
        final String name;
        final String clazz;
        Configurator filterConfig = null;
        final int colpos = entry.indexOf(':');
        if (colpos > -1) {
            name = entry.substring(0, colpos);
            clazz = entry.substring(colpos + 1);
            final String filterConfigName = configG.findStringEntry(name);
            if (filterConfigName != null) {
                try {
                    filterConfig = ConfigUtil.getConfigInfo(filterConfigName);
                } catch (IOException configError) {
                    // Keep the cause in the log so the reason for the skipped filter can be diagnosed
                    logger.warn("Specified filter configuration {} cannot be loaded", filterConfigName, configError);
                    continue;
                }
            }
        } else {
            name = null;
            clazz = entry;
        }

        try {
            final Object filter = emissary.core.Factory.create(clazz);
            // instanceof is already false for null, no separate null check needed
            if (filter instanceof IDropOffFilter) {
                final IDropOffFilter f = (IDropOffFilter) filter;
                f.initialize(configG, name, filterConfig);
                addFilter(f);
            } else {
                logger.error("Misconfigured filter {} is not an IDropOffFilter instance, ignoring it", clazz);
            }
        } catch (RuntimeException ex) {
            logger.error("Unable to create or initialize {}", clazz, ex);
        }
    }

    // Collect the set of content types to elide
    this.elideContentForms = configG.findEntriesAsSet("ELIDE_CONTENT");

    // Collect the set of no-nuke forms
    this.noNukeForms = configG.findEntriesAsSet("NO_NUKE_FORM");

    logger.debug("Setting ELIDE_CONTENT forms to {}", this.elideContentForms);

    if (logger.isInfoEnabled()) {
        final StringBuilder sb = new StringBuilder("Output Filters:");
        if (this.outputFilters.isEmpty()) {
            sb.append(" NONE!");
        } else {
            for (final IDropOffFilter f : this.outputFilters) {
                sb.append(" ").append(f.getFilterName()).append("(").append(f.getClass().getName()).append(")");
            }
        }
        logger.info(sb.toString());
    }

    // The sample path is only worth computing when it is actually going to be logged
    if (logger.isDebugEnabled()) {
        final IBaseDataObject fakePayload = DataObjectFactory.getInstance(new byte[0], "fakename", Form.UNKNOWN);
        for (final IDropOffFilter filter : getFilters()) {
            final String name = filter.getFilterName();
            final String spec = filter.getOutputSpec();
            logger.debug("Adding filter={}, spec={}, sample={}, class={}", name, spec, this.dropOffUtil.getPathFromSpec(spec, fakePayload),
                    filter.getClass().getSimpleName());
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * After the superclass shutdown, every output filter is closed. A filter that throws while closing is logged and does
 * not prevent the remaining filters from being closed.
 */
@Override
public void shutDown() {
    super.shutDown();
    for (final IDropOffFilter filter : this.outputFilters) {
        try {
            logger.debug("Shutdown filter {}", filter.getFilterName());
            filter.close();
        } catch (RuntimeException e) {
            // One misbehaving filter must not leave the others open
            logger.error("Failed to shut down output filter {}", filter, e);
        }
    }
}
/**
 * "HD" agent calls this method when visiting the place. If you use {@link emissary.core.MobileAgent} this method is
 * never called. This method overrides {@link ServiceProviderPlace} and allows this processing place to have access to
 * all payloads wanting to be dropped off in a single list.
 * <p>
 * Every payload is prepared first; the whole list is then passed through the pre-filter hook, the output filters and
 * the post-filter hook. If any payload reports that it is not outputable the method returns immediately and nothing
 * is filtered.
 *
 * @param payloadList list of IBaseDataObject from an {@link emissary.core.HDMobileAgent}
 * @return always an empty list, this place does not sprout
 */
@Override
public List<IBaseDataObject> agentProcessHeavyDuty(final List<IBaseDataObject> payloadList) throws Exception {
    logger.debug("Entering DropOffPlace.agentProcessHeavyDuty with {} payload items", payloadList.size());

    // Prepare each incoming payload object
    for (final IBaseDataObject payload : payloadList) {
        try {
            if (payload.isOutputable()) {
                // HD context: the filters run later, on the whole list
                processData(payload, true);
            } else {
                // One non-outputable object anywhere in the tree stops the whole drop off
                logger.info("Skipping object since it is not able to be output ID:{}", this.dropOffUtil.getBestId(payload, payloadList.get(0)));
                return Collections.emptyList();
            }
        } catch (RuntimeException e) {
            logger.error("Place.process threw:", e);
            payload.addProcessingError("agentProcessHD(" + myKey + "): " + e);
            final boolean alreadyFlagged = payload.currentForm().equals(Form.ERROR);
            if (!alreadyFlagged) {
                payload.pushCurrentForm(Form.ERROR);
            }
        }
    }

    // Prepare, output, clean up
    final Map<String, Object> filterParams = new HashMap<>();
    preFilterHook(payloadList, filterParams);
    runOutputFilters(payloadList, filterParams);
    postFilterHook(payloadList, filterParams);

    // The prefilter hook sorted the list, so the first element is the TLD object
    if (!payloadList.isEmpty()) {
        logDropOffCompletion(payloadList.get(0));
    }

    // Execute 'Dispose Runnables' to tidy up resources used with SeekableByteChannelFactory implementations
    DisposeHelper.execute(payloadList);

    return Collections.emptyList();
}

/**
 * Write the "Finished DropOff" line for the top level object, including the payload size when that is enabled and the
 * object has content.
 *
 * @param tld the top level object of the payload list
 */
private void logDropOffCompletion(final IBaseDataObject tld) throws Exception {
    if (!outputCompletionPayloadSize || !tld.hasContent()) {
        logger.info("Finished DropOff for object {}, with external id: {}, with total processing time: {}ms, with filetype: {}",
                tld.getInternalId(), this.dropOffUtil.getBestId(tld, tld),
                Duration.between(tld.getCreationTimestamp(), Instant.now()).toMillis(),
                tld.getFileType());
        return;
    }
    logger.info(
            "Finished DropOff for object {}, with external id: {}, with total processing time: {}ms, with filetype: {}, payload size: {} bytes",
            tld.getInternalId(), this.dropOffUtil.getBestId(tld, tld),
            Duration.between(tld.getCreationTimestamp(), Instant.now()).toMillis(),
            tld.getFileType(), tld.getChannelSize());
}
/**
 * Called by MobileAgent through ServiceProviderPlace to handle a single payload.
 * <p>
 * Empty payloads and payloads that are not outputable are logged and left alone. Everything else is processed,
 * holding this place's monitor while doing so when synchronized processing is configured.
 *
 * @param tData the payload to work on
 */
@Override
public void process(final IBaseDataObject tData) {
    if (DataUtil.isEmpty(tData)) {
        logger.warn("null/empty data object");
        return;
    }

    final boolean outputable = tData.isOutputable();
    if (!outputable) {
        logger.warn("Skipping object since it is not able to be output ID:{}", this.dropOffUtil.getBestId(tData, tData));
        return;
    }

    // Synchronization is switched on by a config file entry
    if (!this.doSynchronized) {
        processData(tData, false);
    } else {
        synchronized (this) {
            processData(tData, false);
        }
    }

    // Execute 'Dispose Runnables' to tidy up resources used with SeekableByteChannelFactory implementations
    DisposeHelper.execute(tData);
}
/**
 * Prepare a list of payload objects to be filtered.
 * <p>
 * The list is sorted in place with a {@link ShortNameComparator}, the filter parameters are told that the records
 * are pre-sorted, and the first record after sorting is published as the TLD parameter. An empty list has no first
 * record, so in that case no TLD parameter is set.
 *
 * @param payloadList the list of items that were eligible for output, sorted in place
 * @param filterParams metadata needed for the output filter, added to by this method
 */
public void preFilterHook(final List<IBaseDataObject> payloadList, final Map<String, Object> filterParams) {
    // Sort the list of records
    Collections.sort(payloadList, new ShortNameComparator());
    filterParams.put(IDropOffFilter.PRE_SORTED, Boolean.TRUE);

    // Guard the lookup: get(0) on an empty list would throw before any filter runs
    if (!payloadList.isEmpty()) {
        filterParams.put(IDropOffFilter.TLD_PARAM, payloadList.get(0));
    }

    // Prepare the metadata
    this.dropOffUtil.processMetadata(payloadList);
}
/**
 * Clean up after all filters are done.
 * <p>
 * For every payload the forms this place proxies for are removed. Any configured no-nuke form that was present on
 * the payload beforehand is pushed back afterwards.
 *
 * @param payloadList the list of items that were eligible for output
 * @param filterParams metadata needed for the output filter (not used here)
 */
public void postFilterHook(final List<IBaseDataObject> payloadList, final Map<String, Object> filterParams) {
    for (final IBaseDataObject payload : payloadList) {
        // Start from every no-nuke form and keep only those this payload currently carries
        final List<String> preserved = new ArrayList<>(this.noNukeForms);
        preserved.removeIf(form -> payload.searchCurrentForm(form) <= -1);

        // Remove the current forms we used or could have used
        this.nukeMyProxies(payload);

        // Put the protected forms back
        for (final String form : preserved) {
            payload.pushCurrentForm(form);
        }
    }
}
/**
 * Internal method to process a single data object.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>If any current form is in the elide set, the payload data is replaced by a placeholder message.</li>
 * <li>Every current form this place proxies for (or every form, when the proxy set contains {@code *}) is recorded
 * in the POPPED_FORMS parameter. Extra forms also get an entry in the transform history.</li>
 * <li>Outside of HD context the output filters are run and the proxied forms are removed from the payload.</li>
 * </ol>
 *
 * @param tData the payload to work on or prepare
 * @param haveList true if in HD context, in which case filtering and form removal are left to the caller
 */
protected void processData(final IBaseDataObject tData, final boolean haveList) {
    logger.debug("DropOff is working on {}, current form is {}", tData.shortName(), tData.getAllCurrentForms());

    // Skip the I/O for some types for all filters. The placeholder does not depend on which form
    // matched, so the first match is enough.
    for (int i = 0; i < tData.currentFormSize(); i++) {
        if (this.elideContentForms.contains(tData.currentFormAt(i))) {
            // Explicit charset so the placeholder bytes do not vary with the platform default
            tData.setData(("[[ " + tData.getAllCurrentForms() + " content elided in DropOffPlace. ]]").getBytes(StandardCharsets.UTF_8));
            break;
        }
    }

    // Collect all the destinations we are a proxy for. They are only recorded here; the forms
    // themselves are removed later by nukeMyProxies.
    final Set<String> serviceProxies = getProxies();
    final boolean proxyForEverything = serviceProxies.contains("*");
    final Set<String> cfSet = new HashSet<>();
    final StringBuilder poppedForms = new StringBuilder();
    String prevBin = "";

    for (int i = 0; i < tData.currentFormSize(); i++) {
        final String cf = tData.currentFormAt(i);
        if (!proxyForEverything && !serviceProxies.contains(cf)) {
            continue;
        }

        // Record extra drop offs in the transform history since we are taking precedence over
        // the normal agent/directory mechanism, so that the extra destination is not just lost
        // forever. The agent already appended one entry when it sent the payload here for the
        // top current form (i == 0), and we don't want to duplicate that one.
        if (!prevBin.equals(cf) && (i > 0) && !cfSet.contains(cf) && !("UNKNOWN".equals(cf) || cf.endsWith("-PROCESSED"))) {
            // e.g.: [dataType].DROP_OFF.IO.host.dom:port/DropOffPlace
            final DirectoryEntry de = getDirectoryEntry();
            de.setDataType("[" + cf + "]");
            tData.appendTransformHistory(de.getKey());
        }

        // Accumulate forms we have handled in poppedForms
        if (poppedForms.length() > 0) {
            poppedForms.append(" ");
        }
        poppedForms.append(cf);
        cfSet.add(cf);
        prevBin = cf;
    }

    // Record the list of forms
    tData.setParameter("POPPED_FORMS", poppedForms.toString());

    // Do the output filtering now if we aren't in HD mode
    if (!haveList) {
        // There currently are no extra params needed
        final Map<String, Object> filterParams = new HashMap<>();
        runOutputFilters(tData, filterParams);

        // Actually remove the current forms we used or could have used
        this.nukeMyProxies(tData);
        logger.debug("DropOff finished with {}", tData.shortName());
    }
}
/**
 * Run all the output filters, in order, against a single payload or a list of payloads.
 * <p>
 * A filter that reports it cannot output the target is skipped and counted as a success. A filter that throws a
 * {@link RuntimeException} is logged and counted as a failure. When the failure policy says so, the first
 * non-success status stops the remaining filters from running.
 *
 * @param target either IBaseDataObject or List thereof; anything else (including null) is logged and ignored
 * @param filterParams other parameters that the filters need
 */
@SuppressWarnings("unchecked")
protected void runOutputFilters(final Object target, final Map<String, Object> filterParams) {
    IBaseDataObject doTarget = null;
    List<IBaseDataObject> listTarget = null;

    if (target instanceof IBaseDataObject) {
        doTarget = (IBaseDataObject) target;
    } else if (target instanceof List) {
        listTarget = (List<IBaseDataObject>) target;
    } else {
        // target may be null here, so do not dereference it blindly
        logger.error("Cannot run filter on {}", target == null ? "null" : target.getClass().getName());
        return;
    }

    // Write output onto each of the filters that have been
    // configured, as long as they work
    for (final IDropOffFilter filter : this.outputFilters) {
        final long start = System.currentTimeMillis();

        // Stays at failure when the filter throws
        int filterStatus = IDropOffFilter.STATUS_FAILURE;
        try {
            if (listTarget != null && filter.isOutputtable(listTarget)) {
                filterStatus = filter.filter(listTarget, filterParams);
            } else if (doTarget != null && filter.isOutputtable(doTarget)) {
                filterStatus = filter.filter(doTarget, filterParams);
            } else {
                logger.debug("Filter {} not Outputtable for {}", filter.getFilterName(), listTarget != null ? "list" : "single payload");
                filterStatus = IDropOffFilter.STATUS_SUCCESS;
            }
            logger.debug("Filter {} took {}s - {}", filter.getFilterName(), ((System.currentTimeMillis() - start) / 1000.0), filterStatus);
        } catch (RuntimeException e) {
            logger.error("Filter {} failed", filter.getFilterName(), e);
        }

        if ((filterStatus != IDropOffFilter.STATUS_SUCCESS) && this.failurePolicyTerminate) {
            logger.error("DropOff Filter chain terminated at {} due to error return status", filter.getFilterName());
            break;
        }
    }
}
/**
 * Provide access to the filters.
 *
 * @return a new list holding the current filters in order; changing it does not affect this place
 */
public List<IDropOffFilter> getFilters() {
    final List<IDropOffFilter> snapshot = new ArrayList<>();
    snapshot.addAll(this.outputFilters);
    return snapshot;
}
/**
 * Provide access to filter names as an array.
 *
 * @return an array of filter names or an empty array if none
 * @deprecated use {@link #getFilterNamesList()}
 */
@Deprecated
@SuppressWarnings("AvoidObjectArrays")
public String[] getFilterNames() {
    final List<String> names = getFilterNamesList();
    return names.toArray(new String[0]);
}
/**
 * Provide access to filter names.
 *
 * @return a new list of the filter names, in filter order, or an empty list if there are no filters
 */
public List<String> getFilterNamesList() {
    return this.outputFilters.stream()
            .map(IDropOffFilter::getFilterName)
            .collect(java.util.stream.Collectors.toCollection(ArrayList::new));
}
/**
 * Provide access to a filter by name.
 *
 * @param name the filter name to look for
 * @return the first filter whose name equals {@code name}, or null if none by that name
 */
@Nullable
public IDropOffFilter getFilter(final String name) {
    return this.outputFilters.stream()
            .filter(candidate -> candidate.getFilterName().equals(name))
            .findFirst()
            .orElse(null);
}
/**
 * Provide access to the drop off helper created when this place was configured.
 *
 * @return the helper instance held by this place
 */
public DropOffUtil getDropOffUtil() {
    return dropOffUtil;
}
/**
 * Add a filter to the end of the filter chain.
 *
 * @param filter the new filter to add, must already be configured and initialized
 */
public void addFilter(final IDropOffFilter filter) {
    outputFilters.add(filter);
}
}