// NIOSessionParser.java
package emissary.parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import javax.annotation.Nullable;
/**
* Provide a basic NIO-based session parser that reads data in chunks from the underlying channel. A chunk may contain
* zero or more complete sessions. The chunk buffer begins at minChunkSize and grows as large as maxChunkSize in order
* to accommodate a complete session. A session larger than maxChunkSize results in a ParserException.
*/
public abstract class NIOSessionParser extends SessionParser {
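/*
 * Usage sketch (illustrative only): a concrete subclass pages data in via loadNextRegion() and carves
 * sessions out of the chunk buffer. The subclass name below is hypothetical, and isFullyParsed() is
 * assumed to come from the SessionParser base class:
 *
 *   try (SeekableByteChannel in = Files.newByteChannel(Paths.get("input.dat"))) {
 *       NIOSessionParser parser = new MyNioParser(in); // hypothetical subclass
 *       while (!parser.isFullyParsed()) {
 *           // subclass-specific session extraction happens here
 *       }
 *   }
 */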
// Logger
private static final Logger logger = LoggerFactory.getLogger(NIOSessionParser.class);
protected static final int MIN_CHUNK_SIZE_DEFAULT = 2 * 1024 * 1024; // 2MB
protected static final int MAX_CHUNK_SIZE_DEFAULT = 40 * 1024 * 1024; // 40MB
/** The data source for this parser */
protected SeekableByteChannel channel;
/** The start position of the current chunk relative to the data source */
protected int chunkStart = 0;
/** The current chunk buffer */
@Nullable
protected byte[] data = null;
/** The current write position for the current chunk buffer */
protected int writeOffset = 0;
/** Min chunk buffer size. */
protected int minChunkSize = MIN_CHUNK_SIZE_DEFAULT;
/** Max chunk size to read at a time; a file containing a session larger than this can never be parsed. */
protected int maxChunkSize = MAX_CHUNK_SIZE_DEFAULT;
/** When we grow the chunk buffer to accommodate additional data, we grow it by this increment */
protected int chunkAllocationIncrement = (10 * 1024 * 1024) + 100;
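// With the defaults above a growing buffer steps 2MB -> ~12MB -> ~22MB -> ~32MB and then clamps to the
// 40MB max; the extra 100 bytes presumably give a small margin over a round 10MB step.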
/**
* Create the parser with the supplied data source
*
* @param channel the source of data
*/
public NIOSessionParser(SeekableByteChannel channel) {
this.channel = channel;
}
/**
* Get the maximum chunk size
*
* @return the max chunk buffer size in bytes
*/
public int getMaxChunkSize() {
return maxChunkSize;
}
/**
* Set the maximum chunk size. Values less than or equal to zero are ignored.
*
* @param value the new max chunk size in bytes
*/
public void setMaxChunkSize(int value) {
if (value > 0) {
maxChunkSize = value;
}
}
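// For example, to allow sessions up to 100MB before a ParserException is thrown:
// parser.setMaxChunkSize(100 * 1024 * 1024);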
/**
* Read more data, starting where the last read left off. Fills the remaining capacity of the chunk buffer,
* growing it by <code>chunkAllocationIncrement</code> bytes (capped at <code>maxChunkSize</code>) when it is
* already full.
*
* @param data the byte array to (re)load, or null if one should be created
* @return the byte array of data, trimmed to the bytes actually read if the channel was exhausted
* @throws ParserException if the buffer cannot grow beyond maxChunkSize or the channel cannot be read
*/
protected byte[] loadNextRegion(@Nullable byte[] data) throws ParserException {
logger.debug("loadNextRegion(): data.length = {}, maxChunkSize = {}, chunkStart = {}, writeOffset = {}",
data == null ? -1 : data.length, maxChunkSize, chunkStart, writeOffset);
if (!channel.isOpen()) {
throw new ParserEOFException("Channel is closed, likely completely consumed");
}
// Optionally create the array or recreate if old is too small
if (data == null) {
data = new byte[minChunkSize];
}
if (writeOffset >= data.length) {
// grow the byte buffer to accomodate more data
int newSize = data.length + chunkAllocationIncrement;
if (newSize > maxChunkSize) {
newSize = maxChunkSize;
}
if (data.length >= maxChunkSize) {
// if the byte array is already maxChunkSize or larger, there isn't anything more we can do
throw new ParserException("buffer size required to read session " + chunkStart + " is larger than maxChunkSize " + maxChunkSize);
}
byte[] newData = new byte[newSize];
System.arraycopy(data, 0, newData, 0, data.length);
data = newData;
}
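// Expose only the unfilled tail [writeOffset, data.length) of the array to the channel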
final ByteBuffer b = ByteBuffer.wrap(data);
b.position(writeOffset);
b.limit(data.length);
try {
while (b.hasRemaining()) {
if (channel.read(b) == -1) {
channel.close();
logger.warn("Closing channel. End of channel reached at {} instead of expected {}", data.length - b.remaining(), data.length);
break;
}
}
} catch (IOException ex) {
throw new ParserException("Exception reading from channel", ex);
}
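// Record how much of the array is now valid; remaining() is nonzero only when EOF cut the read short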
writeOffset = data.length - b.remaining();
if (writeOffset < data.length) {
logger.debug("trimming byte[] from {} to size {}", data.length, writeOffset);
data = Arrays.copyOfRange(data, 0, writeOffset);
}
return data;
}
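/*
 * Sketch of the expected calling pattern for loadNextRegion() (illustrative; the bookkeeping shown is an
 * assumption about how a subclass manages the chunk fields, not code from this repository):
 *
 *   data = loadNextRegion(data);                       // fill or grow the chunk buffer
 *   // scan data[0..writeOffset) for a complete session of `len` bytes, then consume it:
 *   chunkStart += len;                                 // advance relative to the data source
 *   data = Arrays.copyOfRange(data, len, writeOffset); // drop the consumed bytes
 *   writeOffset -= len;
 */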
}