WorkBundle.java

  1. package emissary.pickup;

  2. import emissary.util.xml.SaferJDOMUtil;

  3. import org.jdom2.Document;
  4. import org.jdom2.Element;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;

  7. import java.io.DataInputStream;
  8. import java.io.DataOutputStream;
  9. import java.io.IOException;
  10. import java.util.ArrayList;
  11. import java.util.Iterator;
  12. import java.util.List;
  13. import java.util.UUID;
  14. import javax.annotation.Nullable;

  15. /**
  16.  * Used to communicate between the TreePickUpPlace and TreeSpace about a set of files to process.
  17.  * <p>
  18.  * Two times are tracked for the files in each work bundle - the "youngest" modification time and the "oldest"
  19.  * modification time, both initially entered as time from the epoch. However, the concept of "youngest" and "oldest" is
  20.  * relative to the construction time, so that:
  21.  * <p>
  22.  * getOldestFileModificationTime() &lt;= getYoungestFileModificationTime()
  23.  */
  24. public final class WorkBundle implements Comparable<WorkBundle> {

  25.     private static final Logger logger = LoggerFactory.getLogger(WorkBundle.class);

  26.     static final int MAX_UNITS = 1024;

  27.     // Unique ID for this work bundle
  28.     String bundleId;

  29.     // Configured output root for finding data on remote side
  30.     String outputRoot;

  31.     // Configuration passed to remote side for producing output name
  32.     String eatPrefix;

  33.     // Database case id for the work in this bundle
  34.     @Nullable
  35.     String caseId = null;

  36.     // Priority of this work bundle
  37.     int priority = Priority.DEFAULT;

  38.     // Flag to note if the bundle is in simple mode
  39.     boolean simpleMode = false;

  40.     List<WorkUnit> workUnitList = new ArrayList<>();

  41.     // Where being processed
  42.     String sentTo;

  43.     // Cumulative errors in processing tries
  44.     int errorCount = 0;

  45.     /**
  46.      * The oldest file in the bundle in millis since epoch
  47.      */
  48.     long oldestFileModificationTime = Long.MAX_VALUE;

  49.     /**
  50.      * The youngest file in the bundle in millis since epoch
  51.      */
  52.     long youngestFileModificationTime = Long.MIN_VALUE;

  53.     // Aggregate file size
  54.     long totalFileSize = 0L;

  55.     /**
  56.      * Public default constructor
  57.      */
  58.     public WorkBundle() {
  59.         bundleId = generateId();
  60.     }

  61.     /**
  62.      * Public constructor with args
  63.      *
  64.      * @param outputRoot root directory for files
  65.      * @param eatPrefix used when constructing output name
  66.      */
  67.     public WorkBundle(String outputRoot, String eatPrefix) {
  68.         bundleId = generateId();
  69.         this.outputRoot = outputRoot;
  70.         this.eatPrefix = eatPrefix;
  71.     }

  72.     /**
  73.      * Build one as a copy of another, generating a new unique id for the copy. Transient fields sentTo and errorCount are
  74.      * not copied by this constructor
  75.      *
  76.      * @param that the work bundle to copy
  77.      */
  78.     public WorkBundle(WorkBundle that) {
  79.         this.bundleId = that.bundleId;
  80.         this.outputRoot = that.getOutputRoot();
  81.         this.eatPrefix = that.getEatPrefix();
  82.         this.caseId = that.getCaseId();
  83.         this.sentTo = that.sentTo;
  84.         this.errorCount = that.errorCount;
  85.         this.priority = that.getPriority();
  86.         this.simpleMode = that.getSimpleMode();
  87.         this.oldestFileModificationTime = that.oldestFileModificationTime;
  88.         this.youngestFileModificationTime = that.youngestFileModificationTime;
  89.         this.totalFileSize = that.totalFileSize;
  90.         if (!that.getWorkUnitList().isEmpty()) {
  91.             this.addWorkUnits(that.getWorkUnitList());
  92.         }
  93.         resetBundleId();
  94.     }

  95.     /**
  96.      * Deserialize a WorkBundle from a DataInputStream
  97.      *
  98.      * @param in the stream to read from
  99.      * @return the deserialized WorkBundle
  100.      * @throws IOException if there is a problem reading the stream or it contains more than <code>MAX_UNITS</code> work
  101.      *         units.
  102.      */
  103.     public static WorkBundle readFromStream(DataInputStream in) throws IOException {
  104.         WorkBundle wb = new WorkBundle();
  105.         wb.bundleId = readUtfOrNull(in);
  106.         wb.outputRoot = readUtfOrNull(in);
  107.         wb.eatPrefix = readUtfOrNull(in);
  108.         wb.caseId = readUtfOrNull(in);
  109.         wb.sentTo = readUtfOrNull(in);
  110.         wb.errorCount = in.readInt();
  111.         wb.priority = in.readInt();
  112.         wb.simpleMode = in.readBoolean();
  113.         wb.oldestFileModificationTime = in.readLong();
  114.         wb.youngestFileModificationTime = in.readLong();
  115.         wb.totalFileSize = in.readLong();
  116.         int workUnitSize = in.readInt();
  117.         if (workUnitSize > MAX_UNITS) {
  118.             throw new IOException(
  119.                     "Exception when reading: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitSize + ").");
  120.         }
  121.         for (int i = 0; i < workUnitSize; i++) {
  122.             wb.addWorkUnit(WorkUnit.readFromStream(in));
  123.         }
  124.         return wb;
  125.     }

  126.     /**
  127.      * Serialize this WorkBundle to a DataOutputStream
  128.      *
  129.      * @param out the stream to write to.
  130.      * @throws IOException if there is a problem writing to the stream.
  131.      */
  132.     public void writeToStream(DataOutputStream out) throws IOException {
  133.         writeUtfOrNull(bundleId, out);
  134.         writeUtfOrNull(outputRoot, out);
  135.         writeUtfOrNull(eatPrefix, out);
  136.         writeUtfOrNull(caseId, out);
  137.         writeUtfOrNull(sentTo, out);
  138.         out.writeInt(errorCount);
  139.         out.writeInt(priority);
  140.         out.writeBoolean(simpleMode);
  141.         out.writeLong(oldestFileModificationTime);
  142.         out.writeLong(youngestFileModificationTime);
  143.         out.writeLong(totalFileSize);
  144.         out.writeInt(workUnitList.size());
  145.         if (workUnitList.size() > MAX_UNITS) {
  146.             throw new IOException(
  147.                     "Exception when writing: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitList.size() + ").");
  148.         }
  149.         for (WorkUnit u : workUnitList) {
  150.             u.writeToStream(out);
  151.         }
  152.     }

  153.     @Nullable
  154.     static String readUtfOrNull(DataInputStream in) throws IOException {
  155.         if (in.readBoolean()) {
  156.             return in.readUTF();
  157.         }
  158.         return null;
  159.     }

  160.     static void writeUtfOrNull(@Nullable String s, DataOutputStream out) throws IOException {
  161.         out.writeBoolean(s != null);
  162.         if (s != null) {
  163.             out.writeUTF(s);
  164.         }
  165.     }

  166.     /**
  167.      * Set the work bundle id
  168.      *
  169.      * @param val the new value to set as bundle id
  170.      */
  171.     public void setBundleId(String val) {
  172.         this.bundleId = val;
  173.     }

  174.     /**
  175.      * Reset the unique id
  176.      *
  177.      * @return a copy of the new id
  178.      */
  179.     public String resetBundleId() {
  180.         bundleId = generateId();
  181.         return bundleId;
  182.     }

  183.     /**
  184.      * Get the work bundle id
  185.      */
  186.     public String getBundleId() {
  187.         return bundleId;
  188.     }

  189.     /**
  190.      * Generate a new unique id
  191.      *
  192.      * @return the new id value
  193.      */
  194.     static String generateId() {
  195.         return UUID.randomUUID().toString();
  196.     }

  197.     /**
  198.      * Gets the value of outputRoot
  199.      *
  200.      * @return the value of outputRoot
  201.      */
  202.     public String getOutputRoot() {
  203.         return this.outputRoot;
  204.     }

  205.     /**
  206.      * Sets the value of outputRoot
  207.      *
  208.      * @param argOutputRoot Value to assign to this.outputRoot
  209.      */
  210.     public void setOutputRoot(@Nullable String argOutputRoot) {
  211.         this.outputRoot = argOutputRoot;
  212.     }

  213.     /**
  214.      * Gets the value of eatPrefix
  215.      *
  216.      * @return the value of eatPrefix
  217.      */
  218.     public String getEatPrefix() {
  219.         return this.eatPrefix;
  220.     }

  221.     /**
  222.      * Sets the value of eatPrefix
  223.      *
  224.      * @param argEatPrefix Value to assign to this.eatPrefix
  225.      */
  226.     public void setEatPrefix(@Nullable String argEatPrefix) {
  227.         this.eatPrefix = argEatPrefix;
  228.     }

  229.     /**
  230.      * Gets the list of WorkUnits in bundle
  231.      *
  232.      * @return the list of WorkUnits
  233.      */
  234.     public List<WorkUnit> getWorkUnitList() {
  235.         return new ArrayList<>(workUnitList);
  236.     }

  237.     /**
  238.      * Gets an iterator over work units
  239.      *
  240.      * @return iterator of WorkUnit
  241.      */
  242.     public Iterator<WorkUnit> getWorkUnitIterator() {
  243.         return workUnitList.iterator();
  244.     }

  245.     /**
  246.      * Add a workUnit to the list.
  247.      *
  248.      * @param workUnit the workUnit to add
  249.      * @return number of WorkUnits in list after add
  250.      * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
  251.      *         work units
  252.      */
  253.     public int addWorkUnit(WorkUnit workUnit) {
  254.         if (workUnitList.size() >= MAX_UNITS) {
  255.             throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
  256.         }
  257.         workUnitList.add(workUnit);
  258.         return size();
  259.     }

  260.     /**
  261.      * Add a workunit to the list
  262.      *
  263.      * @param workUnit the workUnit to add
  264.      * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
  265.      * @param fileSize the size of the file added.
  266.      * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
  267.      *         work units
  268.      * @return number of files in this set after update
  269.      */
  270.     public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, long fileSize) {
  271.         addWorkUnit(workUnit);

  272.         if (fileModificationTimeInMillis < oldestFileModificationTime) {
  273.             oldestFileModificationTime = fileModificationTimeInMillis;
  274.         }
  275.         if (fileModificationTimeInMillis > youngestFileModificationTime) {
  276.             youngestFileModificationTime = fileModificationTimeInMillis;
  277.         }
  278.         totalFileSize += fileSize;
  279.         return size();
  280.     }

  281.     /**
  282.      * Add from a list, without adjusting file modification time tracking.
  283.      *
  284.      * @param list a list of WorkUnits to add to this bundle
  285.      * @return the total size of WorkUnits in this bundle
  286.      * @throws IllegalStateException if adding the units would cause the bundle to contain more than <code>MAX_UNITS</code>
  287.      *         work units
  288.      */
  289.     int addWorkUnits(List<WorkUnit> list) { // This appears to only be used by unit tests and the copy constructor
  290.         if (workUnitList.size() + list.size() > MAX_UNITS) {
  291.             throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
  292.         }
  293.         workUnitList.addAll(list);
  294.         return workUnitList.size();
  295.     }

  296.     /**
  297.      * Gets the list of file names
  298.      *
  299.      * @return the string values of filenames
  300.      */
  301.     public List<String> getFileNameList() {
  302.         ArrayList<String> fileNameList = new ArrayList<>(workUnitList.size());
  303.         for (WorkUnit workUnit : workUnitList) {
  304.             fileNameList.add(workUnit.getFileName());
  305.         }

  306.         return fileNameList;
  307.     }

  308.     /**
  309.      * Gets an iterator over file names
  310.      *
  311.      * @return iterator of String filename values
  312.      */
  313.     public Iterator<String> getFileNameIterator() {
  314.         return getFileNameList().iterator();
  315.     }

  316.     /**
  317.      * Add a file to the list, without adjusting file modification time tracking.
  318.      *
  319.      * @param file string file name consistent with outputRoot
  320.      * @return number of files in this set after update
  321.      * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
  322.      *         work units
  323.      */
  324.     public int addFileName(String file) {
  325.         return addWorkUnit(new WorkUnit(file));
  326.     }

  327.     /**
  328.      * Add a file to the list
  329.      *
  330.      * @param file string file name consistent with outputRoot
  331.      * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
  332.      * @param fileSize the size of the file being added
  333.      * @return number of files in this set after update
  334.      * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
  335.      *         work units
  336.      */
  337.     public int addFileName(String file, long fileModificationTimeInMillis, long fileSize) {
  338.         return addWorkUnit(new WorkUnit(file), fileModificationTimeInMillis, fileSize);
  339.     }

  340.     /**
  341.      * Add files to the list, without adjusting file modification time tracking.
  342.      *
  343.      * @param file string file names consistent with outputRoot
  344.      * @return number of files in this set after update
  345.      * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
  346.      *         work units
  347.      */
  348.     int addFileNames(String[] file) { // This appears to only be used by unit tests
  349.         for (String f : file) {
  350.             addWorkUnit(new WorkUnit(f));
  351.         }
  352.         return size();
  353.     }

  354.     /**
  355.      * Add from a list, without adjusting file modification time tracking.
  356.      *
  357.      * @param list the list of files to add
  358.      * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
  359.      *         work units
  360.      */
  361.     int addFileNames(List<String> list) { // This appears to only be used by unit tests and the copy
  362.                                           // constructor
  363.         for (String file : list) {
  364.             addWorkUnit(new WorkUnit(file));
  365.         }
  366.         return size();
  367.     }

  368.     /**
  369.      * Get the number of files contained
  370.      */
  371.     public int size() {
  372.         return workUnitList.size();
  373.     }

  374.     /**
  375.      * Clear the files from the list
  376.      */
  377.     void clearFiles() {
  378.         // This is only used for testing
  379.         workUnitList.clear();
  380.         oldestFileModificationTime = Long.MAX_VALUE;
  381.         youngestFileModificationTime = Long.MIN_VALUE;
  382.         totalFileSize = 0L;
  383.     }

  384.     /**
  385.      * Gets the value of caseId
  386.      *
  387.      * @return the value of caseId
  388.      */
  389.     public String getCaseId() {
  390.         return this.caseId;
  391.     }

  392.     /**
  393.      * Sets the value of caseId
  394.      *
  395.      * @param argCaseId Value to assign to this.caseId
  396.      */
  397.     public void setCaseId(@Nullable String argCaseId) {
  398.         this.caseId = argCaseId;
  399.     }

  400.     /**
  401.      * Set the transient sentTo indicating inprogress work
  402.      */
  403.     public void setSentTo(@Nullable String place) {
  404.         this.sentTo = place;
  405.     }

  406.     /**
  407.      * Get the transient sentTo
  408.      */
  409.     public String getSentTo() {
  410.         return sentTo;
  411.     }

  412.     /**
  413.      * Get the transient error count
  414.      */
  415.     public int getErrorCount() {
  416.         return errorCount;
  417.     }

  418.     /**
  419.      * Increment the error count
  420.      *
  421.      * @return the new value
  422.      */
  423.     public int incrementErrorCount() {
  424.         return ++errorCount;
  425.     }

  426.     /**
  427.      * Set a new value for the error count
  428.      */
  429.     public void setErrorCount(int val) {
  430.         errorCount = val;
  431.     }

  432.     /**
  433.      * Set a new priority
  434.      */
  435.     public void setPriority(int val) {
  436.         priority = val;
  437.     }

  438.     /**
  439.      * Get the priority
  440.      */
  441.     public int getPriority() {
  442.         return priority;
  443.     }

  444.     /**
  445.      * Set the value for the simple flag
  446.      *
  447.      * @param val the new value for the flag
  448.      */
  449.     public void setSimpleMode(boolean val) {
  450.         simpleMode = val;
  451.     }

  452.     /**
  453.      * Get the value for the simple mode flag
  454.      */
  455.     public boolean getSimpleMode() {
  456.         return simpleMode;
  457.     }

  458.     public long getOldestFileModificationTime() {
  459.         return oldestFileModificationTime;
  460.     }

  461.     public void setOldestFileModificationTime(long oldestFileModificationTime) {
  462.         this.oldestFileModificationTime = oldestFileModificationTime;
  463.     }

  464.     public long getYoungestFileModificationTime() {
  465.         return youngestFileModificationTime;
  466.     }

  467.     public void setYoungestFileModificationTime(long youngestFileModificationTime) {
  468.         this.youngestFileModificationTime = youngestFileModificationTime;
  469.     }

  470.     public long getTotalFileSize() {
  471.         return totalFileSize;
  472.     }

  473.     public void setTotalFileSize(long totalFileSize) {
  474.         this.totalFileSize = totalFileSize;
  475.     }

  476.     /**
  477.      * Compare in priority order, lower numbers mean high priority data Note: this comparator imposes ordering that is
  478.      * inconsistent with equals
  479.      */
  480.     @Override
  481.     public int compareTo(WorkBundle that) {
  482.         if (this.getPriority() < that.getPriority()) {
  483.             return -1;
  484.         } else if (that.getPriority() < this.getPriority()) {
  485.             return 1;
  486.         } else {
  487.             return 0;
  488.         }
  489.     }

  490.     /**
  491.      * Provide string version
  492.      */
  493.     @Override
  494.     public String toString() {
  495.         return "WorkBundle[id=" + getBundleId() + ", pri=" + getPriority() + ", files=" + getFileNameList().toString() + ", eatPrefix="
  496.                 + getEatPrefix()
  497.                 + ", outputRoot=" + getOutputRoot() + ", sentTo=" + getSentTo() + ", errorCount=" + getErrorCount() + ", totalFileSize="
  498.                 + getTotalFileSize() + ", oldestModTime=" + getOldestFileModificationTime() + ", youngModTime=" + getYoungestFileModificationTime()
  499.                 + ", simple=" + getSimpleMode() + ", caseId=" + getCaseId() + ", size=" + size() + "]";
  500.     }

  501.     public String toXml() {
  502.         Element root = new Element("workBundle");
  503.         root.addContent(SaferJDOMUtil.simpleElement("bundleId", getBundleId()));
  504.         root.addContent(SaferJDOMUtil.simpleElement("outputRoot", getOutputRoot()));
  505.         root.addContent(SaferJDOMUtil.simpleElement("eatPrefix", getEatPrefix()));
  506.         root.addContent(SaferJDOMUtil.simpleElement("caseId", getCaseId()));
  507.         root.addContent(SaferJDOMUtil.simpleElement("sentTo", getSentTo()));
  508.         root.addContent(SaferJDOMUtil.simpleElement("errorCount", getErrorCount()));
  509.         root.addContent(SaferJDOMUtil.simpleElement("priority", getPriority()));
  510.         root.addContent(SaferJDOMUtil.simpleElement("simpleMode", getSimpleMode()));
  511.         root.addContent(SaferJDOMUtil.simpleElement("oldestFileModificationTime", getOldestFileModificationTime()));
  512.         root.addContent(SaferJDOMUtil.simpleElement("youngestFileModificationTime", getYoungestFileModificationTime()));
  513.         root.addContent(SaferJDOMUtil.simpleElement("totalFileSize", getTotalFileSize()));

  514.         for (WorkUnit wu : workUnitList) {
  515.             Element workunit = new Element("workUnit");
  516.             workunit.addContent(SaferJDOMUtil.simpleElement("workFileName", wu.getFileName()));
  517.             if (wu.getTransactionId() != null) {
  518.                 workunit.addContent(SaferJDOMUtil.simpleElement("transactionId", wu.getTransactionId()));
  519.             }
  520.             workunit.addContent(SaferJDOMUtil.simpleElement("failedToParse", wu.failedToParse()));
  521.             workunit.addContent(SaferJDOMUtil.simpleElement("failedToProcess", wu.failedToProcess()));

  522.             root.addContent(workunit);
  523.         }

  524.         Document jdom = new Document(root);
  525.         return SaferJDOMUtil.toString(jdom);
  526.     }

  527.     /**
  528.      * Build a WorkBundle object from xml
  529.      *
  530.      * @param xml the xml string representing a WorkBundle
  531.      * @return the constructed WorkBundle or null on error
  532.      */
  533.     @Nullable
  534.     public static WorkBundle buildWorkBundle(String xml) {
  535.         Document jdoc;
  536.         try {
  537.             jdoc = SaferJDOMUtil.createDocument(xml);
  538.             return buildWorkBundle(jdoc);
  539.         } catch (Exception ex) {
  540.             logger.error("Cannot make WorkBundle from " + xml, ex);
  541.             return null;
  542.         }
  543.     }

  544.     /**
  545.      * Build a WorkBundle object from a jdom document
  546.      *
  547.      * @param jdom the jdom document representing a work bundle object
  548.      * @return the constructed WorkBundle or null on error
  549.      */
  550.     @Nullable
  551.     private static WorkBundle buildWorkBundle(Document jdom) {
  552.         Element root = jdom.getRootElement();
  553.         if (root == null) {
  554.             logger.error("Document does not have a root element!");
  555.             return null;
  556.         }

  557.         WorkBundle wb = new WorkBundle();
  558.         wb.setBundleId(root.getChildTextTrim("bundleId"));
  559.         String s = root.getChildTextTrim("outputRoot");
  560.         if (s != null && s.length() > 0) {
  561.             wb.setOutputRoot(s);
  562.         } else {
  563.             wb.setOutputRoot(null);
  564.         }

  565.         s = root.getChildTextTrim("eatPrefix");
  566.         if (s != null && s.length() > 0) {
  567.             wb.setEatPrefix(s);
  568.         } else {
  569.             wb.setEatPrefix(null);
  570.         }

  571.         s = root.getChildTextTrim("caseId");
  572.         if (s != null && s.length() > 0) {
  573.             wb.setCaseId(s);
  574.         } else {
  575.             wb.setCaseId(null);
  576.         }

  577.         s = root.getChildTextTrim("sentTo");
  578.         if (s != null && s.length() > 0) {
  579.             wb.setSentTo(s);
  580.         } else {
  581.             wb.setSentTo(null);
  582.         }

  583.         wb.setPriority(SaferJDOMUtil.getChildIntValue(root, "priority"));
  584.         wb.setSimpleMode(SaferJDOMUtil.getChildBooleanValue(root, "simpleMode"));
  585.         wb.setOldestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "oldestFileModificationTime"));
  586.         wb.setYoungestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "youngestFileModificationTime"));
  587.         wb.setTotalFileSize(SaferJDOMUtil.getChildLongValue(root, "totalFileSize"));
  588.         String serr = root.getChildTextTrim("errorCount");
  589.         if (serr != null && serr.length() > 0) {
  590.             wb.setErrorCount(Integer.parseInt(serr));
  591.         }

  592.         for (Element wu : root.getChildren("workUnit")) {
  593.             String filename = wu.getChildTextTrim("workFileName");
  594.             String transactionId = wu.getChildTextTrim("transactionId");
  595.             boolean failedToParse = Boolean.parseBoolean(wu.getChildTextTrim("failedToParse"));
  596.             boolean failedToProcess = Boolean.parseBoolean(wu.getChildTextTrim("failedToProcess"));
  597.             wb.addWorkUnit(new WorkUnit(filename, transactionId, failedToParse, failedToProcess));
  598.         }

  599.         return wb;
  600.     }
  601. }