statsbeginner
statsbeginner

Reputation: 3

Apache Airflow Zombie Job

I want to read and make a plot with airflow. The first task is successfully run, but the second (creating the plot) failed. The error says a task exited with the return code Negsignal.SIGABRT and also zombie jobs detected. I have googled, and it say related to GPU and memory, but I am sure my memory is enough.

I want it to successfully run.

Here is the code

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import logging

# Define function to read the CSV file
def read_insurance_csv(**kwargs):
    try:
        # Path to the CSV file
        file_path = '/Users/xxx/airflow/dags/insurance_2.csv'
        
        # Read the CSV file
        insurance_df = pd.read_csv(file_path)
        
        # Save the DataFrame to a CSV file for verification
        insurance_df.to_csv('/Users/xxx/airflow/dags/insurance_read.csv', index=False)
        
        # Push the DataFrame to XCom
        kwargs['ti'].xcom_push(key='insurance_df', value=insurance_df.to_json())
        
        # Log the DataFrame's head (first few rows) to verify the content
        logging.info("Successfully read insurance.csv and pushed to XCom.")
        print(insurance_df.head())
    except Exception as e:
        logging.error(f"Error reading insurance.csv: {e}")
        raise

# Define function to create plots
def create_plots(**kwargs):
    try:
        # Pull the DataFrame from XCom
        ti = kwargs['ti']
        insurance_json = ti.xcom_pull(task_ids='read_insurance_csv', key='insurance_df')
        df = pd.read_json(insurance_json)
        
        # Save the DataFrame to a CSV file for verification
        df.to_csv('/Users/xxx/airflow/dags/insurance_for_plotting.csv', index=False)
        
        # Define the plotting code
        numerical_features = ['age', 'bmi']
        numerical_discrete = ['children']
        categorical_features = ['sex', 'smoker', 'region']

        # Create a figure and axes
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))

        # Iterate over numerical features
        for i, num_feature in enumerate(numerical_features):
            # Scatter plots for numerical vs numerical
            sns.scatterplot(x=num_feature, y='charges', data=df, ax=axes[0, i])
            axes[0, i].set_title(f'Charges vs {num_feature}')

        for i, num_feature in enumerate(numerical_discrete):
            # Scatter plots for numerical vs numerical
            sns.boxplot(x=num_feature, y='charges', data=df, ax=axes[0, 2])
            axes[0, 2].set_title(f'Charges vs {num_feature}')

        # Iterate over categorical features
        for i, cat_feature in enumerate(categorical_features):
            # Kernel density plots for numerical vs categorical
            sns.kdeplot(x='charges', hue=cat_feature, data=df, common_norm=False, ax=axes[1, i])
            axes[1, i].set_title(f'Charges by {cat_feature}')

        plt.tight_layout()
        plt.savefig('/Users/xxx/airflow/dags/insurance_plots.png')
        plt.show()
        
        logging.info("Successfully created and saved plots.")
    except Exception as e:
        logging.error(f"Error creating plots: {e}")
        raise

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 4, 4),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'read_and_plot_insurance_csv_dag',
    default_args=default_args,
    description='A DAG for reading the insurance CSV file and creating plots',
    schedule_interval=None,  # Set to None for manual triggering
)

# Define the task to read the CSV file
read_insurance_csv_task = PythonOperator(
    task_id='read_insurance_csv',
    python_callable=read_insurance_csv,
    provide_context=True,
    dag=dag,
)

# Define the task to create plots
create_plots_task = PythonOperator(
    task_id='create_plots',
    python_callable=create_plots,
    provide_context=True,
    dag=dag,
)

# Set task dependencies
read_insurance_csv_task >> create_plots_task

Upvotes: 0

Views: 442

Answers (1)

Illia Kaltovich
Illia Kaltovich

Reputation: 130

Airflow is not a proper tool for computation. The error is most likely occurring either due to the initial size of the data uploaded to the worker or while transferring it using XCOM

Consider using third-party computing resources like VMs, containers, or any other cluster.

If your DataFrame is small, you could manage with Airflow's resources, but there is no need to separate the read_insurance_csv and create_plots methods.Just combine them

I highly recommend checking out the Airflow 101 course to familiarize yourself with the basic concepts: https://academy.astronomer.io/path/airflow-101

Upvotes: 0

Related Questions