
Split JSON string column to multiple columns


If it's a CSV file and only one column contains JSON data, you can use the following solution.

import spark.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

// Read the pipe-delimited CSV (the header contains " json_data" and "id " with stray spaces)
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")

// Pull the JSON column out as a Dataset[String] and let Spark infer its schema
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)

// Tag both DataFrames with a synthetic id, join them back together, then drop the helper column
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")

This is how the final DataFrame looks:

scala> joinDF.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- depts: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- id : double (nullable = true)

The following solution works if it's a JSON file. For me, inferSchema works perfectly fine.

JSON file

~/Downloads ▶ cat test.json
{"id": 1, "name": "abc", "depts": ["dep01", "dep02"]}
{"id": 2, "name": "xyz", "depts": ["dep03"], "sal": 100}

Code

scala> spark.read.format("json").option("inferSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
|         depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]|  1| abc|null|
|       [dep03]|  2| xyz| 100|
+--------------+---+----+----+


Assuming json_data is of type map (which you can always convert to map if it's not), you can use getItem:

df = spark.createDataFrame([
    [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
    [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]],
    ['id', 'json_data'])

df.select(
    df.id,
    df.json_data.getItem('name').alias('name'),
    df.json_data.getItem('depts').alias('depts'),
    df.json_data.getItem('sal').alias('sal')).show()

+---+----+--------------+----+
| id|name|         depts| sal|
+---+----+--------------+----+
|  1| abc|[dep01, dep02]|null|
|  2| xyz|       [dep03]| 100|
+---+----+--------------+----+
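If json_data arrives as a JSON string rather than a map, one way to get it into map form is from_json with a MapType schema. A minimal sketch (assuming Spark 2.2+; the raw DataFrame below is a hypothetical example, and with a map<string,string> schema nested values such as arrays come back as their JSON text):

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# Hypothetical input: the same rows, but json_data is a raw JSON string.
raw = spark.createDataFrame([
    (1, '{"name": "abc", "depts": ["dep01", "dep02"]}'),
    (2, '{"name": "xyz", "depts": ["dep03"], "sal": 100}')],
    ['id', 'json_data'])

# Parse each string into a map<string,string>; getItem then works as above.
df = raw.withColumn('json_data',
                    F.from_json('json_data', MapType(StringType(), StringType())))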

A more dynamic way to extract columns:

cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
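If the key set is not known up front, a sketch that discovers it first (assuming Spark 2.3+ for map_keys, json_data is a map column, and the number of distinct keys is small enough to collect to the driver):

from pyspark.sql import functions as F

# Collect the distinct keys that appear in the map column, then build one column per key.
keys = sorted({row['key'] for row in
               df.select(F.explode(F.map_keys('json_data')).alias('key')).distinct().collect()})
df.select(df.id, *(df.json_data.getItem(k).alias(k) for k in keys)).show()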


Based on @Gaurang Shah's answer, I have implemented a solution that handles nested JSON structures and fixes the issue with monotonically_increasing_id (it is not sequential).

In this approach, the 'populateColumnName' function recursively checks for StructType columns and builds the full column names.

The 'renameColumns' function renames the columns by replacing '.' with '_' to identify the nested JSON fields.

The 'addIndex' function adds a sequential index to each DataFrame so they can be joined back together after parsing the JSON column.

import spark.implicits._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def flattenJSON(df: DataFrame, columnName: String): DataFrame = {
    val indexCol = "internal_temp_id"

    // Recursively expand StructType fields into dotted column paths
    def populateColumnName(col: StructField): Array[String] = {
        col.dataType match {
            case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
            case _                  => Array(col.name)
        }
    }

    // Turn "a.b" into the select expression "a.b as a_b" so nested fields become flat columns
    def renameColumns(name: String): String = {
        if (name contains ".") name + " as " + name.replaceAll("\\.", "_")
        else name
    }

    def addIndex(df: DataFrame): DataFrame = {
        // Append a sequential index column of type Long
        val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
        // Zip at the RDD level to get a truly sequential index
        val rddWithId = df.rdd.zipWithIndex
        // Convert back to a DataFrame
        spark.createDataFrame(rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)
    }

    val dfWithID = addIndex(df)

    // Parse the JSON column into its own DataFrame and flatten its nested fields
    val jsonDF = df.select(columnName)
    val ds = jsonDF.rdd.map(_.getString(0)).toDS
    val parseDF = spark.read.option("inferSchema", true).json(ds)
    val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
    val resultDF = parseDF.selectExpr(columnNames: _*)

    // Join the flattened columns back to the original rows via the index
    val jsonDFWithID = addIndex(resultDF)
    dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
}

val res = flattenJSON(jsonDF, "address")