Reputation: 67
I understand that with PythonOperator/BashOperator we can use XCom to communicate between tasks, e.g.
def func(**context):
    context['task_instance'].xcom_pull()
However, I am wondering how to access XCom from a custom operator at run time.
My operator looks like this:
class ECHOXOperator(BaseOperator):
    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)
So in my DAG I can do:
task2 = ECHOXOperator(task_id='task2', x='Hello', dag=dag)
And it works well. But how can I access x from an upstream task?
Something like:
def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)
# task_instance does not exist at this point, so this line fails
task2 = ECHOXOperator(task_id="task2", x=task_instance.xcom_pull('task1', 'x'), provide_context=True, dag=dag)
generate_data >> task2
This does not work because task_instance is not defined where ECHOXOperator is instantiated.
Thanks
Upvotes: 3
Views: 1764
Reputation: 18844
You should list x in template_fields in your custom operator. Airflow renders the Jinja templates in those fields at run time, before execute() is called.
class ECHOXOperator(BaseOperator):
    template_fields = ['x']

    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)
Now you can pass a Jinja template as x to get the XCom value pushed by the previous task:
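def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)
# Use keyword arguments: the second positional argument of xcom_pull is dag_id, not key
task2 = ECHOXOperator(
    task_id="task2",
    x="{{ ti.xcom_pull(task_ids='task1', key='x') }}",
    dag=dag,
)
generate_data >> task2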
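To make the mechanism concrete, here is a minimal plain-Python sketch of what templating does conceptually: before execute() runs, every attribute named in template_fields is rendered against the task context. It uses neither Airflow nor Jinja. FakeTaskInstance, EchoOperator and render_template_fields are hypothetical stand-ins written for illustration, and the eval-based renderer is only a toy substitute for Jinja, not how Airflow implements it.

```python
import re


class FakeTaskInstance:
    """Hypothetical stand-in for a task instance: XComs live in a dict."""

    def __init__(self):
        self._xcoms = {}

    def xcom_push(self, task_id, key, value):
        # The task id is passed explicitly because this fake is not bound to a task.
        self._xcoms[(task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._xcoms[(task_ids, key)]


class EchoOperator:
    """Plain class mimicking an operator that declares a templated field."""

    template_fields = ["x"]

    def __init__(self, x):
        self.x = x

    def execute(self, context):
        return self.x


def render_template_fields(operator, context):
    """Toy renderer: evaluate each '{{ expr }}' against the context dict."""
    pattern = re.compile(r"\{\{\s*(.*?)\s*\}\}")
    for name in operator.template_fields:
        raw = getattr(operator, name)
        # eval is used only to keep the illustration short; never do this with untrusted input.
        rendered = pattern.sub(lambda m: str(eval(m.group(1), {}, context)), raw)
        setattr(operator, name, rendered)


ti = FakeTaskInstance()
ti.xcom_push("task1", "x", "Hello")

# At DAG definition time x is just a string; nothing has been pulled yet.
task2 = EchoOperator(x="{{ ti.xcom_pull(task_ids='task1', key='x') }}")

context = {"ti": ti}
render_template_fields(task2, context)  # happens at run time, before execute()
print(task2.execute(context))  # prints "Hello"
```

The point of the sketch is the ordering: the template string is stored when the operator is constructed, and the lookup only happens once a task instance exists.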
More information regarding template_fields and Jinja templating: https://airflow.readthedocs.io/en/latest/concepts.html#id1
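If you would rather not use templating, the value can also be pulled inside execute(), since execute() receives the same context dictionary that a PythonOperator callable gets. The sketch below illustrates that alternative under some assumptions: StubTaskInstance and EchoUpstreamXOperator are hypothetical names, and the operator is written as a plain class so that the snippet runs without Airflow. In a real DAG it would subclass BaseOperator and take a task_id.

```python
class StubTaskInstance:
    """Hypothetical stand-in for a task instance, for illustration only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps (task_id, key) -> value

    def xcom_pull(self, task_ids=None, key=None):
        return self._xcoms.get((task_ids, key))


class EchoUpstreamXOperator:  # in a real DAG this would subclass BaseOperator
    def __init__(self, source_task_id, key="x"):
        self.source_task_id = source_task_id
        self.key = key

    def execute(self, context):
        # The task instance only exists at run time, so the pull happens here.
        x = context["task_instance"].xcom_pull(
            task_ids=self.source_task_id, key=self.key
        )
        print(x)
        return x


context = {"task_instance": StubTaskInstance({("task1", "x"): "Hello"})}
result = EchoUpstreamXOperator(source_task_id="task1").execute(context)
```

Compared with template_fields, this couples the operator to a specific upstream task id and key, so the templated version is usually easier to reuse.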
Upvotes: 5