Maksym Ivanov
Maksym Ivanov

Reputation: 35

Query for streaming dataset in Spark

I have a streaming Dataset with columns: bag_id, ball_color. I want to find the most popular color for each bag. So, I tried:

dataset.groupBy("bag_id", "color") # 1st aggregation
       .agg(count("color").as("color_count"))
       .groupBy("bag_id") # 2nd aggregation
       .agg(max("color_count"))

But I had an error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;

Can I create right query with only one aggregation function?

Upvotes: 2

Views: 934

Answers (2)

ggeop
ggeop

Reputation: 1375

Yes, in Spark 2.4.4 (latest for now) is NOT support yet Multiple streaming aggregations. But, as a workaround you can use the .foreachBatch() method:

def foreach_batch_function(df, epoch_id):
  df.groupBy("bag_id","color")
  .agg(count("color").as("color_count"))
  .groupBy("bag_id").agg(max("color_count"))
  .show() # .show() is a dummy action

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  

In .foreachBatch() the df is not a streaming df, so you can do everything you want.

Upvotes: 2

notNull
notNull

Reputation: 31460

There is an open Jira addressing this issue Spark-26655, as of now we can't run multiple aggregations on the Streaming data.

One workaround would be Performing one aggregation and saving back to Kafka..etc and again read from kafka to perform another aggregation.

(or)

We can run only one aggregation on the streaming data and saving it to HDFS/Hive/HBase and fetch to perform additional aggregations(this would be seperate job)

Upvotes: 2

Related Questions