Reputation: 1327
Parts of my DAG generate lists, and I can't work out how to split them into individual items that downstream tasks can process one at a time.
Here's a pseudo-example:
def push(**kwargs):
    """Pushes an XCom without a specific target"""
    for n in range(10):
        kwargs['ti'].xcom_push(key='vals', value=n)

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)
puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)
It appears that xcom_push keeps only the last value rather than building a list. So I'd have to collect the values into a list in push, then use a for loop in the puller to process each item individually.
I'm completely fine doing that, but it seems counterintuitive for batch jobs.
How can I have a puller pull just one of the 10 values generated by push?
Upvotes: 1
Views: 2664
Reputation: 45361
The structure of a DAG shouldn't change between DAG runs, so your puller is either a single task that pulls all the values, or 10 tasks that each pull one of the values.
Here's how you'd push all 10 values with XCom, by collecting them into a list and pushing it once:
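# Airflow 1.x import path, matching the provide_context=True usage below
from airflow.operators import python_operator

def push(**kwargs):
    """Collects the values and pushes them as a single XCom list."""
    final_output = []
    for n in range(10):
        # doing work
        final_output.append(n)
    # Push once, after the loop, so the whole list is stored
    kwargs['ti'].xcom_push(key='vals', value=final_output)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)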
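The difference between pushing inside the loop and pushing once can be illustrated without Airflow. The sketch below uses a hypothetical in-memory stand-in, `FakeTaskInstance`, which models a single assumption: pushing under a key that already exists for the same task replaces the stored value. It is not Airflow's implementation, only a way to see why the question's version ends up with one value.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for a task instance (not Airflow code)."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict keyed by (task_id, key)

    def xcom_push(self, key, value):
        # Assumption being modeled: a push with an existing key replaces the old value
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self.store.get((task_ids, key))


store = {}
ti = FakeTaskInstance('push', store)

# Pushing inside the loop: every push replaces the previous one
for n in range(10):
    ti.xcom_push(key='vals', value=n)
overwritten = ti.xcom_pull(task_ids='push', key='vals')

# Collecting first and pushing once keeps every value
ti.xcom_push(key='vals', value=list(range(10)))
collected = ti.xcom_pull(task_ids='push', key='vals')

print(overwritten)  # 9
print(collected)    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
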
And then you can either pull all 10 of them in a single task, like this:
def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)
Or pull one value in each of ten tasks:
def puller(index=0, **kwargs):
    ti = kwargs['ti']
    # Pull the whole list, then keep only this task's element
    v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
    print(v1)

ten_ops = [python_operator.PythonOperator(
    task_id=f'puller_{n}',
    python_callable=puller,
    provide_context=True,
    op_kwargs={'index': n},
) for n in range(10)]
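To see what each of the ten tasks ends up with, here is a rough simulation that runs outside Airflow. `StubTaskInstance` is a hypothetical stub that simply returns a fixed list from `xcom_pull`, and the dictionary comprehension stands in for the scheduler calling the same function once per task with that task's `op_kwargs`. In a real DAG the pullers would also need to run downstream of `push` so that the list exists before they read it.

```python
class StubTaskInstance:
    """Hypothetical stub: xcom_pull always returns the list given at construction."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, key=None, task_ids=None):
        return self._values


def puller(index=0, **kwargs):
    ti = kwargs['ti']
    # Pull the full list and select this task's element
    return ti.xcom_pull(key='vals', task_ids='push')[index]


stub = StubTaskInstance(list(range(10)))

# Mimic ten tasks sharing one callable, each with its own op_kwargs
results = {
    f'puller_{n}': puller(ti=stub, **{'index': n})
    for n in range(10)
}

print(results['puller_3'])  # 3
```

Every task pulls the whole list and discards all but one element, which is fine for small lists like this one.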
I hope that helps, unless I misunderstood the question.
Upvotes: 4