martina.physics

Reputation: 9804

Pyspark join and operation on values within a list in column

I have two dataframes, namely

df1 = sc.parallelize([
  ['u1', 'type1', ['a', 'b']],
  ['u2', 'type1', ['a', 'c', 'd']],
  ['u1', 'type2', ['d']]
]).toDF(('person', 'type', 'keywords'))

df2 = sc.parallelize([
  ['a', 2],
  ['b', 1],
  ['c', 0],
  ['d', 1],
  ['e', 3],
]).toDF(('keyword', 'score'))

I need to calculate, for each person and type, the average score of their keywords. For person 'u1' and type 'type1', this average is 1.5, since the keywords 'a' and 'b' contribute (2 + 1) / 2 = 1.5.
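
On the sample data above, the expected result would therefore be:

person | type  | avg(score)
u1     | type1 | 1.5        -> (2 + 1) / 2
u2     | type1 | 1.0        -> (2 + 0 + 1) / 3
u1     | type2 | 1.0        -> 1 / 1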

I have tried an approach based on a join:

from pyspark.sql.functions import avg

df = df1.join(df2) \
  .select('person', 'type', 'keywords', 'keyword', 'score') \
  .groupBy('person', 'type') \
  .agg(avg('score'))

but the problem is that it computes the average over every possible keyword, not only over the keywords that the given person and type actually have. I get 1.4 everywhere, which is the sum of all scores divided by the total number of keywords: (2 + 1 + 0 + 1 + 3) / 5 = 1.4. I need to average only the scores of the keywords in the keywords list for each person and type.
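
A quick check confirms what is happening (using the dataframes defined above): a join with no condition is a cross join, so every (person, type, keywords) row is paired with all five keyword rows and each group averages over all scores.

# every df1 row paired with every df2 row
df1.join(df2).count()  # 3 * 5 = 15 rows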

Upvotes: 2

Views: 9641

Answers (1)

zero323

Reputation: 330173

You'll have to explode the keywords first:

from pyspark.sql.functions import explode, avg, col

(df1.select("person", "type", explode("keywords").alias("keyword"))
    .join(df2, "keyword")
    .groupBy("person", "type")
    .agg(avg("score")))
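
To see why this works: explode alone gives one row per (person, type, keyword), so the subsequent join and average are restricted to exactly the keywords each person/type has. On the sample data it should look roughly like this:

df1.select("person", "type", explode("keywords").alias("keyword")).show()
# +------+-----+-------+
# |person| type|keyword|
# +------+-----+-------+
# |    u1|type1|      a|
# |    u1|type1|      b|
# |    u2|type1|      a|
# |    u2|type1|      c|
# |    u2|type1|      d|
# |    u1|type2|      d|
# +------+-----+-------+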

While it is possible to achieve the same result with something like this

from pyspark.sql.functions import expr

(df1.join(df2, expr("array_contains(keywords, keyword)"))
    .groupBy("person", "type")
    .agg(avg("score")))

it is something you want to avoid in practice, because the non-equi join expands into a Cartesian product before filtering.
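
If in doubt, you can inspect the physical plan; the array_contains join should show up as a nested-loop / Cartesian-style join rather than the hash or sort-merge join you get from the plain equi-join on keyword:

(df1.join(df2, expr("array_contains(keywords, keyword)"))
    .groupBy("person", "type")
    .agg(avg("score"))
    .explain())
# look for BroadcastNestedLoopJoin or CartesianProduct in the plan output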

Upvotes: 5
