Koushik Chandra

Reputation: 1491

Implementing MERGE INTO sql in pyspark

How can a SQL MERGE INTO statement be achieved programmatically in PySpark? I have two tables which I have loaded into temporary views using the createOrReplaceTempView option. I then tried running a MERGE INTO statement against those two temporary views, but it fails, presumably because MERGE is not supported in Spark SQL. Can someone give a hint on how a simple MERGE INTO equivalent (something like below) could be implemented programmatically in PySpark?

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

Upvotes: 9

Views: 11198

Answers (1)

Manu Gupta

Reputation: 830

MERGE is not supported directly, but if you are OK with overwriting the complete table, you can follow this approach.

hiveContext.sql("select * from events").registerTempTable("temp_events")
hiveContext.sql("select * from updates").registerTempTable("temp_updates")

hiveContext.sql("""
select
case when b.eventId is null then a.date else b.date end as date,
case when b.eventId is null then a.eventId else b.eventId end as eventId,
case when b.eventId is null then a.data else b.data end as data
from
temp_events a
full outer join
temp_updates b
on a.eventId=b.eventId
""").registerTempTable("FinalData")

hiveContext.sql("INSERT OVERWRITE TABLE table_name select * from FinalData")

Using the CASE expressions, we make sure that if a row is available in the new set we take those values; otherwise we keep the older values.

Please check if this solution works for you.

Thanks, Manu

Upvotes: 4
