I need to use the output of an OracleOperator in another task for further processing. The trouble is that when I pull the data in the other task and print it, the result is None. No error is thrown, but the data isn't passed through. Also, the XCom tab in the task UI shows blank keys and values.
My code is as follows:
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)


pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)

push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS',
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)

push >> pull
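You can use the following code. Basically, it replaces the OracleOperator with a PythonOperator that calls OracleHook.get_records(). The callable's return value is pushed to XCom automatically (under the key return_value), so the next task can pull it.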
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.utils.dates import days_ago
args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # pull the return value of the task whose task_id is 'data'
    pulled_value_1 = ti.xcom_pull(task_ids='data')
    print("VALUE IN PULLER : ", pulled_value_1)


def get_data_from_oracle(**kwargs):
    oracle_hook = OracleHook(oracle_conn_id=kwargs['oracle_conn_id'])
    # get_records() fetches the rows; returning them pushes them to XCom
    return oracle_hook.get_records(sql=kwargs['sql'])


push = PythonOperator(
    task_id='data',
    # replace 'oracle_conn_id' with the ID of your Oracle connection
    op_kwargs={'oracle_conn_id': 'oracle_conn_id', 'sql': 'SELECT * FROM CUSTOMERS'},
    provide_context=True,
    python_callable=get_data_from_oracle,
    dag=dag,
)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push >> pull
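Two things have to line up for the pull to return data: the upstream task must push a value, and xcom_pull must name the pushing task's task_id ('data' here; the question pulled from 'push', which is only the Python variable name). The toy, pure-Python model below illustrates both. It is not Airflow's implementation: XComStore and run_task are hypothetical names, and the model only assumes two behaviours of Airflow 1.10, namely that a task's non-None return value is pushed under the key return_value, and that xcom_pull with a single task_ids string returns None when no matching XCom exists.

```python
from typing import Any, Callable, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # key Airflow uses for a task's return value


class XComStore:
    """Toy stand-in for Airflow's XCom table, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._data[(task_id, key)] = value

    def pull(self, task_ids: str, key: str = RETURN_KEY) -> Optional[Any]:
        # Like xcom_pull: a missing (task_id, key) pair yields None, not an error.
        return self._data.get((task_ids, key))


def run_task(store: XComStore, task_id: str, fn: Callable[[], Any]) -> None:
    """Run a callable and push its result under RETURN_KEY if it is not None."""
    result = fn()
    if result is not None:
        store.push(task_id, RETURN_KEY, result)


store = XComStore()
# Stand-in for OracleHook.get_records(): a list of row tuples (fake data).
run_task(store, "data", lambda: [(1, "Alice"), (2, "Bob")])

print(store.pull(task_ids="data"))  # [(1, 'Alice'), (2, 'Bob')]
print(store.pull(task_ids="push"))  # None: no task has task_id 'push'
```

Silently getting None for an unknown task_id, rather than an error, is why the question's DAG ran without complaint.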
Your DAG seems to be fine here. However, if you look at the source code of airflow.operators.oracle_operator, you'll see that its execute method actually uses OracleHook(), which is one of the airflow.hooks.dbapi_hook extensions (it subclasses DbApiHook):
def execute(self, context):
    self.log.info('Executing: %s', self.sql)
    hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
    hook.run(
        self.sql,
        autocommit=self.autocommit,
        parameters=self.parameters)
The culprit here is hook.run(): it runs the SQL query but returns nothing, so execute() returns None, nothing is pushed to XCom, and xcom_pull() retrieves no records.
run(self, sql, autocommit=False, parameters=None)
Runs a command or a list of commands. Pass a list of sql statements to the sql parameter to get them to execute sequentially.
As a solution, you can create a custom Airflow operator by duplicating the source code of the original OracleOperator and replacing hook.run() with the hook.get_records() method. The query is then executed and the fetched records are returned from execute(), so they are pushed to XCom for the next task.
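The effect of swapping run() for get_records() can be illustrated with a toy, pure-Python model that needs neither Airflow nor Oracle. FakeOracleHook, RunOperator, GetRecordsOperator and run_and_push are hypothetical names; the model only assumes that run() returns nothing, that get_records() returns the fetched rows, and that a task's non-None return value is pushed to XCom under the key return_value.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Tuple[Any, ...]
RETURN_KEY = "return_value"  # key Airflow uses for a task's return value
SQL = "SELECT * FROM CUSTOMERS"


class FakeOracleHook:
    """Hypothetical stand-in for OracleHook: serves canned rows instead of querying Oracle."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows

    def run(self, sql: str) -> None:
        # Like DbApiHook.run(): the statement is executed, but nothing is returned.
        return None

    def get_records(self, sql: str) -> List[Row]:
        # Like DbApiHook.get_records(): the query is executed and all rows are fetched.
        return list(self._rows)


class RunOperator:
    """Mimics OracleOperator.execute(): it calls hook.run(), so it returns None."""

    def __init__(self, hook: FakeOracleHook, sql: str) -> None:
        self.hook = hook
        self.sql = sql

    def execute(self) -> Optional[List[Row]]:
        self.hook.run(self.sql)
        return None


class GetRecordsOperator(RunOperator):
    """Mimics the proposed custom operator: hook.run() swapped for hook.get_records()."""

    def execute(self) -> Optional[List[Row]]:
        return self.hook.get_records(self.sql)


def run_and_push(xcom: Dict[Tuple[str, str], Any], task_id: str, op: RunOperator) -> None:
    """Push execute()'s result to the toy XCom store, but only if it is not None."""
    result = op.execute()
    if result is not None:
        xcom[(task_id, RETURN_KEY)] = result


hook = FakeOracleHook([(1, "Alice"), (2, "Bob")])  # fake rows, not real data
xcom: Dict[Tuple[str, str], Any] = {}
run_and_push(xcom, "with_run", RunOperator(hook, SQL))
run_and_push(xcom, "with_get_records", GetRecordsOperator(hook, SQL))

print(xcom.get(("with_run", RETURN_KEY)))          # None
print(xcom.get(("with_get_records", RETURN_KEY)))  # [(1, 'Alice'), (2, 'Bob')]
```

In a real DAG, the counterpart would roughly be a subclass of OracleOperator whose execute(self, context) returns hook.get_records(self.sql, parameters=self.parameters) instead of calling hook.run().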
Hope you find this useful.