Siddharth Satpathy

Reputation: 3043

Partition PySpark DataFrame depending on unique values in column (Custom Partitioning)

I have a PySpark DataFrame with separate columns for name, type, day and value. An example of the DataFrame is shown below:

+------+----+---+-----+
|  Name|Type|Day|Value|
+------+----+---+-----+
| name1|   a|  1|  140|
| name2|   a|  1|  180|
| name3|   a|  1|  150|
| name4|   b|  1|  145|
| name5|   b|  1|  185|
| name6|   c|  1|  155|
| name7|   c|  1|  160|
| name8|   a|  2|  120|
| name9|   a|  2|  110|
|name10|   b|  2|  125|
|name11|   b|  2|  185|
|name12|   c|  3|  195|
+------+----+---+-----+

For a selected value of Type, I want to create separate DataFrames depending on the unique values of the column titled Day. Let's say I have chosen a as my preferred Type. In the example above, there are three unique values of Day (viz. 1, 2, 3). For each unique value of Day that has a row with the chosen Type a (that is, days 1 and 2 in the data above), I want to create a DataFrame containing all rows with the chosen Type and Day. In the example above, I will therefore get two DataFrames, which will look as below:

+------+----+---+-----+
|  Name|Type|Day|Value|
+------+----+---+-----+
| name1|   a|  1|  140|
| name2|   a|  1|  180|
| name3|   a|  1|  150|
+------+----+---+-----+

and

+------+----+---+-----+
|  Name|Type|Day|Value|
+------+----+---+-----+
| name8|   a|  2|  120|
| name9|   a|  2|  110|
+------+----+---+-----+

How can I do this? In the actual data that I will be working with, I have millions of columns, so I want to know the most efficient way to achieve this.

You can use the code below to generate the example given above.

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()

Stats = Row("Name", "Type", "Day", "Value")

stat1 = Stats('name1', 'a', 1, 140)
stat2 = Stats('name2', 'a', 1, 180)
stat3 = Stats('name3', 'a', 1, 150)
stat4 = Stats('name4', 'b', 1, 145)
stat5 = Stats('name5', 'b', 1, 185)
stat6 = Stats('name6', 'c', 1, 155)
stat7 = Stats('name7', 'c', 1, 160)
stat8 = Stats('name8', 'a', 2, 120)
stat9 = Stats('name9', 'a', 2, 110)
stat10 = Stats('name10', 'b', 2, 125)
stat11 = Stats('name11', 'b', 2, 185)
stat12 = Stats('name12', 'c', 3, 195)

df = spark.createDataFrame([stat1, stat2, stat3, stat4, stat5, stat6,
                            stat7, stat8, stat9, stat10, stat11, stat12])

Upvotes: 1

Views: 1738

Answers (1)

pissall

Reputation: 7419

You can just use df.repartition("Type", "Day")

See the Spark documentation for DataFrame.repartition.
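Applied to the question's example, a minimal sketch (assuming df is the DataFrame built from the reproduction code above) could look like this:

from pyspark.sql import functions as F

# Keep only the chosen Type, then let Spark place each Day in its own partition.
df_a = df.filter(F.col("Type") == "a").repartition("Day")

Each resulting partition then holds the rows for one value of Day, which you can process per partition (for example via df_a.rdd.foreachPartition) instead of materialising separate DataFrames.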

When I validate using the following function, I get the output shown below.

def validate(partition):
    # Print every row in the partition, then the number of rows it contains.
    count = 0
    for row in partition:
        print(row)
        count += 1
    print(count)
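A sketch of how this validation can be run (assuming the sample data shown below is loaded as df):

# Hypothetical usage: run validate once per partition of the repartitioned data.
df.repartition("user_id").rdd.foreachPartition(validate)

In local mode the print output appears in the console; on a cluster it goes to the executor logs.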

My data:

+------+--------------------+-------+-------+
|amount|          trans_date|user_id|row_num|
+------+--------------------+-------+-------+
|  99.1|2019-06-04T00:00:...|    101|      1|
| 89.27|2019-06-04T00:00:...|    102|      2|
|  89.1|2019-03-04T00:00:...|    102|      3|
| 73.11|2019-09-10T00:00:...|    103|      4|
|-69.81|2019-09-11T00:00:...|    101|      5|
| 12.51|2018-12-14T00:00:...|    101|      6|
| 43.23|2018-09-11T00:00:...|    101|      7|
+------+--------------------+-------+-------+

After df.repartition("user_id") I get the following:

Output

Row(amount=73.11, trans_date='2019-09-10T00:00:00.000+05:30', user_id='103', row_num=4)
1
Row(amount=89.27, trans_date='2019-06-04T00:00:00.000+05:30', user_id='102', row_num=2)
Row(amount=89.1, trans_date='2019-03-04T00:00:00.000+05:30', user_id='102', row_num=3)
2
Row(amount=99.1, trans_date='2019-06-04T00:00:00.000+05:30', user_id='101', row_num=1)
Row(amount=-69.81, trans_date='2019-09-11T00:00:00.000+05:30', user_id='101', row_num=5)
Row(amount=12.51, trans_date='2018-12-14T00:00:00.000+05:30', user_id='101', row_num=6)
Row(amount=43.23, trans_date='2018-09-11T00:00:00.000+05:30', user_id='101', row_num=7)
4

Upvotes: 2