JournalReader.java
package emissary.output.roller.journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import static emissary.output.roller.journal.Journal.CURRENT_VERSION;
import static emissary.output.roller.journal.Journal.ENTRY_LENGTH;
import static emissary.output.roller.journal.Journal.EXT;
import static emissary.output.roller.journal.Journal.MAGIC;
import static emissary.output.roller.journal.Journal.NINE;
import static emissary.output.roller.journal.Journal.SEP;
/**
 * Encapsulates logic to read/deserialize a BG Journal file.
 * <p>
 * Constructing a reader opens the journal file, validates its header (magic bytes, version byte, key string and
 * starting sequence value) and then loads every complete, correctly sequenced entry into the {@link Journal}
 * returned by {@link #getJournal()}. If construction fails for any reason the underlying channel is released before
 * the exception propagates, so a failed constructor never leaks a file handle.
 * <p>
 * Header layout as consumed by this class: {@code MAGIC} bytes, one version byte, a 4-byte length followed by that
 * many key bytes, then {@code NINE} bytes holding a long sequence value and a {@code SEP} byte. Entries of
 * {@code ENTRY_LENGTH} bytes follow, each starting with its own sequence value and separator.
 */
public class JournalReader implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(JournalReader.class);

    // full path to journal file
    final Path journalPath;
    // channel position where entries begin
    private long begin;
    // initial sequence value
    private long startsequence;
    // current sequence value
    private long sequence;
    // persisted journal
    @Nullable
    FileChannel journalChannel;
    // shared scratch buffer for every read; not final to release on close
    @Nullable
    private ByteBuffer b = ByteBuffer.allocateDirect(ENTRY_LENGTH);
    private final Journal journal;
    // guards release of the channel and buffer in close()
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Opens the journal at the given path, validates the header and loads all readable entries.
     *
     * @param journalPath full path to the journal file
     * @throws NoSuchFileException if the file does not exist or is empty
     * @throws IOException if the file cannot be opened or read
     * @throws IllegalStateException if the header is truncated or otherwise invalid
     */
    public JournalReader(final Path journalPath) throws IOException {
        this.journalPath = journalPath;
        journal = new Journal(journalPath);
        try {
            checkJournal();
            loadEntries();
        } catch (IOException | RuntimeException e) {
            // The caller never receives an instance when the constructor throws, so the channel opened by
            // readHeader() must be released here or it would leak.
            try {
                release();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    // verifies the file exists and is non-empty, then parses the header
    private void checkJournal() throws IOException {
        if (Files.exists(journalPath) && Files.size(journalPath) > 0L) {
            readHeader();
        } else {
            throw new NoSuchFileException("File does not exist " + journalPath);
        }
    }

    // reads entries from the position recorded by readHeader() until EOF, a short read or a sequence mismatch
    private void loadEntries() throws IOException {
        journalChannel.position(begin);
        sequence = startsequence;
        while (journalChannel.position() < journalChannel.size()) {
            int read = read(ENTRY_LENGTH);
            if (read != ENTRY_LENGTH) {
                // a partially written trailing entry is tolerated: keep what was loaded so far and stop
                logger.warn("Incomplete journal read for {}. Expected {} but read {}. Sequence start {}, last successful sequence {} ({}th)",
                        journal.getKey(), ENTRY_LENGTH, read, startsequence, sequence, (sequence - startsequence));
                break;
            }
            long nextSeq = getSequence();
            // each entry must carry the next consecutive sequence value
            if (nextSeq != ++sequence) {
                logger.warn("Incorrect sequence value returned. Expected {} Received {}. Exiting", sequence, nextSeq);
                break;
            }
            // the buffer is now positioned just past the sequence value and separator
            JournalEntry e = JournalEntry.deserialize(b);
            journal.entries.add(e);
        }
    }

    /**
     * Returns the journal populated while this reader was constructed.
     *
     * @return the journal holding the header values and loaded entries
     * @throws IOException declared for API compatibility; not thrown by the current implementation
     */
    public Journal getJournal() throws IOException {
        return journal;
    }

    // read long followed by null sep. return -1 if any issues
    private long getSequence() {
        long seq = b.getLong();
        if (b.get() != SEP) {
            return -1;
        }
        return seq;
    }

    // reads exactly length bytes into the shared buffer; header fields must be complete, so a short read is fatal
    private void readFully(int length, String what) throws IOException {
        int read = read(length);
        if (read != length) {
            throw new IllegalStateException("Truncated journal " + journalPath + ". Expected " + length + " bytes for " + what
                    + " but read " + read);
        }
    }

    // verifies the file starts with the journal magic bytes
    private void checkMagic() throws IOException {
        readFully(MAGIC.length, "magic");
        byte[] mgic = new byte[MAGIC.length];
        b.get(mgic);
        if (!Arrays.equals(mgic, MAGIC)) {
            throw new IllegalStateException("Not a Journal file. Invalid Magic");
        }
    }

    // reads the single version byte
    private byte readVersion() throws IOException {
        readFully(1, "version");
        return b.get();
    }

    // opens the channel and parses magic, version, key and starting sequence; records where entries begin
    private void readHeader() throws IOException {
        // Opened for reading only. CREATE is ignored for read-only opens, so it is not requested.
        this.journalChannel = FileChannel.open(journalPath, StandardOpenOption.READ);
        checkMagic();
        journal.setVersion(readVersion());
        if (journal.version != CURRENT_VERSION) {
            // should not happen now, but future versions may delegate to a legacy reader
            throw new IllegalStateException("Incorrect Version Detected " + journal.version);
        }
        journal.setKey(readString());
        // starting sequence number.
        readFully(NINE, "starting sequence");
        sequence = getSequence();
        if (sequence == -1) {
            throw new IllegalStateException("Invalid sequence read " + sequence);
        }
        startsequence = sequence;
        begin = journalChannel.position();
    }

    // reads a 4-byte length followed by that many bytes and returns them as a String
    private String readString() throws IOException {
        // validate the read before touching the buffer; getInt() on a short buffer would underflow
        readFully(4, "string length");
        int len = b.getInt();
        if (len < 1) {
            throw new IllegalStateException("Negative value returned. Possible corrupt file");
        }
        // the value is read through the shared buffer, so anything larger cannot be valid for this reader
        if (len > b.capacity()) {
            throw new IllegalStateException("String length " + len + " exceeds buffer capacity " + b.capacity()
                    + ". Possible corrupt file");
        }
        readFully(len, "string value");
        byte[] keyBytes = new byte[len];
        b.get(keyBytes);
        // NOTE(review): decodes with the platform default charset; confirm this matches how the writer encodes the key
        return new String(keyBytes);
    }

    // attempts to read limit bytes into the shared buffer; returns bytes read, or -1 if EOF was hit before any byte
    private int read(int limit) throws IOException {
        if (limit == 0) {
            return 0;
        }
        b.clear();
        b.limit(limit);
        int total = 0;
        while (total < limit) {
            int read = journalChannel.read(b);
            if (read == -1) {
                logger.debug("EOF when trying to read {} bytes", limit);
                if (total == 0) {
                    total = -1;
                }
                break;
            }
            total += read;
        }
        // make the bytes just read available to the caller
        b.flip();
        return total;
    }

    /**
     * Closes the journal channel, if open, and drops the read buffer. Calling this more than once is harmless.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void close() throws IOException {
        release();
    }

    // private so the constructor can release resources without calling an overridable method
    private void release() throws IOException {
        lock.lock();
        try {
            if (journalChannel != null) {
                journalChannel.close();
            }
            journalChannel = null;
            b = null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Lists the journal files found directly inside a directory.
     *
     * @param dir directory to scan
     * @return paths of all entries in {@code dir} whose name matches {@code "*" + EXT}
     * @throws IOException if the directory cannot be read
     */
    @SuppressWarnings("PreferredInterfaceType")
    public static Collection<Path> getJournalPaths(Path dir) throws IOException {
        ArrayList<Path> paths = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*" + EXT)) {
            for (Path path : stream) {
                paths.add(path);
            }
        }
        return paths;
    }

    /**
     * Prints contents of a Journal to stdout.
     *
     * @param args a single argument: the path of the journal file to print
     * @throws IllegalArgumentException if no path argument is supplied
     * @throws Exception if the journal cannot be read
     */
    @SuppressWarnings("SystemOut")
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: JournalReader <path to journal file>");
        }
        String path = args[0];
        try (JournalReader jr = new JournalReader(Paths.get(path))) {
            Journal j = jr.getJournal();
            System.out.println("Journal File: " + path);
            System.out.println("Journal Version: " + j.version);
            System.out.println("Journal Key: " + j.getKey());
            for (JournalEntry je : j.getEntries()) {
                System.out.println(je.toString());
            }
        }
    }
}