Reputation: 41
I'm trying to work on a Fraud Detection dataset from kaggle Credit Card Transactions Fraud Detection Dataset
I'm working on PySpark and wish to apply Undersampling techniques using PySpark. However, I can't find any articles or documentations that highlight on implementing intelligent Undersampling techniques like NearMiss, TomekLinks, ClusterCentroids, ENN etc on Spark ML. Moreover, going with random undersampling doesn't make much sense as it can lead to missing out on some important information.
I tried various approaches. However, I couldn't make it work. I would be glad if someone can help me on this.
**Please note, I don't want to convert my Spark DataFrame to Pandas DataFrame to apply these techniques as I want to make use of Spark distributed processing capabilities. **
(My priority is to be able to perform it via Spark. Though, if there are some suggestions/code that works with Pandas API on Spark, I will be happy to know about that)
This is what I've tried so far.....
I was able to succesfully implement in NearMiss and TomekLinks techniques on a Pandas DataFrame. Proceeding ahead with this, I tried to implement the same using a DataFrame built using Pandas API on Spark (i.e. pyspark.pandas). However, it failed due to incompatibilities of internal libraries used in the imblearn implementations of NearMiss and TomekLinks.
I also tried building a custom function that will mimic the steps performed in TomekLinks, it also failed and I wasn't able to solve it.
import numpy as np
from sklearn.neighbors import NearestNeighbors
def remove_tomek_links(X, y):
# Calculate pairwise distances
dist = np.sqrt(np.sum((X[:, np.newaxis, :] - X[np.newaxis, :, :]) ** 2, axis=-1))
# Find nearest neighbors of minority instances
nn = NearestNeighbors(n_neighbors=2)
nn.fit(X[y == 1])
minority_nn_dist, minority_nn = nn.kneighbors(X[y == 1])
# Check for Tomek Links
majority_nn = nn.kneighbors(X[y == 0], return_distance=False)[:, 1]
tomek_links = np.logical_and(y[majority_nn] == 1, np.in1d(minority_nn[:, 0], majority_nn))
# Remove Tomek Links
X = np.delete(X, np.concatenate((np.where(tomek_links)[0], minority_nn[tomek_links, 0])), axis=0)
y = np.delete(y, np.concatenate((np.where(tomek_links)[0], minority_nn[tomek_links, 0])), axis=0)
return X, y
I planned going back to Spark DataFrame and find some implementations of those techniques, but no luck. I queries ChatGPT to implement the same, here are some implementations I was returned. At the end, they didn't work as well even after multiple attempts to fix it in case of any mistake in code.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf, array, col, size, sum, sqrt, array_sort, struct
from pyspark.sql.types import IntegerType, BooleanType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import monotonically_increasing_id
def compute_distance(u, v):
return float(sqrt(float(sum((u - v) ** 2))))
compute_distance_udf = udf(compute_distance)
def near_miss_2(spark_df, features_col, label_col, sampling_strategy=None):
assembler = VectorAssembler(inputCols=features_col, outputCol="features")
df_with_features = assembler.transform(spark_df)
# Get class distribution
class_distribution = df_with_features.groupBy(label_col).count().toPandas()
# Find majority and minority classes
majority_class = class_distribution[label_col][class_distribution['count'].idxmax()]
minority_class = class_distribution[label_col][class_distribution['count'].idxmin()]
# Filter minority and majority class
minority_df = df_with_features.filter(col(label_col) == minority_class)
majority_df = df_with_features.filter(col(label_col) == majority_class)
# Compute pairwise distance between all instances
dist_udf = udf(lambda x,y: compute_distance_udf(x,y), FloatType())
pairwise_dist_df = minority_df.crossJoin(majority_df)\
.withColumn("distance", dist_udf(col("features"), col("features_1")))\
.select(col("features"), col(label_col), col("features_1"), col("distance"))
# Sort neighbors by distance
sorted_neighbors_df = pairwise_dist_df.groupBy(col("features"))\
.agg(array_sort(collect_list(struct(col("distance"), col(label_col), col("features_1")))).alias("sorted_neighbors"))\
.select(col("features"), col(label_col), col("sorted_neighbors"))
# Find k nearest neighbors
k = 3
find_k_neighbors_udf = udf(lambda x: x[:k], ArrayType(VectorUDT()))
k_neighbors_df = sorted_neighbors_df.withColumn("k_neighbors", find_k_neighbors_udf(col("sorted_neighbors")))\
.select(col("features"), col(label_col), col("k_neighbors"))
# Find the majority class neighbors
majority_neighbors_df = k_neighbors_df.filter(col("k_neighbors")[0]["label"] == majority_class)
# Find the remaining minority class examples
minority_remaining_df = k_neighbors_df.filter(col("k_neighbors")[0]["label"] == minority_class)\
.select(col("features"))
# Select samples that are closer to the minority class
selected_samples_df = majority_neighbors_df.select("features")\
.union(minority_remaining_df)\
.distinct()
if sampling_strategy is not None:
# Compute the number of samples to select from the majority class based on the sampling strategy
majority_count = class_distribution.loc[majority_class]['count']
minority_count = class_distribution.loc[minority_class]['count']
desired_ratio = sampling_strategy[minority_class]
majority_samples = int(minority_count * desired_ratio - minority_count)
# If the number of samples to select from the majority class is negative, select all the majority class samples
if majority_samples < 0:
selected_majority_df = majority_df
else:
# Randomly select
At the end, I also tried implementations of other algorithms like ENN, ClusterCentroid, NearMiss, but none of them worked.
I'm using Fraud Detection Dataset from Kaggle Credit Card Transactions Fraud Detection Dataset
Can anyone help me on this to implement atleast one of these techniques for Class Balancing?
PS: I couldn't find any library in Spark ML for this, if there's any please suggest.
Thank You!!!
Upvotes: 3
Views: 312