Reputation: 83
I need to apply an aggregation function to a stream of data with Apache Spark Streaming (NOT Spark Streaming SQL).
In my case I have a Kafka producer that sends messages in JSON format.
The format is {'a': String, 'b': String, 'c': Integer, 'd': Double}
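For example, a single message might look like this (the values are just illustrative):

{"a": "shop-1", "b": "eu-west", "c": 3, "d": 19.99}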
I need to group by attributes 'a' and 'b' every 5 seconds, and apply an aggregation function (e.g. average, sum, min, or max) to the other two attributes.
How can I do that?
Thanks
Upvotes: 0
Views: 519
Reputation: 37435
To get you started, you could approach aggregation like this:
import sparkSession.implicits._
import org.apache.spark.sql.functions.avg

jsonDstream.foreachRDD { jsonRDD =>
  // turn the micro-batch of JSON strings into a DataFrame
  val df = sparkSession.read.json(jsonRDD)
  // group by 'a' and 'b', averaging 'c' within each group
  val aggr = df.groupBy($"a", $"b").agg(avg($"c"))
  // ... do something with aggr ...
}
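You can compute several aggregates in one pass by listing them in agg, e.g. df.groupBy($"a", $"b").agg(avg($"c"), sum($"d"), max($"d")), importing those functions from org.apache.spark.sql.functions.

For context, here is a minimal sketch of the surrounding setup, assuming Spark 2.x with the spark-streaming-kafka-0-10 connector; the broker address, group id, and topic name ("events") are placeholders to replace with your own:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val conf = new SparkConf().setAppName("json-aggregation")
// a 5-second batch interval gives you an aggregation every 5 seconds
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",      // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "json-aggregator",              // placeholder group id
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams)
)

// extract the JSON payloads; feed this DStream into the foreachRDD above
val jsonDstream = stream.map(_.value)

ssc.start()
ssc.awaitTermination()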
Upvotes: 1