amine jisung

Reputation: 117

Detect existence of a column's elements in multiple other columns using a join

I'm using PySpark 2.4.

I have the following dataframe as input:

+-------+-------+---------+
| ceci_p| ceci_l|ceci_stok|
+-------+-------+---------+
|SFIL401| BPI202|   BPI202|
| BPI202| CDC111|   BPI202|
| LBP347|SFIL402|  SFIL402|
| LBP347|SFIL402|   LBP347|
+-------+-------+---------+

I want to detect which ceci_stok values exist in both the ceci_l and ceci_p columns, using a join (maybe a self-join).

For example: ceci_stok = BPI202 exists in both ceci_l and ceci_p.

As a result, I want a new dataframe containing the ceci_stok values that exist in both ceci_p and ceci_l.
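
For reference, here is a minimal setup that reproduces this input (a sketch, assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [("SFIL401", "BPI202", "BPI202"),
     ("BPI202", "CDC111", "BPI202"),
     ("LBP347", "SFIL402", "SFIL402"),
     ("LBP347", "SFIL402", "LBP347")],
    ["ceci_p", "ceci_l", "ceci_stok"],  # column names from the table above
)

The expected result would contain only BPI202, since that is the only ceci_stok value appearing in both ceci_p (row 2) and ceci_l (row 1).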

Upvotes: 1

Views: 82

Answers (3)

mazaneicha

Reputation: 9427

You're right, this can be done with a self-join. If you have a dataframe

>>> df.show(truncate=False)
+-------+-------+---------+                                                     
|ceci_p |ceci_l |ceci_stok|
+-------+-------+---------+
|SFIL401|BPI202 |BPI202   |
|BPI202 |CDC111 |BPI202   |
|LBP347 |SFIL402|SFIL402  |
|LBP347 |SFIL402|LBP347   |
+-------+-------+---------+

...then the following couple of joins (with "leftsemi" to drop the right-hand side) should produce what you need:

>>> df.select("ceci_stok") \
      .join(df.select("ceci_p"),df.ceci_stok == df.ceci_p,"leftsemi") \
      .join(df.select("ceci_l"),df.ceci_stok == df.ceci_l,"leftsemi") \
      .show(truncate=False)
+---------+                                                                     
|ceci_stok|
+---------+
|BPI202   |
|BPI202   |
+---------+

You can dedup the result if you're just interested in unique values.
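
For instance, appending distinct() before show() (the standard DataFrame dedup call; untested here, but deterministic on this data) should leave a single row:

>>> df.select("ceci_stok") \
      .join(df.select("ceci_p"),df.ceci_stok == df.ceci_p,"leftsemi") \
      .join(df.select("ceci_l"),df.ceci_stok == df.ceci_l,"leftsemi") \
      .distinct() \
      .show(truncate=False)
+---------+
|ceci_stok|
+---------+
|BPI202   |
+---------+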

Upvotes: 1

Matt Andruff

Reputation: 5125

# create data for testing
data = [("SFIL401","BPI202","BPI202"),
        ("BPI202","CDC111","BPI202"),
        ("LBP347","SFIL402","SFIL402"),
        ("LBP347","SFIL402","LBP347")]

data_schema = ["ceci_p","ceci_l","ceci_stok"]

df = spark.createDataFrame(data=data, schema=data_schema)

# cache df since it's referenced multiple times, and alias each
# column to "join_key" so both sides share the same join key
ceci_p = df.cache() \
    .select(df.ceci_p.alias("join_key")) \
    .distinct()

ceci_l = df \
    .select(df.ceci_l.alias("join_key")) \
    .distinct()

# keep only the values present in both columns you're interested in
vals = ceci_l.join(ceci_p, "join_key").distinct()

df.join(vals, df.ceci_stok == vals.join_key).show()
+-------+-------+---------+--------+
| ceci_p| ceci_l|ceci_stok|join_key|
+-------+-------+---------+--------+
|SFIL401| BPI202|   BPI202|  BPI202|
| BPI202| CDC111|   BPI202|  BPI202|
+-------+-------+---------+--------+
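
If you only need the matching values themselves rather than the full rows, one small variation (a sketch along the same lines) is to select the column and dedup afterwards:

df.join(vals, df.ceci_stok == vals.join_key) \
    .select("ceci_stok") \
    .distinct() \
    .show()
+---------+
|ceci_stok|
+---------+
|   BPI202|
+---------+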

Upvotes: 1

ZygD

Reputation: 24386

The following seems to work in Spark 3.0.2. Please try it.

from pyspark.sql import functions as F

df2 = (
    df.select('ceci_stok').alias('_stok')
    .join(df.alias('_p'), F.col('_stok.ceci_stok') == F.col('_p.ceci_p'), 'leftsemi')
    .join(df.alias('_l'), F.col('_stok.ceci_stok') == F.col('_l.ceci_l'), 'leftsemi')
    .distinct()
)

df2.show()
# +---------+
# |ceci_stok|
# +---------+
# |   BPI202|
# +---------+

Upvotes: 1
