Reputation: 3
I want to read a CSV file and make a plot with Airflow. The first task runs successfully, but the second one (creating the plot) fails. The error says the task exited with return code Negsignal.SIGABRT, and zombie jobs are also detected. I have googled it, and the results say it is related to the GPU and memory, but I am sure I have enough memory.
How can I get it to run successfully?
Here is the code:
```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import logging

# Define function to read the CSV file
def read_insurance_csv(**kwargs):
    try:
        # Path to the CSV file
        file_path = '/Users/xxx/airflow/dags/insurance_2.csv'
        # Read the CSV file
        insurance_df = pd.read_csv(file_path)
        # Save the DataFrame to a CSV file for verification
        insurance_df.to_csv('/Users/xxx/airflow/dags/insurance_read.csv', index=False)
        # Push the DataFrame to XCom
        kwargs['ti'].xcom_push(key='insurance_df', value=insurance_df.to_json())
        # Log the DataFrame's head (first few rows) to verify the content
        logging.info("Successfully read insurance.csv and pushed to XCom.")
        print(insurance_df.head())
    except Exception as e:
        logging.error(f"Error reading insurance.csv: {e}")
        raise

# Define function to create plots
def create_plots(**kwargs):
    try:
        # Pull the DataFrame from XCom
        ti = kwargs['ti']
        insurance_json = ti.xcom_pull(task_ids='read_insurance_csv', key='insurance_df')
        df = pd.read_json(insurance_json)
        # Save the DataFrame to a CSV file for verification
        df.to_csv('/Users/xxx/airflow/dags/insurance_for_plotting.csv', index=False)
        # Define the plotting code
        numerical_features = ['age', 'bmi']
        numerical_discrete = ['children']
        categorical_features = ['sex', 'smoker', 'region']
        # Create a figure and axes
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        # Iterate over numerical features
        for i, num_feature in enumerate(numerical_features):
            # Scatter plots for numerical vs numerical
            sns.scatterplot(x=num_feature, y='charges', data=df, ax=axes[0, i])
            axes[0, i].set_title(f'Charges vs {num_feature}')
        for i, num_feature in enumerate(numerical_discrete):
            # Scatter plots for numerical vs numerical
            sns.boxplot(x=num_feature, y='charges', data=df, ax=axes[0, 2])
            axes[0, 2].set_title(f'Charges vs {num_feature}')
        # Iterate over categorical features
        for i, cat_feature in enumerate(categorical_features):
            # Kernel density plots for numerical vs categorical
            sns.kdeplot(x='charges', hue=cat_feature, data=df, common_norm=False, ax=axes[1, i])
            axes[1, i].set_title(f'Charges by {cat_feature}')
        plt.tight_layout()
        plt.savefig('/Users/xxx/airflow/dags/insurance_plots.png')
        plt.show()
        logging.info("Successfully created and saved plots.")
    except Exception as e:
        logging.error(f"Error creating plots: {e}")
        raise

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 4, 4),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'read_and_plot_insurance_csv_dag',
    default_args=default_args,
    description='A DAG for reading the insurance CSV file and creating plots',
    schedule_interval=None,  # Set to None for manual triggering
)

# Define the task to read the CSV file
read_insurance_csv_task = PythonOperator(
    task_id='read_insurance_csv',
    python_callable=read_insurance_csv,
    provide_context=True,
    dag=dag,
)

# Define the task to create plots
create_plots_task = PythonOperator(
    task_id='create_plots',
    python_callable=create_plots,
    provide_context=True,
    dag=dag,
)

# Set task dependencies
read_insurance_csv_task >> create_plots_task
```
Upvotes: 0
Views: 442
Reputation: 130
Airflow is not the right tool for heavy computation. The error most likely occurs either because of the size of the data loaded onto the worker or while transferring that data via XCom.
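To check whether the XCom transfer is actually large, you can measure the string that `read_insurance_csv` pushes, since it sends `insurance_df.to_json()`. XCom values are stored in Airflow's metadata database, so they are meant for small values. This is a minimal sketch: the helper name `xcom_payload_size` and the sample rows are hypothetical stand-ins, and in practice you would run it on the DataFrame read from the real CSV.

```python
import pandas as pd


def xcom_payload_size(df: pd.DataFrame) -> int:
    """Return the size in bytes of df.to_json(), i.e. the value the question pushes to XCom."""
    return len(df.to_json().encode("utf-8"))


# Hypothetical stand-in rows; in the DAG this would be pd.read_csv(file_path).
sample = pd.DataFrame(
    {
        "age": [25, 40],
        "bmi": [22.5, 30.1],
        "charges": [3000.0, 12000.0],
    }
)

print(f"XCom payload: {xcom_payload_size(sample)} bytes")
print(f"In-memory size: {sample.memory_usage(deep=True).sum()} bytes")
```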
Consider offloading the computation to external compute resources such as VMs, containers, or a cluster.
If your DataFrame is small, Airflow's own resources may be enough, but there is no need to split read_insurance_csv and create_plots into separate tasks. Just combine them into one.
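Here is a minimal sketch of such a combined task. It assumes the CSV has the `age`, `bmi` and `charges` columns used in the question; the function name `read_and_plot_insurance` and its `csv_path`/`png_path` parameters are hypothetical. For brevity it draws only the two scatter plots with plain matplotlib instead of seaborn; the boxplot and KDE panels can be added the same way. Selecting the non-interactive Agg backend and dropping `plt.show()` is an extra precaution not mentioned above: a worker process has no screen to show a window on, and GUI backends in forked processes on macOS are a commonly reported source of SIGABRT crashes.

```python
import matplotlib

matplotlib.use("Agg")  # non-interactive backend: render straight to a file, no GUI window

import matplotlib.pyplot as plt
import pandas as pd


def read_and_plot_insurance(csv_path: str, png_path: str) -> None:
    """Read the CSV and save the plots in one task, so no DataFrame passes through XCom."""
    df = pd.read_csv(csv_path)

    numerical_features = ["age", "bmi"]
    fig, axes = plt.subplots(1, len(numerical_features), figsize=(10, 4))
    for ax, feature in zip(axes, numerical_features):
        # Scatter plot of each numerical feature against charges
        ax.scatter(df[feature], df["charges"], s=10)
        ax.set_xlabel(feature)
        ax.set_ylabel("charges")
        ax.set_title(f"Charges vs {feature}")

    fig.tight_layout()
    fig.savefig(png_path)
    plt.close(fig)  # release the figure instead of calling plt.show() in a worker
```

In the DAG, a single `PythonOperator` can then call this function, passing the two paths through its `op_kwargs` argument, with no `xcom_push`/`xcom_pull` at all.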
I highly recommend checking out the Airflow 101 course to familiarize yourself with the basic concepts: https://academy.astronomer.io/path/airflow-101
Upvotes: 0