View Javadoc
1   package emissary.output.roller.journal;
2   
3   import org.slf4j.Logger;
4   import org.slf4j.LoggerFactory;
5   
6   import java.io.IOException;
7   import java.nio.channels.ClosedChannelException;
8   import java.nio.file.Path;
9   import java.nio.file.Paths;
10  import java.util.ArrayDeque;
11  import java.util.Deque;
12  import java.util.UUID;
13  import java.util.concurrent.locks.Condition;
14  import java.util.concurrent.locks.ReentrantLock;
15  import javax.annotation.Nullable;
16  
17  /**
18   * Pool implementation that utilizes a Journal to durably track state out written data. The implementation will create
19   * up to the maximum configured output channels. Channels are lazily initialized to minimize resource utilization. The
20   * Journal is only updated when files are created and committed.
21   */
22  public class JournaledChannelPool implements AutoCloseable {
23      public static final String EXTENSION = ".bgpart";
24      private static final Logger LOG = LoggerFactory.getLogger(JournaledChannelPool.class);
25      public static final int DEFAULT_MAX = 10;
26      private final ReentrantLock lock = new ReentrantLock();
27      private final Condition freeCondition = this.lock.newCondition();
28      final int max;
29      final Path directory;
30      final String key;
31      private final Deque<JournaledChannel> free = new ArrayDeque<>();
32      private int created;
33      @Nullable
34      private JournaledChannel[] allchannels;
35  
36      @SuppressWarnings("CheckedExceptionNotThrown")
37      public JournaledChannelPool(final Path directory, final String key, final int max) throws IOException {
38          this.max = max;
39          this.directory = directory;
40          this.key = key;
41          this.allchannels = new JournaledChannel[max];
42      }
43  
44      int getFreeSize() {
45          return this.free.size();
46      }
47  
48      int getCreatedCount() {
49          return this.created;
50      }
51  
52      /**
53       * Supplied key to identify the pool.
54       * 
55       * @return key
56       */
57      public String getKey() {
58          return this.key;
59      }
60  
61      /**
62       * Returns an available output from the pool. This method will block if there are no free objects available and the max
63       * number of outputs has been created.
64       * 
65       * @return an available KeyedOutput from the pool
66       * @throws IOException If there is some I/O problem.
67       * @throws InterruptedException If interrupted.
68       */
69      public KeyedOutput getFree() throws InterruptedException, IOException {
70          JournaledChannel jc = null;
71          this.lock.lock();
72          try {
73              checkClosed();
74              jc = findFree();
75              jc.setPosition();
76              return new KeyedOutput(this, jc);
77          } catch (Throwable t) {
78              if (jc != null) {
79                  LOG.debug("Throwable occurred while obtaining channel. Returning to the pool. {}", jc.path, t);
80                  free(jc);
81              }
82              throw t;
83          } finally {
84              this.lock.unlock();
85          }
86      }
87  
88      /**
89       * Flushes underlying channel and writes journal entry, updating current position.
90       * 
91       * @param jc the JournaledChannel to flush
92       */
93      void free(final JournaledChannel jc) {
94          if (jc == null) {
95              throw new IllegalArgumentException("Cannot return a null JournaledChannel.");
96          }
97          this.lock.lock();
98          try {
99              if (this.free.contains(jc) || !this.free.offer(jc)) {
100                 LOG.warn("Could not return the channel to the pool {}", this.key);
101             }
102             // signal everyone since close and find may be waiting
103             this.freeCondition.signalAll();
104         } finally {
105             this.lock.unlock();
106         }
107     }
108 
109     /**
110      * Closes the underlying pool. This method will block if any resources have not been returned.
111      * 
112      * @throws InterruptedException If interrupted.
113      * @throws IOException If there is some I/O problem.
114      */
115     @Override
116     public void close() throws InterruptedException, IOException {
117         this.lock.lock();
118         try {
119             while (this.free.size() < this.created) {
120                 LOG.debug("Waiting for leased {} objects.", this.created - this.free.size());
121                 this.freeCondition.await();
122             }
123             for (final JournaledChannel fc : this.free) {
124                 this.allchannels[fc.index].close();
125             }
126             this.allchannels = null;
127         } finally {
128             this.lock.unlock();
129         }
130     }
131 
132     private void checkClosed() throws ClosedChannelException {
133         if (this.allchannels == null) {
134             throw new ClosedChannelException();
135         }
136     }
137 
138     private JournaledChannel findFree() throws InterruptedException, IOException {
139         // if nothing is available, and we can create additional channels, do it
140         // could get closed when we await
141         while (this.free.isEmpty()) {
142             if (this.created < this.max) {
143                 createChannel();
144             } else {
145                 this.freeCondition.await();
146                 checkClosed();
147             }
148         }
149         return this.free.poll();
150     }
151 
152     private void createChannel() throws IOException {
153         final Path p = Paths.get(this.directory.toString(), this.key + "_" + UUID.randomUUID().toString() + EXTENSION);
154         final JournaledChannel ko = new JournaledChannel(p, this.key, this.created);
155         this.allchannels[this.created++] = ko;
156         this.free.add(ko);
157     }
158 
159     Path getDirectory() {
160         return this.directory;
161     }
162 }