How to define a custom partitioner for Spark RDDs with equally sized partitions, where each partition has an equal number of elements?
`Partitioner`s work by assigning a key to a partition. You would need prior knowledge of the key distribution, or look at all keys, to make such a partitioner. This is why Spark does not provide you with one.
In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?
Anyway, let us say you have an RDD keyed by sequential `Int`s, and you know how many there are in total. Then you could write a custom `Partitioner` like this:
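
    import org.apache.spark.Partitioner

    class ExactPartitioner[V](partitions: Int, elements: Int) extends Partitioner {
      // Partitioner is abstract in both members, so numPartitions must be defined too.
      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = {
        // `k` is assumed to go continuously from 0 to elements-1.
        val k = key.asInstanceOf[Int]
        // Widen to Long so that k * partitions cannot overflow Int.
        (k.toLong * partitions / elements).toInt
      }
    }
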
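To see how that formula spreads keys, here is a minimal sketch in plain Scala (no Spark needed). The object `ExactPartitionDemo` and its helpers are hypothetical names introduced only for illustration; the arithmetic is the same `k * partitions / elements` used in `getPartition`, computed as a `Long`.

```scala
object ExactPartitionDemo {
  // Same formula as getPartition: maps key k in [0, elements) to a partition id.
  def partitionOf(k: Int, partitions: Int, elements: Int): Int =
    (k.toLong * partitions / elements).toInt

  // Number of keys that land in each partition, indexed by partition id.
  def sizes(partitions: Int, elements: Int): Seq[Int] = {
    val assigned = (0 until elements).map(partitionOf(_, partitions, elements))
    (0 until partitions).map(p => assigned.count(_ == p))
  }

  def main(args: Array[String]): Unit = {
    println(sizes(3, 10).mkString(", ")) // prints "4, 3, 3"
    println(sizes(4, 12).mkString(", ")) // prints "3, 3, 3, 3"
  }
}
```

When the element count is not divisible by the partition count, as with 10 elements over 3 partitions here, the sizes cannot all be equal; in this example they differ by one.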
This answer takes some inspiration from Daniel's, but provides a full implementation (using the pimp my library pattern) with an example for people's copy and paste needs :)

    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD
    import scala.util.Random

    import RDDConversions._

    trait RDDWrapper[T] {
      def rdd: RDD[T]
    }

    // TODO View bounds are deprecated, should use context bounds
    // Might need to change ClassManifest for ClassTag in spark 1.0.0
    case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
      // Here we use a single Long to try to ensure the sort is balanced,
      // but for really large datasets, we may want to consider
      // using a tuple of many Longs or even a GUID
      def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
        rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
          .grouped(numPartitions).map(t => (t._1._1, t._2))
    }

    case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
      def grouped(size: Int): RDD[T] = {
        // TODO Version where withIndex is cached
        val withIndex = rdd.mapPartitions(_.zipWithIndex)
        // Start index of each partition; the final entry is the total element count.
        // Each partition's last local index + 1 is its size, so sizes are summed from 0.
        // Assumes every partition is non-empty (`last` throws on an empty one).
        val startValues =
          withIndex.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toIterable.last)))
            .collect().toList
            .sortBy(_._1).map(_._2._2.toLong + 1L).scan(0L)(_ + _)

        withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
          case (value, index) => (startValues(i) + index.toLong, value)
        })
          .partitionBy(new Partitioner {
            def numPartitions: Int = size
            def getPartition(key: Any): Int =
              (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
          })
          .map(_._2)
      }
    }

Then in another file we have:

    // TODO modify above to be implicit class, rather than have implicit conversions
    object RDDConversions {
      implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
        new RichRDD[T](rdd)

      implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
        rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)

      implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
    }

Then for your use case you just want (assuming it's already sorted):

    import RDDConversions._

    yourRdd.grouped(2)

Disclaimer: Not tested, kinda just wrote this straight into the SO answer
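The core idea in `grouped` is to turn each element's local index within its partition into a global index by adding that partition's start offset, and then to map the global index onto the target partitions. A minimal sketch of that arithmetic in plain Scala follows; the object `GlobalIndexDemo`, its helper names, and the partition sizes 3, 2 and 4 are hypothetical and chosen only for illustration.

```scala
object GlobalIndexDemo {
  // Running sum of partition sizes: the start offset of each partition,
  // followed by the total element count as the final entry.
  def startOffsets(partitionSizes: Seq[Long]): Seq[Long] =
    partitionSizes.scan(0L)(_ + _)

  // Target partition for a global index when `total` elements are spread
  // over `numPartitions` contiguous ranges.
  def targetPartition(globalIndex: Long, numPartitions: Int, total: Long): Int =
    (globalIndex * numPartitions / total).toInt

  def main(args: Array[String]): Unit = {
    val offsets = startOffsets(Seq(3L, 2L, 4L))
    println(offsets.mkString(", ")) // prints "0, 3, 5, 9"

    // Element with local index 1 in the third input partition (offset 5).
    val globalIndex = offsets(2) + 1L
    println(targetPartition(globalIndex, 2, offsets.last)) // prints "1"
  }
}
```

Because the largest global index is one less than the total, the computed target is always below `numPartitions`.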
In newer versions of Spark you can write your own `Partitioner` and make use of the `zipWithIndex` method.
The idea is to
- index your RDD
- use the index as the key
- apply a custom `Partitioner` based on the number of required partitions
Example code is shown below:

    import org.apache.spark.{Partitioner, TaskContext}
    import org.apache.spark.rdd.RDD

    // define custom Partitioner class
    class EqualDistributionPartitioner(numberOfPartitions: Int) extends Partitioner {
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        (key.asInstanceOf[Long] % numberOfPartitions).toInt
      }
    }

    // create test RDD (starting with one partition)
    val testDataRaw = Seq(
      ("field1_a", "field2_a"),
      ("field1_b", "field2_b"),
      ("field1_c", "field2_c"),
      ("field1_d", "field2_d"),
      ("field1_e", "field2_e"),
      ("field1_f", "field2_f"),
      ("field1_g", "field2_g"),
      ("field1_h", "field2_h"),
      ("field1_k", "field2_k"),
      ("field1_l", "field2_l"),
      ("field1_m", "field2_m"),
      ("field1_n", "field2_n")
    )
    val testRdd: RDD[(String, String)] = spark.sparkContext.parallelize(testDataRaw, 1)

    // create index and use it as the key
    val testRddWithIndex: RDD[(Long, (String, String))] =
      testRdd.zipWithIndex().map(msg => (msg._2, msg._1))

    // use the index for equal distribution
    // Example with two partitions
    println("Example with 2 partitions:")
    val equallyDistributedPartitionTwo =
      testRddWithIndex.partitionBy(new EqualDistributionPartitioner(2))
    equallyDistributedPartitionTwo.foreach(k =>
      println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

    // Example with four partitions
    println("\nExample with 4 partitions:")
    val equallyDistributedPartitionFour =
      testRddWithIndex.partitionBy(new EqualDistributionPartitioner(4))
    equallyDistributedPartitionFour.foreach(k =>
      println(s"Partition: ${TaskContext.getPartitionId()}, Content: $k"))

where `spark` is your `SparkSession`.
As output you will get:

    Example with 2 partitions:
    Partition: 0, Content: (0,(field1_a,field2_a))
    Partition: 1, Content: (1,(field1_b,field2_b))
    Partition: 0, Content: (2,(field1_c,field2_c))
    Partition: 1, Content: (3,(field1_d,field2_d))
    Partition: 0, Content: (4,(field1_e,field2_e))
    Partition: 1, Content: (5,(field1_f,field2_f))
    Partition: 0, Content: (6,(field1_g,field2_g))
    Partition: 1, Content: (7,(field1_h,field2_h))
    Partition: 0, Content: (8,(field1_k,field2_k))
    Partition: 1, Content: (9,(field1_l,field2_l))
    Partition: 0, Content: (10,(field1_m,field2_m))
    Partition: 1, Content: (11,(field1_n,field2_n))

    Example with 4 partitions:
    Partition: 0, Content: (0,(field1_a,field2_a))
    Partition: 0, Content: (4,(field1_e,field2_e))
    Partition: 0, Content: (8,(field1_k,field2_k))
    Partition: 3, Content: (3,(field1_d,field2_d))
    Partition: 3, Content: (7,(field1_h,field2_h))
    Partition: 3, Content: (11,(field1_n,field2_n))
    Partition: 1, Content: (1,(field1_b,field2_b))
    Partition: 1, Content: (5,(field1_f,field2_f))
    Partition: 1, Content: (9,(field1_l,field2_l))
    Partition: 2, Content: (2,(field1_c,field2_c))
    Partition: 2, Content: (6,(field1_g,field2_g))
    Partition: 2, Content: (10,(field1_m,field2_m))

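Note that the modulo-based partitioner interleaves indices across partitions (0, 4, 8 end up together), whereas the `k * partitions / elements` formula from the first answer keeps contiguous index ranges together. The following plain-Scala sketch compares the two assignments for 12 indices and 4 partitions; `AssignmentDemo` and its helpers are hypothetical names used only for this comparison, not part of Spark.

```scala
object AssignmentDemo {
  // Round-robin assignment, as in EqualDistributionPartitioner.
  def byModulo(index: Long, n: Int): Int = (index % n).toInt

  // Contiguous-range assignment, as in ExactPartitioner.
  def byRange(index: Long, n: Int, total: Long): Int = (index * n / total).toInt

  // Groups the indices 0 until total by the partition id that `assign` gives them.
  def layout(total: Long, n: Int)(assign: Long => Int): Map[Int, List[Long]] =
    (0L until total).toList.groupBy(assign)

  def main(args: Array[String]): Unit = {
    val modulo = layout(12L, 4)(i => byModulo(i, 4))
    val range = layout(12L, 4)(i => byRange(i, 4, 12L))
    println(modulo(0)) // prints "List(0, 4, 8)"
    println(range(0))  // prints "List(0, 1, 2)"
  }
}
```

Both layouts put three elements in each of the four partitions; which one is preferable depends on whether neighbouring elements should stay together.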