How to add a new Struct column to a DataFrame


I assume you start with some kind of flat schema like this:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

First, let's create the example data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) :: Nil)

val df = sqlContext.createDataFrame(rdd, schema)
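
As a quick sanity check, printing the schema should match the flat layout shown above:

df.printSchema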

An easy way is to use a udf and a case class:

case class Location(lat: Double, long: Double)

val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
    withColumn("location", makeLocation(col("lat"), col("long"))).
    drop("lat").
    drop("long")

dfRes.printSchema

and we get

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)
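
Fields of the new struct column can then be addressed with dot notation, for example (a quick sketch using the dfRes from above):

dfRes.select(col("location.lat"), col("location.long")).show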

The hard way is to transform your data and apply the schema afterwards:

val rddRes = df.map {
    case Row(lat, long, key) => Row(key, Row(lat, long))
}

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil)

sqlContext.createDataFrame(rddRes, schemaRes).show

and we get the expected output:

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can easily be extended if you need a more sophisticated structure:

case class Pin(location: Location)

val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

and we get the expected output:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

Unfortunately, with this approach you have no control over the nullable flag, so if it is important for your project you'll have to specify the schema explicitly.
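
One minimal sketch of that: re-apply the explicit schemaRes from the hard way above to the udf result (this assumes dfRes's column order, key then location, matches schemaRes):

// Re-apply an explicit schema to regain control over nullability
val dfStrict = sqlContext.createDataFrame(dfRes.rdd, schemaRes)
dfStrict.printSchema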

Finally, you can use the struct function introduced in Spark 1.4:

import org.apache.spark.sql.functions.struct
import sqlContext.implicits._

df.select($"key", struct($"lat", $"long").alias("location"))
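
struct calls also nest, so the Pin example above could be reproduced without a udf (a sketch, assuming the same df):

df.select(
    $"key",
    struct(struct($"lat", $"long").alias("location")).alias("pin"))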


Try this:

df.registerTempTable("dt")
val dfRes = sqlContext.sql("select struct(lat, long) as colName from dt")