How to use Spark Java API to read binary file stream from HDFS? [hadoop]

How can I use the Spark Java API to read a stream of binary files from HDFS?


Try this:

    JavaStreamingContext context = ...; // your already-created streaming context
    JavaSparkContext jContext = context.sparkContext();

    // Read every file currently in the directory as (path, binary content) pairs
    JavaPairRDD<String, PortableDataStream> rdd = jContext.binaryFiles(fsURI + directoryPath);

    // Encode each file as "<path><delimiter><base64 content>" bytes
    JavaRDD<Object> rdd1 = rdd.map(new Function<Tuple2<String, PortableDataStream>, Object>() {
        private static final long serialVersionUID = -7894402430221488712L;

        @Override
        public Object call(Tuple2<String, PortableDataStream> arg0) throws Exception {
            byte[] imageInByte = arg0._2().toArray();
            String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
            return (arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes();
        }
    });

    // Wrap the RDD in a queue-backed DStream so it can feed the streaming job
    java.util.Queue<JavaRDD<Object>> queue = new LinkedList<>();
    queue.add(rdd1);
    JavaDStream<Object> dStream = context.queueStream(queue);

The only limitation of this approach is that it will not pick up files created in HDFS after the pipeline has started.
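For completeness, here is a rough sketch of how the pipeline might be started once the `dStream` above is built. The application name, the 20-second batch interval, and the per-batch count output are assumptions of mine, not part of the original snippet:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Sketch only: app name and batch interval are assumed values
    SparkConf conf = new SparkConf().setAppName("BinaryFileQueueStream");
    JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(20));

    // ... build `queue` and `dStream = context.queueStream(queue)` exactly as above ...

    dStream.foreachRDD(new VoidFunction<JavaRDD<Object>>() {
        @Override
        public void call(JavaRDD<Object> rdd) {
            // each element is the "<path><delimiter><base64 content>" byte[] produced by the map above
            System.out.println("records in this batch: " + rdd.count());
        }
    });

    context.start();
    context.awaitTermination();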


To handle newly created files as well, use this approach instead: write a custom receiver.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;

    import javax.xml.bind.DatatypeConverter;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.input.PortableDataStream;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.receiver.Receiver;

    import scala.Tuple2;

    // NOTE: LOGGER, sparkContext, fsURI, directoryPath, connectionName and the
    // ConnectionMetadata/HDFSUtils/Constants helpers are fields and classes from
    // the surrounding project; supply your own equivalents.
    class DFSReceiver extends Receiver<byte[]> {

        /** The Constant serialVersionUID. */
        private static final long serialVersionUID = -1051061769769056605L;

        /** Poll interval in milliseconds. */
        Long windowSize = 20000L;

        /** Instantiates a new DFS receiver. */
        DFSReceiver() {
            super(StorageLevel.MEMORY_AND_DISK_SER_2());
        }

        @Override
        public void onStart() {
            System.out.println("Inside onStart method");
            new Thread() {
                @Override
                public void run() {
                    try {
                        receive();
                    } catch (Exception e) {
                        e.printStackTrace();
                        LOGGER.error("Exception raised at DFSReceiverHelper , exception : " + e);
                    }
                }
            }.start();
        }

        /**
         * Receive.
         *
         * @throws Exception
         *             the exception
         */
        protected void receive() throws Exception {
            try {
                ConnectionMetadata connectionMetadata = ConnectionMetadataFactory.getConnectionMetadataObj(ConnectionConstants.HDFS_DATA_STORE);
                String connectionId = connectionMetadata.getConnectionId(ConnectionConstants.HDFS_DATA_STORE, connectionName);
                ConnectionMetaDataDTO c = connectionMetadata.getConnectionMetaDataById(connectionId);
                Map<String, Object> map = connectionMetadata.getConnectionConfigParameters(c);
                FileSystem fs = HDFSUtils.getFileSystemInstance(map);

                // Initial load: store every file already present in the directory
                JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles(fsURI + directoryPath);
                List<Tuple2<String, PortableDataStream>> rddList = rdd.collect();
                for (Tuple2<String, PortableDataStream> arg0 : rddList) {
                    byte[] imageInByte = arg0._2().toArray();
                    String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                    store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
                }

                Long time = System.currentTimeMillis();
                Thread.sleep(windowSize);

                // Poll the directory every windowSize milliseconds for newly created files
                while (true) {
                    List<Path> newFiles = checkIfNewFileCreated(fs, new Path(fsURI + directoryPath), time);
                    for (Path p : newFiles) {
                        JavaPairRDD<String, PortableDataStream> rdd11 = sparkContext.binaryFiles(p.toString());
                        Tuple2<String, PortableDataStream> arg0 = rdd11.first();
                        byte[] imageInByte = arg0._2().toArray();
                        String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                        store((arg0._1 + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
                    }
                    Thread.sleep(windowSize);
                    time += windowSize;
                }
            } catch (ShutdownSignalException s) {
                LOGGER.error("ShutdownSignalException raised in receive method of DFSReceiver", s);
            }
        }

        /** Returns the files whose modification time falls inside the last polling window. */
        private List<Path> checkIfNewFileCreated(FileSystem fs, Path p, Long timeStamp) throws IOException {
            List<Path> fileList = new ArrayList<>();
            if (fs.isDirectory(p)) {
                FileStatus[] fStatus = fs.listStatus(p);
                for (FileStatus status : fStatus) {
                    if (status.isFile() && timeStamp < status.getModificationTime()
                            && timeStamp + windowSize >= status.getModificationTime()) {
                        fileList.add(status.getPath());
                    }
                }
            }
            return fileList;
        }

        @Override
        public void onStop() {
        }
    }

With this receiver you will also be able to read newly created files, polled every 20 seconds (the windowSize).
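For reference, a minimal sketch of how the receiver could be plugged into a streaming job. The application name, the 20-second batch interval, and the assumption that Constants.COMMA_DELIMITER is a plain comma are mine, not taken from the original code:

    import javax.xml.bind.DatatypeConverter;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class DFSReceiverJob {
        public static void main(String[] args) throws Exception {
            // Sketch only: app name and batch interval are assumed values
            SparkConf conf = new SparkConf().setAppName("DFSReceiverJob");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(20));

            // Plug the custom receiver defined above into the streaming context
            JavaReceiverInputDStream<byte[]> stream = ssc.receiverStream(new DFSReceiver());

            stream.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {
                @Override
                public void call(JavaRDD<byte[]> rdd) {
                    for (byte[] record : rdd.collect()) {
                        // Assumes COMMA_DELIMITER is ","; each record is "<path>,<base64 content>"
                        String s = new String(record);
                        int comma = s.indexOf(',');
                        String path = s.substring(0, comma);
                        byte[] fileBytes = DatatypeConverter.parseBase64Binary(s.substring(comma + 1));
                        System.out.println(path + " -> " + fileBytes.length + " bytes");
                    }
                }
            });

            ssc.start();
            ssc.awaitTermination();
        }
    }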