Rishabhg

Reputation: 118

Unable to push data to XCom in Airflow

from airflow.operators.python import get_current_context

# Runs inside the task callable; doc is the payload (bytes, per the traceback below)
context = get_current_context()
ti = context['ti']
ti.xcom_push(key="file", value=doc)

I have the above code in a task, and doc is the data that I want to pass to XCom. It throws the following stack trace:

Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/bitnami/airflow/dags/rover_ocr_pipeline.py", line 65, in retrieve
    ti.xcom_push(key="file", value = doc )
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2294, in xcom_push
    XCom.set(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
    value = cls.serialize_value(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/opt/bitnami/python/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'bytes' object has no attribute '__module__'

This worked until now, so I am guessing it is an issue with the Airflow version. Previously I was using 2.3.4; now I am using 2.5.0.

Airflow is running on a Kubernetes cluster, using the airflow:2.5.0-debian-11-r11 image.

Upvotes: 0

Views: 1230

Answers (1)

Tevett Goad

Reputation: 156

Moving from comments to an actual answer; see the comments above for the full conversation.

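XCom serializes every value to JSON before storing it in the XCom table (the traceback shows json.dumps being called with XComEncoder). bytes is not a JSON-serializable type, so the encoder fell through to the branch that records the object's class name and tried to read o.__module__, which a bytes object does not have. Converting the bytes to a normal string by base64-encoding them allowed the value to be stored in XCom; the task that pulls the value then has to base64-decode it again.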
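A minimal sketch of that base64 workaround, assuming doc is a bytes object as the traceback suggests. The helper names are hypothetical, and the task id "retrieve" in the commented pull call is only a guess based on the function name in the traceback.

```python
import base64


def bytes_to_xcom_str(data: bytes) -> str:
    """Encode raw bytes as an ASCII string that JSON can carry."""
    return base64.b64encode(data).decode("ascii")


def xcom_str_to_bytes(text: str) -> bytes:
    """Reverse bytes_to_xcom_str and recover the original bytes."""
    return base64.b64decode(text.encode("ascii"))


# In the pushing task (doc is bytes):
#   ti.xcom_push(key="file", value=bytes_to_xcom_str(doc))
#
# In the pulling task (task id is an assumption):
#   doc = xcom_str_to_bytes(ti.xcom_pull(key="file", task_ids="retrieve"))
```

Base64 grows the payload by roughly a third, so for large files it may be better to store the file elsewhere and pass only a path or object key through XCom.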

While probably not worth the effort for just this case, this could be handled automatically by creating a custom XCom backend that detects byte strings and performs the conversion behind the scenes.
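As a rough sketch of what such a backend would need to do, the conversion logic could look like the following. The marker key and function names are made up for illustration. A real backend would presumably subclass Airflow's BaseXCom, call these helpers from its serialize_value and deserialize_value overrides (whose exact signatures depend on the Airflow version), and be enabled through the xcom_backend setting.

```python
import base64
from typing import Any

# Hypothetical marker key used to tag base64-encoded bytes inside the JSON payload.
_BYTES_TAG = "__bytes_b64__"


def wrap_bytes(value: Any) -> Any:
    """Recursively replace bytes with a tagged, JSON-safe dict."""
    if isinstance(value, (bytes, bytearray)):
        return {_BYTES_TAG: base64.b64encode(bytes(value)).decode("ascii")}
    if isinstance(value, dict):
        return {k: wrap_bytes(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        # Tuples become lists, as they would under JSON anyway.
        return [wrap_bytes(v) for v in value]
    return value


def unwrap_bytes(value: Any) -> Any:
    """Recursively turn tagged dicts back into bytes."""
    if isinstance(value, dict):
        if set(value) == {_BYTES_TAG}:
            return base64.b64decode(value[_BYTES_TAG])
        return {k: unwrap_bytes(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_bytes(v) for v in value]
    return value
```

Tagging the encoded value, rather than storing a bare base64 string, lets the pulling side tell converted bytes apart from ordinary strings.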

Upvotes: 1
