Reputation: 167
I have data that gets dumped once a day to s3://<bucket>/mydata/year=*/month=*/*.snappy.parquet as cumulative data for that month. A crawler crawls it to update the mydata table, and a CW rule calls a lambda upon crawler success that starts a Glue job to transform the columns and output to s3://<bucket>/mydata-transformed/year=*/month=*/*.snappy.parquet. This flow basically works. However, the output data gets appended to what is already there instead of replacing it, which is wrong because the input is cumulative data for the month.
For example, suppose at midnight on Oct 1, 2020, data for 10/1 gets dumped into s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet. The flow will produce transformed data in s3://<bucket>/mydata-transformed/year=2020/month=10/*.snappy.parquet, and everything is good for the 10/1 data. However, the next day, when data for 10/1 and 10/2 gets dumped into s3://<bucket>/mydata/year=2020/month=10/*.snappy.parquet (overwriting the previous day's files), the Glue job will produce additive data in the output folder, i.e. it will contain data from yesterday's run plus today's run (so 10/1 data twice, and 10/2 data once). On the next day, it would be 10/1 data 3X, 10/2 data 2X, and 10/3 data. And so on. Data for 2020/09 and before is OK since it didn't change.
Below is the basic structure of my code, with boilerplate code removed and the real transformation replaced by a contrived one.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()
What can I do so that, for the current month, the previous day's data is deleted and replaced by data from the current job? Is there a way to know that, in my example, the month=10 partition in the source data has changed, so that I can purge the same partition in the output before doing the transformations and writing the new data?
Thanks.
[Edit] So it seems one solution would be to get the job bookmark and then use the CURR_LATEST_PARTITIONS value to determine which partition I should delete before processing the data. In my case, CURR_LATEST_PARTITIONS is 2020/09 when I'm processing 2020/10. So if CURR_LATEST_PARTITIONS is 2020/09, I know the data being processed must be for 2020/10, and that is the partition to delete. I don't really like this solution, but I think it'll work and I'm not sure what else I could do.
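The reasoning in the edit above (latest bookmarked partition is 2020/09, so the partition to purge is 2020/10) can be sketched as a small helper. This is only a sketch under assumptions: the function names are hypothetical, the bookmark value is assumed to already be available as a "YYYY/MM" string, and the output layout is assumed to use zero-padded months (year=2020/month=10/). How the value is read from the bookmark is not shown.

```python
from typing import Tuple


def next_month_partition(latest: str) -> Tuple[str, str]:
    """Return (year, month) of the partition following `latest` ("YYYY/MM")."""
    year_str, month_str = latest.split("/")
    year, month = int(year_str), int(month_str)
    if month == 12:
        # December rolls over to January of the next year.
        year, month = year + 1, 1
    else:
        month += 1
    # Assumption: months are zero-padded in the partition layout.
    return f"{year:04d}", f"{month:02d}"


def output_partition_path(base_path: str, latest: str) -> str:
    """Build the output partition path that should be purged before writing."""
    year, month = next_month_partition(latest)
    return f"{base_path.rstrip('/')}/year={year}/month={month}/"
```

For the example in the question, output_partition_path("s3://<bucket>/mydata-transformed/", "2020/09") yields the month=10 output partition for 2020.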
Upvotes: 3
Views: 13515
Reputation: 173
You can use purge_s3_path.
Note that it won't work right out of the box, because by default the retention period is 7 days, meaning anything newer than 168 hours won't be deleted by purge_s3_path. So if you want to delete everything under a path, you need to set the retention period to zero, like below:
glueContext.purge_s3_path('s3://s3_path/bucket', options={"retentionPeriod":0})
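To purge only the partition that is about to be rewritten rather than the whole output path, the call above can be wrapped in a small helper. This is a minimal sketch: the helper name and its parameters are hypothetical, glue_context is assumed to be the GlueContext already created in the job, and the year=/month= layout is taken from the question.

```python
def purge_output_partition(glue_context, base_path, year, month, retention_hours=0):
    """Purge one year/month partition under base_path and return the purged path.

    retention_hours=0 overrides the default retention of 168 hours so that
    files written by yesterday's run are deleted as well.
    """
    path = f"{base_path.rstrip('/')}/year={year}/month={month}/"
    glue_context.purge_s3_path(path, options={"retentionPeriod": retention_hours})
    return path
```

In the job from the question this would be called before the write step, for example purge_output_partition(glueContext, "s3://<bucket>/mydata-transformed/", "2020", "10").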
Upvotes: 3
Reputation: 704
Glue now has functions for deleting an S3 path or a Glue catalog table.
Upvotes: 3
Reputation: 447
You have a few options:
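Convert to a Spark DataFrame and use its native write() in overwrite mode. With spark.sql.sources.partitionOverwriteMode set to dynamic, the overwrite replaces only the partitions present in the DataFrame being written, not everything under the output path. However, for really large datasets it can be a bit inefficient, as a single worker will be used to overwrite existing data in S3. An example is below:
sc = SparkContext()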
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
ds_df1 \
.write.mode('overwrite') \
.format('parquet') \
.partitionBy('year', 'month') \
.save('s3://<bucket>/mydata-transformed/')
job.commit()
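Alternatively, delete the data under the affected output prefix yourself before the job writes, for example from the Lambda function that starts the Glue job. An example using Python and boto3:
import boto3
s3_res = boto3.resource('s3')
bucket = 'my-bucket-name'
# Add any logic to derive required prefix based on year/month/day
# (this should point at the output partition that is about to be rewritten)
prefix = 'mydata-transformed/year=2020/month=10/'
# Delete every object under the prefix
s3_res.Bucket(bucket).objects.filter(Prefix=prefix).delete()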
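The comment in the boto3 snippet above leaves the prefix logic open. One possible way to fill it in is sketched below, deriving the prefix from the run date. The function names are hypothetical, and the sketch assumes that the run date falls in the month being rewritten and that months are zero-padded in the output layout; adjust both if your dumps differ. The bucket argument is expected to be a boto3 Bucket resource.

```python
import datetime


def current_month_prefix(run_date, base_prefix="mydata-transformed/"):
    """Return the output prefix for the month that run_date falls in."""
    # Assumption: months are zero-padded (month=01 ... month=12).
    return f"{base_prefix}year={run_date.year}/month={run_date.month:02d}/"


def delete_current_month(bucket, run_date, base_prefix="mydata-transformed/"):
    """Delete all objects under the current month's output prefix.

    `bucket` is a boto3 Bucket resource, e.g. boto3.resource("s3").Bucket(name).
    """
    prefix = current_month_prefix(run_date, base_prefix)
    bucket.objects.filter(Prefix=prefix).delete()
    return prefix


# Usage inside the Lambda handler, before starting the Glue job:
#   import boto3
#   bucket = boto3.resource("s3").Bucket("my-bucket-name")
#   delete_current_month(bucket, datetime.date.today())
```

Keeping the prefix derivation separate from the delete call makes it easy to check the computed prefix before anything is removed.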
Or use Glue's purge_s3_path to delete data from a certain prefix. It is described in the AWS Glue documentation for GlueContext.
Upvotes: 6