Reputation: 771
I have a list of lists like this:
[[a,b,c,d],[e,f,g,h],[i,j,k,l]]
Now I want to create the tasks in my DAG with these dependencies:
a >> b >> c >> d
e >> f >> g >> h
i >> j >> k >> l
Any help is appreciated.
Upvotes: 1
Views: 1980
Reputation: 306
With a, ..., l being nodes and >> describing the arcs, we can write the nested list as a dictionary (an adjacency list) and then treat that object as a directed acyclic graph. Depending on your data (e.g. the nested lists and the connections between them), you can adapt the code. In the example above, the three lists become three separate chains of arcs. One could convert the list object to a dictionary like this:
graph = [["a","b","c","d"],["e","f","g","h"],["i","j","k","l"]]
# Adjacency dict: each node maps to a list of its successors,
# e.g. {"a": ["b"], "b": ["c"], "c": ["d"], "e": ["f"], ...}
z = {}
for row in graph:
    for src, dst in zip(row, row[1:]):
        z.setdefault(src, []).append(dst)
and then use this standard code from the Python documentation to find the paths through the resulting graph:
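def find_all_paths(graph, start, end, path=None):
    # Return every path from start to end as a list of node lists.
    path = (path or []) + [start]
    if start == end:
        return [path]
    if start not in graph:  # dict.has_key() was removed in Python 3
        return []
    paths = []
    for node in graph[start]:
        if node not in path:  # skip nodes already on this path (avoids cycles)
            newpaths = find_all_paths(graph, node, end, path)
            for newpath in newpaths:
                paths.append(newpath)
    return paths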
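If what you need from the adjacency structure is an execution order rather than every path, the standard library's graphlib.TopologicalSorter (Python 3.9+) can build the same graph straight from the nested list, and it raises CycleError if the dependencies are not acyclic. This is a swapped-in technique, not part of the recipe above; a minimal sketch:

```python
from graphlib import TopologicalSorter

graph = [["a", "b", "c", "d"], ["e", "f", "g", "h"], ["i", "j", "k", "l"]]

ts = TopologicalSorter()
for row in graph:
    if row:
        ts.add(row[0])  # register the head even if the row has a single element
    for upstream, downstream in zip(row, row[1:]):
        # add(node, *predecessors): downstream depends on upstream
        ts.add(downstream, upstream)

# static_order() yields every node after all of its predecessors;
# it raises graphlib.CycleError if the graph contains a cycle.
order = list(ts.static_order())
print(order)
```

The exact interleaving of the three independent chains is not something to rely on; only the order within each chain is guaranteed.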
Does this answer your question?
Upvotes: 0
Reputation: 3589
You can use the handy chain() function to do this in one line:
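from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator  # EmptyOperator (airflow.operators.empty) in newer Airflow 2 releases
# Run this inside a `with DAG(...)` block so the tasks belong to a DAG.
[a,b,c,d,e,f,g,h,i,j,k,l] = [DummyOperator(task_id=f"{i}") for i in "abcdefghijkl"]
# Lists of equal length are linked pairwise: a >> b, e >> f, i >> j, and so on.
chain([a,e,i], [b,f,j], [c,g,k], [d,h,l])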
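The lists passed to chain() above are just the columns of the original nested list, so they can be produced with zip(*op_lists) instead of being written out by hand. The sketch below shows that transpose with a hypothetical MockTask class and a hypothetical chain_lists() helper that imitates chain()'s list-to-list behavior in simplified form (it is not Airflow's code), so it runs without Airflow installed:

```python
class MockTask:
    """Hypothetical stand-in for an Airflow operator; it only records edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)


def chain_lists(*columns):
    """Hypothetical, simplified imitation of chain() for lists only:
    columns[n][k] is linked to columns[n + 1][k]."""
    for ups, downs in zip(columns, columns[1:]):
        if len(ups) != len(downs):
            raise ValueError("adjacent lists must have the same length")
        for up, down in zip(ups, downs):
            up.set_downstream(down)


tasks = {name: MockTask(name) for name in "abcdefghijkl"}
op_lists = [[tasks[n] for n in row] for row in ("abcd", "efgh", "ijkl")]

# Transpose the rows into columns: [a, e, i], [b, f, j], [c, g, k], [d, h, l]
columns = [list(col) for col in zip(*op_lists)]
chain_lists(*columns)

for name in "abcdefghijkl":
    print(name, "->", tasks[name].downstream)
```

With real operators the same idea would be chain(*[list(col) for col in zip(*op_lists)]). It only makes sense when every inner list has the same length, because zip() silently truncates to the shortest one.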
Upvotes: 6
Reputation: 2342
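# a, ..., l are operators already defined in your DAG
op_lists = [[a,b,c,d],[e,f,g,h],[i,j,k,l]]
for op_list in op_lists:
    # link each operator to the next one in its list
    for i in range(len(op_list) - 1):
        op_list[i] >> op_list[i + 1]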
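Because x >> y on Airflow operators returns y (that is what makes a >> b >> c work), the inner loop can also be collapsed with functools.reduce. The sketch below uses a hypothetical MockTask class with the same return-the-right-operand behavior so it runs without Airflow:

```python
from functools import reduce
from operator import rshift


class MockTask:
    """Hypothetical stand-in for an Airflow operator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Record the edge and return the right-hand task, mirroring how
        # `a >> b >> c` is evaluated as `(a >> b) >> c`.
        self.downstream.append(other.task_id)
        return other


tasks = {name: MockTask(name) for name in "abcdefghijkl"}
op_lists = [[tasks[n] for n in row] for row in ("abcd", "efgh", "ijkl")]

for op_list in op_lists:
    # reduce(rshift, [a, b, c, d]) evaluates ((a >> b) >> c) >> d
    reduce(rshift, op_list)
```

Note that reduce() raises TypeError on an empty list, whereas the range-based loop simply does nothing.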
EDIT: I didn't see balderman's answer. He was first.
Upvotes: 0
Reputation: 23815
Assuming a, b, c, ... are operators, the code below should do the job (it uses a mock class in place of an Airflow operator):
class Operator:
    def __init__(self, name):
        self.name = name

    def set_downstream(self, other):
        print(f'{self}: setting {other} as downstream')

    def __str__(self) -> str:
        return self.name
a = Operator('a')
b = Operator('b')
c = Operator('c')
d = Operator('d')
e = Operator('e')
f = Operator('f')
lst = [[a, b, c], [e, f, d]]
for oper_lst in lst:
    for i in range(0, len(oper_lst) - 1):
        oper_lst[i].set_downstream(oper_lst[i + 1])
Output:
a: setting b as downstream
b: setting c as downstream
e: setting f as downstream
f: setting d as downstream
Upvotes: 0