1 package emissary.parser;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5
6 import java.nio.ByteBuffer;
7 import java.nio.channels.SeekableByteChannel;
8
9 /**
10 * Encapsulate the behavior necessary to slide a window through a channel and parse sessions from it. nextChunkOrDie
11 * will load the next region.
12 */
13 public abstract class FillingNIOParser extends NIOSessionParser {
14
15 private final static Logger logger = LoggerFactory.getLogger(FillingNIOParser.class);
16
17 /** position of the session start relative to the start of the current chunk */
18 protected int sessionStart = 0;
19
20 public FillingNIOParser(SeekableByteChannel channel) {
21 super(channel);
22 }
23
24 /**
25 * Get and set all stats for loading the next chunk This is specified to the RAF parser families
26 *
27 * @param data the allocated data block to fill or null
28 * @return ref to the incoming block now filled or a new one if incoming was null or size was changed
29 * @throws ParserEOFException when there is no more data
30 */
31 protected byte[] nextChunkOrDie(byte[] data) throws ParserException {
32 if (sessionStart > 0) {
33 // the sessionStart is somewhere other than the beginning of the buffer and
34 // we've discovered that we need to load more data. Compact the buffer so that
35 // the sessionStart is at the beginning of the buffer. chunkStart is incremented
36 // by sessionStart, the write offset is updated.
37 final ByteBuffer b = ByteBuffer.wrap(data);
38 b.position(sessionStart);
39 b.limit(writeOffset);
40 b.compact(); // does not re-allocate the byte array, only manipulates the current buffer.
41
42 chunkStart += sessionStart;
43 sessionStart = 0;
44 writeOffset = b.position();
45 logger.debug("Compacted buffer: sessionStart/chunkStart/writeOffset = {}/{}/{}", sessionStart, chunkStart, writeOffset);
46 }
47
48 try {
49 byte[] b = loadNextRegion(data);
50 return b;
51
52 } catch (ParserEOFException eof) {
53 if (writeOffset - sessionStart > 1) {
54 // there's data left in the buffer that's more than a newline, meaning
55 // there's data we were unable to parse, but there was no additional
56 // data to read - so the session is truncated
57 throw new ParserException("Unexpectedly malformed data at " + chunkStart);
58 } else {
59 // end of file and the last session was complete.
60 setFullyParsed(true);
61 throw eof;
62 }
63 }
64 }
65 }