InputStreamChannelFactory.java

package emissary.core.channels;

import jakarta.annotation.Nullable;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.lang3.Validate;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;

public class InputStreamChannelFactory {
    private InputStreamChannelFactory() {}

    public static final int SIZE_IS_UNKNOWN = -1;

    /**
     * Creates a factory implementation based on an {@link InputStreamFactory}
     * 
     * @param size if known, else provide a negative value to allow the factory to work out the size upon first create
     * @param inputStreamFactory for the data
     * @return an InputStreamChannelFactory instance of the data
     */
    public static SeekableByteChannelFactory create(final long size, final InputStreamFactory inputStreamFactory) {
        return new InputStreamChannelFactoryImpl(size, inputStreamFactory);
    }

    private static class InputStreamChannelFactoryImpl implements SeekableByteChannelFactory {
        private final long size;
        @Nullable
        private final IOException ioException;
        private final InputStreamFactory inputStreamFactory;

        public InputStreamChannelFactoryImpl(final long size, final InputStreamFactory inputStreamFactory) {
            Validate.notNull(inputStreamFactory, "Required: inputStream not null");

            // If the size is unknown then calculate it and save any IOException that occurs.
            if (size < 0) {
                long tempSize = SIZE_IS_UNKNOWN;
                IOException tempIoException = null;

                try (InputStream is = inputStreamFactory.create()) {
                    tempSize = SeekableByteChannelHelper.available(is);
                } catch (IOException e) {
                    tempIoException = e;
                }

                this.size = tempSize;
                ioException = tempIoException;
            } else {
                this.size = size;
                this.ioException = null;
            }

            this.inputStreamFactory = inputStreamFactory;
        }

        @Override
        public SeekableByteChannel create() {
            return new InputStreamChannel(size, inputStreamFactory, ioException);
        }
    }

    private static class InputStreamChannel extends AbstractSeekableByteChannel {
        /**
         * The InputStreamFactory used to get InputStream instances.
         */
        private final InputStreamFactory inputStreamFactory;
        private final long size;
        private final IOException ioException;

        /**
         * The current InputStream instance.
         */
        @Nullable
        private BoundedInputStream inputStream;

        /**
         * Create a new InputStreamChannel instance with a fixed size and data source
         * 
         * @param size of the InputStreamChannel
         * @param inputStreamFactory data source
         */
        public InputStreamChannel(final long size, final InputStreamFactory inputStreamFactory, final IOException ioException) {
            Validate.notNull(inputStreamFactory, "Required: inputStreamFactory not null!");

            this.size = size;
            this.inputStreamFactory = inputStreamFactory;
            this.ioException = ioException;
        }

        @Override
        protected final int readImpl(final ByteBuffer byteBuffer) throws IOException {
            if (inputStream != null && position() < inputStream.getCount()) {
                inputStream.close();
                inputStream = null;
            }

            if (inputStream == null) {
                inputStream = BoundedInputStream.builder()
                        .setInputStream(inputStreamFactory.create())
                        .get();
            }

            // Actually perform the read
            return SeekableByteChannelHelper.getFromInputStream(inputStream, byteBuffer, position() - inputStream.getCount());
        }


        @Override
        protected long sizeImpl() throws IOException {
            if (ioException != null) {
                throw ioException;
            }

            return size;
        }

        @Override
        protected final void closeImpl() throws IOException {
            if (inputStream != null) {
                inputStream.close();
                inputStream = null;
            }
        }
    }
}