Reputation: 85
I wrote a DockerOperator in Apache Airflow and I want to give it a volume. So far so good. Here is an example:
t = DockerOperator(
    task_id='test',
    image='testimage:latest',
    command='python3 /code/test.py',
    volumes=['/mnt/interim:/interim'],
    xcom_push=True,
    dag=dag,
)
The problem I have is the following:
The name of the mounted directory needs to be flexible. Therefore, I want to mount a directory with the run_id in its name.
volumes=["/mnt/interim/" + "{{ run_id }}" + ":/interim"]
Airflow, however, does not resolve "{{ run_id }}" in the volumes, only in the command of a DockerOperator.
To put it in a nutshell, I want to get the run_id in order to mount it.
Please note that using an Airflow Variable won't do the trick, because if tasks run in parallel, the Variable might get overwritten.
Maybe someone of you already knows an advanced DockerOperator (a custom operator) that can do this.
Thanks in advance :)
Upvotes: 1
Views: 1013
Reputation: 2250
Thanks Johannes for posting the question.
What you're trying to achieve is possible, but because this isn't a very common use case, it isn't enabled by default. Only the arguments listed in the template_fields iterable are templated by Airflow. The volumes field isn't in there, so it isn't being picked up.
The easiest way of doing this is to copy docker_operator.py into your project and add the volumes field to the list: https://github.com/apache/airflow/blob/master/airflow/operators/docker_operator.py#L126:
template_fields = ('command', 'environment', 'container_name', 'volumes',)
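An alternative to copying the whole file is subclassing the operator and overriding template_fields. The sketch below mimics what Airflow does before execute(): it walks template_fields and renders each attribute against the run context. FakeDockerOperator and the crude render() helper are stand-ins here so the snippet runs without Airflow installed; in a real DAG you would subclass airflow.operators.docker_operator.DockerOperator, which uses Jinja2 for the rendering.

```python
def render(value, context):
    # Crude stand-in for Airflow's Jinja2 rendering, for illustration only.
    for key, val in context.items():
        value = value.replace('{{ ' + key + ' }}', val)
    return value

class FakeDockerOperator:
    # The stock DockerOperator only templates these fields:
    template_fields = ('command', 'environment', 'container_name')

    def __init__(self, command=None, volumes=None, **kwargs):
        self.command = command
        self.volumes = volumes

class TemplatedVolumesDockerOperator(FakeDockerOperator):
    # Adding 'volumes' makes Airflow render it like any other templated field.
    template_fields = ('command', 'environment', 'container_name', 'volumes')

    def render_template_fields(self, context):
        # Simplified version of what Airflow's BaseOperator does pre-execute.
        for field in self.template_fields:
            value = getattr(self, field, None)
            if isinstance(value, str):
                setattr(self, field, render(value, context))
            elif isinstance(value, list):
                setattr(self, field, [render(v, context) for v in value])

op = TemplatedVolumesDockerOperator(
    command='python3 /code/test.py',
    volumes=['/mnt/interim/{{ run_id }}:/interim'],
)
op.render_template_fields({'run_id': 'manual__2019-01-01'})
print(op.volumes)  # ['/mnt/interim/manual__2019-01-01:/interim']
```

With the real operator, the subclass body is just the template_fields override; everything else is inherited.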
You can also open a ticket and get this merged upstream, but I'm not sure how many users would template this field. Hope this helps.
Upvotes: 4