Collin Cunningham

Reputation: 749

How do you utilize partition information in a map on each row?

I am trying to code an ML algorithm from scratch in Spark, and I am having an issue setting up a histogram of each feature within each partition.

The goal is to have some final variable N, and to get the max and min of each column within each partition. Then I want to map the rows to bucket them into the N bins, with bin length (max - min)/N. I have tried mapPartitionsWithIndex to get the max, but I am not sure how to tie that back into a map function and make sure the correct max is connected with the correct partition. Roughly, the two-pass structure I have in mind is sketched below.
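This is just a sketch of what I'm trying to do (rdd, feature_idx and N are placeholders for my actual data and settings); I don't know if collecting per-partition stats and looking them up by partition index like this is the right way to connect them back:

# first pass: per-partition (min, max) of one feature, keyed by partition index
def partition_stats(idx, rows):
    vals = [row[feature_idx] for row in rows]
    if vals:
        yield (idx, (min(vals), max(vals)))

stats = dict(rdd.mapPartitionsWithIndex(partition_stats).collect())

# second pass: bucket each row using the stats of the partition it lives in
def bucket_rows(idx, rows):
    lo, hi = stats[idx]
    bin_len = (hi - lo) / N
    for row in rows:
        b = int((row[feature_idx] - lo) / bin_len) if bin_len else 0
        yield (idx, min(b, N - 1), row)

buckets = rdd.mapPartitionsWithIndex(bucket_rows)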

Upvotes: 0

Views: 328

Answers (1)

niuer

Reputation: 1669

Try the following code. Suppose we are going to use N=3 bins for each partition; here's my DataFrame:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

N = 3
values = [
    (1, 5),
    (2, 13),
    (3, 25),
    (4, 30),
    (5, 38),
    (6, 50),
    (7, 11),
    (8, 73),
    (9, 48),
    (10, 65),
    (11, 55),
    (12, 42)
]
columns = ['ID', 'Amount']
df = spark.createDataFrame(values, columns)
df.show()
df.show()

The DataFrame looks like this:

+---+------+
| ID|Amount|
+---+------+
|  1|     5|
|  2|    13|
|  3|    25|
|  4|    30|
|  5|    38|
|  6|    50|
|  7|    11|
|  8|    73|
|  9|    48|
| 10|    65|
| 11|    55|
| 12|    42|
+---+------+

Let's repartition the DataFrame to 3 partitions so we don't have too many of them:

df = df.repartition(3)

Next, get the partition ID for each row:

df = df.withColumn('pid', F.spark_partition_id())

Then compute the maximum and minimum Amount within each partition, and use them to compute the bin length bin_len = (max_a - min_a)/N:

# per-partition max/min via a window partitioned by the partition id
df = df.withColumn('max_a', F.max(F.col('Amount')).over(Window.partitionBy('pid')))
df = df.withColumn('min_a', F.min(F.col('Amount')).over(Window.partitionBy('pid')))
# equal-width bin length for N bins within each partition
df = df.withColumn('bin_len', (df['max_a'] - df['min_a'])/N)

Now we can compute the distance of each row's Amount from the smallest Amount within its partition, and use that to compute the bucket number. Here I assume bucket numbers start at 1.

# distance from the per-partition minimum (the first Amount when ordered by Amount)
df = df.withColumn('diff_a', F.col('Amount') - F.first('Amount').over(Window.partitionBy('pid').orderBy('Amount')))
# raw 0-based bin index, then shift to 1-based; the partition maximum (which lands on N) stays in bucket N
df = df.withColumn('bucket', F.floor(F.col('diff_a')/F.col('bin_len')))
df = df.withColumn('bucket', F.when(F.col('bucket') == N, F.col('bucket')).otherwise(F.col('bucket') + 1))
df.show()

The final output is:

+---+------+---+-----+-----+------------------+------+------+
| ID|Amount|pid|max_a|min_a|           bin_len|diff_a|bucket|
+---+------+---+-----+-----+------------------+------+------+
|  1|     5|  1|   73|    5|22.666666666666668|     0|     1|
|  2|    13|  1|   73|    5|22.666666666666668|     8|     1|
|  5|    38|  1|   73|    5|22.666666666666668|    33|     2|
|  8|    73|  1|   73|    5|22.666666666666668|    68|     3|
|  3|    25|  2|   65|   25|13.333333333333334|     0|     1|
|  4|    30|  2|   65|   25|13.333333333333334|     5|     1|
| 12|    42|  2|   65|   25|13.333333333333334|    17|     2|
| 10|    65|  2|   65|   25|13.333333333333334|    40|     3|
|  7|    11|  0|   55|   11|14.666666666666666|     0|     1|
|  9|    48|  0|   55|   11|14.666666666666666|    37|     3|
|  6|    50|  0|   55|   11|14.666666666666666|    39|     3|
| 11|    55|  0|   55|   11|14.666666666666666|    44|     3|
+---+------+---+-----+-----+------------------+------+------+

You can see that the rows are now grouped by pid and sorted by Amount within each group. Checking the pid==1 group: minimum Amount = 5, maximum Amount = 73, bin length = (73 - 5)/3 ≈ 22.67. The minimum 5 falls into bucket #1, the maximum 73 falls into bucket #3, and 38 (which lies between 27.67 and 50.33) falls into bucket #2.
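As a quick sanity check, here is the same arithmetic reproduced in plain Python for the pid==1 group above (the small bucket helper is just for illustration, not part of the Spark code):

import math

N = 3
min_a, max_a = 5, 73            # min/max Amount in the pid==1 partition
bin_len = (max_a - min_a) / N   # 22.666...

def bucket(amount):
    # floor of the distance from the partition minimum divided by the bin length,
    # shifted to 1-based, with the partition maximum kept in bucket N
    b = math.floor((amount - min_a) / bin_len)
    return b if b == N else b + 1

print([(a, bucket(a)) for a in (5, 13, 38, 73)])
# [(5, 1), (13, 1), (38, 2), (73, 3)] -- matches the bucket column above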

Upvotes: 1
