Yempee

Reputation: 1

Not able to pass a DataFrame between Airflow tasks

Passing a pandas DataFrame between Airflow tasks is failing. I tried the code below:


try:
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    import pandas as pd
except Exception as e:
    print(e)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
}


def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
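    # this push is what fails: a pandas DataFrame is not JSON serializable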
    context['ti'].xcom_push(key='df', value=df)


def process_type(**context):
    df = context.get("ti").xcom_pull(key="df")
    print(df)

dag = DAG(dag_id="DAG-READ-CSV", schedule_interval="@once", default_args=default_args, catchup=False)

read_file = PythonOperator(task_id="read_file", python_callable=read_file, dag=dag)

process_type = PythonOperator(task_id="process_title", python_callable=process_type, dag=dag)

read_file >> process_type


Error Details:

*** Reading local file: /opt/airflow/logs/DAG-READ-CSV/read_file/2021-05-15T02:16:50.764650+00:00/1.log
[2021-05-15 02:16:52,600] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,618] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,619] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,620] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-05-15 02:16:52,621] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,629] {taskinstance.py:1089} INFO - Executing <Task(PythonOperator): read_file> on 2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,634] {standard_task_runner.py:52} INFO - Started process 527 to run task
[2021-05-15 02:16:52,639] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG-READ-CSV', 'read_file', '2021-05-15T02:16:50.764650+00:00', '--job-id', '197', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/airflow_practice_7_practice_read_csv.py', '--cfg-path', '/tmp/tmp5jr0dror', '--error-file', '/tmp/tmpn9o4ulj3']
[2021-05-15 02:16:52,644] {standard_task_runner.py:77} INFO - Job 197: Subtask read_file
[2021-05-15 02:16:52,696] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [running]> on host f526bca85af4
[2021-05-15 02:16:52,745] {taskinstance.py:1283} INFO - Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=DAG-READ-CSV
AIRFLOW_CTX_TASK_ID=read_file
AIRFLOW_CTX_EXECUTION_DATE=2021-05-15T02:16:50.764650+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,766] {xcom.py:238} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-05-15 02:16:52,767] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/airflow_practice_7_practice_read_csv.py", line 23, in read_file
    context['ti'].xcom_push(key='df', value=df)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1928, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 88, in set
    value = XCom.serialize_value(value)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 235, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'DataFrame' is not JSON serializable
[2021-05-15 02:16:52,772] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=DAG-READ-CSV, task_id=read_file, execution_date=20210515T021650, start_date=20210515T021652, end_date=20210515T021652
[2021-05-15 02:16:52,814] {local_task_job.py:146} INFO - Task exited with return code 1

Upvotes: 0

Views: 2906

Answers (2)

YonatanAshkenazi

Reputation: 71

Although it's not recommended to pass large objects through XCom (they take up storage in the metadata database), you can try passing the DataFrame as a dict, like this:

import pandas as pd
import json
from airflow import DAG, settings
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.timetables.interval import CronDataIntervalTimetable


DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
}


def read_file():
    df = pd.DataFrame({"a": 1, "b": 2, "c": 3}, index=[0])
    return df.to_dict()


def process_type(df_dict):
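    # the templated XCom value arrives as the dict's Python repr (single
    # quotes), so swap the quotes to make it valid JSON before parsing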
    df_dict = df_dict.replace('\'', "\"")
    df = pd.DataFrame.from_dict(json.loads(df_dict))
    print(df)


with DAG(
        dag_id='DAG_ID',
        default_args=DEFAULT_ARGS,
        start_date=days_ago(1),
        timetable=CronDataIntervalTimetable(cron="0 3 * * *", timezone=settings.TIMEZONE),
        catchup=False,
) as dag:

    read_file = PythonOperator(
        task_id='read_file',
        python_callable=read_file,
    )

    process_type = PythonOperator(
        task_id='process_type',
        python_callable=process_type,
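        # xcom_pull inside a template is rendered to a string before the
        # callable receives it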
        op_kwargs={"df_dict": "{{ ti.xcom_pull(task_ids='read_file') }}"}

    )

    read_file >> process_type
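As a side note, on Airflow 2.1+ you could instead set render_template_as_native_obj=True on the DAG; Jinja then hands the pulled value to the callable as a real dict, so the quote replacement and json.loads step in process_type would be unnecessary.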

Upvotes: 1

AFZ84

Reputation: 85

As suggested in a comment, you could use something like Redis as a cache. To store a DataFrame in such a data store, you can serialize it with a tool like pyarrow.

import pandas as pd
import pyarrow as pa
import redis

# fill in with your Redis connection details
redis_host, redis_port, redis_db = "localhost", 6379, 0

redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
# named so it isn't shadowed by the **context kwargs inside the tasks
serialization_context = pa.default_serialization_context()


def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    # serialize the DataFrame with pyarrow and store the raw bytes in Redis
    redis_conn.set("name", serialization_context.serialize(df).to_buffer().to_pybytes())


def process_type(**context):
    # fetch the bytes back from Redis and deserialize into a DataFrame
    df = serialization_context.deserialize(redis_conn.get("name"))
    print(df)


If you don't delete the key in your last task it will persist, so I would suggest giving it a unique name, for example by appending kwargs['ts_nodash'] to the key name, and deleting it in the last task.
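A minimal sketch of that idea, assuming the same redis_conn and serialization_context as above (the key prefix "netflix_titles_" is just an illustration):

def read_file(**context):
    # one Redis key per DAG run, so concurrent runs don't overwrite each other
    key = "netflix_titles_" + context["ts_nodash"]
    df = pd.read_csv("/opt/airflow/common/netflix_titles.csv", encoding="ISO-8859-1")
    redis_conn.set(key, serialization_context.serialize(df).to_buffer().to_pybytes())


def process_type(**context):
    key = "netflix_titles_" + context["ts_nodash"]
    df = serialization_context.deserialize(redis_conn.get(key))
    print(df)
    # delete the key in the last task so it doesn't linger in Redis
    redis_conn.delete(key)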

Upvotes: 0
