Reputation: 3719
I am trying to pass data from a Python function in Airflow. I am not sure what the key and value should be for the xcom_push function. Could anyone assist with this? Thanks
import psycopg2

def db_log(**context):
    db_con = psycopg2.connect("dbname='name' user='user' password='pass' host='host' port='5439' sslmode='require'")
    task_instance = context['task_instance']
    task_instance.xcom_push(key=db_con, value=db_log)
    return db_con
Could anyone assist in getting the correct key and value for the xcom_push function? Thanks.
Upvotes: 8
Views: 29154
Reputation: 184
Instead of using XCom to connect to your DB, I would recommend you use Connections: https://airflow.apache.org/howto/connection/index.html
Start by setting up a connection to your DB, either from the command line:
airflow connections -a --conn_id postgres_custom --conn_host <your-host> --conn_type postgres --conn_port 1234 --conn_login <username> --conn_password <password> --conn_extra '{"sslmode": "require"}'
Or directly from the UI. Here is some documentation on how to set up a Postgres connection in Airflow (it works with other DB types as well): https://airflow.apache.org/howto/connection/postgres.html
Then you can query your database from a DAG:
import airflow
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

DAG_ARGS = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
DAG_ID = "Dummy_DAG"

with DAG(dag_id=DAG_ID,
         default_args=DAG_ARGS,
         schedule_interval=None) as dag:

    query_1 = PostgresOperator(
        task_id='POSTGRES_QUERY',
        postgres_conn_id='postgres_custom',
        sql="""SELECT COUNT(*) FROM table_a""",
        database="my-db",
    )

    query_2 = PostgresOperator(
        task_id='POSTGRES_QUERY_2',
        postgres_conn_id='postgres_custom',
        sql="""SELECT COUNT(*) FROM table_b""",
        database="my-db",
    )

    query_1 >> query_2
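If you need the query result inside a Python callable (closer to what the question was doing with psycopg2), a hook can reuse the same connection entry instead of hard-coding credentials. A minimal sketch, assuming the postgres_custom connection above exists and table_a is a placeholder table name:

from airflow.hooks.postgres_hook import PostgresHook

def fetch_count(**context):
    # build a DB connection from the 'postgres_custom' Airflow connection
    hook = PostgresHook(postgres_conn_id='postgres_custom')
    rows = hook.get_records("SELECT COUNT(*) FROM table_a")  # placeholder table
    return rows[0][0]  # a PythonOperator pushes the return value to XCom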
Upvotes: 0
Reputation: 69
This is a bit old, but from what I understand, if you run db_log as a PythonOperator task, returning db_con automatically pushes it to XCom (under the key return_value).
You could then access it with {{ti.xcom_pull(task_ids='TASK_NAME_HERE')}}
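For instance, a minimal sketch (the DAG id and task ids are assumptions, and a serializable DSN string stands in for the live connection, since XCom values have to be picklable):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id='xcom_return_demo', schedule_interval=None,
          default_args={'owner': 'airflow', 'start_date': days_ago(1)})

def db_log(**context):
    # return connection parameters, not a live psycopg2 connection:
    # whatever is returned here lands in XCom under 'return_value'
    return "dbname='name' host='host' port='5439'"

def use_result(**context):
    # pulls the value the upstream task returned
    dsn = context['ti'].xcom_pull(task_ids='db_log_task')
    print(dsn)

log_task = PythonOperator(task_id='db_log_task', python_callable=db_log,
                          provide_context=True, dag=dag)
use_task = PythonOperator(task_id='use_result_task', python_callable=use_result,
                          provide_context=True, dag=dag)
log_task >> use_task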
Upvotes: 1
Reputation: 767
Refer to the example below; hope it helps.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(dag_id='test_dag', schedule_interval=None, default_args=args)

def LoadYaml(**kwargs):
    y = 'df-12345567789'
    kwargs['ti'].xcom_push(key='name', value=y)
    return True

def CreatePipeLine(**kwargs):
    print("I m client")
    return True  # ShortCircuitOperator skips downstream tasks on a falsy return

def ActivatePipeline(client, pipelineId, **kwargs):
    print("activated", client, pipelineId)
    return True

start_task = DummyOperator(task_id='Start_Task', dag=dag)

LoadYaml_task = ShortCircuitOperator(task_id='LoadYaml_task', provide_context=True,
                                     python_callable=LoadYaml, dag=dag)
start_task.set_downstream(LoadYaml_task)

CreatePipeLine_task = ShortCircuitOperator(task_id='CreatePipeLine_task', provide_context=True,
                                           python_callable=CreatePipeLine,
                                           op_kwargs={'client': 'HeyImclient'}, dag=dag)
LoadYaml_task.set_downstream(CreatePipeLine_task)

ActivatePipeline_task = ShortCircuitOperator(task_id='ActivatePipeline_task', provide_context=True,
                                             python_callable=ActivatePipeline,
                                             op_kwargs={'client': 'You', 'pipelineId': '1234'}, dag=dag)
CreatePipeLine_task.set_downstream(ActivatePipeline_task)
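A later task could then pull what LoadYaml pushed; a hypothetical downstream callable (ReadYaml is not part of the original example):

def ReadYaml(**kwargs):
    # retrieves the value pushed by LoadYaml_task under key 'name'
    name = kwargs['ti'].xcom_pull(task_ids='LoadYaml_task', key='name')
    print(name)
    return True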
Upvotes: 0
Reputation: 8239
The correct way of calling can be found in the examples, e.g.: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py
So here it should be:
task_instance.xcom_push(key=<string identifier>, value=<actual value / object>)
In your case:
task_instance.xcom_push(key="db_con", value=db_con)
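And on the pulling side, the same string key is used together with the id of the pushing task (the task id here is an assumption):

def read_db_con(**context):
    # pull by the key used in xcom_push, from the task that pushed it
    db_con = context['task_instance'].xcom_pull(task_ids='db_log_task', key='db_con')
    return db_con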
Upvotes: 10