Reputation: 1963
Is there a way to SSH to a different server and run a BashOperator using Airbnb's Airflow? I am trying to run a Hive SQL command with Airflow, but I need to SSH to a different box in order to run the Hive shell. My tasks should look something like this:
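Roughly along these lines (a rough sketch of what I have in mind; the script name is just a placeholder):
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# dag is defined elsewhere in the file
t1 = BashOperator(
    task_id='hive_task',
    bash_command='hive -f hive_script.sql',  # placeholder script name
    dag=dag)
The problem is that a plain BashOperator only runs on the Airflow worker itself, hence the need to SSH to the other box first.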
Thanks!
Upvotes: 38
Views: 74193
Reputation: 5611
Demo for Airflow 2.X:
First, create a connection from a connection URI:
# See: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
# On the Airflow host, run:
airflow connections add 'ssh_dt17' --conn-uri 'ssh://[username]:[password]@192.168.1.17'
Second, the demo DAG code:
from datetime import datetime, timedelta

import pendulum

from airflow.decorators import dag, task
# from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, get_current_context
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

# tz and default_args are referenced by the DAG below; minimal stand-ins here
tz = pendulum.timezone("UTC")  # adjust to your local timezone
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

@dag(
    default_args=default_args,
    description='demo',
    schedule_interval=None,
    start_date=datetime(2022, 9, 20, tzinfo=tz),
    catchup=False,
    max_active_tasks=1,
)
def demo_run_ssh_remote_cmd():
    ################################################################
    # ssh_conn_id
    ################################################################
    ssh_dt17 = SSHHook(ssh_conn_id='ssh_dt17', remote_host='192.168.1.17')

    ################################################################
    # dt17: run a remote cmd over ssh
    ################################################################
    cmd_logrotate = r'''
    /usr/sbin/logrotate -v -f /etc/logrotate.d/access_log_8am_8pm
    '''
    logrotate_ad = SSHOperator(
        task_id='logrotate_ad',
        command=cmd_logrotate,
        ssh_hook=ssh_dt17,
        max_active_tis_per_dag=1,
        cmd_timeout=60 * 5,
        # trigger_rule="none_failed",
    )

    # =============================================================================================
    start = DummyOperator(task_id="start")
    start >> logrotate_ad

_ = demo_run_ssh_remote_cmd()
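Once the DAG file is picked up, the SSH task can be exercised on its own with the standard Airflow CLI (a quick one-off check, not a scheduled run):
# run only the SSH task for a given date, without the scheduler
airflow tasks test demo_run_ssh_remote_cmd logrotate_ad 2022-09-20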
Upvotes: 1
Reputation: 1963
I think that I just figured it out:
Create an SSH connection in the UI under Admin > Connections. Note: the connection will be deleted if you reset the database.
In the Python file, add the following:
from airflow.contrib.hooks import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
Add the SSH operator task
t1 = SSHExecuteOperator(
    task_id="task1",
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)
Thanks!
Upvotes: 49
Reputation: 1942
Here is a working example with the SSH operator in Airflow 2:
[BEWARE: the output of this operator is base64 encoded]
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')
ls = SSHOperator(
    task_id="ls",
    command="ls -l",
    ssh_hook=sshHook,
    dag=dag)
The conn-id is the one set in Admin -> Connections. The key_file is the private SSH key.
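Because the pushed XCom value is base64-encoded, a downstream task has to decode it before use. A minimal sketch of one way to do that (the decode_ls task and its callable are made-up names for illustration):
import base64

from airflow.operators.python import PythonOperator

def _decode_ls_output(**context):
    # SSHOperator pushes the command's stdout to XCom as a base64 string
    raw = context["ti"].xcom_pull(task_ids="ls")
    print(base64.b64decode(raw).decode("utf-8"))

decode_ls = PythonOperator(
    task_id="decode_ls",
    python_callable=_decode_ls_output,
    dag=dag)

ls >> decode_ls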
Upvotes: 12
Reputation: 311
One thing to note with Anton's answer is that the argument is actually ssh_conn_id, not conn_id, for the SSHOperator object (at least in version 1.10).
A quick example would look like:
from datetime import timedelta, datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))

# A simple test command to run on the remote host over SSH
t1_bash = """
echo 'Hello World'
"""

t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)
Upvotes: 31