Yaroslav Kolodiy

Reputation: 131

How to use PythonVirtualenvOperator in airflow?

Basically, I'm working with Airflow and have developed a task that downloads a file from an external source:

from airflow.operators.python_operator import PythonOperator  # Airflow 1.10.x import path

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)

This Airflow instance is running in a virtual environment (pipenv).

The download function is:

def download(**kwargs):
    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
    file_name = download_file(folder_id)
    return file_name

So I'm using XComs to pass data from one task to another, but with this configuration it's impossible to manage all of the dependencies of each DAG.
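For reference, an XCom hand-off with the PythonOperator setup above might look roughly like the sketch below: whatever download() returns is pushed as an XCom under the default key, and a downstream callable pulls it from the task instance that Airflow passes in the context. The downstream function name process and the FakeTaskInstance class are hypothetical, added only so the snippet runs without Airflow.

```python
def process(**kwargs):
    # With provide_context=True, Airflow passes the TaskInstance as kwargs['ti'].
    ti = kwargs["ti"]
    # xcom_pull defaults to key='return_value', i.e. the value download() returned.
    file_name = ti.xcom_pull(task_ids="download")
    return "processing " + file_name


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for running this sketch locally."""

    def __init__(self, xcoms):
        # Maps (task_id, key) -> stored value.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms[(task_ids, key)]


if __name__ == "__main__":
    ti = FakeTaskInstance({("download", "return_value"): "report.csv"})
    print(process(ti=ti))  # processing report.csv
```

This is exactly the kind of context access that has to be given up once the callable runs in a separate virtualenv.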

In the documentation I found a class called PythonVirtualenvOperator, so to use it I wrote:

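from airflow.operators.python_operator import PythonVirtualenvOperator  # Airflow 1.10.x import path

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )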

It gives me the following error:

TypeError: can't pickle module objects

The download_file function makes an API connection and is defined in another file.

Any suggestions on how I can manage the environment and still pass data between tasks?

Upvotes: 7

Views: 22335

Answers (2)

Matthew Schmill

Reputation: 99

The problem is

provide_context=True,

Airflow cannot pickle the context because it contains many objects that are not serializable. If you only need simple values such as the execution date, you can work around this with templating and op_kwargs:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        provide_context=False,
        op_kwargs={
          # op_kwargs is templated: Jinja renders this to a plain string,
          # which pickles without trouble
          'execution_date_str': '{{ execution_date }}',
        },
        dag=dag)

Of course, you will also need to change your callable's signature to accept the new argument (here, execution_date_str). I didn't go any deeper than that because it worked for my use case.
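As a rough, Airflow-free illustration of both points: the sketch below reproduces the pickling failure with a stand-in context dict (in Airflow 1.10 the real context includes module objects such as macros), shows that a plain-string op_kwargs dict pickles fine, and gives one possible shape for the updated callable. It assumes {{ execution_date }} renders as an ISO 8601 string like 2020-05-01T00:00:00+00:00; the returned file name is hypothetical and stands in for the real API call.

```python
import pickle
import types


def download(execution_date_str):
    # Updated callable: takes the rendered template string instead of **kwargs.
    # The import lives inside the function because PythonVirtualenvOperator
    # runs only this function's source in the new virtualenv.
    from datetime import datetime

    # Assumption: the rendered value is an ISO 8601 string.
    execution_date = datetime.fromisoformat(execution_date_str)
    # Hypothetical file name; the real task would call its API client here.
    return "download_{:%Y%m%d}.csv".format(execution_date)


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# Stand-in for the task context: it holds a module object, like Airflow's macros.
fake_context = {"ds": "2020-05-01", "macros": types.ModuleType("macros")}
# What op_kwargs passes instead: only plain strings.
op_kwargs = {"execution_date_str": "2020-05-01T00:00:00+00:00"}

print(can_pickle(fake_context))  # False
print(can_pickle(op_kwargs))     # True
print(download(**op_kwargs))     # download_20200501.csv
```

Passing only strings through op_kwargs keeps everything that crosses the process boundary trivially picklable.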

Upvotes: 6

brki

Reputation: 2780

From the definition of the PythonVirtualenvOperator:

The function must be defined using def, and not be part of a class. All imports must happen inside the function and no variables outside of the scope may be referenced.

I'm guessing that somewhere in the chain of code called by your download function, there's a function imported from another file using a top-level import. Perhaps moving that import into your download function is enough?
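A loose way to see why a top-level import breaks things: the simulation below assumes, as the quoted documentation implies, that only the callable's own source reaches the new interpreter, so it executes each version of the function in an empty namespace. The helper run_isolated and both function bodies are hypothetical, and json stands in for whatever module your real code imports.

```python
import json  # imported here, yet invisible to code executed by run_isolated
import textwrap

# Version 1: relies on an import made at module level in the DAG file.
RELIES_ON_TOP_LEVEL_IMPORT = textwrap.dedent('''
    def download(folder_id):
        return json.dumps({"folder_id": folder_id})
''')

# Version 2: does its own import, so it is self-contained.
SELF_CONTAINED = textwrap.dedent('''
    def download(folder_id):
        import json
        return json.dumps({"folder_id": folder_id})
''')


def run_isolated(source, *args):
    """Define download() from source in a fresh namespace and call it.

    This loosely mimics a separate interpreter: none of this file's
    module-level imports are visible to the executed code.
    """
    namespace = {}
    exec(source, namespace)
    return namespace["download"](*args)


try:
    run_isolated(RELIES_ON_TOP_LEVEL_IMPORT, "abc")
except NameError as exc:
    print("top-level import version failed:", exc)

print(run_isolated(SELF_CONTAINED, "abc"))  # {"folder_id": "abc"}
```

Applied to the question, that would mean importing download_file inside download() and making sure its module is importable from inside the virtualenv.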

Upvotes: 2
