dark horse

Reputation: 3719

Airflow - Defining the key,value for a xcom_push function

I am trying to pass values from a Python function in Airflow, but I am not sure what the key and value should be for the xcom_push call. Could anyone assist with this? Thanks.

def db_log(**context):
  db_con = psycopg2.connect(" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
  task_instance = context['task_instance']
  task_instance.xcom_push(key=db_con, value = db_log)
  return (db_con)

Could anyone assist in getting the correct key and value for the xcom_push call? Thanks.

Upvotes: 8

Views: 29154

Answers (4)

FraDel

Reputation: 184

Instead of using XCom to connect to your DB, I would recommend you use Connections: https://airflow.apache.org/howto/connection/index.html

Start by setting up a connection to your DB, either from the command line with:

airflow connections -a --conn_id postgres_custom --conn_host <your-host> --conn_type postgres --conn_port 1234 --conn_login <username> --conn_password <password> --conn_extra '{"sslmode": "require"}'

Or directly from the UI. Here is some documentation on how to set up a Postgres connection in Airflow (it works with other DB types as well): https://airflow.apache.org/howto/connection/postgres.html

Then you can query your database from a DAG:

import airflow.utils.dates
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

DAG_ARGS = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


DAG_ID = "Dummy_DAG"


with DAG(dag_id=DAG_ID,
         default_args=DAG_ARGS,
         schedule_interval=None) as dag:

    # Each PostgresOperator runs its SQL against the connection configured above.
    query_1 = PostgresOperator(
        task_id='POSTGRES_QUERY',
        postgres_conn_id='postgres_custom',
        sql="""SELECT COUNT(*) FROM table_a""",
        database="my-db",
        dag=dag,
    )

    query_2 = PostgresOperator(
        task_id='POSTGRES_QUERY_2',
        postgres_conn_id='postgres_custom',
        sql="""SELECT COUNT(*) FROM table_b""",
        database="my-db",
        dag=dag,
    )

    # query_2 runs after query_1 succeeds
    query_1 >> query_2
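
If, as in the original question, you need the query result inside a Python callable rather than just running SQL, the stored connection can also be reused through PostgresHook. A minimal sketch, assuming the postgres_custom connection and the dag object defined above (count_rows, count_task and table_a are placeholder names):

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

def count_rows(**context):
    # Build the connection from the stored Airflow connection instead of
    # hard-coding credentials with psycopg2.
    hook = PostgresHook(postgres_conn_id='postgres_custom')
    count = hook.get_first("SELECT COUNT(*) FROM table_a")[0]
    return count  # the return value is pushed to XCom automatically

count_task = PythonOperator(
    task_id='count_rows',
    python_callable=count_rows,
    provide_context=True,
    dag=dag,
)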

Upvotes: 0

Alex oladele

Reputation: 69

This is a bit old, but from what I understand, if you are running db_log as a task (e.g. via a PythonOperator), then returning db_con automatically pushes it to XCom under the key return_value.

You can then access it with {{ ti.xcom_pull(task_ids='TASK_NAME_HERE') }} in any templated field.
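
A minimal sketch of that pattern, assuming a dag object is already defined (the task ids and connection string are placeholders):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def db_log(**context):
    db_con = "dbname='name' user='user' host='host' port='5439'"
    return db_con  # pushed to XCom automatically under the key 'return_value'

push_task = PythonOperator(task_id='db_log', python_callable=db_log,
                           provide_context=True, dag=dag)

# The template is rendered at runtime with db_log's return value.
pull_task = BashOperator(task_id='print_con',
                         bash_command="echo {{ ti.xcom_pull(task_ids='db_log') }}",
                         dag=dag)

push_task >> pull_task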

Upvotes: 1

Rahul

Reputation: 767

Refer to the example below; hope it helps:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(1)
}

dag = DAG(dag_id='test_dag', schedule_interval=None, default_args=args)

def LoadYaml(**kwargs):
    y = 'df-12345567789'
    # push the value under the key 'name' so downstream tasks can pull it
    kwargs['ti'].xcom_push(key='name', value=y)
    return True

def CreatePipeLine(**kwargs):
    print("I m client")
    return True  # ShortCircuitOperator skips downstream tasks on a falsy return

def ActivatePipeline(client, pipelineId, **kwargs):
    print("activated", client, pipelineId)
    return True

start_task = DummyOperator(task_id='Start_Task', dag=dag)

LoadYaml_task = ShortCircuitOperator(task_id='LoadYaml_task', provide_context=True,
                                     python_callable=LoadYaml, dag=dag)

start_task.set_downstream(LoadYaml_task)

CreatePipeLine_task = ShortCircuitOperator(task_id='CreatePipeLine_task', provide_context=True,
                                           python_callable=CreatePipeLine,
                                           op_kwargs={'client': 'HeyImclient'}, dag=dag)

LoadYaml_task.set_downstream(CreatePipeLine_task)

ActivatePipeline_task = ShortCircuitOperator(task_id='ActivatePipeline_task', provide_context=True,
                                             python_callable=ActivatePipeline,
                                             op_kwargs={'client': 'You', 'pipelineId': '1234'}, dag=dag)

CreatePipeLine_task.set_downstream(ActivatePipeline_task)
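
For completeness, a downstream callable could read the pushed value back; a small sketch, reusing the LoadYaml_task task id from above (UseYaml is a made-up name):

def UseYaml(**kwargs):
    # Pull the value that LoadYaml pushed under the key 'name'
    name = kwargs['ti'].xcom_pull(task_ids='LoadYaml_task', key='name')
    print("pulled from XCom:", name)  # -> df-12345567789
    return True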

Upvotes: 0

tobi6

Reputation: 8239

The correct way of calling it can be found in the examples, e.g.: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

So here it should be

task_instance.xcom_push(key=<string identifier>, value=<actual value / object>)

In your case

task_instance.xcom_push(key="db_con", value=db_con)
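
Applied to the function from the question, a minimal sketch. Note that XCom values need to be serializable, so pushing the connection string is safer than pushing the live psycopg2 connection object:

import psycopg2

def db_log(**context):
    dsn = "dbname='name' user='user' password='pass' host='host' port='5439' sslmode='require'"
    db_con = psycopg2.connect(dsn)
    task_instance = context['task_instance']
    # key is a string identifier, value is the object to share between tasks
    task_instance.xcom_push(key='db_con', value=dsn)
    return dsn  # also pushed automatically under the key 'return_value'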

Upvotes: 10
