JournaledChannel.java

package emissary.output.roller.journal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import javax.annotation.Nullable;

/**
 * Wrapper class to allow for use of underlying channel in either OutputStream code or WritableChannel.
 */
public class JournaledChannel extends OutputStream implements SeekableByteChannel {

    static final Logger LOG = LoggerFactory.getLogger(JournaledChannel.class);
    // 128k
    static final int BUFF_SIZE = 128 * 1024;
    @Nullable
    FileChannel fc;
    Path path;
    @Nullable
    JournalEntry e;
    final int index;
    final JournalWriter journal;
    @Nullable
    ByteBuffer directBuff;

    JournaledChannel(final Path path, final String key, final int index) throws IOException {
        this.fc = FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.path = path;
        this.index = index;
        this.journal = new JournalWriter(path.getParent(), path.getFileName().toString(), key);
        this.directBuff = ByteBuffer.allocateDirect(BUFF_SIZE);
        writeEntry();
    }

    @Nullable
    private byte[] b1 = null;

    @Override
    public void write(final int b) throws IOException {
        if (this.b1 == null) {
            this.b1 = new byte[1];
        }
        this.b1[0] = (byte) b;
        this.write(this.b1);
    }

    @Override
    public void write(final byte[] bs, final int off, final int len) throws IOException {
        if ((off < 0) || (off > bs.length) || (len < 0) || ((off + len) > bs.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }

        int remaining = len;
        int offset = off;
        while (remaining > 0) {
            final int limit = Math.min(remaining, this.directBuff.capacity());
            this.directBuff.clear();
            this.directBuff.put(bs, offset, limit);
            this.directBuff.flip();
            while (this.directBuff.hasRemaining()) {
                if (this.fc.write(this.directBuff) <= 0) {
                    throw new IllegalStateException("no bytes written");
                }
            }
            offset += limit;
            remaining -= limit;
        }
    }

    @Override
    public int write(final ByteBuffer src) throws IOException {
        // doing this to avoid java bug that caused direct memory leaks.
        // could be removed in java 9 when they provide a JVM argument to
        // limit caching
        if (!src.isDirect()) {
            int written = 0;
            while (src.hasRemaining()) {
                this.directBuff.clear();
                while (this.directBuff.hasRemaining() && src.hasRemaining()) {
                    this.directBuff.put(src.get());
                }
                this.directBuff.flip();
                written += this.fc.write(this.directBuff);
            }
            return written;
        } else {
            return this.fc.write(src);
        }
    }

    @Override
    public long position() throws IOException {
        return this.fc.position();
    }

    /* Unsupported operations */
    @Override
    public SeekableByteChannel position(final long newPosition) throws IOException {
        throw new UnsupportedOperationException("This operation is not permitted");
    }

    @Override
    public long size() throws IOException {
        return this.fc.size();
    }

    @Override
    public boolean isOpen() {
        return (this.fc != null) && this.fc.isOpen();
    }

    /**
     * Sets the position of the channel according to the current entry. Should only be called by the pool.
     * 
     * @throws IOException If there is some I/O problem.
     */
    void setPosition() throws IOException {
        if (this.e.getOffset() != this.fc.position()) {
            this.fc.position(this.e.getOffset());
        }
    }

    /**
     * Commits writes to underlying storage. This method should only be called after a successful write.
     * 
     * @throws IOException If there is some I/O problem.
     */
    public final void commit() throws IOException {
        writeEntry();
    }

    private void writeEntry() throws IOException {
        final JournalEntry entry = new JournalEntry(this.path.toString(), this.fc.position());
        this.journal.write(entry);
        this.e = entry;
    }

    /**
     * Closes this Channel/Output Stream by releasing resources to underlying pool. Further calls result in unspecified
     * behavior
     * 
     * @throws IOException If there is some I/O problem.
     */
    @Override
    public void close() throws IOException {
        if (this.fc != null) {
            this.fc.close();
            this.fc = null;
        }
        this.journal.close();
        this.e = null;
        this.directBuff = null;
    }

    /**
     * Opens this object for writing and sets position according to current Journal Entry.
     * 
     * @throws IOException If there is some I/O problem.
     */
    void open() throws IOException {
        LOG.debug("Opening channel for writing {}", this.path);
        if (this.fc.position() != this.e.getOffset()) {
            this.fc.position(this.e.getOffset());
        }
    }

    @Override
    public int read(final ByteBuffer dst) throws IOException {
        throw new UnsupportedOperationException("This operation is not permitted");
    }

    @Override
    public SeekableByteChannel truncate(final long size) throws IOException {
        throw new UnsupportedOperationException("This operation is not permitted");
    }
}