Rajalakshmi

Reputation: 771

Create tasks from lists - Airflow

I have a requirement where I have a list of lists like

[[a,b,c,d],[e,f,g,h],[i,j,k,l]]

Now I want to create tasks in a DAG like below:

a >> b >> c >> d

e >> f >> g >> h

i >> j >> k >> l

Any help is appreciated.

Upvotes: 1

Views: 1980

Answers (4)

christheliz

Reputation: 306

With a, ..., l being nodes and >> describing the arcs, we can write the nested list as a dictionary and then use that object as a directed acyclic graph. Depending on your data (e.g. the nested lists and the connections between them), you can adapt the code. In the example above, the three lists become three separate chains. One could convert the list object to a dictionary like this:

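graph = [["a","b","c","d"],["e","f","g","h"],["i","j","k","l"]]

# Map each node to a list of its direct successors, e.g. {"a": ["b"], "b": ["c"], ...}
z = {}
for nodes in graph:
    # Pair every node with the one that follows it in the same list
    for src, dst in zip(nodes, nodes[1:]):
        z.setdefault(src, []).append(dst)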

and then use this standard code from the Python documentation (the "Implementing Graphs" essay) to walk the resulting graph and list all paths between two nodes:

def find_all_paths(graph, start, end, path=None):
    # Build a new list each call so the caller's path is never mutated
    path = (path or []) + [start]
    if start == end:
        return [path]
    # dict.has_key() was removed in Python 3; use the `in` operator instead
    if start not in graph:
        return []
    paths = []
    for node in graph[start]:
        if node not in path:  # skip nodes already on this path
            paths.extend(find_all_paths(graph, node, end, path))
    return paths
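
To see what the two snippets give for the example data, here is a minimal end-to-end sketch. It assumes Python 3 and stores each node's successors in a list. Note that it only inspects the graph structure; it does not create any Airflow tasks.

```python
graph = [["a", "b", "c", "d"], ["e", "f", "g", "h"], ["i", "j", "k", "l"]]

# Adjacency mapping: node -> list of direct successors
z = {}
for nodes in graph:
    for src, dst in zip(nodes, nodes[1:]):
        z.setdefault(src, []).append(dst)


def find_all_paths(graph, start, end, path=None):
    """Return every path from start to end as a list of node lists."""
    path = (path or []) + [start]
    if start == end:
        return [path]
    if start not in graph:
        return []
    paths = []
    for node in graph[start]:
        if node not in path:
            paths.extend(find_all_paths(graph, node, end, path))
    return paths


same_chain = find_all_paths(z, "a", "d")   # nodes from the same inner list
other_chain = find_all_paths(z, "a", "h")  # nodes from different inner lists
print(same_chain)   # [['a', 'b', 'c', 'd']]
print(other_chain)  # []
```

The empty result for a and h reflects that the three inner lists are independent chains with no arcs between them.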

Does this answer your question?

Upvotes: 0

Josh Fell

Reputation: 3589

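You can use the handy chain() function to do this in one line.

from airflow.models.baseoperator import chain
# Airflow 2.x import path (assumed); newer releases provide EmptyOperator instead
from airflow.operators.dummy import DummyOperator

[a,b,c,d,e,f,g,h,i,j,k,l] = [DummyOperator(task_id=f"{i}") for i in "abcdefghijkl"]
# Equal-length lists are linked element-wise: a >> b, e >> f, i >> j, and so on
chain([a,e,i], [b,f,j], [c,g,k], [d,h,l])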
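
Since chain() links equal-length lists element by element, its arguments are the "columns" of the nested list from the question. A small sketch of deriving them with zip follows; plain strings stand in for the operators so it runs without Airflow, and it assumes all inner lists have the same length.

```python
# Plain strings stand in for operators so this runs without Airflow.
op_lists = [["a", "b", "c", "d"], ["e", "f", "g", "h"], ["i", "j", "k", "l"]]

# zip(*op_lists) transposes rows into columns; convert the tuples to lists
# so the arguments have the same shape as in the chain() call above.
columns = [list(col) for col in zip(*op_lists)]
print(columns)
# [['a', 'e', 'i'], ['b', 'f', 'j'], ['c', 'g', 'k'], ['d', 'h', 'l']]

# With real operators inside a DAG, the call would then be:
# chain(*columns)
```

If the inner lists differ in length, zip silently drops the extra items, so the explicit loop approach is likely the safer choice in that case.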


Upvotes: 6

Javier Lopez Tomas

Reputation: 2342

op_lists = [[a,b,c,d],[e,f,g,h],[i,j,k,l]]
for op_list in op_lists:
    for i in range(len(op_list) - 1):
        op_list[i] >> op_list[i + 1]

EDIT: I didn't see balderman's answer. He was first
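
The same loop can be tried without an Airflow installation. In the sketch below, StubTask is a hypothetical stand-in (not an Airflow class) that only records which task was linked to which, and zip replaces the index arithmetic.

```python
class StubTask:
    """Hypothetical stand-in for an operator; records edges instead of scheduling."""

    edges = []  # shared list of (upstream_id, downstream_id) pairs

    def __init__(self, task_id):
        self.task_id = task_id

    def __rshift__(self, other):
        StubTask.edges.append((self.task_id, other.task_id))
        return other  # returning the right-hand side lets a >> b >> c keep chaining


op_lists = [[StubTask(ch) for ch in row] for row in ("abcd", "efgh", "ijkl")]

for op_list in op_lists:
    # Pair each task with its successor in the same inner list
    for upstream, downstream in zip(op_list, op_list[1:]):
        upstream >> downstream

print(StubTask.edges)
```

Each inner list of four tasks contributes three edges, so nine pairs are recorded in total, and none of them crosses from one inner list to another.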

Upvotes: 0

balderman

Reputation: 23815

Assuming a, b, c, ... are operators, the code below should do the job (using a mock in place of the Airflow operator):

class Operator:
    def __init__(self, name):
        self.name = name

    def set_downstream(self, other):
        print(f'{self}: setting {other} as downstream')

    def __str__(self) -> str:
        return self.name


a = Operator('a')
b = Operator('b')
c = Operator('c')
d = Operator('d')
e = Operator('e')
f = Operator('f')
lst = [[a, b, c], [e, f, d]]

for oper_lst in lst:
    for i in range(0, len(oper_lst) - 1):
        oper_lst[i].set_downstream(oper_lst[i + 1])

Output:

a: setting b as downstream
b: setting c as downstream
e: setting f as downstream
f: setting d as downstream

Upvotes: 0
