Reputation: 403
I have a DAG that needs to recompile customer lists for various brands. The script is called with two arguments: brand and listtype.
I need the brands to run concurrently, but each list type should depend on the preceding list type, and I can't figure out how to do that in a loop. Can y'all help me out?
BrandsToRun = ['A', 'B', 'C']
ListTypes = ['1', '2', '3']

# Defining the DAG
################################################################################
with DAG(
    'MusterMaster',
    default_args = default_args,
    description = 'x',
    # schedule_interval = None
    schedule_interval = '30 4 * * *',
    catchup = False
) as MusterMaster:
    for Brand in BrandsToRun:
        for ListType in ListTypes:
            ListLoad = BashOperator(
                task_id='Load_'+str(Brand)+'_'+str(ListType),
                bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
                pool='logs'
            )
            ListLoad
I want the tasks to have a dependency structure like this, but I can't figure it out. Brands should run concurrently, but each ListType should depend on the preceding ListType.
Muster A 1 >> Muster A 2 >> Muster A 3
Muster B 1 >> Muster B 2 >> Muster B 3
Muster C 1 >> Muster C 2 >> Muster C 3
How can I best accomplish this?
Upvotes: 2
Views: 2695
Reputation: 16099
You can do:
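for Brand in BrandsToRun:
    # Start a fresh list per brand so brands stay independent of each other
    # (named `tasks` to avoid shadowing the built-in `list`)
    tasks = []
    for ListType in ListTypes:
        tasks.append(BashOperator(
            task_id='Load_'+str(Brand)+'_'+str(ListType),
            bash_command = """python3 '/usr/local/bin/MusterMaster.py' {0} {1}""".format(Brand[0], ListType[0]),
            pool='logs'))
        # From the second task onward, chain it after the previous one
        if len(tasks) > 1:
            tasks[-2] >> tasks[-1]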
Which will give you three independent chains, one per brand, with each list type running after the preceding one.
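If you want to sanity-check the wiring without an Airflow install, here is a rough sketch of the same loop pattern. Note that `FakeTask` and `build_chains` are hypothetical stand-ins I made up for illustration (not Airflow APIs); `FakeTask` only mimics the `>>` operator, and in a real DAG you would create `BashOperator` tasks instead. It also tracks a `previous` variable rather than indexing into a list, which is just a stylistic alternative.

```python
# Hypothetical stand-in for an Airflow operator so this runs without Airflow.
# In a real DAG this would be BashOperator.
class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run directly after this one

    def __rshift__(self, other):
        # Mimic `a >> b`: record that b runs after a, and return b.
        self.downstream.append(other.task_id)
        return other


def build_chains(brands, list_types):
    """Create one sequential chain of tasks per brand; return tasks by id."""
    tasks = {}
    for brand in brands:
        previous = None  # reset per brand so brands stay independent
        for list_type in list_types:
            task = FakeTask(f"Load_{brand}_{list_type}")
            tasks[task.task_id] = task
            if previous is not None:
                previous >> task  # chain after the preceding list type
            previous = task
    return tasks


tasks = build_chains(["A", "B", "C"], ["1", "2", "3"])
edges = [(t.task_id, d) for t in tasks.values() for d in t.downstream]
for upstream, downstream in edges:
    print(f"{upstream} >> {downstream}")
```

The key point is resetting `previous` (or the list in the version above) at the start of each brand, so no edge is ever created between the last task of one brand and the first task of the next.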
Upvotes: 2