Yaroslav Kolodiy

Reputation: 131

XComs don't work with PythonVirtualenvOperator in Airflow 1.10.6

I'm using PythonVirtualenvOperator in my Airflow project and I need to pass parameters from one task to another. To test XComs I used this example, and it works, but when I change PythonOperator to PythonVirtualenvOperator it fails.

The code:

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,

}

dag = DAG('BAtatas', schedule_interval="@once", default_args=args)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    print(kwargs)
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    assert pulled_value_1 == value_1

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    assert pulled_value_2 == value_2

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    assert (pulled_value_1, pulled_value_2) == (value_1, value_2)


push1 = PythonVirtualenvOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
    requirements=['dill'],
    python_version='3.8',
    use_dill=True,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

push2 = PythonVirtualenvOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
    requirements=['dill'],
    python_version='3.8',
    use_dill=True,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

pull = PythonVirtualenvOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
    requirements=['dill'],
    python_version='3.8',
    use_dill=True,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

pull << [push1, push2]

The error:

[2019-11-29 17:58:04,745] {base_task_runner.py:113} INFO - Job 133: Subtask push Traceback (most recent call last):
[2019-11-29 17:58:04,745] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/bin/airflow", line 37, in <module>
[2019-11-29 17:58:04,746] {base_task_runner.py:113} INFO - Job 133: Subtask push     args.func(args)
[2019-11-29 17:58:04,747] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-11-29 17:58:04,748] {base_task_runner.py:113} INFO - Job 133: Subtask push     return f(*args, **kwargs)
[2019-11-29 17:58:04,748] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/bin/cli.py", line 551, in run
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     _run(args, dag, ti)
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/bin/cli.py", line 466, in _run
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     ti._run_raw_task(
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     return func(*args, **kwargs)
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     result = task_copy.execute(context=context)
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 113, in execute
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     return_value = self.execute_callable()
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 297, in execute_callable
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     self._write_args(input_filename)
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 337, in _write_args
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push     dill.dump(arg_dict, f)
[2019-11-29 17:58:04,749] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/dill/_dill.py", line 259, in dump
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     Pickler(file, protocol, **_kwds).dump(obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/dill/_dill.py", line 445, in dump
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     StockPickler.dump(self, obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 485, in dump
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     self.save(obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 558, in save
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     f(self, obj)  # Call unbound method with explicit self
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/dill/_dill.py", line 912, in save_module_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     StockPickler.save_dict(pickler, obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 969, in save_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     self._batch_setitems(obj.items())
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 995, in _batch_setitems
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     save(v)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 558, in save
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     f(self, obj)  # Call unbound method with explicit self
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/dill/_dill.py", line 912, in save_module_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     StockPickler.save_dict(pickler, obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 969, in save_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     self._batch_setitems(obj.items())
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 995, in _batch_setitems
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     save(v)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 558, in save
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     f(self, obj)  # Call unbound method with explicit self
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/dill/_dill.py", line 912, in save_module_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     StockPickler.save_dict(pickler, obj)
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 969, in save_dict
[2019-11-29 17:58:04,750] {base_task_runner.py:113} INFO - Job 133: Subtask push     self._batch_setitems(obj.items())
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 995, in _batch_setitems
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push     save(v)
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/usr/lib/python3.8/pickle.py", line 576, in save
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push     rv = reduce(self.proto)
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1194, in __getattr__
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push     self.var = Variable.get(item)
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push     return func(*args, **kwargs)
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/models/variable.py", line 118, in get
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push     raise KeyError('Variable {} does not exist'.format(key))
[2019-11-29 17:58:04,751] {base_task_runner.py:113} INFO - Job 133: Subtask push KeyError: 'Variable __getstate__ does not exist'
[2019-11-29 17:58:07,380] {logging_mixin.py:112} INFO - [2019-11-29 17:58:07,379] {local_task_job.py:103} INFO - Task exited with return code 1

Any suggestions?

I'm using PythonVirtualenvOperator to isolate my tasks so that each one can have different requirements/versions.

It still doesn't work with Python 3.7. The code is:

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,

}

dag = DAG('BAtatas', schedule_interval="@once", default_args=args)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    print(kwargs)
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    assert pulled_value_1 == value_1

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    assert pulled_value_2 == value_2

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
    assert (pulled_value_1, pulled_value_2) == (value_1, value_2)


push1 = PythonVirtualenvOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

push2 = PythonVirtualenvOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

pull = PythonVirtualenvOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
    requirements=[],
    python_version='3.7',
    use_dill=False,
    system_site_packages=True,
    op_args=None,
    op_kwargs=None,
)

pull << [push1, push2]

The error is:

[2019-12-04 14:08:59,579] {base_task_runner.py:113} INFO - Job 139: Subtask push     pickle.dump(arg_dict, f)
[2019-12-04 14:08:59,579] {base_task_runner.py:113} INFO - Job 139: Subtask push TypeError: cannot pickle 'module' object

When I set use_dill=True, the error is:

[2019-12-04 14:32:12,500] {base_task_runner.py:113} INFO - Job 141: Subtask push     return func(*args, **kwargs)
[2019-12-04 14:32:12,500] {base_task_runner.py:113} INFO - Job 141: Subtask push   File "/home/yk0l0diy/.local/share/virtualenvs/load_data-KWkJBdeu/lib/python3.8/site-packages/airflow/models/variable.py", line 118, in get
[2019-12-04 14:32:12,500] {base_task_runner.py:113} INFO - Job 141: Subtask push     raise KeyError('Variable {} does not exist'.format(key))
[2019-12-04 14:32:12,500] {base_task_runner.py:113} INFO - Job 141: Subtask push KeyError: 'Variable __getstate__ does not exist'
[2019-12-04 14:32:16,640] {logging_mixin.py:112} INFO - [2019-12-04 14:32:16,639] {local_task_job.py:103} INFO - Task exited with return code 1

Upvotes: 2

Views: 4186

Answers (2)

jon

Reputation: 61

This is fixed in Airflow 2.1.0. The issue is tracked here: https://github.com/apache/airflow/issues/15335

Upvotes: 2

Gorka

Reputation: 291

The problem lies in the serialization step that hands data to the virtual environment. The context contains objects that pickle/dill cannot serialize, so in the latest version of Airflow the context is no longer provided.
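
To illustrate the two failures in the question, here is a minimal standard-library sketch. LookupAccessor and try_pickle are hypothetical names, not Airflow classes; the accessor only imitates an object whose __getattr__ raises KeyError for unknown names, which is what the traceback suggests happens when pickle probes for __getstate__. The exact hook being probed may differ between Python versions, so the sketch uses a generic getattr call.

```python
import pickle
import sys


class LookupAccessor:
    """Hypothetical stand-in for a context object that resolves attributes by name."""

    def __getattr__(self, item):
        # Raises KeyError instead of AttributeError for unknown names.
        raise KeyError('Variable {} does not exist'.format(item))


def try_pickle(value):
    """Return None on success, or the exception type name on failure."""
    try:
        pickle.dumps(value)
    except Exception as exc:  # broad on purpose: we only report the failure type
        return type(exc).__name__
    return None


# Plain data serializes fine.
plain_error = try_pickle({'ds': '2019-11-29', 'values': [1, 2, 3]})

# A module reference inside the dict cannot be pickled.
module_error = try_pickle({'some_module': sys})

# Optional hooks are probed with getattr, which only tolerates AttributeError,
# so the KeyError escapes even though a default value is supplied.
try:
    getattr(LookupAccessor(), 'optional_hook', None)
    probe_error = None
except KeyError as exc:
    probe_error = str(exc)
```

Here plain_error stays None, while the other two cases fail, which is why only plain values should cross into the virtual environment.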

That means you cannot read from the context or push to it while running code inside the virtual environment created by PythonVirtualenvOperator.

The only way to pass something from the context into the virtual environment is to use templates_dict or op_kwargs, so that the XCom is resolved during template rendering instead of being read from the context inside the virtual environment.

pull = PythonVirtualenvOperator(
    task_id='puller',
    op_kwargs={
        "result_push": "{{ ti.xcom_pull(task_ids='pusher') }}"
    },
)
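
With the op_kwargs approach above, the callable receives the rendered template, which by default is a string rather than the original object. A minimal sketch of a callable that parses it back, assuming the pushed value is a plain Python literal such as a list or dict; the function body is an illustration, not code from the question:

```python
def puller(result_push):
    """Callable meant for the virtualenv: it takes only plain arguments."""
    # Import inside the function, on the assumption that the callable is run
    # as a standalone script and cannot rely on module-level names.
    import ast

    # Parse the rendered text back into a Python literal (list, dict, ...).
    return ast.literal_eval(result_push)


# Stand-in for what the template would render to for the pushed list.
rendered = str([1, 2, 3])
pulled = puller(rendered)
```

ast.literal_eval only accepts literals, so it is a safer choice than eval for text that comes out of a template.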

And the only way to send something from PythonVirtualenvOperator to XCom is through standard output while the code runs inside the virtual environment (disclaimer: I don't recommend this at all, and I'm not sure I should even mention it):

pusher = PythonVirtualenvOperator(
    task_id='pusher',
    do_xcom_push=True,
    ...
)

PythonVirtualenvOperator finishes with return self._read_result(output_filename), and the output of that script will be stored in XCom.

def push(**kwargs):
    # This value should appear in the XCom of that task, provided nothing
    # else has printed anything; otherwise you'll get a bunch of nothing.
    print("test")
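
The _read_result(output_filename) call mentioned above suggests a file-based handoff, where the result produced in the child process is written to a file and read back by the parent. The following is only a sketch of that idea using the standard library; write_result and read_result are hypothetical helpers, not Airflow functions, and the actual behavior may differ between Airflow versions.

```python
import os
import pickle
import tempfile


def push_by_returning():
    """Callable that hands its result back by returning plain data."""
    return {'a': 'b'}


def write_result(path, result):
    # Hypothetical child-side step: persist the return value to a file.
    with open(path, 'wb') as f:
        pickle.dump(result, f)


def read_result(path):
    # Hypothetical parent-side step: load the value that would go to XCom.
    with open(path, 'rb') as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp_dir:
    output_filename = os.path.join(tmp_dir, 'script.out')
    write_result(output_filename, push_by_returning())
    xcom_value = read_result(output_filename)
```

If your Airflow version works this way, returning a plain, picklable value from the callable may be worth trying before relying on printed output.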

Upvotes: 8
