Reputation: 35
I have a streaming Dataset with columns: bag_id, ball_color. I want to find the most popular color for each bag. So, I tried:
dataset.groupBy("bag_id", "ball_color") // 1st aggregation
  .agg(count("ball_color").as("color_count"))
  .groupBy("bag_id") // 2nd aggregation
  .agg(max("color_count"))
But I got this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Can I create the right query with only one aggregation function?
Upvotes: 2
Views: 934
Reputation: 1375
As of Spark 2.4.4 (the latest release at the time of writing), multiple streaming aggregations are NOT supported yet. But as a workaround you can use the .foreachBatch() method:
from pyspark.sql import functions as F

def foreach_batch_function(df, epoch_id):
    # df is a static DataFrame inside foreachBatch, so chained aggregations work
    (df.groupBy("bag_id", "ball_color")
       .agg(F.count("ball_color").alias("color_count"))
       .groupBy("bag_id")
       .agg(F.max("color_count"))
       .show())  # .show() is a dummy action; use a real write in practice

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
Inside .foreachBatch(), the df is not a streaming DataFrame, so you can do everything you want with it.
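For example, here is a minimal sketch that persists each micro-batch's result instead of just showing it; the Parquet path and checkpoint location are placeholder values of my choosing, not anything prescribed by the API:

from pyspark.sql import functions as F

def foreach_batch_function(df, epoch_id):
    result = (df.groupBy("bag_id", "ball_color")
                .agg(F.count("ball_color").alias("color_count"))
                .groupBy("bag_id")
                .agg(F.max("color_count").alias("max_color_count")))
    # Overwrite so the sink always reflects the latest micro-batch
    result.write.mode("overwrite").parquet("/tmp/bag_top_colors")  # placeholder path

(streamingDF.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", "/tmp/chk/bags")  # placeholder path
    .start())

Note that max("color_count") returns only the highest count per bag, not which color achieved it; to also get the color you could, for example, take F.max(F.struct("color_count", "ball_color")) inside the batch function.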
Upvotes: 2
Reputation: 31460
There is an open Jira tracking this issue, SPARK-26655; as of now we can't run multiple aggregations on streaming data.
One workaround would be performing one aggregation and saving the result back to Kafka (or a similar store), then reading from Kafka again to perform the second aggregation, as in the sketch below.
(or)
We can run only one aggregation on the streaming data, save it to HDFS/Hive/HBase, and fetch it afterwards to perform the additional aggregations (this would be a separate job).
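Here is a rough sketch of the Kafka round-trip, assuming events is the parsed input stream from the question (columns bag_id, ball_color) and that the broker address, topic name, and checkpoint paths are placeholders:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Job 1: first aggregation, published to an intermediate Kafka topic
counts = (events.groupBy("bag_id", "ball_color")
          .agg(F.count("ball_color").alias("color_count")))

(counts.select(F.to_json(F.struct("bag_id", "ball_color", "color_count")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")   # placeholder broker
    .option("topic", "color_counts")                  # placeholder topic
    .option("checkpointLocation", "/tmp/chk/job1")    # placeholder path
    .outputMode("update")
    .start())

# Job 2: read the intermediate topic back and run the second aggregation
schema = StructType([StructField("bag_id", StringType()),
                     StructField("ball_color", StringType()),
                     StructField("color_count", LongType())])

parsed = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host:9092")  # placeholder broker
          .option("subscribe", "color_counts")
          .load()
          .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
          .select("v.*"))

# Counts only grow, so the max over all emitted updates equals the current max
top = parsed.groupBy("bag_id").agg(F.max("color_count").alias("max_color_count"))

(top.writeStream
    .format("console")                                # or any sink of your choice
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/chk/job2")    # placeholder path
    .start())

The HDFS/Hive variant follows the same pattern: job 1 writes its aggregation to a table or directory, and a separate (batch or streaming) job reads it back for the second aggregation.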
Upvotes: 2