WorkBundle.java
- package emissary.pickup;
- import emissary.util.xml.SaferJDOMUtil;
- import org.jdom2.Document;
- import org.jdom2.Element;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.List;
- import java.util.UUID;
- import javax.annotation.Nullable;
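- /**
- * Used to communicate between the TreePickUpPlace and TreeSpace about a set of files to process.
- * <p>
- * Two times are tracked for the files in each work bundle - the "youngest" modification time and the "oldest"
- * modification time, both entered as milliseconds since the epoch. The concept of "youngest" and "oldest" is
- * relative to the construction time (an older file has a smaller timestamp), so that, for a bundle whose times were
- * recorded by the add methods and that holds at least one timed file:
- * <p>
- * {@code getOldestFileModificationTime() <= getYoungestFileModificationTime()}
- * <p>
- * A newly constructed bundle starts with {@code Long.MAX_VALUE} as its oldest time and {@code Long.MIN_VALUE} as its
- * youngest time, so the relation above does not hold until a timed file has been added.
- */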
- public final class WorkBundle implements Comparable<WorkBundle> {
- private static final Logger logger = LoggerFactory.getLogger(WorkBundle.class);
- static final int MAX_UNITS = 1024; // Upper bound on work units per bundle; checked when adding and when (de)serializing
- // Unique ID for this work bundle; assigned from generateId() by every constructor
- String bundleId;
- // Configured output root for finding data on remote side
- String outputRoot;
- // Configuration passed to remote side for producing output name
- String eatPrefix;
- // Database case id for the work in this bundle; may be null
- @Nullable
- String caseId = null;
- // Priority of this work bundle; compareTo sorts lower numbers first
- int priority = Priority.DEFAULT;
- // Flag to note if the bundle is in simple mode
- boolean simpleMode = false;
- List<WorkUnit> workUnitList = new ArrayList<>(); // The work units carried by this bundle
- // Where being processed; null until set
- String sentTo;
- // Cumulative errors in processing tries
- int errorCount = 0;
- /**
- * The oldest file in the bundle in millis since epoch. Starts at Long.MAX_VALUE so that the first timed file added
- * always replaces it.
- */
- long oldestFileModificationTime = Long.MAX_VALUE;
- /**
- * The youngest file in the bundle in millis since epoch. Starts at Long.MIN_VALUE so that the first timed file added
- * always replaces it.
- */
- long youngestFileModificationTime = Long.MIN_VALUE;
- // Aggregate file size, accumulated by the add methods that take a file size
- long totalFileSize = 0L;
- /**
-  * Creates an empty bundle that is identified by a freshly generated id.
-  */
- public WorkBundle() {
-     this.bundleId = generateId();
- }
- /**
-  * Creates an empty bundle with a freshly generated id and the supplied output configuration.
-  *
-  * @param outputRoot root directory for files
-  * @param eatPrefix used when constructing output name
-  */
- public WorkBundle(String outputRoot, String eatPrefix) {
-     this.outputRoot = outputRoot;
-     this.eatPrefix = eatPrefix;
-     this.bundleId = generateId();
- }
- /**
-  * Build one as a copy of another, generating a new unique id for the copy. Every other piece of state is carried over,
-  * including the transient {@code sentTo} and {@code errorCount} fields. The copy gets its own work unit list, but the
-  * WorkUnit objects held in that list are shared with the original bundle.
-  *
-  * @param that the work bundle to copy
-  * @throws IllegalStateException if {@code that} holds more than {@code MAX_UNITS} work units
-  */
- public WorkBundle(WorkBundle that) {
-     // The copy always gets its own identity, so the id of the source bundle is never carried over
-     this.bundleId = generateId();
-     this.outputRoot = that.getOutputRoot();
-     this.eatPrefix = that.getEatPrefix();
-     this.caseId = that.getCaseId();
-     this.sentTo = that.sentTo;
-     this.errorCount = that.errorCount;
-     this.priority = that.getPriority();
-     this.simpleMode = that.getSimpleMode();
-     this.oldestFileModificationTime = that.oldestFileModificationTime;
-     this.youngestFileModificationTime = that.youngestFileModificationTime;
-     this.totalFileSize = that.totalFileSize;
-     // getWorkUnitList() returns a fresh copy on every call, so fetch it once; adding an empty list is a no-op
-     this.addWorkUnits(that.getWorkUnitList());
- }
- /**
-  * Deserialize a WorkBundle from a DataInputStream. Fields are read in the order in which
-  * {@link #writeToStream(DataOutputStream)} writes them.
-  *
-  * @param in the stream to read from
-  * @return the deserialized WorkBundle
-  * @throws IOException if there is a problem reading the stream, or if the recorded number of work units is negative or
-  *         greater than <code>MAX_UNITS</code>.
-  */
- public static WorkBundle readFromStream(DataInputStream in) throws IOException {
-     WorkBundle wb = new WorkBundle();
-     wb.bundleId = readUtfOrNull(in);
-     wb.outputRoot = readUtfOrNull(in);
-     wb.eatPrefix = readUtfOrNull(in);
-     wb.caseId = readUtfOrNull(in);
-     wb.sentTo = readUtfOrNull(in);
-     wb.errorCount = in.readInt();
-     wb.priority = in.readInt();
-     wb.simpleMode = in.readBoolean();
-     wb.oldestFileModificationTime = in.readLong();
-     wb.youngestFileModificationTime = in.readLong();
-     wb.totalFileSize = in.readLong();
-     int workUnitSize = in.readInt();
-     // writeToStream never emits a negative count, so a negative value means the stream is corrupt; fail loudly
-     // instead of quietly returning a bundle with no work units
-     if (workUnitSize < 0) {
-         throw new IOException(
-                 "Exception when reading: WorkBundle may not contain a negative number of WorkUnits (saw: " + workUnitSize + ").");
-     }
-     if (workUnitSize > MAX_UNITS) {
-         throw new IOException(
-                 "Exception when reading: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitSize + ").");
-     }
-     // Modification times and total size were read above, so the units are added without touching those values
-     for (int i = 0; i < workUnitSize; i++) {
-         wb.addWorkUnit(WorkUnit.readFromStream(in));
-     }
-     return wb;
- }
- /**
- * Serialize this WorkBundle to a DataOutputStream
- *
- * @param out the stream to write to.
- * @throws IOException if there is a problem writing to the stream.
- */
- public void writeToStream(DataOutputStream out) throws IOException {
- writeUtfOrNull(bundleId, out);
- writeUtfOrNull(outputRoot, out);
- writeUtfOrNull(eatPrefix, out);
- writeUtfOrNull(caseId, out);
- writeUtfOrNull(sentTo, out);
- out.writeInt(errorCount);
- out.writeInt(priority);
- out.writeBoolean(simpleMode);
- out.writeLong(oldestFileModificationTime);
- out.writeLong(youngestFileModificationTime);
- out.writeLong(totalFileSize);
- out.writeInt(workUnitList.size());
- if (workUnitList.size() > MAX_UNITS) {
- throw new IOException(
- "Exception when writing: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitList.size() + ").");
- }
- for (WorkUnit u : workUnitList) {
- u.writeToStream(out);
- }
- }
- /**
-  * Reads a string written by {@link #writeUtfOrNull(String, DataOutputStream)}: a boolean presence flag, followed by the
-  * UTF value only when the flag is true.
-  *
-  * @param in the stream to read from
-  * @return the string that was read, or null when the presence flag is false
-  * @throws IOException if there is a problem reading the stream
-  */
- @Nullable
- static String readUtfOrNull(DataInputStream in) throws IOException {
-     boolean present = in.readBoolean();
-     return present ? in.readUTF() : null;
- }
- /**
-  * Writes a possibly-null string as a boolean presence flag, followed by the UTF value only when the string is not null.
-  *
-  * @param s the string to write, may be null
-  * @param out the stream to write to
-  * @throws IOException if there is a problem writing to the stream
-  */
- static void writeUtfOrNull(@Nullable String s, DataOutputStream out) throws IOException {
-     if (s == null) {
-         out.writeBoolean(false);
-         return;
-     }
-     out.writeBoolean(true);
-     out.writeUTF(s);
- }
- /**
-  * Replaces the id of this work bundle with the supplied value.
-  *
-  * @param val the new value to set as bundle id
-  */
- public void setBundleId(String val) {
-     bundleId = val;
- }
- /**
- * Reset the unique id
- *
- * @return a copy of the new id
- */
- public String resetBundleId() {
- bundleId = generateId();
- return bundleId;
- }
- /**
-  * Gets the id of this work bundle.
-  *
-  * @return the current bundle id
-  */
- public String getBundleId() {
-     return this.bundleId;
- }
- /**
-  * Produces a new id from a random UUID.
-  *
-  * @return the string form of a newly created random UUID
-  */
- static String generateId() {
-     UUID uuid = UUID.randomUUID();
-     return uuid.toString();
- }
- /**
-  * Gets the configured output root.
-  *
-  * @return the value of outputRoot; may be null, since the setter accepts null
-  */
- public String getOutputRoot() {
-     return outputRoot;
- }
- /**
-  * Replaces the configured output root.
-  *
-  * @param argOutputRoot the new output root, may be null
-  */
- public void setOutputRoot(@Nullable String argOutputRoot) {
-     outputRoot = argOutputRoot;
- }
- /**
-  * Gets the eat prefix.
-  *
-  * @return the value of eatPrefix; may be null, since the setter accepts null
-  */
- public String getEatPrefix() {
-     return eatPrefix;
- }
- /**
-  * Replaces the eat prefix.
-  *
-  * @param argEatPrefix the new eat prefix, may be null
-  */
- public void setEatPrefix(@Nullable String argEatPrefix) {
-     eatPrefix = argEatPrefix;
- }
- /**
-  * Gets a copy of the list of WorkUnits in this bundle. Changes made to the returned list do not affect the bundle.
-  *
-  * @return a new list holding the bundle's WorkUnits
-  */
- public List<WorkUnit> getWorkUnitList() {
-     List<WorkUnit> snapshot = new ArrayList<>(workUnitList.size());
-     snapshot.addAll(workUnitList);
-     return snapshot;
- }
- /**
-  * Gets an iterator over the work units. The iterator walks the bundle's own list rather than a copy.
-  *
-  * @return iterator of WorkUnit
-  */
- public Iterator<WorkUnit> getWorkUnitIterator() {
-     Iterator<WorkUnit> units = workUnitList.iterator();
-     return units;
- }
- /**
- * Add a workUnit to the list.
- *
- * @param workUnit the workUnit to add
- * @return number of WorkUnits in list after add
- * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
- * work units
- */
- public int addWorkUnit(WorkUnit workUnit) {
- if (workUnitList.size() >= MAX_UNITS) {
- throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
- }
- workUnitList.add(workUnit);
- return size();
- }
- /**
-  * Appends a work unit and folds its modification time and size into the bundle's tracked values: the oldest time only
-  * ever moves down, the youngest time only ever moves up, and the size is added to the running total.
-  *
-  * @param workUnit the workUnit to add
-  * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
-  * @param fileSize the size of the file added.
-  * @throws IllegalStateException if the bundle already holds <code>MAX_UNITS</code> work units
-  * @return number of files in this set after update
-  */
- public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, long fileSize) {
-     // Throws before any tracking value changes when the bundle is already full
-     addWorkUnit(workUnit);
-     oldestFileModificationTime = Math.min(oldestFileModificationTime, fileModificationTimeInMillis);
-     youngestFileModificationTime = Math.max(youngestFileModificationTime, fileModificationTimeInMillis);
-     totalFileSize += fileSize;
-     return size();
- }
- /**
-  * Appends every work unit from a list, without adjusting file modification time tracking. The capacity check is made
-  * before anything is added, so a rejected list leaves the bundle unchanged.
-  *
-  * @param list a list of WorkUnits to add to this bundle
-  * @return the total size of WorkUnits in this bundle
-  * @throws IllegalStateException if adding the units would cause the bundle to contain more than <code>MAX_UNITS</code>
-  *         work units
-  */
- int addWorkUnits(List<WorkUnit> list) {
-     // Appears to be used only by unit tests and the copy constructor
-     int combined = workUnitList.size() + list.size();
-     if (combined > MAX_UNITS) {
-         throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
-     }
-     workUnitList.addAll(list);
-     return size();
- }
- /**
-  * Collects the file name of every work unit, in list order, into a new list.
-  *
-  * @return the string values of filenames
-  */
- public List<String> getFileNameList() {
-     List<String> names = new ArrayList<>(workUnitList.size());
-     Iterator<WorkUnit> units = workUnitList.iterator();
-     while (units.hasNext()) {
-         names.add(units.next().getFileName());
-     }
-     return names;
- }
- /**
-  * Gets an iterator over the file names. The iterator runs over a newly built list of names, not over the bundle's own
-  * work unit list.
-  *
-  * @return iterator of String filename values
-  */
- public Iterator<String> getFileNameIterator() {
-     List<String> names = getFileNameList();
-     return names.iterator();
- }
- /**
- * Add a file to the list, without adjusting file modification time tracking.
- *
- * @param file string file name consistent with outputRoot
- * @return number of files in this set after update
- * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
- * work units
- */
- public int addFileName(String file) {
- return addWorkUnit(new WorkUnit(file));
- }
- /**
- * Add a file to the list
- *
- * @param file string file name consistent with outputRoot
- * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
- * @param fileSize the size of the file being added
- * @return number of files in this set after update
- * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
- * work units
- */
- public int addFileName(String file, long fileModificationTimeInMillis, long fileSize) {
- return addWorkUnit(new WorkUnit(file), fileModificationTimeInMillis, fileSize);
- }
- /**
-  * Add files to the list, without adjusting file modification time tracking. The capacity check is made before any
-  * file is added, so a batch that does not fit is rejected as a whole, in the same way as
-  * {@code addWorkUnits(List)}.
-  *
-  * @param file string file names consistent with outputRoot
-  * @return number of files in this set after update
-  * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
-  *         work units
-  */
- int addFileNames(String[] file) { // This appears to only be used by unit tests
-     // Compare against the remaining room (rather than summing) so the check itself cannot overflow
-     if (file.length > MAX_UNITS - workUnitList.size()) {
-         throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
-     }
-     for (String f : file) {
-         addWorkUnit(new WorkUnit(f));
-     }
-     return size();
- }
- /**
-  * Add from a list, without adjusting file modification time tracking. The capacity check is made before any file is
-  * added, so a list that does not fit is rejected as a whole, in the same way as {@code addWorkUnits(List)}.
-  *
-  * @param list the list of files to add
-  * @return number of files in this set after update
-  * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
-  *         work units
-  */
- int addFileNames(List<String> list) { // This appears to only be used by unit tests
-     // Compare against the remaining room (rather than summing) so the check itself cannot overflow
-     if (list.size() > MAX_UNITS - workUnitList.size()) {
-         throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
-     }
-     for (String file : list) {
-         addWorkUnit(new WorkUnit(file));
-     }
-     return size();
- }
- /**
- * Get the number of files contained
- */
- public int size() {
- return workUnitList.size();
- }
- /**
-  * Removes every work unit and puts the modification time and total size tracking back to their initial values.
-  */
- void clearFiles() {
-     // This is only used for testing
-     totalFileSize = 0L;
-     youngestFileModificationTime = Long.MIN_VALUE;
-     oldestFileModificationTime = Long.MAX_VALUE;
-     workUnitList.clear();
- }
- /**
-  * Gets the case id of the work in this bundle.
-  *
-  * @return the value of caseId, which may be null
-  */
- public String getCaseId() {
-     return caseId;
- }
- /**
-  * Replaces the case id of the work in this bundle.
-  *
-  * @param argCaseId the new case id, may be null
-  */
- public void setCaseId(@Nullable String argCaseId) {
-     caseId = argCaseId;
- }
- /**
-  * Records the transient sentTo value that marks where in-progress work was sent.
-  *
-  * @param place the place the bundle was sent to, may be null
-  */
- public void setSentTo(@Nullable String place) {
-     sentTo = place;
- }
- /**
-  * Gets the transient sentTo value.
-  *
-  * @return the value of sentTo; may be null, since the setter accepts null
-  */
- public String getSentTo() {
-     return this.sentTo;
- }
- /**
-  * Gets the transient error count.
-  *
-  * @return the current error count
-  */
- public int getErrorCount() {
-     return this.errorCount;
- }
- /**
-  * Adds one to the error count.
-  *
-  * @return the error count after the increment
-  */
- public int incrementErrorCount() {
-     errorCount = errorCount + 1;
-     return errorCount;
- }
- /**
-  * Replaces the error count.
-  *
-  * @param val the new error count
-  */
- public void setErrorCount(int val) {
-     this.errorCount = val;
- }
- /**
-  * Replaces the priority of this bundle.
-  *
-  * @param val the new priority
-  */
- public void setPriority(int val) {
-     this.priority = val;
- }
- /**
-  * Gets the priority of this bundle.
-  *
-  * @return the current priority
-  */
- public int getPriority() {
-     return this.priority;
- }
- /**
-  * Replaces the simple mode flag.
-  *
-  * @param val the new value for the flag
-  */
- public void setSimpleMode(boolean val) {
-     this.simpleMode = val;
- }
- /**
-  * Gets the simple mode flag.
-  *
-  * @return true when the bundle is in simple mode
-  */
- public boolean getSimpleMode() {
-     return this.simpleMode;
- }
- /**
-  * Gets the oldest tracked file modification time.
-  *
-  * @return the oldest modification time in millis since epoch
-  */
- public long getOldestFileModificationTime() {
-     return this.oldestFileModificationTime;
- }
- public void setOldestFileModificationTime(long oldestFileModificationTime) {
- this.oldestFileModificationTime = oldestFileModificationTime;
- }
- /**
-  * Gets the youngest tracked file modification time.
-  *
-  * @return the youngest modification time in millis since epoch
-  */
- public long getYoungestFileModificationTime() {
-     return this.youngestFileModificationTime;
- }
- public void setYoungestFileModificationTime(long youngestFileModificationTime) {
- this.youngestFileModificationTime = youngestFileModificationTime;
- }
- /**
-  * Gets the aggregate file size tracked for this bundle.
-  *
-  * @return the total file size
-  */
- public long getTotalFileSize() {
-     return this.totalFileSize;
- }
- public void setTotalFileSize(long totalFileSize) {
- this.totalFileSize = totalFileSize;
- }
- /**
- * Compare in priority order, lower numbers mean high priority data Note: this comparator imposes ordering that is
- * inconsistent with equals
- */
- @Override
- public int compareTo(WorkBundle that) {
- if (this.getPriority() < that.getPriority()) {
- return -1;
- } else if (that.getPriority() < this.getPriority()) {
- return 1;
- } else {
- return 0;
- }
- }
- /**
- * Provide string version
- */
- @Override
- public String toString() {
- return "WorkBundle[id=" + getBundleId() + ", pri=" + getPriority() + ", files=" + getFileNameList().toString() + ", eatPrefix="
- + getEatPrefix()
- + ", outputRoot=" + getOutputRoot() + ", sentTo=" + getSentTo() + ", errorCount=" + getErrorCount() + ", totalFileSize="
- + getTotalFileSize() + ", oldestModTime=" + getOldestFileModificationTime() + ", youngModTime=" + getYoungestFileModificationTime()
- + ", simple=" + getSimpleMode() + ", caseId=" + getCaseId() + ", size=" + size() + "]";
- }
- /**
-  * Renders this bundle as an xml document. The root {@code workBundle} element holds one child element per scalar
-  * field, followed by one {@code workUnit} element per work unit. A work unit's {@code transactionId} element is left
-  * out when the work unit has no transaction id.
-  *
-  * @return the xml string for this bundle
-  */
- public String toXml() {
-     Element bundleElement = new Element("workBundle");
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("bundleId", getBundleId()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("outputRoot", getOutputRoot()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("eatPrefix", getEatPrefix()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("caseId", getCaseId()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("sentTo", getSentTo()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("errorCount", getErrorCount()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("priority", getPriority()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("simpleMode", getSimpleMode()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("oldestFileModificationTime", getOldestFileModificationTime()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("youngestFileModificationTime", getYoungestFileModificationTime()));
-     bundleElement.addContent(SaferJDOMUtil.simpleElement("totalFileSize", getTotalFileSize()));
-     for (WorkUnit unit : workUnitList) {
-         Element unitElement = new Element("workUnit");
-         unitElement.addContent(SaferJDOMUtil.simpleElement("workFileName", unit.getFileName()));
-         // The transaction id is optional; no element is written when it is absent
-         if (unit.getTransactionId() != null) {
-             unitElement.addContent(SaferJDOMUtil.simpleElement("transactionId", unit.getTransactionId()));
-         }
-         unitElement.addContent(SaferJDOMUtil.simpleElement("failedToParse", unit.failedToParse()));
-         unitElement.addContent(SaferJDOMUtil.simpleElement("failedToProcess", unit.failedToProcess()));
-         bundleElement.addContent(unitElement);
-     }
-     return SaferJDOMUtil.toString(new Document(bundleElement));
- }
- /**
-  * Build a WorkBundle object from xml. Any exception raised while parsing the xml or while populating the bundle is
-  * logged and turned into a null return.
-  *
-  * @param xml the xml string representing a WorkBundle
-  * @return the constructed WorkBundle or null on error
-  */
- @Nullable
- public static WorkBundle buildWorkBundle(String xml) {
-     try {
-         Document jdoc = SaferJDOMUtil.createDocument(xml);
-         return buildWorkBundle(jdoc);
-     } catch (Exception ex) {
-         // Parameterized form: the xml fills the placeholder and the trailing Throwable is logged as the exception
-         logger.error("Cannot make WorkBundle from {}", xml, ex);
-         return null;
-     }
- }
- /**
- * Build a WorkBundle object from a jdom document
- *
- * @param jdom the jdom document representing a work bundle object
- * @return the constructed WorkBundle or null on error
- */
- @Nullable
- private static WorkBundle buildWorkBundle(Document jdom) {
- Element root = jdom.getRootElement();
- if (root == null) {
- logger.error("Document does not have a root element!");
- return null;
- }
- WorkBundle wb = new WorkBundle();
- wb.setBundleId(root.getChildTextTrim("bundleId"));
- String s = root.getChildTextTrim("outputRoot");
- if (s != null && s.length() > 0) {
- wb.setOutputRoot(s);
- } else {
- wb.setOutputRoot(null);
- }
- s = root.getChildTextTrim("eatPrefix");
- if (s != null && s.length() > 0) {
- wb.setEatPrefix(s);
- } else {
- wb.setEatPrefix(null);
- }
- s = root.getChildTextTrim("caseId");
- if (s != null && s.length() > 0) {
- wb.setCaseId(s);
- } else {
- wb.setCaseId(null);
- }
- s = root.getChildTextTrim("sentTo");
- if (s != null && s.length() > 0) {
- wb.setSentTo(s);
- } else {
- wb.setSentTo(null);
- }
- wb.setPriority(SaferJDOMUtil.getChildIntValue(root, "priority"));
- wb.setSimpleMode(SaferJDOMUtil.getChildBooleanValue(root, "simpleMode"));
- wb.setOldestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "oldestFileModificationTime"));
- wb.setYoungestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "youngestFileModificationTime"));
- wb.setTotalFileSize(SaferJDOMUtil.getChildLongValue(root, "totalFileSize"));
- String serr = root.getChildTextTrim("errorCount");
- if (serr != null && serr.length() > 0) {
- wb.setErrorCount(Integer.parseInt(serr));
- }
- for (Element wu : root.getChildren("workUnit")) {
- String filename = wu.getChildTextTrim("workFileName");
- String transactionId = wu.getChildTextTrim("transactionId");
- boolean failedToParse = Boolean.parseBoolean(wu.getChildTextTrim("failedToParse"));
- boolean failedToProcess = Boolean.parseBoolean(wu.getChildTextTrim("failedToProcess"));
- wb.addWorkUnit(new WorkUnit(filename, transactionId, failedToParse, failedToProcess));
- }
- return wb;
- }
- }