Retain keys with null values while writing JSON in Spark
Apparently, Spark does not provide any built-in option to keep null-valued keys when writing JSON, so the following custom solution should work.
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

case class EventHeader(accept_language: String, app_id: String, app_name: String,
                       client_ip_address: String, event_id: String, event_timestamp: String,
                       offering_id: String, server_ip_address: String, server_timestamp: Long,
                       topic_name: String, version: String)

val ds = Seq(EventHeader(null, "App_ID", null, "IP", "ID", null, "Offering", "IP", 1492565987565L, "Topic", "1.0")).toDS()

// Serialize each row with Jackson, which keeps null-valued fields by default
val ds1 = ds.mapPartitions(records => {
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  records.map(mapper.writeValueAsString(_))
})

ds1.coalesce(1).write.text("hdfs://localhost:9000/user/dedupe_employee")
This will produce output like:
{"accept_language":null,"app_id":"App_ID","app_name":null,"client_ip_address":"IP","event_id":"ID","event_timestamp":null,"offering_id":"Offering","server_ip_address":"IP","server_timestamp":1492565987565,"topic_name":"Topic","version":"1.0"}
If you are on Spark 3, you can set

spark.sql.jsonGenerator.ignoreNullFields false

ignoreNullFields is an option, available since Spark 3, that controls whether null fields are dropped when a DataFrame is written out as JSON.
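As a minimal sketch (assuming a Spark 3 session named spark and a DataFrame df, both hypothetical here), the setting can be applied session-wide or per write:

// Session-wide: keep null fields in all JSON output from this session
spark.conf.set("spark.sql.jsonGenerator.ignoreNullFields", "false")

// Or per write, as a JSON data source option
df.write
  .option("ignoreNullFields", "false")
  .json("hdfs://localhost:9000/user/dedupe_employee")

Either form should keep keys whose values are null in the generated JSON, so the Jackson workaround above is no longer needed on Spark 3.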
If you need Spark 2 (specifically PySpark 2.4.6), you can try converting the DataFrame to an RDD of Python dicts and then calling pyspark.RDD.saveAsTextFile to write the JSON file to HDFS. The following example may help.
cols = ddp.columns
ddp_ = ddp.rdd
ddp_ = ddp_.map(lambda row: dict([(c, row[c]) for c in cols]))
ddp_.repartition(1).saveAsTextFile(your_hdfs_file_path)
This should produce an output file like:
{"accept_language": None, "app_id":"123", ...}{"accept_language": None, "app_id":"456", ...}
What's more, if you want to replace Python None with JSON null, you will need to dump every dict to JSON.
import json

ddp_ = ddp_.map(lambda row: json.dumps(row, ensure_ascii=False))