1 package emissary.core.channels;
2
3 import org.apache.commons.io.IOUtils;
4 import org.apache.commons.lang3.Validate;
5
6 import java.io.IOException;
7 import java.nio.ByteBuffer;
8 import java.nio.channels.SeekableByteChannel;
9
10
11
12
13 public final class BufferedChannelFactory {
/**
 * Not instantiable: this class only exposes the static {@code create} factory method.
 */
private BufferedChannelFactory() {
    // Intentionally empty.
}
15
16
17
18
19
20
21
22
23 public static SeekableByteChannelFactory create(final SeekableByteChannelFactory seekableByteChannelFactory,
24 final int maxBufferSize) {
25 return new BufferedChannelFactoryImpl(seekableByteChannelFactory, maxBufferSize);
26 }
27
28
29
30
31 private static class BufferedChannelFactoryImpl implements SeekableByteChannelFactory {
32
33
34
35 private final SeekableByteChannelFactory seekableByteChannelFactory;
36
37
38
39 private final int bufferSize;
40
41
42
43
44
45
46
47 public BufferedChannelFactoryImpl(final SeekableByteChannelFactory seekableByteChannelFactory,
48 final int maxBufferSize) {
49 Validate.notNull(seekableByteChannelFactory, "Required: seekableByteChannelFactory not null!");
50 Validate.isTrue(maxBufferSize > 0, "Required: maxBufferSize > 0");
51
52 this.seekableByteChannelFactory = seekableByteChannelFactory;
53
54 int b = maxBufferSize;
55 try (SeekableByteChannel sbc = seekableByteChannelFactory.create()) {
56 if (sbc.size() < maxBufferSize) {
57 b = (int) sbc.size();
58 }
59 } catch (IOException ignored) {
60
61 }
62
63 this.bufferSize = b;
64 }
65
66 @Override
67 public SeekableByteChannel create() {
68 return new BufferedSeekableByteChannel(seekableByteChannelFactory.create(), bufferSize);
69 }
70 }
71
72
73
74
75 private static class BufferedSeekableByteChannel extends AbstractSeekableByteChannel {
76
77
78
79 private final SeekableByteChannel seekableByteChannel;
80
81
82
83 private final int bufferSize;
84
85
86
87 private final ByteBuffer buffer;
88
89
90
91
92 private long bufferStart = -1;
93
94
95
96 private int bufferValidBytes = -1;
97
98
99
100
101
102
103
104 public BufferedSeekableByteChannel(final SeekableByteChannel seekableByteChannel, final int bufferSize) {
105 this.seekableByteChannel = seekableByteChannel;
106 this.bufferSize = bufferSize;
107 this.buffer = ByteBuffer.allocate(bufferSize);
108 }
109
110 @Override
111 protected void closeImpl() throws IOException {
112 seekableByteChannel.close();
113 }
114
115 @Override
116 protected int readImpl(final ByteBuffer byteBuffer) throws IOException {
117
118 final long bufferStartFromPosition = position() / bufferSize * bufferSize;
119
120 if (bufferStartFromPosition != bufferStart) {
121 buffer.position(0);
122 seekableByteChannel.position(bufferStartFromPosition);
123
124 bufferValidBytes = IOUtils.read(seekableByteChannel, buffer);
125 bufferStart = bufferStartFromPosition;
126 }
127
128 final int bufferStartOffset = (int) (position() % bufferSize);
129 final int bytesToReturn = Math.min(byteBuffer.remaining(), bufferValidBytes - bufferStartOffset);
130
131 byteBuffer.put(buffer.array(), bufferStartOffset, bytesToReturn);
132
133 return bytesToReturn;
134 }
135
136 @Override
137 protected long sizeImpl() throws IOException {
138 return seekableByteChannel.size();
139 }
140 }
141 }