// FeedCommand.java

package emissary.command;

import emissary.command.converter.PriorityDirectoryConverter;
import emissary.command.converter.WorkspaceSortModeConverter;
import emissary.pickup.PriorityDirectory;
import emissary.pickup.WorkBundle;
import emissary.pickup.WorkSpace;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParameterException;
import picocli.CommandLine.Spec;

import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/**
 * Command line entry point for the "feed" service.
 * <p>
 * The options declared here are read by the {@link WorkSpace} implementation named by {@code --workspace}, which is
 * instantiated reflectively in {@link #startService()} with this command as its single constructor argument.
 */
@Command(description = "Start the feeder process given a particular WorkSpace implementation to distribute work to peer nodes",
        subcommands = {HelpCommand.class})
public class FeedCommand extends ServiceCommand {
    private static final Logger LOG = LoggerFactory.getLogger(FeedCommand.class);

    /** Name under which this command is registered. */
    public static final String COMMAND_NAME = "feed";
    /** Port reported by {@link #getDefaultPort()}. */
    public static final int DEFAULT_PORT = 7001;

    /** Injected by picocli; used to build a {@link ParameterException} tied to this command line. */
    @Spec
    private CommandSpec spec;

    @Option(names = {"-w", "--workspace"}, description = "fully qualified class to use as the WorkSpace implementation\nDefault: ${DEFAULT-VALUE}")
    private String workspaceClass = "emissary.pickup.WorkSpace";

    @Option(names = {"--bundleSize"}, description = "number of files to pack in each work bundle given to the peers\nDefault: ${DEFAULT-VALUE}")
    private int bundleSize = 1;

    @Option(names = {"-ci", "--caseId"}, description = "case id to assign\nDefault: ${DEFAULT-VALUE}")
    private String caseId = "auto";

    @Option(names = {"-cc", "--caseClass"}, description = "case class to assign\nDefault: <empty string>")
    private String caseClass = "";

    @Option(names = {"-ep", "--eatPrefix"}, description = "prefix to eat on input files when creating work bundles\nDefault: <empty string>")
    private String eatPrefix = "";

    @Option(names = {"-cs", "--case"}, description = "Pattern to use to find the clients in the namespace\nDefault: ${DEFAULT-VALUE}")
    private String clientPattern = "INITIAL.FILE_PICK_UP_CLIENT.INPUT.*";

    // Left null when not supplied; getOutputRoot() fills in the default on first use.
    @Option(names = {"-o", "--feedOutputRoot"},
            description = "the root path to use when writing successfully parsed input, defaults to projectBase/DoneParsedData")
    private String feedOutputRoot;

    // No default: startService() rejects a missing or empty list.
    @Option(names = {"-i", "--inputRoot"},
            split = ",",
            description = "the root path or comma-separated paths to use when reading input, can use PriorityDirectory format",
            converter = PriorityDirectoryConverter.class)
    private List<PriorityDirectory> priorityDirectories;

    @Option(names = {"--sort"}, description = "order which to sort files as they are put into work bundles, defaults to Priority sort (10)",
            converter = WorkspaceSortModeConverter.class)
    private Comparator<WorkBundle> sort;

    @Option(names = {"-ns", "--namespaceName"}, description = "name to assign to the work space\nDefault: ${DEFAULT-VALUE}")
    private String workspaceName = "WorkSpace";

    @Option(names = {"-sd", "--skipDot"}, description = "skips dot files when creating work bundles\nDefault: ${DEFAULT-VALUE}")
    private boolean skipDotFile = true;

    @Option(names = {"-dirs", "--includeDirs"},
            description = "Set directory processing flag. When true directory entries are retrieved from the input area just like normal\nDefault: ${DEFAULT-VALUE}")
    private boolean includeDirs = false;

    @Option(names = {"-l", "--loop"}, description = "Controls loop functionality of workspace\nDefault: ${DEFAULT-VALUE}")
    private boolean loop = true;

    @Option(names = {"-r", "--retry"}, description = "controls if we retry or not\nDefault: ${DEFAULT-VALUE}")
    private boolean retry = true;

    @Option(names = {"--simple"}, description = "turn on simple mode\nDefault: ${DEFAULT-VALUE}")
    private boolean simple = false;

    @Option(names = {"-ft", "--fileTimestamp"},
            description = "set the use of file timestamps to control whether a file is new enough to be added to the queue\nDefault: ${DEFAULT-VALUE}")
    private boolean fileTimestamp = false;

    /**
     * @return {@link #COMMAND_NAME}
     */
    @Override
    public String getCommandName() {
        return COMMAND_NAME;
    }

