khuang834

Reputation: 971

How to run Airflow PythonOperator in a virtual environment

I have several Python files that I'm currently executing with BashOperator. This gives me the flexibility to choose the Python virtual environment easily.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    ...}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

# Run the script with the interpreter of an existing conda environment
t1 = BashOperator(
    task_id='task1',
    bash_command='~/anaconda3/envs/myenv/bin/python /python_files/python_task1.py',
    dag=dag)

How can I achieve the same with PythonOperator, using something like this?

from airflow.operators.python_operator import PythonOperator
from python_files import python_task1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=python_task1.main,
    dag=dag)

I assume PythonOperator will use the system Python environment. I've found that Airflow has the PythonVirtualenvOperator, but this appears to work by creating a new virtualenv on the fly from the specified requirements. I'd prefer to use an existing one that is already properly configured. How can I run PythonOperator with a specified Python path?

Upvotes: 22

Views: 23824

Answers (3)

francosta

Reputation: 11

Use PythonVirtualenvOperator. You need to provide the Python function to run and the requirements of the virtual environment (the operator's requirements argument takes a list of pip requirement strings).

Upvotes: 1

Dustin Sun

Reputation: 5532

My workaround is to use a BashOperator to call /path/to/project/venv/bin/python my.py
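If the task still needs to be a PythonOperator, the same idea can be wrapped in a plain function that shells out to the existing environment's interpreter. This is only a minimal sketch: the helper name run_with_interpreter and its arguments are hypothetical and not part of Airflow; it relies solely on the standard library's subprocess.run.

```python
import subprocess


def run_with_interpreter(python_path, script_path, *script_args):
    """Run a script with a specific interpreter and return its stdout.

    Hypothetical helper for illustration; not part of Airflow.
    """
    completed = subprocess.run(
        [python_path, script_path, *script_args],
        check=True,  # raise CalledProcessError on a non-zero exit code
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output as text
    )
    return completed.stdout
```

It could then be handed to PythonOperator as python_callable, with op_args=['/path/to/project/venv/bin/python', 'my.py']. Note that, unlike a shell, subprocess does not expand ~, so a path such as ~/anaconda3/... would need os.path.expanduser first.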

Upvotes: 14

villasv

Reputation: 6831

First things first: you should not (in general) rely on pre-existing resources for your Operators. Your operators should be portable, so using long-standing virtualenvs goes somewhat against that principle. That being said, it's not that big a deal: just as you have to preinstall packages into the global environment, you can pre-bake a few environments. Or you can let the Operator create the environment and have subsequent operators reuse it, which is, I believe, the easiest and most dangerous approach.

Implementing a "virtualenv cache" shouldn't be difficult. Reading the implementation of PythonVirtualenvOperator's execution method:

def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)

So it looks like it doesn't delete the virtualenv explicitly (it relies on TemporaryDirectory to do that). You can subclass PythonVirtualenvOperator and simply use your own context manager that reuses temporary directories:

import glob
from contextlib import contextmanager
from tempfile import mkdtemp

@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        existing = glob.glob('/tmp/' + prefix + '*')
        if len(existing):
            name = existing[0]
        else:
            name = mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass

def execute_callable(self):
    with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
        ...

Naturally, you can get rid of the try-finally in ReusableTemporaryDirectory and put back the usual suffix and dir arguments. I made minimal changes to make it easy to compare with the original TemporaryDirectory class.

With this, your virtualenv won't be discarded, but newer dependencies will eventually be installed by the Operator.
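The reuse behaviour can be checked without Airflow. Below is a standalone sketch of the same context manager, with the try-finally dropped and a dir argument put back. The snake_case name and the use of tempfile.gettempdir() instead of a hard-coded /tmp are my assumptions, not what the operator does.

```python
import glob
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def reusable_temporary_directory(prefix, dir=None):
    """Yield an existing directory matching the prefix, or create one.

    Unlike tempfile.TemporaryDirectory, nothing is deleted on exit.
    """
    base = dir if dir is not None else tempfile.gettempdir()
    # Sort so that the same directory is picked on every call.
    existing = sorted(
        path for path in glob.glob(os.path.join(base, prefix + '*'))
        if os.path.isdir(path)
    )
    if existing:
        name = existing[0]
    else:
        name = tempfile.mkdtemp(prefix=prefix, dir=base)
    yield name
```

Entering the context manager twice with the same prefix yields the same path, which is what lets a later task find the environment an earlier one built. There is no locking here, so two tasks starting at the same time could still race on creating or populating the directory.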

Upvotes: 11
