Reputation: 462
I have 2 PySpark dataframes and I want to check whether the values of one column exist in a column of the other dataframe.
I have only seen solutions that filter to the rows whose values exist (like this); what I need is to return a column of true or false.
In pandas it would be something like this:
df_A["column1"].isin(df_B["column1"])
Thanks in advance!
Upvotes: 2
Views: 14772
Reputation: 184
To tweak Mck's answer a little bit (drop duplicate df_A entries and select the relevant columns):
SHORT
df_A = df_A.withColumn("uid_A", F.monotonically_increasing_id()) # Add an index column
df_B = df_B.withColumn("uid_B", F.monotonically_increasing_id()) # Add an index column
df_A = df_A.alias('df_A')
(
df_A
.join(df_B.withColumn('flag',F.lit(True)),'col1','left')
.fillna(False)
.dropDuplicates(['uid_A'])
.select('df_A.col1','flag')
).show()
'''
+----+-----+
|col1| flag|
+----+-----+
| A|false|
| B|false|
| C| true|
| B|false|
| C| true|
| D| true|
+----+-----+
'''
FULL
### Initialise dataframes ###
import pandas as pd
import pyspark.sql.functions as F

df_A = pd.DataFrame({'col1': ['A', 'B', 'C', 'B', 'C', 'D'],
                     'col2': [1, 2, 3, 4, 5, 6]})
df_A = spark.createDataFrame(df_A)
df_B = pd.DataFrame({'col1': ['C', 'E', 'D', 'C', 'F', 'G', 'H'],
                     'col2': [10, 20, 30, 40, 50, 60, 70]})
df_B = spark.createDataFrame(df_B)
### PREVIOUS ###
df_A.join(df_B.withColumn('flag',F.lit(True)),'col1','left').fillna(False).show()
'''
+----+----+----+-----+
|col1|col2|col2| flag|
+----+----+----+-----+
| A| 1|null|false|
| B| 2|null|false|
| B| 4|null|false|
| C| 3| 10| true|
| C| 3| 40| true|
| C| 5| 10| true|
| C| 5| 40| true|
| D| 6| 30| true|
+----+----+----+-----+
'''
### BETTER ###
df_A = df_A.withColumn("uid_A", F.monotonically_increasing_id()) # Add an index column
df_B = df_B.withColumn("uid_B", F.monotonically_increasing_id()) # Add an index column
df_A = df_A.alias('df_A')
(
df_A
.join(df_B.withColumn('flag',F.lit(True)),'col1','left')
.fillna(False)
.dropDuplicates(['uid_A'])
.select('df_A.col1','flag')
).show()
'''
+----+-----+
|col1| flag|
+----+-----+
| A|false|
| B|false|
| C| true|
| B|false|
| C| true|
| D| true|
+----+-----+
'''
Extra nuggets: To take only column values based on the True/False values of the .isin results, it may be more straightforward to use PySpark's leftsemi join, which keeps only the left table's columns for rows that match on the specified columns of the right table, as also shown in this stackoverflow post.
# Pandas (isin)
df_A["col1"].isin(df_B["col1"])
# PySpark (isin equivalent, applied to column values)
df_A.join(df_B, ['col1'], 'leftsemi').show()
'''
+----+----+
|col1|col2|
+----+----+
| C| 3|
| C| 5|
| D| 6|
+----+----+
'''
# Pandas (~isin)
~df_A["col1"].isin(df_B["col1"])
# PySpark (~isin equivalent, applied to column values)
df_A.join(df_B, ['col1'], 'leftanti').show()
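For the example dataframes above, the leftanti output should look something like this (row order may differ), i.e. only the df_A rows whose col1 has no match in df_B:
'''
+----+----+
|col1|col2|
+----+----+
|   A|   1|
|   B|   2|
|   B|   4|
+----+----+
'''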
Further join examples/info - PySpark Join Types | Join Two DataFrames
Upvotes: 0
Reputation: 42352
EDIT: a cleaner way to do this (using the same df1 and df2 as defined in the old answer below):
import pyspark.sql.functions as F
result = df1.join(df2.withColumn('flag', F.lit(True)), 'col1', 'left').fillna(False)
result.show()
+----+-----+
|col1| flag|
+----+-----+
| 0| true|
| 1| true|
| 2|false|
+----+-----+
Old answer:
df1 = spark.createDataFrame(([0],[1],[2])).toDF('col1')
df2 = spark.createDataFrame(([0],[1],[3])).toDF('col1')
df1.join(df2, 'col1', 'semi') \
.distinct() \
.select('col1', F.lit(True).alias('flag')) \
.join(df1, 'col1', 'right') \
.fillna(False, 'flag') \
.show()
+----+-----+
|col1| flag|
+----+-----+
| 0| true|
| 1| true|
| 2|false|
+----+-----+
Upvotes: 9
Reputation: 589
You can collect all the values of column1 in df_B, make a broadcast variable from them, and then write a UDF that checks membership:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

# Collect the distinct values of df_B.column1 and broadcast them as a set
df_B_col_1_values = df_B.rdd.map(lambda x: x.column1).distinct().collect()
df_B_col_1_values = sc.broadcast(set(df_B_col_1_values))

# UDF that returns True if the value is present in the broadcast set
my_udf = udf(lambda x: x in df_B_col_1_values.value, BooleanType())
df_A = df_A.withColumn('column1_present', my_udf(col('column1')))
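As a side note, the same membership check can also be written without a UDF by applying Column.isin to the collected values (a minimal sketch, assuming the distinct values of df_B.column1 fit comfortably in driver memory):
import pyspark.sql.functions as F

# Collect the distinct values once on the driver ...
values = [row.column1 for row in df_B.select('column1').distinct().collect()]
# ... and let Spark evaluate the membership check directly, no UDF needed
df_A = df_A.withColumn('column1_present', F.col('column1').isin(values))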
Upvotes: 2