Reputation: 9804
I have two dataframes, namely
df1 = sc.parallelize([
    ['u1', 'type1', ['a', 'b']],
    ['u2', 'type1', ['a', 'c', 'd']],
    ['u1', 'type2', ['d']]
]).toDF(('person', 'type', 'keywords'))

df2 = sc.parallelize([
    ['a', 2],
    ['b', 1],
    ['c', 0],
    ['d', 1],
    ['e', 3],
]).toDF(('keyword', 'score'))
I need to calculate, for each person and per type, the average score of its keywords. For example, this average would be 1.5 for person 'u1' on type 'type1', since it has keywords 'a' and 'b', which contribute (2 + 1) / 2 = 1.5.
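To make the expected result concrete, the plain Python below reproduces the numbers I am after (it is just an illustration, not Spark code):

# Plain-Python illustration of the aggregation I want (not Spark code).
scores = {'a': 2, 'b': 1, 'c': 0, 'd': 1, 'e': 3}
rows = [
    ('u1', 'type1', ['a', 'b']),
    ('u2', 'type1', ['a', 'c', 'd']),
    ('u1', 'type2', ['d']),
]
for person, type_, keywords in rows:
    # Average only over the keywords that this (person, type) actually has.
    avg_score = sum(scores[k] for k in keywords) / float(len(keywords))
    print("%s %s %.1f" % (person, type_, avg_score))
# u1 type1 1.5
# u2 type1 1.0
# u1 type2 1.0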
I have tried an approach based on a join:

from pyspark.sql.functions import avg

df = df1.join(df2) \
    .select('person', 'type', 'keywords', 'keyword', 'score') \
    .groupBy('person', 'type') \
    .agg(avg('score'))
but the problem is that it computes the average over every possible keyword, not only over those that the given person and type actually have, so I get 1.4 everywhere, which is simply the sum of all scores of all keywords divided by their count. I need to average only the scores of the keywords that appear in the keywords list of each person and type.
Upvotes: 2
Views: 9641
Reputation: 330173
You'll have to explode the keywords first:
from pyspark.sql.functions import explode, avg, col

(df1.select("person", "type", explode("keywords").alias("keyword"))
    .join(df2, "keyword")
    .groupBy("person", "type")
    .agg(avg("score")))
While it would be possible to do something like this
from pyspark.sql.functions import expr

(df1.join(df2, expr("array_contains(keywords, keyword)"))
    .groupBy("person", "type")
    .agg(avg("score")))
to achieve the same result, it is something you want to avoid in practice, because the join condition contains no equality and therefore expands into a Cartesian product before filtering.
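If in doubt, comparing the physical plans makes the difference visible; roughly like this (the exact operators depend on your Spark version and configuration):

from pyspark.sql.functions import explode, expr

# Equi-join on the exploded keyword: Spark can pick a hash- or sort-based join.
(df1.select("person", "type", explode("keywords").alias("keyword"))
    .join(df2, "keyword")).explain()

# Join on array_contains has no equality condition, so it is typically planned
# as a BroadcastNestedLoopJoin or CartesianProduct: every row of df1 is paired
# with every row of df2 before the predicate filters the pairs.
(df1.join(df2, expr("array_contains(keywords, keyword)"))).explain()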
Upvotes: 5