Reputation: 111
I am trying to copy a file from S3 to a Redshift table, but I am unable to do so. However, I can read from the table, so I know that my connection is okay. Please help me figure out the problem.
def upload_redshift():
    conn_string = passd.redshift_login['login']  # the connection string containing dbname, username etc.
    con = psycopg2.connect(conn_string)
    sql = """FROM 's3://datawarehouse/my_S3_file' credentials 'aws_access_key_id=***;aws_secret_access_key=***' csv ; ;"""
    try:
        con = psycopg2.connect(conn_string)
        logging.info("Connection Successful!")
    except:
        raise ValueError("Unable to connect to Redshift")
    cur = con.cursor()
    try:
        cur.execute(sql)
        logging.info(" Copy to redshift executed successfully")
    except:
        raise ValueError("Failed to execute copy command")
    con.close()
I am getting the "Copy to redshift executed successfully" message, but nothing appears in my table.
Upvotes: 1
Views: 496
Reputation: 250
Try the following,
sql = "copy table_name FROM 's3://datawarehouse/my_S3_file' credentials 'aws_access_key_id=***;aws_secret_access_key=***' csv ;"
Also, try creating the connection under the "Connections" tab and using PostgresHook, with the aws_access_key_id and secret key stored as Variables, something like below, which lets Airflow store the details encrypted,
pg_db = PostgresHook(postgres_conn_id='<<connection_id>>')
src_conn = pg_db.get_conn()
src_cursor = src_conn.cursor()
src_cursor.execute(sql)
src_conn.commit()  # commit on the connection; cursors have no commit()
src_cursor.close()
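A sketch of that approach, assuming the keys have been stored as Airflow Variables named aws_access_key_id and aws_secret_access_key and a Redshift connection id of 'redshift' (all of these names are placeholders), using the Airflow 1.x import paths:
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable

def copy_s3_file_to_redshift():
    # build the COPY statement with the keys pulled from Airflow Variables
    sql = (
        "copy table_name "
        "FROM 's3://datawarehouse/my_S3_file' "
        "credentials 'aws_access_key_id={};aws_secret_access_key={}' "
        "csv;".format(Variable.get('aws_access_key_id'),
                      Variable.get('aws_secret_access_key'))
    )
    pg_db = PostgresHook(postgres_conn_id='redshift')
    src_conn = pg_db.get_conn()
    src_cursor = src_conn.cursor()
    src_cursor.execute(sql)
    src_conn.commit()  # commit on the connection, not the cursor
    src_cursor.close()
    src_conn.close()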
Also, you can use the S3ToRedshiftTransfer operator from s3_to_redshift_operator and execute it as a task,
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer
T1 = S3ToRedshiftTransfer(
    schema='',
    table='',
    s3_bucket='',
    s3_key='',
    redshift_conn_id='',  # reference to a specific Redshift database
    aws_conn_id='',       # reference to a specific S3 connection
)
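For reference, a filled-in version of that task inside a DAG could look roughly like the sketch below; the DAG name, schedule, schema and connection ids are placeholders, and copy_options passes the extra COPY parameters such as csv:
from datetime import datetime
from airflow import DAG
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

dag = DAG('s3_to_redshift_example',
          start_date=datetime(2019, 1, 1),
          schedule_interval='@daily')

copy_to_redshift = S3ToRedshiftTransfer(
    task_id='copy_to_redshift',
    schema='public',                # placeholder schema
    table='table_name',
    s3_bucket='datawarehouse',
    s3_key='my_S3_file',
    redshift_conn_id='redshift',    # Redshift connection created in the Connections tab
    aws_conn_id='aws_default',      # connection holding the S3 access key/secret
    copy_options=['csv'],
    dag=dag,
)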
Upvotes: 1