Reputation: 133
I have created a Redshift connection in Airflow, as shown in the attached screenshot. After that, I imported the RedshiftToS3Operator into my DAG to run a Redshift query and store the resulting CSV in S3.
from datetime import timedelta, datetime
import pytz
import airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable
from airflow.hooks.base import BaseHook

redshift_conn = BaseHook.get_connection('redshift_qa')
env_name = Variable.get("deploy_environment")

default_args = {
    'owner': 'Abhra',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True,
    'email': ["[email protected]"],
    'email_on_failure': True,
    'email_on_retry': True
}

dag = DAG(
    'TS-Redshift-to-S3',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=4),
    schedule_interval='0 9 * * *'
)

begin_DAG = DummyOperator(task_id='begin_DAG', dag=dag)
stop_DAG = DummyOperator(task_id='stop_DAG', dag=dag)

S3_BUCKET_NAME = "test-bucket"
S3_KEY = "redshift-to-s3.csv"
QUERY = "select * from public.sample_table"

redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    conn_id=redshift_conn,
    dag=dag
)

begin_DAG >> redshift_s3_operator >> stop_DAG
I am getting the following error in the Airflow webserver:

airflow.exceptions.AirflowException: Invalid arguments were passed to RedshiftToS3Operator (task_id: transfer_redshift_to_s3). Invalid arguments were: **kwargs: {'conn_id': 'redshift_qa'}
All I am trying to do is import the connection I created in the Airflow UI into this DAG. I have looked into this, but I am trying to use Amazon Web Services as the Conn Type. What changes do I need to make in this DAG?
Screenshot: Redshift connection added in Airflow
Upvotes: 1
Views: 906
Reputation: 149
RedshiftToS3Operator does not have an argument conn_id. You're looking for redshift_conn_id.
Also, provide the name of the connection, not a connection object.
redshift_s3_operator = RedshiftToS3Operator(
    task_id='transfer_redshift_to_s3',
    s3_bucket=S3_BUCKET_NAME,
    s3_key=S3_KEY,
    schema='public',
    select_query=QUERY,
    redshift_conn_id='redshift_qa',
    dag=dag
)
Upvotes: 0