Reputation: 11
I am trying to copy files from SFTP to Google Cloud Storage. While executing I get the exception No module named 'airflow.providers.sftp'.
Much appreciated if someone can give pointers.
Code snippet:
import os
import airflow
from airflow import DAG
from airflow import models
from airflow.operators import python_operator
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago
with models.DAG("test_ssh_to_gcs", start_date=days_ago(1), schedule_interval=None) as dag:
    copy_file_from_ssh_to_gcs = SFTPToGCSOperator(
        task_id="file-copy-ssh-to-gcs",
        source_path="/ ",
        destination_bucket='test_sftp_to_gcs',
        destination_path="test/test.csv",
        gcp_conn_id="google_cloud_default",
        sftp_conn_id="sftp_test",
    )
    copy_file_from_ssh_to_gcs
Upvotes: 1
Views: 10995
Reputation: 33
Install the dependency via pip:
pip install apache-airflow-providers-sftp
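To quickly confirm the module is then importable (a sanity check, assuming you have shell access to the same Python environment Airflow runs in):
python -c "import airflow.providers.sftp"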
Upvotes: 1
Reputation: 21
With Airflow 1.10 you can install backported provider packages.
For your case, the following need to be added to your Composer cluster (pip commands sketched after the list):
1- apache-airflow-backport-providers-google
2- apache-airflow-backport-providers-sftp
3- apache-airflow-backport-providers-ssh
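If you manage the environment with pip directly (on Composer you would instead add these as PyPI packages through the environment configuration), the commands would look like:
pip install apache-airflow-backport-providers-google
pip install apache-airflow-backport-providers-sftp
pip install apache-airflow-backport-providers-ssh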
Upvotes: 0
Reputation: 3955
You get the error because SFTPToGCSOperator uses airflow.providers.sftp.operators.SFTPOperator under the hood, which is only present in airflow >= 2.0.0.
The bad news is that you need to upgrade your Airflow version to use airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPToGCSOperator.
If you don't want to or cannot upgrade Airflow, you can create a DAG chaining two operators:
Operator | Airflow 1.x import |
---|---|
SFTP Operator (download a file to local, using operation='get') | from airflow.contrib.operators.sftp_operator import SFTPOperator |
Upload file to Google Cloud Storage | from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator |
This should do the trick:
# Imports taken from the table above plus the question's snippet
from airflow import models
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.utils.dates import days_ago

LOCALFILE = '/tmp/kk'

with models.DAG("test_ssh_to_gcs", start_date=days_ago(1), schedule_interval=None) as dag:
    download_sftp = SFTPOperator(
        task_id='part1_sftp_download_to_local',
        ssh_conn_id="sftp_test",
        local_filepath=LOCALFILE,   # SFTPOperator expects local_filepath, not local_file
        remote_filepath='',         # path of the source file on the SFTP server
        operation='get')

    gcp_upload = FileToGoogleCloudStorageOperator(
        task_id='part2_upload_to_gcs',
        bucket='test_sftp_to_gcs',
        src=LOCALFILE,
        dst="test/test.csv",
        google_cloud_storage_conn_id="google_cloud_default"  # configured in Airflow
    )

    download_sftp >> gcp_upload
Upvotes: 0
Reputation: 760
First, have you tried installing the package with pip install apache-airflow-providers-sftp?
Also be careful about the documentation version you are referring to: with Airflow 2.0, some packages have been moved.
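For example, the SFTP operator that lived under airflow.contrib in 1.x now comes from a provider package (import paths as used in the question and answers above):
# Airflow 1.x (contrib) import path:
# from airflow.contrib.operators.sftp_operator import SFTPOperator

# Airflow 2.x provider import paths (require the provider packages to be installed):
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator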
Upvotes: 1