Himanshu Sharma

Reputation: 130

How to copy all the files present in a GCS bucket to a Linux VM using Airflow?

I have a requirement to copy all the files present in a bucket, irrespective of the folders or subfolders they are in, to a Linux VM. However, when the files land on the VM, they should all be placed in a shared folder at the same directory level.

e.g. in GCS:

Bucket name: landing-gcp-cloud-12gd45

file1: gs://landing-gcp-cloud-12gd45/file1/file1.csv

file2: gs://landing-gcp-cloud-12gd45/file2.csv

file3: gs://landing-gcp-cloud-12gd45/file3/file3.txt
...

In Linux

Destination Directory: files

file1: files/file1.csv

file2: files/file2.csv

file3: files/file3.txt
...

Upvotes: 0

Views: 1170

Answers (2)

Anjela B

Reputation: 1201

To transfer a GCS object to the Cloud Composer local filesystem, you can use GCSToLocalFilesystemOperator. Once the file is copied locally, you can use SFTPOperator to transfer it to your remote server.

See the working DAG below:

import os
from datetime import datetime

from airflow import models
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

DAG_ID = "gcs_upload_download"

BUCKET_NAME = "your-bucket"
FILE_NAME = "your-object"
PATH_TO_SAVED_FILE = "/home/airflow/gcs/data/<your-object>"
REMOTE_FILE_PATH = "/<your_local directory>/<your-object>"


with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 6, 15),
    catchup=False,
    tags=["gcs", "example"],
) as dag:

    # [START howto_operator_gcs_download_file_task]
    # Download the GCS object to the Composer environment's local data folder.
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name=FILE_NAME,
        bucket=BUCKET_NAME,
        filename=PATH_TO_SAVED_FILE,
    )
    # [END howto_operator_gcs_download_file_task]

    # Push the downloaded file to the remote Linux VM over SFTP.
    put_file = SFTPOperator(
        task_id="test_sftp",
        ssh_conn_id="20220615_connection",
        local_filepath=PATH_TO_SAVED_FILE,
        remote_filepath=REMOTE_FILE_PATH,
        operation="put",
        create_intermediate_dirs=True,
    )

    download_file >> put_file
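
Note that the DAG above moves a single named object, while the question asks for every file in the bucket regardless of subfolder. One possible extension, shown below only as a sketch (it is not the tested DAG from this answer; the bucket name comes from the question, and the connection id, local data folder, and remote directory are placeholders), is to list the bucket with GCSHook, download each object under its basename only, and push the results over the same SSH connection:

import os
from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.ssh.hooks.ssh import SSHHook

SOURCE_BUCKET = "landing-gcp-cloud-12gd45"   # bucket from the question
LOCAL_DIR = "/home/airflow/gcs/data/files"   # Composer-mounted data folder
REMOTE_DIR = "/<your_shared_folder>/files"   # shared folder on the VM (placeholder)


with models.DAG(
    "gcs_bucket_to_vm_flattened",
    schedule_interval="@once",
    start_date=datetime(2022, 6, 15),
    catchup=False,
) as dag:

    @task
    def download_all_flattened():
        """Download every object in the bucket under its basename only,
        so gs://.../file1/file1.csv lands locally as files/file1.csv."""
        hook = GCSHook()  # uses the default google_cloud_default connection
        os.makedirs(LOCAL_DIR, exist_ok=True)
        local_paths = []
        for object_name in hook.list(bucket_name=SOURCE_BUCKET):
            if object_name.endswith("/"):  # skip zero-byte "folder" placeholders
                continue
            local_path = os.path.join(LOCAL_DIR, os.path.basename(object_name))
            hook.download(bucket_name=SOURCE_BUCKET, object_name=object_name, filename=local_path)
            local_paths.append(local_path)
        return local_paths

    @task
    def push_to_vm(local_paths):
        """Copy each downloaded file into the shared folder on the VM over SFTP.
        Assumes REMOTE_DIR already exists on the VM."""
        ssh_hook = SSHHook(ssh_conn_id="20220615_connection")  # same SSH connection as above
        client = ssh_hook.get_conn()
        sftp = client.open_sftp()
        try:
            for local_path in local_paths:
                sftp.put(local_path, REMOTE_DIR + "/" + os.path.basename(local_path))
        finally:
            sftp.close()
            client.close()

    push_to_vm(download_all_flattened())

Because only basenames are kept, two objects with the same file name in different folders would overwrite each other; handle that case if it can occur in your bucket.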

When setting up your connection_id, you can put the SSH key in the data directory and reference it in the Extra field. Also make sure that you allow SSH connections on port 22 by creating firewall rules, to avoid connection errors. See the working sample connection_id below:

[screenshot: sample SSH connection configuration in the Airflow UI]
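
If you prefer to keep the connection in code rather than in the UI, the same settings can be expressed roughly as below. This is only a sketch: the host, login, and key path are placeholders, while key_file and no_host_key_check are standard SSH connection extras.

# Sketch of the equivalent connection defined programmatically; in this answer it
# was created through the Airflow UI instead. Host, login, and key path are placeholders.
from airflow.models.connection import Connection

ssh_connection = Connection(
    conn_id="20220615_connection",
    conn_type="ssh",
    host="<vm-external-ip>",
    login="<vm-username>",
    extra='{"key_file": "/home/airflow/gcs/data/<your-private-key>", "no_host_key_check": "true"}',
)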

Output:

[screenshot]

Logs:

[2022-06-16, 02:18:26 UTC] {gcs.py:328} INFO - File downloaded to /home/airflow/gcs/data/monkey.jpg
[2022-06-16, 02:18:26 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=gcs_upload_download, task_id=download_file, execution_date=20220616T021822, start_date=20220616T021824, end_date=20220616T021826
[2022-06-16, 02:18:27 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-16, 02:18:27 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check

[2022-06-16, 02:18:31 UTC] {transport.py:1874} INFO - Connected (version 2.0, client OpenSSH_8.4p1)
[2022-06-16, 02:18:31 UTC] {transport.py:1874} INFO - Authentication (publickey) successful!
[2022-06-16, 02:18:31 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp connection (server version 3)
[2022-06-16, 02:18:31 UTC] {sftp.py:143} INFO - Starting to transfer file from /home/airflow/gcs/data/monkey.jpg to <my-path>/monkey.jpg
[2022-06-16, 02:18:31 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=gcs_upload_download, task_id=test_sftp, execution_date=20220616T021822, start_date=20220616T021829, end_date=20220616T021831
[2022-06-16, 02:18:31 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-16, 02:18:31 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Transferred file in Cloud Composer: [screenshot]

Transferred file in Remote Server:

[screenshot]

Upvotes: 1

teedak8s

Reputation: 780

Based on the limited background provided (to be run once a day), this is what I can recommend.

  • Create a DAG with a GCS hook inside a PythonOperator.
    • Determine which files have been modified (the delta) based on their updated timestamp; see the sketch after this list.
  • Determine where to write:
    • Use SFTPOperator to copy to the remote server.
    • Use the GCP/AWS Python SDK to download the files to the webserver running Airflow (if no remote copy is required).
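
A minimal sketch of the delta idea, assuming a daily run and using the google-cloud-storage client that GCSHook wraps (the bucket name and lookback window are placeholders):

from datetime import datetime, timedelta, timezone

from airflow.providers.google.cloud.hooks.gcs import GCSHook

def list_changed_objects(bucket_name="landing-gcp-cloud-12gd45", lookback_hours=24):
    """Return names of objects whose 'updated' timestamp falls within the lookback window."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
    client = GCSHook().get_conn()  # google.cloud.storage.Client
    blobs = client.bucket(bucket_name).list_blobs()
    return [b.name for b in blobs if b.updated and b.updated >= cutoff]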

If additional information is provided, we can recommend something more concrete.

Upvotes: 0
