SeekableByteChannelHelper.java

package emissary.core.channels;

import emissary.core.IBaseDataObject;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;

/**
 * Helper methods to handle {@link java.nio.channels.SeekableByteChannel} objects
 */
public final class SeekableByteChannelHelper {
    private static final Logger logger = LoggerFactory.getLogger(SeekableByteChannelHelper.class);

    /** Channel factory backed by an empty byte array. Used for situations when a BDO should have its payload discarded. */
    public static final SeekableByteChannelFactory EMPTY_CHANNEL_FACTORY = memory(new byte[0]);

    private SeekableByteChannelHelper() {}

    /**
     * Make an existing factory immutable.
     * 
     * @param sbcf to make immutable
     * @return the wrapped factory
     */
    public static SeekableByteChannelFactory immutable(final SeekableByteChannelFactory sbcf) {
        return ImmutableChannelFactory.create(sbcf);
    }

    /**
     * Create an in memory SBC factory which can be used to create any number of channels based on the provided bytes
     * without storing them multiple times.
     * 
     * @param bytes to use with the channel
     * @return the factory
     */
    public static SeekableByteChannelFactory memory(final byte[] bytes) {
        return InMemoryChannelFactory.create(bytes);
    }

    /**
     * Create a file SBC factory.
     * 
     * @param path to the file.
     * @return the factory
     */
    public static SeekableByteChannelFactory file(final Path path) {
        return FileChannelFactory.create(path);
    }

    /**
     * Create a fill SBC factory.
     * 
     * @param size of the SeekableByteChannel
     * @param value of each element in the SeekableByteChannel.
     * @return the factory
     */
    public static SeekableByteChannelFactory fill(final long size, final byte value) {
        return FillChannelFactory.create(size, value);
    }

    /**
     * Create an InputStream SBC factory.
     * 
     * @param size of the SeekableByteChannel
     * @param inputStreamFactory creates the needed InputStreams.
     * @return the factory
     */
    public static SeekableByteChannelFactory inputStream(final long size, final InputStreamFactory inputStreamFactory) {
        return InputStreamChannelFactory.create(size, inputStreamFactory);
    }

    /**
     * Given a BDO, create a byte array with as much data as possible.
     * 
     * @param ibdo to get the data from
     * @param maxSize to limit the byte array to
     * @return a byte array of the data from the BDO sized up to maxSize (so could truncate data)
     */
    public static byte[] getByteArrayFromBdo(final IBaseDataObject ibdo, final int maxSize) {
        try (SeekableByteChannel sbc = ibdo.getChannelFactory().create()) {
            final long truncatedBy = sbc.size() - maxSize;
            if (truncatedBy > 0 && logger.isWarnEnabled()) {
                logger.warn("Returned data for [{}] will be truncated by {} bytes due to size constraints of byte arrays", ibdo.shortName(),
                        truncatedBy);
            }
            return getByteArrayFromChannel(ibdo.getChannelFactory(), maxSize);
        } catch (final IOException ioe) {
            logger.error("Error when fetching from byte channel factory on object {}", ibdo.shortName(), ioe);
            ibdo.setData(new byte[0]);
            return new byte[0];
        }
    }

    /**
     * Given a channel factory, create a byte array with as much data as possible.
     * 
     * @param sbcf to get the data from
     * @param maxSize to limit the byte array to
     * @return a byte array of the data from the factory sized up to maxSize (so could truncate data)
     * @throws IOException if we couldn't read all the data
     */
    public static byte[] getByteArrayFromChannel(final SeekableByteChannelFactory sbcf, final int maxSize) throws IOException {
        try (SeekableByteChannel sbc = sbcf.create()) {
            final int byteArraySize = (int) Math.min(sbc.size(), maxSize);
            final ByteBuffer buff = ByteBuffer.allocate(byteArraySize);

            IOUtils.readFully(sbc, buff);
            return buff.array();
        }
    }

    /**
     * Provided with an existing input stream, check how far we can read into it.
     * 
     * Note that the inputStream is read as-is, so if the stream is not at the start, this method won't take that into
     * account. If we can successfully read the stream, the position of the provided stream will of course change.
     * 
     * Don't wrap the provided stream with anything such as BufferedInputStream as this will cause read errors prematurely,
     * unless this is acceptable.
     * 
     * @param inputStream to read - caller must handle closing this object
     * @return position of last successful read (which could be the size of the stream)
     */
    public static long available(final InputStream inputStream) {
        long totalBytesRead = 0;
        try {
            for (; inputStream.read() != -1; totalBytesRead++) {
                // Do nothing.
            }
        } catch (final IOException ioe) {
            // Do nothing.
        }

        return totalBytesRead;
    }

    /**
     * Reads data from an input stream into a buffer
     * 
     * @param inputStream to read from
     * @param byteBuffer to read into
     * @param bytesToSkip within the {@code is} to get to the next read location
     * @throws IOException if an error occurs
     */
    public static int getFromInputStream(final InputStream inputStream, final ByteBuffer byteBuffer, final long bytesToSkip) throws IOException {
        Validate.notNull(inputStream, "Required: inputStream");
        Validate.notNull(byteBuffer, "Required: byteBuffer");
        Validate.isTrue(bytesToSkip > -1, "Required: bytesToSkip > -1");

        // Skip to position if we're not already there
        IOUtils.skipFully(inputStream, bytesToSkip);

        // Read direct into buffer's array if possible, otherwise copy through an internal buffer
        final int bytesToRead = byteBuffer.remaining();
        if (byteBuffer.hasArray()) {
            final int bytesRead = inputStream.read(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), bytesToRead);
            if (bytesRead > 0) {
                byteBuffer.position(byteBuffer.position() + bytesRead);
            }
            return bytesRead;
        } else {
            final byte[] internalBuff = new byte[bytesToRead];
            final int bytesRead = inputStream.read(internalBuff);
            if (bytesRead > 0) {
                byteBuffer.put(internalBuff, 0, bytesRead);
            }
            return bytesRead;
        }
    }
}