Reputation: 2592
I read "How to use airflow xcoms with MySqlOperator", and while it has a similar title, it doesn't really address my issue.
I have the following code:
def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID):
        return 'import_orders'
    else:
        return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'

get_max_order_id = MySqlOperator(
    task_id='query_get_max_order_id',
    sql=query_get_max_order_id,
    mysql_conn_id=MyCon,
    xcom_push=True,
    dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
The MySqlOperator returns a number, and based on that number the BranchPythonOperator chooses the next task. It's guaranteed that the MySqlOperator has returned a value greater than 0.
My problem is that nothing is pushed to XCom by the MySqlOperator. In the UI, when I go to XCom, I see nothing. The BranchPythonOperator obviously reads nothing, so my code fails.
Why doesn't XCom work here?
Upvotes: 3
Views: 6853
Reputation: 4048
The MySqlOperator currently (Airflow 1.10.0 at the time of writing) doesn't support returning anything in XCom: its execute method runs the query but doesn't return the result. The fix for now is to write a small operator yourself. You can do this directly in your DAG file (untested, so there may be silly errors):
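import logging

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator as BaseMySqlOperator
from airflow.operators.python_operator import BranchPythonOperator


class ReturningMySqlOperator(BaseMySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        # The value returned from execute() is what gets pushed to XCom.
        # get_first() returns the first row of the result set.
        return hook.get_first(
            self.sql,
            parameters=self.parameters)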
def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    # get_first() returns a row, so the max order id is its first column.
    if int(xcom[0]) > int(LAST_IMPORTED_ORDER_ID):
        return 'import_orders'
    else:
        return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'

get_max_order_id = ReturningMySqlOperator(
    task_id='query_get_max_order_id',
    sql=query_get_max_order_id,
    mysql_conn_id=MyCon,
    # xcom_push=True,
    dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
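One thing to watch for with this approach: hook.get_first returns the first row of the result rather than a bare number, so the value pulled from XCom should be a one-element sequence such as (42,), or None if the query returned no rows. Below is a minimal sketch of a branch decision that unpacks the row before comparing. The function name choose_branch and its last_imported_order_id argument are hypothetical, made up for illustration; they are not part of Airflow.

```python
def choose_branch(row, last_imported_order_id):
    """Return the task_id to follow, given the row pulled from XCom.

    `row` is assumed to be what hook.get_first() returned: a one-column
    row such as (42,) or [42], or None when the query produced no rows.
    """
    # Nothing was pushed, or the result set was empty: nothing to import.
    if not row:
        return 'skip_operation'

    # The max order id is the first (and only) column of the row.
    max_order_id = int(row[0])

    if max_order_id > int(last_imported_order_id):
        return 'import_orders'
    return 'skip_operation'
```

Inside branch_func_is_new_records you could then return choose_branch(ti.xcom_pull(task_ids='query_get_max_order_id'), LAST_IMPORTED_ORDER_ID). Keeping the comparison in a plain function also makes it easy to check without a running scheduler.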
Upvotes: 5