
Pyspark: Parse a column of json strings


For Spark 2.1+, you can use from_json, which preserves the other non-JSON columns of the dataframe, as follows:

from pyspark.sql.functions import from_json, col

json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

You let Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but the correctly decoded JSON structure, i.e. a nested StructType, and all the other columns of df are preserved as-is.

You can access the json content as follows:

df.select(col('json.header').alias('header'))
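
To put the pieces together, here is a minimal end-to-end sketch, assuming an active SparkSession named spark and a hypothetical two-column dataframe (the column names and sample record are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data: an id column plus a 'json' string column.
df = spark.createDataFrame(
    [(1, '{"header": {"id": 1, "foo": "bar"}}')],
    ['id', 'json']
)

# Infer the schema from the strings themselves, then decode in place.
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
parsed = df.withColumn('json', from_json(col('json'), json_schema))

parsed.select(col('json.header.foo')).show()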


Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

For example:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
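
Once decoded, nested fields can be selected with dot notation, e.g. (a short follow-up sketch using the schema above):

>>> new_df.select('body.name', 'header.foo').show()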


Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly braces {} and will infer an incorrect schema (resulting in null values) if your data looks like:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:

import pyspark.sql.functions as psf

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:
        # sanitize if requested.
        if sanitize:
            res = res.withColumn(
                i,
                psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))
        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Note: psf = pyspark.sql.functions (imported at the top of the snippet), and spark is an active SparkSession.
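
For illustration, a hypothetical usage on a dataframe df whose 'json' column holds the array-shaped records shown above might look like:

parsed = parseJSONCols(df, 'json', sanitize=True)
parsed.printSchema()
# 'json' should now decode as an array of structs rather than nulls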