How do I access the response of DynamoDBToS3Operator so that I can get the export location? I found that response['ExportManifest'] contains the path to manifest-summary.json (the path includes the ExportId), which I can parse to get the /data location.
I need this /data S3 path for an ALTER TABLE query in Athena.
Although the export path is under my control, I have no way to get the ExportId.
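To make the parsing step concrete, here is a minimal sketch of what I mean. It assumes the manifest key has the layout `<prefix>/AWSDynamoDB/<ExportId>/manifest-summary.json` and that the data files sit in a `data/` folder next to the manifest. The function name `export_paths_from_manifest` and the sample key are hypothetical, used only for illustration.

```python
import posixpath


def export_paths_from_manifest(manifest_key: str) -> dict:
    """Derive the export ID and data prefix from a manifest key.

    Assumption: the key looks like
    <prefix>/AWSDynamoDB/<ExportId>/manifest-summary.json
    """
    # Directory holding the manifest: <prefix>/AWSDynamoDB/<ExportId>
    export_dir = posixpath.dirname(manifest_key)
    # Last component of that directory is taken to be the export ID
    export_id = posixpath.basename(export_dir)
    if not export_id:
        raise ValueError(f"Unexpected manifest key: {manifest_key!r}")
    return {
        "export_id": export_id,
        # Assumed location of the exported data files
        "data_prefix": posixpath.join(export_dir, "data") + "/",
    }


# Hypothetical key, only to show the shape of the result
sample_key = "exports/full/AWSDynamoDB/01234567890123-a1b2c3d4/manifest-summary.json"
paths = export_paths_from_manifest(sample_key)
print(paths["export_id"], paths["data_prefix"])
```

The `data_prefix` value, combined with the bucket name, is what I would put in the LOCATION clause of the Athena query.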
I have tried pulling the response from XCom, but it looks like nothing is pushed to XCom. I also tried printing the response (.output), and it was None. Similarly, response_body was None.
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task()
def extract_dynamo_response(response):
    print("response passed", response)  # it was None
    context = get_current_context()
    # Pull whatever the export task pushed to XCom (its return value)
    response_body = context['task_instance'].xcom_pull(task_ids='dynamo_full_export_task_task_id')
    print("response in edr: ", response_body)  # it was also None
    return response["ExportManifest"]
dynamo_full_export_task = DynamoDBToS3Operator(
    task_id="dynamo_full_export_task_task_id",
    dynamodb_table_name=dag_run_config["dynamo_table"],
    s3_bucket_name=dag_run_config["bucket"],
    export_time=pendulum.now("UTC"),
    s3_key_prefix=dag_run_config["dynamo_export_s3_loc"],
    export_format="ION",
    file_size=10**9,
)
I call the task like this:
dynamo_response = extract_dynamo_response(dynamo_full_export_task.output)
I need to know the exported location (especially the ExportId).
Any other approaches would also be appreciated.
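One possible alternative, as an untested sketch: skip the operator's return value and ask DynamoDB for the export directly in a downstream task, using the boto3 client calls `list_exports` and `describe_export`. The helper name `latest_completed_export` is hypothetical, and picking the export by the latest `StartTime` is an assumption that only holds if no other export of the same table starts in between.

```python
def latest_completed_export(client, table_arn: str):
    """Return the ExportDescription of the most recently started
    COMPLETED export for the table, or None if there is none.

    `client` is expected to behave like boto3.client("dynamodb").
    """
    latest = None
    kwargs = {"TableArn": table_arn}
    while True:
        page = client.list_exports(**kwargs)
        for summary in page.get("ExportSummaries", []):
            if summary.get("ExportStatus") != "COMPLETED":
                continue
            # The summary carries only the ARN, status and type;
            # describe_export returns the manifest path and timestamps.
            desc = client.describe_export(ExportArn=summary["ExportArn"])["ExportDescription"]
            if latest is None or desc["StartTime"] > latest["StartTime"]:
                latest = desc
        token = page.get("NextToken")
        if not token:
            return latest
        # Continue with the next page of results
        kwargs["NextToken"] = token
```

Called as `latest_completed_export(boto3.client("dynamodb"), table_arn)`, the result's `ExportManifest` and `ExportArn` fields would give the manifest path and the export ID (the last segment of the ARN), if this approach works as I expect.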