Reputation: 303
I have the following use case in PySpark:
df.show()
emp_no  dept_no  emp_name  emp_address  location
1       10       ABC       AAA          X
2       20       DEF       CCC          Y
3       20       GHI       DDD          Z
4       10       JKL       EEE          Y
And I have the below two lists:
dept_list = [10, 20]
location_list = ['Y', 'Z']
Now I'm iterating over the lists and joining the DataFrame as below:
from pyspark.sql.functions import col

lst = []
for a, b in zip(dept_list, location_list):
    df1 = df.where(col('dept_no') == a)
    df2 = df.where(col('location') == b)
    join_conditions = [df1.dept_no == df2.dept_no, df1.emp_address == df2.emp_address]
    result_df = df1.join(df2, join_conditions, how="inner").select(df1.emp_no, df1.emp_name)
    lst.append(result_df)
Finally, I union all of them as below:
from functools import reduce
from pyspark.sql import DataFrame
final_df = reduce(DataFrame.union, lst)
Now the final result:
final_df.show()
emp_no  emp_name
4       JKL
3       GHI
How can I avoid this FOR loop?
EDIT 1: If we have similar lists for the same column [i.e., location], how would this work? For example:
location_list_1 = ['X', 'Y']
location_list_2 = ['Z', 'Z'] # value can be repeated here, but len(location_list_1)=len(location_list_2)
from pyspark.sql.functions import coalesce, col

lst = []
for a, b in zip(location_list_1, location_list_2):
    df1 = df.where(col('location') == a)
    df2 = df.where(col('location') == b)
    join_conditions = [df1.dept_no == df2.dept_no, df1.emp_address == df2.emp_address]
    # coalesce() plays the role of nvl()
    result_df = df1.join(df2, join_conditions, how="fullouter").select(
        coalesce(df1.emp_no, df2.emp_no).alias('emp_no'),
        coalesce(df1.emp_name, df2.emp_name).alias('emp_name'),
    )
    lst.append(result_df)
And the output should be:
emp_no  emp_name
1       ABC       # from 1st iteration of the FOR loop [for locations 'X' & 'Z']
3       GHI       # from 1st iteration of the FOR loop [for locations 'X' & 'Z']
2       DEF       # from 2nd iteration of the FOR loop [for locations 'Y' & 'Z']
4       JKL       # from 2nd iteration of the FOR loop [for locations 'Y' & 'Z']
3       GHI       # from 2nd iteration of the FOR loop [for locations 'Y' & 'Z']
# Here, 3-GHI should come twice.
Similarly, how can I avoid the FOR loop here?
Upvotes: 1
Views: 1405
Reputation: 3286
If emp_address is not unique, then perform a self-join on it (df.join(df, 'emp_address')) and then filter using the following condition: (dept_no, location) in zip(dept_list, location_list).
The easiest way to do this would be to create a tiny UDF:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

def check(x, y):
    return (x, y) in zip(list1, list2)

f = udf(check, BooleanType())
df.filter(f(col(x), col(y)))
Alternatively, you can concatenate the two columns as strings (using some separator); then you can use isin:
from pyspark.sql.functions import concat, lit

my_list = [f'{x}:::{y}' for x, y in zip(list1, list2)]
df.filter(concat(col(x), lit(':::'), col(y)).isin(my_list))
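For the columns in the question, that filter might look roughly like this (a sketch; the ':::' separator and the explicit cast to string are assumptions, not part of the original code):
from pyspark.sql.functions import col, concat, lit

# build 'dept:::location' keys from the two lists, e.g. ['10:::Y', '20:::Z']
allowed = [f'{d}:::{l}' for d, l in zip(dept_list, location_list)]

filtered = df.filter(
    concat(col('dept_no').cast('string'), lit(':::'), col('location')).isin(allowed)
)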
You may need to alias the DataFrames (or rename the duplicated columns) since you have the same column names on both sides of the join.
If your joins are anything but inner, you need to join first before filtering. Otherwise, filter first so you have fewer rows to join.
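Putting the pieces together for the inner-join case in the question, a minimal end-to-end sketch (assuming the dept_list and location_list defined above; it joins on both dept_no and emp_address to mirror the loop's join_conditions, and pre-filters each side first):
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

pairs = set(zip(dept_list, location_list))            # {(10, 'Y'), (20, 'Z')}
is_valid_pair = F.udf(lambda d, l: (d, l) in pairs, BooleanType())

# filter first (inner-join case), then self-join once instead of once per pair
a = df.filter(F.col('dept_no').isin(dept_list)).alias('a')
b = df.filter(F.col('location').isin(location_list)).alias('b')

final_df = (
    a.join(b, (F.col('a.dept_no') == F.col('b.dept_no'))
             & (F.col('a.emp_address') == F.col('b.emp_address')), 'inner')
     .filter(is_valid_pair(F.col('a.dept_no'), F.col('b.location')))
     .select(F.col('a.emp_no'), F.col('a.emp_name'))
)
With the sample data this should give the same two rows as the loop (4-JKL and 3-GHI), though row order may differ.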
Upvotes: 1
Reputation: 973
I think this should work:
from functools import reduce
from operator import __or__
from pyspark.sql import functions as f
final_df = df.alias("a").join(
    df.alias("b"),
    on=reduce(
        __or__,
        [
            (f.col("a.dept_no") == f.lit(a)) & (f.col("b.location") == f.lit(b))
            for a, b in zip(dept_list, location_list)
        ]
    ) & (
        f.col("a.dept_no") == f.col("b.dept_no")
    ) & (
        f.col("a.emp_address") == f.col("b.emp_address")
    )
).select("a.emp_no", "b.emp_name")
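One caveat: because the per-pair conditions are OR'ed into a single join, a pair of rows that satisfies more than one (dept_no, location) combination appears once here, whereas the loop-plus-union version emits it once per matching iteration; that difference matters for cases like EDIT 1, where 3-GHI is expected to appear twice.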
Upvotes: 0