Reputation: 7245
I need to split a PySpark dataframe df and save the different chunks. This is what I am doing: I define a column id_tmp and split the dataframe based on that.
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

chunk = 10000
id1 = 0
id2 = chunk
# assign a 0-based sequential row number to every record
df = df.withColumn('id_tmp', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
c = df.count()
while id1 < c:
    stop_df = df.filter((df.id_tmp < id2) & (df.id_tmp >= id1))
    # append so successive chunks don't collide on the same output path
    stop_df.write.format('com.databricks.spark.csv').mode('append').save('myFolder/')
    id1 += chunk
    id2 += chunk
Is there a more efficient way without defining the column id_tmp?
Upvotes: 5
Views: 24298
Reputation: 3824
I suggest you use the partitionBy method of the DataFrameWriter interface built into Spark (docs). Here is an example.
Given the df DataFrame, the chunk identifier needs to be one or more columns; in my example it is id_tmp. The following snippet generates a DataFrame with 12 records and 4 chunk ids.
import pyspark.sql.functions as F
df = spark.range(0, 12).withColumn("id_tmp", F.col("id") % 4).orderBy("id_tmp")
df.show()
Returns:
+---+------+
| id|id_tmp|
+---+------+
| 8| 0|
| 0| 0|
| 4| 0|
| 1| 1|
| 9| 1|
| 5| 1|
| 6| 2|
| 2| 2|
| 10| 2|
| 3| 3|
| 11| 3|
| 7| 3|
+---+------+
To save each chunk independently you need:
(df
.repartition("id_tmp")
.write
.partitionBy("id_tmp")
.mode("overwrite")
.format("csv")
.save("output_folder"))
repartition will shuffle the records so that each partition holds the complete set of records for one id_tmp value. partitionBy then writes each chunk to its own subfolder as a single file.
Resulting folder structure:
output_folder/
output_folder/._SUCCESS.crc
output_folder/id_tmp=0
output_folder/id_tmp=0/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=0/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=1
output_folder/id_tmp=1/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=1/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=2
output_folder/id_tmp=2/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=2/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=3
output_folder/id_tmp=3/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=3/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/_SUCCESS
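Because each chunk lands in its own id_tmp=... subfolder, you can read one chunk back (or all of them, with id_tmp recovered as a column) without touching the rest of the data. A minimal sketch, assuming the same output_folder path and SparkSession (spark) as above:

import pyspark.sql.functions as F

# read everything back; Spark recovers id_tmp from the folder names
all_chunks = spark.read.format("csv").load("output_folder")

# read only one chunk; a filter on the partition column prunes
# the other id_tmp=... folders instead of scanning them
chunk_2 = (spark.read
           .format("csv")
           .load("output_folder")
           .filter(F.col("id_tmp") == 2))

# or point directly at a single chunk folder; the "basePath" option
# keeps id_tmp available as a column even though only one folder is read
chunk_2_direct = (spark.read
                  .option("basePath", "output_folder")
                  .format("csv")
                  .load("output_folder/id_tmp=2"))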
The size and number of partitions are quite important for Spark's performance. Don't over-partition the dataset, and aim for reasonable file sizes (e.g. around 1 GB per file), especially if you are using cloud storage services. It is also advisable to use the partition columns if you want to filter the data when loading (e.g. year=YYYY/month=MM/day=DD), since Spark can then prune whole partition folders instead of scanning the full dataset.
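For instance, a sketch of that date-partitioned layout (the events DataFrame, its event_date column, and the events_folder path are hypothetical, just to illustrate the idea):

import pyspark.sql.functions as F

# hypothetical events DataFrame with an event_date column
events_by_day = (events
                 .withColumn("year", F.year("event_date"))
                 .withColumn("month", F.month("event_date"))
                 .withColumn("day", F.dayofmonth("event_date")))

(events_by_day
 .write
 .partitionBy("year", "month", "day")
 .mode("overwrite")
 .format("csv")
 .save("events_folder"))

# when loading, a filter on the partition columns only reads the
# matching year=YYYY/month=MM/day=DD folders
march_2020 = (spark.read
              .format("csv")
              .load("events_folder")
              .filter((F.col("year") == 2020) & (F.col("month") == 3)))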
Upvotes: 4