Reputation: 2592
Currently there is no S3ToBigQuery operator.
My choices are:
Use the S3ToGoogleCloudStorageOperator and then use the GoogleCloudStorageToBigQueryOperator
This is not something I'm eager to do, since it means paying double for storage. Even if I remove the file from one of the storage services afterwards, that still involves payment.
Download the file from S3 to the local file system and load it into BigQuery from there - however, there is no S3DownloadOperator.
This means writing the whole process from scratch without Airflow involvement, which misses the point of using Airflow.
Is there another option? What would you suggest to do?
Upvotes: 1
Views: 3437
Reputation: 131
This is what I ended up with. It should be converted into an S3toLocalFile operator.
import logging

from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable


def download_from_s3(**kwargs):
    # Read the object for the current execution date straight into memory
    hook = S3Hook(aws_conn_id='project-s3')
    result = hook.read_key(bucket_name='stage-project-metrics',
                           key='{}.csv'.format(kwargs['ds']))
    if not result:
        logging.info('no data found')
    else:
        # Write the contents to a local file under the configured data directory
        outfile = '{}project{}.csv'.format(Variable.get("data_directory"), kwargs['ds'])
        with open(outfile, 'w+') as f:
            f.write(result)
    return result
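For the record, a minimal sketch of what that S3toLocalFile operator could look like in Airflow 1.x (the class name, argument names and default connection id below are made up for illustration, not an existing Airflow operator):

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class S3ToLocalFileOperator(BaseOperator):
    # Hypothetical operator that downloads a single S3 key to a local file
    template_fields = ('s3_key', 'local_path')

    @apply_defaults
    def __init__(self, s3_key, bucket_name, local_path,
                 aws_conn_id='aws_default', *args, **kwargs):
        super(S3ToLocalFileOperator, self).__init__(*args, **kwargs)
        self.s3_key = s3_key
        self.bucket_name = bucket_name
        self.local_path = local_path
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # Same logic as the function above, wrapped in an operator's execute()
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        content = hook.read_key(key=self.s3_key, bucket_name=self.bucket_name)
        with open(self.local_path, 'w') as f:
            f.write(content)

Because s3_key and local_path are listed in template_fields, they can be templated with '{{ ds }}' in the DAG file, which plays the same role as kwargs['ds'] in the function above.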
Upvotes: 1
Reputation: 18844
What you can do instead is use the S3ToGoogleCloudStorageOperator and then the GoogleCloudStorageToBigQueryOperator with the external_table flag, i.e. pass external_table=True.
This will create an external table that points to the GCS location; it doesn't store your data in BigQuery, but you can still query it.
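A rough sketch of that chain in Airflow 1.x might look like the following (the bucket names, connection ids, schema and table name are placeholders, not anything from the question):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG('s3_to_bigquery_external',
          schedule_interval='@daily',
          start_date=datetime(2018, 1, 1),
          catchup=False)

# Copy the S3 objects into a GCS bucket
s3_to_gcs = S3ToGoogleCloudStorageOperator(
    task_id='s3_to_gcs',
    bucket='my-s3-bucket',                    # placeholder S3 bucket
    prefix='metrics/',                        # placeholder key prefix
    aws_conn_id='aws_default',
    dest_gcs_conn_id='google_cloud_default',
    dest_gcs='gs://my-gcs-bucket/metrics/',   # placeholder GCS destination
    dag=dag)

# Point BigQuery at the GCS files as an external table instead of loading them
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='my-gcs-bucket',
    source_objects=['metrics/*.csv'],
    destination_project_dataset_table='my_dataset.my_table',
    schema_fields=[{'name': 'value', 'type': 'STRING', 'mode': 'NULLABLE'}],  # placeholder schema
    source_format='CSV',
    external_table=True,   # data stays in GCS; BigQuery only stores the table definition
    google_cloud_storage_conn_id='google_cloud_default',
    bigquery_conn_id='bigquery_default',
    dag=dag)

s3_to_gcs >> gcs_to_bq

With external_table=True you pay for GCS storage and for the queries you run, rather than for BigQuery storage, which addresses the double-storage concern in the question.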
Upvotes: 0
Reputation: 1381
If the first option is cost-prohibitive, you could just use the S3Hook to download the file through a PythonOperator:
from datetime import datetime

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}


def download_from_s3(**kwargs):
    # Read the object from S3 using the stored connection; the returned
    # string is pushed to XCom by the PythonOperator
    hook = S3Hook(aws_conn_id='s3_conn')
    return hook.read_key(bucket_name='workflows-dev',
                         key='test_data.csv')


dag = DAG('s3_download',
          schedule_interval='@daily',
          default_args=default_args,
          catchup=False)

with dag:
    download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_from_s3,
        provide_context=True
    )
Upvotes: 0