Cemre Mengü

Reputation: 18754

Best way to organize these tasks into a DAG

I am new to Airflow. I took some courses on it but did not come across any example for my use case. I would like to:

I tend to see workflows that first export the data into a CSV file (from Postgres) and then load it into the destination database. However, I feel it would be best to do all 3 of these tasks in a single Python operator (for example, looping with a cursor and bulk inserting), but I am not sure whether this is suitable for Airflow.

Any ideas on possible solutions to this situation? What is the general approach?

Upvotes: 1

Views: 557

Answers (1)

Elad Kalif

Reputation: 15931

As you mentioned, there are several options.

To name a few:

  1. Do everything in one Python task.
  2. Create a custom operator.
  3. Create a pipeline.

All approaches are valid; each one has advantages and disadvantages.

First approach:

You can write a Python function that uses PostgresHook to create a dataframe and then loads it into Oracle.

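# Airflow 2.x import paths
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def all_in_one(**context):
    # Read the whole query result into an in-memory DataFrame
    pg_hook = PostgresHook(postgres_conn_id='postgres_conn')
    df = pg_hook.get_pandas_df('SELECT * FROM table')
    # do some transformation on df as needed and load to oracle


op = PythonOperator(task_id='all_in_one_task',
                    python_callable=all_in_one,
                    dag=dag  # assumes a DAG object named dag is defined elsewhere
                    )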

Advantages:

  1. Easy to code (for people who are used to writing Python scripts).

Disadvantages:

  1. Not suitable for large transfers, as all the data is held in memory.
  2. If you need to backfill or rerun, the entire function is executed. So if there is an issue with loading to Oracle, you will still rerun the code that fetches the records from PostgreSQL.
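
The memory problem can be reduced by looping with a cursor and bulk inserting in batches, as the question suggests. Below is a minimal sketch of that idea, not a definitive implementation. It uses in-memory sqlite3 databases as stand-ins for PostgreSQL and Oracle so that it runs anywhere; the function name copy_in_batches, the users table and the batch size are assumptions made for illustration.

```python
import sqlite3


def copy_in_batches(src_conn, dst_conn, select_sql, insert_sql, batch_size=1000):
    """Copy rows between two DB-API connections, one batch at a time."""
    src_cur = src_conn.cursor()
    dst_cur = dst_conn.cursor()
    src_cur.execute(select_sql)
    total = 0
    while True:
        # Only batch_size rows are handed to Python per iteration
        rows = src_cur.fetchmany(batch_size)
        if not rows:
            break
        # Bulk insert the batch; the placeholder style depends on the driver
        # ("?" for sqlite3, ":1, :2" for Oracle drivers)
        dst_cur.executemany(insert_sql, rows)
        dst_conn.commit()
        total += len(rows)
    return total


# Stand-in source database (would be PostgreSQL in the real task)
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE users (id INTEGER, name TEXT)")
source.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(i, f"user{i}") for i in range(2500)],
)
source.commit()

# Stand-in destination database (would be Oracle in the real task)
target = sqlite3.connect(":memory:")
target.execute("CREATE TABLE users (id INTEGER, name TEXT)")

copied = copy_in_batches(
    source,
    target,
    "SELECT id, name FROM users",
    "INSERT INTO users VALUES (?, ?)",
    batch_size=1000,
)
```

Note that with psycopg2 a default cursor still pulls the full result set to the client when the query is executed, so a named (server-side) cursor would be needed for the batching to actually limit memory. The rerun disadvantage is unchanged: a failure during the insert still repeats the read.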

Second approach:

You can implement your own operator, for example MyPostgresqlToOracleTransfer, with any logic you wish. This is useful if you want to reuse the functionality in different DAGs.

Third approach:

Work with files (data lake like). The file can be on the local machine if you have only 1 worker; if not, the file must be uploaded to shared storage (S3, Google Storage, or any other disk that can be accessed by all workers). A possible pipeline is: PostgreSQLToGcs -> GcsToOracle

Depending on which service you are using, some of the required operators may already be implemented by Airflow.

Advantages:

  1. Each task stands on its own, so if you successfully exported the data to disk, in the event of a backfill or failure you can execute just the failed operators and not the whole pipeline. You can also save the exported files in cold storage in case you need to rebuild from history.
  2. Suitable for large transfers.

Disadvantages:

  1. It adds another service that is otherwise "not needed" (a shared disk resource).
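
To show why the file-based pipeline lets each step be rerun independently, here is a minimal sketch of the two steps as separate functions with a CSV file in between. It is an illustration under assumptions, not the built-in transfer operators: sqlite3 stands in for PostgreSQL and Oracle, a temporary local directory stands in for the shared storage, and the names export_to_csv, load_from_csv and the events table are made up for this example. In Airflow, each function would be its own task.

```python
import csv
import os
import sqlite3
import tempfile


def export_to_csv(src_conn, select_sql, path):
    """Step 1: write the query result to a CSV file, row by row."""
    cur = src_conn.cursor()
    cur.execute(select_sql)
    count = 0
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        # Header row taken from the cursor's column metadata
        writer.writerow([col[0] for col in cur.description])
        for row in cur:
            writer.writerow(row)
            count += 1
    return count


def load_from_csv(dst_conn, insert_sql, path):
    """Step 2: bulk insert the CSV rows; needs only the file, not the source DB."""
    cur = dst_conn.cursor()
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        # executemany consumes the reader lazily, one CSV row per insert
        cur.executemany(insert_sql, reader)
    dst_conn.commit()
    return cur.rowcount


# Stand-in source database (would be PostgreSQL)
src_db = sqlite3.connect(":memory:")
src_db.execute("CREATE TABLE events (id INTEGER, label TEXT)")
src_db.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
src_db.commit()

# Stand-in destination database (would be Oracle)
dst_db = sqlite3.connect(":memory:")
dst_db.execute("CREATE TABLE events (id INTEGER, label TEXT)")

# Stand-in for a path on shared storage that every worker can reach
csv_path = os.path.join(tempfile.mkdtemp(), "events.csv")

exported = export_to_csv(src_db, "SELECT id, label FROM events ORDER BY id", csv_path)
loaded = load_from_csv(dst_db, "INSERT INTO events VALUES (?, ?)", csv_path)
```

If the load step fails, it can be retried against the existing file without querying the source again, which is the main benefit listed above.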

Summary

I prefer the 2nd/3rd approaches. I think they are better suited to what Airflow provides and allow more flexibility.

Upvotes: 1
