Reputation: 3118
In Airflow we have Postgres to GCS and S3 operators, but they all use a SQL query to fetch the results and export them to the target. For large tables, the COPY method would be preferred. Is there any operator in Airflow that exports data via COPY and offers an option to specify a WHERE condition?
Upvotes: 2
Views: 6601
Reputation: 4366
The PostgresHook class has a method, bulk_dump, that does just that and can be used to export the entirety of a table to a file. If you need to export a query rather than a whole table, you'll want to use the copy_expert method instead.
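For instance, a minimal (untested) sketch of a whole-table dump with bulk_dump might look like this; the connection id, table name, and output path are placeholders, not anything specific to your setup:

from airflow.hooks.postgres_hook import PostgresHook

# Placeholder connection id, table, and output path -- substitute your own.
pg_hook = PostgresHook(postgres_conn_id="postgres_default")

# bulk_dump issues a COPY <table> TO STDOUT and streams the result into the file
# (PostgreSQL's default text format, i.e. tab-delimited).
pg_hook.bulk_dump("the_source", "/tmp/the_source.tsv")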
You could use these in conjunction with a PythonOperator and a small function to (1) get the PostgresHook from Airflow and (2) run the export for downstream tasks.
Edit: Based on your comment it sounded like you might benefit from a more explicit demonstration. Here's some (untested) code to serve as inspiration:
import logging
from tempfile import NamedTemporaryFile
from airflow import models
from airflow.hooks.postgres_hook import PostgresHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.python_operator import PythonOperator
# Change these to your identifiers, if needed.
GOOGLE_CONN_ID = "google_cloud_default"
POSTGRES_CONN_ID = "postgres_default"
def copy_to_gcs(copy_sql, file_name, bucket_name):
    gcs_hook = GoogleCloudStorageHook(GOOGLE_CONN_ID)
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)

    with NamedTemporaryFile(suffix=".csv") as temp_file:
        temp_name = temp_file.name

        logging.info("Exporting query to file '%s'", temp_name)
        pg_hook.copy_expert(copy_sql, filename=temp_name)

        logging.info("Uploading to %s/%s", bucket_name, file_name)
        gcs_hook.upload(bucket_name, file_name, temp_name)


with models.DAG(...) as dag:
    copy_to_gcs_task = PythonOperator(
        task_id="copy_to_gcs",
        python_callable=copy_to_gcs,
        op_kwargs={
            # copy_expert expects a COPY ... TO STDOUT statement, not a bare SELECT.
            "copy_sql": "COPY (SELECT * FROM the_source) TO STDOUT WITH CSV HEADER",
            "file_name": "name_for_gcs.csv",
            "bucket_name": "from_airflow",
        },
    )
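As for the WHERE condition from your question: copy_expert just takes a SQL string, so you can put any query inside the COPY (...) TO STDOUT wrapper. A hedged example, where the column and date filter are made up for illustration:

# Hypothetical filter -- adjust the inner query to whatever subset you need.
copy_sql = (
    "COPY (SELECT * FROM the_source WHERE created_at >= '2019-01-01') "
    "TO STDOUT WITH CSV HEADER"
)

Pass that string as the "copy_sql" value in op_kwargs above and only the filtered rows will be exported.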
Upvotes: 3