InputStreamChannelFactory.java
package emissary.core.channels;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.lang3.Validate;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import javax.annotation.Nullable;
public class InputStreamChannelFactory {
private InputStreamChannelFactory() {}
public static final int SIZE_IS_UNKNOWN = -1;
/**
* Creates a factory implementation based on an {@link InputStreamFactory}
*
* @param size if known, else provide a negative value to allow the factory to work out the size upon first create
* @param inputStreamFactory for the data
* @return an InputStreamChannelFactory instance of the data
*/
public static SeekableByteChannelFactory create(final long size, final InputStreamFactory inputStreamFactory) {
return new InputStreamChannelFactoryImpl(size, inputStreamFactory);
}
private static class InputStreamChannelFactoryImpl implements SeekableByteChannelFactory {
private final long size;
private final InputStreamFactory inputStreamFactory;
public InputStreamChannelFactoryImpl(final long size, final InputStreamFactory inputStreamFactory) {
Validate.notNull(inputStreamFactory, "Required: inputStream not null");
this.size = size;
this.inputStreamFactory = inputStreamFactory;
}
@Override
public SeekableByteChannel create() {
return new InputStreamChannel(size, inputStreamFactory);
}
}
private static class InputStreamChannel extends AbstractSeekableByteChannel {
/**
* The InputStreamFactory used to get InputStream instances.
*/
private final InputStreamFactory inputStreamFactory;
/**
* The current InputStream instance.
*/
@Nullable
private CountingInputStream inputStream;
private long size;
/**
* Create a new InputStreamChannel instance with a fixed size and data source
*
* @param size of the InputStreamChannel
* @param inputStreamFactory data source
*/
public InputStreamChannel(final long size, final InputStreamFactory inputStreamFactory) {
Validate.notNull(inputStreamFactory, "Required: inputStreamFactory not null!");
this.size = size;
this.inputStreamFactory = inputStreamFactory;
}
@Override
protected final int readImpl(final ByteBuffer byteBuffer) throws IOException {
if (inputStream != null && position() < inputStream.getByteCount()) {
inputStream.close();
inputStream = null;
}
if (inputStream == null) {
inputStream = new CountingInputStream(inputStreamFactory.create());
}
// Actually perform the read
return SeekableByteChannelHelper.getFromInputStream(inputStream, byteBuffer, position() - inputStream.getByteCount());
}
@Override
protected long sizeImpl() throws IOException {
if (size < 0) {
try (InputStream is = inputStreamFactory.create()) {
size = SeekableByteChannelHelper.available(is);
}
}
return size;
}
@Override
protected final void closeImpl() throws IOException {
if (inputStream != null) {
inputStream.close();
inputStream = null;
}
}
}
}