View Javadoc
1   package emissary.output.roller.journal;
2   
3   import jakarta.annotation.Nullable;
4   import org.slf4j.Logger;
5   import org.slf4j.LoggerFactory;
6   
7   import java.io.IOException;
8   import java.nio.channels.ClosedChannelException;
9   import java.nio.file.Path;
10  import java.nio.file.Paths;
11  import java.util.ArrayDeque;
12  import java.util.Deque;
13  import java.util.UUID;
14  import java.util.concurrent.locks.Condition;
15  import java.util.concurrent.locks.ReentrantLock;
16  
17  /**
18   * Pool implementation that utilizes a Journal to durably track state out written data. The implementation will create
19   * up to the maximum configured output channels. Channels are lazily initialized to minimize resource utilization. The
20   * Journal is only updated when files are created and committed.
21   */
22  public class JournaledChannelPool implements AutoCloseable {
23      public static final String EXTENSION = ".bgpart";
24      private static final Logger LOG = LoggerFactory.getLogger(JournaledChannelPool.class);
25      public static final int DEFAULT_MAX = 10;
26      private final ReentrantLock lock = new ReentrantLock();
27      private final Condition freeCondition = this.lock.newCondition();
28      final int max;
29      final Path directory;
30      final String key;
31      private final Deque<JournaledChannel> free = new ArrayDeque<>();
32      private int created;
33      @Nullable
34      private JournaledChannel[] allchannels;
35  
36      @SuppressWarnings("CheckedExceptionNotThrown")
37      public JournaledChannelPool(final Path directory, final String key, final int max) throws IOException {
38          this.max = max;
39          this.directory = directory;
40          this.key = key;
41          this.allchannels = new JournaledChannel[max];
42      }
43  
44      int getFreeSize() {
45          return this.free.size();
46      }
47  
48      int getCreatedCount() {
49          return this.created;
50      }
51  
52      /**
53       * Supplied key to identify the pool.
54       * 
55       * @return key
56       */
57      public String getKey() {
58          return this.key;
59      }
60  
61      /**
62       * Returns an available output from the pool. This method will block if there are no free objects available and the max
63       * number of outputs has been created.
64       * 
65       * @return an available KeyedOutput from the pool
66       * @throws IOException If there is some I/O problem.
67       * @throws InterruptedException If interrupted.
68       */
69      public KeyedOutput getFree() throws InterruptedException, IOException {
70          JournaledChannel jc = null;
71          this.lock.lock();
72          try {
73              checkClosed();
74              jc = findFree();
75              jc.setPosition();
76              return new KeyedOutput(this, jc);
77          } catch (Throwable t) {
78              if (jc != null) {
79                  LOG.debug("Throwable occurred while obtaining channel. Returning to the pool. {}", jc.path, t);
80                  free(jc);
81              }
82              throw t;
83          } finally {
84              this.lock.unlock();
85          }
86      }
87  
88      /**
89       * Flushes underlying channel and writes journal entry, updating current position.
90       * 
91       * @param jc the JournaledChannel to flush
92       */
93      void free(final JournaledChannel jc) {
94          if (jc == null) {
95              throw new IllegalArgumentException("Cannot return a null JournaledChannel.");
96          }
97          this.lock.lock();
98          try {
99              if (this.free.contains(jc) || !this.free.offer(jc)) {
100                 LOG.warn("Could not return the channel to the pool {}", this.key);
101             }
102             // signal everyone since close and find may be waiting
103             this.freeCondition.signalAll();
104         } finally {
105             this.lock.unlock();
106         }
107     }
108 
109     /**
110      * Closes the underlying pool. This method will block if any resources have not been returned.
111      * 
112      * @throws IOException If there is some I/O problem.
113      */
114     @Override
115     public void close() throws IOException {
116         this.lock.lock();
117         try {
118             boolean interrupted = false;
119             while (this.free.size() < this.created) {
120                 LOG.debug("Waiting for leased {} objects.", this.created - this.free.size());
121                 try {
122                     this.freeCondition.await();
123                 } catch (InterruptedException ie) {
124                     interrupted = true;
125                     LOG.debug("Interrupted while waiting for free condition", ie);
126                 }
127             }
128             for (final JournaledChannel fc : this.free) {
129                 this.allchannels[fc.index].close();
130             }
131             this.allchannels = null;
132             if (interrupted) {
133                 Thread.currentThread().interrupt(); // Restore previous interrupt status
134             }
135         } finally {
136             this.lock.unlock();
137         }
138     }
139 
140     private void checkClosed() throws ClosedChannelException {
141         if (this.allchannels == null) {
142             throw new ClosedChannelException();
143         }
144     }
145 
146     private JournaledChannel findFree() throws InterruptedException, IOException {
147         // if nothing is available, and we can create additional channels, do it
148         // could get closed when we await
149         while (this.free.isEmpty()) {
150             if (this.created < this.max) {
151                 createChannel();
152             } else {
153                 this.freeCondition.await();
154                 checkClosed();
155             }
156         }
157         return this.free.poll();
158     }
159 
160     private void createChannel() throws IOException {
161         final Path p = Paths.get(this.directory.toString(), this.key + "_" + UUID.randomUUID().toString() + EXTENSION);
162         final JournaledChannel ko = new JournaledChannel(p, this.key, this.created);
163         this.allchannels[this.created++] = ko;
164         this.free.add(ko);
165     }
166 
167     Path getDirectory() {
168         return this.directory;
169     }
170 }