AbstractRollableFilter.java

package emissary.output.filter;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.IBaseDataObject;
import emissary.output.io.DateStampFilenameGenerator;
import emissary.output.roller.IJournaler;
import emissary.output.roller.JournaledCoalescer;
import emissary.output.roller.journal.KeyedOutput;
import emissary.pool.AgentPool;
import emissary.roll.RollManager;
import emissary.roll.Roller;
import emissary.util.io.FileNameGenerator;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static emissary.roll.Roller.CFG_ROLL_INTERVAL;

public abstract class AbstractRollableFilter extends AbstractFilter {

    protected static final String configDir = System.getProperty(ConfigUtil.CONFIG_DIR_PROPERTY);

    public static final String OUTPUT_PATH = "OUTPUT_PATH";
    public static final String MAX_ROLL_FILE_SIZE = "MAX_FILE_SIZE";
    public static final String MAX_OUTPUT_APPENDERS = "MAX_OUTPUT_APPENDERS";
    public static final String ROLL_INTERVAL_UNIT = "ROLL_INTERVAL_UNIT";

    protected String defaultOutputPath = "./out";
    protected Path outputPath;
    protected int maxRollFileSize = 250 * 1024 * 1024;
    protected int maxOutputAppenders;
    protected long rollInterval = 10L;
    protected TimeUnit rollIntervalUnits = TimeUnit.MINUTES;
    protected Roller roller;
    protected IJournaler rollable;
    protected FileNameGenerator fileNameGenerator;
    protected boolean appendNewLine = true;

    /**
     * Method to convert payload(s) to an output type
     *
     * @param list the payload list
     * @param params the list of parameters
     * @return the byte representation of the payload(s)
     * @throws IOException if there is an issue outputting the data
     */
    public abstract byte[] convert(final List<IBaseDataObject> list, final Map<String, Object> params) throws IOException;

    /**
     * Initialization phase hook for the filter with provided filter configuration
     *
     * @param theConfigG passed in configuration object, usually DropOff's config
     * @param filterName the configured name of this filter or null for the default
     * @param theFilterConfig the configuration for the specific filter
     */
    @Override
    public void initialize(final Configurator theConfigG, final String filterName, final Configurator theFilterConfig) {
        super.initialize(theConfigG, filterName, theFilterConfig);
        initOutputConfig();
        initRollConfig();
        initFilenameGenerator();
        setupLocalOutputDir();
        setupRoller();
    }

    /**
     * Initialize the output config vars
     */
    protected void initOutputConfig() {
        this.defaultOutputPath = this.filterConfig.findStringEntry(OUTPUT_PATH, defaultOutputPath);
        this.outputPath = Paths.get(this.defaultOutputPath);
    }

    /**
     * Initialize a file name generator
     */
    protected void initFilenameGenerator() {
        this.fileNameGenerator =
                new DateStampFilenameGenerator(StringUtils.isNotBlank(filterName) ? "." + filterName.toLowerCase(Locale.getDefault()) : "");
    }

    /**
     * Initialize the roll specific vars
     */
    protected void initRollConfig() {
        this.maxRollFileSize = (int) this.filterConfig.findSizeEntry(MAX_ROLL_FILE_SIZE, maxRollFileSize);
        this.maxOutputAppenders = this.filterConfig.findIntEntry(MAX_OUTPUT_APPENDERS, AgentPool.computePoolSize());
        this.rollInterval = this.filterConfig.findLongEntry(CFG_ROLL_INTERVAL, rollInterval);
        this.rollIntervalUnits = TimeUnit.valueOf(this.filterConfig.findStringEntry(ROLL_INTERVAL_UNIT, rollIntervalUnits.toString()));
    }

    /**
     * Create the local output directories
     */
    @SuppressWarnings("SystemExitOutsideMain")
    protected void setupLocalOutputDir() {
        if (!Files.exists(this.outputPath)) {
            logger.info("Attempting to create {} output directory, {}", getFilterName(), this.outputPath);
            try {
                Files.createDirectories(this.outputPath);
            } catch (IOException e) {
                logger.error("Unable to create directory for {} output, exiting immediately.", getFilterName(), e);
                System.exit(1);
            }
        }
    }

    /**
     * Create the {@link JournaledCoalescer} and {@link Roller}
     */
    @SuppressWarnings("SystemExitOutsideMain")
    protected void setupRoller() {
        try {
            this.rollable = createRollable();
            this.roller = createRoller();
            manageRoller();
            logger.info("Added Roller for {} running every {} {}(s) or on size {} (bytes).", getFilterName(), this.rollInterval,
                    this.rollIntervalUnits, this.maxRollFileSize);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("Unable to instantiate Roller for handling {} file output", getFilterName(), ex);
            System.exit(1);
        }
    }

    /**
     * Create the rollable resource
     *
     * @return the specific journaled coalescer for the filter
     * @throws IOException if there is an issue with the output path
     * @throws InterruptedException if the journal is interrupted
     */
    protected IJournaler createRollable() throws IOException, InterruptedException {
        return new JournaledCoalescer(this.outputPath, this.fileNameGenerator, this.maxOutputAppenders);
    }

    /**
     * Create the object to manage the state of the roll
     *
     * @return the roller object
     */
    protected Roller createRoller() {
        return new Roller(this.maxRollFileSize, this.rollIntervalUnits, this.rollInterval, this.rollable);
    }

    /**
     * Add the roller to the roll manager
     */
    protected void manageRoller() {
        RollManager.getManager().addRoller(this.roller);
    }

    @Override
    public int filter(final IBaseDataObject payload, final Map<String, Object> params) {
        return filter(Collections.singletonList(payload), params);
    }

    @Override
    public int filter(final IBaseDataObject payload, final Map<String, Object> params, final OutputStream output) {
        return filter(Collections.singletonList(payload), params, output);
    }

    @Override
    public int filter(final List<IBaseDataObject> payloadList, final Map<String, Object> params) {
        int code;
        try (KeyedOutput ko = this.rollable.getOutput()) {
            params.put("CONTENT_URI_" + getFilterName(), "file://" + ko.getFinalDestination().toString());
            params.put("CONTENT_FORMAT_" + getFilterName(), getFilterName());
            code = filter(payloadList, params, ko);
            if (code == STATUS_SUCCESS) {
                ko.commit();
            }
        } catch (IOException e) {
            logger.error("IOException during dropoff.", e);
            code = STATUS_FAILURE;
        }
        return code;
    }

    @Override
    public int filter(final List<IBaseDataObject> list, final Map<String, Object> params, final OutputStream output) {

        // We subtract 1 from the list because the first element is currently assumed to be the TLD
        list.get(0).putParameter("DESCENDANT_COUNT", list.size() - 1);

        try {
            output.write(convert(list, params));
            if (appendNewLine) {
                output.write("\n".getBytes());
            }
        } catch (IOException iox) {
            logger.warn("Could not write to log filter", iox);
            return STATUS_FAILURE;
        }
        return STATUS_SUCCESS;
    }


}