lu_ferra

Reputation: 83

Applying an aggregation function with Spark Streaming (Scala)

I need to apply an aggregation function to a stream of data with Apache Spark Streaming (NOT Spark Streaming SQL).

In my case I have a Kafka producer that sends messages in JSON format. The schema is {'a': String, 'b': String, 'c': Integer, 'd': Double}.
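For example, a single message might look like this (hypothetical values):

{"a": "sensor1", "b": "room42", "c": 10, "d": 25.3}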

I need to group on attributes 'a' and 'b' every 5 seconds, applying an aggregation function (e.g. average, sum, min, or max) to the other two attributes.

How can I do that?

Thanks

Upvotes: 0

Views: 519

Answers (1)

maasg

Reputation: 37435

To get you started, you could approach aggregation like this:

import org.apache.spark.sql.functions.avg
import sparkSession.implicits._

jsonDstream.foreachRDD { jsonRDD =>
  // build a DataFrame from the JSON strings in this micro-batch
  val df = sparkSession.read.json(jsonRDD)
  // group on the two key attributes and aggregate the others
  val aggr = df.groupBy($"a", $"b").agg(avg($"c"))
  // ... do something with aggr ...
}
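Since the question explicitly rules out the SQL route, the same aggregation can also be written with plain DStream transformations. The following is a minimal sketch, assuming jsonDstream: DStream[String] comes from the Kafka source, the StreamingContext was built with a 5-second batch interval, and parseRecord is a hypothetical helper (String => Option[Record], built on any JSON library). Sums and counts are carried through reduceByKey so per-key averages can be derived at the end:

import org.apache.spark.streaming.dstream.DStream

// case class matching the JSON payload described in the question
case class Record(a: String, b: String, c: Int, d: Double)

// parseRecord (hypothetical) deserializes one JSON message; with
// new StreamingContext(conf, Seconds(5)) each micro-batch spans 5 seconds
val records: DStream[Record] = jsonDstream.flatMap(json => parseRecord(json).toSeq)

val aggregated = records
  // key by (a, b); carry (sum of c, sum of d, count) through the reduce
  .map(r => ((r.a, r.b), (r.c.toLong, r.d, 1L)))
  .reduceByKey { case ((c1, d1, n1), (c2, d2, n2)) =>
    (c1 + c2, d1 + d2, n1 + n2)
  }
  // derive the averages of c and d per (a, b) key
  .mapValues { case (cSum, dSum, n) => (cSum.toDouble / n, dSum / n) }

aggregated.print()

Sum, min, or max follow the same shape: change the tuple payload and the reduce function (e.g. use math.min or math.max). For windows longer than the batch interval, reduceByKeyAndWindow is the windowed counterpart of reduceByKey.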

Upvotes: 1
