costinrio

Reputation: 15

Airflow - Using Redshift Operator to load data from Postgres to S3 FAILS

Since there is no out-of-the-box way to load data from Postgres to S3, I am trying to use the Redshift operator to do this, but I receive the following error:

[2020-05-03 18:53:07,359] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,368] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-03 18:53:07,368] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,385] {taskinstance.py:900} INFO - Executing <Task(RedshiftToS3Transfer): postgres_to_S3> on 2020-01-01T00:00:00+00:00
[2020-05-03 18:53:07,389] {standard_task_runner.py:53} INFO - Started process 124750 to run task
[2020-05-03 18:53:07,445] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [running]> erdc-Virtual-Machine
[2020-05-03 18:53:07,487] {redshift_to_s3_operator.py:124} INFO - Executing UNLOAD command...
[2020-05-03 18:53:07,491] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,490] {base_hook.py:87} INFO - Using connection to: id: pg_local. Host: 192.168.0.199, Port: 5432, Schema: AdventureWorks, Login: myuser, Password: mypass, extra: None
[2020-05-03 18:53:07,498] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,498] {dbapi_hook.py:174} INFO - 
                    UNLOAD ('SELECT * FROM AdventureWorks.dimgeography')
                    TO 's3://mybucket/S3_GEOGRAPHY/dimgeography_'
                    with credentials
                    'aws_access_key_id=mykey;aws_secret_access_key=mysecretkey'
                    ;

[2020-05-03 18:53:07,499] {taskinstance.py:1145} ERROR - syntax error at or near "UNLOAD"
LINE 2:                     UNLOAD ('SELECT * FROM AdventureWorks.di...
                            ^
Traceback (most recent call last):
  File "/home/erdc/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/erdc/.local/lib/python3.6/site-packages/airflow/operators/redshift_to_s3_operator.py", line 125, in execute
    postgres_hook.run(unload_query, self.autocommit)

I am using this as my DAG:

from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
from datetime import datetime, timedelta
from airflow.operators import DummyOperator
from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2020,1,1),
    'retry_delay': timedelta(minutes=5)
}

# Using the context manager means we don't have to pass the dag parameter to each operator
with DAG('postgres_to_S3', default_args=default_args, schedule_interval='@once') as dag:


    start_task = DummyOperator(
        task_id='dummy_start'
    )


    unload_to_S3 = RedshiftToS3Transfer(
        task_id='postgres_to_S3',
        schema='AdventureWorks',
        table='dimgeography',
        s3_bucket='my_bucket',
        s3_key='S3_GEOGRAPHY',
        redshift_conn_id='pg_local',
        aws_conn_id='my_aws_conn'
#       ,dag=dag
    )

# Using arrows to set dependencies between tasks

start_task >> unload_to_S3

I know for sure that the Postgres and S3 connections work.

Do you think this workaround could work, or do I need to create an operator myself?

Thanks!

Upvotes: 0

Views: 1587

Answers (1)

Javier Lopez Tomas

Reputation: 2342

The operator (as can be seen in the logs) works by issuing an UNLOAD query, which is Redshift-specific. Postgres doesn't have that functionality, hence it won't work.

You can take the code of the MySQLToS3Operator here https://github.com/apache/airflow/blob/9788d3195bedbeaf5e1fbb501c064dab8f5e7803/airflow/operators/mysql_to_s3_operator.py and make some modifications to swap MySQL for Postgres.
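
For reference, a minimal sketch of what such a Postgres version could look like, adapted from that operator and assuming the Airflow 1.10 import paths used in the question. The class name PostgresToS3Operator and its arguments are illustrative, not something that ships with Airflow:

from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresToS3Operator(BaseOperator):
    """Dump a Postgres table to a CSV object in S3 (illustrative sketch)."""

    @apply_defaults
    def __init__(self,
                 schema,
                 table,
                 s3_bucket,
                 s3_key,
                 postgres_conn_id='postgres_default',
                 aws_conn_id='aws_default',
                 *args, **kwargs):
        super(PostgresToS3Operator, self).__init__(*args, **kwargs)
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.postgres_conn_id = postgres_conn_id
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)

        # Read the table into a pandas DataFrame; get_pandas_df comes from
        # DbApiHook, which PostgresHook inherits from.
        df = postgres_hook.get_pandas_df(
            'SELECT * FROM {}.{}'.format(self.schema, self.table))

        # Upload the CSV content; replace=True overwrites an existing key.
        s3_hook.load_string(
            string_data=df.to_csv(index=False),
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True)

In your DAG you would then instantiate this operator in place of RedshiftToS3Transfer, pointing postgres_conn_id at pg_local and aws_conn_id at my_aws_conn.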

By the way, if you use with DAG(...), you don't need to explicitly pass the dag=dag argument to any task.
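
For instance, a task defined inside the context manager is attached to the DAG automatically (small sketch reusing the names from your DAG):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('postgres_to_S3',
         start_date=datetime(2020, 1, 1),
         schedule_interval='@once') as dag:
    # No dag=dag needed: the context manager assigns the DAG to the task.
    start_task = DummyOperator(task_id='dummy_start')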

Upvotes: 1
