1 package emissary.pickup;
2
3 import emissary.client.EmissaryResponse;
4 import emissary.command.BaseCommand;
5 import emissary.command.FeedCommand;
6 import emissary.command.ServerCommand;
7 import emissary.core.EmissaryException;
8 import emissary.core.Namespace;
9 import emissary.core.NamespaceException;
10 import emissary.directory.DirectoryAdapter;
11 import emissary.directory.DirectoryEntry;
12 import emissary.directory.DirectoryPlace;
13 import emissary.directory.EmissaryNode;
14 import emissary.directory.IDirectoryPlace;
15 import emissary.directory.KeyManipulator;
16 import emissary.pool.AgentPool;
17 import emissary.server.EmissaryServer;
18 import emissary.server.mvc.adapters.WorkSpaceAdapter;
19 import emissary.util.Version;
20 import emissary.util.io.FileFind;
21
22 import jakarta.annotation.Nullable;
23 import org.apache.hc.core5.http.HttpStatus;
24 import org.eclipse.jetty.server.Server;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import java.io.File;
29 import java.lang.management.ManagementFactory;
30 import java.lang.management.MemoryMXBean;
31 import java.lang.management.MemoryUsage;
32 import java.nio.file.Files;
33 import java.nio.file.Paths;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.HashMap;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.PriorityQueue;
44 import java.util.Set;
45 import java.util.concurrent.CopyOnWriteArrayList;
46
47
48
49
50
51
52
53
54
55
56
57 public class WorkSpace implements Runnable {
58
59 protected static final Logger logger = LoggerFactory.getLogger(WorkSpace.class);
60
61 protected FeedCommand feedCommand;
62
63
64
65
66 protected List<String> pups = new CopyOnWriteArrayList<>();
67
68
69
70
71 protected String pattern = System.getProperty(CLZ + ".clientPattern", "*.FILE_PICK_UP_CLIENT.INPUT.*");
72
73
74 protected WorkSpaceDirectoryWatcher watcher;
75
76
77 protected static final String CLZ = WorkSpace.class.getName();
78 protected boolean wantDirectories = Boolean.getBoolean(CLZ + ".includeDirectories");
79 protected boolean debug = Boolean.getBoolean(CLZ + ".debug");
80 protected boolean simpleMode = false;
81 protected String outputRootPath = System.getProperty("outputRoot", null);
82 protected String eatPrefix = System.getProperty("eatPrefix", null);
83 protected int numberOfBundlesToSkip = Integer.getInteger(CLZ + ".skip", 0);
84 protected boolean skipDotFiles = Boolean.getBoolean(CLZ + ".skipDotFiles");
85 protected boolean loop = false;
86 protected boolean useRetryStrategy = false;
87 protected static final int MAX_BUNDLE_RETRIES = 5;
88 protected PriorityQueue<PriorityDirectory> myDirectories = new PriorityQueue<>();
89
90
91 protected WorkSpaceStats stats = new WorkSpaceStats();
92
93
94 @Nullable
95 protected ClientNotifier notifier = null;
96
97
98 protected boolean timeToQuit = false;
99 protected boolean collectorThreadHasQuit = false;
100 protected boolean jettyStartedHere = false;
101 protected static final float MEM_THRESHOLD = 0.80f;
102 protected long loopPauseTime = 60000L;
103 protected long pendingHangTime = 600000L;
104 protected static final long NOTIFIER_PAUSE_TIME = 1000L;
105 protected int retryCount = 0;
106 protected boolean useFileTimestamps = false;
107 @Nullable
108 protected String projectBase = null;
109
110
111
112
113
114 protected int filesPerMessage = Integer.getInteger(CLZ + ".filesPerBundle", 5);
115
116 protected long maxBundleSize = Long.getLong(CLZ + ".maxSizePerBundle", -1);
117
118
119 protected long filesProcessed = 0;
120 protected long bundlesProcessed = 0;
121 protected long bytesProcessed = 0;
122
123
124 protected String dataCaseId = System.getProperty("caseId", null);
125 protected boolean caseClosed = false;
126
127
128 protected PriorityQueue<WorkBundle> outbound = new PriorityQueue<>();
129
130
131
132
133 protected Map<String, WorkBundle> pending = new HashMap<>();
134
135
136
137 protected Map<String, Long> filesSeen = new HashMap<>();
138 protected Map<String, Long> filesDone = new HashMap<>();
139
140
141
142 @SuppressWarnings("ConstantField")
143 protected final Object QLOCK = new Object();
144
145
146 protected static final String DEFAULT_WORK_SPACE_NAME = "WorkSpace";
147 protected String workSpaceName = DEFAULT_WORK_SPACE_NAME;
148
149 protected String workSpaceUrl;
150 protected String workSpaceKey;
151
152
153
154
155 public static void main(final String[] args) {
156 try {
157 final WorkSpace ws = new WorkSpace(BaseCommand.parse(FeedCommand.class, args));
158 ws.run();
159 logger.info("Workspace has completed the mission [ +1 health ].");
160 ws.shutDown();
161 } catch (Exception e) {
162 logger.error("Bad commandline arguments, check the FeedCommand help", e);
163 }
164 System.exit(0);
165 }
166
167
168
169
170 @SuppressWarnings("CheckedExceptionNotThrown")
171 public WorkSpace() throws Exception {
172
173 }
174
175 public WorkSpace(FeedCommand feedCommand) {
176 this.feedCommand = feedCommand;
177
178 this.loop = this.feedCommand.isLoop();
179 this.setRetryStrategy(this.feedCommand.isRetry());
180 this.setFileTimestampUsage(this.feedCommand.isFileTimestamp());
181 this.workSpaceName = this.feedCommand.getWorkspaceName();
182 this.simpleMode = this.feedCommand.isSimple();
183 this.projectBase = this.feedCommand.getProjectBase().toAbsolutePath().toString();
184 this.pattern = this.feedCommand.getClientPattern();
185 this.outputRootPath = this.feedCommand.getOutputRoot();
186 this.eatPrefix = this.feedCommand.getEatPrefix();
187 this.filesPerMessage = this.feedCommand.getBundleSize();
188 this.dataCaseId = this.feedCommand.getCaseId();
189 this.setSkipDotFiles(this.feedCommand.isSkipDotFile());
190 this.wantDirectories = this.feedCommand.isIncludeDirs();
191 this.setSimpleMode(this.feedCommand.isSimple());
192 this.myDirectories.addAll(this.feedCommand.getPriorityDirectories());
193
194 if (null != this.feedCommand.getSort()) {
195 this.outbound = new PriorityQueue<>(11, this.feedCommand.getSort());
196 }
197
198 startJetty();
199 register();
200 initializeService();
201 }
202
203 protected void startJetty() {
204 if (!EmissaryServer.isInitialized() || !EmissaryServer.getInstance().isServerRunning()) {
205
206 List<String> args = new ArrayList<>();
207 args.add("-b");
208 args.add(projectBase);
209 args.add("--agents");
210 args.add("1");
211 args.add("-h");
212 args.add(this.feedCommand.getHost());
213 args.add("-p");
214 args.add(String.valueOf(this.feedCommand.getPort()));
215
216 args.add("-m");
217 args.add("cluster");
218 args.add("--flavor");
219 args.add(this.feedCommand.getFlavor());
220 if (this.feedCommand.isSslEnabled()) {
221 args.add("--ssl");
222 }
223 if (this.feedCommand.isSniHostCheckDisabled()) {
224 args.add("--disableSniHostCheck");
225 }
226 try {
227
228 ServerCommand cmd = BaseCommand.parse(ServerCommand.class, args);
229 Server server = EmissaryServer.init(cmd).startServer();
230 final boolean jettyStatus = server.isStarted();
231 if (!jettyStatus) {
232 logger.error("Cannot start the Workspace due to EmissaryServer not starting!");
233 } else {
234 logger.info("Workspace is up and running");
235 this.jettyStartedHere = true;
236 }
237 } catch (EmissaryException e) {
238 logger.error("Error starting EmissaryServer! WorkSpace will not start!", e);
239 }
240 } else {
241 logger.info("EmissaryServer is already running, Workspace should be up.");
242 }
243 }
244
245 protected void initializeService() {
246
247 try {
248 this.pups.addAll(getPickUpClients(this.pattern));
249 logger.info("Found {} initial clients using {} in {}", this.pups.size(), this.pattern, getKey());
250 logger.debug("Initial pickups : {}", this.pups);
251 } catch (EmissaryException ex) {
252 logger.error("Cannot lookup pickup places using pattern {} in {}", this.pattern, getKey(), ex);
253 }
254
255
256
257 this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
258 try {
259 DirectoryAdapter.register(this.watcher);
260 } catch (EmissaryException ex) {
261 logger.error("Cannot register directory observer", ex);
262 }
263
264
265 initializeCase();
266 }
267
268
269
270
271 @Override
272 public void run() {
273
274 startCollector();
275
276
277 startNotifier();
278
279
280 monitorProgress();
281
282 logger.debug("Ending the WorkSpace run method");
283 }
284
285
286
287
288 public void stop() {
289 this.timeToQuit = true;
290 }
291
292
293
294
295 @SuppressWarnings("CatchingUnchecked")
296 public void shutDown() {
297 stop();
298 if (this.jettyStartedHere) {
299 final EmissaryNode node = EmissaryServer.getInstance().getNode();
300 if (node.isValid()) {
301 try {
302 EmissaryServer.getInstance().stop();
303 } catch (Exception ex) {
304 logger.error("Jetty cannot be shutdown", ex);
305 }
306 }
307 try {
308 AgentPool.lookup().close();
309 } catch (NamespaceException ex) {
310 logger.debug("Agent pool namespace lookup failed", ex);
311 }
312 }
313 }
314
315
316
317
318
319
320 public void setPendingHangTime(final long pendingHangTime) {
321 this.pendingHangTime = pendingHangTime;
322 }
323
324
325
326
327
328
329 public void setPauseTime(final long pauseTimeMillis) {
330 this.loopPauseTime = pauseTimeMillis;
331 }
332
333
334
335
336 public void setLoop(final boolean on) {
337 this.loop = on;
338 }
339
340
341
342
343 public boolean getLoop() {
344 return this.loop;
345 }
346
347
348
349
350 public void setFileTimestampUsage(final boolean value) {
351 this.useFileTimestamps = value;
352 }
353
354
355
356
357 public boolean getFileTimestampUsage() {
358 return this.useFileTimestamps;
359 }
360
361
362
363
364 public void setRetryStrategy(final boolean on) {
365 this.useRetryStrategy = on;
366 }
367
368
369
370
371
372
373 public boolean getRetryStrategy() {
374 return this.useRetryStrategy;
375 }
376
377
378
379
380 public void addDirectory(final String dir, final int priority) {
381 addDirectory(new PriorityDirectory(dir, priority));
382 }
383
384
385
386
387 public void addDirectory(final PriorityDirectory dir) {
388 this.myDirectories.add(dir);
389 logger.debug("Adding input directory {}", dir);
390 }
391
392 public List<String> getDirectories() {
393 final List<String> l = new ArrayList<>();
394 final PriorityDirectory[] pds = this.myDirectories.toArray(new PriorityDirectory[0]);
395 Arrays.sort(pds);
396 for (final PriorityDirectory pd : pds) {
397 l.add(pd.toString());
398 }
399 return l;
400 }
401
402
403
404
405
406
407
408 public void setDirectoryProcessing(final boolean on) {
409 this.wantDirectories = on;
410 }
411
412
413
414
415
416
417 public void setEatPrefix(final String prefix) {
418 logger.debug("Reset eatPrefix to {}", prefix);
419 this.eatPrefix = prefix;
420 normalizeEatPrefix();
421 }
422
423
424
425
426 protected void normalizeEatPrefix() {
427 if (this.eatPrefix != null && this.eatPrefix.contains("//")) {
428 this.eatPrefix = this.eatPrefix.replaceAll("/+", "/");
429 }
430 }
431
432
433
434
435
436
437 public void setOutputRoot(final String value) {
438 logger.debug("Reset outputRoot to {}", value);
439 this.outputRootPath = value;
440 }
441
442
443
444
445 public String getOutputRoot() {
446 return this.outputRootPath;
447 }
448
449
450
451
452
453
454 public void setCaseId(final String value) {
455 logger.debug("Reset caseId to {}", value);
456 this.dataCaseId = value;
457 }
458
459
460
461
462
463
464 public void setSkipDotFiles(final boolean value) {
465 this.skipDotFiles = value;
466 }
467
468
469
470
471
472
473 public void setDebugFlag(final boolean value) {
474 this.debug = value;
475 }
476
477
478
479
480
481
482 public void setSimpleMode(final boolean value) {
483 this.simpleMode = value;
484 }
485
486
487
488
489 public boolean getSimpleMode() {
490 return this.simpleMode;
491 }
492
493
494
495
496
497
498
499 public void setPattern(@Nullable final String thePattern) throws Exception {
500
501 if ((this.pattern != null) && this.pattern.equals(thePattern)) {
502 logger.debug("The pattern is already set to {}", thePattern);
503 return;
504 }
505
506 this.pattern = thePattern;
507
508
509 logger.warn("Clearing client list so we can look for new pattern {} in {}", thePattern, getKey());
510 this.pups.clear();
511
512
513 this.pups.addAll(getPickUpClients(this.pattern));
514
515
516 if (this.watcher != null) {
517 DirectoryAdapter.remove(this.watcher);
518 }
519 this.watcher = new WorkSpaceDirectoryWatcher(this.pattern);
520 DirectoryAdapter.register(this.watcher);
521 }
522
523
524
525
526 protected void register() {
527 final EmissaryNode node = EmissaryServer.getInstance().getNode();
528 if (node.isValid()) {
529 this.workSpaceUrl = node.getNodeScheme() + "://" + node.getNodeName() + ":" + node.getNodePort() + "/" + this.workSpaceName;
530 } else {
531 this.workSpaceUrl = "http://localhost:8001/" + this.workSpaceName;
532 logger.warn("WorkSpace is not running in a valid emissary node. Using URL {}", this.workSpaceUrl);
533 }
534 this.workSpaceKey = "WORKSPACE.WORK_SPACE.INPUT." + this.workSpaceUrl;
535
536 normalizeEatPrefix();
537
538
539
540 Namespace.bind(this.workSpaceUrl, this);
541 }
542
543
544
545
546
547
548
549
550 protected Set<String> getPickUpClients(final String thePattern) throws EmissaryException {
551 final Set<String> thePups = new HashSet<>();
552 final IDirectoryPlace dir = DirectoryPlace.lookup();
553 final List<DirectoryEntry> list = dir.getMatchingEntries(thePattern);
554 for (final DirectoryEntry d : list) {
555 thePups.add(d.getKey());
556 logger.info("Adding pickup client {}", d.getKey());
557 }
558 logger.debug("Found {} initial pickup client entries", thePups.size());
559 return thePups;
560 }
561
562
563
564
565 public void startCollector() {
566 for (final PriorityDirectory pd : this.myDirectories) {
567 final WorkSpaceCollector collector = new WorkSpaceCollector(pd);
568 final Thread collectorThread = new Thread(collector, "WorkSpace Collector " + pd);
569 collectorThread.setDaemon(true);
570 collectorThread.start();
571 logger.debug("Started WorkSpace Collector thread on {}", pd);
572 }
573 }
574
575
576
577
578 public void startNotifier() {
579 this.notifier = new ClientNotifier();
580 final Thread notifierThread = new Thread(this.notifier, "WorkSpace Client Notifier");
581 notifierThread.setDaemon(true);
582 notifierThread.start();
583 logger.debug("Started Client Notifier thread");
584 }
585
586
587
588
589 protected void rotatePickUps() {
590
591 Collections.rotate(this.pups, -1);
592 }
593
594
595
596
597
598
599 protected int notifyPickUps() {
600 int successCount = 0;
601 for (final String pup : this.pups) {
602 final boolean status = notifyPickUp(pup);
603 if (status) {
604 successCount++;
605 }
606 if (getOutboundQueueSize() == 0) {
607 break;
608 }
609 }
610 logger.debug("Notified {} of {} pickup places", successCount, this.pups.size());
611 return successCount;
612 }
613
614
615
616
617 protected void addPickUp(final String pup) {
618 if (!this.pups.contains(pup)) {
619 this.pups.add(pup);
620 if (logger.isDebugEnabled()) {
621 logger.debug("Adding pickup {}, new size={}: {}", pup, this.pups.size(), this.pups);
622 }
623 } else {
624 logger.debug("Not adding {} already on list size {}", pup, this.pups.size());
625 }
626 }
627
628
629
630
631
632
633 protected boolean notifyPickUp(final String pup) {
634 final WorkSpaceAdapter tpa = new WorkSpaceAdapter();
635 logger.debug("Sending notice to {}", pup);
636
637 boolean notified = false;
638 int tryCount = 0;
639
640 while (!notified && tryCount < 5) {
641 final EmissaryResponse status = tpa.outboundOpenWorkSpace(pup, this.workSpaceKey);
642
643
644 if (status.getStatus() != HttpStatus.SC_OK) {
645 logger.warn("Failed to notify {} on try {}: {}", pup, tryCount, status.getContentString());
646 try {
647 Thread.sleep((tryCount + 1) * 100L);
648 } catch (InterruptedException ignore) {
649 Thread.currentThread().interrupt();
650 }
651 } else {
652 notified = true;
653 }
654 tryCount++;
655 }
656
657 if (logger.isInfoEnabled()) {
658 logger.info("Notified {} in {} attempts: {}", pup, tryCount, notified ? "SUCCESS" : "FAILED");
659 }
660
661 return notified;
662 }
663
664
665
666
667 public String getKey() {
668 return this.workSpaceKey;
669 }
670
671
672
673
674 public String getNamespaceName() {
675 return this.workSpaceName;
676 }
677
678
679
680
681
682
683 protected void removePickUp(final String remoteKey) {
684 this.pups.remove(remoteKey);
685 if (logger.isDebugEnabled()) {
686 logger.debug("Removed pickup {}, size={}: {}", remoteKey, this.pups.size(), this.pups);
687 }
688 int pendCount = 0;
689 final String remoteName = KeyManipulator.getServiceHost(remoteKey);
690 synchronized (this.QLOCK) {
691
692 for (Iterator<String> i = this.pending.keySet().iterator(); i.hasNext();) {
693 final String id = i.next();
694 final WorkBundle wb = this.pending.get(id);
695 if (remoteName.equals(wb.getSentTo())) {
696 i.remove();
697 wb.setSentTo(null);
698 this.retryCount++;
699 if (wb.incrementErrorCount() <= MAX_BUNDLE_RETRIES) {
700 logger.debug("Removing pending bundle {} from pending pool, re-adding to outbound with errorCount={}", wb.getBundleId(),
701 wb.getErrorCount());
702 addOutboundBundle(wb);
703 pendCount++;
704
705
706 this.bundlesProcessed--;
707 } else {
708 logger.error("Bundle {} associated with too many failures, permanently discarding", wb);
709 }
710 }
711 }
712 }
713 if (pendCount > 0) {
714 logger.info("Moved {} items back to outbound queue from {}", pendCount, remoteName);
715 }
716 }
717
718
719
720
721
722
723
724
725 public WorkBundle take(final String remoteKey) {
726 final String remoteName = KeyManipulator.getServiceHost(remoteKey);
727 WorkBundle item;
728 synchronized (this.QLOCK) {
729 if (getOutboundQueueSize() == 0) {
730
731 logger.info("Sent shutdown msg to {}", remoteName);
732 this.stats.shutDownSent(remoteName);
733 item = new WorkBundle();
734 } else {
735
736
737
738 this.stats.bump(remoteName);
739 item = this.outbound.poll();
740 item.setSentTo(remoteName);
741 this.pending.put(item.getBundleId(), item);
742 logger.info("Gave bundle {} to {}", item, remoteName);
743 final WorkBundle nextItem = this.outbound.peek();
744 if (nextItem != null && logger.isInfoEnabled()) {
745 logger.info("After take: new top differs to prior by [oldest/youngest/size]=[{}/{}/{}]",
746 nextItem.getOldestFileModificationTime() - item.getOldestFileModificationTime(),
747 nextItem.getYoungestFileModificationTime() - item.getYoungestFileModificationTime(),
748 nextItem.getTotalFileSize() - item.getTotalFileSize());
749 }
750 }
751 }
752 return item;
753 }
754
755
756
757
758
759
760 protected void addOutboundBundle(final WorkBundle wb) {
761 int sz;
762 synchronized (this.QLOCK) {
763 this.bundlesProcessed++;
764 sz = this.outbound.size();
765 this.outbound.add(wb);
766 addFilesSeen(wb.getFileNameList());
767 }
768
769 if (logger.isInfoEnabled()) {
770 logger.info("Adding workbundle {} size {} filesSeen {}", wb, sz + 1, this.filesSeen.size());
771 }
772 }
773
774
775
776
777
778
779 @Deprecated
780 @SuppressWarnings("AvoidObjectArrays")
781 public String[] showPendingItems() {
782 return showPendingItemsList().toArray(new String[0]);
783 }
784
785
786
787
788 public List<String> showPendingItemsList() {
789 final List<String> list = new ArrayList<>();
790 synchronized (this.QLOCK) {
791 for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
792 list.add(entry.getValue().toString());
793 }
794 }
795 return list;
796 }
797
798
799
800
801
802
803 public int clearPendingQueue() {
804 final int size = getPendingQueueSize();
805
806 if (size > 0) {
807 synchronized (this.QLOCK) {
808 logger.debug("Clearing pending queue of {} items", size);
809 for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
810 removeFilesSeen(entry.getValue().getFileNameList());
811 }
812 this.pending.clear();
813 logger.debug("Cleared filesSeen leaving {} items", this.filesSeen.size());
814 }
815 }
816 return size;
817 }
818
819
820
821
822
823
824
825
826
827
828 public boolean workCompleted(final String remoteName, final String bundleId, final boolean itWorked) {
829 WorkBundle item;
830
831 synchronized (this.QLOCK) {
832 item = this.pending.remove(bundleId);
833 if (item != null) {
834 addFilesDone(item.getFileNameList());
835 removeFilesSeen(item.getFileNameList());
836 logger.debug("Removed {} from filesSeen leaving {}", item.size(), this.filesSeen.size());
837 }
838 }
839 if (item == null) {
840 logger.info("Unknown bundle completed: {}", bundleId);
841 } else if (!itWorked) {
842 item.setSentTo(null);
843 if (item.incrementErrorCount() > MAX_BUNDLE_RETRIES) {
844 logger.error("Bundle {} has too many errors, permanently discarded", item);
845 } else {
846 addOutboundBundle(item);
847 }
848 }
849 if (logger.isDebugEnabled()) {
850 logger.debug("Bundle {} completed by {}{}", bundleId, remoteName,
851 itWorked ? "" : (" but failed for the " + (item != null ? item.getErrorCount() : -1) + " time"));
852 }
853 return item != null;
854 }
855
856
857
858
859 protected void initializeCase() {
860
861 logger.debug("In base initializeCase implementation (do nothing)");
862 }
863
864
865
866
867 protected void closeCase() {
868
869 this.caseClosed = true;
870 logger.debug("In base closeCase implementation (do nothing)");
871 }
872
873
874
875
876
877
878 protected void processDirectory(final File dir) {
879
880 logger.debug("got a directory processDirectory({})", dir);
881 }
882
883
884
885
886
887
888 protected void addFilesSeen(final Collection<String> fileNames) {
889 for (final String fn : fileNames) {
890 this.filesSeen.put(fn, getFileModificationDate(fn));
891 }
892 }
893
894
895
896
897
898
899 protected void addFilesDone(final Collection<String> fileNames) {
900 for (final String fn : fileNames) {
901 this.filesDone.put(fn, getFileModificationDate(fn));
902 }
903 }
904
905
906
907
908
909
910 protected void removeFilesSeen(final Collection<String> fileNames) {
911 for (final String fn : fileNames) {
912 this.filesSeen.remove(fn);
913 }
914 }
915
916
917
918
919
920
921
922 protected long getFileModificationDate(final String fn) {
923 return new File(fn).lastModified();
924 }
925
926 protected long getFileSize(final String fn) {
927 return new File(fn).length();
928 }
929
930
931
932
933
934 protected void monitorProgress() {
935 long outboundEmptyTimestamp = -1L;
936
937
938
939 while (true) {
940 final int outboundSize = getOutboundQueueSize();
941 int pendingSize = getPendingQueueSize();
942 final boolean reallyQuit = this.timeToQuit && (outboundSize == 0) && (pendingSize == 0);
943
944
945 if (outboundSize == 0 && outboundEmptyTimestamp == -1L) {
946 outboundEmptyTimestamp = System.currentTimeMillis();
947 } else if (outboundSize > 0 && outboundEmptyTimestamp > 0L) {
948 outboundEmptyTimestamp = -1L;
949 }
950
951
952 if ((outboundSize == 0) && !this.loop && ((outboundEmptyTimestamp + this.pendingHangTime) < System.currentTimeMillis())) {
953 if (logger.isInfoEnabled()) {
954 logger.info("Giving up on {} items due to timeout", pendingSize);
955 for (final Map.Entry<String, WorkBundle> entry : this.pending.entrySet()) {
956 logger.info("Pending item {}: {}", entry.getKey(), entry.getValue());
957 }
958 }
959 clearPendingQueue();
960 pendingSize = 0;
961 }
962
963
964 if (outboundSize + pendingSize == 0) {
965 if (reallyQuit) {
966 break;
967 }
968 publishStats();
969 }
970
971
972 try {
973 for (int si = 0; si < 3000; si++) {
974 Thread.sleep(10L);
975 if (reallyQuit) {
976 break;
977 }
978 }
979 } catch (InterruptedException ex) {
980 Thread.currentThread().interrupt();
981 }
982
983 if (!this.timeToQuit) {
984 publishStats();
985 }
986 }
987
988
989 closeCase();
990 }
991
992
993
994
995 public void publishStats() {
996 logger.info(getStatsMessage());
997 for (Iterator<String> i = this.stats.machinesUsed(); i.hasNext();) {
998 final String machine = i.next();
999 logger.info("Machine {} took {} bundles", machine, this.stats.getCountUsed(machine));
1000 }
1001 }
1002
1003
1004
1005
1006 public String getStatsMessage() {
1007 final int outboundSize = getOutboundQueueSize();
1008 final int pendingSize = getPendingQueueSize();
1009
1010 return "WorkSpace has outbound=" + outboundSize + ", pending=" + pendingSize + ", total bundles / files / bytes = " + this.bundlesProcessed
1011 + " / " + this.filesProcessed + " / " + this.bytesProcessed + " , #clients=" + getPickUpPlaceCount();
1012 }
1013
1014
1015
1016
1017 public long getFilesProcessed() {
1018 return this.filesProcessed;
1019 }
1020
1021
1022
1023
1024 public long getBytesProcessed() {
1025 return this.bytesProcessed;
1026 }
1027
1028
1029
1030
1031 public int getPickUpPlaceCount() {
1032 return this.pups.size();
1033 }
1034
1035
1036
1037
1038 public long getBundlesProcessed() {
1039 return this.bundlesProcessed;
1040 }
1041
1042
1043
1044
1045 public int getOutboundQueueSize() {
1046 synchronized (this.QLOCK) {
1047 return this.outbound.size();
1048 }
1049 }
1050
1051 public int getRetriedCount() {
1052 return this.retryCount;
1053 }
1054
1055
1056
1057
1058 public int getPendingQueueSize() {
1059 synchronized (this.QLOCK) {
1060 return this.pending.size();
1061 }
1062 }
1063
1064
1065
1066
1067
1068
1069 protected String getVersionString() {
1070 return "Emissary version: " + new Version();
1071 }
1072
1073 public class ClientNotifier implements Runnable {
1074
1075
1076
1077 public ClientNotifier() {}
1078
1079 @Override
1080 public void run() {
1081 while (true) {
1082 final int qsize = getOutboundQueueSize();
1083 if (qsize > 0) {
1084 final long start = System.currentTimeMillis();
1085 if (logger.isDebugEnabled()) {
1086 logger.debug("ClientNotification starting with #clients={} outbound={}", getPickUpPlaceCount(), qsize);
1087 }
1088 notifyPickUps();
1089 if (logger.isDebugEnabled()) {
1090 final long end = System.currentTimeMillis();
1091 logger.debug("ClientNotification took {}s for #clients={}", (end - start) / 1000.0, getPickUpPlaceCount());
1092 }
1093 }
1094
1095 try {
1096 Thread.sleep(NOTIFIER_PAUSE_TIME);
1097 rotatePickUps();
1098 } catch (InterruptedException ignore) {
1099 Thread.currentThread().interrupt();
1100 }
1101
1102 final int outboundSize = getOutboundQueueSize();
1103 final int pendingSize = getPendingQueueSize();
1104 if (WorkSpace.this.timeToQuit && (outboundSize == 0) && (pendingSize == 0) && WorkSpace.this.collectorThreadHasQuit) {
1105 break;
1106 }
1107 }
1108
1109 logger.debug("Off the end of the ClientNotifier run loop");
1110 }
1111 }
1112
1113
1114
1115
1116 public class WorkSpaceCollector implements Runnable {
1117
1118 protected PriorityDirectory myDirectory;
1119
1120
1121
1122
1123 public WorkSpaceCollector(final PriorityDirectory myDirectory) {
1124 this.myDirectory = myDirectory;
1125 }
1126
1127
1128
1129
1130
1131 @Override
1132 public void run() {
1133 long versionOutputTime = System.currentTimeMillis();
1134 long start;
1135 long stop;
1136 long minFileTime = 0L;
1137
1138
1139 long lastFileCollect = 0L;
1140 int loopCount = 0;
1141
1142 logger.info("Running Workspace from {}", getVersionString());
1143
1144 do {
1145 start = System.currentTimeMillis();
1146
1147 if (start - versionOutputTime > 3600000) {
1148 logger.info("Continuing Workspace from {}", getVersionString());
1149 versionOutputTime = start;
1150 }
1151
1152 final WorkBundle paths = new WorkBundle(WorkSpace.this.outputRootPath, WorkSpace.this.eatPrefix);
1153 paths.setCaseId(WorkSpace.this.dataCaseId);
1154 paths.setSimpleMode(getSimpleMode());
1155
1156 logger.debug("Processing files in {}", this.myDirectory.getDirectoryName());
1157
1158 final int collectCount =
1159 collectFiles(this.myDirectory, WorkSpace.this.wantDirectories, paths, WorkSpace.this.numberOfBundlesToSkip, minFileTime,
1160 WorkSpace.this.skipDotFiles);
1161
1162
1163
1164 if (WorkSpace.this.useFileTimestamps) {
1165 lastFileCollect = System.currentTimeMillis();
1166 }
1167 stop = System.currentTimeMillis();
1168 loopCount++;
1169
1170
1171 WorkSpace.this.numberOfBundlesToSkip = 0;
1172
1173 logger.info("Collected {} file bundles in {}s in loop iteration {}, {} items in outbound queue", collectCount,
1174 (stop - start) / 1000.0, loopCount, WorkSpace.this.outbound.size());
1175
1176 if ((collectCount == 0) && WorkSpace.this.loop) {
1177
1178 try {
1179 Thread.sleep(WorkSpace.this.loopPauseTime);
1180 } catch (InterruptedException ioex) {
1181 Thread.currentThread().interrupt();
1182 }
1183 continue;
1184 }
1185
1186
1187 if (WorkSpace.this.useFileTimestamps) {
1188 minFileTime = lastFileCollect;
1189 }
1190
1191 } while (WorkSpace.this.loop && !WorkSpace.this.timeToQuit);
1192
1193 logger.debug("Off the end of the WorkSpaceCollector run method");
1194 WorkSpace.this.collectorThreadHasQuit = true;
1195 }
1196
1197
1198
1199
1200
1201
1202 protected int collectFiles(final PriorityDirectory dir, final boolean wantDirectories, final WorkBundle basePath,
1203 final int numberOfBundlesToSkipArg, final long minFileTime, final boolean skipDotFilesArg) {
1204 int skipped = 0;
1205 int collected = 0;
1206 int fileCount = 0;
1207 long bytesInBundle = 0;
1208
1209 try {
1210 int ffOptions = FileFind.FILES_FLAG;
1211 if (wantDirectories) {
1212 ffOptions |= FileFind.DIRECTORIES_FLAG;
1213 }
1214 final FileFind ff = new FileFind(ffOptions);
1215 final Iterator<?> f = ff.find(dir.getDirectoryName());
1216
1217 WorkBundle paths = new WorkBundle(basePath);
1218 paths.setPriority(dir.getPriority());
1219 paths.setSimpleMode(getSimpleMode());
1220
1221 while (f.hasNext()) {
1222
1223
1224
1225 pauseCollector();
1226
1227 final File next = (File) f.next();
1228 final String fileName = next.getPath();
1229
1230
1231
1232 if (next.isDirectory() && numberOfBundlesToSkipArg == 0) {
1233 logger.debug("Doing directory {}", fileName);
1234 processDirectory(next);
1235 continue;
1236 }
1237
1238
1239 if (!next.isFile() && !next.canRead()) {
1240 logger.debug("Cannot access file: {}", fileName);
1241 continue;
1242 }
1243
1244
1245
1246 if (skipDotFilesArg && Files.isHidden(Paths.get(fileName))) {
1247 logger.debug("Skipping dot file {}", fileName);
1248 continue;
1249 }
1250
1251
1252
1253 if (next.lastModified() < minFileTime) {
1254 continue;
1255 }
1256
1257 synchronized (WorkSpace.this.QLOCK) {
1258 if (WorkSpace.this.filesDone.containsKey(fileName)) {
1259 WorkSpace.this.filesDone.remove(fileName);
1260 continue;
1261 } else if (WorkSpace.this.filesSeen.containsKey(fileName)
1262 && WorkSpace.this.filesSeen.get(fileName) == next.lastModified()) {
1263 logger.debug("Skipping file already seen {}, touch file to force add", fileName);
1264 continue;
1265 }
1266 }
1267
1268 logger.debug("Adding filename to bundle {}", fileName);
1269
1270
1271 if (workbundleHasRoom(paths, bytesInBundle)) {
1272 logger.debug("Added file to workbundle: {}", fileName);
1273 paths.addFileName(fileName, getFileModificationDate(fileName), getFileSize(fileName));
1274 bytesInBundle += next.length();
1275 WorkSpace.this.filesProcessed++;
1276 fileCount++;
1277 WorkSpace.this.bytesProcessed += next.length();
1278 }
1279
1280
1281 if (!workbundleHasRoom(paths, bytesInBundle)) {
1282 logger.debug("Workbundle full, adding it to outbound queue");
1283 if (skipped < numberOfBundlesToSkipArg) {
1284 skipped++;
1285 } else {
1286 addOutboundBundle(paths);
1287 collected++;
1288 }
1289
1290 paths = new WorkBundle(basePath);
1291 paths.setPriority(dir.getPriority());
1292 paths.setSimpleMode(getSimpleMode());
1293 bytesInBundle = 0;
1294 }
1295
1296 }
1297
1298
1299 if (paths.size() > 0) {
1300 if (skipped < numberOfBundlesToSkipArg) {
1301 logger.info("Skipping last bundle");
1302 } else {
1303 addOutboundBundle(paths);
1304 collected++;
1305 }
1306 }
1307
1308 synchronized (WorkSpace.this.QLOCK) {
1309 WorkSpace.this.filesDone.clear();
1310 }
1311 } catch (Exception e) {
1312 logger.error("System error", e);
1313 return collected;
1314 }
1315
1316 if (!WorkSpace.this.outbound.isEmpty()) {
1317 logger.info("Processed {} files into {} bundles, skipping {} bundles.", fileCount, collected, skipped);
1318 }
1319 return collected;
1320 }
1321
1322
1323
1324
1325
1326
1327
1328
1329 private boolean workbundleHasRoom(final WorkBundle bundle, final long bytesInBundle) {
1330
1331
1332
1333 boolean bReturn = (bundle.size() <= 0)
1334 || (((WorkSpace.this.maxBundleSize <= -1) || (bytesInBundle < WorkSpace.this.maxBundleSize))
1335 && ((WorkSpace.this.filesPerMessage <= -1) || (bundle
1336 .size() < WorkSpace.this.filesPerMessage)));
1337
1338 logger.debug("workbundle has room = {}", bReturn);
1339 return bReturn;
1340 }
1341
1342
1343
1344
1345
1346 protected void pauseCollector() {
1347 final int initialQueueSize = getOutboundQueueSize();
1348 if (initialQueueSize < 500) {
1349 return;
1350 }
1351 final long intv = 30000;
1352 final MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
1353 MemoryUsage heap = mbean.getHeapMemoryUsage();
1354 int count = 0;
1355 while ((((double) heap.getUsed() / (double) heap.getCommitted()) > MEM_THRESHOLD) && (getOutboundQueueSize() > 500)) {
1356 logger.debug("Collection memory threshold exceeded {}", heap);
1357 try {
1358 Thread.sleep(intv);
1359 } catch (InterruptedException ex) {
1360 Thread.currentThread().interrupt();
1361 }
1362 count++;
1363 heap = mbean.getHeapMemoryUsage();
1364 }
1365
1366 if (count > 0 && logger.isDebugEnabled()) {
1367 logger.debug(
1368 "Paused collector {} times for {}s waiting for memory usage to go below threshold {} resuming at {}, queueSize was/is={}/{}",
1369 count, intv / 1000, MEM_THRESHOLD, heap, initialQueueSize, getOutboundQueueSize());
1370 }
1371 }
1372 }
1373
1374
1375
1376
1377 public static class WorkSpaceStats {
1378 final Map<String, Integer> remoteMap = new HashMap<>();
1379 final Set<String> shutDownSent = new HashSet<>();
1380
1381
1382
1383
1384
1385
1386 public void bump(final String machine) {
1387 Integer count = this.remoteMap.get(machine);
1388 if (count == null) {
1389 count = 1;
1390 } else {
1391 count = count + 1;
1392 }
1393 this.remoteMap.put(machine, count);
1394 }
1395
1396
1397
1398
1399
1400
1401 public void shutDownSent(final String machine) {
1402 this.shutDownSent.add(machine);
1403 }
1404
1405
1406
1407
1408 public int getShutDownCount() {
1409 return this.shutDownSent.size();
1410 }
1411
1412
1413
1414
1415 public Iterator<String> machinesUsed() {
1416 return this.remoteMap.keySet().iterator();
1417 }
1418
1419
1420
1421
1422 public int getCountUsed(final String machine) {
1423 final Integer count = this.remoteMap.get(machine);
1424 return (count == null) ? 0 : count;
1425 }
1426 }
1427
1428
1429
1430
1431 public class WorkSpaceDirectoryWatcher extends DirectoryAdapter {
1432
1433
1434
1435
1436
1437 public WorkSpaceDirectoryWatcher(final String pattern) {
1438 super(pattern);
1439 logger.debug("PickupClient pattern is {}", pattern);
1440 }
1441
1442
1443
1444
1445
1446
1447
1448 @Override
1449 public void placeRegistered(final String observableKey, final String placeKey) {
1450 final String k = KeyManipulator.removeExpense(placeKey);
1451 logger.debug("Registration message from {}", k);
1452 if (WorkSpace.this.pups.contains(k) && WorkSpace.this.useRetryStrategy) {
1453
1454
1455
1456 logger.info("Already known pickup {} must be reinitialized to clear pending work.", k);
1457 removePickUp(k);
1458 }
1459
1460 if (!WorkSpace.this.pups.contains(k)) {
1461 logger.info("New pickup place {}", k);
1462 }
1463
1464
1465
1466 addPickUp(k);
1467
1468 }
1469
1470
1471
1472
1473
1474
1475
1476 @Override
1477 public void placeDeregistered(final String observableKey, final String placeKey) {
1478 final String k = KeyManipulator.removeExpense(placeKey);
1479 logger.debug("DeRegistration message from {}", k);
1480 if (!WorkSpace.this.pups.contains(k)) {
1481 logger.info("Unknown pickup deregistered {}", k);
1482 } else {
1483 logger.info("Pickup place {} is gone", k);
1484 if (WorkSpace.this.useRetryStrategy) {
1485 removePickUp(k);
1486 }
1487 }
1488 }
1489 }
1490 }