Reputation: 1251
Suppose the following situation:
[c1, c2, c3] >> child_task
where c1, c2, c3 and child_task are all operators whose task_id values are id1, id2, id3 and child_id respectively.
Task child_task is also a PythonOperator with provide_context=True and python_callable=dummy_func:
def dummy_func(**context):
    # ...
Is it possible to retrieve all the parents' ids inside dummy_func (perhaps by browsing the DAG somehow using the context)?
The expected result in this case would be the list ['id1', 'id2', 'id3'].
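For reference, a minimal DAG reproducing this setup might look like the sketch below (the dag_id, start_date and DummyOperator parents are placeholders I made up; Airflow 1.10-style imports are assumed):
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def dummy_func(**context):
    # context['task'] is the child_task operator itself, so the DAG
    # structure can be inspected from here (see the answers below)
    pass

with DAG(dag_id="parents_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    c1 = DummyOperator(task_id="id1")
    c2 = DummyOperator(task_id="id2")
    c3 = DummyOperator(task_id="id3")
    child_task = PythonOperator(
        task_id="child_id",
        python_callable=dummy_func,
        provide_context=True,
    )
    [c1, c2, c3] >> child_task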
Upvotes: 7
Views: 12538
Reputation: 420
You could do something like the following to recursively collect all upstream task ids from within the DAG; this should be helpful for anyone looking for a working answer:
def get_all_upstream_task_ids(task):
    upstream_task_ids = set()
    for upstream_task in task.get_direct_relatives(upstream=True):
        upstream_task_ids.add(upstream_task.task_id)
        upstream_task_ids.update(get_all_upstream_task_ids(upstream_task))
    return list(upstream_task_ids)
get_all_upstream_task_ids(context['task'])
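In the question's setup, a sketch of how this helper might be wired into the callable (names as in the question; the print is just for illustration):
def dummy_func(**context):
    # provide_context=True makes the current operator available as context['task']
    parent_ids = get_all_upstream_task_ids(context['task'])
    print(parent_ids)  # expected: ['id1', 'id2', 'id3'] in some order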
Upvotes: 0
Reputation: 121
A modern approach with Airflow 2
from airflow.decorators import dag, task
...

@task
def task1():
    return "x"

@task
def task2():
    return "y"

@task
def print_task_ids(x, y, **context):
    dag_run = context["dag_run"]
    tis = dag_run.get_task_instances()
    for ti in tis:
        print(ti.task_id)

@dag(**dag_args)
def show_task_ids():
    print_task_ids(task1(), task2())

dag = show_task_ids()
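Note that get_task_instances() returns every task instance in the run. If only the direct parents of the current task are wanted, the operator available in the context can be used instead; a sketch:
@task
def print_parent_ids(x, y, **context):
    # context["task"] is the operator backing this TaskFlow task;
    # upstream_task_ids lists only its direct parents
    print(list(context["task"].upstream_task_ids))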
Upvotes: -1
Reputation: 11607
The upstream_task_ids and downstream_task_ids properties of BaseOperator are meant just for this purpose.
from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task.downstream_task_ids
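Applied to the question's setup, this is already enough to read the parents' ids inside the callable; a sketch (provide_context=True puts the current operator in the context as 'task'):
def dummy_func(**context):
    # context['task'] is the child_task operator itself
    parent_ids = list(context['task'].upstream_task_ids)
    print(parent_ids)  # expected: ['id1', 'id2', 'id3'] in some order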
Do note, however, that with these properties you only get the immediate (upstream / downstream) neighbours of a task. To get all ancestor or descendant tasks, you can quickly cook up the good old graph-theory approach, such as this BFS-like implementation:
from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # determine parent tasks to begin BFS
    for task in my_task.upstream_list:
        tasks_queue.put(item=task)
    # perform BFS
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        if task.task_id in ancestor_task_ids:
            # already visited; skip to avoid re-enqueuing shared ancestors
            continue
        ancestor_task_ids.add(task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(item=_task)
    # convert task_ids back to actual tasks
    ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
    return ancestor_tasks
The above snippet is NOT tested, but I'm sure you can take inspiration from it.
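As a usage sketch for the original question (names as in the question; the ordering of the resulting ids is not guaranteed):
def dummy_func(**context):
    ancestor_tasks = get_ancestor_tasks(context['task'])
    parent_ids = [t.task_id for t in ancestor_tasks]
    print(parent_ids)  # expected: ['id1', 'id2', 'id3'] in some order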
Upvotes: 12