J.S

Reputation: 71

Compute the distances between the rows of two dataframes in pyspark

I have two dataframes, one for user profiles and one for item profiles.

df_client = sqlContext.createDataFrame([('c1',0,1,3),('c2',1,0,3)], ['client_id','col1','col2','col3'])
df_item = sqlContext.createDataFrame([('it1',0,1,3),('it2',1,0,3)], ['item_id','col1','col2','col3'])

I would like to compute the cosine distance between every user and every item, and obtain a final dataframe like this:

df_final.show()
      client_id item_id  distance
0        c1     it1       0
1        c1     it2       0.1
2        c2     it1       0.1
3        c2     it2       0

In reality there are 11 million users, 150 items, and 150 columns. I developed three solutions, but each had a step that took a long time.

Here is one of them:

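import numpy as np
from scipy.spatial.distance import cosine

# Collect the item rows onto the driver so each client row can be compared with all of them
list_item = df_item.rdd.collect()

def cosine_distance(v):
    # Build one (client_id, item_id, distance) tuple per item for the client row v
    list_item_distance = []
    for row in list_item:
        distance = round(float(cosine(np.array(v[1:]), np.array(row[1:]))), 4)
        list_item_distance.append((v["client_id"], row["item_id"], distance))
    return list_item_distance

rdd_final = df_client.rdd.map(lambda row: cosine_distance(row))

# Concatenate the per-client lists into one list on the driver
list_final = rdd_final.reduce(lambda x, y: x + y)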

But the reduce step takes too long.

Is the problem that I want the result as a dataframe? Does anyone have a solution that does this job quickly?

Upvotes: 5

Views: 1418

Answers (1)

Assaf Mendelson

Reputation: 13001

The first thing I would do is convert the columns to an array. One way to do this is to use create_map from pyspark.sql.functions to turn the columns into a map, then choose a column order and write a udf that converts the map into an array.

Next I would mark the items df as broadcast (150 rows with ~150 columns is not too big) and join it with the clients df. This would probably be the longest step.

Then I would take the two arrays and calculate the cosine distance between them.
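
For the distance itself, here is a minimal NumPy sketch, assuming no vector is all zeros (a zero row would give a division by zero). The function name cosine_distance_matrix is hypothetical; it computes 1 minus the cosine similarity for every client/item pair at once, which is the same quantity scipy.spatial.distance.cosine returns for a single pair.

```python
import numpy as np

def cosine_distance_matrix(clients, items):
    """Return an (n_clients, n_items) matrix of cosine distances."""
    clients = np.asarray(clients, dtype=float)
    items = np.asarray(items, dtype=float)
    # Scale every row to unit length so that a dot product equals the cosine similarity.
    clients_unit = clients / np.linalg.norm(clients, axis=1, keepdims=True)
    items_unit = items / np.linalg.norm(items, axis=1, keepdims=True)
    # Cosine distance is 1 minus cosine similarity.
    return 1.0 - clients_unit @ items_unit.T

client_vecs = [[0, 1, 3], [1, 0, 3]]  # c1 and c2 from the question
item_vecs = [[0, 1, 3], [1, 0, 3]]    # it1 and it2 from the question
distances = cosine_distance_matrix(client_vecs, item_vecs)
print(np.round(distances, 4))
```

With only 150 items, the item matrix is small enough to normalise once and reuse for every client, so the per-client work reduces to one matrix-vector product.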

Upvotes: 1
