Athena Wisdom

Reputation: 6871

Perform SSH Connection inside Airflow's python_callable?

In an Airflow 1.10.10 DAG, we have a ShortCircuitOperator that uses the Python function check_remote_server_data() to decide whether the downstream tasks should run.

Inside the check_remote_server_data() function, how can we open an SSH connection to the remote server, run a bash command on it, and get the results?

Is it possible to use the Airflow SSH Connection that I have previously defined using the Web UI?

from airflow.operators.python_operator import ShortCircuitOperator


def check_remote_server_data():
    pass   # how can we use a predefined Airflow SSH connection, named `remote`?


with dag:
    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        dag=dag
    )

I am only able to do this with the SSHOperator, but I need its output to determine the short-circuit condition:

SSHOperator(
    task_id='sshop',
    ssh_conn_id='remote',
    command='date +%F',
    dag=dag
)
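
For illustration, a rough sketch of what such a callable might look like if the connection were opened directly inside it, assuming Airflow 1.10's contrib SSHHook (airflow.contrib.hooks.ssh_hook.SSHHook) and the paramiko client its get_conn() returns; whether this is the right approach is exactly what is being asked:

from airflow.contrib.hooks.ssh_hook import SSHHook


def check_remote_server_data():
    # Build the hook from the `remote` connection defined in the Web UI.
    hook = SSHHook(ssh_conn_id='remote')
    client = hook.get_conn()  # a paramiko SSHClient
    try:
        # Run the command and read its output.
        stdin, stdout, stderr = client.exec_command('date +%F')
        remote_date = stdout.read().decode('utf-8').strip()
    finally:
        client.close()

    # The return value would be the short-circuit condition.
    return remote_date != ''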

Upvotes: 0

Views: 1150

Answers (1)

joebeeson

Reputation: 4366

The SSHOperator supports a do_xcom_push argument that pushes the output of the command as an XCom value, which you can then access in your ShortCircuitOperator:

from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator


def check_remote_server_data(**context):
    # Pull the stdout that the SSHOperator pushed to XCom.
    xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")
    # Use xcom_stdout to decide: return True to continue, False to short-circuit.

with dag:
    ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)

    shortcircuitop = ShortCircuitOperator(
        task_id='shortcircuitop',
        python_callable=check_remote_server_data,
        provide_context=True,
        dag=dag
    )

    ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]
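
The callable's return value is what drives the short circuit, so it has to end in a True/False decision. One caveat, stated as an assumption: the 1.10 contrib SSHOperator base64-encodes its aggregated stdout before pushing it to XCom when core.enable_xcom_pickling is disabled (the default), so the pulled value may need decoding first. A minimal sketch of a completed callable under that assumption:

import base64


def check_remote_server_data(**context):
    xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")

    # Assumption: core.enable_xcom_pickling is False, so the SSHOperator pushed
    # its stdout as a base64-encoded string that must be decoded here.
    remote_date = base64.b64decode(xcom_stdout).decode("utf-8").strip()

    # Truthy return lets the downstream tasks run; falsy return skips them.
    return remote_date != ""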

Upvotes: 1
