Tomasz Kubat

Reputation: 158

Apache Airflow - use python result in the next steps

I am working on a simple Apache Airflow DAG. My goal is to:
1. calculate the date parameter based on the DAG run date - I try to achieve that with the PythonOperator.
2. pass the parameter calculated above as a BigQuery query parameter.

Any ideas are welcome.

My code is below - I have marked the two points I am struggling with using 'TODO' labels.

...

def set_date_param(dag_run_date):
    # a business logic applied here
    ....

    return "2020-05-28" # example result

# --------------------------------------------------------
# DAG definition below
# --------------------------------------------------------

# Python operator
set_data_param = PythonOperator(
  task_id='set_data_param',
  python_callable=set_date_param,
  provide_context=True,
  op_kwargs={
    "dag_run_date": #TODO - how to pass the DAG running date as a function input parameter
  },
  dag=dag
)

# bq operator
load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT ccustomer_id, sales
    FROM `my_project.dataset1.table1` 
    WHERE date_key = {date_key_param}
    """.format(
       date_key_param = 
   ), #TODO - how to get the python operator results from the previous step
    use_legacy_sql=False,
    destination_dataset_table="my_project.dataset2.table2",
    trigger_rule='all_success',
    dag=dag
)

set_data_param >> load_data_to_bq_table

Upvotes: 0

Views: 1520

Answers (1)

SergiyKolesnikov

Reputation: 7815

  1. For PythonOperator to pass the execution date to the python_callable, you only need to set provide_context=True (as has already been done in your example). This way, Airflow automatically passes a collection of keyword arguments to the python callable, such that the names and values of these arguments are equivalent to the template variables described here. That is, if you define the python callable as set_date_param(ds, **kwargs): ..., the ds parameter will automatically get the execution date as a string value in the format YYYY-MM-DD.
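A minimal sketch of that signature, assuming the asker's helper name; the call at the bottom only simulates what Airflow does when it invokes the callable with provide_context=True:

```python
from datetime import datetime

def set_date_param(ds, **kwargs):
    # With provide_context=True, Airflow passes ds ('YYYY-MM-DD')
    # plus many other context variables as keyword arguments.
    return ds

# Simulated invocation; in a real DAG run, Airflow supplies these values.
result = set_date_param(ds='2020-05-28',
                        execution_date=datetime(2020, 5, 28))
print(result)  # → 2020-05-28
```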

  2. XCOM allows task instances to exchange messages. To use the date returned by set_date_param() inside the sql query string of BigQueryOperator, you can combine XCOM with Jinja templating:

sql="""SELECT ccustomer_id, sales
    FROM `my_project.dataset1.table1`
    WHERE date_key = '{{ task_instance.xcom_pull(task_ids='set_data_param') }}'
    """

The following complete example puts all pieces together. In the example, the get_date task creates a date string based on the execution date. After that, the use_date task uses XCOM and Jinja templating to retrieve the date string and writes it to a log.

import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}


def calculate_date(ds, execution_date, **kwargs):
    return f'{ds} ({execution_date.strftime("%m/%d/%Y")})'


def log_date(date_string):
    logging.info(date_string)


with DAG(
    'a_dag',
    schedule_interval='*/5 * * * *',
    default_args=default_args,
    catchup=False,
) as dag:
    get_date = PythonOperator(
        task_id='get_date',
        python_callable=calculate_date,
        provide_context=True,
    )
    use_date = PythonOperator(
        task_id='use_date',
        python_callable=log_date,
        op_args=['Date: {{ task_instance.xcom_pull(task_ids="get_date") }}'],
    )
    get_date >> use_date
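Applied back to the asker's BigQueryOperator, the sql string could then look like the sketch below (table and column names taken from the question; the single quotes around the Jinja expression make the pulled date render as a SQL string literal). The sql argument is a templated field, so Airflow renders the expression at run time; the replace() call here only simulates that rendering:

```python
sql = """SELECT ccustomer_id, sales
FROM `my_project.dataset1.table1`
WHERE date_key = '{{ task_instance.xcom_pull(task_ids="set_data_param") }}'
"""

# Simulate Airflow's Jinja rendering for the case where the
# set_data_param task pushed "2020-05-28" to XCom:
rendered = sql.replace(
    '{{ task_instance.xcom_pull(task_ids="set_data_param") }}',
    '2020-05-28',
)
print(rendered)
```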

Upvotes: 2
