add-semi-colons

Reputation: 18810

spark-scala Efficient way to count at each group level

I have the following RDD:

(000740C7AD5274,8884165739991289,0)
(000740C7AD5274,5247914560952402,1)
(000740C7AD5274,6366183814312296,0)
(000740C7AD5274,8416039242203850,1)
(000740C7AD5274,8767784019249585,0)
(000740C7AD5274,8875366436847528,0)
(000740C7AD5274,6878583261589229,0)
(000740C7AD5274,7480419089929113,1)
(000740C7AD5274,7480419089929113,0)
(000740C7AD5274,8848143710281107,0)
(000740C7AD5274,7617664942496492,1)
(000740C7AD5274,4905980213247549,0)
(000740C7AD5274,6806506896473929,1)

Each record represents (userId, productId, buyOrNot). I want to generate a set of stats from this data, such as the number of items bought by each user and the number of users per product.

I started as follows:

val userProduct = userProductRDD.groupBy(x => (x._1, x._2)).flatMap(k => (k._1, if (k._2._3) != 0) 1 else 0))

But this doesn't give (userId, distinct_bought_count). Some guidance on how to move forward would be great.

Upvotes: 0

Views: 641

Answers (1)

Jean Logeart

Reputation: 53829

Simply:

import org.apache.spark.rdd.RDD

// assumes userProductRDD: RDD[(String, Long, Int)] = (userId, productId, buyOrNot)
val userProducts: RDD[(String, Long)] =
  userProductRDD.filter(_._3 == 1)                  // buys only
                .map { case (u, p, _) => (u, p) }   // drop the buy flag
                .distinct                           // keep distinct (user, product)
                .map { case (u, _) => (u, 1L) }     // one per pair, as in word count
                .reduceByKey(_ + _)                 // distinct products bought per user

// similarly, with the product as the key
val productUsers: RDD[(Long, Long)] =
  userProductRDD.filter(_._3 == 1)
                .map { case (u, p, _) => (p, u) }
                .distinct
                .map { case (p, _) => (p, 1L) }
                .reduceByKey(_ + _)                 // distinct buyers per product
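
To check the logic without a cluster, here is a minimal sketch of the same filter / distinct / count steps on plain Scala collections, using the sample rows from the question. The object name PurchaseCounts and its helper methods are made up for illustration, and the element types (String, Long, Int) are an assumption based on the type annotations above.

```scala
// Plain-collection sketch of the pipeline above (no Spark required).
// PurchaseCounts and its method names are hypothetical, for illustration only.
object PurchaseCounts {
  // (userId, productId, buyOrNot) rows copied from the question's sample.
  val rows: Seq[(String, Long, Int)] = Seq(
    ("000740C7AD5274", 8884165739991289L, 0),
    ("000740C7AD5274", 5247914560952402L, 1),
    ("000740C7AD5274", 6366183814312296L, 0),
    ("000740C7AD5274", 8416039242203850L, 1),
    ("000740C7AD5274", 8767784019249585L, 0),
    ("000740C7AD5274", 8875366436847528L, 0),
    ("000740C7AD5274", 6878583261589229L, 0),
    ("000740C7AD5274", 7480419089929113L, 1),
    ("000740C7AD5274", 7480419089929113L, 0),
    ("000740C7AD5274", 8848143710281107L, 0),
    ("000740C7AD5274", 7617664942496492L, 1),
    ("000740C7AD5274", 4905980213247549L, 0),
    ("000740C7AD5274", 6806506896473929L, 1)
  )

  // Distinct (user, product) pairs with at least one purchase.
  def boughtPairs(data: Seq[(String, Long, Int)]): Seq[(String, Long)] =
    data.filter(_._3 == 1).map { case (u, p, _) => (u, p) }.distinct

  // Number of distinct products bought by each user.
  def productsPerUser(data: Seq[(String, Long, Int)]): Map[String, Long] =
    boughtPairs(data).groupBy(_._1).map { case (u, ps) => (u, ps.size.toLong) }

  // Number of distinct buyers of each product.
  def usersPerProduct(data: Seq[(String, Long, Int)]): Map[Long, Long] =
    boughtPairs(data).groupBy(_._2).map { case (p, us) => (p, us.size.toLong) }

  def main(args: Array[String]): Unit = {
    println(productsPerUser(rows)) // the single sample user bought 5 distinct products
    println(usersPerProduct(rows)) // each bought product has 1 buyer in this sample
  }
}
```

On the RDD itself, reduceByKey is usually preferable to groupBy for this kind of count because it combines partial sums on each partition before the shuffle; the groupBy here is only a convenience for small in-memory data.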

Upvotes: 1
