View Javadoc
1   package emissary.parser;
2   
3   import org.slf4j.Logger;
4   import org.slf4j.LoggerFactory;
5   
6   import java.nio.ByteBuffer;
7   import java.nio.channels.SeekableByteChannel;
8   
9   /**
10   * Encapsulate the behavior necessary to slide a window through a channel and parse sessions from it. nextChunkOrDie
11   * will load the next region.
12   */
13  public abstract class FillingNIOParser extends NIOSessionParser {
14  
15      private final static Logger logger = LoggerFactory.getLogger(FillingNIOParser.class);
16  
17      /** position of the session start relative to the start of the current chunk */
18      protected int sessionStart = 0;
19  
20      public FillingNIOParser(SeekableByteChannel channel) {
21          super(channel);
22      }
23  
24      /**
25       * Get and set all stats for loading the next chunk This is specified to the RAF parser families
26       * 
27       * @param data the allocated data block to fill or null
28       * @return ref to the incoming block now filled or a new one if incoming was null or size was changed
29       * @throws ParserEOFException when there is no more data
30       */
31      protected byte[] nextChunkOrDie(byte[] data) throws ParserException {
32          if (sessionStart > 0) {
33              // the sessionStart is somewhere other than the beginning of the buffer and
34              // we've discovered that we need to load more data. Compact the buffer so that
35              // the sessionStart is at the beginning of the buffer. chunkStart is incremented
36              // by sessionStart, the write offset is updated.
37              final ByteBuffer b = ByteBuffer.wrap(data);
38              b.position(sessionStart);
39              b.limit(writeOffset);
40              b.compact(); // does not re-allocate the byte array, only manipulates the current buffer.
41  
42              chunkStart += sessionStart;
43              sessionStart = 0;
44              writeOffset = b.position();
45              logger.debug("Compacted buffer: sessionStart/chunkStart/writeOffset = {}/{}/{}", sessionStart, chunkStart, writeOffset);
46          }
47  
48          try {
49              byte[] b = loadNextRegion(data);
50              return b;
51  
52          } catch (ParserEOFException eof) {
53              if (writeOffset - sessionStart > 1) {
54                  // there's data left in the buffer that's more than a newline, meaning
55                  // there's data we were unable to parse, but there was no additional
56                  // data to read - so the session is truncated
57                  throw new ParserException("Unexpectedly malformed data at " + chunkStart);
58              } else {
59                  // end of file and the last session was complete.
60                  setFullyParsed(true);
61                  throw eof;
62              }
63          }
64      }
65  }