// WorkBundle.java
package emissary.pickup;
import emissary.util.xml.SaferJDOMUtil;
import jakarta.annotation.Nullable;
import org.jdom2.Document;
import org.jdom2.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
/**
 * Used to communicate between the TreePickUpPlace and TreeSpace about a set of files to process.
 * <p>
 * Two times are tracked for the files in each work bundle - the "youngest" modification time and the "oldest"
 * modification time, both initially entered as time from the epoch. However, the concept of "youngest" and "oldest" is
 * relative to the construction time, so that once at least one work unit has been added together with its
 * modification time:
 * <p>
 * {@code getOldestFileModificationTime() <= getYoungestFileModificationTime()}
 * <p>
 * Until then the two values keep their initial sentinels ({@code Long.MAX_VALUE} for the oldest and
 * {@code Long.MIN_VALUE} for the youngest), so the relation above does not hold for a bundle that has no timed work
 * units. The explicit setters do not enforce the relation either.
 */
public final class WorkBundle implements Comparable<WorkBundle> {

    private static final Logger logger = LoggerFactory.getLogger(WorkBundle.class);

    /** Maximum number of work units a bundle may contain, read from a stream, or write to a stream. */
    static final int MAX_UNITS = 1024;

    // Unique ID for this work bundle
    String bundleId;

    // Configured output root for finding data on remote side
    String outputRoot;

    // Configuration passed to remote side for producing output name
    String eatPrefix;

    // Database case id for the work in this bundle
    @Nullable
    String caseId = null;

    // Priority of this work bundle
    int priority = Priority.DEFAULT;

    // Flag to note if the bundle is in simple mode
    boolean simpleMode = false;

    // The units of work carried by this bundle, in insertion order
    List<WorkUnit> workUnitList = new ArrayList<>();

    // Where being processed
    String sentTo;

    // Cumulative errors in processing tries
    int errorCount = 0;

    /**
     * The oldest file in the bundle in millis since epoch
     */
    long oldestFileModificationTime = Long.MAX_VALUE;

    /**
     * The youngest file in the bundle in millis since epoch
     */
    long youngestFileModificationTime = Long.MIN_VALUE;

    // Aggregate file size
    long totalFileSize = 0L;

    /**
     * Public default constructor. A fresh unique bundle id is generated.
     */
    public WorkBundle() {
        bundleId = generateId();
    }

    /**
     * Public constructor with args. A fresh unique bundle id is generated.
     *
     * @param outputRoot root directory for files
     * @param eatPrefix used when constructing output name
     */
    public WorkBundle(String outputRoot, String eatPrefix) {
        bundleId = generateId();
        this.outputRoot = outputRoot;
        this.eatPrefix = eatPrefix;
    }

    /**
     * Build one as a copy of another, generating a new unique id for the copy. Every other field is copied, including
     * the transient fields sentTo and errorCount as well as the modification time and size tracking. The copy gets its
     * own list holding the same WorkUnit instances.
     *
     * @param that the work bundle to copy
     * @throws IllegalStateException if the source holds more than <code>MAX_UNITS</code> work units
     */
    public WorkBundle(WorkBundle that) {
        // The copy always gets its own identity; the id of the source is never retained.
        this.bundleId = generateId();
        this.outputRoot = that.outputRoot;
        this.eatPrefix = that.eatPrefix;
        this.caseId = that.caseId;
        this.sentTo = that.sentTo;
        this.errorCount = that.errorCount;
        this.priority = that.priority;
        this.simpleMode = that.simpleMode;
        this.oldestFileModificationTime = that.oldestFileModificationTime;
        this.youngestFileModificationTime = that.youngestFileModificationTime;
        this.totalFileSize = that.totalFileSize;
        // Read the source list directly: getWorkUnitList() would allocate a throwaway defensive copy.
        addWorkUnits(that.workUnitList);
    }