    /**
     * @return {@link #DEFAULT_PORT}
     */
    @Override
    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    /**
     * Instantiates the configured {@link WorkSpace} class, passing this command to its constructor, and runs it.
     *
     * @throws ParameterException if no input root / priority directory was supplied with {@code -i}
     */
    @Override
    public void startService() {
        if (CollectionUtils.isEmpty(priorityDirectories)) {
            LOG.error("No input root or priority directories specified");
            throw new ParameterException(spec.commandLine(), "Missing required parameter '-i' for input root or priority directories");
        }

        LOG.info("Starting feeder using {} as the workspace class", workspaceClass);
        try {
            WorkSpace ws = Class.forName(workspaceClass).asSubclass(WorkSpace.class).getDeclaredConstructor(FeedCommand.class).newInstance(this);
            ws.run();
        } catch (Exception e) {
            // Covers class lookup, reflection and run() failures alike; they are logged here and not rethrown.
            LOG.error("Error running WorkSpace class: {} ", workspaceClass, e);
        }
    }

    /**
     * Runs the HTTP setup, then the feed-specific flavor setup, then re-initializes logback, in that order.
     */
    @Override
    public void setupCommand() {
        setupHttp();
        setupFeed();
        reinitLogback();
    }

    /**
     * Overrides the flavor so that it always starts with {@code CLUSTER}, followed by any flavors already configured.
     * <p>
     * Each comma-separated entry is upper-cased and duplicates are dropped while keeping first-seen order.
     */
    public void setupFeed() {
        String flavorMode = "CLUSTER";
        if (getFlavor() != null) {
            flavorMode = flavorMode + "," + getFlavor();
        }

        // Must maintain insertion order
        Set<String> flavorSet = new LinkedHashSet<>();
        for (String f : flavorMode.split(",")) {
            // Flavors are identifiers, so casing must not vary with the host's default locale
            // (e.g. a Turkish locale would turn "i" into a dotted capital I and break matching).
            flavorSet.add(f.toUpperCase(Locale.ROOT));
        }

        overrideFlavor(String.join(",", flavorSet));
    }

    /**
     * @return fully qualified name of the {@link WorkSpace} class to instantiate
     */
    public String getWorkspaceClass() {
        return workspaceClass;
    }

    /**
     * @return value of the {@code --simple} option
     */
    public boolean isSimple() {
        return simple;
    }

    /**
     * @return value of the {@code --caseId} option
     */
    public String getCaseId() {
        return caseId;
    }

    /**
     * @return value of the {@code --caseClass} option
     */
    public String getCaseClass() {
        return caseClass;
    }

    /**
     * @return value of the {@code --eatPrefix} option
     */
    public String getEatPrefix() {
        return eatPrefix;
    }

    /**
     * @return value of the {@code --skipDot} option
     */
    public boolean isSkipDotFile() {
        return skipDotFile;
    }

    /**
     * @return value of the {@code --includeDirs} option
     */
    public boolean isIncludeDirs() {
        return includeDirs;
    }

    /**
     * @return value of the {@code --case} option, the pattern used to find clients
     */
    public String getClientPattern() {
        return clientPattern;
    }

    /**
     * @return value of the {@code --bundleSize} option
     */
    public int getBundleSize() {
        return bundleSize;
    }

    /**
     * Returns the feed output root, defaulting it on first call when {@code --feedOutputRoot} was not given.
     *
     * @return the configured output root, or {@code DoneParsedData} resolved against the project base
     */
    public String getOutputRoot() {
        if (this.feedOutputRoot == null) {
            // Computed lazily and cached in the field so later calls return the same value.
            this.feedOutputRoot = this.getProjectBase().resolve("DoneParsedData").toString();
        }
        return this.feedOutputRoot;
    }

    /**
     * @return the directories parsed from {@code --inputRoot}; {@code null} if the option was never supplied
     */
    public List<PriorityDirectory> getPriorityDirectories() {
        return priorityDirectories;
    }

    /**
     * @return comparator parsed from {@code --sort}; {@code null} if the option was never supplied
     */
    public Comparator<WorkBundle> getSort() {
        return sort;
    }

    /**
     * @return value of the {@code --namespaceName} option
     */
    public String getWorkspaceName() {
        return workspaceName;
    }

    /**
     * @return value of the {@code --loop} option
     */
    public boolean isLoop() {
        return loop;
    }

    /**
     * @return value of the {@code --retry} option
     */
    public boolean isRetry() {
        return retry;
    }

    /**
     * @return value of the {@code --fileTimestamp} option
     */
    public boolean isFileTimestamp() {
        return fileTimestamp;
    }
}