
Spark Structured Streaming ForeachWriter and database performance


Opening and closing of the underlying sink depends on your implementation of ForeachWriter.

The class that invokes your ForeachWriter is ForeachSink, and this is the code that calls your writer:

data.queryExecution.toRdd.foreachPartition { iter =>
  if (writer.open(TaskContext.getPartitionId(), batchId)) {
    try {
      while (iter.hasNext) {
        writer.process(encoder.fromRow(iter.next()))
      }
    } catch {
      case e: Throwable =>
        writer.close(e)
        throw e
    }
    writer.close(null)
  } else {
    writer.close(null)
  }
}

The writer is opened and closed once per partition for every batch generated from your source. Whether open and close literally open and close a connection to the sink each time is up to your implementation; you can just as well check out a long-lived connection in open and return it in close, as in the sketch below.
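Here is a minimal sketch of that idea for a JDBC sink. The ConnectionPool object, the events table, and the String element type are illustrative assumptions, not part of the Spark API:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.sql.ForeachWriter

// Illustrative per-JVM pool so executors reuse connections across
// batches instead of reconnecting in every open()/close() cycle.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def borrow(url: String): Connection =
    Option(pool.poll()).getOrElse(DriverManager.getConnection(url))

  def giveBack(conn: Connection): Unit = {
    pool.offer(conn)
  }
}

class PooledJdbcWriter(url: String) extends ForeachWriter[String] {
  private var conn: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = ConnectionPool.borrow(url) // reuse a pooled connection instead of reconnecting
    true                              // returning false skips process() for this partition
  }

  override def process(value: String): Unit = {
    val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
    try {
      stmt.setString(1, value)
      stmt.executeUpdate()
    } finally {
      stmt.close()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    ConnectionPool.giveBack(conn)     // return to the pool rather than closing
  }
}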

If you want more control over how the data is handled, you can implement the Sink trait, which hands you the batch id and the underlying DataFrame:

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}
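As a sketch under assumptions (BulkJdbcSink, the table, and the append mode are illustrative; Sink lives in the internal org.apache.spark.sql.execution.streaming package, so it can change between releases), an implementation could write each micro-batch in one bulk operation instead of row by row:

import java.util.Properties
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class BulkJdbcSink(url: String, table: String, props: Properties) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // The incoming DataFrame is tied to the query's incremental execution,
    // so re-materialize it as a plain batch DataFrame first (the same trick
    // Spark's ConsoleSink uses), then write the whole micro-batch at once.
    val spark = data.sparkSession
    val batch = spark.createDataFrame(
      spark.sparkContext.parallelize(data.collect()), data.schema)
    batch.write.mode("append").jdbc(url, table, props)
  }
}

Note that data.collect() pulls the micro-batch to the driver, which is only viable for modest batch sizes, and that wiring such a sink into a query also requires registering a StreamSinkProvider as a data source, which is omitted here.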