    /**
     * Deserialize a WorkBundle from a DataInputStream. The fields are read in the order that
     * {@link #writeToStream(DataOutputStream)} writes them.
     *
     * @param in the stream to read from
     * @return the deserialized WorkBundle
     * @throws IOException if there is a problem reading the stream, or the recorded work unit count is negative or
     *         greater than <code>MAX_UNITS</code>.
     */
    public static WorkBundle readFromStream(DataInputStream in) throws IOException {
        WorkBundle wb = new WorkBundle();
        wb.bundleId = readUtfOrNull(in);
        wb.outputRoot = readUtfOrNull(in);
        wb.eatPrefix = readUtfOrNull(in);
        wb.caseId = readUtfOrNull(in);
        wb.sentTo = readUtfOrNull(in);
        wb.errorCount = in.readInt();
        wb.priority = in.readInt();
        wb.simpleMode = in.readBoolean();
        wb.oldestFileModificationTime = in.readLong();
        wb.youngestFileModificationTime = in.readLong();
        wb.totalFileSize = in.readLong();
        int workUnitSize = in.readInt();
        // A negative count can only come from a corrupt stream; fail instead of silently returning an empty bundle.
        if (workUnitSize < 0) {
            throw new IOException(
                    "Exception when reading: WorkBundle has an invalid WorkUnit count (saw: " + workUnitSize + ").");
        }
        if (workUnitSize > MAX_UNITS) {
            throw new IOException(
                    "Exception when reading: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitSize + ").");
        }
        for (int i = 0; i < workUnitSize; i++) {
            wb.addWorkUnit(WorkUnit.readFromStream(in));
        }
        return wb;
    }

    /**
     * Serialize this WorkBundle to a DataOutputStream. The work unit count is validated before anything is written, so
     * an oversized bundle does not leave a partial record on the stream.
     *
     * @param out the stream to write to.
     * @throws IOException if there is a problem writing to the stream, or the bundle holds more than
     *         <code>MAX_UNITS</code> work units.
     */
    public void writeToStream(DataOutputStream out) throws IOException {
        int unitCount = workUnitList.size();
        if (unitCount > MAX_UNITS) {
            throw new IOException(
                    "Exception when writing: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + unitCount + ").");
        }
        writeUtfOrNull(bundleId, out);
        writeUtfOrNull(outputRoot, out);
        writeUtfOrNull(eatPrefix, out);
        writeUtfOrNull(caseId, out);
        writeUtfOrNull(sentTo, out);
        out.writeInt(errorCount);
        out.writeInt(priority);
        out.writeBoolean(simpleMode);
        out.writeLong(oldestFileModificationTime);
        out.writeLong(youngestFileModificationTime);
        out.writeLong(totalFileSize);
        out.writeInt(unitCount);
        for (WorkUnit u : workUnitList) {
            u.writeToStream(out);
        }
    }

    /**
     * Read a string written by {@link #writeUtfOrNull(String, DataOutputStream)}: a boolean presence flag followed, when
     * the flag is true, by the string in modified UTF-8.
     *
     * @param in the stream to read from
     * @return the string, or null when the presence flag is false
     * @throws IOException if there is a problem reading the stream
     */
    @Nullable
    static String readUtfOrNull(DataInputStream in) throws IOException {
        if (in.readBoolean()) {
            return in.readUTF();
        }
        return null;
    }

    /**
     * Write a possibly-null string as a boolean presence flag followed, when the string is not null, by the string in
     * modified UTF-8.
     *
     * @param s the string to write, may be null
     * @param out the stream to write to
     * @throws IOException if there is a problem writing to the stream
     */
    static void writeUtfOrNull(@Nullable String s, DataOutputStream out) throws IOException {
        out.writeBoolean(s != null);
        if (s != null) {
            out.writeUTF(s);
        }
    }

    /**
     * Set the work bundle id
     *
     * @param val the new value to set as bundle id
     */
    public void setBundleId(String val) {
        this.bundleId = val;
    }

    /**
     * Reset the unique id
     *
     * @return the new id
     */
    public String resetBundleId() {
        bundleId = generateId();
        return bundleId;
    }

