Andrés Bustamante

Reputation: 462

Check if values of column pyspark df exist in other column pyspark df

I have two PySpark dataframes and I want to check whether the values of a column in one exist in a column of the other dataframe.

I have only seen solutions that filter to the values that do exist (like this); what I need is to return a column of true or false.

In pandas it would be something like this:

df_A["column1"].isin(df_B["column1"])

Thanks in advance!

Upvotes: 2

Views: 14772

Answers (3)

decodering

Reputation: 184

To tweak mck's answer a little (drop the duplicated df_A entries and select only the relevant columns):

SHORT

df_A = df_A.withColumn("uid_A", F.monotonically_increasing_id())  # Add an index column
df_B = df_B.withColumn("uid_B", F.monotonically_increasing_id())  # Add an index column

df_A = df_A.alias('df_A')
(
    df_A
    .join(df_B.withColumn('flag',F.lit(True)),'col1','left')
    .fillna(False)
    .dropDuplicates(['uid_A'])
    .select('df_A.col1','flag')
).show()

'''
+----+-----+
|col1| flag|
+----+-----+
|   A|false|
|   B|false|
|   C| true|
|   B|false|
|   C| true|
|   D| true|
+----+-----+
'''

FULL


### Initialise dataframes ###

import pandas as pd
import pyspark.sql.functions as F

df_A = pd.DataFrame({'col1': ['A', 'B', 'C', 'B', 'C', 'D'],
                     'col2': [1, 2, 3, 4, 5, 6]})
df_A = spark.createDataFrame(df_A)

df_B = pd.DataFrame({'col1': ['C', 'E', 'D', 'C', 'F', 'G', 'H'],
                     'col2': [10, 20, 30, 40, 50, 60, 70]})
df_B = spark.createDataFrame(df_B)

### PREVIOUS ###

df_A.join(df_B.withColumn('flag',F.lit(True)),'col1','left').fillna(False).show()

'''
+----+----+----+-----+
|col1|col2|col2| flag|
+----+----+----+-----+
|   A|   1|null|false|
|   B|   2|null|false|
|   B|   4|null|false|
|   C|   3|  10| true|
|   C|   3|  40| true|
|   C|   5|  10| true|
|   C|   5|  40| true|
|   D|   6|  30| true|
+----+----+----+-----+
'''

### BETTER ###

df_A = df_A.withColumn("uid_A", F.monotonically_increasing_id())  # Add an index column
df_B = df_B.withColumn("uid_B", F.monotonically_increasing_id())  # Add an index column

df_A = df_A.alias('df_A')
(
    df_A
    .join(df_B.withColumn('flag',F.lit(True)),'col1','left')
    .fillna(False)
    .dropDuplicates(['uid_A'])
    .select('df_A.col1','flag')
).show()

'''
+----+-----+
|col1| flag|
+----+-----+
|   A|false|
|   B|false|
|   C| true|
|   B|false|
|   C| true|
|   D| true|
+----+-----+
'''

Extra nuggets: If you only want to keep the rows whose values would be True under .isin (rather than returning the true/false column itself), it may be more straightforward to use PySpark's leftsemi join, which keeps only the left table's columns for rows that match on the specified columns of the right table, as shown also in this stackoverflow post.

# Pandas (isin)
df_A["col1"].isin(df_B["col1"])

# PySpark (isin equivalent; leftsemi keeps only df_A's columns for the matching rows)
df_A.join(df_B, ["col1"], "leftsemi")

'''
+----+----+
|col1|col2|
+----+----+
|   C|   3|
|   C|   5|
|   D|   6|
+----+----+
'''

# Pandas (~isin)
~df_A["col1"].isin(df_B["col1"])

# PySpark (~isin equivalent; leftanti keeps only df_A's rows with no match in df_B)
df_A.join(df_B, ["col1"], "leftanti")


Further join examples/info - PySpark Join Types | Join Two DataFrames

Upvotes: 0

mck

Reputation: 42352

EDIT: a cleaner way to do this:

import pyspark.sql.functions as F

result = df1.join(df2.withColumn('flag', F.lit(True)), 'col1', 'left').fillna(False)

result.show()
+----+-----+
|col1| flag|
+----+-----+
|   0| true|
|   1| true|
|   2|false|
+----+-----+
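
Note that if df2 contains repeated values in col1, the left join above will fan out the matching df1 rows (the PREVIOUS/BETTER comparison in the answer above shows this effect). A minimal guard, sketched here assuming the same df1/df2 as in the old answer below, is to deduplicate df2's join column before flagging it:

# Deduplicate the join keys first so each df1 row appears exactly once in the result
result = df1.join(
    df2.select('col1').distinct().withColumn('flag', F.lit(True)),
    'col1', 'left'
).fillna(False)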

Old answer:

df1 = spark.createDataFrame([[0], [1], [2]]).toDF('col1')
df2 = spark.createDataFrame([[0], [1], [3]]).toDF('col1')

df1.join(df2, 'col1', 'semi') \
   .distinct() \
   .select('col1', F.lit(True).alias('flag')) \
   .join(df1, 'col1', 'right') \
   .fillna(False, 'flag') \
   .show()

+----+-----+
|col1| flag|
+----+-----+
|   0| true|
|   1| true|
|   2|false|
+----+-----+

Upvotes: 9

ATIF ADIB

Reputation: 589

You can collect all the values of column1 in df_B, make a broadcast variable from them, and then check membership with a UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

# Collect the distinct values of column1 from df_B and broadcast them to the executors
df_B_col_1_values = df_B.rdd.map(lambda x: x.column1).distinct().collect()
df_B_col_1_values = sc.broadcast(set(df_B_col_1_values))

# Flag each row of df_A by checking membership in the broadcast set
my_udf = udf(lambda x: x in df_B_col_1_values.value, BooleanType())
df_A = df_A.withColumn('column1_present', my_udf(col('column1')))
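
For a small lookup set, a UDF-free sketch (assuming the same df_A, df_B and column1 naming as above) is to collect the distinct values and pass them straight to Column.isin:

from pyspark.sql import functions as F

# Collect the distinct lookup values to the driver, then test membership with the built-in isin
df_B_col_1_values = [row['column1'] for row in df_B.select('column1').distinct().collect()]
df_A = df_A.withColumn('column1_present', F.col('column1').isin(df_B_col_1_values))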

Upvotes: 2
