How to instantiate FSDataInputStream with raw InputStream?
The FSDataInputStream constructor, as shown below and defined in FSDataInputStream.java, expects its InputStream
parameter to be an instance of both Seekable and PositionedReadable; if either interface is missing, it throws an
IllegalArgumentException:
public FSDataInputStream(InputStream in) throws IOException { super(in); if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { throw new IllegalArgumentException( "In is not an instance of Seekable or PositionedReadable"); } }
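The following solution should help: it reads the raw InputStream fully into memory and wraps the bytes in a
ByteArrayInputStream subclass that implements both Seekable and PositionedReadable.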
import java.io.*;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.PositionedReadable;import org.apache.hadoop.fs.Seekable;import org.springframework.core.io.ClassPathResource;import org.springframework.core.io.Resource;public class SeekableTest { public static void main(String[] args) throws IOException { Resource resource = new ClassPathResource("somefile"); InputStream in = resource.getInputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte buf[] = new byte[1024]; int read; while ((read = in.read(buf)) > 0) baos.write(buf, 0, read); byte data[] = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); } static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable { public SeekableByteArrayInputStream(byte[] buf) { super(buf); } @Override public long getPos() throws IOException{ return pos; } @Override public void seek(long pos) throws IOException { if (mark != 0) throw new IllegalStateException(); reset(); long skipped = skip(pos); if (skipped != pos) throw new IOException(); } @Override public boolean seekToNewSource(long targetPos) throws IOException { return false; } @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { if (position >= buf.length) throw new IllegalArgumentException(); if (position + length > buf.length) throw new IllegalArgumentException(); if (length > buffer.length) throw new IllegalArgumentException(); System.arraycopy(buf, (int) position, buffer, offset, length); return length; } @Override public void readFully(long position, byte[] buffer) throws IOException { read(position, buffer, 0, buffer.length); } @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { read(position, buffer, offset, length); } }}
Reference: accumulo