Split JSON string column to multiple columns
If it's a CSV file and only one column contains JSON data, you can use the following solution.
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._ // needed for .toDS and the $"..." column syntax

val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
// The CSV header apparently contains stray spaces, hence the " json_data" and "id " column names.
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")
This is how the final DataFrame looks:
scala> joinDF.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- depts: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- id : double (nullable = true)
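The question does not show the input file; a minimal test.csv consistent with the code and the schema above might look like this (hypothetical sample; note the |-delimited header whose stray spaces produce the " json_data" and "id " column names):

id | json_data
1.0 | {"name":"abc","sal":100,"depts":["dep01","dep02"],"address":{"city":"Pune","state":"MH"}}
2.0 | {"name":"xyz","depts":["dep03"],"address":{"city":"Delhi","state":"DL"}}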
The following solution works if it's a JSON file. For me, inferSchema works perfectly fine.
JSON file
~/Downloads ▶ cat test.json
{"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
{"id": 2, "name":"xyz", "depts":["dep03"], "sal":100}
Code
scala> spark.read.format("json").option("inferSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
|         depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]|  1| abc|null|
|       [dep03]|  2| xyz| 100|
+--------------+---+----+----+
Assuming json_data is of type map (which you can always convert to map if it's not), you can use getItem:
df = spark.createDataFrame([
    [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
    [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
], ['id', 'json_data'])

df.select(
    df.id,
    df.json_data.getItem('name').alias('name'),
    df.json_data.getItem('depts').alias('depts'),
    df.json_data.getItem('sal').alias('sal')
).show()

+---+----+--------------+----+
| id|name|         depts| sal|
+---+----+--------------+----+
|  1| abc|[dep01, dep02]|null|
|  2| xyz|       [dep03]| 100|
+---+----+--------------+----+
A more dynamic way to extract columns:
cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
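If json_data starts out as a raw JSON string (for example, when it comes from a CSV) rather than a map, the conversion mentioned above can be done with from_json and a MapType. This is only a minimal sketch; the raw DataFrame, its contents, and the mapped name are illustrative assumptions:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# Hypothetical input: json_data is a plain JSON string with scalar fields.
raw = spark.createDataFrame(
    [(1, '{"name": "abc"}'),
     (2, '{"name": "xyz", "sal": "100"}')],
    ['id', 'json_data'])

# Parse the string into a map<string,string>; getItem then works as shown above.
# Nested fields such as the depts array would need an explicit StructType schema
# rather than a plain string-to-string map.
mapped = raw.withColumn('json_data',
                        F.from_json('json_data', MapType(StringType(), StringType())))
mapped.select(mapped.id,
              mapped.json_data.getItem('name').alias('name'),
              mapped.json_data.getItem('sal').alias('sal')).show()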
Based on @Gaurang Shah's answer, I have implemented a solution to handle nested JSON structures and fixed the issues with using monotonically_increasing_id (which is non-sequential).
In this approach, the 'populateColumnName' function recursively checks for StructType columns and populates the column names.
The 'renameColumns' function renames the columns by replacing '.' with '_' to identify the nested JSON fields.
The 'addIndex' function adds an index to the DataFrame so that it can be joined back after parsing the JSON column.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._ // for .toDS

def flattenJSON(df: DataFrame, columnName: String): DataFrame = {
  val indexCol = "internal_temp_id"

  // Recursively build fully qualified names for nested struct fields, e.g. "address.city".
  def populateColumnName(col: StructField): Array[String] = {
    col.dataType match {
      case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
      case _ => Array(col.name)
    }
  }

  // Expose nested fields as flat columns: "address.city" becomes "address_city".
  def renameColumns(name: String): String = {
    if (name contains ".") {
      name + " as " + name.replaceAll("\\.", "_")
    } else name
  }

  // Append the index column of type Long by zipping at the RDD level,
  // which yields contiguous indices (unlike monotonically_increasing_id).
  def addIndex(df: DataFrame): DataFrame = {
    val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
    val rddWithId = df.rdd.zipWithIndex
    spark.createDataFrame(rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)
  }

  val dfWithID = addIndex(df)

  val jsonDF = df.select(columnName)
  val ds = jsonDF.rdd.map(_.getString(0)).toDS
  val parseDF = spark.read.option("inferSchema", true).json(ds)

  val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
  val resultDF = parseDF.selectExpr(columnNames: _*)

  val jsonDFWithID = addIndex(resultDF)

  val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
  joinDF
}

val res = flattenJSON(jsonDF, "address")
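Applied to the json_data column from the first answer, this would yield name, sal, depts, address_city and address_state as flat columns. The zipWithIndex-based addIndex gives both DataFrames contiguous 0-based row indices, whereas monotonically_increasing_id only guarantees increasing, unique values within partitions, so joining on it can mis-align the two sides.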