How can I update a broadcast variable in Spark Streaming?


Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null) {
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());
            // Your logic to refresh the reference data
            ReferenceData data = getRefData();
            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would look like:

public void startSparkEngine() {
    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast =
                BroadcastWrapper.getInstance().updateAndGet(stream.context());
        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
    });
}

This worked for me on a multi-node cluster as well. Hope this helps.


I recently faced an issue with this. Thought it might be helpful for Scala users.

The Scala way of doing a BroadcastWrapper looks like the example below.

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

import scala.reflect.ClassTag

/* This wrapper lets us update broadcast variables within DStreams' foreachRDD
   without running into serialization issues. */
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    // Unpersist the old broadcast and replace it with a fresh one
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Every time you need a new broadcast variable, call the update function.
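
A minimal usage sketch on top of this wrapper, assuming the reference data is a set of keys (here ssc, stream, fetchReferenceData, rec.key, and process are hypothetical placeholders, not part of the original answer):

// Hypothetical driver-side usage of the wrapper above
val refData = BroadcastWrapper(ssc, fetchReferenceData())

stream.foreachRDD { rdd =>
  // Runs on the driver once per batch: re-broadcast the latest reference data
  refData.update(fetchReferenceData(), blocking = true)
  // The wrapper serializes into the closure, so executors read the new broadcast
  rdd.filter(rec => refData.value.contains(rec.key)).foreach(process)
}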


Almost everyone dealing with streaming applications needs a way to weave reference data (from a DB, files, etc.) into the streaming data (to filter, look up, etc.). We have a partial solution covering the first of the two parts:

  1. Looking up reference data to be used in streaming operations

    • create CacheLookup object with desired cache TTL
    • wrap that in Broadcast
    • use CacheLookup as part of the streaming logic (see the sketch after this list)
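
A minimal sketch of that setup in Scala (CacheLookup here is illustrative; the TTL value, the stubbed reload, and the sc/stream handles are assumptions, not code from this thread):

class CacheLookup(ttlMs: Long) extends Serializable {
  // Executor-local state: rebuilt lazily after deserialization
  @transient private var cache: Map[String, String] = _
  @transient private var loadedAt: Long = 0L

  def lookup(key: String): Option[String] = synchronized {
    val now = System.currentTimeMillis()
    if (cache == null || now - loadedAt > ttlMs) {
      cache = loadReferenceData() // reload on this executor once the TTL expires
      loadedAt = now
    }
    cache.get(key)
  }

  // Stub: replace with the real DB/file read
  private def loadReferenceData(): Map[String, String] = Map("k1" -> "v1")
}

val lookupBroadcast = sc.broadcast(new CacheLookup(ttlMs = 60000))
val filtered = stream.filter(rec => lookupBroadcast.value.lookup(rec.key).isDefined)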

For the most part this works fine, except for the following:

  2. Updating the reference data

    There is no definitive way to achieve this, despite the suggestions in these threads, i.e., kill the previous broadcast variable and create a new one (sketched below). There are multiple unknowns, such as what to expect between these operations.
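
    For reference, that suggested pattern boils down to something like this sketch (oldBroadcast, sc, and freshReferenceData are placeholders):

    // Kill the previous broadcast variable...
    oldBroadcast.unpersist(blocking = true) // drops the cached copies on the executors
    // ...and create a new one carrying the fresh reference data
    val newBroadcast = sc.broadcast(freshReferenceData)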

This is such a common need that it would help if there were a way to send information to a broadcast variable signaling an update. With that, it would be possible to invalidate the local caches in "CacheLookup".

The second portion of the problem is still not solved. I would be interested to hear if there is any viable approach to this.