Visya

Reputation: 134

Reuse parameter value across different tasks in Airflow

How do I reuse a value that is calculated once per DAG run across several tasks?

I'm trying to generate a timestamp in my DAG and use it in several tasks. So far I've tried setting a Variable and a params value, but neither works: the value is unique to each task run.

Here is my code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago

from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator

default_args = {
    "sla": timedelta(hours=1),
}

config = Variable.get("config", deserialize_json=True)
athena_output_bucket = config["athena_output_bucket"]
glue_db = config["glue_db"]
bucket = config["bucket"]
region = config["region"]


def get_snapshot_timestamp(time_of_run=None):
    if not time_of_run:
        time_of_run = datetime.now()

    timestamp = time_of_run.timestamp() * 1000
    return str(int(timestamp))


class TemplatedArgsGlueOperator(AwsGlueJobOperator):
    template_fields = ("script_args",)


table = "my_table"

with DAG(
    "my-table-export",
    default_args=default_args,
    description="Export my table from DynamoDB to S3",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    params={
        "snapshot_ts": get_snapshot_timestamp(),
        "athena_output_location": f"s3://{athena_output_bucket}/{table}",
        "table": table,
    },
) as dag:
    my_table_export_to_s3 = TemplatedArgsGlueOperator(
        task_id="my_table_export_to_s3",
        job_name="my-table-export-to-s3",
        num_of_dpus=2,
        region_name=region,
        script_args={"--snapshot_ts": "{{ params.snapshot_ts }}"},
    )

    add_new_partition = AWSAthenaOperator(
        task_id="add_new_partition",
        query="""
          ALTER TABLE {{ params.table }} ADD PARTITION (snapshot_ts = '{{ params.snapshot_ts }}')
          LOCATION 's3://{{ var.json.config.bucket }}/{{ params.table }}/snapshot_ts={{ params.snapshot_ts }}'
        """,
        database=glue_db,
        output_location="{{ params.athena_output_location }}",
    )

    update_latest_view = AWSAthenaOperator(
        task_id="update_latest_view",
        query="""
          CREATE OR REPLACE VIEW {{ params.table }}_latest AS
          SELECT * from {{ params.table }}
          WHERE snapshot_ts = '{{ params.snapshot_ts }}'
        """,
        database=glue_db,
        output_location="{{ params.athena_output_location }}",
    )


my_table_export_to_s3 >> add_new_partition >> update_latest_view

I want snapshot_ts to be the same across all three tasks, but it's different. What am I doing wrong?

Upvotes: 0

Views: 1307

Answers (1)

floating_hammer

Reputation: 439

This should be possible via XCom. XCom is designed precisely for exchanging information between tasks. To quote the documentation:

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

With XCom, a PythonOperator is typically used to call a function. That function pushes a value into the xcom table of the Airflow metadata database, and the same value can then be pulled by other tasks (or DAGs).

A worked example is available here: https://www.cloudwalker.io/2020/01/28/airflow-xcom/
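
For illustration, here is a minimal sketch of that pattern applied to the DAG in the question (this is an assumption-laden sketch: it assumes Airflow 2.x with the Amazon provider installed, and the task id get_snapshot_ts, the bucket, the database and the output location are placeholders). The timestamp is computed once in a PythonOperator, whose return value is pushed to XCom, and downstream tasks pull that same value with ti.xcom_pull inside their templated fields.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.utils.dates import days_ago


def compute_snapshot_ts():
    # Runs once per DAG run; the return value is automatically pushed
    # to XCom under the key "return_value" for task "get_snapshot_ts".
    return str(int(datetime.now().timestamp() * 1000))


with DAG(
    "my-table-export",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    params={"table": "my_table"},
) as dag:
    get_snapshot_ts = PythonOperator(
        task_id="get_snapshot_ts",
        python_callable=compute_snapshot_ts,
    )

    # Every downstream task pulls the *same* stored value in its templated fields.
    add_new_partition = AWSAthenaOperator(
        task_id="add_new_partition",
        query="""
          ALTER TABLE {{ params.table }}
          ADD PARTITION (snapshot_ts = '{{ ti.xcom_pull(task_ids="get_snapshot_ts") }}')
          LOCATION 's3://my-bucket/{{ params.table }}/snapshot_ts={{ ti.xcom_pull(task_ids="get_snapshot_ts") }}'
        """,
        database="my_glue_db",                     # placeholder database
        output_location="s3://my-athena-output/",  # placeholder output location
    )

    get_snapshot_ts >> add_new_partition

The same ti.xcom_pull expression can be dropped into script_args of the Glue task and into the view query, so all three tasks see an identical snapshot_ts for a given DAG run.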

Upvotes: 1
