Frody

Reputation: 163

For each key get only a number of values

Let's say I have a Product object with the following attributes:

Name, Code, ClientId

I want to write a job which takes only 3 products for each ClientId.

Can I achieve this?

Example: I have the following products:

P1,1,1
P2,2,1
P3,3,1
P4,4,1
P5,5,1
P6,6,2
P7,7,2
P8,8,2
P9,9,2

So I want to get 3 products for each ClientId, and my desired result is:

P1,1,1
P2,2,1
P3,3,1
P6,6,2
P7,7,2
P8,8,2

Upvotes: 0

Views: 50

Answers (2)

Leo C

Reputation: 22439

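Assuming your Product is modeled as a case class and the product list is an RDD, you can group by ClientId, which yields an RDD of (ClientId, Iterable[Prod]) pairs, and then take 3 from each group's values. Note that Spark does not guarantee the ordering of elements within each group, so sort the values before calling take if you need a specific 3: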

case class Prod(Name: String, Code: Long, ClientId: Long)

val rdd = sc.parallelize(Seq(
  Prod("P1", 1, 1), Prod("P2", 2, 1), Prod("P3", 3, 1), Prod("P4", 4, 1), Prod("P5", 5, 1),
  Prod("P6", 6, 2), Prod("P7", 7, 2), Prod("P8", 8, 2), Prod("P9", 9, 2)
))

rdd.groupBy(_.ClientId).flatMap(_._2.take(3)).collect
// res1: Array[Prod] = Array(
//   Prod(P1,1,1), Prod(P2,2,1), Prod(P3,3,1), Prod(P6,6,2), Prod(P7,7,2), Prod(P8,8,2)
// )
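Because groupBy shuffles every product of a client before take(3) discards most of them, a bounded aggregation may be cheaper on large inputs. The following is a minimal sketch of that idea using aggregateByKey, not a drop-in replacement: it assumes that "3 products" means the 3 with the smallest Code, and the names TopNPerClient and firstNPerClient are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Same shape as the Prod case class in the answer above.
case class Prod(Name: String, Code: Long, ClientId: Long)

object TopNPerClient {
  // Hypothetical helper: keeps the n products with the smallest Code per ClientId.
  // The accumulator never holds more than n + 1 items per key within a partition.
  def firstNPerClient(rdd: RDD[Prod], n: Int): RDD[Prod] =
    rdd
      .keyBy(_.ClientId)
      .aggregateByKey(Vector.empty[Prod])(
        // Fold one product into the per-partition accumulator, then trim to n.
        (acc, p) => (acc :+ p).sortBy(_.Code).take(n),
        // Merge accumulators from two partitions, then trim to n again.
        (a, b) => (a ++ b).sortBy(_.Code).take(n)
      )
      .flatMap { case (_, prods) => prods }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for trying this out; in spark-shell use the existing sc.
    val spark = SparkSession.builder().master("local[*]").appName("top-n-per-client").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(
      Prod("P1", 1, 1), Prod("P2", 2, 1), Prod("P3", 3, 1), Prod("P4", 4, 1), Prod("P5", 5, 1),
      Prod("P6", 6, 2), Prod("P7", 7, 2), Prod("P8", 8, 2), Prod("P9", 9, 2)
    ))
    // Sort on the driver only to make the printed order stable.
    firstNPerClient(rdd, 3).collect().sortBy(_.Code).foreach(println)
    spark.stop()
  }
}
```

Ordering by Code is what makes the result deterministic here; if any 3 products per client are acceptable, the sortBy calls can be dropped.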

Upvotes: 1

Praveen L

Reputation: 987

You can try something like this:

val rd1 = sc.textFile(path2) // your sample data loaded as an RDD of lines
        .map(x => x.split(",")).map(x => (x(0), x(1), x(2))) // (Name, Code, ClientId)
        .groupBy(x => x._3) // group by ClientId

rd1.foreach(x => println((x._1, x._2.take(3)))) // pass any number X to take to get X records per key

Output:

(1,List((P1,1,1), (P2,2,1), (P3,3,1)))
(2,List((P6,6,2), (P7,7,2), (P8,8,2)))
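The output above is grouped per key, and foreach with println runs on the executors, so on a cluster the lines end up in executor logs rather than on the driver console. A minimal sketch of one way to get flat records in the question's Name,Code,ClientId format back on the driver follows. The names TakePerClientFromText and takePerClient are hypothetical, and the inline sample lines stand in for sc.textFile(path2).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TakePerClientFromText {
  // Hypothetical helper: keeps at most x comma-separated lines per ClientId (third field).
  // Which x lines are kept is not guaranteed, because groupBy does not order a group's values.
  def takePerClient(lines: RDD[String], x: Int): RDD[String] =
    lines
      .map(_.split(","))
      .map(f => (f(0), f(1), f(2)))            // (Name, Code, ClientId)
      .groupBy(_._3)                           // group by ClientId
      .flatMap { case (_, rows) => rows.take(x) }
      .map { case (name, code, clientId) => s"$name,$code,$clientId" }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for trying this out; in spark-shell use the existing sc.
    val spark = SparkSession.builder().master("local[*]").appName("take-per-client").getOrCreate()
    // Inline stand-in for sc.textFile(path2).
    val lines = spark.sparkContext.parallelize(Seq(
      "P1,1,1", "P2,2,1", "P3,3,1", "P4,4,1", "P5,5,1",
      "P6,6,2", "P7,7,2", "P8,8,2", "P9,9,2"
    ))
    // collect brings the (small) result to the driver before printing.
    takePerClient(lines, 3).collect().foreach(println)
    spark.stop()
  }
}
```

Collecting is only reasonable when the limited result is small; for larger results, writing the RDD out with saveAsTextFile avoids pulling everything onto the driver.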

Upvotes: 0
