How to balance my data across the partitions?

I think the "memory overhead limit exceeded" issue is due to DirectMemory buffers used during fetch, and I believe it is fixed in 2.0.0. (We had the same issue, but stopped digging much deeper once we found that upgrading to 2.0.0 resolved it. Unfortunately I don't have Spark issue numbers to back me up.)
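If upgrading is not an option right away, a common workaround is to give each executor more room for off-heap allocations. A minimal sketch, assuming a YARN deployment; the app name and the 2048 MB value are just placeholders to tune for your cluster:

import org.apache.spark.{SparkConf, SparkContext}

// Leave extra room per executor container for off-heap allocations,
// e.g. DirectMemory buffers used during shuffle fetches.
val conf = new SparkConf()
  .setAppName("RepartitionJob") // hypothetical app name
  .set("spark.yarn.executor.memoryOverhead", "2048") // in MB; adjust for your job

val sc = new SparkContext(conf)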


The uneven partitions after repartition are surprising. Contrast with the implementation at https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443: repartition even generates random keys for the shuffle, so the distribution is not done with a hash of your data that could be biased.
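For illustration only (this is a sketch of the idea, not the actual Spark source linked above), the scheme is roughly: each input partition starts from a random offset and deals its records out round-robin, so the target partition never depends on the data itself. The RDD, seed, and numPartitions below are made up for the example:

import scala.util.Random

val numPartitions = 12
// Tag each record with a counter that starts at a random position per
// input partition; the key (mod numPartitions) picks the target partition.
val keyed = sc.parallelize(1 to 6000, 3).mapPartitionsWithIndex { (index, iter) =>
  var position = new Random(index).nextInt(numPartitions)
  iter.map { record =>
    position += 1
    (position % numPartitions, record)
  }
}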

I tried your example and got the exact same results with both Spark 1.6.2 and Spark 2.0.0. But not from the Scala spark-shell:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)

Such beautiful partitions!
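To see how your own data ends up distributed, you can print a per-partition record count with mapPartitionsWithIndex. A small sketch; data stands for whatever RDD you are repartitioning and 12 is an arbitrary partition count:

// Report (partition index, record count) pairs so skew is easy to spot.
data.repartition(12)
  .mapPartitionsWithIndex { (index, iter) => Iterator((index, iter.size)) }
  .collect
  .foreach { case (index, count) => println(s"partition $index: $count records") }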


(Sorry this is not a full answer. I just wanted to share my findings so far.)