user3423920

Reputation: 209

How can I use "where not exists" SQL condition in pyspark?

I have a Hive table and I am trying to insert data into it.
I am reading the data from a SQL source, but I don't want to insert rows whose id already exists in the Hive table. I want the equivalent of a where not exists condition. I am using PySpark on Airflow.

Upvotes: 4

Views: 14573

Answers (3)

Steven

Reputation: 15283

The DataFrame API has no exists operator, but two join types can replace it: left_anti and left_semi.

For example, to insert a dataframe df into a Hive table named target, you can do:

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id'
)

Then you write new_df to your table.

left_anti keeps only the rows of the left dataframe that do not meet the join condition (the equivalent of not exists). The equivalent of exists is left_semi.

Upvotes: 9

matkurek

Reputation: 781

You can use not exists directly in Spark SQL on the dataframes by registering them as temp views:

table_withNull_df.createOrReplaceTempView("table_withNull")
tblA_NoNull_df.createOrReplaceTempView("tblA_NoNull")

result_df = spark.sql("""
select * from table_withNull 
where not exists 
(select 1 from 
tblA_NoNull 
where table_withNull.id = tblA_NoNull.id)
""")

This method can be preferable to a left anti join, which can trigger an unexpected BroadcastNestedLoopJoin and end in a broadcast timeout (even without explicitly requesting a broadcast in the anti join).

After that, you can use write.mode("append") to insert the rows that were not already present.

Example taken from here

Upvotes: 4

Vzzarr

Reputation: 5700

IMHO, no such feature exists in Spark. I think you can use one of two approaches:

  1. A workaround with a UNIQUE constraint (typical of relational databases): when you try to insert (in append mode) a record that already exists, you'll get an exception that you can handle.

  2. Read the table you want to write to, outer join it with the data you want to add, and then write the result in overwrite mode (but I think the first solution may perform better).

For more details, feel free to ask.

Upvotes: -1
