View Javadoc
1   package emissary.pickup;
2   
3   import emissary.util.xml.SaferJDOMUtil;
4   
5   import org.jdom2.Document;
6   import org.jdom2.Element;
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   
10  import java.io.DataInputStream;
11  import java.io.DataOutputStream;
12  import java.io.IOException;
13  import java.util.ArrayList;
14  import java.util.Iterator;
15  import java.util.List;
16  import java.util.UUID;
17  import javax.annotation.Nullable;
18  
19  /**
20   * Used to communicate between the TreePickUpPlace and TreeSpace about a set of files to process.
21   * <p>
22   * Two times are tracked for the files in each work bundle - the "youngest" modification time and the "oldest"
23   * modification time, both initially entered as time from the epoch. However, the concept of "youngest" and "oldest" is
24   * relative to the construction time, so that:
25   * <p>
26   * getOldestFileModificationTime() &lt;= getYoungestFileModificationTime()
27   */
28  public final class WorkBundle implements Comparable<WorkBundle> {
29  
30      private static final Logger logger = LoggerFactory.getLogger(WorkBundle.class);
31  
32      static final int MAX_UNITS = 1024;
33  
34      // Unique ID for this work bundle
35      String bundleId;
36  
37      // Configured output root for finding data on remote side
38      String outputRoot;
39  
40      // Configuration passed to remote side for producing output name
41      String eatPrefix;
42  
43      // Database case id for the work in this bundle
44      @Nullable
45      String caseId = null;
46  
47      // Priority of this work bundle
48      int priority = Priority.DEFAULT;
49  
50      // Flag to note if the bundle is in simple mode
51      boolean simpleMode = false;
52  
53      List<WorkUnit> workUnitList = new ArrayList<>();
54  
55      // Where being processed
56      String sentTo;
57  
58      // Cumulative errors in processing tries
59      int errorCount = 0;
60  
61      /**
62       * The oldest file in the bundle in millis since epoch
63       */
64      long oldestFileModificationTime = Long.MAX_VALUE;
65  
66      /**
67       * The youngest file in the bundle in millis since epoch
68       */
69      long youngestFileModificationTime = Long.MIN_VALUE;
70  
71      // Aggregate file size
72      long totalFileSize = 0L;
73  
74      /**
75       * Public default constructor
76       */
77      public WorkBundle() {
78          bundleId = generateId();
79      }
80  
81      /**
82       * Public constructor with args
83       * 
84       * @param outputRoot root directory for files
85       * @param eatPrefix used when constructing output name
86       */
87      public WorkBundle(String outputRoot, String eatPrefix) {
88          bundleId = generateId();
89          this.outputRoot = outputRoot;
90          this.eatPrefix = eatPrefix;
91      }
92  
93      /**
94       * Build one as a copy of another, generating a new unique id for the copy. Transient fields sentTo and errorCount are
95       * not copied by this constructor
96       * 
97       * @param that the work bundle to copy
98       */
99      public WorkBundle(WorkBundle that) {
100         this.bundleId = that.bundleId;
101         this.outputRoot = that.getOutputRoot();
102         this.eatPrefix = that.getEatPrefix();
103         this.caseId = that.getCaseId();
104         this.sentTo = that.sentTo;
105         this.errorCount = that.errorCount;
106         this.priority = that.getPriority();
107         this.simpleMode = that.getSimpleMode();
108         this.oldestFileModificationTime = that.oldestFileModificationTime;
109         this.youngestFileModificationTime = that.youngestFileModificationTime;
110         this.totalFileSize = that.totalFileSize;
111         if (!that.getWorkUnitList().isEmpty()) {
112             this.addWorkUnits(that.getWorkUnitList());
113         }
114         resetBundleId();
115     }
116 
117     /**
118      * Deserialize a WorkBundle from a DataInputStream
119      *
120      * @param in the stream to read from
121      * @return the deserialized WorkBundle
122      * @throws IOException if there is a problem reading the stream or it contains more than <code>MAX_UNITS</code> work
123      *         units.
124      */
125     public static WorkBundle readFromStream(DataInputStream in) throws IOException {
126         WorkBundle wb = new WorkBundle();
127         wb.bundleId = readUtfOrNull(in);
128         wb.outputRoot = readUtfOrNull(in);
129         wb.eatPrefix = readUtfOrNull(in);
130         wb.caseId = readUtfOrNull(in);
131         wb.sentTo = readUtfOrNull(in);
132         wb.errorCount = in.readInt();
133         wb.priority = in.readInt();
134         wb.simpleMode = in.readBoolean();
135         wb.oldestFileModificationTime = in.readLong();
136         wb.youngestFileModificationTime = in.readLong();
137         wb.totalFileSize = in.readLong();
138         int workUnitSize = in.readInt();
139         if (workUnitSize > MAX_UNITS) {
140             throw new IOException(
141                     "Exception when reading: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitSize + ").");
142         }
143         for (int i = 0; i < workUnitSize; i++) {
144             wb.addWorkUnit(WorkUnit.readFromStream(in));
145         }
146         return wb;
147     }
148 
149     /**
150      * Serialize this WorkBundle to a DataOutputStream
151      *
152      * @param out the stream to write to.
153      * @throws IOException if there is a problem writing to the stream.
154      */
155     public void writeToStream(DataOutputStream out) throws IOException {
156         writeUtfOrNull(bundleId, out);
157         writeUtfOrNull(outputRoot, out);
158         writeUtfOrNull(eatPrefix, out);
159         writeUtfOrNull(caseId, out);
160         writeUtfOrNull(sentTo, out);
161         out.writeInt(errorCount);
162         out.writeInt(priority);
163         out.writeBoolean(simpleMode);
164         out.writeLong(oldestFileModificationTime);
165         out.writeLong(youngestFileModificationTime);
166         out.writeLong(totalFileSize);
167         out.writeInt(workUnitList.size());
168         if (workUnitList.size() > MAX_UNITS) {
169             throw new IOException(
170                     "Exception when writing: WorkBundle may not contain more then " + MAX_UNITS + " WorkUnits (saw: " + workUnitList.size() + ").");
171         }
172         for (WorkUnit u : workUnitList) {
173             u.writeToStream(out);
174         }
175     }
176 
177     @Nullable
178     static String readUtfOrNull(DataInputStream in) throws IOException {
179         if (in.readBoolean()) {
180             return in.readUTF();
181         }
182         return null;
183     }
184 
185     static void writeUtfOrNull(@Nullable String s, DataOutputStream out) throws IOException {
186         out.writeBoolean(s != null);
187         if (s != null) {
188             out.writeUTF(s);
189         }
190     }
191 
192     /**
193      * Set the work bundle id
194      * 
195      * @param val the new value to set as bundle id
196      */
197     public void setBundleId(String val) {
198         this.bundleId = val;
199     }
200 
201     /**
202      * Reset the unique id
203      * 
204      * @return a copy of the new id
205      */
206     public String resetBundleId() {
207         bundleId = generateId();
208         return bundleId;
209     }
210 
211     /**
212      * Get the work bundle id
213      */
214     public String getBundleId() {
215         return bundleId;
216     }
217 
218     /**
219      * Generate a new unique id
220      * 
221      * @return the new id value
222      */
223     static String generateId() {
224         return UUID.randomUUID().toString();
225     }
226 
227     /**
228      * Gets the value of outputRoot
229      * 
230      * @return the value of outputRoot
231      */
232     public String getOutputRoot() {
233         return this.outputRoot;
234     }
235 
236     /**
237      * Sets the value of outputRoot
238      * 
239      * @param argOutputRoot Value to assign to this.outputRoot
240      */
241     public void setOutputRoot(@Nullable String argOutputRoot) {
242         this.outputRoot = argOutputRoot;
243     }
244 
245     /**
246      * Gets the value of eatPrefix
247      * 
248      * @return the value of eatPrefix
249      */
250     public String getEatPrefix() {
251         return this.eatPrefix;
252     }
253 
254     /**
255      * Sets the value of eatPrefix
256      * 
257      * @param argEatPrefix Value to assign to this.eatPrefix
258      */
259     public void setEatPrefix(@Nullable String argEatPrefix) {
260         this.eatPrefix = argEatPrefix;
261     }
262 
263     /**
264      * Gets the list of WorkUnits in bundle
265      * 
266      * @return the list of WorkUnits
267      */
268     public List<WorkUnit> getWorkUnitList() {
269         return new ArrayList<>(workUnitList);
270     }
271 
272     /**
273      * Gets an iterator over work units
274      * 
275      * @return iterator of WorkUnit
276      */
277     public Iterator<WorkUnit> getWorkUnitIterator() {
278         return workUnitList.iterator();
279     }
280 
281     /**
282      * Add a workUnit to the list.
283      *
284      * @param workUnit the workUnit to add
285      * @return number of WorkUnits in list after add
286      * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
287      *         work units
288      */
289     public int addWorkUnit(WorkUnit workUnit) {
290         if (workUnitList.size() >= MAX_UNITS) {
291             throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
292         }
293         workUnitList.add(workUnit);
294         return size();
295     }
296 
297     /**
298      * Add a workunit to the list
299      * 
300      * @param workUnit the workUnit to add
301      * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
302      * @param fileSize the size of the file added.
303      * @throws IllegalStateException if adding the unit would cause the bundle to contain more than <code>MAX_UNITS</code>
304      *         work units
305      * @return number of files in this set after update
306      */
307     public int addWorkUnit(WorkUnit workUnit, long fileModificationTimeInMillis, long fileSize) {
308         addWorkUnit(workUnit);
309 
310         if (fileModificationTimeInMillis < oldestFileModificationTime) {
311             oldestFileModificationTime = fileModificationTimeInMillis;
312         }
313         if (fileModificationTimeInMillis > youngestFileModificationTime) {
314             youngestFileModificationTime = fileModificationTimeInMillis;
315         }
316         totalFileSize += fileSize;
317         return size();
318     }
319 
320     /**
321      * Add from a list, without adjusting file modification time tracking.
322      * 
323      * @param list a list of WorkUnits to add to this bundle
324      * @return the total size of WorkUnits in this bundle
325      * @throws IllegalStateException if adding the units would cause the bundle to contain more than <code>MAX_UNITS</code>
326      *         work units
327      */
328     int addWorkUnits(List<WorkUnit> list) { // This appears to only be used by unit tests and the copy constructor
329         if (workUnitList.size() + list.size() > MAX_UNITS) {
330             throw new IllegalStateException("WorkBundle may not contain more than " + MAX_UNITS + " WorkUnits.");
331         }
332         workUnitList.addAll(list);
333         return workUnitList.size();
334     }
335 
336     /**
337      * Gets the list of file names
338      * 
339      * @return the string values of filenames
340      */
341     public List<String> getFileNameList() {
342         ArrayList<String> fileNameList = new ArrayList<>(workUnitList.size());
343         for (WorkUnit workUnit : workUnitList) {
344             fileNameList.add(workUnit.getFileName());
345         }
346 
347         return fileNameList;
348     }
349 
350     /**
351      * Gets an iterator over file names
352      * 
353      * @return iterator of String filename values
354      */
355     public Iterator<String> getFileNameIterator() {
356         return getFileNameList().iterator();
357     }
358 
359     /**
360      * Add a file to the list, without adjusting file modification time tracking.
361      * 
362      * @param file string file name consistent with outputRoot
363      * @return number of files in this set after update
364      * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
365      *         work units
366      */
367     public int addFileName(String file) {
368         return addWorkUnit(new WorkUnit(file));
369     }
370 
371     /**
372      * Add a file to the list
373      * 
374      * @param file string file name consistent with outputRoot
375      * @param fileModificationTimeInMillis the file modification time in milliseconds since epoch
376      * @param fileSize the size of the file being added
377      * @return number of files in this set after update
378      * @throws IllegalStateException if adding the file would cause the bundle to contain more than <code>MAX_UNITS</code>
379      *         work units
380      */
381     public int addFileName(String file, long fileModificationTimeInMillis, long fileSize) {
382         return addWorkUnit(new WorkUnit(file), fileModificationTimeInMillis, fileSize);
383     }
384 
385     /**
386      * Add files to the list, without adjusting file modification time tracking.
387      * 
388      * @param file string file names consistent with outputRoot
389      * @return number of files in this set after update
390      * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
391      *         work units
392      */
393     int addFileNames(String[] file) { // This appears to only be used by unit tests
394         for (String f : file) {
395             addWorkUnit(new WorkUnit(f));
396         }
397         return size();
398     }
399 
400     /**
401      * Add from a list, without adjusting file modification time tracking.
402      * 
403      * @param list the list of files to add
404      * @throws IllegalStateException if adding the files would cause the bundle to contain more than <code>MAX_UNITS</code>
405      *         work units
406      */
407     int addFileNames(List<String> list) { // This appears to only be used by unit tests and the copy
408                                           // constructor
409         for (String file : list) {
410             addWorkUnit(new WorkUnit(file));
411         }
412         return size();
413     }
414 
415     /**
416      * Get the number of files contained
417      */
418     public int size() {
419         return workUnitList.size();
420     }
421 
422     /**
423      * Clear the files from the list
424      */
425     void clearFiles() {
426         // This is only used for testing
427         workUnitList.clear();
428         oldestFileModificationTime = Long.MAX_VALUE;
429         youngestFileModificationTime = Long.MIN_VALUE;
430         totalFileSize = 0L;
431     }
432 
433     /**
434      * Gets the value of caseId
435      * 
436      * @return the value of caseId
437      */
438     public String getCaseId() {
439         return this.caseId;
440     }
441 
442     /**
443      * Sets the value of caseId
444      * 
445      * @param argCaseId Value to assign to this.caseId
446      */
447     public void setCaseId(@Nullable String argCaseId) {
448         this.caseId = argCaseId;
449     }
450 
451     /**
452      * Set the transient sentTo indicating inprogress work
453      */
454     public void setSentTo(@Nullable String place) {
455         this.sentTo = place;
456     }
457 
458     /**
459      * Get the transient sentTo
460      */
461     public String getSentTo() {
462         return sentTo;
463     }
464 
465     /**
466      * Get the transient error count
467      */
468     public int getErrorCount() {
469         return errorCount;
470     }
471 
472     /**
473      * Increment the error count
474      * 
475      * @return the new value
476      */
477     public int incrementErrorCount() {
478         return ++errorCount;
479     }
480 
481     /**
482      * Set a new value for the error count
483      */
484     public void setErrorCount(int val) {
485         errorCount = val;
486     }
487 
488     /**
489      * Set a new priority
490      */
491     public void setPriority(int val) {
492         priority = val;
493     }
494 
495     /**
496      * Get the priority
497      */
498     public int getPriority() {
499         return priority;
500     }
501 
502     /**
503      * Set the value for the simple flag
504      * 
505      * @param val the new value for the flag
506      */
507     public void setSimpleMode(boolean val) {
508         simpleMode = val;
509     }
510 
511     /**
512      * Get the value for the simple mode flag
513      */
514     public boolean getSimpleMode() {
515         return simpleMode;
516     }
517 
518     public long getOldestFileModificationTime() {
519         return oldestFileModificationTime;
520     }
521 
522     public void setOldestFileModificationTime(long oldestFileModificationTime) {
523         this.oldestFileModificationTime = oldestFileModificationTime;
524     }
525 
526     public long getYoungestFileModificationTime() {
527         return youngestFileModificationTime;
528     }
529 
530     public void setYoungestFileModificationTime(long youngestFileModificationTime) {
531         this.youngestFileModificationTime = youngestFileModificationTime;
532     }
533 
534     public long getTotalFileSize() {
535         return totalFileSize;
536     }
537 
538     public void setTotalFileSize(long totalFileSize) {
539         this.totalFileSize = totalFileSize;
540     }
541 
542     /**
543      * Compare in priority order, lower numbers mean high priority data Note: this comparator imposes ordering that is
544      * inconsistent with equals
545      */
546     @Override
547     public int compareTo(WorkBundle that) {
548         if (this.getPriority() < that.getPriority()) {
549             return -1;
550         } else if (that.getPriority() < this.getPriority()) {
551             return 1;
552         } else {
553             return 0;
554         }
555     }
556 
557     /**
558      * Provide string version
559      */
560     @Override
561     public String toString() {
562         return "WorkBundle[id=" + getBundleId() + ", pri=" + getPriority() + ", files=" + getFileNameList().toString() + ", eatPrefix="
563                 + getEatPrefix()
564                 + ", outputRoot=" + getOutputRoot() + ", sentTo=" + getSentTo() + ", errorCount=" + getErrorCount() + ", totalFileSize="
565                 + getTotalFileSize() + ", oldestModTime=" + getOldestFileModificationTime() + ", youngModTime=" + getYoungestFileModificationTime()
566                 + ", simple=" + getSimpleMode() + ", caseId=" + getCaseId() + ", size=" + size() + "]";
567     }
568 
569     public String toXml() {
570         Element root = new Element("workBundle");
571         root.addContent(SaferJDOMUtil.simpleElement("bundleId", getBundleId()));
572         root.addContent(SaferJDOMUtil.simpleElement("outputRoot", getOutputRoot()));
573         root.addContent(SaferJDOMUtil.simpleElement("eatPrefix", getEatPrefix()));
574         root.addContent(SaferJDOMUtil.simpleElement("caseId", getCaseId()));
575         root.addContent(SaferJDOMUtil.simpleElement("sentTo", getSentTo()));
576         root.addContent(SaferJDOMUtil.simpleElement("errorCount", getErrorCount()));
577         root.addContent(SaferJDOMUtil.simpleElement("priority", getPriority()));
578         root.addContent(SaferJDOMUtil.simpleElement("simpleMode", getSimpleMode()));
579         root.addContent(SaferJDOMUtil.simpleElement("oldestFileModificationTime", getOldestFileModificationTime()));
580         root.addContent(SaferJDOMUtil.simpleElement("youngestFileModificationTime", getYoungestFileModificationTime()));
581         root.addContent(SaferJDOMUtil.simpleElement("totalFileSize", getTotalFileSize()));
582 
583         for (WorkUnit wu : workUnitList) {
584             Element workunit = new Element("workUnit");
585             workunit.addContent(SaferJDOMUtil.simpleElement("workFileName", wu.getFileName()));
586             if (wu.getTransactionId() != null) {
587                 workunit.addContent(SaferJDOMUtil.simpleElement("transactionId", wu.getTransactionId()));
588             }
589             workunit.addContent(SaferJDOMUtil.simpleElement("failedToParse", wu.failedToParse()));
590             workunit.addContent(SaferJDOMUtil.simpleElement("failedToProcess", wu.failedToProcess()));
591 
592             root.addContent(workunit);
593         }
594 
595         Document jdom = new Document(root);
596         return SaferJDOMUtil.toString(jdom);
597     }
598 
599     /**
600      * Build a WorkBundle object from xml
601      * 
602      * @param xml the xml string representing a WorkBundle
603      * @return the constructed WorkBundle or null on error
604      */
605     @Nullable
606     public static WorkBundle buildWorkBundle(String xml) {
607         Document jdoc;
608         try {
609             jdoc = SaferJDOMUtil.createDocument(xml);
610             return buildWorkBundle(jdoc);
611         } catch (Exception ex) {
612             logger.error("Cannot make WorkBundle from " + xml, ex);
613             return null;
614         }
615     }
616 
617     /**
618      * Build a WorkBundle object from a jdom document
619      * 
620      * @param jdom the jdom document representing a work bundle object
621      * @return the constructed WorkBundle or null on error
622      */
623     @Nullable
624     private static WorkBundle buildWorkBundle(Document jdom) {
625         Element root = jdom.getRootElement();
626         if (root == null) {
627             logger.error("Document does not have a root element!");
628             return null;
629         }
630 
631         WorkBundle wb = new WorkBundle();
632         wb.setBundleId(root.getChildTextTrim("bundleId"));
633         String s = root.getChildTextTrim("outputRoot");
634         if (s != null && s.length() > 0) {
635             wb.setOutputRoot(s);
636         } else {
637             wb.setOutputRoot(null);
638         }
639 
640         s = root.getChildTextTrim("eatPrefix");
641         if (s != null && s.length() > 0) {
642             wb.setEatPrefix(s);
643         } else {
644             wb.setEatPrefix(null);
645         }
646 
647         s = root.getChildTextTrim("caseId");
648         if (s != null && s.length() > 0) {
649             wb.setCaseId(s);
650         } else {
651             wb.setCaseId(null);
652         }
653 
654         s = root.getChildTextTrim("sentTo");
655         if (s != null && s.length() > 0) {
656             wb.setSentTo(s);
657         } else {
658             wb.setSentTo(null);
659         }
660 
661         wb.setPriority(SaferJDOMUtil.getChildIntValue(root, "priority"));
662         wb.setSimpleMode(SaferJDOMUtil.getChildBooleanValue(root, "simpleMode"));
663         wb.setOldestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "oldestFileModificationTime"));
664         wb.setYoungestFileModificationTime(SaferJDOMUtil.getChildLongValue(root, "youngestFileModificationTime"));
665         wb.setTotalFileSize(SaferJDOMUtil.getChildLongValue(root, "totalFileSize"));
666         String serr = root.getChildTextTrim("errorCount");
667         if (serr != null && serr.length() > 0) {
668             wb.setErrorCount(Integer.parseInt(serr));
669         }
670 
671         for (Element wu : root.getChildren("workUnit")) {
672             String filename = wu.getChildTextTrim("workFileName");
673             String transactionId = wu.getChildTextTrim("transactionId");
674             boolean failedToParse = Boolean.parseBoolean(wu.getChildTextTrim("failedToParse"));
675             boolean failedToProcess = Boolean.parseBoolean(wu.getChildTextTrim("failedToProcess"));
676             wb.addWorkUnit(new WorkUnit(filename, transactionId, failedToParse, failedToProcess));
677         }
678 
679         return wb;
680     }
681 }