How to instantiate FSDataInputStream with raw InputStream? How to instantiate FSDataInputStream with raw InputStream? hadoop hadoop

How to instantiate FSDataInputStream with raw InputStream?


FSDataInputStream constructor, as shown below, defined in FSDataInputStream.java expects InputStream parameter to be an instance of Seekable or PositionedReadable

public FSDataInputStream(InputStream in) throws IOException  {    super(in);    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {      throw new IllegalArgumentException(          "In is not an instance of Seekable or PositionedReadable");    } }

Hope following solution helps you.

import java.io.*;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.PositionedReadable;import org.apache.hadoop.fs.Seekable;import org.springframework.core.io.ClassPathResource;import org.springframework.core.io.Resource;public class SeekableTest {    public static void main(String[] args) throws IOException    {        Resource resource = new ClassPathResource("somefile");        InputStream in = resource.getInputStream();        ByteArrayOutputStream baos = new ByteArrayOutputStream();        byte buf[] = new byte[1024];        int read;        while ((read = in.read(buf)) > 0)          baos.write(buf, 0, read);        byte data[] = baos.toByteArray();        SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);        FSDataInputStream in2 = new FSDataInputStream(bais);    }    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {        public SeekableByteArrayInputStream(byte[] buf)        {            super(buf);        }        @Override        public long getPos() throws IOException{            return pos;        }        @Override        public void seek(long pos) throws IOException {          if (mark != 0)            throw new IllegalStateException();          reset();          long skipped = skip(pos);          if (skipped != pos)            throw new IOException();        }        @Override        public boolean seekToNewSource(long targetPos) throws IOException {          return false;        }        @Override        public int read(long position, byte[] buffer, int offset, int length) throws IOException {          if (position >= buf.length)            throw new IllegalArgumentException();          if (position + length > buf.length)            throw new IllegalArgumentException();          if (length > buffer.length)            throw new IllegalArgumentException();          System.arraycopy(buf, (int) position, buffer, offset, length);          return length;        }        @Override        public void readFully(long position, byte[] buffer) throws IOException {          read(position, buffer, 0, buffer.length);        }        @Override        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {          read(position, buffer, offset, length);        }    }}

Reference: accumulo