    /**
     * Get the work bundle id
     *
     * @return the current bundle id
     */
    public String getBundleId() {
        return bundleId;
    }

    /**
     * Generate a new unique id
     *
     * @return the new id value, the string form of a random UUID
     */
    static String generateId() {
        return UUID.randomUUID().toString();
    }

    /**
     * Gets the value of outputRoot
     *
     * @return the value of outputRoot, may be null
     */
    public String getOutputRoot() {
        return this.outputRoot;
    }

    /**
     * Sets the value of outputRoot
     *
     * @param argOutputRoot Value to assign to this.outputRoot
     */
    public void setOutputRoot(@Nullable String argOutputRoot) {
        this.outputRoot = argOutputRoot;
    }

    /**
     * Gets the value of eatPrefix
     *
     * @return the value of eatPrefix, may be null
     */
    public String getEatPrefix() {
        return this.eatPrefix;
    }

    /**
     * Sets the value of eatPrefix
     *
     * @param argEatPrefix Value to assign to this.eatPrefix
     */
    public void setEatPrefix(@Nullable String argEatPrefix) {
        this.eatPrefix = argEatPrefix;
    }

    /**
     * Gets the list of WorkUnits in bundle
     *
     * @return a new list holding the WorkUnits; changes to the returned list do not affect this bundle
     */
    public List<WorkUnit> getWorkUnitList() {
        return new ArrayList<>(workUnitList);
    }

    /**
     * Gets an iterator over work units. The iterator is backed directly by the internal list, so removing through it
     * changes this bundle without adjusting the modification time or total size tracking.
     *
     * @return iterator of WorkUnit
     */
    public Iterator<WorkUnit> getWorkUnitIterator() {
        return workUnitList.iterator();
    }

    /**
     * Add a workUnit to the list, without adjusting file modification time tracking.
     *
     * @param workUnit the workUnit to add
     * @return number of WorkUnits in list after add
     * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    public int addWorkUnit(WorkUnit workUnit) {
        requireRoomFor(1);
        workUnitList.add(workUnit);
        return size();
    }

    /**
     * Add a workunit to the list and fold its modification time and size into the bundle totals. Nothing is updated
     * when the unit cannot be added.
     *
     * @param workUnit the workUnit to add
     * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
     * @param fileSize the size of the file added.
     * @return number of files in this set after update
     * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, long fileSize) {
        // Add first: a capacity failure must leave the time and size tracking untouched.
        addWorkUnit(workUnit);
        oldestFileModificationTime = Math.min(oldestFileModificationTime, fileModificationTimeInMillis);
        youngestFileModificationTime = Math.max(youngestFileModificationTime, fileModificationTimeInMillis);
        totalFileSize += fileSize;
        return size();
    }

    /**
     * Add from a list, without adjusting file modification time tracking. Either all units are added or none.
     *
     * @param list a list of WorkUnits to add to this bundle
     * @return the total size of WorkUnits in this bundle
     * @throws IllegalStateException if adding the units would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    int addWorkUnits(List<WorkUnit> list) { // This appears to only be used by unit tests and the copy constructor
        requireRoomFor(list.size());
        workUnitList.addAll(list);
        return workUnitList.size();
    }

    /**
     * Gets the list of file names
     *
     * @return a new list with the file name of each work unit, in bundle order
     */
    public List<String> getFileNameList() {
        List<String> fileNameList = new ArrayList<>(workUnitList.size());
        for (WorkUnit workUnit : workUnitList) {
            fileNameList.add(workUnit.getFileName());
        }
        return fileNameList;
    }

    /**
     * Gets an iterator over file names. The iterator runs over a snapshot, so it is not affected by and does not affect
     * this bundle.
     *
     * @return iterator of String filename values
     */
    public Iterator<String> getFileNameIterator() {
        return getFileNameList().iterator();
    }

