How do I balance my data across the partitions?
The memory overhead limit exceeded issue is, I think, due to DirectMemory buffers used during fetch, and I believe it was fixed in 2.0.0. (We had the same issue, but stopped digging deeper once we found that upgrading to 2.0.0 resolved it. Unfortunately I don't have Spark issue numbers to back me up.)
The uneven partitions after repartition are surprising. Contrast with https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443: repartition even generates random keys, so the distribution is not done with a hash that could be biased.
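For illustration, here is a minimal sketch (a simplification, not Spark's actual code) of the scheme used around that line of RDD.scala: each source partition picks a random starting position and then assigns target partitions round-robin, so the output partitions can differ in size only by a small amount regardless of input skew. The `distribute` helper below is hypothetical; Spark's real implementation also byte-swaps the partition index before seeding the RNG.

```scala
import scala.util.Random

// Sketch of repartition's key scheme: each source partition starts at a
// random target position and then cycles through the targets, so elements
// are spread nearly evenly no matter how skewed the sources are.
def distribute(partitionIndex: Int, items: Seq[Int], numPartitions: Int): Seq[(Int, Int)] = {
  var position = new Random(partitionIndex).nextInt(numPartitions)
  items.map { item =>
    position += 1
    (position % numPartitions, item) // (target partition, element)
  }
}

// Three skewed source partitions of 1000, 2000 and 3000 elements,
// mirroring the spark-shell example below.
val sources = Seq.tabulate(3)(i => Seq.fill((i + 1) * 1000)(i))
val counts: Map[Int, Int] = sources.zipWithIndex
  .flatMap { case (items, idx) => distribute(idx, items, 3) }
  .groupBy(_._1)
  .map { case (target, pairs) => (target, pairs.size) }
```

With 6000 elements over 3 targets, each target ends up within a couple of elements of 2000, because every source partition contributes an almost-equal share to every target.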
I tried your example and got the exact same results with Spark 1.6.2 and Spark 2.0.0. But not from the Scala spark-shell:
scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)
Such beautiful partitions!
(Sorry this is not a full answer. I just wanted to share my findings so far.)