Reputation: 351
I'm attempting to write pyspark code in Glue that updates the Glue Catalog by adding new partitions and overwriting existing partitions in the same call.
I read that there is no way to overwrite partitions in Glue, so we must use pyspark code similar to this:
from pyspark.sql.functions import date_format

final_df.withColumn('year', date_format('date', 'yyyy'))\
    .withColumn('month', date_format('date', 'MM'))\
    .withColumn('day', date_format('date', 'dd'))\
    .write.mode('overwrite')\
    .format('parquet')\
    .partitionBy('year', 'month', 'day')\
    .save('s3://my_bucket/')
However, with this method the Glue Catalog does not get updated automatically, so an msck repair table call is needed after each write. Recently AWS released a new feature, enableUpdateCatalog, where newly created partitions are immediately updated in the Glue Catalog. The code looks like this:
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["year", "month", "day"]
dyn_frame_catalog = glueContext.write_dynamic_frame_from_catalog(
    frame=partition_dyf,
    database="my_db",
    table_name="my_table",
    format="parquet",
    additional_options=additionalOptions,
    transformation_ctx="my_ctx"
)
Is there a way to combine these 2 commands, or will I need to use the pyspark method with write.mode('overwrite') and run an MSCK REPAIR TABLE my_table on every run of the Glue job?
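For context on what the repair step has to discover: partitionBy('year', 'month', 'day') lays files out under Hive-style key=value prefixes, and MSCK REPAIR TABLE scans for exactly those folders. Below is a minimal sketch in plain Python (no Spark needed) of the prefix a given date should end up under. The helper name partition_prefix is hypothetical, and it assumes the same 'yyyy' / 'MM' / 'dd' formatting as the snippet above.

```python
from datetime import date

# Partition columns, in the same order as the partitionBy(...) call.
PARTITION_KEYS = ("year", "month", "day")


def partition_prefix(base_path: str, d: date) -> str:
    """Return the Hive-style prefix for one date (hypothetical helper)."""
    # Mirrors date_format(..., 'yyyy' / 'MM' / 'dd'): zero-padded strings.
    values = (d.strftime("%Y"), d.strftime("%m"), d.strftime("%d"))
    parts = [f"{key}={value}" for key, value in zip(PARTITION_KEYS, values)]
    return base_path.rstrip("/") + "/" + "/".join(parts) + "/"


if __name__ == "__main__":
    print(partition_prefix("s3://my_bucket/", date(2020, 1, 5)))
```

A prefix built this way is also what you would hand to any per-partition cleanup or ALTER TABLE ... ADD PARTITION statement, if you wanted to avoid a full repair on every run.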
Upvotes: 2
Views: 4438
Reputation: 56
If you have not already found your answer, I believe the following will work:
DataSink5 = glueContext.getSink(
    path="s3://...",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["year", "month", "day"],
    enableUpdateCatalog=True,
    transformation_ctx="DataSink5")
DataSink5.setCatalogInfo(
    catalogDatabase="my_db",
    catalogTableName="my_table")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(partition_dyf)
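If the sink write turns out to append to partitions that already hold files rather than replace them, one possible workaround is to clear the affected partition prefixes first and then write. The sketch below is untested against a real Glue job and makes several assumptions: the wrapper name overwrite_partitions is hypothetical, the prefixes are ones you compute yourself from the incoming data, and it relies on GlueContext.purge_s3_path with retentionPeriod (in hours) set to 0 to delete everything under a prefix. I have not checked how purge_s3_path behaves on a prefix that does not exist yet.

```python
from typing import Callable, Iterable, List


def overwrite_partitions(glue_context, write_frame: Callable[[], None],
                         prefixes: Iterable[str]) -> List[str]:
    """Purge each partition prefix, then run the write (hypothetical wrapper).

    glue_context: an object exposing purge_s3_path, e.g. a GlueContext.
    write_frame:  zero-argument callable that performs the sink write.
    prefixes:     S3 prefixes of the partitions about to be rewritten.
    """
    purged = []
    # De-duplicate and sort so each prefix is purged once, in a stable order.
    for prefix in sorted(set(prefixes)):
        # retentionPeriod is in hours; 0 means keep nothing under the prefix.
        glue_context.purge_s3_path(prefix, options={"retentionPeriod": 0})
        purged.append(prefix)
    # Write only after all affected partitions have been cleared.
    write_frame()
    return purged
```

In the job above this would be called roughly as overwrite_partitions(glueContext, lambda: DataSink5.writeFrame(partition_dyf), prefixes), where prefixes is your own list of year=/month=/day= paths under the sink path. Note that the purge and the write are not atomic, so a failure between the two leaves those partitions empty.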
Upvotes: 2