DataNeer

How to access the response (ExportManifest) from Airflow's DynamoDBToS3Operator

How do I access the response of DynamoDBToS3Operator so that I can get the export location? I found out that response['ExportManifest'] contains the path to manifest-summary.json (which includes the ExportId), and I can parse that path to get the /data location.
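
Assuming the documented DynamoDB export layout, where <prefix>/AWSDynamoDB/<ExportId>/manifest-summary.json sits next to a data/ folder, the manifest key alone should be enough to recover both the ExportId and the /data location. This is a minimal sketch under that assumption. The function names, bucket, and example key are hypothetical:

```python
import posixpath


def export_id_from_manifest(manifest_key: str) -> str:
    """Return the ExportId, assumed to be the folder holding the manifest file.

    Assumed layout: <prefix>/AWSDynamoDB/<ExportId>/manifest-summary.json
    """
    return posixpath.basename(posixpath.dirname(manifest_key))


def data_prefix_from_manifest(bucket: str, manifest_key: str) -> str:
    """Return the s3:// URI of the export's data/ folder.

    Assumes data/ is a sibling of manifest-summary.json in the export folder.
    """
    export_dir = posixpath.dirname(manifest_key)  # drop the manifest file name
    return f"s3://{bucket}/{export_dir}/data/"


if __name__ == "__main__":
    key = "exports/orders/AWSDynamoDB/01693685827463-2d8752fd/manifest-summary.json"
    print(export_id_from_manifest(key))  # 01693685827463-2d8752fd
    print(data_prefix_from_manifest("my-bucket", key))
    # s3://my-bucket/exports/orders/AWSDynamoDB/01693685827463-2d8752fd/data/
```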

I need this /data S3 path for an ALTER TABLE query in Athena.
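
For the Athena step, one option is Athena's ALTER TABLE ... SET LOCATION pointed at the export's data folder. This is an assumption and may not match the original query. Below is a small string builder. The table name and function name are hypothetical:

```python
def alter_location_sql(table: str, data_uri: str) -> str:
    """Build an Athena ALTER TABLE ... SET LOCATION statement.

    `table` is a hypothetical "database.table" name; `data_uri` is the export's
    s3://.../AWSDynamoDB/<ExportId>/data path.
    """
    if not data_uri.startswith("s3://"):
        raise ValueError(f"expected an s3:// URI, got {data_uri!r}")
    if not data_uri.endswith("/"):
        data_uri += "/"  # point at the folder, not a single object
    escaped = data_uri.replace("'", "''")  # escape quotes for the SQL string literal
    return f"ALTER TABLE {table} SET LOCATION '{escaped}'"


if __name__ == "__main__":
    print(alter_location_sql(
        "my_db.orders_export",
        "s3://my-bucket/exports/orders/AWSDynamoDB/01693685827463-2d8752fd/data",
    ))
```

The resulting string could then be passed as the query of a downstream task, for example the Amazon provider's AthenaOperator.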

Although the export path (bucket and prefix) is under my control, I have no way to get the ExportId.
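
The ExportId also appears at the end of the export ARN (arn:aws:dynamodb:<region>:<account>:table/<TableName>/export/<ExportId>). The operator does not appear to push anything to XCom, so one assumed workaround is to look up the ARN with boto3. For example, list_exports(TableArn=...) on the DynamoDB client returns ExportSummaries entries that include ExportArn. The sketch below covers only the extraction step, and the function name is hypothetical:

```python
def export_id_from_arn(export_arn: str) -> str:
    """Return the segment after "/export/" in a DynamoDB export ARN.

    Assumed ARN shape:
    arn:aws:dynamodb:<region>:<account>:table/<TableName>/export/<ExportId>
    """
    _, sep, export_id = export_arn.rpartition("/export/")
    if not sep or not export_id:
        raise ValueError(f"not a DynamoDB export ARN: {export_arn!r}")
    return export_id


if __name__ == "__main__":
    arn = "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/export/01693685827463-2d8752fd"
    print(export_id_from_arn(arn))  # 01693685827463-2d8752fd
```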

I have tried pulling from XCom, but it looks like nothing is pushed to XCom. I also tried printing the response (.output), and it was None. The value pulled into response_body was None as well.

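    import pendulum
    from airflow.decorators import task
    from airflow.operators.python import get_current_context
    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    @task()
    def extract_dynamo_response(response):
        print("response passed", response)  # it was None
        context = get_current_context()
        # Pull the export task's return value from XCom
        response_body = context["task_instance"].xcom_pull(task_ids="dynamo_full_export_task_task_id")
        print("response in edr: ", response_body)  # it was also None
        # Raises TypeError while response is None; note that boto3 nests this key
        # as response["ExportDescription"]["ExportManifest"]
        return response["ExportManifest"]

    # dag_run_config (defined elsewhere in the DAG) holds the table, bucket, and prefix
    dynamo_full_export_task = DynamoDBToS3Operator(
        task_id="dynamo_full_export_task_task_id",
        dynamodb_table_name=dag_run_config["dynamo_table"],
        s3_bucket_name=dag_run_config["bucket"],
        export_time=pendulum.now("UTC"),  # setting export_time requests a point-in-time export
        s3_key_prefix=dag_run_config["dynamo_export_s3_loc"],
        export_format="ION",
        file_size=10**9,
    )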

I call the task like this:

    dynamo_response = extract_dynamo_response(dynamo_full_export_task.output)

I need to know the export location, especially the ExportId.

Any other approaches would also be appreciated.
