happyhuman

Reputation: 1651

Sharing large files between Airflow operators

Let's assume an Airflow DAG consists of the following two operators:

- Op1: downloads a file XYZ from a GCS bucket
- Op2: a PythonOperator that reads and processes XYZ

The DAG will be executed by GCP's Cloud Composer. I know it is possible to combine the two operators and implement them as a single PythonOperator, but I want to keep the design modular and use built-in operators as much as possible.

Candidate Solutions:

So, what is a good solution here? In general, is there a way to hand off a file (or a string) from one operator to another?

Upvotes: 1

Views: 2620

Answers (2)

happyhuman

Reputation: 1651

I found the solution as described here. A gcsfuse filesystem exists on the Composer pods. In your Composer code, you can access it locally at /home/airflow/gcs.
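As a minimal illustration of the mapping (the object name example.txt is hypothetical), an object at gs://&lt;composer-bucket&gt;/data/example.txt can be read on a worker with plain file I/O:

with open('/home/airflow/gcs/data/example.txt') as f:
    print(f.read())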

So, in the example above, Op1 needs to copy XYZ from the other bucket to the Composer bucket. This can be done using the GCSToGCSOperator:

import os

from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

MY_BUCKET = 'Original Bucket containing XYZ'
PATH_TO_XYZ = 'path/to/XYZ'
# Composer sets GCS_BUCKET to the environment's own bucket.
COMPOSER_BUCKET = os.environ.get('GCS_BUCKET')
# Objects under data/ in the Composer bucket are visible on the workers
# through the gcsfuse mount at /home/airflow/gcs/data.
XYZ_GCS_PATH = 'data/my_dir/XYZ'
XYZ_LOCAL_PATH = f'/home/airflow/gcs/{XYZ_GCS_PATH}'


def my_function(filepath):
    # Read the copied file through the gcsfuse mount with plain file I/O.
    with open(filepath) as f:
        content = f.read()
        print(content)


with models.DAG(...) as dag:
    # Op1: copy XYZ from the source bucket into the Composer bucket.
    Op1 = GCSToGCSOperator(
        task_id='download_data',
        source_bucket=MY_BUCKET,
        source_object=PATH_TO_XYZ,
        destination_bucket=COMPOSER_BUCKET,
        destination_object=XYZ_GCS_PATH,
    )

    # Op2: read the file via its local (gcsfuse) path.
    Op2 = PythonOperator(
        task_id='read_file',
        python_callable=my_function,
        op_kwargs={'filepath': XYZ_LOCAL_PATH},
    )

    Op1 >> Op2

Upvotes: 1

CrookedCloud

Reputation: 124

Yes, use intermediary files, and use XCom to pass the path to the file from one operator to the next.
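A minimal sketch of that approach, assuming Airflow 2 (where the PythonOperator passes context variables such as ti to the callable) and a Composer-style gcsfuse mount; the dag_id, task_ids, and file path are hypothetical. The first task writes the intermediary file and returns its path, which Airflow pushes to XCom; the second task pulls the path and reads the file:

import datetime

from airflow import models
from airflow.operators.python import PythonOperator


def produce_file():
    # Write the intermediary file to shared storage, then return its path;
    # the return value is automatically pushed to XCom.
    path = '/home/airflow/gcs/data/XYZ'
    with open(path, 'w') as f:
        f.write('some large payload')
    return path


def consume_file(ti):
    # Pull the path from the upstream task's XCom and read the file.
    path = ti.xcom_pull(task_ids='produce_file')
    with open(path) as f:
        print(f.read())


with models.DAG(
    dag_id='xcom_path_handoff',
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    produce = PythonOperator(task_id='produce_file', python_callable=produce_file)
    consume = PythonOperator(task_id='consume_file', python_callable=consume_file)
    produce >> consume

Note that XCom itself is only suited to small values; for large files, the payload should live in shared storage (GCS, a mounted volume) and only the path should travel through XCom.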

Upvotes: 0
