
How do you define a custom Partitioner for Spark RDDs so that the partitions are equally sized, i.e. each partition contains the same number of elements?


Partitioners work by assigning a key to a partition. To build such a partitioner you would need prior knowledge of the key distribution, or you would have to look at all keys. This is why Spark does not provide you with one.
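
For illustration, here is roughly what the contract looks like; this is a naive hash-based sketch (not Spark's built-in HashPartitioner), and it only balances element counts if the keys happen to hash evenly:

import org.apache.spark.Partitioner

// Illustrative sketch only: a partitioner sees one key at a time, so without
// knowledge of the key distribution it cannot guarantee equally sized partitions.
class NaiveHashPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // keep the partition id non-negative
  }
}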

In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?

Anyway, let us say you have an RDD keyed by sequential Ints, and you know how many elements there are in total. Then you could write a custom Partitioner like this:

import org.apache.spark.Partitioner

class ExactPartitioner[V](
    partitions: Int,
    elements: Int)
  extends Partitioner {

  // required by the Partitioner contract
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements-1.
    k * partitions / elements
  }
}
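
A minimal usage sketch, assuming the keys run contiguously from 0 to n-1 and sc is a live SparkContext (the names n, pairs and balanced are just for illustration):

// Hypothetical data set: 100 keys running from 0 to 99.
val n = 100
val pairs = sc.parallelize(0 until n).map(i => (i, s"value_$i"))

// Each of the 4 partitions receives exactly n / 4 = 25 consecutive keys.
val balanced = pairs.partitionBy(new ExactPartitioner[String](4, n))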


This answer takes some inspiration from Daniel's, but provides a full implementation (using the pimp-my-library pattern) with an example, for people's copy-and-paste needs :)

import scala.util.Random

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

import RDDConversions._

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced,
  // but for really large datasets, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
      .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) =>
        Iterator((i, iter.toIterable.last))).toArray().toList
        .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int =
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}

Then in another file we have

import org.apache.spark.rdd.RDD

// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
    new RichRDD[T](rdd)
  implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}

Then for your use case you just want this (assuming your RDD is already sorted):

import RDDConversions._

yourRdd.grouped(2)

Disclaimer: Not tested, kinda just wrote this straight into the SO answer


In newer versions of Spark you can write your own Partitioner and make use of the zipWithIndex method.

The idea is to:

  • index your RDD
  • use the index as the key
  • apply a custom Partitioner based on the number of required partitions

Example code is shown below:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, TaskContext}

// define custom Partitioner class
class EqualDistributionPartitioner(numberOfPartitions: Int) extends Partitioner {
  override def numPartitions: Int = numberOfPartitions

  override def getPartition(key: Any): Int = {
    (key.asInstanceOf[Long] % numberOfPartitions).toInt
  }
}

// create test RDD (starting with one partition)
val testDataRaw = Seq(
  ("field1_a", "field2_a"),
  ("field1_b", "field2_b"),
  ("field1_c", "field2_c"),
  ("field1_d", "field2_d"),
  ("field1_e", "field2_e"),
  ("field1_f", "field2_f"),
  ("field1_g", "field2_g"),
  ("field1_h", "field2_h"),
  ("field1_k", "field2_k"),
  ("field1_l", "field2_l"),
  ("field1_m", "field2_m"),
  ("field1_n", "field2_n")
)
val testRdd: RDD[(String, String)] = spark.sparkContext.parallelize(testDataRaw, 1)

// create the index and use it as the key
val testRddWithIndex: RDD[(Long, (String, String))] = testRdd.zipWithIndex().map(msg => (msg._2, msg._1))

// use the index to distribute the elements equally

// Example with two partitions
println("Example with 2 partitions:")
val equallyDistributedPartitionTwo = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(2))
equallyDistributedPartitionTwo.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

// Example with four partitions
println("\nExample with 4 partitions:")
val equallyDistributedPartitionFour = testRddWithIndex.partitionBy(new EqualDistributionPartitioner(4))
equallyDistributedPartitionFour.foreach(k => println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

where spark is your SparkSession.
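
If you are running this outside of spark-shell (where spark is already provided), a minimal local SparkSession could be created like this (app name and master are just placeholders):

import org.apache.spark.sql.SparkSession

// Minimal local session for trying out the snippet above.
val spark = SparkSession.builder()
  .appName("EqualDistributionPartitionerExample")
  .master("local[*]")
  .getOrCreate()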

As output you will get:

Example with 2 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 0, Content: (2,(field1_c,field2_c))
Partition: 1, Content: (3,(field1_d,field2_d))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 0, Content: (6,(field1_g,field2_g))
Partition: 1, Content: (7,(field1_h,field2_h))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 0, Content: (10,(field1_m,field2_m))
Partition: 1, Content: (11,(field1_n,field2_n))

Example with 4 partitions:
Partition: 0, Content: (0,(field1_a,field2_a))
Partition: 0, Content: (4,(field1_e,field2_e))
Partition: 0, Content: (8,(field1_k,field2_k))
Partition: 3, Content: (3,(field1_d,field2_d))
Partition: 3, Content: (7,(field1_h,field2_h))
Partition: 3, Content: (11,(field1_n,field2_n))
Partition: 1, Content: (1,(field1_b,field2_b))
Partition: 1, Content: (5,(field1_f,field2_f))
Partition: 1, Content: (9,(field1_l,field2_l))
Partition: 2, Content: (2,(field1_c,field2_c))
Partition: 2, Content: (6,(field1_g,field2_g))
Partition: 2, Content: (10,(field1_m,field2_m))