Reputation: 21
I am getting the following error message while applying flatMapGroupsWithState.
Exception in thread "main" org.apache.spark.sql.AnalysisException: flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset;
The following is what I am trying to do.
agg doesn't return a KeyValueGroupedDataset, so I apply groupByKey on the previous step's output to group by the aggFunction column, and then apply flatMapGroupsWithState.
I get the error message at the last step.
Does this error mean I cannot apply flatMapGroupsWithState after applying agg(....) on a dataset?
Upvotes: 2
Views: 3847
Reputation: 74619
Does this error mean I cannot apply flatMapGroupsWithState after applying agg(....) on a dataset?
Not really. It says that (highlighting mine)...
flatMapGroupsWithState in update mode is not supported with aggregation on a streaming DataFrame/Dataset
That means that you use the update output mode, but it should rather be complete or append, as described in the official Spark documentation under Output Modes (see the "Queries with flatMapGroupsWithState" query type).
From the comment:
Also flatMapGroupsWithState does support the update operation. From KeyValueGroupedDataset:
if (outputMode != OutputMode.Append && outputMode != OutputMode.Update) { throw new IllegalArgumentException("The output mode of function should be append or update") }
You're right that flatMapGroupsWithState supports the append and update output modes, but only when given as part of the input parameters. See the signature of KeyValueGroupedDataset.flatMapGroupsWithState:
flatMapGroupsWithState[S, U](
outputMode: OutputMode, // <-- HERE
timeoutConf: GroupStateTimeout)(
func: (K, Iterator[V], GroupState[S]) ⇒ Iterator[U]): Dataset[U]
In other words, OutputMode appears twice in a streaming query:
1. For the streaming query itself (in DataStreamWriter.outputMode, with the append output mode as the default)
2. In KeyValueGroupedDataset.flatMapGroupsWithState
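The two places can be seen side by side in a minimal sketch. It assumes Spark SQL 3.x on the classpath and uses the built-in rate source only as a stand-in for real input. The Event and RunningCount types, the updateCount function and the application name are hypothetical, introduced for illustration. The query has no streaming aggregation, so the update mode given to flatMapGroupsWithState and the update mode given to the writer are expected to agree.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object TwoOutputModes {
  // Hypothetical input and output types, used only in this sketch.
  case class Event(key: Long, value: Long)
  case class RunningCount(key: Long, count: Long)

  // Keeps a running count per key in the group state and emits it on every trigger.
  val updateCount: (Long, Iterator[Event], GroupState[Long]) => Iterator[RunningCount] =
    (key, rows, state) => {
      val total = state.getOption.getOrElse(0L) + rows.size
      state.update(total)
      Iterator(RunningCount(key, total))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")              // assumption: local run for experimentation
      .appName("two-output-modes")
      .getOrCreate()
    import spark.implicits._

    // The rate source emits rows with a timestamp and an increasing long value.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(($"value" % 3).as("key"), $"value")
      .as[Event]

    val counts = events
      .groupByKey(_.key)
      // Place 1: the output mode of the function itself.
      .flatMapGroupsWithState[Long, RunningCount](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(updateCount)

    val query = counts.writeStream
      // Place 2: the output mode of the streaming query.
      .outputMode(OutputMode.Update())
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Adding an agg step to this query while keeping OutputMode.Update() in place 1 is the combination the exception complains about.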
The exception refers to "flatMapGroupsWithState in update mode" (i.e. flatMapGroupsWithState with the update output mode as an input argument), which is OK.
It is not OK "with aggregation" (i.e. in a streaming query with a streaming aggregation of any kind: groupBy, groupByKey, cube, rollup, etc.).
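To make the rule easier to reason about, here is a toy model in plain Scala with no Spark dependency. It is not Spark's actual checker: FunctionMode, Aggregation and isSupported are hypothetical names, and the table it encodes is one reading of the exception message together with the "Queries with flatMapGroupsWithState" row of the Output Modes documentation. The "aggregation before, append mode" case in particular is an assumption, based on the documentation only listing aggregations after flatMapGroupsWithState as allowed.

```scala
object FlatMapGroupsWithStateRule {
  // Output mode passed as the first argument of flatMapGroupsWithState.
  sealed trait FunctionMode
  case object AppendMode extends FunctionMode
  case object UpdateMode extends FunctionMode

  // Where, if anywhere, the query has a streaming aggregation
  // relative to flatMapGroupsWithState.
  sealed trait Aggregation
  case object NoAggregation extends Aggregation
  case object AggregationBefore extends Aggregation
  case object AggregationAfter extends Aggregation

  // Toy rule only; a real query must also pick a matching query output mode.
  def isSupported(mode: FunctionMode, aggregation: Aggregation): Boolean =
    (mode, aggregation) match {
      case (_, NoAggregation)              => true
      case (UpdateMode, _)                 => false // the case named in the exception
      case (AppendMode, AggregationAfter)  => true  // documented as allowed
      case (AppendMode, AggregationBefore) => false // assumption, see the lead-in
    }

  def main(args: Array[String]): Unit = {
    // Print the whole table for a quick look.
    for {
      mode <- Seq[FunctionMode](AppendMode, UpdateMode)
      aggregation <- Seq[Aggregation](NoAggregation, AggregationBefore, AggregationAfter)
    } println(s"$mode + $aggregation -> ${isSupported(mode, aggregation)}")
  }
}
```

Under this reading, the query in the question (agg first, then flatMapGroupsWithState in update mode) falls into the UpdateMode with AggregationBefore cell.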
Upvotes: 3