Reputation: 851
I am migrating from Spark Streaming to Structured Streaming and I am facing an issue with the following code:
def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
  inputDataset.foreachPartition { partitionIterator =>
    val filteredIterator = partitionIterator.filter(foobar.filter)
    ...
    ...
  }
}
val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start
It errors out with the following AnalysisException:
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Is foreachPartition not supported on streaming queries? Is writeStream.foreach the only way to implement foreachPartition in this case?
I'd like to avoid sending each event as it comes, and instead accumulate all rows, form one giant POST request body and send it to an HTTP endpoint. So if there are 1000 events in a batch and 5 partitions, generate 5 requests in parallel with 200 events in each request body.
Upvotes: 2
Views: 2732
Reputation: 74669
TL;DR Yes. The foreachPartition action is not supported and you should use ForeachWriter instead.
Quoting the scaladoc of foreachPartition:
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
Applies a function f to each partition of this Dataset.
As you may have found out by now, foreach is an action and therefore triggers Spark execution. Since you work with streaming Datasets, triggering their execution is not allowed using "traditional" methods like foreach.
Quoting Structured Streaming's Unsupported Operations:
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
Among the streaming alternatives is the foreach operator (aka sink). That's how to do foreachPartition in Structured Streaming.
Quoting Using Foreach:
The foreach operation allows arbitrary operations to be computed on the output data. To use this, you will have to implement the interface ForeachWriter, which has methods that get called whenever there is a sequence of rows generated as output after a trigger.
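For the per-partition batching you describe, a ForeachWriter can buffer rows between open and close and fire one request when the partition is done. A minimal sketch, assuming MyMessage from your question and a hypothetical postToEndpoint helper standing in for whatever HTTP client and serialization you actually use:

import org.apache.spark.sql.ForeachWriter

// Sketch only: MyMessage is the question's type; the serialization and the
// postToEndpoint call are placeholders, not a real API.
class HttpBatchWriter extends ForeachWriter[MyMessage] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[MyMessage]

  // Called once per partition per trigger; return true to process its rows
  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true
  }

  // Called for every row in the partition: just accumulate
  override def process(value: MyMessage): Unit = buffer += value

  // Called when the partition is done: send one POST per partition
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      val body = buffer.mkString("[", ",", "]")  // placeholder serialization
      // postToEndpoint(body)                    // hypothetical HTTP call
    }
  }
}

val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .foreach(new HttpBatchWriter)
  .start

With 5 partitions and 1000 events per trigger you would get 5 close calls running in parallel, each with roughly 200 buffered rows.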
I'd like to avoid sending each event as it comes, and instead accumulate all rows, form one giant POST request body and send it to an HTTP endpoint. So if there are 1000 events in a batch and 5 partitions, generate 5 requests in parallel with 200 events in each request body.
That seems like an aggregation before writing the Dataset to a sink, doesn't it? Use the groupBy operator and the collect_list function to group rows, so that when you writeStream you'll have as many groups as you want.
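A rough sketch of that approach, assuming MyMessage carries an event-time column (here called "timestamp") for the watermark, plus "partitionKey" and "payload" columns; these names are placeholders for your actual schema:

import org.apache.spark.sql.functions.{col, collect_list, window}

// Sketch only: the watermark is needed so the aggregation can be emitted
// in append output mode; adjust column names to MyMessage's real fields.
val batched = inputDataset
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "5 seconds"), col("partitionKey"))
  .agg(collect_list(col("payload")).as("events"))

val streamingQuery = batched
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start

Each output row then carries a whole group of events, which you can turn into a single request in the sink instead of relying on partition boundaries.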
I'd rather avoid dealing with partitions, a low-level RDD feature, as a way to optimize writes unless there is no other way.
Upvotes: 2