Why does this LR code run so slowly on Spark?

It is really hard to give a definitive answer to this. Maybe the question would be a better fit for the Code Review Stack Exchange site?

Some things that are immediately obvious:

Your gradient function seems inefficient. When you want to do something for each key/value pair of a map, it is much more efficient to do

for ((k, v) <- map) {
  ...
}

than to do

for (k <- map.keySet) {
  val value = map.get(k).get
  ...
}
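To make the difference concrete, here is a minimal, self-contained sketch of the two styles (the map contents are invented for illustration):

```scala
// A small map standing in for the sparse feature map.
val features = Map(0 -> 1.5, 3 -> 2.0)

// Preferred: iterate the (key, value) pairs directly.
var s1 = 0.0
for ((k, v) <- features) s1 += v

// Slower: iterate the keys and look each value up again,
// which hashes every key a second time.
var s2 = 0.0
for (k <- features.keySet) {
  val v = features(k)
  s2 += v
}
// Both loops compute the same sum; the first avoids the extra lookups.
```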

Also, for performance-critical code like this it might be preferable to replace the reduce with accumulation into a mutable value. The rewritten gradient function would be

def gradient(p: DataPoint, w: Broadcast[Vector]): Vector = {
  def h(w: Broadcast[Vector], x: SparseVector): Double = {
    val wb = w.value
    val features = x.elements
    var s = 0.0
    for ((k, v) <- features)
      s += v * wb(k)
    1 / (1 + Math.exp(-p.y * s))
  }
  p.x * (-(1 - p.y * h(w, p.x)))
}

Now if you want to increase the performance even more, you will have to change the SparseVector to use an array of indices and an array of values instead of a Map[Int, Double]. The reason is that in a Map the keys and values are boxed as objects, with considerable overhead, while an Array[Int] or Array[Double] is just a single compact chunk of memory.

(For convenience it might be advisable to define a builder that accumulates entries in a SortedMap[Int, Double] and converts them into two arrays when finished building.)
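Such a builder could look like this (untested sketch; the class name and methods are my own invention, not part of your code):

```scala
import scala.collection.immutable.SortedMap

// Hypothetical builder: accumulate entries in a SortedMap so the
// indices come out sorted, then copy them into two parallel arrays.
class SparseVectorBuilder {
  private var entries = SortedMap.empty[Int, Double]

  def add(index: Int, value: Double): this.type = {
    entries += (index -> value)
    this
  }

  // Returns (indices, values), sorted by index.
  def result(): (Array[Int], Array[Double]) =
    (entries.keys.toArray, entries.values.toArray)
}
```

Used as, e.g., `new SparseVectorBuilder().add(3, 1.5).add(0, 2.0).result()`, which yields the index array `Array(0, 3)` paired with the value array `Array(2.0, 1.5)`.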

class SparseVector(val indices: Array[Int], val values: Array[Double]) {
  require(indices.length == values.length)

  def *(scale: Double): Vector = {
    val x = new Array[Double](dimNum)
    var i = 0
    while (i < indices.length) {
      x(indices(i)) = scale * values(i)
      i += 1
    }
    Vector(x)
  }
}
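The dot product inside h benefits from the same layout. A hedged sketch of how it would look over the two arrays, written as a standalone function so it is self-contained (the dense weight array stands in for w.value):

```scala
// Dot product of a sparse vector (two parallel arrays) with a dense
// weight array. A while loop over primitive arrays avoids boxing
// entirely, unlike iterating a Map[Int, Double].
def dot(indices: Array[Int], values: Array[Double], w: Array[Double]): Double = {
  var s = 0.0
  var i = 0
  while (i < indices.length) {
    s += values(i) * w(indices(i))
    i += 1
  }
  s
}
```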

Note that the code examples above are not tested, but I think you will get the idea.