Reputation: 243
Say I have two PySpark DataFrames df1 and df2.
df1= 'a'
1
2
5
df2= 'b'
3
6
I want to find the closest df2['b'] value for each df1['a'], and add the closest values as a new column in df1.
In other words, for each value x in df1['a'], I want to find the y that achieves min(abs(x-y)) over all y in df2['b'] (note: you can assume there is only one y that achieves the minimum distance), and the result would be
'a' 'b'
1 3
2 3
5 6
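For reference, the rule described above can be written down in plain Python without Spark. This is only a small sketch for checking expected results on toy data; the helper name closest_values is hypothetical and not part of PySpark.

```python
def closest_values(a_values, b_values):
    """Pair each x in a_values with the y in b_values that minimises abs(x - y).

    Hypothetical helper for small in-memory lists; assumes b_values is
    non-empty and that the closest y is unique, as stated in the question.
    """
    return [(x, min(b_values, key=lambda y: abs(x - y))) for x in a_values]


result = closest_values([1, 2, 5], [3, 6])
print(result)  # [(1, 3), (2, 3), (5, 6)]
```

This compares every x with every y, so it is quadratic in the input sizes, which is the same amount of work as building a full distance matrix.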
I tried the following code to create a distance matrix first (before finding the values achieving the minimum distance):
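from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def dist(x, y):
    # absolute difference between the two values
    return abs(x - y)

udf_dict = udf(dist, IntegerType())
sql_sc = SQLContext(sc)  # sc is the existing SparkContext
udf_dict(df1.a, df2.b)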
which gives
Column<PythonUDF#dist(a,b)>
Then I tried
sql_sc.createDataFrame(udf_dict(df1.a, df2.b))
which runs forever without giving error/output.
My questions are:
a and b values first, and then find the min one)
Upvotes: 10
Views: 41792
Reputation: 13946
Starting with your second question: a udf can only be applied to the columns of an existing DataFrame. I think you were looking for something like this:
>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+
But there is a more efficient way to compute this distance: use the built-in abs function from pyspark.sql.functions instead of a Python udf:
>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b))
Then you can find matching numbers by calculating:
>>> from pyspark.sql.functions import min  # Spark's aggregate min, not Python's built-in
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a - df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+
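To make the data flow of these three steps explicit, here is a rough plain-Python mirror of the same pipeline (cross join, per-group minimum, join back). It is only an illustration on the toy data from the question, not Spark code; the variable names are chosen to match the snippet above.

```python
from itertools import product

a_values = [1, 2, 5]
b_values = [3, 6]

# Step 1: cross join, producing one (a, b, distance) row per pair.
distances = [(a, b, abs(a - b)) for a, b in product(a_values, b_values)]

# Step 2: group by a and keep the smallest distance in each group.
min_distances = {}
for a, _, d in distances:
    min_distances[a] = min(d, min_distances.get(a, d))

# Step 3: join back on (a, distance) to recover the matching b.
matches = [(a, b) for a, b, d in distances if d == min_distances[a]]
print(matches)  # [(1, 3), (2, 3), (5, 6)]
```

Note that step 3 keeps every row whose distance equals the group minimum, so if two b values were equally close to some a, both rows would appear in the result. That is why the uniqueness assumption in the question matters for this approach.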
Upvotes: 19