    /**
     * Add a file to the list, without adjusting file modification time tracking.
     *
     * @param file string file name consistent with outputRoot
     * @return number of files in this set after update
     * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    public int addFileName(String file) {
        return addWorkUnit(new WorkUnit(file));
    }

    /**
     * Add a file to the list
     *
     * @param file string file name consistent with outputRoot
     * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
     * @param fileSize the size of the file being added
     * @return number of files in this set after update
     * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    public int addFileName(String file, long fileModificationTimeInMillis, long fileSize) {
        return addWorkUnit(new WorkUnit(file), fileModificationTimeInMillis, fileSize);
    }

    /**
     * Add files to the list, without adjusting file modification time tracking. Either all files are added or none.
     *
     * @param file string file names consistent with outputRoot
     * @return number of files in this set after update
     * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    int addFileNames(String[] file) { // This appears to only be used by unit tests
        // Check the whole batch up front so a failure cannot leave the bundle partially filled.
        requireRoomFor(file.length);
        for (String f : file) {
            workUnitList.add(new WorkUnit(f));
        }
        return size();
    }

    /**
     * Add from a list, without adjusting file modification time tracking. Either all files are added or none.
     *
     * @param list the list of files to add
     * @return number of files in this set after update
     * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
     *         work units
     */
    int addFileNames(List<String> list) { // This appears to only be used by unit tests
        // Check the whole batch up front so a failure cannot leave the bundle partially filled.
        requireRoomFor(list.size());
        for (String file : list) {
            workUnitList.add(new WorkUnit(file));
        }
        return size();
    }

    /**
     * Verify that the given number of additional work units fits under <code>MAX_UNITS</code>.
     *
     * @param additional how many work units are about to be added
     * @throws IllegalStateException if the bundle would contain more than <code>MAX_UNITS</code> work units
     */
    private void requireRoomFor(int additional) {
        // Compare against the remaining room rather than summing, so a very large batch cannot overflow int.
        if (additional > MAX_UNITS - workUnitList.size()) {
            throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
        }
    }

    /**
     * Get the number of files contained
     *
     * @return the number of work units in this bundle
     */
    public int size() {
        return workUnitList.size();
    }

    /**
     * Clear the files from the list and reset the modification time and total size tracking to their initial values
     */
    void clearFiles() {
        // This is only used for testing
        workUnitList.clear();
        oldestFileModificationTime = Long.MAX_VALUE;
        youngestFileModificationTime = Long.MIN_VALUE;
        totalFileSize = 0L;
    }

    /**
     * Gets the value of caseId
     *
     * @return the value of caseId, may be null
     */
    public String getCaseId() {
        return this.caseId;
    }

    /**
     * Sets the value of caseId
     *
     * @param argCaseId Value to assign to this.caseId
     */
    public void setCaseId(@Nullable String argCaseId) {
        this.caseId = argCaseId;
    }

    /**
     * Set the transient sentTo indicating inprogress work
     *
     * @param place where the bundle is being processed, may be null
     */
    public void setSentTo(@Nullable String place) {
        this.sentTo = place;
    }

    /**
     * Get the transient sentTo
     *
     * @return where the bundle is being processed, may be null
     */
    public String getSentTo() {
        return sentTo;
    }

    /**
     * Get the transient error count
     *
     * @return the current error count
     */
    public int getErrorCount() {
        return errorCount;
    }

    /**
     * Increment the error count
     *
     * @return the new value
     */
    public int incrementErrorCount() {
        return ++errorCount;
    }

    /**
     * Set a new value for the error count
     *
     * @param val the new error count
     */
    public void setErrorCount(int val) {
        errorCount = val;
    }

    /**
     * Set a new priority
     *
     * @param val the new priority, lower numbers mean higher priority
     */
    public void setPriority(int val) {
        priority = val;
    }

    /**
     * Get the priority
     *
     * @return the priority, lower numbers mean higher priority
     */
    public int getPriority() {
        return priority;
    }

    /**
     * Set the value for the simple flag
     *
     * @param val the new value for the flag
     */
    public void setSimpleMode(boolean val) {
        simpleMode = val;
    }

