Reputation: 293
How can I solve this problem? I don't know what to do.
My DAG
    import pandas as pd
    from airflow.operators.python import PythonOperator

    # match100, game_content and dag are defined elsewhere (not shown)

    def _game():
        game_df = pd.DataFrame(columns=['match', 'puuid'])
        input_data = {'match': match100, 'puuid': game_content['puuid']}
        game_df = game_df.append(input_data, ignore_index=True)
        return game_df  # the return value is pushed to XCom

    def _game_score():
        game_df = _game()

    create_data_df = PythonOperator(
        task_id='create_data_df',
        python_callable=_game,
        dag=dag,
    )

    data_processing = PythonOperator(
        task_id='data_processing',
        python_callable=_game_score,
        dag=dag,
    )
My Airflow logs
[2022-09-19, 12:58:11 UTC] {python.py:173} INFO - Done. Returned value was: match, puuid
KR_6078406478, CLsxdELas_df67weR
[2022-09-19, 12:58:11 UTC] {xcom.py:586} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-09-19, 12:58:11 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 2412, in xcom_push
session=session,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 198, in set
map_index=map_index,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 583, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.7/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type DataFrame is not JSON serializable
The error occurs at `return game_df` in _game(). What does "JSON serializable" mean? Is my DataFrame somehow connected to JSON? And how do I solve this?
Upvotes: 1
Views: 7791
Reputation: 2519
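In your airflow.cfg file, set enable_xcom_pickling to True in the [core] section. Airflow then serializes XCom values with pickle instead of JSON, so the DataFrame returned by the task can be stored.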
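To illustrate why switching serializers makes the error go away, here is a minimal sketch that uses only the standard library and does not touch Airflow. A `datetime.date` value stands in for the DataFrame (an assumption made so the snippet runs without pandas); like a DataFrame, it is an object that `json.dumps` rejects but `pickle` accepts.

```python
import datetime
import json
import pickle


def try_json(value):
    """Return True if json.dumps accepts the value, False on TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# Stand-in for the DataFrame: an object JSON has no encoding for.
value = datetime.date(2022, 9, 19)
rows = [{"match": "KR_6078406478", "puuid": "CLsxdELas_df67weR"}]

# JSON only knows dict, list, str, int, float, bool and None.
json_ok_for_object = try_json(value)  # rejected with TypeError
json_ok_for_rows = try_json(rows)     # list of dicts of strings: accepted

# pickle can serialize most Python objects and restore them unchanged.
restored = pickle.loads(pickle.dumps(value))
```

Keep in mind that unpickling data can execute arbitrary code, so pickling is best reserved for environments where everything written to XCom is trusted.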
Upvotes: 0
Reputation: 24633
Airflow is a workflow manager, so its ability to transfer data between tasks is limited, and you should generally not pass data between tasks.
By default, XCom serializes values to JSON, so it cannot carry objects that are not JSON serializable, such as DataFrames (read more in the Python pickle documentation).
If you must pass such objects, you need to implement a custom XCom backend.
For more information, see the Airflow documentation on custom XCom backends.
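For small results, one possible alternative to a custom backend is to return something JSON serializable and rebuild the DataFrame downstream. The sketch below is not from the original post: the functions `game` and `game_score` are simplified, hypothetical versions of the asker's callables, the sample values are taken from the log output, and the JSON round trip only simulates what XCom does by default.

```python
import json

import pandas as pd


def game():
    """Build the DataFrame, then return plain records instead of the frame."""
    game_df = pd.DataFrame(
        [{"match": "KR_6078406478", "puuid": "CLsxdELas_df67weR"}]
    )
    # A list of dicts with string values is JSON serializable.
    return game_df.to_dict(orient="records")


def game_score(records):
    """Rebuild the DataFrame from the records received from upstream."""
    return pd.DataFrame(records)


# Simulate the default XCom behaviour: a JSON round trip between tasks.
payload = json.dumps(game())
game_df = game_score(json.loads(payload))
```

In a real DAG the downstream callable would fetch the records with `ti.xcom_pull(task_ids='create_data_df')` rather than receive them as an argument. This works here because every value is a string; columns holding timestamps or other non-JSON types would need converting first.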
Upvotes: 2