I have a DataFrame similar to:
+---+-----+-----+
|key|thing|value|
+---+-----+-----+
| u1|  foo|    1|
| u1|  foo|    2|
| u1|  bar|   10|
| u2|  foo|   10|
| u2|  foo|    2|
| u2|  bar|   10|
+---+-----+-----+
and want to get the following result:
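+---+-----+---------+----+
|key|thing|sum_value|rank|
+---+-----+---------+----+
| u1|  bar|       10|   1|
| u1|  foo|        3|   2|
| u2|  foo|       12|   1|
| u2|  bar|       10|   2|
+---+-----+---------+----+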
Currently, my code looks similar to this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(("u1", "foo", 1), ("u1", "foo", 2), ("u1", "bar", 10), ("u2", "foo", 10), ("u2", "foo", 2), ("u2", "bar", 10)).toDF("key", "thing", "value")
// calculate sums per key and thing
val aggregated = df.groupBy("key", "thing").agg(sum("value").alias("sum_value"))
// get the top-k items per key (rank 1..k)
val k = 10
val topk = aggregated
  .withColumn("rank", rank().over(Window.partitionBy("key").orderBy(desc("sum_value"))))
  .filter(col("rank") <= k)
However, this code is very inefficient. The window function generates a total order of all items within each key and causes a gigantic shuffle.
How can I calculate the top-k items more efficiently? Maybe by using approximate functions, i.e. sketches similar to or
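For the sketch idea, one possible technique (swapped in here, not named in the question) is the Space-Saving heavy-hitters algorithm: it tracks at most `capacity` items and, assuming non-negative weights, every reported count is an upper bound on the true sum. Below is a minimal plain-Scala sketch under those assumptions; `SpaceSaving`, `add` and `topK` are hypothetical names, and wiring it into Spark (for example through `aggregateByKey`) is not shown.

```scala
import scala.collection.mutable

// Hypothetical weighted Space-Saving sketch. Assumes weight >= 0.
// Tracks at most `capacity` items; counts over-estimate the true sums.
final class SpaceSaving[T](capacity: Int) {
  require(capacity > 0, "capacity must be positive")
  private val counts = mutable.Map.empty[T, Long]

  def add(item: T, weight: Long): Unit =
    if (counts.contains(item)) counts(item) += weight
    else if (counts.size < capacity) counts(item) = weight
    else {
      // Evict the item with the smallest count; the newcomer inherits that count
      val (minItem, minCount) = counts.minBy(_._2)
      counts -= minItem
      counts(item) = minCount + weight
    }

  // The k items with the largest (estimated) counts, largest first
  def topK(k: Int): Seq[(T, Long)] = counts.toSeq.sortBy(-_._2).take(k)
}

object SpaceSavingDemo {
  def main(args: Array[String]): Unit = {
    val rows = Seq(("u1", "foo", 1L), ("u1", "foo", 2L), ("u1", "bar", 10L),
      ("u2", "foo", 10L), ("u2", "foo", 2L), ("u2", "bar", 10L))
    // One sketch per key
    val sketches = mutable.Map.empty[String, SpaceSaving[String]]
    for ((key, thing, value) <- rows)
      sketches.getOrElseUpdate(key, new SpaceSaving[String](capacity = 2)).add(thing, value)
    sketches.toSeq.sortBy(_._1).foreach { case (key, s) => println(s"$key -> ${s.topK(1)}") }
  }
}
```

When `capacity` is at least the number of distinct things per key the result is exact; smaller capacities trade accuracy for memory. The linear `minBy` scan on eviction keeps the sketch short; the original algorithm uses a stream-summary structure to make updates cheap.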
This is a classical problem in recommender systems.
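// Ordered so that `sorted` puts the largest value first
case class Rating(thing: String, value: Long) extends Ordered[Rating] {
  def compare(that: Rating): Int = that.value.compare(this.value)
}
case class Recommendation(key: String, ratings: Seq[Rating]) {
  def keep(n: Int) = this.copy(ratings = ratings.sorted.take(n))
}
val TOPK = 10
val topk = aggregated.groupBy("key")
  .agg(collect_list(struct($"thing", $"sum_value".as("value"))) as "ratings")
  .as[Recommendation]
  .map(_.keep(TOPK))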
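`collect_list` followed by `sorted.take(n)` still materializes every rating of a key before trimming it. A bounded min-heap avoids that by keeping only the k largest items seen so far, so memory per key stays O(k). A minimal plain-Scala sketch, assuming rows of (key, thing, value) whose values are already summed; `BoundedTopK` and `topKByKey` are hypothetical names:

```scala
import scala.collection.mutable

object BoundedTopK {
  // scala's PriorityQueue keeps the *largest* element (under its ordering) at
  // the head, so reversing the ordering makes `head` the smallest value: a min-heap.
  private val minByValue: Ordering[(String, Long)] =
    Ordering.by[(String, Long), Long](_._2).reverse

  def topKByKey(rows: Iterator[(String, String, Long)], k: Int): Map[String, Seq[(String, Long)]] = {
    require(k > 0, "k must be positive")
    val heaps = mutable.Map.empty[String, mutable.PriorityQueue[(String, Long)]]
    rows.foreach { case (key, thing, value) =>
      val heap = heaps.getOrElseUpdate(key, mutable.PriorityQueue.empty[(String, Long)](minByValue))
      if (heap.size < k) heap.enqueue((thing, value))
      // Replace the current smallest of the top-k only if the new value beats it
      else if (value > heap.head._2) { heap.dequeue(); heap.enqueue((thing, value)) }
    }
    // Sort only the k survivors per key, largest first
    heaps.map { case (key, heap) => key -> heap.toSeq.sortBy(-_._2) }.toMap
  }
}
```

Each row costs O(log k), and nothing beyond k items per key is ever held, which is the main difference from sorting the full collected list.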
You can also check the source code of existing implementations: there are several solutions for Spark, as well as Scio's TopByKeyAggregator.scala. Spark MLlib's `topByKey` is considered the best practice when using their recommendation algorithm, although it looks like their examples still use RDDs:
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
sc.parallelize(Array(("u1", ("foo", 1)), ("u1", ("foo", 2)), ("u1", ("bar", 10)), ("u2", ("foo", 10)),
  ("u2", ("foo", 2)), ("u2", ("bar", 10))))
  .topByKey(10)(Ordering.by(_._2))
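As far as I know, `topByKey` builds on `aggregateByKey` with a bounded priority queue, so each partition reduces its rows to at most k items per key before the shuffle, and the partial results are merged afterwards. The plain-Scala sketch below only emulates that pattern on in-memory "partitions" (no Spark involved); `PartialTopK`, `insert` and `merge` are hypothetical names, and short sorted lists stand in for the priority queue:

```scala
object PartialTopK {
  // A partial result: at most k (thing, value) pairs, largest value first
  type Top = List[(String, Long)]

  // seqOp: fold a single item into a partial result
  def insert(top: Top, item: (String, Long), k: Int): Top =
    (item :: top).sortBy(-_._2).take(k)

  // combOp: merge two partial results coming from different partitions
  def merge(a: Top, b: Top, k: Int): Top =
    (a ++ b).sortBy(-_._2).take(k)

  def topByKey(partitions: Seq[Seq[(String, (String, Long))]], k: Int): Map[String, Top] = {
    // Map side: every partition keeps at most k items per key
    val partials: Seq[Map[String, Top]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, Top]) { case (acc, (key, item)) =>
        acc.updated(key, insert(acc.getOrElse(key, Nil), item, k))
      }
    }
    // Reduce side: only these small partial lists would cross the shuffle
    partials.flatten.groupBy(_._1).map { case (key, tops) =>
      key -> tops.map(_._2).reduce((a, b) => merge(a, b, k))
    }
  }
}
```

Because `merge` only needs the k best items of each side, the amount of shuffled data per key is bounded by k times the number of partitions rather than by the number of rows.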
RDDs to the rescue:
aggregated.as[(String, String, Long)].rdd
  .groupBy(_._1)
  .map { case (key, it) => (key, it.map(e => (e._2, e._3)).toList.sortBy(e => -e._2).take(1)) }
  .toDF().show()
+---+----------+
| _1|        _2|
+---+----------+
| u1|[[bar,10]]|
| u2|[[foo,12]]|
+---+----------+
This can most likely be improved using the suggestion from the comment, i.e. by starting out from df rather than from aggregated, so that the per-thing sums are computed inside each key's group. This could look similar to:
df.as[(String, String, Long)].rdd.groupBy(_._1).map { case (key, it) =>
  // sum the values per thing within this key, then keep the largest sum(s)
  val aggregatedInner = it.groupBy(e => e._2).map { case (thing, events) => (thing, events.map(_._3).sum) }
  val topk = aggregatedInner.toArray.sortBy(e => -e._2).take(1)
  (key, topk)
}
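The per-key closure can be unit-tested without a cluster. A minimal sketch, assuming the rows arrive as (key, thing, value) tuples with `Long` values; `SumThenTopK` is a hypothetical helper name, and `run` merely stands in for `rdd.groupBy(_._1)` on the driver:

```scala
object SumThenTopK {
  // Logic of the closure for a single key's rows:
  // sum the values per thing, then keep the k largest sums.
  def topForKey(rows: Iterable[(String, String, Long)], k: Int): Seq[(String, Long)] =
    rows.groupBy(_._2)
      .map { case (thing, events) => (thing, events.map(_._3).sum) }
      .toSeq
      .sortBy(-_._2)
      .take(k)

  // Local stand-in for rdd.groupBy(_._1).map(...), for testing only
  def run(rows: Seq[(String, String, Long)], k: Int): Map[String, Seq[(String, Long)]] =
    rows.groupBy(_._1).map { case (key, it) => key -> topForKey(it, k) }
}
```

Keeping the closure in a plain function like `topForKey` makes it easy to check the sorting direction (descending) before running it on Spark.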