coredump

Reputation: 133

Which is the correct approach to operate on Delta Files

I'm new to the PySpark, Parquet and Delta ecosystem and a little confused by the multiple ways to work with Delta files.

Can someone help me understand which one is correct or preferred for updating a Delta file? Operating directly on the underlying Delta file, or first creating a table and then running a SQL query on it?

My eventual goal is to run a PySpark script on a 10-million-record file, so please share any performance tips to keep in mind.

Method 1: SQL - Read the table into a dataframe, create a temp view/table and run a query using sqlContext

deltadf = spark.read.format("delta").load(delta_format_tablename)
deltadf.createOrReplaceTempView("tempEmployee")
sqlContext.sql("DELETE FROM tempEmployee where id > 1200")
 
# Would this update the underlying delta file?

Method 2: Directly on Table - Create delta table object and do operation directly on it.

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, delta_format_tablename)
deltaTable.delete("id > 1200")

Method 3: Dataframes ( no tables ) - Read table into dataframe, and iterate through it

deltadf = spark.read.format("delta").load(delta_format_tablename)
for i in range(deltadf.count()):
    # perform action like update / delete etc
    pass

# not sure how to update the underlying delta file

Also, I do not get this statement: it saves a df to a file while simultaneously creating a table. Would updating the table (using some SQL) automatically update the file?

## specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

Thanks.

Upvotes: 1

Views: 296

Answers (1)

ScootCork

Reputation: 3686

In general, you would use a dataframe if you want to append, overwrite, or merge new data into an (existing) Delta table.

# example of loading from tableA and writing to tableB
# referencing the delta table by path
df = spark.read.format('delta').load(path_table_a)
df = some_transformations(df)
df.write.format('delta').mode('append').save(path_table_b)
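
For the merge case mentioned above, a minimal upsert sketch could look like the following (source_df, the id key column and path_table_b are placeholder assumptions, not names from the question):

from delta.tables import DeltaTable

# upsert source_df into the target delta table, matching on id
target = DeltaTable.forPath(spark, path_table_b)
(target.alias('t')
    .merge(source_df.alias('s'), 't.id = s.id')
    .whenMatchedUpdateAll()     # update rows whose id already exists
    .whenNotMatchedInsertAll()  # insert rows that are new
    .execute())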

If you just want to delete, you can work with the Delta table programmatically, i.e. through the delta table object (method 2):

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark_session, table_path)
deltaTable.delete("id > 1200")

Or you can work with it through SQL if you have registered the table in your metastore (think of it as a database catalog with metadata about your tables); this is what saveAsTable does. You could then run deletes on it using SQL:

spark.sql("DELETE FROM Employee where id > 1200")
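
To tie this back to the saveAsTable line from the question, a minimal sketch (assuming the /mydata path and the Employee table name are just placeholders) is to register the table once and then run SQL against it; a DELETE issued this way rewrites the underlying Delta files at that path.

# register the dataframe as an external table backed by the delta files at /mydata
df.write.format('delta').option('path', '/mydata').saveAsTable('Employee')

# deleting through SQL on the registered table updates the delta files at /mydata
spark.sql('DELETE FROM Employee WHERE id > 1200')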

The other methods (1 and 3) are either not possible or not advised.

Upvotes: 0
