Kleyson Rios
Kleyson Rios

Reputation: 2877

Spark Structured Streaming - groupByKey individually by partition

My Kafka producers are distributing the messages into topic partitions based on a given key.

So, in the Spark side I already have the messages that need be processed together in the same partition.

Now, I need to do a groupByKey to have in each partition the values aggregated in a list by the keys, but not need merge the partitions because there is not chance to have a given key in more than one partition.

How could I do this groupByKey only at partition level ?

|topic-partition1| ---> |spark-partition1| -- groupByKey --> |spark-partition1.1| -- mapGroupsWithState --> ...
|topic-partition2| ---> |spark-partition2| -- groupByKey --> |spark-partition2.1| -- mapGroupsWithState --> ...
|topic-partition3| ---> |spark-partition3| -- groupByKey --> |spark-partition3.1| -- mapGroupsWithState --> ...

Upvotes: 1

Views: 929

Answers (1)

Yuval Itzchakov
Yuval Itzchakov

Reputation: 149646

If you know all events are going to come in a given partition, you can use DataSet.mapPartitions on the dataset:

val dataSet: DataSet[(String, String)] = ???
dataSet.mapPartitions { iter =>
  val res: Map[String, List[(String, String)] =
    iter.toList.groupBy { case (key, _) => key }

  // Do additional processing on res, which is now grouped by each key
  // present in the partition.
}

Otherwise, if you need mapGroupsWithState, there is on way to avoid using groupByKey as you need a KeyValueGroupedDataset[K, V].

If you're concerned with performance, don't be unless you've found this a bottleneck while profiling.

Upvotes: 1

Related Questions