Reputation: 15
Since there is no out-of-the-box way to load data from Postgres to S3, I am trying to use the Redshift operator to do this, but I receive the following error:
[2020-05-03 18:53:07,359] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,368] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-03 18:53:07,368] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,385] {taskinstance.py:900} INFO - Executing <Task(RedshiftToS3Transfer): postgres_to_S3> on 2020-01-01T00:00:00+00:00
[2020-05-03 18:53:07,389] {standard_task_runner.py:53} INFO - Started process 124750 to run task
[2020-05-03 18:53:07,445] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [running]> erdc-Virtual-Machine
[2020-05-03 18:53:07,487] {redshift_to_s3_operator.py:124} INFO - Executing UNLOAD command...
[2020-05-03 18:53:07,491] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,490] {base_hook.py:87} INFO - Using connection to: id: pg_local. Host: 192.168.0.199, Port: 5432, Schema: AdventureWorks, Login: myuser, Password: mypass, extra: None
[2020-05-03 18:53:07,498] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,498] {dbapi_hook.py:174} INFO -
UNLOAD ('SELECT * FROM AdventureWorks.dimgeography')
TO 's3://mybucket/S3_GEOGRAPHY/dimgeography_'
with credentials
'aws_access_key_id=mykey;aws_secret_access_key=mysecretkey'
;
[2020-05-03 18:53:07,499] {taskinstance.py:1145} ERROR - syntax error at or near "UNLOAD"
LINE 2: UNLOAD ('SELECT * FROM AdventureWorks.di...
        ^
Traceback (most recent call last):
File "/home/erdc/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/erdc/.local/lib/python3.6/site-packages/airflow/operators/redshift_to_s3_operator.py", line 125, in execute
postgres_hook.run(unload_query, self.autocommit)
I am using this as my DAG:
from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
from datetime import datetime, timedelta
from airflow.operators import DummyOperator
from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5)
}

# Using the context manager allows not to duplicate the dag parameter in each operator
with DAG('postgres_to_S3', default_args=default_args, schedule_interval='@once') as dag:

    start_task = DummyOperator(
        task_id='dummy_start'
    )

    unload_to_S3 = RedshiftToS3Transfer(
        task_id='postgres_to_S3',
        schema='AdventureWorks',
        table='dimgeography',
        s3_bucket='my_bucket',
        s3_key='S3_GEOGRAPHY',
        redshift_conn_id='pg_local',
        aws_conn_id='my_aws_conn'
        # ,dag=dag
    )

    # Using arrows to set dependencies between tasks
    start_task >> unload_to_S3
I know for sure that the Postgres and S3 connections work.
Do you think this workaround can work, or do I need to create an operator myself?
Thanks!
Upvotes: 0
Views: 1587
Reputation: 2342
The operator (as can be seen in the logs) works by issuing an UNLOAD query, which is Redshift-specific. Postgres doesn't have such functionality, hence it won't work.
You can take the code of the MySQLToS3Operator here https://github.com/apache/airflow/blob/9788d3195bedbeaf5e1fbb501c064dab8f5e7803/airflow/operators/mysql_to_s3_operator.py and make some modifications to swap the MySQL hook for the Postgres one, as in the sketch below.
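A minimal sketch of such an adaptation could look like this. The class name PostgresToS3Operator, the query/s3_key parameters and the CSV-via-temp-file approach are my own choices rather than anything built into Airflow, and the import paths assume Airflow 1.10.x:

from tempfile import NamedTemporaryFile

from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresToS3Operator(BaseOperator):
    """Dump the result of a Postgres query to a CSV file in S3 (custom, not built-in)."""

    template_fields = ('query', 's3_key')

    @apply_defaults
    def __init__(self,
                 query,
                 s3_bucket,
                 s3_key,
                 postgres_conn_id='postgres_default',
                 aws_conn_id='aws_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query = query
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.postgres_conn_id = postgres_conn_id
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)

        # Read the query result into a pandas DataFrame, write it to a
        # temporary CSV file, then upload that file to S3.
        df = postgres_hook.get_pandas_df(self.query)
        with NamedTemporaryFile(mode='w', suffix='.csv') as tmp:
            df.to_csv(tmp.name, index=False)
            s3_hook.load_file(filename=tmp.name,
                              key=self.s3_key,
                              bucket_name=self.s3_bucket,
                              replace=True)

Note this pulls the whole result set into memory via pandas, same as the MySQL operator does, so it is only suitable for tables that fit on the worker.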
By the way, if you use with DAG(...) as a context manager, you don't need to explicitly pass the argument dag=dag to any task.
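With an operator like the sketch above, your task could then be wired roughly like this (PostgresToS3Operator is the hypothetical custom class, not something shipped with Airflow):

    unload_to_S3 = PostgresToS3Operator(
        task_id='postgres_to_S3',
        query='SELECT * FROM AdventureWorks.dimgeography',
        s3_bucket='my_bucket',
        s3_key='S3_GEOGRAPHY/dimgeography.csv',
        postgres_conn_id='pg_local',
        aws_conn_id='my_aws_conn'
    )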
Upvotes: 1