TheDataGuy

Reputation: 3118

Airflow - Export PostgreSQL table using COPY

In Airflow we have Postgres-to-GCS and Postgres-to-S3 operators, all of which run a SQL query to fetch the results and export them to the target.

But for large tables, the COPY method would be preferred. Is there any operator in Airflow that exports the data via COPY and has an option to specify a WHERE condition?

Upvotes: 2

Views: 6601

Answers (1)

joebeeson

Reputation: 4366

The PostgresHook class has a method, bulk_dump, that does just that and can be used to export the entirety of a table to a file. If you need to export a query rather than a whole table, you'll want to use the copy_expert method instead.
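
A minimal sketch of both methods (untested; assumes Airflow 1.x import paths and the stock postgres_default connection id; the_source and its filter are placeholders):

from airflow.hooks.postgres_hook import PostgresHook

pg_hook = PostgresHook(postgres_conn_id="postgres_default")

# Whole table: bulk_dump runs a COPY <table> TO STDOUT under the hood
# and writes the (tab-separated) output to the given file.
pg_hook.bulk_dump("the_source", "/tmp/the_source.tsv")

# Arbitrary query: copy_expert takes the full COPY ... TO STDOUT
# statement, so a WHERE clause can go inside the inner SELECT.
pg_hook.copy_expert(
    "COPY (SELECT * FROM the_source WHERE id > 100) TO STDOUT WITH CSV HEADER",
    filename="/tmp/the_source_filtered.csv",
)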

You could use these in conjunction with a PythonOperator and a small function to (1) get the PostgresHook from Airflow and (2) run the export for downstream tasks.


Edit: Based on your comment it sounded like you might benefit from a more explicit demonstration. Here's some (untested) code to serve as inspiration:

import logging
from tempfile import NamedTemporaryFile

from airflow import models
from airflow.hooks.postgres_hook import PostgresHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.python_operator import PythonOperator


# Change these to your identifiers, if needed.
GOOGLE_CONN_ID = "google_cloud_default"
POSTGRES_CONN_ID = "postgres_default"


def copy_to_gcs(copy_sql, file_name, bucket_name):
    """Run a COPY export against Postgres and upload the result to GCS."""
    gcs_hook = GoogleCloudStorageHook(GOOGLE_CONN_ID)
    pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    with NamedTemporaryFile(suffix=".csv") as temp_file:
        temp_name = temp_file.name

        logging.info("Exporting query to file '%s'", temp_name)
        pg_hook.copy_expert(copy_sql, filename=temp_name)

        logging.info("Uploading to %s/%s", bucket_name, file_name)
        gcs_hook.upload(bucket_name, file_name, temp_name)


with models.DAG(...) as dag:
    copy_to_gcs_task = PythonOperator(
        task_id="copy_to_gcs",
        python_callable=copy_to_gcs,
        op_kwargs={
            # copy_expert expects a full COPY statement, not a bare SELECT.
            "copy_sql": "COPY (SELECT * FROM the_source) TO STDOUT WITH CSV HEADER",
            "file_name": "name_for_gcs.csv",
            "bucket_name": "from_airflow",
        },
    )
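
On the WHERE-condition part of your question: because copy_expert receives the complete COPY statement, any predicate can be embedded in the inner SELECT. If your Airflow version templates op_kwargs on PythonOperator (recent releases do), you could even parameterize the filter per run inside the same DAG block; another untested sketch, where created_at is a hypothetical column:

    copy_to_gcs_filtered = PythonOperator(
        task_id="copy_to_gcs_filtered",
        python_callable=copy_to_gcs,
        op_kwargs={
            # {{ ds }} renders to the run's execution date, so each run
            # exports only that day's slice of the table.
            "copy_sql": (
                "COPY (SELECT * FROM the_source "
                "WHERE created_at::date = '{{ ds }}') "
                "TO STDOUT WITH CSV HEADER"
            ),
            "file_name": "the_source/{{ ds }}.csv",
            "bucket_name": "from_airflow",
        },
    )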

Upvotes: 3
