Reputation: 607
I have a PySpark DataFrame df which has a numerical column (with null values):
+-------+
|numbers|
+-------+
| 142.56|
|   null|
|2023.33|
| 477.76|
| 175.52|
|1737.45|
| 520.72|
| 641.2|
| 79.3|
| 138.43|
+-------+
I want to create a new column that assigns each value to a bin, e.g. 0, (0, 500], (500, 1000], (1000, inf).
Is there a way to accomplish this with a function like pandas.cut? At the moment I do it in PySpark by defining a UDF as follows, but this approach has the disadvantage of being tedious and non-parametric:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def func(numbers):
    if numbers == 0:
        return '0'
    elif numbers > 0 and numbers <= 500:
        return '(0, 500]'
    elif numbers > 500 and numbers <= 1000:
        return '(500, 1000]'
    elif numbers > 1000:
        return '(1000, inf)'
    else:
        return 'Other'

func_udf = F.udf(func, StringType())
df = df.withColumn('numbers_bin', func_udf(df['numbers']))
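One way to make this less hard-coded, as a sketch: generate the labelling function from a list of bin edges with the standard-library bisect module (the make_labeler helper below is hypothetical, not an existing API), then wrap it with F.udf exactly as above:

```python
import bisect

def make_labeler(edges):
    """Return a function mapping a number to its (lo, hi] interval label.

    edges must be sorted ascending; None and out-of-range values map to None.
    """
    labels = [f"({lo}, {hi}]" for lo, hi in zip(edges, edges[1:])]

    def label(x):
        if x is None:
            return None
        # first index i with edges[i] >= x, so x lies in (edges[i-1], edges[i]]
        i = bisect.bisect_left(edges, x)
        if i == 0 or i == len(edges):
            return None
        return labels[i - 1]

    return label

label = make_labeler([float('-inf'), 0, 500, 1000, float('inf')])
# Wrap with F.udf(label, StringType()) to apply it to a DataFrame column.
```

Changing the bins then only requires changing the edge list, not the branching logic.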
If df were a pandas DataFrame, I would use this approach:
df['numbers_bin'] = pd.cut(
    df['numbers'],
    np.concatenate((-np.inf, [0, 500, 1000], np.inf), axis=None))
which is much cleaner and more modular.
Upvotes: 3
Views: 5810
Reputation: 825
Going a little further
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf, col, count
BINS = [-float('inf'), 0, 500, 1000, float('inf')]
df_final = Bucketizer(
    splits=BINS,
    inputCol="numbers",
    outputCol="bin_number"
).transform(df)
df_final.show()
+-------+----------+
|numbers|bin_number|
+-------+----------+
| 142.56| 1.0|
| null| null|
|2023.33| 3.0|
| 477.76| 1.0|
| 175.52| 1.0|
|1737.45| 3.0|
| 520.72| 2.0|
| 641.2| 2.0|
| 79.3| 1.0|
| 138.43| 1.0|
+-------+----------+
Now, let's add the interval label for each row:
intervals = []
for i in range(len(BINS) - 1):
    intervals.append(f"({BINS[i]}, {BINS[i+1]}]")
print(intervals)
['(-inf, 0]', '(0, 500]', '(500, 1000]', '(1000, inf]']
I use a broadcast variable to make sure the list is sent to all the nodes in the cluster:
mapping = spark.sparkContext.broadcast(intervals)
def get_bins(values):
    def f(x):
        # null bin numbers fall back to the first interval
        if x is None:
            return values[0]
        return values[int(x)]
    return udf(f)

df_final = df_final.withColumn("interval", get_bins(mapping.value)(col("bin_number")))
df_final.show()
+-------+----------+-----------+
|numbers|bin_number| interval|
+-------+----------+-----------+
| 142.56| 1.0| (0, 500]|
| null| null| (-inf, 0]|
|2023.33| 3.0|(1000, inf]|
| 477.76| 1.0| (0, 500]|
| 175.52| 1.0| (0, 500]|
|1737.45| 3.0|(1000, inf]|
| 520.72| 2.0|(500, 1000]|
| 641.2| 2.0|(500, 1000]|
| 79.3| 1.0| (0, 500]|
| 138.43| 1.0| (0, 500]|
+-------+----------+-----------+
Finally, we can count the rows in each interval:
df_final = (df_final.groupBy("interval")
    .agg(count("interval").alias("count"))
    .orderBy(col("count").asc()))
df_final.show()
+-----------+-----+
| interval|count|
+-----------+-----+
| (-inf, 0]| 1|
|(1000, inf]| 2|
|(500, 1000]| 2|
| (0, 500]| 5|
+-----------+-----+
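A quick plain-Python cross-check of these counts over the sample values, mirroring the UDF above (including its fallback that sends null to the first interval):

```python
import bisect
from collections import Counter

numbers = [142.56, None, 2023.33, 477.76, 175.52,
           1737.45, 520.72, 641.2, 79.3, 138.43]
BINS = [float('-inf'), 0, 500, 1000, float('inf')]
intervals = [f"({BINS[i]}, {BINS[i+1]}]" for i in range(len(BINS) - 1)]

def interval_of(x):
    # Mirror the UDF above: nulls fall back to the first interval
    if x is None:
        return intervals[0]
    return intervals[bisect.bisect_left(BINS, x) - 1]

counts = Counter(interval_of(x) for x in numbers)
print(counts)
```

The resulting counts match the Spark output: 1 in (-inf, 0] (the null row), 5 in (0, 500], 2 in (500, 1000], and 2 in (1000, inf].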
Upvotes: 0
Reputation: 42392
You can use Bucketizer
from Spark ML:
from pyspark.ml.feature import Bucketizer
df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin'
).transform(df)
df2.show()
+-------+-----------+
|numbers|numbers_bin|
+-------+-----------+
| 142.56| 1.0|
| null| null|
|2023.33| 3.0|
| 477.76| 1.0|
| 175.52| 1.0|
|1737.45| 3.0|
| 520.72| 2.0|
| 641.2| 2.0|
| 79.3| 1.0|
| 138.43| 1.0|
+-------+-----------+
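Bucketizer assigns each value to the half-open bucket [splits[i], splits[i+1]) (the last bucket also includes the upper edge). A minimal plain-Python sketch of the same rule using bisect, not Spark's actual implementation:

```python
import bisect

splits = [float('-inf'), 0, 500, 1000, float('inf')]

def bucket(x):
    # index of the half-open bucket [splits[i], splits[i+1]) containing x
    if x is None:
        return None
    return bisect.bisect_right(splits, x) - 1

print([bucket(x) for x in [142.56, None, 2023.33, 520.72]])  # → [1, None, 3, 2]
```

This matches the bin_number column above, including the null passed through for the null row.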
If you want to show the interval instead (note that Spark SQL bracket indexing on arrays is 0-based, so bucket i spans splits[i] to splits[i+1]):
import pyspark.sql.functions as F
df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin'
).transform(df).withColumn(
    'numbers_bin',
    F.expr("""
        format_string(
            '%s, %s',
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin)],
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin) + 1])
    """)
)
df2.show()
+-------+----------------+
|numbers|     numbers_bin|
+-------+----------------+
| 142.56|      0.0, 500.0|
|   null|      null, null|
|2023.33|1000.0, Infinity|
| 477.76|      0.0, 500.0|
| 175.52|      0.0, 500.0|
|1737.45|1000.0, Infinity|
| 520.72|   500.0, 1000.0|
|  641.2|   500.0, 1000.0|
|   79.3|      0.0, 500.0|
| 138.43|      0.0, 500.0|
+-------+----------------+
Upvotes: 6