Let's say I have a Product object with the following attributes: Name, Code, ClientId.
I want to write a job that takes only 3 products for each ClientId. Can I achieve this?
Example: I have the following products (Name, Code, ClientId):
P1,1,1
P2,2,1
P3,3,1
P4,4,1
P5,5,1
P6,6,2
P7,7,2
P8,8,2
P9,9,2
I want to get 3 products for each ClientId, so my desired result is:
P1,1,1
P2,2,1
P3,3,1
P6,6,2
P7,7,2
P8,8,2
Let's say your Product is modeled as a case class and the product list is an RDD. You can group by ClientId, which yields (ClientId, products) pairs, and then take 3 products from each group's values:
// Run in spark-shell, where sc is the existing SparkContext
case class Prod(Name: String, Code: Long, ClientId: Long)
val rdd = sc.parallelize(Seq(
Prod("P1", 1, 1), Prod("P2", 2, 1), Prod("P3", 3, 1), Prod("P4", 4, 1), Prod("P5", 5, 1),
Prod("P6", 6, 2), Prod("P7", 7, 2), Prod("P8", 8, 2), Prod("P9", 9, 2)
))
// Group by ClientId, keep up to 3 products from each group, and flatten them into one result
rdd.groupBy(_.ClientId).flatMap(_._2.take(3)).collect
// res1: Array[Prod] = Array(
// Prod(P1,1,1), Prod(P2,2,1), Prod(P3,3,1), Prod(P6,6,2), Prod(P7,7,2), Prod(P8,8,2)
// )
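One caveat with grouping in Spark: the order of elements within each group is not guaranteed, so `take(3)` may return a different set of 3 products per client from one run to the next. If "the first 3" should mean something specific, sort each group before taking. The sketch below shows that idea on a plain Scala `Seq` so it runs without a cluster; ordering by smallest `Code` is an assumption, and `takePerClient` is a hypothetical helper name.

```scala
// Plain Scala collections, no Spark needed: the same group-then-take idea as the RDD answer,
// but each group is sorted by Code first so the chosen products are deterministic.
case class Prod(Name: String, Code: Long, ClientId: Long)

val products = Seq(
  Prod("P1", 1, 1), Prod("P2", 2, 1), Prod("P3", 3, 1), Prod("P4", 4, 1), Prod("P5", 5, 1),
  Prod("P6", 6, 2), Prod("P7", 7, 2), Prod("P8", 8, 2), Prod("P9", 9, 2)
)

// Hypothetical helper: keep at most n products per ClientId, choosing those with the smallest Code.
def takePerClient(ps: Seq[Prod], n: Int): Seq[Prod] =
  ps.groupBy(_.ClientId)   // Map[Long, Seq[Prod]]; iteration order of the Map is unspecified
    .toSeq
    .sortBy(_._1)          // order the groups by ClientId for stable output
    .flatMap { case (_, group) => group.sortBy(_.Code).take(n) }

val top3 = takePerClient(products, 3)
top3.foreach(println)
```

In the RDD version, the same per-group sort could go inside the `flatMap`, e.g. `rdd.groupBy(_.ClientId).flatMap(_._2.toSeq.sortBy(_.Code).take(3))`. Keep in mind that `groupBy` materializes each client's full group in memory; for very large groups, a DataFrame window with `row_number` partitioned by `ClientId` is a common alternative.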
You can try something like this:
val rd1 = sc.textFile(path2) // your sample data loaded as an RDD of lines
  .map(x => x.split(",")).map(x => (x(0), x(1), x(2)))
  .groupBy(x => x._3) // grouping based on ClientId
rd1.mapValues(_.take(3)) // change 3 to any number X to keep X records per key
  .collect()             // bring the results to the driver so println output appears there, not in executor logs
  .foreach(println)
Output:
(1,List((P1,1,1), (P2,2,1), (P3,3,1)))
(2,List((P6,6,2), (P7,7,2), (P8,8,2)))
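The answer above keeps all three fields as strings, so ClientId is grouped as text and a malformed line would throw an exception. Here is a minimal sketch, assuming input lines in the `Name,Code,ClientId` form shown in the question, that parses each line into the `Prod` case class and skips malformed lines instead of failing. `parseLine` is a hypothetical helper, and the `"bad line"` entry is made-up input that demonstrates the skipping. It runs on a local `Seq`; with Spark, something like `sc.textFile(path2).flatMap(line => parseLine(line).toList)` should apply the same function.

```scala
import scala.util.Try

case class Prod(Name: String, Code: Long, ClientId: Long)

// Hypothetical helper: parse one "Name,Code,ClientId" line.
// Returns None for malformed lines (wrong field count or non-numeric Code/ClientId).
def parseLine(line: String): Option[Prod] =
  line.split(",").map(_.trim) match {
    case Array(name, code, client) =>
      for {
        c  <- Try(code.toLong).toOption
        id <- Try(client.toLong).toOption
      } yield Prod(name, c, id)
    case _ => None
  }

val lines = Seq("P1,1,1", "P2,2,1", "P3,3,1", "P4,4,1", "P5,5,1",
                "P6,6,2", "P7,7,2", "P8,8,2", "P9,9,2", "bad line")

// Parse, drop malformed lines, group by the numeric ClientId, keep 3 per client.
val perClient: Map[Long, Seq[Prod]] =
  lines.map(parseLine).flatten
    .groupBy(_.ClientId)
    .map { case (id, ps) => id -> ps.take(3) }

perClient.toSeq.sortBy(_._1).foreach(println)
```

Grouping on a `Long` rather than a `String` also means keys like `"1"` and `" 1"` cannot end up in separate groups.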