MrSir

Reputation: 53

Send Output Of OracleOperator To Another Task In Airflow

I need to use the output from OracleOperator in another task for further processing. The trouble I have is that when I pull the data in another task and print it, the result is None. No error is thrown, but the data isn't passed through. Also, the XCom tab in the task UI shows blank keys and values.

My code is as follows:

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS', 
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)


push >> pull

Upvotes: 3

Views: 2843

Answers (2)

kaxil

Reputation: 18844

You can use the following code. It basically replaces the OracleOperator with a PythonOperator that uses OracleHook, so the query results are returned from the callable and hence pushed to XCom:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.oracle_hook import OracleHook
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(task_ids='data')
    print("VALUE IN PULLER : ", pulled_value_1)


def get_data_from_oracle(**kwargs):
    oracle_hook = OracleHook(oracle_conn_id=kwargs['oracle_conn_id'])
    return oracle_hook.get_records(sql=kwargs['sql'])

push = PythonOperator(
    task_id='data',
    op_kwargs={'oracle_conn_id': 'oracle_conn_id', 'sql': 'SELECT * FROM CUSTOMERS'},
    provide_context=True,
    python_callable=get_data_from_oracle,
    dag=dag,
)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)


push >> pull

Upvotes: 5

Nick_Kh

Reputation: 5243

Your DAG seems to be fine here. However, if you look into the source code of airflow.operators.oracle_operator, you may notice that its execute method uses OracleHook (which extends airflow.hooks.dbapi_hook.DbApiHook):

def execute(self, context):
    self.log.info('Executing: %s', self.sql)
    hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
    hook.run(
        self.sql,
        autocommit=self.autocommit,
        parameters=self.parameters)

And the culprit here is the hook.run() method: it executes the SQL query but returns nothing, so no value is pushed to XCom and xcom_pull() retrieves None.

run(self, sql, autocommit=False, parameters=None)[source]

Runs a command or a list of commands. Pass a list of sql statements to the sql parameter to get them to execute sequentially

As a solution, you can create a custom Airflow operator by duplicating the source code of the genuine OracleOperator and replacing hook.run() with hook.get_records(). That way the query still executes, but the resulting records are returned from execute(), pushed to XCom, and available to the next task.

Hope you find this useful.

Upvotes: 0
