Primary keys with Apache Spark Primary keys with Apache Spark database database

Primary keys with Apache Spark


Scala:

If all you need is unique numbers you can use zipWithUniqueId and recreate DataFrame. First some imports and dummy data:

import sqlContext.implicits._import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.{StructType, StructField, LongType}val df = sc.parallelize(Seq(    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract schema for further usage:

val schema = df.schema

Add id field:

val rows = df.rdd.zipWithUniqueId.map{   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create DataFrame:

val dfWithPK = sqlContext.createDataFrame(  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Rowfrom pyspark.sql.types import StructField, StructType, LongTyperow = Row("foo", "bar")row_with_index = Row(*["id"] + df.columns)df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()def make_row(columns):    def _make_row(row, uid):        row_dict = row.asDict()        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])    return _make_rowf = make_row(df.columns)df_with_pk = (df.rdd    .zipWithUniqueId()    .map(lambda x: f(*x))    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer consecutive number your can replace zipWithUniqueId with zipWithIndex but it is a little bit more expensive.

Directly with DataFrame API:

(universal Scala, Python, Java, R with pretty much the same syntax)

Previously I've missed monotonicallyIncreasingId function which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingIddf.withColumn("id", monotonicallyIncreasingId).show()// +---+----+-----------+// |foo| bar|         id|// +---+----+-----------+// |  a|-1.0|17179869184|// |  b|-2.0|42949672960|// |  c|-3.0|60129542144|// +---+----+-----------+

While useful monotonicallyIncreasingId is non-deterministic. Not only ids may be different from execution to execution but without additional tricks cannot be used to identify rows when subsequent operations contain filters.

Note:

It is also possible to use rowNumber window function:

from pyspark.sql.window import Windowfrom pyspark.sql.functions import rowNumberw = Window().orderBy()df.withColumn("id", rowNumber().over(w)).show()

Unfortunately:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

So unless you have a natural way to partition your data and ensure uniqueness is not particularly useful at this moment.


from pyspark.sql.functions import monotonically_increasing_iddf.withColumn("id", monotonically_increasing_id()).show()

Note that the 2nd argument of df.withColumn is monotonically_increasing_id() not monotonically_increasing_id .


I found the following solution to be relatively straightforward for the case where zipWithIndex() is the desired behavior, i.e. for those desirng consecutive integers.

In this case, we're using pyspark and relying on dictionary comprehension to map the original row object to a new dictionary which fits a new schema including the unique index.

# read the initial dataframe without indexdfNoIndex = sqlContext.read.parquet(dataframePath)# Need to zip together with a unique integer# First create a new schema with uuid field appendednewSchema = StructType([StructField("uuid", IntegerType(), False)]                       + dfNoIndex.schema.fields)# zip with the index, map it to a dictionary which includes new fielddf = dfNoIndex.rdd.zipWithIndex()\                      .map(lambda (row, id): {k:v                                              for k, v                                              in row.asDict().items() + [("uuid", id)]})\                      .toDF(newSchema)