// SimpleNioParser.java
package emissary.parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
 * A very simple-minded parser implementation that assumes each input channel is one session. This parser has no
 * notion of headers or footers, just the basic session, and not much of an idea about that.
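 * <p>
 * A minimal usage sketch (the file path and the {@code handle} call are illustrative, not part of this API):
 *
 * <pre>{@code
 * try (SeekableByteChannel channel = Files.newByteChannel(Paths.get("input.dat"))) {
 *     SimpleNioParser parser = new SimpleNioParser(channel);
 *     DecomposedSession session = parser.getNextSession(); // the whole channel as one session
 *     handle(session.getData()); // hypothetical consumer of the session bytes
 * }
 * }</pre>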
*/
public class SimpleNioParser extends NIOSessionParser {
    private static final Logger logger = LoggerFactory.getLogger(SimpleNioParser.class);
protected int currentSessionIndex = 0;
/**
* Create a session parser on the data
*/
@SuppressWarnings("CheckedExceptionNotThrown")
public SimpleNioParser(SeekableByteChannel channel) throws ParserException {
super(channel);
}
/**
     * Decomposes the session into its separate elements: header, footer, body, and other metadata values extracted
     * from the session data.
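     * <p>
     * For example (illustrative; this parser builds sessions without header or footer records, so those slices come
     * back null):
     *
     * <pre>{@code
     * DecomposedSession d = decomposeSession(session);
     * byte[] body = d.getData(); // whole-session bytes, or null if the session had no data records
     * }</pre>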
*
* @param session The session to be decomposed into separate elements.
     * @return the session elements as a DecomposedSession
     * @throws ParserException if the session data cannot be read
*/
protected DecomposedSession decomposeSession(@Nullable InputSession session) throws ParserException {
try {
DecomposedSession d = new DecomposedSession();
if (session != null) {
d.setHeader(makeDataSlice(session.getHeader()));
d.setFooter(makeDataSlice(session.getFooter()));
d.setData(makeDataSlice(session.getData()));
long length = d.getData() == null ? -1L : d.getData().length;
// Cook the raw metadata and transfer to DecomposedSession
Map<String, Object> md = session.getMetaData();
if (md != null) {
Map<String, String> cooked = cookMetaRecords(md);
d.setMetaData(new HashMap<>()); // clear
for (Map.Entry<String, String> entry : cooked.entrySet()) {
d.addMetaData(entry.getKey(), entry.getValue());
}
}
// Use session length if no data length
if (length < 0) {
length = session.getLength();
}
d.addMetaData(ORIG_DOC_SIZE_KEY, Long.toString(length));
}
return d;
} catch (IOException ex) {
throw new ParserException("Error while building DecomposedSession", ex);
}
}
/**
     * Turn the metadata PositionRecord elements into real data read from the channel
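     * <p>
     * An illustrative sketch of the transformation (keys and offsets are hypothetical):
     *
     * <pre>{@code
     * Map<String, Object> raw = new HashMap<>();
     * raw.put("MIME_TYPE", "text/plain");            // non-record values use toString()
     * raw.put("SUBJECT", new PositionRecord(0, 32)); // records are read from the channel and trimmed
     * Map<String, String> cooked = cookMetaRecords(raw);
     * }</pre>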
*
* @param raw map of PositionRecord objects
     * @return map of cooked metadata
     * @throws IOException if a record's bytes cannot be read from the channel
*/
protected Map<String, String> cookMetaRecords(Map<String, Object> raw) throws IOException {
Map<String, String> cooked = new HashMap<>();
for (Map.Entry<String, Object> entry : raw.entrySet()) {
String key = entry.getKey();
Object tmp = entry.getValue();
if (tmp != null) {
String value;
if (tmp instanceof PositionRecord) {
value = new String(makeDataSlice((PositionRecord) tmp)).trim();
} else {
value = tmp.toString();
}
String name = renameMetadataRecord(key);
cooked.put(name, value);
}
}
return cooked;
}
/**
     * Allow subclasses to arbitrarily rename metadata fields. This is a do-nothing function in the base class.
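     * <p>
     * A hypothetical override in a subclass:
     *
     * <pre>{@code
     * protected String renameMetadataRecord(String s) {
     *     return s.toUpperCase(Locale.ROOT); // e.g., normalize all field names to upper case
     * }
     * }</pre>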
*
* @param s the name of the field to consider renaming
* @return the renamed field or the original name if no change
*/
protected String renameMetadataRecord(String s) {
return s;
}
/**
     * Possibly helpful for debugging the parser factory mess
*
* @return string representation
*/
@Override
public String toString() {
        return SimpleNioParser.class.getName() + " isa " + this.getClass().getName();
}
/**
     * Gets the next session from the data, decomposed into its separate elements: header, footer, body, and other
     * metadata values extracted from the session data. This simple base implementation treats the whole channel as
     * one session.
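     * <p>
     * A sketch of a consumption loop (the {@code parser} and {@code handle} names are illustrative):
     *
     * <pre>{@code
     * while (true) {
     *     try {
     *         DecomposedSession d = parser.getNextSession();
     *         handle(d); // hypothetical per-session handler
     *     } catch (ParserEOFException eof) {
     *         break; // all sessions consumed
     *     }
     * }
     * }</pre>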
*
     * @return the next decomposed session
     * @throws ParserEOFException when all sessions have been consumed
     * @throws ParserException if the channel cannot be read
*/
@Override
public DecomposedSession getNextSession() throws ParserException {
try {
if (isFullyParsed()) {
throw new ParserEOFException("Past end of data");
}
long csize = channel.size();
InputSession i = new InputSession(new PositionRecord(0, csize), // overall record
new PositionRecord(0, csize)); // data record
i.setValid(true);
setFullyParsed(true);
return decomposeSession(i);
} catch (IOException ex) {
throw new ParserException("Exception occurred reading channel", ex);
}
}
/**
     * Slice data from the channel based on a single position record
*
     * @param r the position record indicating absolute offsets
     * @return the bytes read from the channel for the record
     * @throws IOException if the channel cannot be positioned or read
*/
byte[] makeDataSlice(PositionRecord r) throws IOException {
if (r.getLength() > MAX_ARRAY_SIZE_LONG) {
            throw new IllegalStateException("Implementation currently only handles up to Integer.MAX_VALUE lengths");
}
int len = (int) r.getLength();
ByteBuffer n = ByteBuffer.allocate(len);
try {
channel.position(r.getPosition());
while (n.hasRemaining()) {
if (channel.read(n) == -1) {
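                    // Premature EOF: close the channel and return the partially filled buffer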
channel.close();
break;
}
}
} catch (BufferUnderflowException ex) {
logger.warn("Underflow getting {} bytes at {}", n.capacity(), r.getPosition());
}
return n.array();
}
/**
     * Slice data from the channel based on a list of position records, concatenating the slices in order
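     * <p>
     * Illustrative: two records read and concatenated in order (offsets are hypothetical):
     *
     * <pre>{@code
     * byte[] joined = makeDataSlice(Arrays.asList(
     *         new PositionRecord(0, 10),    // bytes [0, 10)
     *         new PositionRecord(50, 10))); // bytes [50, 60)
     * // joined.length == 20
     * }</pre>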
*
     * @param records the list of position records indicating absolute offsets
     * @return the concatenated bytes, or null if there are no records
     * @throws IOException if the channel cannot be positioned or read
*/
@Nullable
byte[] makeDataSlice(@Nullable List<PositionRecord> records) throws IOException {
if (records == null || records.isEmpty()) {
return null;
}
if (records.size() == 1) {
return makeDataSlice(records.get(0));
}
        // Sum lengths in a long so an oversized record cannot silently wrap a 32-bit total
        long totalLength = 0;
        for (PositionRecord r : records) {
            totalLength += r.getLength();
            if (totalLength > MAX_ARRAY_SIZE) {
                throw new IllegalStateException("This implementation cannot create data larger than " + MAX_ARRAY_SIZE);
            }
        }
        int total = (int) totalLength;
ByteBuffer n = ByteBuffer.allocate(total);
int limit = 0;
for (PositionRecord r : records) {
channel.position(r.getPosition());
limit += (int) r.getLength();
n.limit(limit);
while (n.hasRemaining()) {
if (channel.read(n) == -1) {
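                    // Premature EOF: close the channel and keep what was gathered so far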
channel.close();
break;
}
}
}
return n.array();
}
}