I'm trying to figure out how to use the PythonVirtualenvOperator
inside a DAG that I'm building with the TaskFlow API in Apache Airflow 2.0.1.
Here's what my sample DAG looks like:
from time import sleep
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
    'owner': 'airflow'
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['trevor'])
def trevor_dag():
    @task()
    def task1():
        sleep(2)
        return 'hi'

    t_task1 = PythonVirtualenvOperator(
        python_callable=task1,
        system_site_packages=False,
        requirements=['boto3', 'apache-airflow'],
        task_id='trevor',
    )

mydag = trevor_dag()
The DAG fails when the task executes in the Python virtual environment.
It seems that, even though I'm installing the apache-airflow
package into the virtual environment, it's not finding the TaskFlow API types:
[2021-04-06 22:44:19,324] {process_utils.py:135} INFO - Executing cmd: /tmp/venvgjg2bsg6/bin/python /tmp/venvgjg2bsg6/script.py /tmp/venvgjg2bsg6/script.in /tmp/venvgjg2bsg6/script.out /tmp/venvgjg2bsg6/string_args.txt
[2021-04-06 22:44:19,329] {process_utils.py:137} INFO - Output:
[2021-04-06 22:44:19,722] {process_utils.py:141} INFO - Traceback (most recent call last):
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO - File "/tmp/venvgjg2bsg6/script.py", line 28, in <module>
[2021-04-06 22:44:19,723] {process_utils.py:141} INFO - @task()
[2021-04-06 22:44:19,724] {process_utils.py:141} INFO - NameError: name 'task' is not defined
[2021-04-06 22:44:20,059] {taskinstance.py:1455} ERROR - Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 531, in execute_callable
    string_args_filename,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/process_utils.py", line 145, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvgjg2bsg6/bin/python', '/tmp/venvgjg2bsg6/script.py', '/tmp/venvgjg2bsg6/script.in', '/tmp/venvgjg2bsg6/script.out', '/tmp/venvgjg2bsg6/string_args.txt']' returned non-zero exit status 1.
[2021-04-06 22:44:20,061] {taskinstance.py:1503} INFO - Marking task as FAILED. dag_id=trevor_dag, task_id=trevor, execution_date=20210406T224317, start_date=20210406T224317, end_date=20210406T224420
Question: How do I properly utilize the PythonVirtualenvOperator
in DAGs built on the Airflow 2.x TaskFlow API?
Upvotes: 4
Views: 5262
Reputation: 7815
PythonVirtualenvOperator expects a function as the argument to its python_callable parameter. Since you apply the task decorator to task1(), what PythonVirtualenvOperator receives instead is an Airflow operator (not the function task1()). You need to remove that task decorator.
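To see why the decorated name no longer works as a python_callable, here is a minimal, self-contained sketch of what a decorator factory like @task() does. FakeOperator is a hypothetical stand-in, not Airflow's actual internals: the point is only that the decorated name ends up bound to a wrapper object rather than a plain function.

```python
# Hypothetical sketch: after decoration, the name 'task1' is bound to a
# wrapper object (FakeOperator here), not to the original function.
class FakeOperator:
    def __init__(self, fn):
        self.fn = fn  # the original function is tucked away inside

def task():
    # Mimics the @task() factory: returns a decorator that wraps the function.
    def wrap(fn):
        return FakeOperator(fn)
    return wrap

@task()
def task1():
    return 'hi'

# Passing task1 as python_callable would hand the operator this wrapper
# object instead of a function it can serialize and call.
print(type(task1).__name__)  # FakeOperator
print(callable(task1))       # False
```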
Also, task1() will be "cut out" from the DAG and executed on its own in a virtual environment. So you have to do all necessary imports inside the function, e.g. importing sleep().
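The reason is that the operator does not call the function in-process: it copies the function's source into a standalone script and runs it with the virtualenv's interpreter (the /tmp/venv.../script.py in the log above). Here is a rough plain-Python approximation of that behaviour (using exec on the extracted source, not Airflow's actual serialization machinery), showing why module-level imports are lost:

```python
import inspect
import textwrap

from time import sleep  # module-level import: NOT carried into the script

def bad_task():
    sleep(0)  # relies on the import at module level
    return 'hi'

def good_task():
    from time import sleep  # import lives inside the function body
    sleep(0)
    return 'hi'

def run_detached(fn):
    # Rough approximation of the operator's behaviour: only the function's
    # own source text is carried over, then executed in a fresh, empty
    # namespace, as if it were a brand-new script file.
    source = textwrap.dedent(inspect.getsource(fn))
    namespace = {}
    exec(source, namespace)
    return namespace[fn.__name__]()

print(run_detached(good_task))  # hi
try:
    run_detached(bad_task)
except NameError as err:
    print(err)  # name 'sleep' is not defined
```

This mirrors the NameError in your log: inside the generated script, names like task (or anything else imported at module level) simply do not exist.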
The following DAG will work as expected:
from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonVirtualenvOperator

default_args = {"owner": "airflow"}

@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["trevor"],
)
def trevor_dag():
    def task1():
        from time import sleep

        sleep(100)
        return "hi"

    t_task1 = PythonVirtualenvOperator(
        python_callable=task1,
        system_site_packages=False,
        requirements=["boto3", "apache-airflow"],
        task_id="trevor",
    )

mydag = trevor_dag()
Upvotes: 7