How do you perform basic joins of two RDD tables in Spark using Python?

It can be done either with PairRDDFunctions or with Spark DataFrames. Since DataFrame operations benefit from the Catalyst optimizer, the second option is worth considering.

Assuming your data looks as follows:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

With PairRDDs:

Inner join:

rdd1.join(rdd2)
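
With the sample data this keeps only keys present on both sides (ordering may differ):

rdd1.join(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]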

Left outer join:

rdd1.leftOuterJoin(rdd2)
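
Keys missing on the right side are kept with None (ordering may differ):

rdd1.leftOuterJoin(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6)), ('baz', (3, None))]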

Cartesian product (doesn't require RDD[(T, U)]):

rdd1.cartesian(rdd2)
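
Unlike the joins above, cartesian pairs whole records rather than matching keys, so the sample data yields all 3 * 3 = 9 combinations:

rdd1.cartesian(rdd2).count()
## 9 -- elements are pairs of whole records, e.g. (('foo', 1), ('bar', 5))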

Broadcast join (doesn't require RDD[(T, U)]):
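
A minimal map-side join sketch using a broadcast variable (the name rdd2_map is illustrative; this assumes rdd2 is small enough to collect to the driver):

# Broadcast the small side as a plain dict, then join map-side without a shuffle
rdd2_map = sc.broadcast(rdd2.groupByKey().mapValues(list).collectAsMap())
rdd1.flatMap(lambda kv: [(kv[0], (kv[1], v)) for v in rdd2_map.value.get(kv[0], [])]).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]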

Finally, there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

With Spark DataFrames:

You can either use the DataFrame DSL or execute raw SQL with spark.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary views to be able to use `spark.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
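
For reference, df1 looks as follows (df2 is analogous; row order is not guaranteed):

df1.show()
## +---+---+
## |  k| v1|
## +---+---+
## |foo|  1|
## |bar|  2|
## |baz|  3|
## +---+---+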

Inner join:

# inner is the default, so how='inner' could be omitted
df1.join(df2, df1.k == df2.k, how='inner')

spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')

spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
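
Here 'baz' has no match in df2, so it is kept with nulls on the right side:

df1.join(df2, df1.k == df2.k, how='left_outer').count()
## 4 -- three matched rows plus 'baz' with nulls for the df2 columns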

Cross join (in Spark 2.x an explicit cross join is required, or implicit cross joins have to be enabled with spark.sql.crossJoin.enabled):

df1.crossJoin(df2)

spark.sql('SELECT * FROM df1 CROSS JOIN df2')
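
With the three-row sample frames this yields all 3 * 3 = 9 combinations:

df1.crossJoin(df2).count()
## 9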

df1.join(df2)  # join without a condition is an implicit cross join

spark.sql('SELECT * FROM df1 JOIN df2')

Since 1.6 (1.5 in Scala) each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
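
To check that the hint took effect, inspect the physical plan; an equi-join with a broadcast hint should typically compile to a BroadcastHashJoin:

df1.join(broadcast(df2), df1.k == df2.k).explain()
## the physical plan should contain BroadcastHashJoin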