K P

Reputation: 851

Why does foreachPartition error out for streaming datasets?

I am migrating from Spark Streaming to Structured Streaming and I am facing an issue with the following code:

def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
  inputDataset.foreachPartition { partitionIterator =>
    val filteredIterator = partitionIterator.filter(foobar.filter)
    ...
    ...
  }
}
val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start

It errors out with the following AnalysisException:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Is foreachPartition not supported on streaming queries? Is writeStream.foreach the only way to implement foreachPartition in this case?

I'd like to avoid sending each event as it comes, and instead accumulate all rows, form one giant POST request body, and send it to an HTTP endpoint. So if there are 1000 events in a batch and 5 partitions, I'd generate 5 requests in parallel with 200 events in each request body.

Upvotes: 2

Views: 2732

Answers (1)

Jacek Laskowski

Reputation: 74669

TL;DR Yes, the foreachPartition action is not supported on streaming Datasets, and you should use a ForeachWriter instead.

Quoting the scaladoc of foreachPartition:

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
Applies a function f to each partition of this Dataset.

As you may have found out by now, foreachPartition is an action and therefore triggers Spark execution.

Since you work with streaming Datasets, triggering their execution is not allowed using "traditional" actions like foreachPartition.

Quoting Structured Streaming's Unsupported Operations:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

Among the streaming alternatives is the foreach operator (aka the foreach sink). That's how to do foreachPartition in Structured Streaming.

Quoting Using Foreach:

The foreach operation allows arbitrary operations to be computed on the output data.

To use this, you will have to implement the interface ForeachWriter, which has methods that get called whenever there is a sequence of rows generated as output after a trigger.
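A minimal sketch of such a ForeachWriter, assuming the question's MyMessage type and a hypothetical postBatch helper that sends one HTTP POST per partition (it is not part of Spark):

import org.apache.spark.sql.ForeachWriter
import scala.collection.mutable.ArrayBuffer

// Sketch: buffer one partition's rows and send them as a single request.
class HttpBatchWriter extends ForeachWriter[MyMessage] {
  private val buffer = ArrayBuffer.empty[MyMessage]

  // Called once per partition per trigger.
  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true // accept and process this partition
  }

  // Called once per row; accumulate rather than send immediately.
  override def process(value: MyMessage): Unit = buffer += value

  // Called when the partition is finished; one POST for all buffered rows.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      postBatch(buffer.toSeq) // hypothetical HTTP helper
    }
  }
}

val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .foreach(new HttpBatchWriter)
  .start

With 5 partitions per trigger, this yields 5 parallel requests, one per partition, which matches the batching described in the question.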


I'd like to avoid sending each event as it comes, and instead accumulate all rows, form one giant POST request body, and send it to an HTTP endpoint. So if there are 1000 events in a batch and 5 partitions, I'd generate 5 requests in parallel with 200 events in each request body.

That seems like an aggregation before writing the Dataset to a sink, doesn't it? Use the groupBy operator and the collect_list function to group rows, so that when you writeStream you'll have as many groups as you want, as in the sketch below.
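A minimal sketch of that approach, assuming MyMessage has an id field to bucket on (the modulo 5 mirrors the 5 partitions from the question). Note that a streaming aggregation like this requires the update or complete output mode unless you add a watermark:

import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

// Hypothetical bucketing: derive a small, fixed number of groups,
// then collect each group's rows into a single array column.
val batched = inputDataset
  .withColumn("bucket", col("id") % 5)
  .groupBy("bucket")
  .agg(collect_list(to_json(struct(col("*")))) as "events")

Each output row then carries one group's events, which the foreach sink can turn into a single POST.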

I'd rather avoid dealing with partitions, a low-level RDD feature, as a way to optimize writes unless there is no other way.

Upvotes: 2