    /**
     * Get the value for the simple mode flag
     *
     * @return the simple mode flag
     */
    public boolean getSimpleMode() {
        return simpleMode;
    }

    /**
     * Get the oldest file modification time
     *
     * @return millis since epoch, or {@code Long.MAX_VALUE} if it was never set or updated
     */
    public long getOldestFileModificationTime() {
        return oldestFileModificationTime;
    }

    /**
     * Set the oldest file modification time; the value is stored as given
     *
     * @param oldestFileModificationTime millis since epoch
     */
    public void setOldestFileModificationTime(long oldestFileModificationTime) {
        this.oldestFileModificationTime = oldestFileModificationTime;
    }

    /**
     * Get the youngest file modification time
     *
     * @return millis since epoch, or {@code Long.MIN_VALUE} if it was never set or updated
     */
    public long getYoungestFileModificationTime() {
        return youngestFileModificationTime;
    }

    /**
     * Set the youngest file modification time; the value is stored as given
     *
     * @param youngestFileModificationTime millis since epoch
     */
    public void setYoungestFileModificationTime(long youngestFileModificationTime) {
        this.youngestFileModificationTime = youngestFileModificationTime;
    }

    /**
     * Get the aggregate file size
     *
     * @return the total size accumulated for the files in this bundle
     */
    public long getTotalFileSize() {
        return totalFileSize;
    }

    /**
     * Set the aggregate file size; the value is stored as given
     *
     * @param totalFileSize the new total size
     */
    public void setTotalFileSize(long totalFileSize) {
        this.totalFileSize = totalFileSize;
    }

    /**
     * Compare in priority order, lower numbers mean high priority data Note: this comparator imposes ordering that is
     * inconsistent with equals
     *
     * @param that the bundle to compare against
     * @return negative, zero or positive as this priority is lower than, equal to or higher than that priority
     */
    @Override
    public int compareTo(WorkBundle that) {
        return Integer.compare(this.getPriority(), that.getPriority());
    }

    /**
     * Provide string version
     */
    @Override
    public String toString() {
        return "WorkBundle[id=" + getBundleId() + ", pri=" + getPriority() + ", files=" + getFileNameList().toString() + ", eatPrefix="
                + getEatPrefix()
                + ", outputRoot=" + getOutputRoot() + ", sentTo=" + getSentTo() + ", errorCount=" + getErrorCount() + ", totalFileSize="
                + getTotalFileSize() + ", oldestModTime=" + getOldestFileModificationTime() + ", youngModTime=" + getYoungestFileModificationTime()
                + ", simple=" + getSimpleMode() + ", caseId=" + getCaseId() + ", size=" + size() + "]";
    }

    /**
     * Render this bundle as an xml document with a <code>workBundle</code> root element, one child element per field
     * and one <code>workUnit</code> element per work unit. The transactionId element of a work unit is only emitted
     * when the work unit has a transaction id.
     *
     * @return the xml string representing this WorkBundle
     */
    public String toXml() {
        Element root = new Element("workBundle");
        root.addContent(SaferJDOMUtil.simpleElement("bundleId", getBundleId()));
        root.addContent(SaferJDOMUtil.simpleElement("outputRoot", getOutputRoot()));
        root.addContent(SaferJDOMUtil.simpleElement("eatPrefix", getEatPrefix()));
        root.addContent(SaferJDOMUtil.simpleElement("caseId", getCaseId()));
        root.addContent(SaferJDOMUtil.simpleElement("sentTo", getSentTo()));
        root.addContent(SaferJDOMUtil.simpleElement("errorCount", getErrorCount()));
        root.addContent(SaferJDOMUtil.simpleElement("priority", getPriority()));
        root.addContent(SaferJDOMUtil.simpleElement("simpleMode", getSimpleMode()));
        root.addContent(SaferJDOMUtil.simpleElement("oldestFileModificationTime", getOldestFileModificationTime()));
        root.addContent(SaferJDOMUtil.simpleElement("youngestFileModificationTime", getYoungestFileModificationTime()));
        root.addContent(SaferJDOMUtil.simpleElement("totalFileSize", getTotalFileSize()));

        for (WorkUnit wu : workUnitList) {
            Element workunit = new Element("workUnit");
            workunit.addContent(SaferJDOMUtil.simpleElement("workFileName", wu.getFileName()));
            if (wu.getTransactionId() != null) {
                workunit.addContent(SaferJDOMUtil.simpleElement("transactionId", wu.getTransactionId()));
            }
            workunit.addContent(SaferJDOMUtil.simpleElement("failedToParse", wu.failedToParse()));
            workunit.addContent(SaferJDOMUtil.simpleElement("failedToProcess", wu.failedToProcess()));
            root.addContent(workunit);
        }

        Document jdom = new Document(root);
        return SaferJDOMUtil.toString(jdom);
    }

