Harold17
Harold17

Reputation: 67

Custom Operator XCom during run time in Airflow

I understand for PythonOperator/BashOperator we can use Xcom to communicate.

e.g.

def func(**context):
    context['task_instance'].xcom_pull()

However, I am wondering how to access xcom for a custom operator during run time.

My operator looks like this:

class ECHOXOperator(BaseOperator):
    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)

So in my DAG:

I can do

task2 = ECHOXOperator(x = 'Hello")

And it works well. But how can I access x from an upstream task?

Something like:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)

task1 >> task2

This is not working because task_instance in ECHOXOperator is not defined.

Thanks

Upvotes: 3

Views: 1764

Answers (1)

kaxil
kaxil

Reputation: 18844

You should pass x as templated_fields in your Custom Operator.

class ECHOXOperator(BaseOperator):
    template_fields = ['x']

    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)

And now you can do the following to get the value of Xcom passed in previous task:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = "{{ ti.xcom_pull('task1', 'x') }}")

task1 >> task2

More information regarding templated_fields and Jinja templating: https://airflow.readthedocs.io/en/latest/concepts.html#id1

Upvotes: 5

Related Questions