arcee123

Reputation: 211

How to use the S3 hook in Airflow

I have an S3 folder location whose contents I am moving to GCS. I am using Airflow to make the moves happen.

In this environment, my S3 bucket is an "ever-growing" folder, meaning we do not delete files after we receive them.

from airflow.hooks.S3_hook import S3Hook  # Airflow 1.x import path

def GetFiles(**kwargs):
    foundfiles = False

    s3 = S3Hook(aws_conn_id='S3_BDEX')
    s3.get_conn()
    bucket = s3.get_bucket(
        bucket_name='/file.share.external.bdex.com/Offrs'
    )
    files = s3.list_prefixes(bucket_name='/file.share.external.bdex.com/Offrs')
    print("BUCKET:  {}".format(files))


from airflow.operators.python_operator import BranchPythonOperator  # Airflow 1.x path

check_for_file = BranchPythonOperator(
    task_id='Check_FTP_and_Download',
    provide_context=True,
    python_callable=GetFiles,
    dag=dag
)

What I need here is the list of files and their creation dates/times, so I can compare them against the files I have already seen and determine which ones are new.
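As an aside on the wiring above: a BranchPythonOperator callable is expected to return the task_id of the branch to run next, so the comparison described here would end by returning one of two task ids. A minimal sketch of that contract, with hypothetical downstream task ids:

def check_for_new_files(**kwargs):
    # `found_new_files` would come from the S3 listing and timestamp
    # comparison; it is hard-coded here only to keep the sketch self-contained.
    found_new_files = True
    if found_new_files:
        return 'Process_New_Files'  # hypothetical downstream task id
    return 'Skip_Processing'        # hypothetical no-op branch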

I know I can connect, because the get_bucket function worked. However, in this case I get the following error:

Invalid bucket name "/file.share.external.bdex.com/Offrs": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"

Thank you

Upvotes: 7

Views: 18012

Answers (1)

R Penumaka

Reputation: 171

  1. The bucket name is wrong. If the URL is s3://something/path/to/file, then the bucket name is "something" and the rest is the object key.
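Applied to the question, assuming the location corresponds to s3://file.share.external.bdex.com/Offrs, the bucket would be file.share.external.bdex.com and Offrs would be a key prefix. A minimal sketch that lists the keys together with their last-modified timestamps, which is what the new-vs-seen comparison needs:

from airflow.hooks.S3_hook import S3Hook  # Airflow 1.x import path

def get_files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BDEX')
    # Bucket names may not contain "/"; everything after the bucket is the key.
    bucket = s3.get_bucket(bucket_name='file.share.external.bdex.com')
    # Each boto3 ObjectSummary exposes .key and .last_modified (a datetime),
    # so new files can be detected by comparing timestamps or key sets.
    for obj in bucket.objects.filter(Prefix='Offrs/'):
        print(obj.key, obj.last_modified)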

Upvotes: 2
