Reputation: 130
I have a requirement to copy all the files in a bucket, regardless of which folder or subfolder they sit in, to a Linux VM. When the files land on the VM, they should all be placed in a single shared folder at the same directory level.
e.g. in GCS
Bucket name: landing-gcp-cloud-12gd45
file1: gs://landing-gcp-cloud-12gd45/file1/file1.csv
file2: gs://landing-gcp-cloud-12gd45/file2.csv
file3: gs://landing-gcp-cloud-12gd45/file3/file3.txt
...
In Linux
Destination Directory: files
file1: files/file1.csv
file2: files/file2.csv
file3: files/file3.txt
...
Upvotes: 0
Views: 1170
Reputation: 1201
To transfer a GCS object to the Cloud Composer local filesystem, you can use GCSToLocalFilesystemOperator. Once the file is copied locally, you can use SFTPOperator to transfer it to your remote server.
See the working DAG below:
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator

# Placeholders (ENV_ID and PROJECT_ID are not used below, kept for context)
ENV_ID = os.environ.get("cloud-composer-env-id")
PROJECT_ID = os.environ.get("your-project")
DAG_ID = "gcs_upload_download"
BUCKET_NAME = "your-bucket"
FILE_NAME = "your-object"
PATH_TO_SAVED_FILE = "/home/airflow/gcs/data/<your-object>"
REMOTE_FILE_PATH = "/<your_local directory>/<your-object>"

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 6, 15),
    catchup=False,
    tags=["gcs", "example"],
) as dag:
    # [START howto_operator_gcs_download_file_task]
    # Download the GCS object into the Composer environment's local data folder.
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name=FILE_NAME,
        bucket=BUCKET_NAME,
        filename=PATH_TO_SAVED_FILE,
    )
    # [END howto_operator_gcs_download_file_task]

    # Push the downloaded file to the remote server over SFTP.
    put_file = SFTPOperator(
        task_id="test_sftp",
        ssh_conn_id="20220615_connection",
        local_filepath=PATH_TO_SAVED_FILE,
        remote_filepath=REMOTE_FILE_PATH,
        operation="put",
        create_intermediate_dirs=True,
    )

    download_file >> put_file
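Note that GCSToLocalFilesystemOperator downloads a single object, while the question asks for every file in the bucket flattened into one folder. One way to extend this (a minimal sketch, not tested against the original setup; the bucket name and local directory are placeholders) is to list the objects with GCSHook and download each one under its base name only, before the SFTP step:
import os
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.gcs import GCSHook

@task
def download_all_flattened(bucket_name: str = "your-bucket",
                           local_dir: str = "/home/airflow/gcs/data/files"):
    """List every object in the bucket and download it under its base name only."""
    hook = GCSHook()
    os.makedirs(local_dir, exist_ok=True)
    for object_name in hook.list(bucket_name=bucket_name):
        if object_name.endswith("/"):  # skip "folder" placeholder objects
            continue
        filename = os.path.join(local_dir, os.path.basename(object_name))
        hook.download(bucket_name=bucket_name, object_name=object_name, filename=filename)
Inside the with models.DAG(...) block you would then call download_all_flattened() in place of download_file and point the SFTP step at the files under local_dir.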
When setting up your connection_id, you can put the SSH key in the data directory and reference it in the Extra field. Also make sure that you allow SSH connections on port 22 by creating a firewall rule, to avoid connection errors. See the working sample connection_id below:
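(The original connection screenshot is not reproduced here. For reference, a connection along these lines might look like the following; the values are placeholders, and key_file and no_host_key_check are standard SSH connection extras in the Airflow SSH provider.)
Conn Id: 20220615_connection
Conn Type: SSH
Host: <your-vm-external-ip>
Username: <your-ssh-user>
Extra: {"key_file": "/home/airflow/gcs/data/<your-private-key>", "no_host_key_check": "true"}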
Output:
Logs:
[2022-06-16, 02:18:26 UTC] {gcs.py:328} INFO - File downloaded to /home/airflow/gcs/data/monkey.jpg
[2022-06-16, 02:18:26 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=gcs_upload_download, task_id=download_file, execution_date=20220616T021822, start_date=20220616T021824, end_date=20220616T021826
[2022-06-16, 02:18:27 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-16, 02:18:27 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2022-06-16, 02:18:31 UTC] {transport.py:1874} INFO - Connected (version 2.0, client OpenSSH_8.4p1)
[2022-06-16, 02:18:31 UTC] {transport.py:1874} INFO - Authentication (publickey) successful!
[2022-06-16, 02:18:31 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp connection (server version 3)
[2022-06-16, 02:18:31 UTC] {sftp.py:143} INFO - Starting to transfer file from /home/airflow/gcs/data/monkey.jpg to <my-path>/monkey.jpg
[2022-06-16, 02:18:31 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=gcs_upload_download, task_id=test_sftp, execution_date=20220616T021822, start_date=20220616T021829, end_date=20220616T021831
[2022-06-16, 02:18:31 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-16, 02:18:31 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
Transferred file in Cloud Composer:
Transferred file in Remote Server:
Upvotes: 1
Reputation: 780
Based on the limited background provided (the job is to run once a day), this is what I can recommend:
updated_timestamp
If additional information is provided, we can recommend something more concrete.
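(Presumably the suggestion is to drive a daily job off each object's updated timestamp. A rough sketch of that idea, assuming a cron job on the VM with the google-cloud-storage client installed and read access to the bucket; the bucket name is taken from the question and the destination path is a placeholder:)
import os
from datetime import datetime, timedelta, timezone
from google.cloud import storage

BUCKET_NAME = "landing-gcp-cloud-12gd45"
DEST_DIR = "/path/to/shared/files"

def copy_recent_files_flat():
    """Download objects updated in the last 24 hours, flattening folder structure."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=1)
    client = storage.Client()
    os.makedirs(DEST_DIR, exist_ok=True)
    for blob in client.list_blobs(BUCKET_NAME):
        if blob.name.endswith("/"):  # skip "folder" placeholder objects
            continue
        if blob.updated and blob.updated < cutoff:
            continue  # only pick up files changed since the last daily run
        blob.download_to_filename(os.path.join(DEST_DIR, os.path.basename(blob.name)))

if __name__ == "__main__":
    copy_recent_files_flat()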
Upvotes: 0