Reputation: 1808
I have a column Foo which contains double values such as:
[ 100.4, 39.6, 98.2, 10.8, 62.1, 69.6 … ]
I would like to repartition using a window of 10, which would generate a dataset something like:
Foo=10
Foo=20
Foo=30
Foo=40
Foo=50
Foo=60
Foo=70
Foo=80
Foo=90
Foo=100
Foo=110
Using repartition(number: int, colname: str) splits the dataframe into the given number of files, but I cannot choose the window. Roughly what I have tried is sketched below.
So how can I do this in PySpark?
Thanks
Upvotes: 0
Views: 283
Reputation: 8410
Adding to Daniel's answer, starting from his df2 with the Foo_binned column:
+-----+----------+
| Foo|Foo_binned|
+-----+----------+
|100.4| 100|
| 39.6| 30|
| 98.2| 90|
| 10.8| 10|
| 62.1| 60|
| 69.6| 60|
+-----+----------+
This will ensure that you get exactly 1 file for every Foo range:
from pyspark.sql import functions as F

# one shuffle partition per distinct bin, keyed on the bin column,
# so every Foo_binned value ends up in exactly one output file
n = df2.select(F.col('Foo_binned')).distinct().count()

df2.repartition(n, 'Foo_binned') \
    .write \
    .partitionBy('Foo_binned') \
    .csv(path)
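(Side note, not from the original answer: if you do not need to control the partition count explicitly, repartitioning by the column alone should give the same one-file-per-bin layout, since each distinct Foo_binned value hashes to a single partition:)
# alternative sketch: let Spark pick the number of partitions
df2.repartition('Foo_binned') \
    .write \
    .partitionBy('Foo_binned') \
    .csv(path)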
Upvotes: 1
Reputation: 1242
I'm not sure what you mean by repartitioning, but in any case, assuming you have a df of:
+-----+
| Foo|
+-----+
|100.4|
| 39.6|
| 98.2|
| 10.8|
| 62.1|
| 69.6|
+-----+
You can easily round your values:
from pyspark.sql.functions import col, floor

# bucket each value down to the nearest multiple of 10
df2 = df.withColumn('Foo_binned', floor(col('Foo') / 10) * 10)
+-----+----------+
| Foo|Foo_binned|
+-----+----------+
|100.4| 100|
| 39.6| 30|
| 98.2| 90|
| 10.8| 10|
| 62.1| 60|
| 69.6| 60|
+-----+----------+
If this is the result you are looking for, you can select / rename just the new column. You can also change the rounding method depending on your requirements (floor, round, ceil).
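(For illustration, not from the original answer, the alternatives mentioned above could look roughly like this; round is aliased so it does not shadow Python's built-in:)
from pyspark.sql.functions import col, ceil, round as sround

# round to the nearest multiple of 10, e.g. 39.6 -> 40
df_round = df.withColumn('Foo_binned', sround(col('Foo') / 10) * 10)
# round up to the next multiple of 10, e.g. 100.4 -> 110
df_ceil = df.withColumn('Foo_binned', ceil(col('Foo') / 10) * 10)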
If by repartitioning you actually want to physically save the values in different folders based on the bucketing into 10's, you can run:
df2.write.partitionBy('Foo_binned').csv('./foos.csv')
Which will partition the data while saving:
30.03.2020 23:05 8 ._SUCCESS.crc
30.03.2020 23:05 <DIR> Foo_binned=10
30.03.2020 23:05 <DIR> Foo_binned=100
30.03.2020 23:05 <DIR> Foo_binned=30
30.03.2020 23:05 <DIR> Foo_binned=60
30.03.2020 23:05 <DIR> Foo_binned=90
30.03.2020 23:05 0 _SUCCESS
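(Another side note, an assumption on my part rather than part of the original answer: when you read that folder tree back, Spark's partition discovery recovers Foo_binned from the directory names, roughly:)
# reading the partitioned output back; Foo_binned comes from the paths
df_back = spark.read.csv('./foos.csv')
print(df_back.columns)  # the data column(s) plus 'Foo_binned'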
Last but not least, if you just want your in-memory data partitioned by those buckets, it's pretty hard to achieve, because, well, you shouldn't be doing that. Spark includes an optimization engine that will do its best when you just let it:
df = spark.createDataFrame(
    [(100.2,), (100.1,), (100.7,), (100.4,), (39.6,), (39.6,), (39.6,), (39.6,),
     (98.2,), (10.8,), (10.2,), (10.8,), (10.8,), (62.1,), (69.6,)],
    ['Foo'])
df2 = df.repartitionByRange('Foo')
print('No of partitions', df2.rdd.getNumPartitions())
No of partitions 8
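(For completeness, an assumption of mine rather than part of the answer: repartitionByRange also accepts an explicit partition count, though the range boundaries are chosen by sampling the data rather than being fixed at multiples of 10:)
# ask for roughly one partition per 10-wide window; boundaries are still
# determined by Spark's range sampling, not pinned to 10, 20, 30, ...
df3 = df.repartitionByRange(11, 'Foo')
print('No of partitions', df3.rdd.getNumPartitions())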
Upvotes: 1