    /**
     * Build a WorkBundle object from xml
     *
     * @param xml the xml string representing a WorkBundle
     * @return the constructed WorkBundle or null on error
     */
    @Nullable
    public static WorkBundle buildWorkBundle(String xml) {
        try {
            return buildWorkBundle(SaferJDOMUtil.createDocument(xml));
        } catch (Exception ex) {
            // Deliberately broad: any parse or validation failure is reported as a null result.
            logger.error("Cannot make WorkBundle from {}", xml, ex);
            return null;
        }
    }

    /**
     * Build a WorkBundle object from a jdom document
     *
     * @param jdom the jdom document representing a work bundle object
     * @return the constructed WorkBundle or null if the document has no root element
     */
    @Nullable
    private static WorkBundle buildWorkBundle(Document jdom) {
        // Test for a root explicitly; getRootElement() is not relied on to signal absence with null.
        if (!jdom.hasRootElement()) {
            logger.error("Document does not have a root element!");
            return null;
        }
        Element root = jdom.getRootElement();

        WorkBundle wb = new WorkBundle();
        wb.setBundleId(root.getChildTextTrim("bundleId"));
        wb.setOutputRoot(emptyToNull(root.getChildTextTrim("outputRoot")));
        wb.setEatPrefix(emptyToNull(root.getChildTextTrim("eatPrefix")));
        wb.setCaseId(emptyToNull(root.getChildTextTrim("caseId")));
        wb.setSentTo(emptyToNull(root.getChildTextTrim("sentTo")));
        wb.setPriority(SaferJDOMUtil.getChildIntValue(root, "priority"));
        wb.setSimpleMode(SaferJDOMUtil.getChildBooleanValue(root, "simpleMode"));
        wb.setOldestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "oldestFileModificationTime"));
        wb.setYoungestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "youngestFileModificationTime"));
        wb.setTotalFileSize(SaferJDOMUtil.getChildLongValue(root, "totalFileSize"));

        // The error count is optional; when absent the bundle keeps its default of zero.
        String serr = emptyToNull(root.getChildTextTrim("errorCount"));
        if (serr != null) {
            wb.setErrorCount(Integer.parseInt(serr));
        }

        for (Element wu : root.getChildren("workUnit")) {
            String filename = wu.getChildTextTrim("workFileName");
            String transactionId = wu.getChildTextTrim("transactionId");
            boolean failedToParse = Boolean.parseBoolean(wu.getChildTextTrim("failedToParse"));
            boolean failedToProcess = Boolean.parseBoolean(wu.getChildTextTrim("failedToProcess"));
            wb.addWorkUnit(new WorkUnit(filename, transactionId, failedToParse, failedToProcess));
        }
        return wb;
    }

    /**
     * Normalize an absent or empty xml text value to null.
     *
     * @param s the text to normalize, may be null
     * @return null if the text is null or empty, otherwise the text unchanged
     */
    @Nullable
    private static String emptyToNull(@Nullable String s) {
        return (s == null || s.isEmpty()) ? null : s;
    }
}