JournalWriter.java
package emissary.output.roller.journal;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import static emissary.output.roller.journal.Journal.SEP;
/**
* BG Write ahead log to track progress, often files, that have been successfully flushed to disk. The file itself
* contains metadata at the start, followed by a collection of log entries of fixed length, currently 1024. Leading
* metadata is in the format:
*
* <code>
* [Journal Magic][Journal Version][Journal key.size()][Journal key.getBytes()][Journal Sequence Number][null byte][1-n Journal Entries]
* </code>
*
* Record format:
*
* <code>
* [Journal sequence number][null byte][Entry value.size()][null byte][Entry val.getBytes()][null byte][position][null padded to fixed len]
* </code>
*/
public class JournalWriter implements Closeable {
private final ReentrantLock lock = new ReentrantLock();
@Nullable
private ByteBuffer b = ByteBuffer.allocateDirect(Journal.ENTRY_LENGTH);
// full path to journal file
final Path journalPath;
// current sequence value
private long sequence;
byte version;
String key;
// persisted journal
@Nullable
FileChannel journal;
JournalEntry prev;
public JournalWriter(final Path dir, final String key) throws IOException {
this(dir, key, key);
}
public JournalWriter(final Path dir, final String journalFileName, final String key) throws IOException {
this.journalPath = dir.resolve(journalFileName + Journal.EXT);
this.key = key;
checkJournal();
}
private void checkJournal() throws IOException {
if (Files.exists(journalPath) && Files.size(journalPath) > 0L) {
throw new IllegalStateException("Journals Are Immutable");
} else {
Files.deleteIfExists(journalPath);
}
}
/**
* Write to journal
*
* @return position difference between last entry and current
*/
public long write(JournalEntry e) throws IOException {
lock.lock();
try {
if (journal == null) {
writeHeader();
}
b.clear();
b.putLong(++sequence);
b.put(SEP);
e.serialize(b);
// fixed record length format so zero out everything
nullpad();
write();
return prev == null ? e.offset : e.offset - prev.offset;
} finally {
prev = e;
lock.unlock();
}
}
private void write() throws IOException {
b.flip();
journal.write(b);
b.clear();
}
private void writeHeader() throws IOException {
this.journal = FileChannel.open(journalPath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
b.clear();
b.put(Journal.MAGIC);
b.put(Journal.CURRENT_VERSION);
byte[] keyBytes = key.getBytes();
b.putInt(keyBytes.length);
for (int i = 0; i < keyBytes.length; i++) {
if (b.remaining() == 0) {
write();
}
b.put(keyBytes[i]);
}
if (b.remaining() < Journal.NINE) {
write();
}
sequence = System.currentTimeMillis();
b.putLong(sequence);
b.put(SEP);
write();
}
// fill buffer with zeros from current position to limit
private void nullpad() {
while (b.hasRemaining()) {
b.put(SEP);
}
}
/**
* Closes underlying journal channel.
*/
@Override
public void close() throws IOException {
lock.lock();
try {
if (journal != null) {
journal.close();
}
journal = null;
b = null;
} finally {
lock.unlock();
}
}
}