Reputation: 2877
My Kafka producers distribute messages into topic partitions based on a given key.
So, on the Spark side, the messages that need to be processed together already arrive in the same partition.
Now I need a groupByKey so that, within each partition, the values are aggregated into a list by key; there is no need to merge across partitions, because a given key cannot appear in more than one partition.
How can I do this groupByKey at the partition level only?
|topic-partition1| ---> |spark-partition1| -- groupByKey --> |spark-partition1.1| -- mapGroupsWithState --> ...
|topic-partition2| ---> |spark-partition2| -- groupByKey --> |spark-partition2.1| -- mapGroupsWithState --> ...
|topic-partition3| ---> |spark-partition3| -- groupByKey --> |spark-partition3.1| -- mapGroupsWithState --> ...
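For context, here is a minimal sketch of how the stream is read (the broker address, topic name, and session setup are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("keyed-stream").getOrCreate()
import spark.implicits._

// By default the Kafka source maps each topic partition to exactly one
// Spark partition, so all records sharing a key land in the same partition.
val keyed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
  .option("subscribe", "events")                    // hypothetical topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]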
Upvotes: 1
Views: 929
Reputation: 149646
If you know all events for a given key arrive in the same partition, you can use Dataset.mapPartitions on the dataset:
import org.apache.spark.sql.Dataset

val dataSet: Dataset[(String, String)] = ???

dataSet.mapPartitions { iter =>
  // Group this partition's records by key; no shuffle is involved.
  val res: Map[String, List[(String, String)]] =
    iter.toList.groupBy { case (key, _) => key }
  // Do additional processing on res, which is now grouped by each key
  // present in the partition. Note that mapPartitions must return an
  // Iterator, e.g. one record per key (an implicit Encoder, usually via
  // import spark.implicits._, is required for the result type):
  res.iterator.map { case (key, values) => (key, values.map(_._2)) }
}
Otherwise, if you need mapGroupsWithState, there is no way to avoid using groupByKey, since you need a KeyValueGroupedDataset[K, V].
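For reference, a minimal sketch of that combination, reusing the dataSet above; the state type (a running count per key) is illustrative only:

import org.apache.spark.sql.streaming.GroupState

val counted = dataSet
  .groupByKey { case (key, _) => key } // KeyValueGroupedDataset[String, (String, String)]
  .mapGroupsWithState[Long, (String, Long)] {
    (key: String, values: Iterator[(String, String)], state: GroupState[Long]) =>
      // Accumulate a count of records seen for this key.
      val count = state.getOption.getOrElse(0L) + values.size
      state.update(count)
      (key, count)
  }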
If you're concerned about performance, don't be, unless you've found this to be a bottleneck while profiling.
Upvotes: 1