user189198

Use PythonVirtualenvOperator in Apache Airflow 2.0 TaskFlow DAG

I'm trying to figure out how to use the PythonVirtualenvOperator inside of a DAG that I'm creating, using the TaskFlow API in Apache Airflow 2.0.1.

Sample Code

Here's what my sample DAG looks like:

from time import sleep
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
  'owner': 'airflow'
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags = ['trevor'])
def trevor_dag():
  @task()
  def task1():
    sleep(2)
    return 'hi'
  t_task1 = PythonVirtualenvOperator(python_callable=task1, system_site_packages=False, requirements=['boto3', 'apache-airflow'], task_id='trevor')
  

mydag = trevor_dag()

Expected Result

DAG executes successfully in a Python virtual environment.

Actual Result

It seems that, even though I'm installing the apache-airflow package into the virtual environment, it's not finding the TaskFlow API types.

[2021-04-06 22:44:19,324] {process_utils.py:135} INFO - Executing cmd: /tmp/venvgjg2bsg6/bin/python /tmp/venvgjg2bsg6/script.py /tmp/venvgjg2bsg6/script.in /tmp/venvgjg2bsg6/script.out /tmp/venvgjg2bsg6/string_args.txt
[2021-04-06 22:44:19,329] {process_utils.py:137} INFO - Output:
[2021-04-06 22:44:19,722] {process_utils.py:141} INFO - Traceback (most recent call last):
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO -   File "/tmp/venvgjg2bsg6/script.py", line 28, in <module>
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO -     @task()
[2021-04-06 22:44:19,724] {process_utils.py:141} INFO - NameError: name 'task' is not defined
[2021-04-06 22:44:20,059] {taskinstance.py:1455} ERROR - Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 531, in execute_callable
    string_args_filename,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/process_utils.py", line 145, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
[2021-04-06 22:44:20,061] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=trevor_dag, task_id=trevor, execution_date=20210406T224317, start_date=20210406T224317, end_date=20210406T224420

Question: How do I properly utilize the PythonVirtualenvOperator in DAGs built on the Airflow 2.x TaskFlow API?

Upvotes: 4

Views: 5262

Answers (1)

SergiyKolesnikov

Reputation: 7815

PythonVirtualenvOperator expects a plain function as the argument to its python_callable parameter. Because you decorate task1() with @task, PythonVirtualenvOperator receives a TaskFlow wrapper instead of the function itself, and the script rendered into the virtual environment still contains the @task() line, which fails with NameError because the decorator is not defined there. Remove the @task decorator.

Also, task1() is "cut out" of the DAG file and executed on its own in the virtual environment, so all necessary imports (here, sleep()) must be done inside the function.
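To see why the imports must live inside the function, here is a rough pure-Python sketch (an analogy, not Airflow's actual implementation) of what the operator does: it writes the callable's source into a standalone script and runs it with the virtualenv's interpreter, so names imported at the top of the DAG file do not exist in that process.

```python
import subprocess
import sys
import tempfile

# The function body is written out as standalone source, much like
# PythonVirtualenvOperator templates the callable into script.py.
func_src = '''\
def task1():
    from time import sleep  # imports must live inside the function
    sleep(0.1)
    return "hi"

print(task1())
'''

# Run the script with a separate interpreter: nothing imported at the
# top of the DAG file (and no name like `task`) exists in that process.
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(func_src)
    script = f.name

out = subprocess.run([sys.executable, script], capture_output=True, text=True)
print(out.stdout.strip())  # hi
```

If `from time import sleep` is removed from inside task1(), the subprocess fails with a NameError, exactly like the traceback above.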

The following DAG will work as expected:

from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {"owner": "airflow"}


@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["trevor"],
)
def trevor_dag():
    def task1():
        from time import sleep

        sleep(100)
        return "hi"

    t_task1 = PythonVirtualenvOperator(
        python_callable=task1,
        system_site_packages=False,
        requirements=["boto3", "apache-airflow"],
        task_id="trevor",
    )


mydag = trevor_dag()

Upvotes: 7
