Reputation: 842
I would like to have Structured Streaming read from a JSON file, process the data, and write it to both Kafka and Parquet sinks. I see the sample code below for this:
datasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    true
  }
  def process(record: String): Unit = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
}).start()
But how can I pass multiple writers here? Would it be like the following?
datasetOfString.writeStream.foreach(kafkaWriter).start()
datasetOfString.writeStream.foreach(parquetWriter).start()
If I do it like this, then what would be the purpose of using a ForeachWriter? Is it just for more control while writing?
Upvotes: 1
Views: 2145
Reputation: 668
The foreach() operation is an action.
It does not return any value.
It executes the input function on each element of an RDD.
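For instance, a minimal sketch of the RDD foreach action itself (not the streaming sink); the session setup here is only for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ForeachAction").master("local[*]").getOrCreate()

// foreach is an action: it runs a side-effecting function on every element
// of the RDD and returns Unit, i.e. nothing comes back to the driver.
spark.sparkContext.parallelize(Seq(1, 2, 3)).foreach(x => println(x))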
Quoting the official Spark Structured Streaming documentation:
Streaming queries are currently bound to a single sink, so multiplexing the write to multiple sinks within the same streaming query isn't possible.
With your current implementation you would effectively process the data twice, once per query, and I would not recommend that. Instead, you can process each micro-batch once and then use foreach (or a custom sink) that writes to both the Kafka topic and the Parquet output.
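On Spark 2.4+ one way to do this is foreachBatch, which hands you each micro-batch as a regular DataFrame so a single streaming query can write to several sinks. A minimal sketch, assuming a Spark 2.4+ setup with the spark-sql-kafka-0-10 package on the classpath; the schema, file paths, broker address, and topic name below are placeholders you would replace with your own:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.types.{StringType, StructType}

object MultiSinkStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MultiSinkStream").getOrCreate()

    // Hypothetical schema and paths; adjust to your data.
    val schema = new StructType()
      .add("id", StringType)
      .add("payload", StringType)

    val input = spark.readStream
      .schema(schema)
      .json("/path/to/json/input")

    // One streaming query; each micro-batch is written to both sinks.
    val writeToBothSinks: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist() // avoid recomputing the micro-batch once per sink

      // The Kafka batch writer expects a string/binary "value" column.
      batchDF.select(to_json(struct(col("*"))).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("topic", "my-topic")
        .save()

      // Parquet sink for the same micro-batch.
      batchDF.write.mode("append").parquet("/path/to/parquet/output")

      batchDF.unpersist()
    }

    val query = input.writeStream
      .foreachBatch(writeToBothSinks)
      .option("checkpointLocation", "/path/to/checkpoint")
      .start()

    query.awaitTermination()
  }
}

Caching the batch avoids recomputing it once per sink, and the checkpoint location lets the query recover across restarts.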
Upvotes: 1