Anoop R

Reputation: 555

Getting unique_id for apache airflow tasks

I am new to Airflow. My company currently uses crontab and a custom scheduler (developed in-house) for our ETL pipelines, and we are now planning to adopt Apache Airflow for all of our data pipeline scenarios. While exploring its features, I could not find a unique ID for each task instance/DAG. Most of the solutions I found pointed to macros and templates, but none of them provide a unique ID for a task. However, I can see an incremental unique ID in the UI for each task. Is there an easy way to access those values inside my Python method? The main use case is that I need to pass those IDs as parameters to our Python/Ruby/Pentaho jobs, which are called as scripts or methods.

For example:

My shell script 'test.sh' needs two arguments: run_id and collection_id. Currently we generate this unique run_id from a centralised database and pass it to the jobs. If it is already available in the Airflow context, we would use that instead.

from airflow.operators.bash_operator import BashOperator
from datetime import date, datetime, timedelta
from airflow import DAG

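# run_id and collection_id are placeholders for the real values
shell_command = "/data2/test.sh -r run_id -c collection_id"

# `dag` is assumed to be defined elsewhere in this file
putfiles_s3 = BashOperator(
    task_id='putfiles_s3',
    bash_command=shell_command,
    dag=dag,
)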

I am looking for a unique run_id (either DAG level or task level) for each run of this DAG, whether scheduled or manual.

Note: This is a sample task. There will be multiple dependent tasks in this DAG. I have attached a screenshot of the Job_Id from the Airflow UI.

Thanks,
Anoop R

Upvotes: 7

Views: 11538

Answers (1)

Ash Berlin-Taylor

Reputation: 4048

{{ ti.job_id }} is what you want:

from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow import DAG


dag = DAG(
    "job_id",
    start_date=datetime(2018, 1, 1),
)

with dag:
    BashOperator(
        task_id='unique_id',
        bash_command="echo {{ ti.job_id }}",
    )

This will be valid at runtime. A log from this execution looks like:

[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj  
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output:
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4
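
Applied to the test.sh call from the question, the same template can go straight into the command string. This is only a rough sketch: the collection id value is a made-up placeholder, `make_task` is a hypothetical helper name, and using the job id as the script's run_id is an assumption about what the script expects.

```python
# Hypothetical placeholder; the real collection id would come from your own config.
COLLECTION_ID = "collection_42"

# Airflow renders {{ ti.job_id }} when the task runs, so the script
# should receive the numeric job id as its -r argument.
SHELL_COMMAND = "/data2/test.sh -r {{ ti.job_id }} -c " + COLLECTION_ID


def make_task(dag):
    """Attach the BashOperator to an existing DAG object (hypothetical helper)."""
    # Imported lazily, using the same import path as the code in this thread.
    from airflow.operators.bash_operator import BashOperator

    return BashOperator(
        task_id="putfiles_s3",
        bash_command=SHELL_COMMAND,
        dag=dag,
    )
```

Calling `make_task(dag)` with the DAG from the question would register the task; the template is left untouched until Airflow renders it for a concrete run.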

Note that the value is only set at runtime, so the "Rendered Template" view in the web UI will show None instead of a number.
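
Since the question also asks about reading the id inside a Python method, here is a minimal sketch of a callable that pulls it from the task context. It assumes Airflow passes the context as keyword arguments with the task instance under the "ti" key (the same object the template refers to); `report_job_id` and `fake_ti` are hypothetical names, and the stand-in object exists only so the function can be tried outside Airflow.

```python
from types import SimpleNamespace


def report_job_id(**context):
    """Return the job id of the running task instance.

    Assumes the task instance is available as context["ti"].
    """
    job_id = context["ti"].job_id
    print("job_id:", job_id)
    return job_id


# Stand-in for a real task instance, used only to exercise the function
# outside Airflow; at runtime Airflow supplies the real object.
fake_ti = SimpleNamespace(job_id=4)
result = report_job_id(ti=fake_ti)
```

In a DAG this function would be passed as `python_callable` to a PythonOperator; on Airflow 1.x that should also need `provide_context=True` so the context is actually handed to the callable.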

Upvotes: 7
