user9108941

Reputation: 21

How to do stateful aggregation using flatMapGroupsWithState?

I am getting the following error message when applying flatMapGroupsWithState:

Exception in thread "main" org.apache.spark.sql.AnalysisException: flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset;

Here is what I am trying to do.

I get the error message at the last step.

Does this error mean I cannot apply flatMapGroupsWithState after applying agg(....) on a Dataset?

Upvotes: 2

Views: 3847

Answers (1)

Jacek Laskowski

Reputation: 74619

Does this error mean I cannot apply flatMapGroupsWithState after applying agg(....) on a Dataset?

Not really. It says that (highlighting mine)...

flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset

That means that flatMapGroupsWithState is used with the update output mode, but it should rather be append, as described in the Spark official documentation under Output Modes (see the "Queries with flatMapGroupsWithState" query type).



From the comment:

Also flatMapGroupsWithState does support the update operation. From KeyValueGroupedDataset:

if (outputMode != OutputMode.Append && outputMode != OutputMode.Update) {
  throw new IllegalArgumentException("The output mode of function should be append or update")
}

You're right that flatMapGroupsWithState supports the append and update output modes, but only when given as one of its own input parameters. See the signature of KeyValueGroupedDataset.flatMapGroupsWithState:

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode, // <-- HERE
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
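To make the operator-level mode concrete, here is a minimal sketch, assuming Spark 3.x with Scala 2.12/2.13. The Event and KeyCount case classes and the OperatorOutputMode.runningCounts helper are hypothetical names for illustration. The mode goes in as the first argument of flatMapGroupsWithState, and a mode other than append or update should be rejected immediately by the check quoted in the comment above, before any query is started.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical input and output records, for illustration only.
case class Event(key: String, value: Long)
case class KeyCount(key: String, count: Long)

object OperatorOutputMode {
  // Keeps a running row count per key. `operatorMode` is the OutputMode
  // argument of flatMapGroupsWithState itself, not the query's output mode.
  def runningCounts(
      spark: SparkSession,
      events: Dataset[Event],
      operatorMode: OutputMode): Dataset[KeyCount] = {
    import spark.implicits._

    val countRows = (key: String, rows: Iterator[Event], state: GroupState[Long]) => {
      val total = state.getOption.getOrElse(0L) + rows.size
      state.update(total)
      Iterator(KeyCount(key, total))
    }

    events
      .groupByKey(_.key)
      // Expected to throw IllegalArgumentException unless operatorMode is Append or Update.
      .flatMapGroupsWithState[Long, KeyCount](operatorMode, GroupStateTimeout.NoTimeout)(countRows)
  }
}
```

Calling runningCounts with OutputMode.Complete() should fail right away, while OutputMode.Append() or OutputMode.Update() build the Dataset normally (on a batch Dataset the function simply runs once per key).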

In other words, OutputMode appears twice in a streaming query:

  1. For the streaming query itself (in DataStreamWriter.outputMode, with the append output mode as the default)

  2. In KeyValueGroupedDataset.flatMapGroupsWithState (as the operator's own output mode)
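The two places can be seen side by side in the following sketch, assuming Spark 3.x. It uses the built-in rate source and memory sink so it needs no external input; the Tick and TickCount case classes, the TwoOutputModes object and the tick_counts query name are hypothetical. Both modes are append here and the query contains no aggregation, which matches the combination the Output Modes documentation lists for flatMapGroupsWithState in append operation mode.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQuery}

// Hypothetical records, for illustration only.
case class Tick(key: String, value: Long)
case class TickCount(key: String, count: Long)

object TwoOutputModes {
  def start(spark: SparkSession): StreamingQuery = {
    import spark.implicits._

    // The rate source emits (timestamp, value) rows; derive a small set of keys from value.
    val ticks = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(($"value" % 3).cast("string").as("key"), $"value")
      .as[Tick]

    // Running count of rows per key, kept in GroupState.
    val countTicks = (key: String, rows: Iterator[Tick], state: GroupState[Long]) => {
      val total = state.getOption.getOrElse(0L) + rows.size
      state.update(total)
      Iterator(TickCount(key, total))
    }

    val counts = ticks
      .groupByKey(_.key)
      // (2) OutputMode given to the operator (KeyValueGroupedDataset.flatMapGroupsWithState)
      .flatMapGroupsWithState[Long, TickCount](OutputMode.Append(), GroupStateTimeout.NoTimeout)(countTicks)

    counts.writeStream
      // (1) OutputMode of the streaming query (DataStreamWriter.outputMode)
      .outputMode(OutputMode.Append())
      .format("memory")
      .queryName("tick_counts")
      .start()
  }
}
```

Changing only (1) or only (2) in this sketch is a quick way to see that Spark validates the two settings separately.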

The exception refers to "flatMapGroupsWithState in update mode" (i.e. flatMapGroupsWithState with the update output mode as its input argument), which on its own is OK.

It is not OK "with aggregation", i.e. in a streaming query that also contains a streaming aggregation of any kind (groupBy, groupByKey, cube, rollup, etc. followed by an aggregation).
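Since the code from the question is not shown, here is a hypothetical minimal reproduction, assuming Spark 3.x: a streaming groupBy/agg followed by flatMapGroupsWithState in update mode. KeyTotal, keepMax and UpdateModeAfterAgg are illustrative names only. The unsupported-operation check runs when start() is called, so this sketch expects an AnalysisException there and returns it as a Left instead of leaving a query running.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical record produced by the streaming aggregation.
case class KeyTotal(key: String, total: Long)

object UpdateModeAfterAgg {
  // Returns Left(error) if Spark refuses to start the query, Right(()) otherwise.
  def tryStart(spark: SparkSession): Either[AnalysisException, Unit] = {
    import spark.implicits._

    // Streaming aggregation: groupBy + agg on the rate source.
    val totals = spark.readStream
      .format("rate")
      .load()
      .groupBy(($"value" % 3).cast("string").as("key"))
      .agg(sum($"value").as("total"))
      .as[KeyTotal]

    // Keeps the largest total seen so far per key.
    val keepMax = (key: String, rows: Iterator[KeyTotal], state: GroupState[Long]) => {
      val best = rows.map(_.total).foldLeft(state.getOption.getOrElse(0L))((a, b) => math.max(a, b))
      state.update(best)
      Iterator(KeyTotal(key, best))
    }

    // flatMapGroupsWithState with the update output mode, after the aggregation.
    val result = totals
      .groupByKey(_.key)
      .flatMapGroupsWithState[Long, KeyTotal](OutputMode.Update(), GroupStateTimeout.NoTimeout)(keepMax)

    try {
      val query = result.writeStream.outputMode(OutputMode.Update()).format("console").start()
      query.stop()
      Right(())
    } catch {
      case e: AnalysisException => Left(e)
    }
  }
}
```

The exact wording of the returned error may vary between Spark versions, so it is worth printing the Left value's getMessage rather than relying on a fixed string.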

Upvotes: 3
