Reputation: 168
I am working in an AWS Glue environment. I read the data from the Glue catalog as a DynamicFrame and convert it to a PySpark dataframe for my custom transformations. To upsert the new/updated data, I intend to use Delta tables.
But I'm only finding options to read data as a Delta table from a path. I need to convert my PySpark dataframe to a Delta table for doing merge operations. Is there any way to do this?
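For context, the setup looks roughly like this (a minimal sketch; my_db and my_table are placeholder catalog names):
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# read from the Glue catalog as a DynamicFrame, then convert to a PySpark dataframe
dyf = glueContext.create_dynamic_frame.from_catalog(database="my_db", table_name="my_table")
df = dyf.toDF()
# ... custom transformations on df follow ...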
Upvotes: 5
Views: 18723
Reputation: 2080
df.write.partitionBy("your_column").format("delta").mode("overwrite").save(your_path)
Whether to use partitionBy (and on which column) depends on your df size; for a small df you can skip it.
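Once the dataframe has been written out in Delta format, the same path can be loaded back as a DeltaTable for the merge. A minimal sketch, assuming a small df and a placeholder S3 path:
df.write.format("delta").mode("overwrite").save("s3://my-bucket/delta/my_table/")

from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://my-bucket/delta/my_table/")
# delta_table.merge(...) can now be used for the upsert logic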
Upvotes: 3
Reputation: 87279
Only the destination table needs to be a Delta table; the data that you're planning to merge in doesn't have to be one. How you do the merge really depends on what API you're using. With the Python API:
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = .... # your transformed dataframe
deltaTable.alias("target").merge(
    updatesDF.alias("updates"),
    "target.col1 = updates.col1") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
Or with the SQL MERGE command, after registering the transformed dataframe as a temporary view:
updates_df.createOrReplaceTempView("updates")
merge_sql = """
MERGE INTO target  -- "target" is the name of the destination Delta table
USING updates
ON target.col1 = updates.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
The only catch here is that you need to use df._jdf.sparkSession().sql so that the SQL command is executed in the same session where the temp view was registered.
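In an AWS Glue job you normally already hold that session as glueContext.spark_session, so, assuming the dataframe was created through that same GlueContext, an equivalent way to run it would be:
spark = glueContext.spark_session
spark.sql(merge_sql)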
Upvotes: 5