1 package emissary.output.roller.journal;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5
6 import java.io.IOException;
7 import java.nio.channels.ClosedChannelException;
8 import java.nio.file.Path;
9 import java.nio.file.Paths;
10 import java.util.ArrayDeque;
11 import java.util.Deque;
12 import java.util.UUID;
13 import java.util.concurrent.locks.Condition;
14 import java.util.concurrent.locks.ReentrantLock;
15 import javax.annotation.Nullable;
16
17
18
19
20
21
22 public class JournaledChannelPool implements AutoCloseable {
23 public static final String EXTENSION = ".bgpart";
24 private static final Logger LOG = LoggerFactory.getLogger(JournaledChannelPool.class);
25 public static final int DEFAULT_MAX = 10;
26 private final ReentrantLock lock = new ReentrantLock();
27 private final Condition freeCondition = this.lock.newCondition();
28 final int max;
29 final Path directory;
30 final String key;
31 private final Deque<JournaledChannel> free = new ArrayDeque<>();
32 private int created;
33 @Nullable
34 private JournaledChannel[] allchannels;
35
36 @SuppressWarnings("CheckedExceptionNotThrown")
37 public JournaledChannelPool(final Path directory, final String key, final int max) throws IOException {
38 this.max = max;
39 this.directory = directory;
40 this.key = key;
41 this.allchannels = new JournaledChannel[max];
42 }
43
44 int getFreeSize() {
45 return this.free.size();
46 }
47
48 int getCreatedCount() {
49 return this.created;
50 }
51
52
53
54
55
56
57 public String getKey() {
58 return this.key;
59 }
60
61
62
63
64
65
66
67
68
69 public KeyedOutput getFree() throws InterruptedException, IOException {
70 JournaledChannel jc = null;
71 this.lock.lock();
72 try {
73 checkClosed();
74 jc = findFree();
75 jc.setPosition();
76 return new KeyedOutput(this, jc);
77 } catch (Throwable t) {
78 if (jc != null) {
79 LOG.debug("Throwable occurred while obtaining channel. Returning to the pool. {}", jc.path, t);
80 free(jc);
81 }
82 throw t;
83 } finally {
84 this.lock.unlock();
85 }
86 }
87
88
89
90
91
92
93 void free(final JournaledChannel jc) {
94 if (jc == null) {
95 throw new IllegalArgumentException("Cannot return a null JournaledChannel.");
96 }
97 this.lock.lock();
98 try {
99 if (this.free.contains(jc) || !this.free.offer(jc)) {
100 LOG.warn("Could not return the channel to the pool {}", this.key);
101 }
102
103 this.freeCondition.signalAll();
104 } finally {
105 this.lock.unlock();
106 }
107 }
108
109
110
111
112
113
114
115 @Override
116 public void close() throws InterruptedException, IOException {
117 this.lock.lock();
118 try {
119 while (this.free.size() < this.created) {
120 LOG.debug("Waiting for leased {} objects.", this.created - this.free.size());
121 this.freeCondition.await();
122 }
123 for (final JournaledChannel fc : this.free) {
124 this.allchannels[fc.index].close();
125 }
126 this.allchannels = null;
127 } finally {
128 this.lock.unlock();
129 }
130 }
131
132 private void checkClosed() throws ClosedChannelException {
133 if (this.allchannels == null) {
134 throw new ClosedChannelException();
135 }
136 }
137
138 private JournaledChannel findFree() throws InterruptedException, IOException {
139
140
141 while (this.free.isEmpty()) {
142 if (this.created < this.max) {
143 createChannel();
144 } else {
145 this.freeCondition.await();
146 checkClosed();
147 }
148 }
149 return this.free.poll();
150 }
151
152 private void createChannel() throws IOException {
153 final Path p = Paths.get(this.directory.toString(), this.key + "_" + UUID.randomUUID().toString() + EXTENSION);
154 final JournaledChannel ko = new JournaledChannel(p, this.key, this.created);
155 this.allchannels[this.created++] = ko;
156 this.free.add(ko);
157 }
158
159 Path getDirectory() {
160 return this.directory;
161 }
162 }