View Javadoc
1   package emissary.core.channels;
2   
3   import org.apache.commons.io.IOUtils;
4   import org.apache.commons.lang3.Validate;
5   
6   import java.io.IOException;
7   import java.nio.ByteBuffer;
8   import java.nio.channels.SeekableByteChannel;
9   
10  /**
11   * Creates a SeekableByteChannel cache of a defined size analogous to BufferedInputStream.
12   */
13  public final class BufferedChannelFactory {
14      private BufferedChannelFactory() {}
15  
16      /**
17       * Creates a SeekableByteChannelFactory that caches the bytes of the passed in SeekableByteChannelFactory.
18       * 
19       * @param seekableByteChannelFactory to be cached.
20       * @param maxBufferSize maximum size of the buffer of bytes (BufferSize = Math.min(sbcf.size(), maxBufferSize)).
21       * @return the caching SeekableByteChannelFactory.
22       */
23      public static SeekableByteChannelFactory create(final SeekableByteChannelFactory seekableByteChannelFactory,
24              final int maxBufferSize) {
25          return new BufferedChannelFactoryImpl(seekableByteChannelFactory, maxBufferSize);
26      }
27  
28      /**
29       * A SeekableByteChannelFactory that caches the bytes of the passed in SeekableByteChannelFactory.
30       */
31      private static class BufferedChannelFactoryImpl implements SeekableByteChannelFactory {
32          /**
33           * The SeekableByteChannel to cache.
34           */
35          private final SeekableByteChannelFactory seekableByteChannelFactory;
36          /**
37           * The size of the buffer of bytes.
38           */
39          private final int bufferSize;
40  
41          /**
42           * Creates a SeekableByteChannelFactory that caches the bytes of the passed in SeekableByteChannelFactory.
43           * 
44           * @param seekableByteChannelFactory to be cached.
45           * @param maxBufferSize of the buffer of bytes.
46           */
47          public BufferedChannelFactoryImpl(final SeekableByteChannelFactory seekableByteChannelFactory,
48                  final int maxBufferSize) {
49              Validate.notNull(seekableByteChannelFactory, "Required: seekableByteChannelFactory not null!");
50              Validate.isTrue(maxBufferSize > 0, "Required: maxBufferSize > 0");
51  
52              this.seekableByteChannelFactory = seekableByteChannelFactory;
53  
54              int b = maxBufferSize;
55              try (SeekableByteChannel sbc = seekableByteChannelFactory.create()) {
56                  if (sbc.size() < maxBufferSize) {
57                      b = (int) sbc.size();
58                  }
59              } catch (IOException ignored) {
60                  // Leave b as maxBufferSize.
61              }
62  
63              this.bufferSize = b;
64          }
65  
66          @Override
67          public SeekableByteChannel create() {
68              return new BufferedSeekableByteChannel(seekableByteChannelFactory.create(), bufferSize);
69          }
70      }
71  
72      /**
73       * SeekableByteChannel that caches the bytes of the passed in SeekableByteChannel
74       */
75      private static class BufferedSeekableByteChannel extends AbstractSeekableByteChannel {
76          /**
77           * The SeekableByteChannel to cache.
78           */
79          private final SeekableByteChannel seekableByteChannel;
80          /**
81           * The size of the buffer of bytes.
82           */
83          private final int bufferSize;
84          /**
85           * The buffer of bytes.
86           */
87          private final ByteBuffer buffer;
88  
89          /**
90           * The starting offset of the current buffer.
91           */
92          private long bufferStart = -1;
93          /**
94           * The number of valid bytes in the current buffer.
95           */
96          private int bufferValidBytes = -1;
97  
98          /**
99           * Creates a SeekableByteChannel cache where there is a single buffer of bytes aligned on bufferSize boundaries.
100          * 
101          * @param seekableByteChannel to be cached.
102          * @param bufferSize of the buffer of bytes.
103          */
104         public BufferedSeekableByteChannel(final SeekableByteChannel seekableByteChannel, final int bufferSize) {
105             this.seekableByteChannel = seekableByteChannel;
106             this.bufferSize = bufferSize;
107             this.buffer = ByteBuffer.allocate(bufferSize);
108         }
109 
110         @Override
111         protected void closeImpl() throws IOException {
112             seekableByteChannel.close();
113         }
114 
115         @Override
116         protected int readImpl(final ByteBuffer byteBuffer) throws IOException {
117             // Determines the start of the buffer that contains the current position.
118             final long bufferStartFromPosition = position() / bufferSize * bufferSize;
119 
120             if (bufferStartFromPosition != bufferStart) {
121                 buffer.position(0);
122                 seekableByteChannel.position(bufferStartFromPosition);
123 
124                 bufferValidBytes = IOUtils.read(seekableByteChannel, buffer);
125                 bufferStart = bufferStartFromPosition;
126             }
127 
128             final int bufferStartOffset = (int) (position() % bufferSize);
129             final int bytesToReturn = Math.min(byteBuffer.remaining(), bufferValidBytes - bufferStartOffset);
130 
131             byteBuffer.put(buffer.array(), bufferStartOffset, bytesToReturn);
132 
133             return bytesToReturn;
134         }
135 
136         @Override
137         protected long sizeImpl() throws IOException {
138             return seekableByteChannel.size();
139         }
140     }
141 }