espogian
espogian

Reputation: 607

Binning a numerical column with PySpark

I have a PySpark DataFrame df which has a numerical column (with NaNs)

+-------+
|numbers|
+-------+
| 142.56|
|       |
|2023.33|
| 477.76|
| 175.52|
|1737.45|
| 520.72|
|  641.2|
|   79.3|
| 138.43|
+-------+

I want to create a new column which defines some bins, e.g. 0, (0, 500], (500, 1000], (1000, inf)

Is there a way to accomplish this using a function like pandas.cut? At the moment the way I do this with PySpark is by defining an udf function as follows, but this approach has the disadvantage to be tedious and non-parametric

from pyspark.sql import functions as F
from pyspark.sql.types import *

def func(numbers):
    if numbers==0:
        return '0'
    elif numbers>0 and numbers<=500:
        return '(0, 500]'
    elif numbers>500 and numbers<=1000:
        return '(500, 1000]'
    elif numbers>500:
        return '(500, inf)'
    else return 'Other'

func_udf = F.udf(func, StringType())

df.withColumn('numbers_bin', func_udf(df['numbers']))

If df was a Pandas DataFrame, I would have used this approach:

df['numbers_bin'] = pd.cut(
    df['numbers'],
    np.concatenate((-np.inf, [0, 500, 1000], np.inf), axis=None))

Which is way more cleaner and modular

Upvotes: 3

Views: 5810

Answers (2)

Samir Hinojosa
Samir Hinojosa

Reputation: 825

Going a little further

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf, broadcast, col, count

BINS = [-float('inf'), 0, 500, 1000, float('inf')]

df_final = Bucketizer(
                splits=BINS,
                inputCol="numbers",
                outputCol="bin_number"
    ).transform(df)

df_final.show()
+-------+----------+
|numbers|bin_number|
+-------+----------+
| 142.56|       1.0|
|   null|      null|
|2023.33|       3.0|
| 477.76|       1.0|
| 175.52|       1.0|
|1737.45|       3.0|
| 520.72|       2.0|
|  641.2|       2.0|
|   79.3|       1.0|
| 138.43|       1.0|
+-------+----------+

Now, let's add the intervals for each row

intervals = []
for i in range(0, len(BINS)-1):
    intervals.append(f"({BINS[i]}, {BINS[i+1]}]")
print(intervals)
['(-inf, 0]', '(0, 500]', '(500, 1000]', '(1000, inf]']

I use broadcast to make sure to send the list to all the nodes in the cluster

mapping = spark.sparkContext.broadcast(intervals)

def get_bins(values):
    def f(x):
        if x is None:
            return values[int(0)]
        else:
            return values[int(x)]
    return udf(f)

df_final = df_final.withColumn("interval", get_bins(mapping.value)(col("bin_number")))
df_final.show()
+-------+----------+-----------+
|numbers|bin_number|   interval|
+-------+----------+-----------+
| 142.56|       1.0|   (0, 500]|
|   null|      null|  (-inf, 0]|
|2023.33|       3.0|(1000, inf]|
| 477.76|       1.0|   (0, 500]|
| 175.52|       1.0|   (0, 500]|
|1737.45|       3.0|(1000, inf]|
| 520.72|       2.0|(500, 1000]|
|  641.2|       2.0|(500, 1000]|
|   79.3|       1.0|   (0, 500]|
| 138.43|       1.0|   (0, 500]|
+-------+----------+-----------+

Finally, we can count for each interval

df_final = df_final.groupBy("interval").agg(count("interval").alias("count")).orderBy(col("count").asc())
df_final.show()
+-----------+-----+
|   interval|count|
+-----------+-----+
|  (-inf, 0]|    1|
|(1000, inf]|    2|
|(500, 1000]|    2|
|   (0, 500]|    5|
+-----------+-----+

Upvotes: 0

mck
mck

Reputation: 42392

You can use Bucketizer from Spark ML:

from pyspark.ml.feature import Bucketizer

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin'
).transform(df)

df2.show()
+-------+-----------+
|numbers|numbers_bin|
+-------+-----------+
| 142.56|        1.0|
|   null|       null|
|2023.33|        3.0|
| 477.76|        1.0|
| 175.52|        1.0|
|1737.45|        3.0|
| 520.72|        2.0|
|  641.2|        2.0|
|   79.3|        1.0|
| 138.43|        1.0|
+-------+-----------+

If you want to show the interval instead:

import pyspark.sql.functions as F

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers', 
    outputCol='numbers_bin'
).transform(df).withColumn(
    'numbers_bin',
    F.expr("""
        format_string(
            '%s, %s',
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin) - 1],
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin)])
    """)
)

df2.show()
+-------+--------------+
|numbers|   numbers_bin|
+-------+--------------+
| 142.56|-Infinity, 0.0|
|   null|    null, null|
|2023.33| 500.0, 1000.0|
| 477.76|-Infinity, 0.0|
| 175.52|-Infinity, 0.0|
|1737.45| 500.0, 1000.0|
| 520.72|    0.0, 500.0|
|  641.2|    0.0, 500.0|
|   79.3|-Infinity, 0.0|
| 138.43|-Infinity, 0.0|
+-------+--------------+

Upvotes: 6

Related Questions