Reputation: 749
I am trying to code an ML algorithm from scratch in Spark, and I am having an issue setting up a histogram of each feature across each partition. The goal is to have some final variable N, and to get the max and min of each column within each partition. Then I want to map the rows to bucket them into the N bins, with the bin length as (max - min)/N. I have tried mapWithIndex to get the max, but then I am not sure how to tie this back to a map function and make sure the correct max is connected with the correct partition.
Upvotes: 0
Views: 328
Reputation: 1669
Try the following code. Suppose we are going to use N=3 bins within each partition; here's my DataFrame:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

N = 3

values = [
    (1, 5),
    (2, 13),
    (3, 25),
    (4, 30),
    (5, 38),
    (6, 50),
    (7, 11),
    (8, 73),
    (9, 48),
    (10, 65),
    (11, 55),
    (12, 42)
]
columns = ['ID', 'Amount']
df = spark.createDataFrame(values, columns)
df.show()
The DataFrame looks like this:
+---+------+
| ID|Amount|
+---+------+
| 1| 5|
| 2| 13|
| 3| 25|
| 4| 30|
| 5| 38|
| 6| 50|
| 7| 11|
| 8| 73|
| 9| 48|
| 10| 65|
| 11| 55|
| 12| 42|
+---+------+
Let's repartition the DataFrame to 3 partitions so we don't have too many of them:
df = df.repartition(3)
After this, we first get the partition ID for each row:
df = df.withColumn('pid', F.spark_partition_id())
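If you want to sanity-check how the rows were spread out (repartitioning is not deterministic, so your row-to-partition assignment may differ from the output shown below), you can count the rows per partition:
# peek at the partition sizes; repartition(3) should give a roughly even split
df.groupBy('pid').count().show()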
Compute the maximum and minimum Amount in each partition, and use them to compute the proper bin_len.
df = df.withColumn('max_a', F.max(F.col('Amount')).over(Window.partitionBy('pid')))
df = df.withColumn('min_a', F.min(F.col('Amount')).over(Window.partitionBy('pid')))
df = df.withColumn('bin_len', (F.col('max_a') - F.col('min_a')) / N)
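One caveat: if a partition happens to contain only a single distinct Amount, then max_a == min_a, bin_len is 0, and the division in the next step yields null. A defensive variant (falling back to an arbitrary non-zero bin length so every such row lands in bucket 1) could be:
# guard against max_a == min_a, which would make bin_len zero and
# break the division below; 1.0 is an arbitrary non-zero fallback
df = df.withColumn('bin_len', F.when(F.col('max_a') == F.col('min_a'), F.lit(1.0))
                               .otherwise((F.col('max_a') - F.col('min_a')) / N))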
Now we can compute the distance of each row to the first row within each partition, and use that to compute the bucket number. (The first Amount in a window ordered by Amount is just the partition minimum, so subtracting min_a directly would work equally well.) Here I assume the bucket numbers start at 1.
df = df.withColumn('diff_a', F.col('Amount') - F.first('Amount').over(Window.partitionBy('pid').orderBy('Amount')))
df = df.withColumn('bucket', F.floor(F.col('diff_a')/F.col('bin_len')))
# the partition maximum lands exactly on N; keep it in bucket N and
# shift every other row from a 0-based to a 1-based bucket number
df = df.withColumn('bucket', F.when(F.col('bucket') == N, F.col('bucket')).otherwise(F.col('bucket') + 1))
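Equivalently, the floor-and-shift step collapses into a single expression with F.least, which clamps the partition maximum (the only row that lands exactly on N) back into bucket N:
# same logic in one expression: 1-based bucket, clamped to at most N
df = df.withColumn('bucket', F.least(F.floor(F.col('diff_a')/F.col('bin_len')) + 1, F.lit(N)))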
df.show()
The final output is:
+---+------+---+-----+-----+------------------+------+------+
| ID|Amount|pid|max_a|min_a| bin_len|diff_a|bucket|
+---+------+---+-----+-----+------------------+------+------+
| 1| 5| 1| 73| 5|22.666666666666668| 0| 1|
| 2| 13| 1| 73| 5|22.666666666666668| 8| 1|
| 5| 38| 1| 73| 5|22.666666666666668| 33| 2|
| 8| 73| 1| 73| 5|22.666666666666668| 68| 3|
| 3| 25| 2| 65| 25|13.333333333333334| 0| 1|
| 4| 30| 2| 65| 25|13.333333333333334| 5| 1|
| 12| 42| 2| 65| 25|13.333333333333334| 17| 2|
| 10| 65| 2| 65| 25|13.333333333333334| 40| 3|
| 7| 11| 0| 55| 11|14.666666666666666| 0| 1|
| 9| 48| 0| 55| 11|14.666666666666666| 37| 3|
| 6| 50| 0| 55| 11|14.666666666666666| 39| 3|
| 11| 55| 0| 55| 11|14.666666666666666| 44| 3|
+---+------+---+-----+-----+------------------+------+------+
You can see that the DataFrame is now grouped by pid and sorted by Amount within each group. If you check the pid==1 group: minimum Amount = 5, maximum Amount = 73, bin length = (73-5)/3 = 22.67. The minimum 5 falls into bucket #1, the maximum 73 falls into bucket #3, and 38 (which lies between 27.67 and 50.33) falls into bucket #2.
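If you would rather stay at the RDD level, as the mapWithIndex attempt in the question suggests (the actual API is mapPartitionsWithIndex), the same idea works there too: collect (min, max) per partition index, broadcast that small dict, and then bucket each row with the stats of its own partition. A minimal sketch, reusing the (ID, Amount) tuples and N from above:
# per-partition (min, max) of Amount, keyed by the partition index
def part_stats(idx, rows):
    amounts = [amount for _, amount in rows]
    if amounts:  # skip empty partitions
        yield (idx, (min(amounts), max(amounts)))

rdd = spark.sparkContext.parallelize(values, 3)
stats = dict(rdd.mapPartitionsWithIndex(part_stats).collect())
b_stats = spark.sparkContext.broadcast(stats)  # tiny dict, cheap to ship

# bucket each row using the stats of the partition it lives in
def bucketize(idx, rows):
    lo, hi = b_stats.value[idx]
    bin_len = (hi - lo) / N
    for id_, amount in rows:
        # 1-based bucket, clamped so the maximum stays in bucket N;
        # bucket 1 if the partition holds a single distinct value
        bucket = min(int((amount - lo) / bin_len) + 1, N) if bin_len else 1
        yield (id_, amount, bucket)

print(rdd.mapPartitionsWithIndex(bucketize).collect())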
Upvotes: 1