Chianti5

Reputation: 243

PySpark DataFrame: apply function to two columns

Say I have two PySpark DataFrames df1 and df2.

df1 =  'a'
        1
        2
        5

df2 =  'b'
        3
        6

And I want to find the closest df2['b'] value for each df1['a'], and add the closest values as a new column in df1.

In other words, for each value x in df1['a'], I want to find the y that achieves min(abs(x-y)) over all y in df2['b'] (note: we can assume there is only one y achieving the minimum distance), and the result would be

'a'    'b'
 1      3
 2      3
 5      6
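
For reference, a minimal sketch that builds these two example DataFrames (assuming a SparkSession named spark; a SQLContext's createDataFrame works the same way):

df1 = spark.createDataFrame([(1,), (2,), (5,)], ['a'])
df2 = spark.createDataFrame([(3,), (6,)], ['b'])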

I tried the following code to create a distance matrix first (before finding the values achieving the minimum distance):

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def dist(x, y):
    return abs(x - y)

# wrap the Python function as a Spark UDF returning integers
udf_dist = udf(dist, IntegerType())

sql_sc = SQLContext(sc)
udf_dist(df1.a, df2.b)

which gives

Column<PythonUDF#dist(a,b)>

Then I tried

sql_sc.createDataFrame(udf_dist(df1.a, df2.b))

which runs forever without giving any error or output.

My questions are:

  1. As I'm new to Spark, is my way of constructing the output DataFrame efficient? (My approach would be to first create a distance matrix for all the a and b values, and then pick the values that achieve the minimum distance.)
  2. What's wrong with the last line of my code, and how can I fix it?

Upvotes: 10

Views: 41792

Answers (1)

Mariusz

Reputation: 13946

Starting with your second question: you can apply a UDF only to an existing DataFrame. I think you were aiming for something like this:

>>> df1.join(df2).withColumn('distance', udf_dist(df1.a, df2.b)).show()
+---+---+--------+
|  a|  b|distance|
+---+---+--------+
|  1|  3|       2|
|  1|  6|       5|
|  2|  3|       1|
|  2|  6|       4|
|  5|  3|       2|
|  5|  6|       1|
+---+---+--------+
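
A side note, assuming Spark 2.x or later: df1.join(df2) with no join condition produces a Cartesian product, which Spark may reject unless cross joins are enabled; crossJoin makes the intent explicit:

>>> df1.crossJoin(df2).withColumn('distance', udf_dist(df1.a, df2.b)).show()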

But there is a more efficient way to compute this distance, using the built-in abs (built-in column functions run inside the JVM and avoid the Python serialization overhead of a UDF):

>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a - df2.b))

Then you can find the matching rows by calculating:

>>> from pyspark.sql.functions import min
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a - df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+                                                                       
|  a|  b|
+---+---+
|  5|  6|
|  1|  3|
|  2|  3|
+---+---+
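
The second join can also be avoided with a window function: rank the rows of distances within each a by distance and keep the top one (a sketch relying on the question's note that the minimizing b is unique):

>>> from pyspark.sql import Window
>>> from pyspark.sql.functions import row_number
>>> w = Window.partitionBy('a').orderBy('distance')
>>> distances.withColumn('rn', row_number().over(w)).filter('rn = 1').select('a', 'b').show()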

Upvotes: 19
