Programmer120

Reputation: 2592

How to load data from S3 into BigQuery using Airflow?

Currently there is no S3ToBigQuery operator.

My choices are:

  1. Use the S3ToGoogleCloudStorageOperator and then the GoogleCloudStorageToBigQueryOperator.

    This is not something I'm eager to do. It means paying for storage twice, and even if I delete the file from one of the two stores, that still involves some cost. (A sketch of this chain is shown after the list.)

  2. Download the file from S3 to the local file system and load it into BigQuery from there. However, there is no S3DownloadOperator, so this means writing the whole process from scratch without Airflow involvement, which misses the point of using Airflow.
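For reference, this is roughly what option 1 would look like as a two-task chain. The import paths are from Airflow 1.x contrib; the bucket names, connection IDs, table name and schema are placeholders, and a surrounding dag object is assumed:

from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Copy the objects from S3 into a GCS bucket
s3_to_gcs = S3ToGoogleCloudStorageOperator(
    task_id='s3_to_gcs',
    bucket='my-s3-bucket',
    prefix='exports/',
    dest_gcs_conn_id='google_cloud_default',
    dest_gcs='gs://my-gcs-bucket/exports/',
    dag=dag,
)

# Load the copied files from GCS into a BigQuery table
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='my-gcs-bucket',
    source_objects=['exports/*.csv'],
    destination_project_dataset_table='my_project.my_dataset.my_table',
    source_format='CSV',
    skip_leading_rows=1,
    schema_fields=[{'name': 'value', 'type': 'STRING', 'mode': 'NULLABLE'}],  # placeholder schema
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

s3_to_gcs >> gcs_to_bq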

Is there another option? What would you suggest?

Upvotes: 1

Views: 3437

Answers (3)

Damon Cool

Reputation: 131

This is what I ended up with. It should eventually be converted into an S3toLocalFile operator.

import logging

from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable


def download_from_s3(**kwargs):
    hook = S3Hook(aws_conn_id='project-s3')

    # Read the object for the current execution date straight into memory
    result = hook.read_key(bucket_name='stage-project-metrics',
                           key='{}.csv'.format(kwargs['ds']))

    if not result:
        logging.info('no data found')
    else:
        # Write the contents to a local file named after the execution date
        outfile = '{}project{}.csv'.format(Variable.get("data_directory"), kwargs['ds'])
        with open(outfile, 'w+') as f:
            f.write(result)

    return result
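A rough sketch of what that operator might look like, in case it helps. The class name, argument names and defaults are hypothetical, and Airflow 1.x imports are assumed:

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class S3ToLocalFileOperator(BaseOperator):
    """Hypothetical operator that copies a single S3 key to a local file."""

    template_fields = ('key', 'local_path')

    @apply_defaults
    def __init__(self, bucket_name, key, local_path,
                 aws_conn_id='aws_default', *args, **kwargs):
        super(S3ToLocalFileOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.key = key
        self.local_path = local_path
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # Read the S3 object into memory and write it to the local path
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        result = hook.read_key(bucket_name=self.bucket_name, key=self.key)
        if not result:
            self.log.info('no data found for %s', self.key)
            return None
        with open(self.local_path, 'w+') as f:
            f.write(result)
        return self.local_path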

Upvotes: 1

kaxil

Reputation: 18844

What you can do instead is use S3ToGoogleCloudStorageOperator and then GoogleCloudStorageToBigQueryOperator with the external_table flag, i.e. pass external_table=True.

This will create an external table that points to the GCS location. Your data isn't stored in BigQuery, but you can still query it.
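For example, something like the following (Airflow 1.x contrib import path; the bucket, source objects, schema object and destination table are placeholders, and a surrounding dag object is assumed):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Define an external table over the files in GCS instead of loading them into BigQuery storage
gcs_to_bq_external = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_external',
    bucket='my-gcs-bucket',
    source_objects=['exports/*.csv'],
    destination_project_dataset_table='my_project.my_dataset.my_table',
    source_format='CSV',
    schema_object='exports/schema.json',  # or pass schema_fields inline
    external_table=True,
    dag=dag,
)

Queries then read the data directly from GCS, so nothing is duplicated into BigQuery storage.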

Upvotes: 0

Viraj Parekh

Reputation: 1381

If the first option is cost-prohibitive, you could just use the S3Hook to download the file through a PythonOperator:

from datetime import datetime

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}


def download_from_s3(**kwargs):
    # Read the object from S3 into memory; do whatever you need with it here
    hook = S3Hook(aws_conn_id='s3_conn')
    return hook.read_key(bucket_name='workflows-dev',
                         key='test_data.csv')


dag = DAG('s3_download',
          schedule_interval='@daily',
          default_args=default_args,
          catchup=False)

with dag:
    download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_from_s3,
        provide_context=True
    )

Upvotes: 0
