Seastar
Seastar

Reputation: 386

Aplly UDF to subsets of pyspark dataframe

I have a Dataframe like the following, containing two sorted lists of strings for each possible combination of key1 and key2.

df=
+----+------------+-------+-------+
|key1|        key2| value1| value2|
+----+------------+-------+-------+
| 'a'|  '10,0,10' |  'abc'|  'abc'|
| 'a'|  '10,0,10' |  'aab'|  'aab'|
| 'a'|  '10,0,10' |  'acb'|  'acb'|
| 'a'|  '10,0,20' |  'abc'|  'abc'|
| 'a'|  '10,0,20' |  'acb'|  'aab'|
| 'a'|  '10,0,20' |  'aab'|  'acb'|
| 'b'|  '10,0,10' |  'bcd'|  'bcd'|
| 'b'|  '10,0,10' |  'bbc'|  'bdc'|
| 'b'|  '10,0,10' |  'bdc'|  'bbc'|
|...

Now I want to apply a funcion like this:

for c in [x for x in df.select('key1').distinct().collect()]:
    for s in [x for x in df.select('key2').distinct().collect()]:
       jaccard_sim([x for x in df.select('value1').filter(df['key1']==c).filter(df['key2']==s).collect()], 
              [x for x in df.select('value2').filter(df['key1']==c).filter(df['key2']==s).collect()])

But since I want to use sparks ability to parallelize the execution I think the above implementation might be kind of stupid;) Has anyone have an idea how to solve it?

The background is that I have a sorted list (value1) per key1 and key2 combination which I want to compare to a benchmark list per key 1 (value2) and calculate the jaccard similarity between the lists. If anyone has in general a (better) suggestion on how to do this with pyspark I would really apprechicate it! Thanks:)

Upvotes: 2

Views: 550

Answers (1)

mayank agrawal
mayank agrawal

Reputation: 2545

You can approach like this,

import pyspark.sql.functions as F

def convert_form(x):
    print type(x)
    val1 = [y['value1'] for y in x]
    val2 = [y['value2'] for y in x]
    return [val1, val2]

jaccard_udf = F.udf(lambda x: jaccard_sim(*convert_form(x)) ) #assuming you have jaccard_sim function

df = df.select('key1', 'key2', F.struct('value1','value2').alias('values'))\
       .groupby('key1', 'key2').agg(F.collect_list('values').alias('collected_col'))\
       .withColumn('jaccard_distance', jaccard_udf(F.col('collected_col')) )

df.show()

Upvotes: 1

Related Questions