How to use the Spark Java API to read a stream of binary files from HDFS?
Try this:
// context is your existing JavaStreamingContext;
// fsURI, directoryPath and Constants come from your own code
JavaSparkContext jContext = context.sparkContext();
JavaPairRDD<String, PortableDataStream> rdd =
        jContext.binaryFiles(fsURI + directoryPath);

JavaRDD<Object> rdd1 = rdd.map(new Function<Tuple2<String, PortableDataStream>, Object>() {
    private static final long serialVersionUID = -7894402430221488712L;

    @Override
    public Object call(Tuple2<String, PortableDataStream> arg0) throws Exception {
        // Read the whole binary file and Base64-encode its contents
        byte[] imageInByte = arg0._2().toArray();
        String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
        return (arg0._1() + Constants.COMMA_DELIMITER + base64Encoded).getBytes();
    }
});

// Wrap the RDD in a queue so it can be served as a DStream
java.util.Queue<JavaRDD<Object>> queue = new LinkedList<>();
queue.add(rdd1);
JavaDStream<Object> dStream = context.queueStream(queue);
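To run the pipeline end to end, register an output operation on the DStream and start the context. A minimal sketch (the print is just a stand-in for your real sink):

dStream.foreachRDD(new Function<JavaRDD<Object>, Void>() {
    @Override
    public Void call(JavaRDD<Object> batch) throws Exception {
        // Each record is "<path>,<base64-encoded file contents>" as bytes
        System.out.println("Records in batch: " + batch.count());
        return null;
    }
});
context.start();
context.awaitTermination();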
The only limitation of this approach is that it cannot pick up files created in HDFS after the pipeline has started.
To handle new files as well, use this approach instead: write a custom receiver.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.xml.bind.DatatypeConverter;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import scala.Tuple2;

class DFSReceiver extends Receiver<byte[]> {

    private static final long serialVersionUID = -1051061769769056605L;

    private static final Logger LOGGER = Logger.getLogger(DFSReceiver.class);

    /** Polling interval in milliseconds. */
    private final long windowSize = 20000L;

    // Assumed to be initialized elsewhere (e.g. passed into the constructor).
    // Note: a SparkContext is only usable inside a receiver when the receiver
    // runs in the driver JVM (e.g. local mode).
    private transient JavaSparkContext sparkContext;
    private String fsURI;
    private String directoryPath;
    private String connectionName;

    /** Instantiates a new DFS receiver. */
    DFSReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_SER_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                try {
                    receive();
                } catch (Exception e) {
                    LOGGER.error("Exception raised in DFSReceiver: " + e, e);
                }
            }
        }.start();
    }

    /**
     * Reads every file currently in the directory, then polls for newly
     * created files every windowSize milliseconds.
     */
    protected void receive() throws Exception {
        try {
            // ConnectionMetadata*, HDFSUtils and Constants are
            // application-specific helpers from the original code base.
            ConnectionMetadata connectionMetadata =
                    ConnectionMetadataFactory.getConnectionMetadataObj(ConnectionConstants.HDFS_DATA_STORE);
            String connectionId =
                    connectionMetadata.getConnectionId(ConnectionConstants.HDFS_DATA_STORE, connectionName);
            ConnectionMetaDataDTO c = connectionMetadata.getConnectionMetaDataById(connectionId);
            Map<String, Object> map = connectionMetadata.getConnectionConfigParameters(c);
            FileSystem fs = HDFSUtils.getFileSystemInstance(map);

            // First pass: read all files already present in the directory.
            JavaPairRDD<String, PortableDataStream> rdd =
                    sparkContext.binaryFiles(fsURI + directoryPath);
            List<Tuple2<String, PortableDataStream>> rddList = rdd.collect();
            for (Tuple2<String, PortableDataStream> arg0 : rddList) {
                byte[] imageInByte = arg0._2().toArray();
                String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                store((arg0._1() + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
            }

            long time = System.currentTimeMillis();
            Thread.sleep(windowSize);

            // Poll for files created during the previous window.
            while (true) {
                List<Path> newFiles = checkIfNewFileCreated(fs, new Path(fsURI + directoryPath), time);
                for (Path p : newFiles) {
                    JavaPairRDD<String, PortableDataStream> newRdd =
                            sparkContext.binaryFiles(p.toString());
                    Tuple2<String, PortableDataStream> arg0 = newRdd.first();
                    byte[] imageInByte = arg0._2().toArray();
                    String base64Encoded = DatatypeConverter.printBase64Binary(imageInByte);
                    store((arg0._1() + Constants.COMMA_DELIMITER + base64Encoded).getBytes());
                }
                Thread.sleep(windowSize);
                time += windowSize;
            }
        } catch (Exception e) {
            LOGGER.error("Exception raised in receive method of DFSReceiver", e);
        }
    }

    /** Returns the files whose modification time falls inside the last window. */
    private List<Path> checkIfNewFileCreated(FileSystem fs, Path p, long timeStamp) throws IOException {
        List<Path> fileList = new ArrayList<>();
        if (fs.isDirectory(p)) {
            for (FileStatus status : fs.listStatus(p)) {
                if (status.isFile()
                        && timeStamp < status.getModificationTime()
                        && timeStamp + windowSize >= status.getModificationTime()) {
                    fileList.add(status.getPath());
                }
            }
        }
        return fileList;
    }

    @Override
    public void onStop() {
    }
}
With this receiver, files created after the pipeline starts are also picked up; the directory is polled every 20 seconds (the windowSize).
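Hooking the receiver into a streaming job is then a one-liner via receiverStream. A minimal sketch, assuming the fields above are wired in through a constructor you add:

JavaReceiverInputDStream<byte[]> stream = context.receiverStream(new DFSReceiver());
stream.foreachRDD(new Function<JavaRDD<byte[]>, Void>() {
    @Override
    public Void call(JavaRDD<byte[]> batch) throws Exception {
        // Each record is "<path>,<base64-encoded file contents>" as bytes
        System.out.println("Files received in this batch: " + batch.count());
        return null;
    }
});
context.start();
context.awaitTermination();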