Aride Chettali
Aride Chettali

Reputation: 181

No Parallelism with Custom Spark Structured Streaming Sink

I am writing a Custom Spark structured streaming sink to write events read from Kafka to Google BQ(Big Query). Below is the code that I have written.

The code is compiling and running successfully. But My Sink is always running in only one executor (always where the driver program runs). I do not understand the issue here.

Here is the implementation of my custom Big Query Sink.

package bq

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode


class DefaultSource extends StreamSinkProvider with DataSourceRegister{

  override def createSink(
                           sqlContext: SQLContext,
                           parameters: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
    new BQSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode) extends Sink {


  override def addBatch(batchId: Long, data: DataFrame): Unit = {

    val df = data.sparkSession.createDataFrame
                       (data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
   
    df.collect().foreach({ row => {
      //code that writes the rows to Big Query.
    }  
  }

Here is my driver program

 // Reading raw events from Kafka
    val inputDF = sparkSession.readStream
                              .format("kafka")
                              .option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
                              .option("subscribe", "topic")
                              .option("fetchOffset.numRetries", 5)
                              .option("failOnDataLoss", "false")
                              .option("startingOffsets", "latest")
                              .load()
                              .selectExpr("value")
                              .as[Array[Byte]];

    // Transforming inputDF to OutputDF
    val outputDF = inputDF.map(event => transform(event))
                          

    // Writing outputDF events to BQ
    val query = outputDF.writeStream
                        .format("bq")
                        .option("checkpointLocation",config.getString("checkpointLocation"))
                        .outputMode(OutputMode.Append())
                        .start()

    //Start Streaming
    query.awaitTermination()

Even though my topic has multiple partitions, My Custom sink is running in a single executor only

Upvotes: 0

Views: 514

Answers (1)

Michael Heil
Michael Heil

Reputation: 18525

Using df.collect will collect all data from the executors to your driver. Therefore you are seeing only the driver sending data to your sink.

You need to do df.foreachPartition and use a BQ producer that is accessible on your executors. You may run into "Task not serializable" issue but you can have a look here to understand how this can be solved in Scala Spark.

Upvotes: 3

Related Questions