ab_padfoot

Reputation: 133

Airflow Exception while trying to store output of a SQL query from Redshift to S3 using RedshiftToS3Operator

I have created a Redshift connection in Airflow, as shown in the attached screenshot. After that, I imported RedshiftToS3Operator in my DAG to run a Redshift query and store the resulting CSV in S3.

from datetime import timedelta, datetime
import pytz
import airflow  
from airflow import DAG  
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable
from airflow.hooks.base import BaseHook

redshift_conn = BaseHook.get_connection('redshift_qa')

env_name = Variable.get("deploy_environment")

default_args = {  
    'owner': 'Abhra',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ["[email protected]"],
    'email_on_failure': True,
    'email_on_retry': True
}

dag = DAG(  
    'TS-Redshift-to-S3',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=4),
    schedule_interval='0 9 * * *'
)

begin_DAG = DummyOperator(task_id='begin_DAG', dag=dag)
stop_DAG = DummyOperator(task_id='stop_DAG', dag=dag)
S3_BUCKET_NAME = "test-bucket"
S3_KEY = "redshift-to-s3.csv"
QUERY = "select * from public.sample_table"

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query = QUERY,
    conn_id = redshift_conn,
    dag=dag
)


begin_DAG >> redshift_s3_operator >> stop_DAG

I am getting the following error in the Airflow webserver: airflow.exceptions.AirflowException: Invalid arguments were passed to RedshiftToS3Operator (task_id: transfer_redshift_to_s3). Invalid arguments were: **kwargs: {'conn_id': 'redshift_qa'}

All I am trying to do is use the connection I created in the Airflow UI from this DAG. I have looked into this, but I am using Amazon Web Services as the Conn Type. What changes do I need to make in this DAG?

Screenshot : Redshift Connection added in Airflow

Upvotes: 1

Views: 906

Answers (1)

holly.evans

Reputation: 149

RedshiftToS3Operator does not have an argument conn_id. You're looking for redshift_conn_id.

Also, pass the name of the connection (a string), not a Connection object.

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    redshift_conn_id='redshift_qa',
    dag=dag
)
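
As a side note, the operator unloads the query results to S3 via the Redshift UNLOAD command using a separate AWS credentials connection, so you can tune those pieces as well. A minimal sketch, assuming an AWS connection named aws_default and a provider version that accepts aws_conn_id and unload_options:

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    redshift_conn_id='redshift_qa',    # name of the Airflow connection, not a Connection object
    aws_conn_id='aws_default',         # assumption: AWS credentials connection used for writing to S3
    unload_options=['CSV', 'HEADER'],  # passed through to the Redshift UNLOAD statement
    dag=dag
)

Check the RedshiftToS3Operator signature for your installed apache-airflow-providers-amazon version, since the accepted arguments have changed between releases.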

Upvotes: 0
