How to read parquet files using `ssc.fileStream()`? What are the types passed to `ssc.fileStream()`?
My sample to read parquet files in Spark Streaming is below.
val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]]( directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)val lines = stream.map(row => { println("row:" + row.toString()) row})
Some points are ...
- record type is GenericRecord
- readSupportClass is AvroReadSupport
- pass Configuration to fileStream
- set parquet.read.support.class to the Configuration
I referred to source codes below for creating sample.
And I also could not find good examples.
I would like to wait better one.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
You can access the parquet by adding some parquet
specific hadoop
settings :
val ssc = new StreamingContext(conf, Seconds(5))var schema =StructType(Seq( StructField("a", StringType, nullable = false), ........ ))val schemaJson=schema.jsonval fileDir="/tmp/fileDir"ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport") ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)streamRdd.count().print()ssc.start()ssc.awaitTermination()
This code was prepared with Spark 2.1.0
.