steak_ale_piethon

Reputation: 77

Airflow DAG: access context using SimpleHttpOperator to enable XCom pull

I am struggling to pull XComs into a task that uses the SimpleHttpOperator.

The DAG below is meant to orchestrate a number of requests (via Google Cloud Functions) to a third-party API, store the resulting CSVs in Cloud Storage, and finally read all the CSVs, merge them, transform them, and store the result in BigQuery.

The tasks come in pairs. For each report, the first task triggers the Cloud Function that generates the request and stores the report token in Secret Manager; the second task checks whether the report is available to download, retrying until it is, and then saves it to Google Cloud Storage.

Once all the CSVs are available, the last task triggers another Cloud Function that downloads all the CSVs from Storage, merges them, and uploads the result to BigQuery (BQ).
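The pairing described above can be sketched in plain Python. This is a minimal, hypothetical helper (not part of the original DAG) that assumes the reports are named report1, report2, and so on, as in the question; it derives each request/check task-id pair and the dependency edges into the final merge task.

```python
from __future__ import annotations

# Hypothetical helpers: assume reports are named report1..reportN, as in the question.


def report_task_ids(num_reports: int) -> list[tuple[str, str]]:
    """Return (request_task_id, check_dl_task_id) pairs, one per report."""
    return [
        (f"report{i}_request", f"report{i}_check_dl")
        for i in range(1, num_reports + 1)
    ]


def dependency_edges(num_reports: int, final_task: str) -> list[tuple[str, str]]:
    """Return (upstream, downstream) edges: request >> check_dl >> final_task."""
    edges = []
    for request_id, check_id in report_task_ids(num_reports):
        edges.append((request_id, check_id))  # trigger the report, then poll/download it
        edges.append((check_id, final_task))  # every download must finish before the merge
    return edges


if __name__ == "__main__":
    check_ids = [check for _, check in report_task_ids(5)]
    print(check_ids)
    for upstream, downstream in dependency_edges(5, "ad_report_transformations"):
        print(f"{upstream} >> {downstream}")
```

In the DAG itself, the same loop could create the two SimpleHttpOperator instances per report and wire them with `>>`; the list of check task ids is also what `xcom_pull(task_ids=...)` expects.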

When each download is complete, I use the response_filter argument of SimpleHttpOperator to make the filename available to later tasks as an XCom.

# Python standard modules
import json
from datetime import datetime, timedelta

# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

default_args = {
    'owner': '--',
    'depends_on_past': False,
    # Start on 16 June 2021
    'start_date': datetime(2021, 6, 16),
    'email': ['--'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(seconds=60),
    'provide_context': True    
}

with DAG(
    "dailymotion_reporting",
    default_args=default_args,
    schedule_interval='0 6 * * *',
    tags=["my_dags"]
) as dag:    
    
    

    def push_xcom(response, **context):
        # Push the filename from an HTTP response as an XCom under the key "filename"
        context['ti'].xcom_push(key="filename", value=response.json()["filename"])

    def response_check(response):
        # True once the report has been downloaded, False while it is still pending
        status = response.json()["report_status"]
        if status == "report not ready":
            print("report not ready")
            return False
        elif status == "report downloaded":
            print("report downloaded")
            return True
        return False

    #t1 as request first report
    report1_request = SimpleHttpOperator(
        task_id= "report1_request",
        method='POST',
        http_conn_id='report_request_trigger',
        endpoint='request_dm_report',
        data=json.dumps({
                "dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION", 
                "metrics": "TOTAL_INVENTORY", 
                "product": "EMBED"
        }),
        headers={"Content-Type": "application/json"}
    )
    #t2 check report availability until available then download
    report1_check_dl = SimpleHttpOperator(
        task_id= "report1_check_dl",
        method='GET',
        http_conn_id='report_request_trigger',
        endpoint='check_previously_requested_dm_reports',
        response_check=lambda response: response.json()["report_status"] == "report downloaded",
        response_filter = lambda response: {"filename": response.json()["filename"]}
    )
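The check/filter logic on report1_check_dl can be exercised without Airflow by calling it on a stand-in object. `FakeResponse` below is a hypothetical stub, not part of Airflow or requests; it only mimics the `.json()` method. Whatever `response_filter` returns is what SimpleHttpOperator returns from `execute()`, so (with the default `do_xcom_push=True`) it is stored as that task's `return_value` XCom.

```python
# Hypothetical stub: mimics only the .json() method of a requests.Response.
class FakeResponse:
    def __init__(self, payload: dict):
        self._payload = payload

    def json(self) -> dict:
        return self._payload


# Same logic as the response_check lambda on report1_check_dl.
def response_check(response) -> bool:
    return response.json()["report_status"] == "report downloaded"


# Same logic as the response_filter lambda; its return value becomes the
# task's "return_value" XCom when SimpleHttpOperator finishes.
def response_filter(response) -> dict:
    return {"filename": response.json()["filename"]}


if __name__ == "__main__":
    ready = FakeResponse({"report_status": "report downloaded", "filename": "report1.csv"})
    pending = FakeResponse({"report_status": "report not ready"})
    print(response_check(ready), response_check(pending))
    print(response_filter(ready))
```

Note that the filter is only applied after the check passes, so a pending response without a `filename` key never reaches it; a failed check raises and the task is retried.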

The task below is meant to pull the CSVs out of storage. I am trying to retrieve the filenames from the XComs produced by the previous tasks and include them in the data payload for my Cloud Function.

ad_report_transformations = SimpleHttpOperator(
    task_id= "ad_report_transformations",
    method='POST',
    http_conn_id='report_request_trigger',
    endpoint='dm_transform_ad_data',
    data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "),
    response_check=lambda response: response.text == "ok"
)
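One detail worth checking whatever template is used: with Airflow's default (non-native) rendering, a templated field is rendered to a string, so a list of dicts returned by `xcom_pull` reaches the Cloud Function as its Python `str()` form. The stdlib-only sketch below, with made-up filenames and a hypothetical `filenames` key, shows why that is not valid JSON. Inside the template itself, Jinja's built-in `tojson` filter is one way to serialise the value (worth confirming against the Jinja version bundled with your Airflow).

```python
import json

# Hypothetical values: what xcom_pull(task_ids=[...]) could hand back when each
# check_dl task's response_filter returned {"filename": ...}.
pulled = [{"filename": "report1.csv"}, {"filename": "report2.csv"}]

# Default rendering is equivalent to str(): a Python repr with single quotes.
rendered_default = str(pulled)

# A JSON parser on the Cloud Function side rejects the repr form.
try:
    json.loads(rendered_default)
    parsed_ok = True
except json.JSONDecodeError:
    parsed_ok = False

# Serialising explicitly yields valid JSON ("filenames" is a hypothetical key).
rendered_json = json.dumps({"filenames": [item["filename"] for item in pulled]})

print(rendered_default, parsed_ok)
print(rendered_json)
```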

However, after trying many different approaches, I keep getting variations of the same error:

{taskinstance.py:1152} ERROR - 'context' is undefined

What's the best way for me to define the context using SimpleHttpOperator? Or is there another way to pull those values in?

Most of the solutions I have seen for similar issues use the PythonOperator, which has a provide_context argument that seems to enable the above, but I wanted to see if there was a way to do this without rewriting all my tasks as functions.

Upvotes: 3

Views: 4119

Answers (2)

Jarek Potiuk

Reputation: 20107

Why json.dumps()?

I believe this should work fine if you simply pass the string:

data = " {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "

json.dumps() encodes the string a second time: it wraps it in an extra pair of double quotes and backslash-escapes any double quotes inside it, which I think may interfere with Jinja templating.
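A quick stdlib check of what json.dumps() does to a template string: single quotes inside the expression are left alone, but the whole string gains an extra pair of double quotes, and any embedded double quotes become `\"` inside the `{{ ... }}` expression. The task id is taken from the question; the two template strings are illustrative.

```python
import json

# A template with single quotes inside the Jinja expression.
single_quoted = "{{ ti.xcom_pull(task_ids='report1_check_dl') }}"
# The same template written with double quotes inside the expression.
double_quoted = '{{ ti.xcom_pull(task_ids="report1_check_dl") }}'

# json.dumps() wraps the string in an extra pair of double quotes ...
encoded_single = json.dumps(single_quoted)
# ... and backslash-escapes embedded double quotes, leaving \" inside the
# {{ ... }} expression, which Jinja's expression parser is unlikely to accept.
encoded_double = json.dumps(double_quoted)

print(encoded_single)
print(encoded_double)
```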

Upvotes: 0

Josh Fell

Reputation: 3629

When retrieving XComs in Jinja templates, you don't need the context object: its entries (such as ti) are passed to the template as top-level variables when Airflow renders it behind the scenes. Try this:

data="{{ ti.xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl']) }}"

This leaves key at its default, 'return_value', which is where each check task's response_filter output ({"filename": ...}) is stored.

The above assumes you don't actually need json.dumps().
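Why `ti` works and `context` does not can be illustrated with an analogy. Airflow renders templates with Jinja, not `eval()`; the sketch below only mimics the idea that names in the expression are looked up in the context mapping, and there is no `context` key in that mapping. `StubTaskInstance` and `render` are hypothetical stand-ins, not Airflow APIs.

```python
# Analogy only: Airflow uses Jinja, not eval(). Names in the expression are
# resolved against the context mapping, so `ti` resolves and `context` does not.


class StubTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; only xcom_pull is mimicked."""

    def __init__(self, xcoms: dict):
        self._xcoms = xcoms  # task_id -> return_value

    def xcom_pull(self, task_ids, key="return_value"):
        # Only the default "return_value" key is modelled here.
        if isinstance(task_ids, str):
            return self._xcoms.get(task_ids)
        return [self._xcoms.get(t) for t in task_ids]


def render(expression: str, context: dict):
    """Evaluate `expression` with the context entries as top-level names."""
    return eval(expression, {"__builtins__": {}}, context)


template_context = {
    "ti": StubTaskInstance({
        "report1_check_dl": {"filename": "report1.csv"},
        "report2_check_dl": {"filename": "report2.csv"},
    }),
}

# Works: `ti` is a top-level name in the context.
works = render("ti.xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl'])", template_context)

# Fails: there is no top-level `context` name, mirroring "'context' is undefined".
try:
    render("context['ti'].xcom_pull(task_ids=['report1_check_dl'])", template_context)
    failed = False
except NameError:
    failed = True

print(works, failed)
```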

Upvotes: 5
