Reputation: 1
Passing a pandas DataFrame between Airflow tasks is failing. I tried the code below:
try:
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    import pandas as pd
except Exception as e:
    print(e)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
}

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    context['ti'].xcom_push(key='df', value=df)

def process_type(**context):
    df = context.get("ti").xcom_pull(key="df")
    print(df)

dag = DAG(dag_id="DAG-READ-CSV", schedule_interval="@once", default_args=default_args, catchup=False)

read_file = PythonOperator(task_id="read_file", python_callable=read_file, dag=dag)
process_type = PythonOperator(task_id="process_title", python_callable=process_type, dag=dag)

read_file >> process_type
Error Details:
*** Reading local file: /opt/airflow/logs/DAG-READ-CSV/read_file/2021-05-15T02:16:50.764650+00:00/1.log
[2021-05-15 02:16:52,600] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,618] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [queued]>
[2021-05-15 02:16:52,619] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,620] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-05-15 02:16:52,621] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-15 02:16:52,629] {taskinstance.py:1089} INFO - Executing <Task(PythonOperator): read_file> on 2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,634] {standard_task_runner.py:52} INFO - Started process 527 to run task
[2021-05-15 02:16:52,639] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'DAG-READ-CSV', 'read_file', '2021-05-15T02:16:50.764650+00:00', '--job-id', '197', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/airflow_practice_7_practice_read_csv.py', '--cfg-path', '/tmp/tmp5jr0dror', '--error-file', '/tmp/tmpn9o4ulj3']
[2021-05-15 02:16:52,644] {standard_task_runner.py:77} INFO - Job 197: Subtask read_file
[2021-05-15 02:16:52,696] {logging_mixin.py:104} INFO - Running <TaskInstance: DAG-READ-CSV.read_file 2021-05-15T02:16:50.764650+00:00 [running]> on host f526bca85af4
[2021-05-15 02:16:52,745] {taskinstance.py:1283} INFO - Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=DAG-READ-CSV
AIRFLOW_CTX_TASK_ID=read_file
AIRFLOW_CTX_EXECUTION_DATE=2021-05-15T02:16:50.764650+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-15T02:16:50.764650+00:00
[2021-05-15 02:16:52,766] {xcom.py:238} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-05-15 02:16:52,767] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/airflow_practice_7_practice_read_csv.py", line 23, in read_file
context['ti'].xcom_push(key='df', value=df)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1928, in xcom_push
session=session,
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 88, in set
value = XCom.serialize_value(value)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 235, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'DataFrame' is not JSON serializable
[2021-05-15 02:16:52,772] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=DAG-READ-CSV, task_id=read_file, execution_date=20210515T021650, start_date=20210515T021652, end_date=20210515T021652
[2021-05-15 02:16:52,814] {local_task_job.py:146} INFO - Task exited with return code 1
Upvotes: 0
Views: 2906
Reputation: 71
Although it's not recommended to pass large objects via XCom (they take up storage in the metadata database), you can try passing the DataFrame as a dict, like this:
import ast
import pandas as pd
from airflow import DAG, settings
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.timetables.interval import CronDataIntervalTimetable

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
}

def read_file():
    df = pd.DataFrame({"a": 1, "b": 2, "c": 3}, [0])
    return df.to_dict()  # the returned dict is pushed to XCom automatically

def process_type(df_dict):
    # The templated XCom arrives as the string repr of a dict, so parse it back
    # with ast.literal_eval (json.loads would fail on the unquoted integer keys).
    df = pd.DataFrame.from_dict(ast.literal_eval(df_dict))
    print(df)

with DAG(
    dag_id='DAG_ID',
    default_args=DEFAULT_ARGS,
    start_date=days_ago(1),
    timetable=CronDataIntervalTimetable(cron="0 3 * * *", timezone=settings.TIMEZONE),
    catchup=False,
) as dag:
    read_file = PythonOperator(
        task_id='read_file',
        python_callable=read_file,
    )
    process_type = PythonOperator(
        task_id='process_type',
        python_callable=process_type,
        op_kwargs={"df_dict": "{{ ti.xcom_pull(task_ids='read_file') }}"},
    )
    read_file >> process_type
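If you are on Airflow 2.1 or later, a variant of the same idea is to set render_template_as_native_obj=True on the DAG, so the templated XCom reaches process_type as a native dict and no string parsing is needed. A minimal sketch of that variant (the dag_id and schedule here are made up, not taken from the question):

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def read_file():
    df = pd.DataFrame({"a": 1, "b": 2, "c": 3}, [0])
    return df.to_dict()  # plain dict, JSON-serializable for XCom

def process_type(df_dict):
    # df_dict is already a dict because the DAG renders templates natively
    df = pd.DataFrame.from_dict(df_dict)
    print(df)

with DAG(
    dag_id='DAG_ID_NATIVE',              # hypothetical dag_id
    start_date=days_ago(1),
    schedule_interval='@once',
    render_template_as_native_obj=True,  # render Jinja results as Python objects
    catchup=False,
) as dag:
    read_file_task = PythonOperator(task_id='read_file', python_callable=read_file)
    process_type_task = PythonOperator(
        task_id='process_type',
        python_callable=process_type,
        op_kwargs={"df_dict": "{{ ti.xcom_pull(task_ids='read_file') }}"},
    )
    read_file_task >> process_type_task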
Upvotes: 1
Reputation: 85
As suggested in a comment, you could use something like Redis as a cache. To store a DataFrame in such a data store, you can serialize your df with tools like pyarrow.
import pandas as pd
import redis
import pyarrow as pa

# redis_host, redis_port and redis_db are placeholders for your connection details.
redis_conn = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
# Call the serialization context pa_context so it is not shadowed by the
# task's **context kwargs inside the callables below.
pa_context = pa.default_serialization_context()

def read_file(**context):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    redis_conn.set("name", pa_context.serialize(df).to_buffer().to_pybytes())

def process_type(**context):
    df = pa_context.deserialize(redis_conn.get("name"))
    print(df)
If you don't delete the key in your last task it will persist, so I would suggest giving it a unique name, which you can create by appending kwargs['ts_nodash'] to the name, and then deleting it in your last task, as in the sketch below.
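A minimal sketch of that suggestion, reusing the redis_conn and pa_context objects from the snippet above (the "netflix_titles" key prefix is just an illustrative name):

def read_file(**kwargs):
    path = "/opt/airflow/common/netflix_titles.csv"
    df = pd.read_csv(path, encoding="ISO-8859-1")
    key = "netflix_titles_" + kwargs["ts_nodash"]  # unique per DAG run
    redis_conn.set(key, pa_context.serialize(df).to_buffer().to_pybytes())

def process_type(**kwargs):
    key = "netflix_titles_" + kwargs["ts_nodash"]
    df = pa_context.deserialize(redis_conn.get(key))
    print(df)
    redis_conn.delete(key)  # drop the key in the last task so it does not persist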
Upvotes: 0