Reputation: 1743
I made a custom Airflow operator. It takes an input and pushes its output to XCom.
What I want to achieve is to call the operator with some defined input, parse its output in the Python callable of a BranchPythonOperator, and then pass the parsed output to another task that calls the same operator:
CustomOperator_Task1 = CustomOperator(
    data={
        'type': 'custom',
        'date': '2017-11-12'
    },
    task_id='CustomOperator_Task1',
    dag=dag)
data = {}

def checkOutput(**kwargs):
    result = kwargs['ti'].xcom_pull(task_ids='CustomOperator_Task1')
    if result.success:
        data = result.data
        return "CustomOperator_Task2"
    return "Failure"
BranchOperator_Task = BranchPythonOperator(
    task_id='BranchOperator_Task',
    dag=dag,
    python_callable=checkOutput,
    provide_context=True,
    trigger_rule="all_done")
CustomOperator_Task2 = CustomOperator(
    data=data,
    task_id='CustomOperator_Task2',
    dag=dag)
CustomOperator_Task1 >> BranchOperator_Task >> CustomOperator_Task2
In task CustomOperator_Task2 I want to pass the parsed data from BranchOperator_Task. Right now it is always empty {}.
What is the best way to do that?
Upvotes: 4
Views: 4081
Reputation: 6548
I see your issue now. Setting the data variable the way you do won't work because of how Airflow runs tasks. An entirely different process will be running the next task, so it won't have the context of what data was set to.
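To make the process point concrete, here is a small sketch that has nothing Airflow-specific in it. It assumes a second Python interpreter launched with subprocess is a fair stand-in for "another task's process"; the variable names are only for illustration.

```python
import subprocess
import sys

# Module-level state, analogous to `data = {}` in the DAG file.
data = {}

# A separate interpreter stands in for the process that runs another task.
# It builds and fills its own `data`; nothing it does reaches this process.
child_code = "data = {}; data['date'] = '2017-11-12'; print(data)"
child = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,
    text=True,
    check=True,
)
child_view = child.stdout.strip()  # what the child process saw

# Back in this process, `data` was never touched.
parent_view = data
```

The child sees its populated dict while the parent still holds the empty one, which is the same reason CustomOperator_Task2 keeps receiving {}.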
Instead, BranchOperator_Task has to push the parsed output into another XCom so CustomOperator_Task2 can explicitly fetch it.
def checkOutput(**kwargs):
    ti = kwargs['ti']
    result = ti.xcom_pull(task_ids='CustomOperator_Task1')
    if result.success:
        # Push the parsed output so the downstream task can pull it.
        ti.xcom_push(key='data', value=result.data)
        return "CustomOperator_Task2"
    return "Failure"
BranchOperator_Task = BranchPythonOperator(
    ...)

CustomOperator_Task2 = CustomOperator(
    data_xcom_task_id=BranchOperator_Task.task_id,
    data_xcom_key='data',
    task_id='CustomOperator_Task2',
    dag=dag)
Then your operator might look something like this.
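from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class CustomOperator(BaseOperator):
    @apply_defaults
    def __init__(self, data_xcom_task_id, data_xcom_key, *args, **kwargs):
        # Initialize BaseOperator so task_id, dag, etc. are set.
        super(CustomOperator, self).__init__(*args, **kwargs)
        self.data_xcom_task_id = data_xcom_task_id
        self.data_xcom_key = data_xcom_key

    def execute(self, context):
        # Pull the value that the upstream task pushed under the given key.
        data = context['ti'].xcom_pull(task_ids=self.data_xcom_task_id, key=self.data_xcom_key)
        ...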
The parameters may not be needed if you would rather hardcode the task ID and key. It depends on your use case.
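If you want to check the push/pull flow without a scheduler, a rough sketch with an in-memory stand-in for XCom could look like this. FakeTaskInstance and Result are hypothetical helpers made up for the example, not Airflow classes, and the shape of the result object is an assumption based on the question. The only Airflow detail relied on is that a task's return value is stored under the key "return_value".

```python
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; keeps XComs in a shared dict."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # plays the role of the XCom table

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self.store.get((task_ids, key))


class Result:
    """Assumed shape of what CustomOperator_Task1 puts on XCom."""

    def __init__(self, success, data):
        self.success = success
        self.data = data


def check_output(**kwargs):
    ti = kwargs["ti"]
    result = ti.xcom_pull(task_ids="CustomOperator_Task1")
    if result is not None and result.success:
        # Hand the parsed output to the downstream task via XCom.
        ti.xcom_push(key="data", value=result.data)
        return "CustomOperator_Task2"
    return "Failure"


store = {}

# Task 1 finishes and its result lands on XCom.
FakeTaskInstance("CustomOperator_Task1", store).xcom_push(
    "return_value", Result(True, {"type": "custom"})
)

# The branch task parses it, pushes the data, and picks the next task.
next_task = check_output(ti=FakeTaskInstance("BranchOperator_Task", store))

# Task 2 pulls what the branch task pushed.
pulled = FakeTaskInstance("CustomOperator_Task2", store).xcom_pull(
    task_ids="BranchOperator_Task", key="data"
)

# With nothing on XCom, the branch falls through to the failure path.
empty_branch = check_output(ti=FakeTaskInstance("BranchOperator_Task", {}))
```

The data travels through the shared store rather than through a Python variable, which is the part that carries over to real XComs.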
Upvotes: 2
Reputation: 2903
As your comment suggests, the return value from your custom operator is None, so you should expect xcom_pull to come back empty. Please call xcom_push explicitly, as the default behavior of Airflow could change over time.
Upvotes: 0