package emissary.output.filter;
import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.IBaseDataObject;
import emissary.output.io.DateStampFilenameGenerator;
import emissary.output.roller.IJournaler;
import emissary.output.roller.JournaledCoalescer;
import emissary.output.roller.journal.KeyedOutput;
import emissary.pool.AgentPool;
import emissary.roll.RollManager;
import emissary.roll.Roller;
import emissary.util.io.FileNameGenerator;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static emissary.roll.Roller.CFG_ROLL_INTERVAL;
public abstract class AbstractRollableFilter extends AbstractFilter {
protected static final String configDir = System.getProperty(ConfigUtil.CONFIG_DIR_PROPERTY);
public static final String OUTPUT_PATH = "OUTPUT_PATH";
public static final String MAX_ROLL_FILE_SIZE = "MAX_FILE_SIZE";
public static final String ROLL_INTERVAL_UNIT = "ROLL_INTERVAL_UNIT";
protected String defaultOutputPath = "./out";
protected Path outputPath;
protected int maxRollFileSize = 250 * 1024 * 1024;
protected int maxOutputAppenders;
protected long rollInterval = 10L;
protected TimeUnit rollIntervalUnits = TimeUnit.MINUTES;
protected Roller roller;
protected IJournaler rollable;
protected FileNameGenerator fileNameGenerator;
protected boolean appendNewLine = true;
* Method to convert payload(s) to an output type
* @param list the payload list
* @param params the list of parameters
* @return the byte representation of the payload(s)
* @throws IOException if there is an issue outputting the data
public abstract byte[] convert(final List<IBaseDataObject> list, final Map<String, Object> params) throws IOException;
* Initialization phase hook for the filter with provided filter configuration
* @param theConfigG passed in configuration object, usually DropOff's config
* @param filterName the configured name of this filter or null for the default
* @param theFilterConfig the configuration for the specific filter
public void initialize(final Configurator theConfigG, final String filterName, final Configurator theFilterConfig) {
super.initialize(theConfigG, filterName, theFilterConfig);
* Initialize the output config vars
protected void initOutputConfig() {
this.defaultOutputPath = this.filterConfig.findStringEntry(OUTPUT_PATH, defaultOutputPath);
this.outputPath = Paths.get(this.defaultOutputPath);
* Initialize a file name generator
protected void initFilenameGenerator() {
this.fileNameGenerator =
new DateStampFilenameGenerator(StringUtils.isNotBlank(filterName) ? "." + filterName.toLowerCase(Locale.getDefault()) : "");
* Initialize the roll specific vars
protected void initRollConfig() {
this.maxRollFileSize = (int) this.filterConfig.findSizeEntry(MAX_ROLL_FILE_SIZE, maxRollFileSize);
this.maxOutputAppenders = this.filterConfig.findIntEntry(MAX_OUTPUT_APPENDERS, AgentPool.computePoolSize());
this.rollInterval = this.filterConfig.findLongEntry(CFG_ROLL_INTERVAL, rollInterval);
this.rollIntervalUnits = TimeUnit.valueOf(this.filterConfig.findStringEntry(ROLL_INTERVAL_UNIT, rollIntervalUnits.toString()));
* Create the local output directories
protected void setupLocalOutputDir() {
if (!Files.exists(this.outputPath)) {
logger.info("Attempting to create {} output directory, {}", getFilterName(), this.outputPath);
try {
} catch (IOException e) {
logger.error("Unable to create directory for {} output, exiting immediately.", getFilterName(), e);
* Create the {@link JournaledCoalescer} and {@link Roller}
protected void setupRoller() {
try {
this.rollable = createRollable();
this.roller = createRoller();
logger.info("Added Roller for {} running every {} {}(s) or on size {} (bytes).", getFilterName(), this.rollInterval,
this.rollIntervalUnits, this.maxRollFileSize);
} catch (Exception ex) {
if (ex instanceof InterruptedException) {
logger.error("Unable to instantiate Roller for handling {} file output", getFilterName(), ex);
* Create the rollable resource
* @return the specific journaled coalescer for the filter
* @throws IOException if there is an issue with the output path
* @throws InterruptedException if the journal is interrupted
protected IJournaler createRollable() throws IOException, InterruptedException {
return new JournaledCoalescer(this.outputPath, this.fileNameGenerator, this.maxOutputAppenders);
* Create the object to manage the state of the roll
* @return the roller object
protected Roller createRoller() {
return new Roller(this.maxRollFileSize, this.rollIntervalUnits, this.rollInterval, this.rollable);
* Add the roller to the roll manager
protected void manageRoller() {
public int filter(final IBaseDataObject payload, final Map<String, Object> params) {
return filter(Collections.singletonList(payload), params);
public int filter(final IBaseDataObject payload, final Map<String, Object> params, final OutputStream output) {
return filter(Collections.singletonList(payload), params, output);
public int filter(final List<IBaseDataObject> payloadList, final Map<String, Object> params) {
int code;
try (KeyedOutput ko = this.rollable.getOutput()) {
params.put("CONTENT_URI_" + getFilterName(), "file://" + ko.getFinalDestination().toString());
params.put("CONTENT_FORMAT_" + getFilterName(), getFilterName());
code = filter(payloadList, params, ko);
if (code == STATUS_SUCCESS) {
} catch (IOException e) {
logger.error("IOException during dropoff.", e);
return code;
public int filter(final List<IBaseDataObject> list, final Map<String, Object> params, final OutputStream output) {
// We subtract 1 from the list because the first element is currently assumed to be the TLD
list.get(0).putParameter("DESCENDANT_COUNT", list.size() - 1);
try {
output.write(convert(list, params));
if (appendNewLine) {
} catch (IOException iox) {
logger.warn("Could not write to log filter", iox);