Reputation: 18810
I have the following RDD:
(000740C7AD5274,8884165739991289,0)
(000740C7AD5274,5247914560952402,1)
(000740C7AD5274,6366183814312296,0)
(000740C7AD5274,8416039242203850,1)
(000740C7AD5274,8767784019249585,0)
(000740C7AD5274,8875366436847528,0)
(000740C7AD5274,6878583261589229,0)
(000740C7AD5274,7480419089929113,1)
(000740C7AD5274,7480419089929113,0)
(000740C7AD5274,8848143710281107,0)
(000740C7AD5274,7617664942496492,1)
(000740C7AD5274,4905980213247549,0)
(000740C7AD5274,6806506896473929,1)
Each row represents userId, productId, and a BuyOrNot flag. I want to generate a set of stats from this data, such as the number of items bought by each user and the number of users per product.
I started as follows:
val userProduct = userProductRDD.groupBy(x => (x._1, x._2)).flatMap(k => (k._1, if (k._2._3) != 0) 1 else 0))
But this doesn't give (userId, distinct_bought_count).
Some guidance would be great to move forward.
Upvotes: 0
Views: 641
Reputation: 53829
Simply:
import org.apache.spark.rdd.RDD

// Assumes userProductRDD: RDD[(String, Long, Int)], i.e. (userId, productId, buyFlag)
val userProducts: RDD[(String, Long)] =
  userProductRDD.filter(_._3 == 1)      // buys only
    .map { case (u, p, _) => (u, p) }   // drop the buy flag
    .distinct                           // keep distinct (user, product)
    .map { case (u, _) => (u, 1L) }     // word count problem
    .reduceByKey(_ + _)
// similarly
val productUsers: RDD[(Long, Long)] =
  userProductRDD.filter(_._3 == 1)      // buys only
    .map { case (u, p, _) => (p, u) }   // key by product instead of user
    .distinct                           // keep distinct (product, user)
    .map { case (p, _) => (p, 1L) }
    .reduceByKey(_ + _)
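To sanity-check the logic without a cluster, here is a minimal sketch that runs the same filter / distinct / count steps on the sample rows from the question. It uses plain Scala collections as a stand-in for the RDD, which is an assumption made only so the example is self-contained. The object name PurchaseStats is hypothetical, and groupBy plus size replaces the reduceByKey step.

```scala
// Sketch only: a local Seq stands in for userProductRDD (assumption for illustration).
object PurchaseStats {
  val user = "000740C7AD5274"

  // (userId, productId, buyFlag) rows copied from the question
  val rows: Seq[(String, Long, Int)] = Seq(
    (user, 8884165739991289L, 0),
    (user, 5247914560952402L, 1),
    (user, 6366183814312296L, 0),
    (user, 8416039242203850L, 1),
    (user, 8767784019249585L, 0),
    (user, 8875366436847528L, 0),
    (user, 6878583261589229L, 0),
    (user, 7480419089929113L, 1),
    (user, 7480419089929113L, 0),
    (user, 8848143710281107L, 0),
    (user, 7617664942496492L, 1),
    (user, 4905980213247549L, 0),
    (user, 6806506896473929L, 1)
  )

  // Distinct (user, product) pairs where the buy flag is 1
  val bought: Seq[(String, Long)] =
    rows.filter(_._3 == 1).map { case (u, p, _) => (u, p) }.distinct

  // Number of distinct products bought per user
  val productsPerUser: Map[String, Long] =
    bought.groupBy(_._1).map { case (u, ps) => (u, ps.size.toLong) }

  // Number of distinct buying users per product
  val usersPerProduct: Map[Long, Long] =
    bought.groupBy(_._2).map { case (p, us) => (p, us.size.toLong) }

  def main(args: Array[String]): Unit = {
    println(productsPerUser)
    println(usersPerProduct)
  }
}
```

On the sample data this gives 5 distinct bought products for user 000740C7AD5274 and one buying user for each of those 5 products. Product 7480419089929113 appears with both flags, but only its flag-1 row survives the filter.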
Upvotes: 1