Reputation: 13
In Azure Databricks, I have a Parquet file that is not partitioned by any column. When I subsequently append a new dataframe with partitionBy("some_column"), the data of my original "unpartitioned" dataframe is overwritten. Why does this happen? Shouldn't the append repartition my initial data, or at least give a warning that the data is being overwritten?
## Example 1, only data from df2 is in file_path
# Init dataframe
df1.write.mode("overwrite").parquet(file_path)
# Append with partitionBy
df2.write.mode("append").partitionBy("column1").parquet(file_path)
## Example 2, correctly appended data from both frames is in file_path
# Init dataframe
df1.write.mode("overwrite").partitionBy("column1").parquet(file_path)
# Append with partitionBy
df2.write.mode("append").partitionBy("column1").parquet(file_path)
Upvotes: 1
Views: 50
Reputation: 710
The behavior you observed in Example 1 occurs because appending a dataframe with a different partitioning to an existing Parquet location does not trigger a repartitioning of the original data. Instead, new column1=.../ subdirectories are created under the original path to hold the partitions. When Spark reads the location after the second write, partition discovery is driven by that directory layout, so the metadata of your original unpartitioned files is lost and only the appended, partitioned data is read back.
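To see what actually lands on disk, here is a minimal sketch (local Spark session, hypothetical /tmp path, mock dataframes) that reproduces Example 1 and prints the resulting layout; the new column1=.../ subdirectories sit next to the original unpartitioned part files:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "/tmp/parquet_append_demo"  # hypothetical local path for illustration

df1 = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "column1"])
df2 = spark.createDataFrame([(3, "c"), (4, "d")], ["id", "column1"])

# Example 1 from the question: unpartitioned write, then partitioned append
df1.write.mode("overwrite").parquet(path)
df2.write.mode("append").partitionBy("column1").parquet(path)

# Inspect the directory: the original part-*.parquet files sit next to the
# new column1=.../ subdirectories created by the partitioned append
for root, dirs, files in os.walk(path):
    for name in files:
        print(os.path.join(root, name))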
Other formats, such as Delta tables, raise an error when you try to write with a different partitioning.
This example shows your use case with some mock data:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create data
data1 = [(1, "Alice"),
         (2, "Bob"),
         (3, "Charlie")]
data2 = [(1, "Alice"),
         (2, "Bob"),
         (3, "Charlie"),
         (4, "Joe")]

# Create DataFrames
df1 = spark.createDataFrame(data1, schema)
df2 = spark.createDataFrame(data2, schema)

# Initial write without partitioning
df1.write.mode("overwrite").format('delta').save('dbfs:/FileStore/testdelta')
# Append with partitionBy
df2.write.mode("append").partitionBy("name").format('delta').save('dbfs:/FileStore/testdelta')

display(spark.read.format('delta').load('dbfs:/FileStore/testdelta'))
This code raises an exception like the following:
AnalysisException: Partition columns do not match the partition columns of the table.
Given: [`name`]
Table: []
This prevents your data from being silently lost.
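If you want to stay on plain Parquet rather than Delta, one option (a sketch, with hypothetical paths and the question's df2 assumed in scope) is to rewrite the existing unpartitioned data once with the same partitioning you plan to use for appends, as in Example 2 of the question:
# Hypothetical paths: rewrite the existing unpartitioned data into a new
# location, partitioned the same way future appends will be
existing = spark.read.parquet(file_path)
existing.write.mode("overwrite").partitionBy("column1").parquet(file_path_partitioned)

# Subsequent appends with the same partitioning now add data instead of
# shadowing the original files
df2.write.mode("append").partitionBy("column1").parquet(file_path_partitioned)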
This article dives into the internal Parquet file structure and the Spark reader:
Upvotes: 0