Reputation: 395
I just started learning Airflow, and I am finding the concept of XCom difficult to grasp. To experiment, I wrote a DAG like this:
from airflow import DAG
from airflow.utils.edgemodifier import Label
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
# For more default argument for a task (or creating templates), please check this website
# https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.BaseOperator
default_args = {
    'owner': '...',
    'email': ['...'],
    'email_on_retry': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2021, 6, 10, 23, 0, 0, 0),
}
hook = SSHHook(
    remote_host='...',
    username='...',
    password='...',
    port=22,
)
with DAG(
    'test_dag',
    description='This is my first DAG to learn BASH operation, SSH connection, and transfer data among jobs',
    default_args=default_args,
    start_date=datetime(2021, 6, 10, 23, 0, 0, 0),
    schedule_interval="0 * * * *",
    tags=['Testing', 'Tutorial'],
) as dag:
    # Declare Tasks
    Read_my_IP = BashOperator(
        # Task ID has to be the combination of alphanumeric chars, dashes, dots, and underscores
        task_id='Read_my_IP',
        # The last line will be pushed to next task
        bash_command="hostname -i | awk '{print $1}'",
    )
    Read_remote_IP = SSHOperator(
        task_id='Read_remote_IP',
        ssh_hook=hook,
        environment={
            'Pi_IP': Read_my_IP.xcom_pull('Read_my_IP'),
        },
        command="echo {{Pi_IP}}",
    )
    # Declare Relationship between tasks
    Read_my_IP >> Label("PI's IP address") >> Read_remote_IP
The first task ran successfully, but I could not obtain the XCom return_value from task Read_my_IP, which is the IP address of the local machine. This might be very basic, but the documentation does not mention how to declare task_instance.
Please help me complete the XCom flow and pass the IP address from the local machine to the remote machine for further processing.
Upvotes: 5
Views: 11060
Reputation: 16139
The command parameter of SSHOperator is templated, so you can pull the XCom value directly:
Read_remote_IP = SSHOperator(
    task_id='Read_remote_IP',
    ssh_hook=hook,
    command="echo {{ ti.xcom_pull(task_ids='Read_my_IP') }}"
)
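To see why the template works where a direct call in the DAG file does not, here is a rough toy model in plain Python. It is not Airflow's implementation: FakeTaskInstance, render, and the example address are all hypothetical, and the real rendering is done by Jinja. The point it tries to show is that operator arguments are evaluated when the DAG file is parsed, before any task has run, whereas a templated string is only filled in at run time, once the upstream value exists.

```python
import re


class FakeTaskInstance:
    """Hypothetical stand-in for the `ti` object available in templates."""

    def __init__(self, xcom_store):
        self._xcom_store = xcom_store

    def xcom_pull(self, task_ids):
        # Return the value the named task pushed, or None if nothing was pushed.
        return self._xcom_store.get(task_ids)


def render(template, context):
    """Toy renderer (not Jinja): replace each {{ expression }} with its value."""

    def substitute(match):
        return str(eval(match.group(1), {"__builtins__": {}}, context))

    return re.sub(r"\{\{\s*(.*?)\s*\}\}", substitute, template)


# While the DAG file is being parsed, nothing has been pushed yet.
xcom_store = {}
command = "echo {{ ti.xcom_pull(task_ids='Read_my_IP') }}"

# Later, at run time, the upstream task pushes its value
# (192.0.2.10 is a made-up example address).
xcom_store["Read_my_IP"] = "192.0.2.10"

# Only now is the template rendered for the downstream task.
rendered = render(command, {"ti": FakeTaskInstance(xcom_store)})
print(rendered)  # prints: echo 192.0.2.10
```

The string stays untouched until render is called, which is why the pull has to live inside the template rather than in the operator's constructor arguments.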
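Note that the BashOperator must also push its result to XCom. Depending on your Airflow version this may already be the default, but you can request it explicitly with do_xcom_push (see the operator description):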
Read_my_IP = BashOperator(
    # Task ID has to be the combination of alphanumeric chars, dashes, dots, and underscores
    task_id='Read_my_IP',
    # The last line will be pushed to next task
    bash_command="hostname -i | awk '{print $1}'",
    do_xcom_push=True
)
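As the comment in the task says, only the last line of the command's output ends up in XCom. Here is a small plain-Python sketch of that selection; last_stdout_line and the sample output are hypothetical and only illustrate the idea, they are not part of Airflow.

```python
def last_stdout_line(stdout_text):
    """Return the final line of a command's output, without trailing whitespace."""
    lines = stdout_text.splitlines()
    return lines[-1].rstrip() if lines else ""


# Made-up output: a log line followed by the address we actually want.
sample_output = "resolving host\n192.0.2.10\n"
pushed_value = last_stdout_line(sample_output)
print(pushed_value)  # prints: 192.0.2.10
```

So if your bash_command prints anything after the IP address, that later line is what the downstream task would receive. Keep the value you want to pass as the final line of output.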
Upvotes: 5