Reputation: 27
I have a PySpark DataFrame, df1, that looks like:
Customer1  Customer2  v_cust1  v_cust2
        1          2      0.9      0.1
        1          3      0.3      0.4
        1          4      0.2      0.9
        2          1      0.8      0.8
I want to take the cosine similarity of the two columns and end up with something like this:
Customer1  Customer2  v_cust1  v_cust2  cosine_sim
        1          2      0.9      0.1        0.1
        1          3      0.3      0.4        0.9
        1          4      0.2      0.9        0.15
        2          1      0.8      0.8        1
I have a Python function that receives a number or an array of numbers, like this:
import numpy as np

def cos_sim(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
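For reference, here is how the function behaves when called directly on small arrays (the example vectors are made up for illustration):

```python
import numpy as np

def cos_sim(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Parallel vectors give a similarity of ~1, orthogonal vectors give 0
print(cos_sim([1.0, 1.0], [2.0, 2.0]))  # ~1.0
print(cos_sim([1.0, 0.0], [0.0, 1.0]))  # 0.0
```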
How can I create the cosine_sim column in my dataframe using a UDF? Can I pass several columns to the UDF instead of just one?
Upvotes: 1
Views: 667
Reputation: 7399
It would be more efficient to use a pandas_udf here.
It performs much better on vectorized operations than plain Spark UDFs: Introducing Pandas UDF for PySpark
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.functions import PandasUDFType, pandas_udf

# Names of the input columns and the output column
a, b = "v_cust1", "v_cust2"
cosine_sim_col = "cosine_sim"

# Pre-create the output column, since a GROUPED_MAP pandas_udf requires
# the input schema and the output schema to be the same.
df = df.withColumn(cosine_sim_col, F.lit(1.0).cast("double"))

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def cos_sim(df):
    df[cosine_sim_col] = float(np.dot(df[a], df[b]) /
                               (np.linalg.norm(df[a]) * np.linalg.norm(df[b])))
    return df
# Assuming you want one cosine value per (Customer1, Customer2) group:
df2 = df.groupby(["Customer1", "Customer2"]).apply(cos_sim)

# If instead you want to compare the entire columns, add a constant
# column to all rows and group by it, e.g.:
df3 = df.withColumn("group", F.lit("group_a")).groupby("group").apply(cos_sim)
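To see what the grouped function computes without spinning up Spark, here is the same per-group arithmetic applied with plain pandas (a sketch assuming numpy and pandas are available; the sample values are taken from the question's first two rows):

```python
import numpy as np
import pandas as pd

def cos_sim_group(pdf, a="v_cust1", b="v_cust2", out="cosine_sim"):
    # Same arithmetic as the pandas_udf body: one cosine value per group,
    # broadcast to every row of that group.
    pdf[out] = float(np.dot(pdf[a], pdf[b]) /
                     (np.linalg.norm(pdf[a]) * np.linalg.norm(pdf[b])))
    return pdf

pdf = pd.DataFrame({"Customer1": [1, 1], "Customer2": [2, 2],
                    "v_cust1": [0.9, 0.3], "v_cust2": [0.1, 0.4]})
result = pdf.groupby(["Customer1", "Customer2"], group_keys=False).apply(cos_sim_group)
print(result["cosine_sim"].iloc[0])
```

Each group's vectors here are the column slices within that group, which is why all rows of a group share one cosine_sim value.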
Upvotes: 2