Reputation: 1651
Let's assume an Airflow DAG consists of the following two operators:
Op1 (GCSToLocalFilesystemOperator): Downloads the file XYZ from GCS
Op2 (PythonOperator): Needs the file XYZ to do something with it
The DAG will be executed by GCP's Cloud Composer. I know it is possible to combine the two operators and implement them as a single PythonOperator, but I want to keep things modular and use built-in operators as much as possible.
Candidate Solutions:
Use a temporary file: this does not work. If XYZ is stored as /tmp/XYZ, Op2 may or may not find it, since the two operators might run on different workers.
XCom: XCom cannot be used here either, since the file is rather large.
Use GCS as shared storage: this does not help in this particular example. We are back where we started, since one operator still has to download the file and make it available to the second operator.
So, what is a good solution here? In general, is there a way to hand off a file (or a string) from one operator to another?
Upvotes: 1
Views: 2620
Reputation: 1651
I found the solution as described here. Composer mounts its environment bucket on the Composer pods with gcsfuse, so your DAG code can access the bucket's contents locally under /home/airflow/gcs.
So, in the example above, Op1 needs to copy XYZ from the other bucket into the Composer bucket. This can be done with GCSToGCSOperator:
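```python
import os

from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

MY_BUCKET = 'Original Bucket containing XYZ'  # bucket that originally holds XYZ
PATH_TO_XYZ = 'path/to/XYZ'                    # object name of XYZ inside MY_BUCKET
# Composer sets GCS_BUCKET to the name of the environment's own bucket
COMPOSER_BUCKET = os.environ.get('GCS_BUCKET')
XYZ_GCS_PATH = 'data/my_dir/XYZ'
# The environment bucket is mounted via gcsfuse under /home/airflow/gcs
XYZ_LOCAL_PATH = f'/home/airflow/gcs/{XYZ_GCS_PATH}'


def my_function(filepath):
    # Read the file through the gcsfuse mount as if it were a local file
    with open(filepath) as f:
        content = f.read()
    print(content)


with models.DAG(...) as dag:
    # Copy XYZ into the Composer bucket so that every worker can see it
    Op1 = GCSToGCSOperator(
        task_id='download_data',
        source_bucket=MY_BUCKET,
        source_object=PATH_TO_XYZ,
        destination_bucket=COMPOSER_BUCKET,
        destination_object=XYZ_GCS_PATH,
    )
    # Read the copied file through its local (mounted) path
    Op2 = PythonOperator(
        task_id='read_file',
        python_callable=my_function,
        op_kwargs={'filepath': XYZ_LOCAL_PATH},
    )
    Op1 >> Op2
```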
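The XYZ_LOCAL_PATH construction in this answer can be generalized into a small helper. This is only a sketch, assuming (as the answer states) that the environment bucket is mounted at /home/airflow/gcs; the function name composer_local_path is hypothetical and is not part of Airflow or Composer. It rejects URIs from any other bucket, because only the environment bucket is mounted, which is why Op1 has to copy XYZ into it first.

```python
import posixpath
from urllib.parse import urlparse

# Assumption: Composer's gcsfuse mount point for the environment bucket
COMPOSER_MOUNT = "/home/airflow/gcs"


def composer_local_path(gcs_uri: str, environment_bucket: str) -> str:
    """Map gs://<environment_bucket>/<object> to its path under the gcsfuse mount."""
    parsed = urlparse(gcs_uri)
    if parsed.scheme != "gs":
        raise ValueError(f"not a gs:// URI: {gcs_uri}")
    if parsed.netloc != environment_bucket:
        # Objects in other buckets are not mounted; copy them in first (e.g. GCSToGCSOperator)
        raise ValueError(f"{gcs_uri} is not in the environment bucket {environment_bucket}")
    object_name = parsed.path.lstrip("/")
    if not object_name:
        raise ValueError(f"no object name in {gcs_uri}")
    return posixpath.join(COMPOSER_MOUNT, object_name)


if __name__ == "__main__":
    # Hypothetical bucket name; mirrors XYZ_GCS_PATH from the DAG above
    print(composer_local_path("gs://my-env-bucket/data/my_dir/XYZ", "my-env-bucket"))
```

With the hypothetical bucket name above, the call yields /home/airflow/gcs/data/my_dir/XYZ, the same value as XYZ_LOCAL_PATH.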
Upvotes: 1
Reputation: 124
Yes: use intermediary files, and use XCom to pass only the path to the file (not its contents).
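A minimal sketch of this pattern, assuming the intermediary file lives somewhere all workers share (here the Composer data/ mount from the answer above). The producer returns the path, which a PythonOperator pushes to XCom; the consumer receives it through a templated op_kwargs value. All names (write_intermediate_file, process_intermediate_file, build_dag, the DAG id) are hypothetical. Airflow is imported inside build_dag only so the two callables can be exercised without an Airflow installation.

```python
import os
import tempfile

# Assumption: gcsfuse-mounted data/ folder of the Composer environment bucket
SHARED_DIR = "/home/airflow/gcs/data"


def write_intermediate_file(shared_dir: str, name: str = "XYZ", content: str = "") -> str:
    """Producer: write the (possibly large) file to shared storage and return its path.

    As a PythonOperator callable, the return value is pushed to XCom,
    so only this short path string is stored, not the file itself.
    """
    os.makedirs(shared_dir, exist_ok=True)
    path = os.path.join(shared_dir, name)
    with open(path, "w") as f:
        f.write(content)
    return path


def process_intermediate_file(filepath: str) -> int:
    """Consumer: open the file whose path came from XCom and return a small result (line count)."""
    with open(filepath) as f:
        return sum(1 for _ in f)


def build_dag():
    """Hypothetical wiring of the two callables; needs Airflow 2 installed."""
    import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="handoff_via_xcom_path",
        start_date=datetime.datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        produce = PythonOperator(
            task_id="write_file",
            python_callable=write_intermediate_file,
            op_kwargs={"shared_dir": SHARED_DIR, "content": "a\nb\n"},
        )
        consume = PythonOperator(
            task_id="process_file",
            python_callable=process_intermediate_file,
            # op_kwargs is templated, so this resolves to the path returned by write_file
            op_kwargs={"filepath": "{{ ti.xcom_pull(task_ids='write_file') }}"},
        )
        produce >> consume
    return dag


if __name__ == "__main__":
    # Local check without Airflow: a temporary directory stands in for the shared mount
    with tempfile.TemporaryDirectory() as tmp:
        path = write_intermediate_file(tmp, "XYZ", "a\nb\n")
        print(process_intermediate_file(path))  # prints 2
```

The consumer deliberately returns a small value, since its return value is pushed to XCom as well; returning the file contents there would reintroduce the size problem from the question.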
Upvotes: 0