JournaledChannelPool.java

package emissary.output.roller.journal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;

/**
 * Pool implementation that utilizes a Journal to durably track state out written data. The implementation will create
 * up to the maximum configured output channels. Channels are lazily initialized to minimize resource utilization. The
 * Journal is only updated when files are created and committed.
 */
public class JournaledChannelPool implements AutoCloseable {
    public static final String EXTENSION = ".bgpart";
    private static final Logger LOG = LoggerFactory.getLogger(JournaledChannelPool.class);
    public static final int DEFAULT_MAX = 10;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition freeCondition = this.lock.newCondition();
    final int max;
    final Path directory;
    final String key;
    private final Deque<JournaledChannel> free = new ArrayDeque<>();
    private int created;
    @Nullable
    private JournaledChannel[] allchannels;

    @SuppressWarnings("CheckedExceptionNotThrown")
    public JournaledChannelPool(final Path directory, final String key, final int max) throws IOException {
        this.max = max;
        this.directory = directory;
        this.key = key;
        this.allchannels = new JournaledChannel[max];
    }

    int getFreeSize() {
        return this.free.size();
    }

    int getCreatedCount() {
        return this.created;
    }

    /**
     * Supplied key to identify the pool.
     * 
     * @return key
     */
    public String getKey() {
        return this.key;
    }

    /**
     * Returns an available output from the pool. This method will block if there are no free objects available and the max
     * number of outputs has been created.
     * 
     * @return an available KeyedOutput from the pool
     * @throws IOException If there is some I/O problem.
     * @throws InterruptedException If interrupted.
     */
    public KeyedOutput getFree() throws InterruptedException, IOException {
        JournaledChannel jc = null;
        this.lock.lock();
        try {
            checkClosed();
            jc = findFree();
            jc.setPosition();
            return new KeyedOutput(this, jc);
        } catch (Throwable t) {
            if (jc != null) {
                LOG.debug("Throwable occurred while obtaining channel. Returning to the pool. {}", jc.path, t);
                free(jc);
            }
            throw t;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes underlying channel and writes journal entry, updating current position.
     * 
     * @param jc the JournaledChannel to flush
     */
    void free(final JournaledChannel jc) {
        if (jc == null) {
            throw new IllegalArgumentException("Cannot return a null JournaledChannel.");
        }
        this.lock.lock();
        try {
            if (this.free.contains(jc) || !this.free.offer(jc)) {
                LOG.warn("Could not return the channel to the pool {}", this.key);
            }
            // signal everyone since close and find may be waiting
            this.freeCondition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Closes the underlying pool. This method will block if any resources have not been returned.
     * 
     * @throws InterruptedException If interrupted.
     * @throws IOException If there is some I/O problem.
     */
    @Override
    public void close() throws InterruptedException, IOException {
        this.lock.lock();
        try {
            while (this.free.size() < this.created) {
                LOG.debug("Waiting for leased {} objects.", this.created - this.free.size());
                this.freeCondition.await();
            }
            for (final JournaledChannel fc : this.free) {
                this.allchannels[fc.index].close();
            }
            this.allchannels = null;
        } finally {
            this.lock.unlock();
        }
    }

    private void checkClosed() throws ClosedChannelException {
        if (this.allchannels == null) {
            throw new ClosedChannelException();
        }
    }

    private JournaledChannel findFree() throws InterruptedException, IOException {
        // if nothing is available, and we can create additional channels, do it
        // could get closed when we await
        while (this.free.isEmpty()) {
            if (this.created < this.max) {
                createChannel();
            } else {
                this.freeCondition.await();
                checkClosed();
            }
        }
        return this.free.poll();
    }

    private void createChannel() throws IOException {
        final Path p = Paths.get(this.directory.toString(), this.key + "_" + UUID.randomUUID().toString() + EXTENSION);
        final JournaledChannel ko = new JournaledChannel(p, this.key, this.created);
        this.allchannels[this.created++] = ko;
        this.free.add(ko);
    }

    Path getDirectory() {
        return this.directory;
    }
}