TadeM

Reputation: 71

PySpark best way to filter df based on columns from different df's

I have a DataFrame A_DF which has, among others, two columns, say COND_B and COND_C. Then I have two other DataFrames: B_DF with a COND_B column and C_DF with a COND_C column.

Now I would like to filter A_DF where the value matches in one OR the other. Something like:

df = A_DF.filter((A_DF.COND_B == B_DF.COND_B) | (A_DF.COND_C == C_DF.COND_C))

But I found out it is not possible to do it like this.

EDIT error: Attribute CON_B#264,COND_C#6 is missing from the schema: [... COND_B#532, COND_C#541 ]. Attribute(s) with the same name appear in the operation: COND_B,COND_C. Please check if the right attribute(s) are used.; it looks like I can only filter within the same DF because of the #number added on the fly.

So I first tried to build a list from B_DF and C_DF and filter based on that, but calling collect() on about 100 million records was too expensive.
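Roughly, that attempt looked like this (a sketch; the isin() filter and the distinct() calls are illustrative, not the exact original code):

b_values = [row.COND_B for row in B_DF.select('COND_B').distinct().collect()]
c_values = [row.COND_C for row in C_DF.select('COND_C').distinct().collect()]

# Both lists are materialized on the driver, which is what makes this
# approach too expensive at ~100M records.
df = A_DF.filter(A_DF.COND_B.isin(b_values) | A_DF.COND_C.isin(c_values))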

So I tried:

# keep only A_DF rows whose COND_B appears in B_DF / whose COND_C appears in C_DF
AB_DF = A_DF.join(B_DF, 'COND_B', 'left_semi')
AC_DF = A_DF.join(C_DF, 'COND_C', 'left_semi')

# union the two filtered sets; rows matched by both conditions appear twice
df = AB_DF.unionAll(AC_DF).dropDuplicates()

I used dropDuplicates() to remove duplicate records where both conditions were true. But even with that I got some unexpected results.
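For comparison, the same OR logic can be expressed without the union by left-joining marker columns (a minimal sketch; the in_b/in_c flag names are hypothetical, and it assumes the join keys in B_DF and C_DF can be deduplicated):

from pyspark.sql import functions as F

# distinct keys, each tagged with an illustrative marker flag
b_keys = B_DF.select('COND_B').distinct().withColumn('in_b', F.lit(True))
c_keys = C_DF.select('COND_C').distinct().withColumn('in_c', F.lit(True))

# a row survives if either left join found a match
df = (A_DF
      .join(b_keys, 'COND_B', 'left')
      .join(c_keys, 'COND_C', 'left')
      .where(F.col('in_b').isNotNull() | F.col('in_c').isNotNull())
      .drop('in_b', 'in_c'))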

Is there some other, smoother solution to do this simply? Something like an EXISTS statement in SQL?

EDIT2 I tried SQL based on @mck's response:

e.createOrReplaceTempView('E')
b.createOrReplaceTempView('B')
p.createOrReplaceTempView('P')

df = spark.sql("""select * from E where exists (select 1 from B where E.BUSIPKEY = B.BUSIPKEY) or exists (select 1 from P where E.PCKEY = P.PCKEY)""")

my_output.write_dataframe(df)

with error:

Traceback (most recent call last):
  File "/myproject/abc.py", line 45, in my_compute_function
    df = spark.sql("""select * from E where exists (select 1 from B where E.BUSIPKEY = B.BUSIPKEY) or exists (select 1 from P where E.PCKEY = P.PCKEY)""")
TypeError: sql() missing 1 required positional argument: 'sqlQuery'
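That TypeError suggests spark here is the SparkSession class itself rather than a session instance, so sql() is called unbound and sqlQuery is never filled. A minimal sketch of the fix (assuming a standard PySpark setup; in a Foundry transform the session may come from the transform context instead):

from pyspark.sql import SparkSession

# sql() must be called on a SparkSession instance, not on the class itself
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 1 as ok")  # smoke test that sql() now works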

Thanks a lot!

Upvotes: 0

Views: 137

Answers (1)

mck

Reputation: 42392

Your idea of using exists should work. You can do:

A_DF.createOrReplaceTempView('A')
B_DF.createOrReplaceTempView('B')
C_DF.createOrReplaceTempView('C')

df = spark.sql("""
    select * from A 
    where exists (select 1 from B where A.COND_B = B.COND_B) 
    or exists (select 1 from C where A.COND_C = C.COND_C)
""")

Upvotes: 1
