Reputation: 181
I am writing a custom Spark Structured Streaming sink to write events read from Kafka to Google BigQuery (BQ). Below is the code that I have written.
The code compiles and runs successfully, but my sink always runs on only one executor (the one where the driver program runs), and I do not understand why.
Here is the implementation of my custom BigQuery sink:
package bq
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider with DataSourceRegister {

  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new BQSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
             parameters: Map[String, String],
             partitionColumns: Seq[String],
             outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Pull the micro-batch back as a local copy, then iterate over its rows.
    val df = data.sparkSession.createDataFrame(
      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
    df.collect().foreach { row =>
      // code that writes the rows to BigQuery.
    }
  }
}
Here is my driver program:
// Reading raw events from Kafka
val inputDF = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
  .option("subscribe", "topic")
  .option("fetchOffset.numRetries", 5)
  .option("failOnDataLoss", "false")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("value")
  .as[Array[Byte]]
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))
// Writing outputDF events to BQ
val query = outputDF.writeStream
  .format("bq")
  .option("checkpointLocation", config.getString("checkpointLocation"))
  .outputMode(OutputMode.Append())
  .start()
//Start Streaming
query.awaitTermination()
Even though my topic has multiple partitions, my custom sink runs on a single executor only.
Upvotes: 0
Views: 514
Reputation: 18525
Using df.collect will collect all the data from the executors to your driver, so you are only ever seeing the driver send data to your sink.
You need to use df.foreachPartition instead, together with a BQ producer that is accessible on your executors (see the sketch below). You may run into a "Task not serializable" issue, but you can have a look here to understand how it can be solved in Scala Spark.
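For reference, here is a minimal sketch of that addBatch change. BQWriter is a hypothetical stand-in for whatever BigQuery client wrapper your sink actually uses (the write call is not shown in the question), and depending on your Spark/Scala version you may need the explicit Iterator[Row] type annotation to disambiguate the foreachPartition overloads.

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

class BQSink(sqlContext: SQLContext,
             parameters: Map[String, String],
             partitionColumns: Seq[String],
             outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Copy only what the closure needs into a local val. Capturing `this`
    // (which holds the non-serializable SQLContext) is the usual cause of
    // "Task not serializable".
    val params = parameters

    data.foreachPartition { (rows: Iterator[Row]) =>
      // This block runs on the executor that owns the partition, not on the driver.
      // The client is created here, once per partition, so it never has to be serialized.
      val writer = new BQWriter(params) // hypothetical BigQuery client wrapper
      try {
        rows.foreach(row => writer.write(row)) // write each row to BigQuery
      } finally {
        writer.close()
      }
    }
  }
}

Creating the client once per partition (rather than per row) amortizes connection setup, and nothing is collected back to the driver, so each executor writes out the partitions it already holds.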
Upvotes: 3