bioinfornatics

Reputation: 1808

How to apply a repartition using a window in pyspark?

I have a column Foo which contains double values such as:

[ 100.4, 39.6, 98.2, 10.8, 62.1, 69.6 … ]

I would like to repartition using a window of 10, which would generate partitions something like:

Foo=10
Foo=20
Foo=30
Foo=40
Foo=50
Foo=60
Foo=70
Foo=80
Foo=90
Foo=100
Foo=110

The use of repartition(number: int, colname: str) splits the dataframe into the given number of files, but I cannot choose the window.
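For example, the best I can do today is something like this (a rough sketch, with df as my dataframe and an arbitrary output path), which only fixes the number of files, not the value range each file covers:

# hash-repartition into 6 partitions on Foo, then write; the partitions are not windows of 10
df.repartition(6, 'Foo').write.csv('/tmp/foo_out')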

So how can I do this in PySpark?

Thanks

Upvotes: 0

Views: 283

Answers (2)

murtihash

Reputation: 8410

Adding to Daniel's answer. Starting from his binned dataframe df2:

+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

The following will ensure that for every Foo range, you get one file:

from pyspark.sql import functions as F

# count the distinct bins produced by Daniel's df2
n = df2.select(F.col('Foo_binned')).distinct().count()

# repartition by the bin column so each bin lands in a single task,
# then partitionBy on write so each folder holds exactly one file
df2.repartition(n, 'Foo_binned') \
    .write \
    .partitionBy('Foo_binned') \
    .csv(path)
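If you would rather not count the distinct bins first, repartitioning on the bin column alone should give the same one-file-per-folder result, since every Foo_binned value is hashed to a single task (a rough sketch, still assuming Daniel's df2 and your own output path):

# hash-partition by the bin column, then let partitionBy create one folder per bin
df2.repartition('Foo_binned') \
    .write \
    .partitionBy('Foo_binned') \
    .csv(path)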

Upvotes: 1

Daniel

Reputation: 1242

I'm not sure what you mean by repartitioning, but in any case, assuming you have a df of:

+-----+
|  Foo|
+-----+
|100.4|
| 39.6|
| 98.2|
| 10.8|
| 62.1|
| 69.6|
+-----+

You can easily round your values:

from pyspark.sql.functions import col, floor
# bin Foo by flooring each value down to the nearest multiple of 10
df2 = df.withColumn('Foo_binned', floor(col('Foo') / 10) * 10)
+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

If this is the result you are looking for, you can select / rename just the new column. You can also change the rounding method depending on your requirements (floor, round, ceil), as in the sketch below.
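For instance, a quick sketch of that step, reusing the names from above (binned_only and df_ceil are just illustrative names; ceil is simply a different bucketing choice):

from pyspark.sql.functions import col, ceil

# keep only the binned values, renamed back to Foo
binned_only = df2.select(col('Foo_binned').alias('Foo'))

# the same binning with ceiling instead of floor
df_ceil = df.withColumn('Foo_binned', ceil(col('Foo') / 10) * 10)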

If by repartitioning you actually want to physically save the values into different folders based on the bucketing into 10s, you can run:

df2.write.partitionBy('Foo_binned').csv('./foos.csv')

Which will partition the data while saving:

30.03.2020  23:05                 8 ._SUCCESS.crc
30.03.2020  23:05    <DIR>          Foo_binned=10
30.03.2020  23:05    <DIR>          Foo_binned=100
30.03.2020  23:05    <DIR>          Foo_binned=30
30.03.2020  23:05    <DIR>          Foo_binned=60
30.03.2020  23:05    <DIR>          Foo_binned=90
30.03.2020  23:05                 0 _SUCCESS
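As a side note, when you read that output back, Spark's partition discovery rebuilds Foo_binned from the folder names, so the bucketing survives the round trip (a small sketch, assuming the same './foos.csv' path):

# Foo_binned is inferred from the Foo_binned=... directory names
df_back = spark.read.csv('./foos.csv', inferSchema=True)
df_back.printSchema()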

Last but not least, if you just want your in-memory data partitioned by those buckets, it's pretty hard to achieve, because, well, you shouldn't be doing that. Spark includes an optimization engine that will do its best when you just let it:

df = spark.createDataFrame(
    [(100.2,), (100.1,), (100.7,), (100.4,), (39.6,), (39.6,), (39.6,), (39.6,),
     (98.2,), (10.8,), (10.2,), (10.8,), (10.8,), (62.1,), (69.6,)],
    ['Foo'])
df2 = df.repartitionByRange('Foo')
print('No of partitions', df2.rdd.getNumPartitions())

No of partitions 8
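repartitionByRange also takes an explicit partition count, so you can get close to one range per 10-wide window, although the boundaries are chosen by sampling the data rather than fixed at multiples of 10 (a sketch):

# ask for 11 range partitions over Foo; boundaries come from sampling, not fixed windows
df3 = df.repartitionByRange(11, 'Foo')
print('No of partitions', df3.rdd.getNumPartitions())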

Upvotes: 1
