WindowedSeekableByteChannel.java
package emissary.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import javax.annotation.Nullable;
/**
* This class provides a seekable channel for a portion, or window, within the provided ReadableByteChannel. The
* underlying window size is configured on instantiation. If you intend to move the positions for stateful processing,
* you should provide a buffer size, <code>buffsize</code>, greater than the maximum amount you wish to operate against
* at any one time.
*
* <p>
* This implementation should be able to hold a maximum window of ~4GB. This implementation uses on heap buffers so be
* wary of using Integer.MAX_VALUE as that can cause and OOME.
*/
public class WindowedSeekableByteChannel implements SeekableByteChannel {
private static final Logger logger = LoggerFactory.getLogger(WindowedSeekableByteChannel.class);
/**
* The input source
*/
private final ReadableByteChannel in;
/**
* estimated length. We have to estimate because we are buffering into a window and may not be at the end. We read ahead
* to keep buffers full, but there can be additional data.
*/
long estimatedLength;
/**
* The earliest position we can move to. Essentially, position of the underlying Channel that is at position 0 of buff1
*/
long minposition;
/* Maximum amount of data allowed in memory */
// long maxwindow;
/** flag if we've reached the end of the underlying channel */
private boolean endofchannel;
/**
* Internal buffers for windowed content
*/
@Nullable
private ByteBuffer buff1;
@Nullable
private ByteBuffer buff2;
/**
* Creates a new instance and populates buffers with data.
*/
public WindowedSeekableByteChannel(final ReadableByteChannel in, final int buffsize) throws IOException {
logger.debug("WindowSeekableByteChannel created with buffer size = {}", buffsize);
if ((in == null) || !in.isOpen()) {
throw new IllegalArgumentException("Channel must be open and not null:");
}
this.in = in;
int capacity = buffsize / 2;
if ((buffsize % 2) == 1) {
capacity++;
}
this.buff1 = ByteBuffer.allocate(capacity);
readIntoBuffer(this.buff1);
// only fill buff2 if there's more to read. otherwise save heap
if (!this.endofchannel) {
this.buff2 = ByteBuffer.allocate(capacity);
readIntoBuffer(this.buff2);
} else {
this.buff2 = ByteBuffer.allocate(0);
}
}
/**
* If necessary, will move data in the window to make room for additional data from the channel.
*/
private void realignBuffers() throws IOException {
logger.debug("realignBuffers() called: buf1 = {}, buf2 = {}", buff1, buff2);
final int qtr = this.buff1.capacity() / 2;
if (this.endofchannel || (this.buff2.remaining() > qtr)) {
logger.debug("after early return from realignBuffers(): buf1 = {}, buf2 = {}", buff1, buff2);
return;
}
// keep track of our position
final long offset = (long) this.buff1.position() + this.buff2.position();
this.buff1.position(qtr);
// push them forward
this.buff1.compact();
// read from the beginning of the buffer
this.buff2.rewind();
logger.debug("realignBuffers() called prior to fillDst: buf1 = {}, buf2 = {}", buff1, buff2);
filldst(this.buff2, this.buff1);
// chuck the bytes read into buff1
logger.debug("realignBuffers() called prior prior to buff2 compact: buf1 = {}, buf2 = {}", buff1, buff2);
this.buff2.compact();
logger.debug("realignBuffers() called prior to readIntoBuffer: buf1 = {}, buf2 = {}", buff1, buff2);
readIntoBuffer(this.buff2);
// update the offset
this.minposition += qtr;
// reset our location
setOffset(offset - qtr);
logger.debug("after realignBuffers(): buf1 = {}, buf2 = {}", buff1, buff2);
}
/**
* Determine if there are bytes available to be read.
*
* @return true if either buffer has data remaining or we have not reached the end of channel.
*/
private boolean bytesAvailable() {
return this.buff1.remaining() > 0 || this.buff2.remaining() > 0 || !this.endofchannel;
}
/**
* Attempt to read data from the open channel into the buffer provided.
* <p>
* After this call completes, we have either filled the buffer -or- have reached the end of data in the input channel.
* The buffer will have its position set to 0, and limit set to the end of the data read, which may be equal to the size
* of the buffer.
* <p>
* Has the side effect of raising the endofchannel flag if we have exhausted the bytes in the input channel. Updates the
* estimatedLength with the number of bytes read.
*
* @param buf the destination buffer.
* @return the number of bytes read into the buffer.
*/
private int readIntoBuffer(final ByteBuffer buf) throws IOException {
logger.debug("readIntoBuffer() called: {}", buf);
final int rem = buf.remaining();
int read = 0;
while ((read != -1) && (buf.remaining() > 0)) {
read = this.in.read(buf);
}
this.endofchannel = (read == -1);
// total amount read in case we hit EOS
final int totalRead = rem - buf.remaining();
this.estimatedLength += totalRead;
buf.flip();
return totalRead;
}
/**
* {@inheritDoc}
*
* @see java.nio.channels.Channel#isOpen()
*/
@Override
public boolean isOpen() {
return this.in.isOpen();
}
/**
* Closes underlying Channel and releases buffers. Further calls to this instance will result in unspecified behavior.
*
* @see java.nio.channels.Channel#close()
*/
@Override
public void close() throws IOException {
this.in.close();
this.buff1 = null;
this.buff2 = null;
}
/**
* {@inheritDoc}
*
* @see java.nio.channels.SeekableByteChannel#read(ByteBuffer)
*/
@Override
public int read(final ByteBuffer dst) throws IOException {
if (!this.isOpen()) {
throw new ClosedChannelException();
}
if (dst == null) {
throw new IllegalArgumentException("Destination ByteBuffer cannot be null");
}
// we have nothing left to read and have consumed both buffers fully.
if (this.endofchannel && (this.buff1.remaining() + this.buff2.remaining() == 0)) {
return -1;
}
// no more room in the target buffer, but we might have more to read.
// - do we want to possibly throw an exception here?
if (dst.remaining() == 0) {
return 0;
}
final int maxWrite = dst.remaining();
while (dst.hasRemaining() && bytesAvailable()) {
// filling buffers
realignBuffers();
filldst(this.buff1, dst);
filldst(this.buff2, dst);
}
final int bytesRead = maxWrite - dst.remaining();
return (this.endofchannel && bytesRead == 0) ? -1 : bytesRead;
}
/*
* Safely fill a destination buffer avoiding a buffer overflow if necessary. <p> Copies byte from src to dest. The
* number of bytes copied is the minimum of the amount of space remaining in the destination buffer
*/
private static void filldst(final ByteBuffer src, final ByteBuffer dst) {
while (src.hasRemaining() && dst.hasRemaining()) {
final int origLimit = src.limit();
// avoid buffer overflow
final int limit = (src.remaining() > dst.remaining()) ? (src.position() + dst.remaining()) : origLimit;
src.limit(limit);
dst.put(src);
// set it back
src.limit(origLimit);
}
}
/**
* {@inheritDoc}
*
* @see java.nio.channels.SeekableByteChannel#position()
*/
@Override
public long position() throws IOException {
return this.minposition + this.buff1.position() + this.buff2.position();
}
/**
* {@inheritDoc}
*
* @see java.nio.channels.SeekableByteChannel#position(long)
*/
@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
// if data hasn't been read in, we'll have to do it below and see if it's available
if (this.endofchannel && (newPosition > this.estimatedLength)) {
throw new EOFException("Position is beyond EOF");
}
long tgtPosition = newPosition - this.minposition;
// see if we can move there
if (tgtPosition < 0) {
throw new IllegalStateException("Cannot move to " + newPosition + " in the stream. Minimum position is " + this.minposition);
}
while (!setOffset(tgtPosition) && !this.endofchannel) {
realignBuffers();
tgtPosition = newPosition - this.minposition;
}
if (newPosition > this.estimatedLength) {
throw new EOFException("Position is beyond EOF");
}
return this;
}
/**
* attempts to set position to specified offset in underlying buffers
*/
private boolean setOffset(final long tgtOffset) {
logger.debug("setOffset() called tgtOffset = {}, buff1 = {}, buff2 = {}", tgtOffset, buff1, buff2);
if (tgtOffset <= this.buff1.limit()) {
this.buff1.position((int) tgtOffset);
this.buff2.position(0);
} else if (tgtOffset <= (this.buff1.limit() + this.buff2.limit())) {
this.buff1.position(this.buff1.limit());
this.buff2.position((int) (tgtOffset - this.buff1.capacity()));
} else {
this.buff1.position(this.buff1.limit());
this.buff2.position(this.buff2.limit());
return false;
}
return true;
}
/**
* Returns the minimum position we can go to in the backing Channel. This is based on the current window mapped into the
* backing store.
*
* @return the minimum allowed position in the channel
*/
public long getMinPosition() {
return this.minposition;
}
/**
* Returns the maximum position we can go to in the backing Channel. This is based on the current window mapped into the
* backing store.
*
* @return the maximum allowed position in the channel
*/
public long getMaxPosition() {
return this.minposition + this.buff1.limit() + this.buff2.limit();
}
/**
* A potential size for the underlying Channel. This value can change as we read additional data into the buffer.
* Eventually, this number should reflect the true size assuming no underlying exceptions.
*
* @return an estimated length of the underlying channel
*/
@Override
public long size() throws IOException {
return this.estimatedLength;
}
/**
* This is a read only implementation.
*
* @param size The truncation size.
* @return throws ex
* @throws IOException If there is some I/O problem.
* @throws UnsupportedOperationException If the operation is not supported.
* @see java.nio.channels.SeekableByteChannel#truncate(long)
*/
@Override
public SeekableByteChannel truncate(final long size) throws IOException {
throw new UnsupportedOperationException("This implementation does not allow mutations to the underlying channel");
}
/**
* Unsupported in this implementation. This could be modified in the future to allow in memory writes.
*
* @param source The bytes to write.
* @return throws ex
* @throws IOException If there is some I/O problem.
* @throws UnsupportedOperationException If the operation is not supported.
* @see java.nio.channels.SeekableByteChannel#write(ByteBuffer)
*/
@Override
public int write(final ByteBuffer source) throws IOException {
throw new UnsupportedOperationException("This is a readonly implementation");
}
}