Programmer120

Reputation: 2592

How to use MySqlOperator with xcom in Airflow?

I read "How to use airflow xcoms with MySqlOperator", and while it has a similar title, it doesn't really address my issue.

I have the following code:

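import logging

from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import BranchPythonOperator

def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID):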
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
        task_id='query_get_max_order_id',
        sql=query_get_max_order_id,
        mysql_conn_id=MyCon,
        xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation

The MySqlOperator returns a number, and the BranchPythonOperator chooses the next task based on that number. The query is guaranteed to return a value greater than 0.

My problem is that nothing is pushed to XCom by the MySqlOperator: when I open XCom in the UI, I see nothing. The BranchPythonOperator obviously reads nothing, so my code fails.
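The failure itself can be reproduced outside Airflow: when the upstream task pushed nothing, `xcom_pull()` returns `None`, and `int(None)` raises a `TypeError`. Below is a minimal sketch of the same branching rule as a plain function that rejects a missing value explicitly. `choose_branch` is a hypothetical name, and accepting a one-element tuple is an assumption about what an XCom-returning query task would push.

```python
from typing import Any


def choose_branch(xcom_value: Any, last_imported_order_id: int) -> str:
    """Pick the next task_id, following the rule in branch_func_is_new_records."""
    if xcom_value is None:
        # xcom_pull() yields None when the upstream task pushed nothing;
        # int(None) would raise TypeError, so fail with a clearer message.
        raise ValueError("no XCom value pushed by query_get_max_order_id")
    # Assumption: a DB row arrives as a tuple such as (1234,); take its first column.
    if isinstance(xcom_value, (tuple, list)):
        xcom_value = xcom_value[0]
    if int(xcom_value) > int(last_imported_order_id):
        return "import_orders"
    return "skip_operation"


print(choose_branch(2000, 1000))    # import_orders
print(choose_branch((500,), 1000))  # skip_operation
```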

Why doesn't XCom work here?

Upvotes: 3

Views: 6853

Answers (1)

Ash Berlin-Taylor

Reputation: 4048

The MySqlOperator currently (Airflow 1.10.0 at the time of writing) doesn't return anything to XCom, so for now the fix is to write a small operator yourself. You can do this directly in your DAG file (untested, so there may be silly errors):

import logging

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.operators.python_operator import BranchPythonOperator

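class ReturningMySqlOperator(BaseMySqlOperator):
    """MySqlOperator whose execute() returns the first result row.

    Airflow pushes a non-None return value to XCom under 'return_value'.
    """
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        # get_first() returns the first row as a tuple, e.g. (1234,)
        return hook.get_first(
            self.sql,
            parameters=self.parameters)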


def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    # The pushed value is the row from get_first(), e.g. (1234,)
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom[0]) > int(LAST_IMPORTED_ORDER_ID):
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = ReturningMySqlOperator(
        task_id='query_get_max_order_id',
        sql=query_get_max_order_id,
        mysql_conn_id=MyCon,
        # xcom_push=True is not needed: execute()'s return value is pushed automatically
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
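Why the subclass is enough: in Airflow 1.10 the task runner pushes whatever `execute()` returns to XCom under the key `return_value`, which is also the key `xcom_pull()` reads by default, while the stock `MySqlOperator.execute()` calls `hook.run()` and returns `None`. The toy model below reproduces only that rule in plain Python. `ToyXComStore`, `run_task` and the two `execute` stand-ins are hypothetical and are not Airflow APIs; the order id is made up.

```python
from typing import Any, Callable, Dict, Optional, Tuple

XCOM_RETURN_KEY = "return_value"  # the default key xcom_pull() reads


class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom table."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._rows[(task_id, key)] = value

    def pull(self, task_id: str, key: str = XCOM_RETURN_KEY) -> Optional[Any]:
        # Like xcom_pull(), yields None when nothing was pushed.
        return self._rows.get((task_id, key))


def run_task(store: ToyXComStore, task_id: str, execute: Callable[[], Any]) -> None:
    """Mimics the runner: a non-None return value is pushed automatically."""
    result = execute()
    if result is not None:
        store.push(task_id, XCOM_RETURN_KEY, result)


def stock_mysql_execute() -> None:
    # Like MySqlOperator.execute(): runs the SQL and returns nothing.
    return None


def returning_mysql_execute() -> Tuple[int]:
    # Like ReturningMySqlOperator.execute(): returns the first row as a tuple.
    return (1234,)


store = ToyXComStore()
run_task(store, "stock", stock_mysql_execute)
run_task(store, "returning", returning_mysql_execute)
print(store.pull("stock"))      # None
print(store.pull("returning"))  # (1234,)
```

This is also why the branch callable indexes `xcom[0]`: the pushed value is a whole row, not a bare number.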

Upvotes: 5
