View Javadoc
1   package emissary.pickup;
2   
3   import emissary.client.EmissaryResponse;
4   import emissary.command.BaseCommand;
5   import emissary.command.FeedCommand;
6   import emissary.command.ServerCommand;
7   import emissary.core.EmissaryException;
8   import emissary.core.Namespace;
9   import emissary.core.NamespaceException;
10  import emissary.directory.DirectoryAdapter;
11  import emissary.directory.DirectoryEntry;
12  import emissary.directory.DirectoryPlace;
13  import emissary.directory.EmissaryNode;
14  import emissary.directory.IDirectoryPlace;
15  import emissary.directory.KeyManipulator;
16  import emissary.pool.AgentPool;
17  import emissary.server.EmissaryServer;
18  import emissary.server.mvc.adapters.WorkSpaceAdapter;
19  import emissary.util.Version;
20  import emissary.util.io.FileFind;
21  
22  import jakarta.annotation.Nullable;
23  import org.apache.hc.core5.http.HttpStatus;
24  import org.eclipse.jetty.server.Server;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import java.io.File;
29  import java.lang.management.ManagementFactory;
30  import java.lang.management.MemoryMXBean;
31  import java.lang.management.MemoryUsage;
32  import java.nio.file.Files;
33  import java.nio.file.Paths;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.Collection;
37  import java.util.Collections;
38  import java.util.HashMap;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.PriorityQueue;
44  import java.util.Set;
45  import java.util.concurrent.CopyOnWriteArrayList;
46  
47  /**
48   * Recursively process input and distribute files to one or more remote PickUp client instances when they ask for a
49   * bundle of work to do.
50   * <p>
51   * The work bundles, emissary.pickup.WorkBundle objects, are placed on a queue for sending to consumers by the producer
52   * here. Once a WorkBundle has been requested and sent to a consumer (i.e. FilePickUpClient) it is placed on a pending
53   * queue tagged with the consumer id until the consumer notifies this WorkSpace that the work has been completed. If the
54   * consumer goes away without notification, then the work is moved back to the outbound queue and given to another
55   * consumer.
56   */
57  public class WorkSpace implements Runnable {
58      /** Our logger */
59      protected static final Logger logger = LoggerFactory.getLogger(WorkSpace.class);
60  
61      protected FeedCommand feedCommand;
62  
63      /**
64       * Pickup places we will send to, loaded and modified by directory observation during runtime
65       */
66      protected List<String> pups = new CopyOnWriteArrayList<>();
67  
68      /**
69       * Initial pattern for finding pickup places
70       */
71      protected String pattern = System.getProperty(CLZ + ".clientPattern", "*.FILE_PICK_UP_CLIENT.INPUT.*");
72  
73      /** The directory observer for the pattern */
74      protected WorkSpaceDirectoryWatcher watcher;
75  
76      // Process control
77      protected static final String CLZ = WorkSpace.class.getName();
78      protected boolean wantDirectories = Boolean.getBoolean(CLZ + ".includeDirectories");
79      protected boolean debug = Boolean.getBoolean(CLZ + ".debug");
80      protected boolean simpleMode = false;
81      protected String outputRootPath = System.getProperty("outputRoot", null);
82      protected String eatPrefix = System.getProperty("eatPrefix", null);
83      protected int numberOfBundlesToSkip = Integer.getInteger(CLZ + ".skip", 0);
84      protected boolean skipDotFiles = Boolean.getBoolean(CLZ + ".skipDotFiles");
85      protected boolean loop = false;
86      protected boolean useRetryStrategy = false;
87      protected static final int MAX_BUNDLE_RETRIES = 5;
88      protected PriorityQueue<PriorityDirectory> myDirectories = new PriorityQueue<>();
89  
90      // Stats tracking, map of stats per remote pick up place
91      protected WorkSpaceStats stats = new WorkSpaceStats();
92  
93      // Thread to notify clients that there is work to do
94      @Nullable
95      protected ClientNotifier notifier = null;
96  
97      // Process control for collector thread
98      protected boolean timeToQuit = false;
99      protected boolean collectorThreadHasQuit = false;
100     protected boolean jettyStartedHere = false;
101     protected static final float MEM_THRESHOLD = 0.80f;
102     protected long loopPauseTime = 60000L;
103     protected long pendingHangTime = 600000L;
104     protected static final long NOTIFIER_PAUSE_TIME = 1000L;
105     protected int retryCount = 0;
106     protected boolean useFileTimestamps = false;
107     @Nullable
108     protected String projectBase = null;
109 
110     /**
111      * How many file names to send per remote message, should be 10% or less of the size of the PickUpPlace.MAX_QUE Helps
112      * prevent blocking if it's not a factor of the PickUpPlace.MAX_QUE size
113      */
114     protected int filesPerMessage = Integer.getInteger(CLZ + ".filesPerBundle", 5);
115 
116     protected long maxBundleSize = Long.getLong(CLZ + ".maxSizePerBundle", -1);
117 
118     // Metrics collection
119     protected long filesProcessed = 0;
120     protected long bundlesProcessed = 0;
121     protected long bytesProcessed = 0;
122 
123     // Data tracking
124     protected String dataCaseId = System.getProperty("caseId", null);
125     protected boolean caseClosed = false;
126 
127     // List of WorkBundle objects we are going to distribute
128     protected PriorityQueue<WorkBundle> outbound = new PriorityQueue<>();
129 
130     // List of WorkBundle objects that are pending completion notice
131     // Keyed by bundleId to quickly remove items that are processed
132     // normally (the expected case)
133     protected Map<String, WorkBundle> pending = new HashMap<>();
134 
135     // Keep track of files we have seen that are either outbound or pending
136     // so that we can avoid using file timestamps in the collector loop
137     protected Map<String, Long> filesSeen = new HashMap<>();
138     protected Map<String, Long> filesDone = new HashMap<>();
139 
140     // Used to synchronize access to the pending and outbound queues
141     // One lock to rule them all
142     @SuppressWarnings("ConstantField")
143     protected final Object QLOCK = new Object(); // NOSONAR
144 
145     // How we register in the namespace and advertise ourselves
146     protected static final String DEFAULT_WORK_SPACE_NAME = "WorkSpace";
147     protected String workSpaceName = DEFAULT_WORK_SPACE_NAME;
148 
149     protected String workSpaceUrl;
150     protected String workSpaceKey;
151 
152     /**
153      * Command line entry point for sending files to a list of remote TreePickUpPlaces
154      */
155     public static void main(final String[] args) {
156         try {
157             final WorkSpace ws = new WorkSpace(BaseCommand.parse(FeedCommand.class, args));
158             ws.run();
159             logger.info("Workspace has completed the mission [ +1 health ].");
160             ws.shutDown();
161         } catch (Exception e) {
162             logger.error("Bad commandline arguments, check the FeedCommand help", e);
163         }
164         System.exit(0);
165     }
166 
167     /**
168      * Construct the space
169      */
170     @SuppressWarnings("CheckedExceptionNotThrown")
171     public WorkSpace() throws Exception {
172 
173     }
174 
175     public WorkSpace(FeedCommand feedCommand) {
176         this.feedCommand = feedCommand;
177         // TODO make setting of all parameters use setters
178         this.loop = this.feedCommand.isLoop();
179         this.setRetryStrategy(this.feedCommand.isRetry());
180         this.setFileTimestampUsage(this.feedCommand.isFileTimestamp());
181         this.workSpaceName = this.feedCommand.getWorkspaceName();
182         this.simpleMode = this.feedCommand.isSimple();
183         this.projectBase = this.feedCommand.getProjectBase().toAbsolutePath().toString();
184         this.pattern = this.feedCommand.getClientPattern();
185         this.outputRootPath = this.feedCommand.getOutputRoot();
186         this.eatPrefix = this.feedCommand.getEatPrefix();
187         this.filesPerMessage = this.feedCommand.getBundleSize();
188         this.dataCaseId = this.feedCommand.getCaseId();
189         this.setSkipDotFiles(this.feedCommand.isSkipDotFile());
190         this.wantDirectories = this.feedCommand.isIncludeDirs();
191         this.setSimpleMode(this.feedCommand.isSimple());
192         this.myDirectories.addAll(this.feedCommand.getPriorityDirectories());
193 
194         if (null != this.feedCommand.getSort()) {
195             this.outbound = new PriorityQueue<>(11, this.feedCommand.getSort());
196         }
197 
198         startJetty();
199         register();
200         initializeService();
201     }
202 
203     protected void startJetty() {
204         if (!EmissaryServer.isInitialized() || !EmissaryServer.getInstance().isServerRunning()) {
205             // TODO investigate passing the feedCommand object directly to the serverCommand
206             List<String> args = new ArrayList<>();
207             args.add("-b");
208             args.add(projectBase);
209             args.add("--agents");
210             args.add("1"); // feed don't need agents
211             args.add("-h");
212             args.add(this.feedCommand.getHost());
213             args.add("-p");
214             args.add(String.valueOf(this.feedCommand.getPort()));
215             // feed doesn't make sense in standalone
216             args.add("-m");
217             args.add("cluster");
218             args.add("--flavor");
219             args.add(this.feedCommand.getFlavor());
220             if (this.feedCommand.isSslEnabled()) {
221                 args.add("--ssl");
222             }
223             if (this.feedCommand.isSniHostCheckDisabled()) {
224                 args.add("--disableSniHostCheck");
225             }
226             try {
227                 // To ensure the feed command starts correctly, depends on a node-{feedCommand.getPort}.cfg file
228                 ServerCommand cmd = BaseCommand.parse(ServerCommand.class, args);
229                 Server server = EmissaryServer.init(cmd).startServer();
230                 final boolean jettyStatus = server.isStarted();
231                 if (!jettyStatus) {
232                     logger.error("Cannot start the Workspace due to EmissaryServer not starting!");
233                 } else {
234                     logger.info("Workspace is up and running");
235                     this.jettyStartedHere = true;
236                 }
237             } catch (EmissaryException e) {
238                 logger.error("Error starting EmissaryServer! WorkSpace will not start!", e);
239             }
240         } else {
241             logger.info("EmissaryServer is already running, Workspace should be up.");
242         }
243     }
244 
245     protected void initializeService() {
246         // Load existing pickup client list
247         try {
248             this.pups.addAll(getPickUpClients(this.pattern));
249             logger.info("Found {} initial clients using {} in {}", this.pups.size(), this.pattern, getKey());
250             logger.debug("Initial pickups : {}", this.pups);
251         } catch (EmissaryException ex) {
252             logger.error("Cannot lookup pickup places using pattern {} in {}", this.pattern, getKey(), ex);
253         }
254 
255         // Hook our observer onto the local directory,
256         // so we keep in sync with any changes to the clients
257         this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
258         try {
259             DirectoryAdapter.register(this.watcher);
260         } catch (EmissaryException ex) {
261             logger.error("Cannot register directory observer", ex);
262         }
263 
264         // Must be before we start processing files and directories
265         initializeCase();
266     }
267 
268     /**
269      * Start collection of files and monitoring system progress
270      */
271     @Override
272     public void run() {
273         // Start the collection of files
274         startCollector();
275 
276         // Start client notifier
277         startNotifier();
278 
279         // Start monitoring the system until all work is done
280         monitorProgress();
281 
282         logger.debug("Ending the WorkSpace run method");
283     }
284 
285     /**
286      * Stop the work space
287      */
288     public void stop() {
289         this.timeToQuit = true;
290     }
291 
292     /**
293      * Shut down services that were started here
294      */
295     @SuppressWarnings("CatchingUnchecked")
296     public void shutDown() {
297         stop();
298         if (this.jettyStartedHere) {
299             final EmissaryNode node = EmissaryServer.getInstance().getNode();
300             if (node.isValid()) {
301                 try {
302                     EmissaryServer.getInstance().stop();
303                 } catch (Exception ex) {
304                     logger.error("Jetty cannot be shutdown", ex);
305                 }
306             }
307             try {
308                 AgentPool.lookup().close();
309             } catch (NamespaceException ex) {
310                 logger.debug("Agent pool namespace lookup failed", ex);
311             }
312         }
313     }
314 
315     /**
316      * Set the pending hang time, how long to wait after outbound queue is empty
317      *
318      * @param pendingHangTime in millis
319      */
320     public void setPendingHangTime(final long pendingHangTime) {
321         this.pendingHangTime = pendingHangTime;
322     }
323 
324     /**
325      * Set the loop pause time when loop is true
326      *
327      * @param pauseTimeMillis pause interval in millis
328      */
329     public void setPauseTime(final long pauseTimeMillis) {
330         this.loopPauseTime = pauseTimeMillis;
331     }
332 
333     /**
334      * Set or unset looping
335      */
336     public void setLoop(final boolean on) {
337         this.loop = on;
338     }
339 
340     /**
341      * Get the value of the loop indicator
342      */
343     public boolean getLoop() {
344         return this.loop;
345     }
346 
347     /**
348      * Set the use of file timestamps to control whether a file is new enough to be added to the queue
349      */
350     public void setFileTimestampUsage(final boolean value) {
351         this.useFileTimestamps = value;
352     }
353 
354     /**
355      * Return whether fileTimestamps can be used for collector queue control
356      */
357     public boolean getFileTimestampUsage() {
358         return this.useFileTimestamps;
359     }
360 
361     /**
362      * Set Retry strategy on or off
363      */
364     public void setRetryStrategy(final boolean on) {
365         this.useRetryStrategy = on;
366     }
367 
368     /**
369      * Get value of the retry strategy indicator
370      *
371      * @return true if retry strategy in use
372      */
373     public boolean getRetryStrategy() {
374         return this.useRetryStrategy;
375     }
376 
377     /**
378      * Add directory at specified priority to be monitored
379      */
380     public void addDirectory(final String dir, final int priority) {
381         addDirectory(new PriorityDirectory(dir, priority));
382     }
383 
384     /**
385      * Add specified PriorityDirectory object to be monitored
386      */
387     public void addDirectory(final PriorityDirectory dir) {
388         this.myDirectories.add(dir);
389         logger.debug("Adding input directory {}", dir);
390     }
391 
392     public List<String> getDirectories() {
393         final List<String> l = new ArrayList<>();
394         final PriorityDirectory[] pds = this.myDirectories.toArray(new PriorityDirectory[0]);
395         Arrays.sort(pds);
396         for (final PriorityDirectory pd : pds) {
397             l.add(pd.toString());
398         }
399         return l;
400     }
401 
402     /**
403      * Set directory processing flag. When true directory entries are retrieved from the input area just like normal files.
404      *
405      * @see emissary.util.io.FileFind
406      * @param on the new value for directory retrieval
407      */
408     public void setDirectoryProcessing(final boolean on) {
409         this.wantDirectories = on;
410     }
411 
412     /**
413      * Reset the eatprefix for this workspace
414      *
415      * @param prefix the new prefix
416      */
417     public void setEatPrefix(final String prefix) {
418         logger.debug("Reset eatPrefix to {}", prefix);
419         this.eatPrefix = prefix;
420         normalizeEatPrefix();
421     }
422 
423     /**
424      * Make sure the eatPrefix is in canonical form
425      */
426     protected void normalizeEatPrefix() {
427         if (this.eatPrefix != null && this.eatPrefix.contains("//")) {
428             this.eatPrefix = this.eatPrefix.replaceAll("/+", "/");
429         }
430     }
431 
432     /**
433      * Reset the outputRoot
434      *
435      * @param value the new outputRoot value
436      */
437     public void setOutputRoot(final String value) {
438         logger.debug("Reset outputRoot to {}", value);
439         this.outputRootPath = value;
440     }
441 
442     /**
443      * Get the value of the configured outputRoot
444      */
445     public String getOutputRoot() {
446         return this.outputRootPath;
447     }
448 
449     /**
450      * Reset the case id
451      *
452      * @param value the new value for caseId
453      */
454     public void setCaseId(final String value) {
455         logger.debug("Reset caseId to {}", value);
456         this.dataCaseId = value;
457     }
458 
459     /**
460      * Reset the skipDotFiles flag
461      *
462      * @param value the new value for the skipDotFiles flag
463      */
464     public void setSkipDotFiles(final boolean value) {
465         this.skipDotFiles = value;
466     }
467 
468     /**
469      * Set the debug flag
470      *
471      * @param value the new value for the debug flag
472      */
473     public void setDebugFlag(final boolean value) {
474         this.debug = value;
475     }
476 
477     /**
478      * Set the simple mode flag
479      *
480      * @param value the new value for the flag
481      */
482     public void setSimpleMode(final boolean value) {
483         this.simpleMode = value;
484     }
485 
486     /**
487      * Get the value of the simple mode flag
488      */
489     public boolean getSimpleMode() {
490         return this.simpleMode;
491     }
492 
493     /**
494      * Set the pattern for finding pickup clients
495      *
496      * @param thePattern the new pattern
497      * @see emissary.directory.KeyManipulator#gmatch(String,String)
498      */
499     public void setPattern(@Nullable final String thePattern) throws Exception {
500 
501         if ((this.pattern != null) && this.pattern.equals(thePattern)) {
502             logger.debug("The pattern is already set to {}", thePattern);
503             return;
504         }
505 
506         this.pattern = thePattern;
507 
508         // Clear out old pick up clients
509         logger.warn("Clearing client list so we can look for new pattern {} in {}", thePattern, getKey());
510         this.pups.clear();
511 
512         // Find new ones
513         this.pups.addAll(getPickUpClients(this.pattern));
514 
515         // Set up a new observer on the directory
516         if (this.watcher != null) {
517             DirectoryAdapter.remove(this.watcher);
518         }
519         this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
520         DirectoryAdapter.register(this.watcher);
521     }
522 
523     /**
524      * Configure the Processor. The *.cfg file is optional
525      */
526     protected void register() {
527         final EmissaryNode node = EmissaryServer.getInstance().getNode();
528         if (node.isValid()) {
529             this.workSpaceUrl = node.getNodeScheme() + "://" + node.getNodeName() + ":" + node.getNodePort() + "/" + this.workSpaceName;
530         } else {
531             this.workSpaceUrl = "http://localhost:8001/" + this.workSpaceName;
532             logger.warn("WorkSpace is not running in a valid emissary node. Using URL {}", this.workSpaceUrl);
533         }
534         this.workSpaceKey = "WORKSPACE.WORK_SPACE.INPUT." + this.workSpaceUrl;
535 
536         normalizeEatPrefix();
537 
538         // Need to bind so WorkSpaceTakeWorker can find us on the callback
539         // The url we use to bind is in the advertisement to clients
540         Namespace.bind(this.workSpaceUrl, this);
541     }
542 
543 
544     /**
545      * Get the initial list of pickup client places from the local directory. Our observer will keep us in sync after this
546      * initial pull. This method does not cause clients to be notified.
547      *
548      * @param thePattern the key pattern to match for places of interest
549      */
550     protected Set<String> getPickUpClients(final String thePattern) throws EmissaryException {
551         final Set<String> thePups = new HashSet<>();
552         final IDirectoryPlace dir = DirectoryPlace.lookup();
553         final List<DirectoryEntry> list = dir.getMatchingEntries(thePattern);
554         for (final DirectoryEntry d : list) {
555             thePups.add(d.getKey());
556             logger.info("Adding pickup client {}", d.getKey());
557         }
558         logger.debug("Found {} initial pickup client entries", thePups.size());
559         return thePups;
560     }
561 
562     /**
563      * Start the file collector threads, one per directory
564      */
565     public void startCollector() {
566         for (final PriorityDirectory pd : this.myDirectories) {
567             final WorkSpaceCollector collector = new WorkSpaceCollector(pd);
568             final Thread collectorThread = new Thread(collector, "WorkSpace Collector " + pd);
569             collectorThread.setDaemon(true);
570             collectorThread.start();
571             logger.debug("Started WorkSpace Collector thread on {}", pd);
572         }
573     }
574 
575     /**
576      * Start the client notification Thread*
577      */
578     public void startNotifier() {
579         this.notifier = new ClientNotifier();
580         final Thread notifierThread = new Thread(this.notifier, "WorkSpace Client Notifier");
581         notifierThread.setDaemon(true);
582         notifierThread.start();
583         logger.debug("Started Client Notifier thread");
584     }
585 
586     /**
587      * Rotate the list of pickups so that the same old one isn't always first on the list.
588      */
589     protected void rotatePickUps() {
590         // Move element(0) to the tail and shift all to the left
591         Collections.rotate(this.pups, -1);
592     }
593 
594     /**
595      * Notify pick up place that data is available
596      *
597      * @return number of successful notices
598      */
599     protected int notifyPickUps() {
600         int successCount = 0;
601         for (final String pup : this.pups) {
602             final boolean status = notifyPickUp(pup);
603             if (status) {
604                 successCount++;
605             }
606             if (getOutboundQueueSize() == 0) {
607                 break;
608             }
609         }
610         logger.debug("Notified {} of {} pickup places", successCount, this.pups.size());
611         return successCount;
612     }
613 
614     /**
615      * Add one pickup and notify of work to be done
616      */
617     protected void addPickUp(final String pup) {
618         if (!this.pups.contains(pup)) {
619             this.pups.add(pup);
620             if (logger.isDebugEnabled()) {
621                 logger.debug("Adding pickup {}, new size={}: {}", pup, this.pups.size(), this.pups);
622             }
623         } else {
624             logger.debug("Not adding {} already on list size {}", pup, this.pups.size());
625         }
626     }
627 
628     /**
629      * Notify one pickup
630      *
631      * @param pup the key of the one to notify
632      */
633     protected boolean notifyPickUp(final String pup) {
634         final WorkSpaceAdapter tpa = new WorkSpaceAdapter();
635         logger.debug("Sending notice to {}", pup);
636 
637         boolean notified = false;
638         int tryCount = 0;
639 
640         while (!notified && tryCount < 5) {
641             final EmissaryResponse status = tpa.outboundOpenWorkSpace(pup, this.workSpaceKey);
642 
643             // TODO Consider putting this method in the response
644             if (status.getStatus() != HttpStatus.SC_OK) {
645                 logger.warn("Failed to notify {} on try {}: {}", pup, tryCount, status.getContentString());
646                 try {
647                     Thread.sleep((tryCount + 1) * 100L);
648                 } catch (InterruptedException ignore) {
649                     Thread.currentThread().interrupt();
650                 }
651             } else {
652                 notified = true;
653             }
654             tryCount++;
655         }
656 
657         if (logger.isInfoEnabled()) {
658             logger.info("Notified {} in {} attempts: {}", pup, tryCount, notified ? "SUCCESS" : "FAILED");
659         }
660 
661         return notified;
662     }
663 
664     /**
665      * Return the registration key for this work space
666      */
667     public String getKey() {
668         return this.workSpaceKey;
669     }
670 
671     /**
672      * Return the workspace name
673      */
674     public String getNamespaceName() {
675         return this.workSpaceName;
676     }
677 
678     /**
679      * Remove a pickup, if it had work bundles pending completion transfer them back to the outbound queue
680      *
681      * @param remoteKey the directory observer string key that was removed
682      */
683     protected void removePickUp(final String remoteKey) {
684         this.pups.remove(remoteKey);
685         if (logger.isDebugEnabled()) {
686             logger.debug("Removed pickup {}, size={}: {}", remoteKey, this.pups.size(), this.pups);
687         }
688         int pendCount = 0;
689         final String remoteName = KeyManipulator.getServiceHost(remoteKey);
690         synchronized (this.QLOCK) {
691             // NB: no enhanced for loop with Iterator.remove()
692             for (Iterator<String> i = this.pending.keySet().iterator(); i.hasNext();) {
693                 final String id = i.next();
694                 final WorkBundle wb = this.pending.get(id);
695                 if (remoteName.equals(wb.getSentTo())) {
696                     i.remove(); // remove from pending
697                     wb.setSentTo(null); // clear in progress indicator
698                     this.retryCount++;
699                     if (wb.incrementErrorCount() <= MAX_BUNDLE_RETRIES) {
700                         logger.debug("Removing pending bundle {} from pending pool, re-adding to outbound with errorCount={}", wb.getBundleId(),
701                                 wb.getErrorCount());
702                         addOutboundBundle(wb); // send to outbound again
703                         pendCount++;
704 
705                         // Set overall counts back to normal
706                         this.bundlesProcessed--;
707                     } else {
708                         logger.error("Bundle {} associated with too many failures, permanently discarding", wb);
709                     }
710                 }
711             }
712         }
713         if (pendCount > 0) {
714             logger.info("Moved {} items back to outbound queue from {}", pendCount, remoteName);
715         }
716     }
717 
718     /**
719      * Method called by remote PickUp client instances when they are ready to receive data from this WorkSpace Access via
720      * emissary.comms.http.WorkSpaceApapter
721      *
722      * @param remoteKey key of the requesting PickUp place
723      * @return WorkBundle at the head of the list or null if empty
724      */
725     public WorkBundle take(final String remoteKey) {
726         final String remoteName = KeyManipulator.getServiceHost(remoteKey);
727         WorkBundle item;
728         synchronized (this.QLOCK) {
729             if (getOutboundQueueSize() == 0) {
730                 // Empty WorkBundle will let them know to stop asking us
731                 logger.info("Sent shutdown msg to {}", remoteName);
732                 this.stats.shutDownSent(remoteName);
733                 item = new WorkBundle();
734             } else {
735                 // transfer from outbound to pending list and
736                 // record who the work was given to track
737                 // completion status
738                 this.stats.bump(remoteName);
739                 item = this.outbound.poll();
740                 item.setSentTo(remoteName);
741                 this.pending.put(item.getBundleId(), item);
742                 logger.info("Gave bundle {} to {}", item, remoteName);
743                 final WorkBundle nextItem = this.outbound.peek();
744                 if (nextItem != null && logger.isInfoEnabled()) {
745                     logger.info("After take: new top differs to prior by [oldest/youngest/size]=[{}/{}/{}]",
746                             nextItem.getOldestFileModificationTime() - item.getOldestFileModificationTime(),
747                             nextItem.getYoungestFileModificationTime() - item.getYoungestFileModificationTime(),
748                             nextItem.getTotalFileSize() - item.getTotalFileSize());
749                 }
750             }
751         }
752         return item;
753     }
754 
755     /**
756      * Add a new bundle of work to the pending queue
757      *
758      * @param wb the new bundle
759      */
760     protected void addOutboundBundle(final WorkBundle wb) {
761         int sz;
762         synchronized (this.QLOCK) {
763             this.bundlesProcessed++;
764             sz = this.outbound.size();
765             this.outbound.add(wb);
766             addFilesSeen(wb.getFileNameList());
767         }
768 
769         if (logger.isInfoEnabled()) {
770             logger.info("Adding workbundle {} size {} filesSeen {}", wb, sz + 1, this.filesSeen.size());
771         }
772     }
773 
774     /**
775      * Show items that are pending completion (debug)
776      *
777      * @deprecated use {@link #showPendingItemsList()}
778      */
779     @Deprecated
780     @SuppressWarnings("AvoidObjectArrays")
781     public String[] showPendingItems() {
782         return showPendingItemsList().toArray(new String[0]);
783     }
784 
785     /**
786      * Show items that are pending completion (debug)
787      */
788     public List<String> showPendingItemsList() {
789         final List<String> list = new ArrayList<>();
790         synchronized (this.QLOCK) {
791             for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
792                 list.add(entry.getValue().toString());
793             }
794         }
795         return list;
796     }
797 
798     /**
799      * Clear the pending queue
800      *
801      * @return number of items removed
802      */
803     public int clearPendingQueue() {
804         final int size = getPendingQueueSize();
805 
806         if (size > 0) {
807             synchronized (this.QLOCK) {
808                 logger.debug("Clearing pending queue of {} items", size);
809                 for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
810                     removeFilesSeen(entry.getValue().getFileNameList());
811                 }
812                 this.pending.clear();
813                 logger.debug("Cleared filesSeen leaving {} items", this.filesSeen.size());
814             }
815         }
816         return size;
817     }
818 
819     /**
820      * Receive notice that a bundle was completed Normally called from emissary.server.mvc.adapters.WorkSpaceAdapter when a
821      * bundle completion message is received from the remote client doing the processing.
822      *
823      * @param remoteName the name of the place that did the processing
824      * @param bundleId the unique id of the bundle that was completed
825      * @param itWorked true if processed normally
826      * @return true if the item was removed from the pending list
827      */
828     public boolean workCompleted(final String remoteName, final String bundleId, final boolean itWorked) {
829         WorkBundle item;
830 
831         synchronized (this.QLOCK) {
832             item = this.pending.remove(bundleId);
833             if (item != null) {
834                 addFilesDone(item.getFileNameList());
835                 removeFilesSeen(item.getFileNameList());
836                 logger.debug("Removed {} from filesSeen leaving {}", item.size(), this.filesSeen.size());
837             }
838         }
839         if (item == null) {
840             logger.info("Unknown bundle completed: {}", bundleId);
841         } else if (!itWorked) {
842             item.setSentTo(null); // clear in progress indicator
843             if (item.incrementErrorCount() > MAX_BUNDLE_RETRIES) {
844                 logger.error("Bundle {} has too many errors, permanently discarded", item);
845             } else {
846                 addOutboundBundle(item); // send to outbound again
847             }
848         }
849         if (logger.isDebugEnabled()) {
850             logger.debug("Bundle {} completed by {}{}", bundleId, remoteName,
851                     itWorked ? "" : (" but failed for the " + (item != null ? item.getErrorCount() : -1) + " time"));
852         }
853         return item != null;
854     }
855 
856     /**
857      * begin the case processing, does nothing in this implementation
858      */
859     protected void initializeCase() {
860         // Don't care in this implementation
861         logger.debug("In base initializeCase implementation (do nothing)");
862     }
863 
864     /**
865      * end the case processing does nothing in this implementation
866      */
867     protected void closeCase() {
868         // Don't care in this implementation
869         this.caseClosed = true;
870         logger.debug("In base closeCase implementation (do nothing)");
871     }
872 
873     /**
874      * handle getting a directory in the recursive descent
875      *
876      * @param dir File for which isDirectory returns true
877      */
878     protected void processDirectory(final File dir) {
879         // We don't care in this implementation
880         logger.debug("got a directory processDirectory({})", dir);
881     }
882 
883     /**
884      * Add each fileName and its respective lastModifiedDate to the filesSeen list
885      *
886      * @param fileNames the collection of file name strings to add
887      */
888     protected void addFilesSeen(final Collection<String> fileNames) {
889         for (final String fn : fileNames) {
890             this.filesSeen.put(fn, getFileModificationDate(fn));
891         }
892     }
893 
894     /**
895      * Add each fileName and its respective lastModifiedDate to the filesDone list
896      *
897      * @param fileNames the collection of file name strings to add
898      */
899     protected void addFilesDone(final Collection<String> fileNames) {
900         for (final String fn : fileNames) {
901             this.filesDone.put(fn, getFileModificationDate(fn));
902         }
903     }
904 
905     /**
906      * Remove each fileName from the filesSeen list without regard to the timestamp
907      *
908      * @param fileNames the collection of file name strings to remove
909      */
910     protected void removeFilesSeen(final Collection<String> fileNames) {
911         for (final String fn : fileNames) {
912             this.filesSeen.remove(fn);
913         }
914     }
915 
916     /**
917      * Lookup a lastModified date for a file
918      *
919      * @param fn the filename
920      * @return the long representing the date of last modification or 0L if an error, or it does not exist
921      */
922     protected long getFileModificationDate(final String fn) {
923         return new File(fn).lastModified();
924     }
925 
926     protected long getFileSize(final String fn) {
927         return new File(fn).length();
928     }
929 
930     /**
931      * Monitoring progress of the WorkSpace. Indicate some stats once in a while and do not let the foreground thread
932      * terminate while there is still work on the outbound queue or the pending lists.
933      */
934     protected void monitorProgress() {
935         long outboundEmptyTimestamp = -1L;
936 
937         // Do while outbound or pending work exists or collector is
938         // still running
939         while (true) {
940             final int outboundSize = getOutboundQueueSize();
941             int pendingSize = getPendingQueueSize();
942             final boolean reallyQuit = this.timeToQuit && (outboundSize == 0) && (pendingSize == 0);
943 
944             // Rmember when outbound becomes empty
945             if (outboundSize == 0 && outboundEmptyTimestamp == -1L) {
946                 outboundEmptyTimestamp = System.currentTimeMillis();
947             } else if (outboundSize > 0 && outboundEmptyTimestamp > 0L) {
948                 outboundEmptyTimestamp = -1L;
949             }
950 
951             // See if it is time to give up on pending items
952             if ((outboundSize == 0) && !this.loop && ((outboundEmptyTimestamp + this.pendingHangTime) < System.currentTimeMillis())) {
953                 if (logger.isInfoEnabled()) {
954                     logger.info("Giving up on {} items due to timeout", pendingSize);
955                     for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
956                         logger.info("Pending item {}: {}", entry.getKey(), entry.getValue());
957                     }
958                 }
959                 clearPendingQueue();
960                 pendingSize = 0;
961             }
962 
963             // All work is done and collector has finished
964             if (outboundSize + pendingSize == 0) {
965                 if (reallyQuit) {
966                     break;
967                 }
968                 publishStats();
969             }
970 
971             // Else sleep a while
972             try {
973                 for (int si = 0; si < 3000; si++) {
974                     Thread.sleep(10L);
975                     if (reallyQuit) {
976                         break;
977                     }
978                 }
979             } catch (InterruptedException ex) {
980                 Thread.currentThread().interrupt();
981             }
982 
983             if (!this.timeToQuit) {
984                 publishStats();
985             }
986         }
987 
988         // Case closing actions
989         closeCase();
990     }
991 
992     /**
993      * Output some information to the logger on what we have been doing lately
994      */
995     public void publishStats() {
996         logger.info(getStatsMessage());
997         for (Iterator<String> i = this.stats.machinesUsed(); i.hasNext();) {
998             final String machine = i.next();
999             logger.info("Machine {} took {} bundles", machine, this.stats.getCountUsed(machine));
1000         }
1001     }
1002 
1003     /**
1004      * Return the current stats to the caller
1005      */
1006     public String getStatsMessage() {
1007         final int outboundSize = getOutboundQueueSize();
1008         final int pendingSize = getPendingQueueSize();
1009 
1010         return "WorkSpace has outbound=" + outboundSize + ", pending=" + pendingSize + ", total bundles / files / bytes = " + this.bundlesProcessed
1011                 + " / " + this.filesProcessed + " / " + this.bytesProcessed + " , #clients=" + getPickUpPlaceCount();
1012     }
1013 
1014     /**
1015      * Return how many files processed so far
1016      */
1017     public long getFilesProcessed() {
1018         return this.filesProcessed;
1019     }
1020 
1021     /**
1022      * Return how many bytes processed so far
1023      */
1024     public long getBytesProcessed() {
1025         return this.bytesProcessed;
1026     }
1027 
1028     /**
1029      * Return how many pickup places are being fed
1030      */
1031     public int getPickUpPlaceCount() {
1032         return this.pups.size();
1033     }
1034 
1035     /**
1036      * Return how many bundles processed so far
1037      */
1038     public long getBundlesProcessed() {
1039         return this.bundlesProcessed;
1040     }
1041 
1042     /**
1043      * Return size of outbound queue
1044      */
1045     public int getOutboundQueueSize() {
1046         synchronized (this.QLOCK) {
1047             return this.outbound.size();
1048         }
1049     }
1050 
1051     public int getRetriedCount() {
1052         return this.retryCount;
1053     }
1054 
1055     /**
1056      * Return size of pending completion queue
1057      */
1058     public int getPendingQueueSize() {
1059         synchronized (this.QLOCK) {
1060             return this.pending.size();
1061         }
1062     }
1063 
1064     /**
1065      * Overridable point to get the version string for output periodically when looping
1066      *
1067      * @return the version info
1068      */
1069     protected String getVersionString() {
1070         return "Emissary version: " + new Version();
1071     }
1072 
1073     public class ClientNotifier implements Runnable {
1074         /**
1075          * Create the notifier Runnable
1076          */
1077         public ClientNotifier() {}
1078 
1079         @Override
1080         public void run() {
1081             while (true) {
1082                 final int qsize = getOutboundQueueSize();
1083                 if (qsize > 0) {
1084                     final long start = System.currentTimeMillis();
1085                     if (logger.isDebugEnabled()) {
1086                         logger.debug("ClientNotification starting with #clients={} outbound={}", getPickUpPlaceCount(), qsize);
1087                     }
1088                     notifyPickUps();
1089                     if (logger.isDebugEnabled()) {
1090                         final long end = System.currentTimeMillis();
1091                         logger.debug("ClientNotification took {}s for #clients={}", (end - start) / 1000.0, getPickUpPlaceCount());
1092                     }
1093                 }
1094 
1095                 try {
1096                     Thread.sleep(NOTIFIER_PAUSE_TIME);
1097                     rotatePickUps();
1098                 } catch (InterruptedException ignore) {
1099                     Thread.currentThread().interrupt();
1100                 }
1101 
1102                 final int outboundSize = getOutboundQueueSize();
1103                 final int pendingSize = getPendingQueueSize();
1104                 if (WorkSpace.this.timeToQuit && (outboundSize == 0) && (pendingSize == 0) && WorkSpace.this.collectorThreadHasQuit) {
1105                     break;
1106                 }
1107             }
1108 
1109             logger.debug("Off the end of the ClientNotifier run loop");
1110         }
1111     }
1112 
1113     /**
1114      * A runnable to collect files into WorkBundles and put them on the outbound queue
1115      */
1116     public class WorkSpaceCollector implements Runnable {
1117 
1118         protected PriorityDirectory myDirectory;
1119 
1120         /**
1121          * Create the collector runnable
1122          */
1123         public WorkSpaceCollector(final PriorityDirectory myDirectory) {
1124             this.myDirectory = myDirectory;
1125         }
1126 
1127         /**
1128          * Pull all the files into bundles, emit some stats, and notify the PickUp client instances to start work. When the list
1129          * of file bundles is empty we can quit or loop around again.
1130          */
1131         @Override
1132         public void run() {
1133             long versionOutputTime = System.currentTimeMillis();
1134             long start;
1135             long stop;
1136             long minFileTime = 0L;
1137 
1138             // Run the processing
1139             long lastFileCollect = 0L;
1140             int loopCount = 0;
1141 
1142             logger.info("Running Workspace from {}", getVersionString());
1143 
1144             do {
1145                 start = System.currentTimeMillis();
1146                 // every hour
1147                 if (start - versionOutputTime > 3600000) {
1148                     logger.info("Continuing Workspace from {}", getVersionString());
1149                     versionOutputTime = start;
1150                 }
1151 
1152                 final WorkBundle paths = new WorkBundle(WorkSpace.this.outputRootPath, WorkSpace.this.eatPrefix);
1153                 paths.setCaseId(WorkSpace.this.dataCaseId);
1154                 paths.setSimpleMode(getSimpleMode());
1155 
1156                 logger.debug("Processing files in {}", this.myDirectory.getDirectoryName());
1157 
1158                 final int collectCount =
1159                         collectFiles(this.myDirectory, WorkSpace.this.wantDirectories, paths, WorkSpace.this.numberOfBundlesToSkip, minFileTime,
1160                                 WorkSpace.this.skipDotFiles);
1161 
1162                 // Set times, so we don't redistribute files next loop
1163                 // if configured to use timestamps
1164                 if (WorkSpace.this.useFileTimestamps) {
1165                     lastFileCollect = System.currentTimeMillis();
1166                 }
1167                 stop = System.currentTimeMillis();
1168                 loopCount++;
1169 
1170                 // We can only skip bundles on the first time through
1171                 WorkSpace.this.numberOfBundlesToSkip = 0;
1172 
1173                 logger.info("Collected {} file bundles in {}s in loop iteration {}, {} items in outbound queue", collectCount,
1174                         (stop - start) / 1000.0, loopCount, WorkSpace.this.outbound.size());
1175 
1176                 if ((collectCount == 0) && WorkSpace.this.loop) {
1177                     // Wait pause time seconds and try again if looping
1178                     try {
1179                         Thread.sleep(WorkSpace.this.loopPauseTime);
1180                     } catch (InterruptedException ioex) {
1181                         Thread.currentThread().interrupt();
1182                     }
1183                     continue;
1184                 }
1185 
1186                 // time shift for next loop if configured to use tstamps
1187                 if (WorkSpace.this.useFileTimestamps) {
1188                     minFileTime = lastFileCollect;
1189                 }
1190 
1191             } while (WorkSpace.this.loop && !WorkSpace.this.timeToQuit);
1192 
1193             logger.debug("Off the end of the WorkSpaceCollector run method");
1194             WorkSpace.this.collectorThreadHasQuit = true;
1195         }
1196 
1197         /**
1198          * Load WorkBundle objects into our linked list of bundles Also process all directories if so instructed
1199          *
1200          * @return count of how many bundles collected for outbound queue
1201          */
1202         protected int collectFiles(final PriorityDirectory dir, final boolean wantDirectories, final WorkBundle basePath,
1203                 final int numberOfBundlesToSkipArg, final long minFileTime, final boolean skipDotFilesArg) {
1204             int skipped = 0;
1205             int collected = 0;
1206             int fileCount = 0;
1207             long bytesInBundle = 0;
1208 
1209             try {
1210                 int ffOptions = FileFind.FILES_FLAG;
1211                 if (wantDirectories) {
1212                     ffOptions |= FileFind.DIRECTORIES_FLAG;
1213                 }
1214                 final FileFind ff = new FileFind(ffOptions);
1215                 final Iterator<?> f = ff.find(dir.getDirectoryName());
1216 
1217                 WorkBundle paths = new WorkBundle(basePath);
1218                 paths.setPriority(dir.getPriority());
1219                 paths.setSimpleMode(getSimpleMode());
1220 
1221                 while (f.hasNext()) {
1222                     // If the outbound queue has a lot of stuff pending
1223                     // and memory is getting tight, just to sleep until
1224                     // the situation eases
1225                     pauseCollector();
1226 
1227                     final File next = (File) f.next();
1228                     final String fileName = next.getPath();
1229 
1230                     // We should only be getting these if we asked for them.
1231                     // We should only use them if we are not resuming a previous run.
1232                     if (next.isDirectory() && numberOfBundlesToSkipArg == 0) {
1233                         logger.debug("Doing directory {}", fileName);
1234                         processDirectory(next);
1235                         continue;
1236                     }
1237 
1238                     // Can we read the file?
1239                     if (!next.isFile() && !next.canRead()) {
1240                         logger.debug("Cannot access file: {}", fileName);
1241                         continue;
1242                     }
1243 
1244                     // Skip dot files possibly
1245                     // TODO Maybe we want to change this to explicitly look for "." instead of isHidden
1246                     if (skipDotFilesArg && Files.isHidden(Paths.get(fileName))) {
1247                         logger.debug("Skipping dot file {}", fileName);
1248                         continue;
1249                     }
1250 
1251                     // Is file too old? (If we aren't configured to use
1252                     // tstamps minFileTime will always be 0L
1253                     if (next.lastModified() < minFileTime) {
1254                         continue;
1255                     }
1256 
1257                     synchronized (WorkSpace.this.QLOCK) {
1258                         if (WorkSpace.this.filesDone.containsKey(fileName)) {
1259                             WorkSpace.this.filesDone.remove(fileName);
1260                             continue;
1261                         } else if (WorkSpace.this.filesSeen.containsKey(fileName)
1262                                 && WorkSpace.this.filesSeen.get(fileName) == next.lastModified()) {
1263                             logger.debug("Skipping file already seen {}, touch file to force add", fileName);
1264                             continue;
1265                         }
1266                     }
1267 
1268                     logger.debug("Adding filename to bundle {}", fileName);
1269 
1270                     // add file to workbundle (at least 1)
1271                     if (workbundleHasRoom(paths, bytesInBundle)) {
1272                         logger.debug("Added file to workbundle: {}", fileName);
1273                         paths.addFileName(fileName, getFileModificationDate(fileName), getFileSize(fileName));
1274                         bytesInBundle += next.length();
1275                         WorkSpace.this.filesProcessed++; // overall
1276                         fileCount++; // this loop
1277                         WorkSpace.this.bytesProcessed += next.length(); // overall
1278                     }
1279                     // if bundle is full, create a new empty and
1280                     // move it to the outbound queue.
1281                     if (!workbundleHasRoom(paths, bytesInBundle)) {
1282                         logger.debug("Workbundle full, adding it to outbound queue");
1283                         if (skipped < numberOfBundlesToSkipArg) {
1284                             skipped++;
1285                         } else {
1286                             addOutboundBundle(paths);
1287                             collected++;
1288                         }
1289                         // create new empty work bundle
1290                         paths = new WorkBundle(basePath);
1291                         paths.setPriority(dir.getPriority());
1292                         paths.setSimpleMode(getSimpleMode());
1293                         bytesInBundle = 0;
1294                     }
1295 
1296                 } // end while f.hasNext()
1297 
1298                 // Send residual files, not a complete set perhaps
1299                 if (paths.size() > 0) {
1300                     if (skipped < numberOfBundlesToSkipArg) {
1301                         logger.info("Skipping last bundle");
1302                     } else {
1303                         addOutboundBundle(paths);
1304                         collected++;
1305                     }
1306                 }
1307                 // clear the files done list
1308                 synchronized (WorkSpace.this.QLOCK) {
1309                     WorkSpace.this.filesDone.clear();
1310                 }
1311             } catch (Exception e) {
1312                 logger.error("System error", e);
1313                 return collected;
1314             }
1315 
1316             if (!WorkSpace.this.outbound.isEmpty()) {
1317                 logger.info("Processed {} files into {} bundles, skipping {} bundles.", fileCount, collected, skipped);
1318             }
1319             return collected;
1320         }
1321 
1322         /**
1323          * Convenience method to check if there is room in the work bundle to add more files.
1324          *
1325          * @param bundle the bundle to check
1326          * @param bytesInBundle the current count of bytes in the bundle.
1327          * @return true if bundle does not exceed max byte size, or max file count.
1328          */
1329         private boolean workbundleHasRoom(final WorkBundle bundle, final long bytesInBundle) {
1330 
1331             // must have a min size of 1 file, but cannot be over the
1332             // max byte size, or max file count
1333             boolean bReturn = (bundle.size() <= 0)
1334                     || (((WorkSpace.this.maxBundleSize <= -1) || (bytesInBundle < WorkSpace.this.maxBundleSize))
1335                             && ((WorkSpace.this.filesPerMessage <= -1) || (bundle
1336                                     .size() < WorkSpace.this.filesPerMessage)));
1337 
1338             logger.debug("workbundle has room = {}", bReturn);
1339             return bReturn;
1340         }
1341 
1342         /**
1343          * Check memory (heap) usage and wait for it to go below the threshold. We must be able to collect at least 500 file
1344          * bundles to trigger this mechanism.
1345          */
1346         protected void pauseCollector() {
1347             final int initialQueueSize = getOutboundQueueSize();
1348             if (initialQueueSize < 500) {
1349                 return;
1350             }
1351             final long intv = 30000;
1352             final MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
1353             MemoryUsage heap = mbean.getHeapMemoryUsage();
1354             int count = 0;
1355             while ((((double) heap.getUsed() / (double) heap.getCommitted()) > MEM_THRESHOLD) && (getOutboundQueueSize() > 500)) {
1356                 logger.debug("Collection memory threshold exceeded {}", heap);
1357                 try {
1358                     Thread.sleep(intv);
1359                 } catch (InterruptedException ex) {
1360                     Thread.currentThread().interrupt();
1361                 }
1362                 count++;
1363                 heap = mbean.getHeapMemoryUsage();
1364             }
1365 
1366             if (count > 0 && logger.isDebugEnabled()) {
1367                 logger.debug(
1368                         "Paused collector {} times for {}s waiting for memory usage to go below threshold {} resuming at {}, queueSize was/is={}/{}",
1369                         count, intv / 1000, MEM_THRESHOLD, heap, initialQueueSize, getOutboundQueueSize());
1370             }
1371         }
1372     }
1373 
1374     /**
1375      * Collect per pickup statistics for this run
1376      */
1377     public static class WorkSpaceStats {
1378         final Map<String, Integer> remoteMap = new HashMap<>();
1379         final Set<String> shutDownSent = new HashSet<>();
1380 
1381         /**
1382          * Increment the bundle count for the machine when it takes one
1383          *
1384          * @param machine the remote pickup
1385          */
1386         public void bump(final String machine) {
1387             Integer count = this.remoteMap.get(machine);
1388             if (count == null) {
1389                 count = 1;
1390             } else {
1391                 count = count + 1;
1392             }
1393             this.remoteMap.put(machine, count);
1394         }
1395 
1396         /**
1397          * Indicate that shutdown msg was sent to machine
1398          *
1399          * @param machine the remote name
1400          */
1401         public void shutDownSent(final String machine) {
1402             this.shutDownSent.add(machine);
1403         }
1404 
1405         /**
1406          * Count how many machines got shut down msg
1407          */
1408         public int getShutDownCount() {
1409             return this.shutDownSent.size();
1410         }
1411 
1412         /**
1413          * Iterate over set of machines used
1414          */
1415         public Iterator<String> machinesUsed() {
1416             return this.remoteMap.keySet().iterator();
1417         }
1418 
1419         /**
1420          * Count of machines used
1421          */
1422         public int getCountUsed(final String machine) {
1423             final Integer count = this.remoteMap.get(machine);
1424             return (count == null) ? 0 : count;
1425         }
1426     }
1427 
1428     /**
1429      * Watch the directory for changes to pickup up client places
1430      */
1431     public class WorkSpaceDirectoryWatcher extends DirectoryAdapter {
1432         /**
1433          * Watch the directory for registrations that match pattern
1434          *
1435          * @param pattern the pattern to match
1436          */
1437         public WorkSpaceDirectoryWatcher(final String pattern) {
1438             super(pattern);
1439             logger.debug("PickupClient pattern is {}", pattern);
1440         }
1441 
1442         /**
1443          * Accept registration notices that match our pattern
1444          *
1445          * @param observableKey the reporting directory
1446          * @param placeKey the key of the matching registered place
1447          */
1448         @Override
1449         public void placeRegistered(final String observableKey, final String placeKey) {
1450             final String k = KeyManipulator.removeExpense(placeKey);
1451             logger.debug("Registration message from {}", k);
1452             if (WorkSpace.this.pups.contains(k) && WorkSpace.this.useRetryStrategy) {
1453                 // This covers the case where the pickup dies and restarts
1454                 // before the Heartbeat mechanism figures out there was
1455                 // a problem.
1456                 logger.info("Already known pickup {} must be reinitialized to clear pending work.", k);
1457                 removePickUp(k);
1458             }
1459 
1460             if (!WorkSpace.this.pups.contains(k)) {
1461                 logger.info("New pickup place {}", k);
1462             }
1463 
1464             // add to list and maybe send open msg. Dup places
1465             // will not be added but might be re-notified
1466             addPickUp(k);
1467 
1468         }
1469 
1470         /**
1471          * Accept deregistration notices that match our pattern
1472          *
1473          * @param observableKey the reporting directory
1474          * @param placeKey the key of the matching deregistered place
1475          */
1476         @Override
1477         public void placeDeregistered(final String observableKey, final String placeKey) {
1478             final String k = KeyManipulator.removeExpense(placeKey);
1479             logger.debug("DeRegistration message from {}", k);
1480             if (!WorkSpace.this.pups.contains(k)) {
1481                 logger.info("Unknown pickup deregistered {}", k);
1482             } else {
1483                 logger.info("Pickup place {} is gone", k);
1484                 if (WorkSpace.this.useRetryStrategy) {
1485                     removePickUp(k);
1486                 }
1487             }
1488         }
1489     }
1490 }