Reputation: 131
Basically, I'm working with Airflow and have developed a task that downloads a file from an external source.
t1 = PythonOperator(
    task_id='download',
    python_callable=download,
    provide_context=True,
    dag=dag)
This Airflow instance is running in a virtual environment (pipenv).
The download function is:
def download(**kwargs):
    folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
    file_name = download_file(folder_id)
    return file_name
So basically I'm using XComs to pass data from one task to another, and with this configuration it's impossible to manage all of the dependencies of each DAG.
In the documentation I found a class called "PythonVirtualenvOperator", so to use it I wrote:
t1 = PythonVirtualenvOperator(
    task_id='download',
    python_callable=download,
    requirements=['requests'],
    python_version='3.8',
    provide_context=True,
    dag=dag
)
and it's giving me the following error:
TypeError: can't pickle module objects
The download_file function is an API connection that lives in another file.
Any suggestions on how I can manage the environment and still pass data between tasks?
Upvotes: 7
Views: 22335
Reputation: 99
The problem is provide_context=True. Airflow cannot pickle the context because of all the unserializable stuff in it. You can use templating and op_kwargs to work around this if you only need simple values like the execution date:
t1 = PythonVirtualenvOperator(
    task_id='download',
    python_callable=download,
    provide_context=False,
    op_kwargs={
        # The key must be a quoted string; the value is rendered by templating.
        'execution_date_str': '{{ execution_date }}',
    },
    dag=dag)
Of course, you will need to update the arguments to your callable. I didn't go any deeper than that because it worked for my use case.
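As a rough sketch of what "updating the arguments" could look like: the callable takes the rendered string as a named parameter instead of **kwargs. The function body, the file-name pattern, and the sample value for the rendered execution date are assumptions made up for illustration; the real task would call download_file(folder_id) as in the question. The last lines only check that a dict of plain strings survives pickling, which the full context does not.

```python
import pickle


def download(execution_date_str):
    """Callable for the virtualenv task; takes the rendered template as a plain string."""
    # Hypothetical body: the real task would call download_file(folder_id) here.
    # Keep only the date part (first 10 characters) of the rendered value.
    file_name = 'download_{}.csv'.format(execution_date_str[:10])
    return file_name


# What the operator has to serialize now: a dict of plain strings.
# The sample value below is an assumed rendering of '{{ execution_date }}'.
op_kwargs = {'execution_date_str': '2020-01-01T00:00:00+00:00'}
restored = pickle.loads(pickle.dumps(op_kwargs))
result = download(**restored)
```

The returned file name is still what gets passed on to downstream tasks, so the XCom side of the question should not need to change.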
Upvotes: 6
Reputation: 2780
From the definition of the PythonVirtualenvOperator:
The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside of the scope may be referenced.
I'm guessing that someplace in the chain of code that's being called in your download function, there's a method that's imported from another file using a top-level import. Perhaps moving that import into your download function is enough?
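A minimal sketch of that layout, assuming nothing about your actual API module: every import sits inside the function body, and the function touches no names from the enclosing file. The standard-library imports and the placeholder file are stand-ins; in your case the import of download_file and the call download_file(folder_id) would go in the same place.

```python
def download():
    # Imports live inside the function body, so the isolated interpreter
    # performs them itself instead of relying on the DAG file's globals.
    import os
    import tempfile

    # Stand-in for the real download: write a placeholder file and return its path.
    # This is where download_file would be imported and called in the question's code.
    target_dir = tempfile.mkdtemp()
    file_name = os.path.join(target_dir, 'example.txt')
    with open(file_name, 'w') as handle:
        handle.write('placeholder')
    return file_name
```

Note that whatever module provides download_file would also have to be importable from inside the virtualenv for this to work.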
Upvotes: 2