Reputation: 31
The data is stored in Parquet format. The Parquet files are partitioned on a partition key column, which holds a hash of the user id column:
userData/
  partitionKey=1/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953b.c000.snappy.parquet
  partitionKey=2/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  partitionKey=3/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
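For context, a layout like the one above is typically produced by a write along these lines. This is only a sketch: the bucket count (16), the source path, and the variable names are assumptions, not taken from the question.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Sketch only: derive a hash bucket from UserId and write one
// directory per bucket. Bucket count and paths are assumptions.
val spark = SparkSession.builder.appName("write-userdata").getOrCreate()
val users = spark.read.parquet("rawUsers")  // hypothetical source path

users
  .withColumn("partitionKey", pmod(hash(col("UserId")), lit(16)))
  .write
  .partitionBy("partitionKey")  // one directory per hash bucket
  .parquet("userData")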
Given the partitioning scheme, we know that all rows for a given user land under exactly one partitionKey directory, since the key is a deterministic hash of the user id.
While reading the data in, I want all the data for one user to fall into the same Spark partition. A single Spark partition can hold more than one user, but it should hold all the rows for each of those users.
Currently, what I use is: SparkSession.read.parquet("../userData").repartition(200, col("UserId"))
(I also tried partitionBy with a custom partitioner. The sequence of operations was DataFrame -> RDD -> keyed RDD -> partitionBy -> RDD -> DataFrame; before the partitionBy there is a deserialize-to-object step that explodes the shuffle write.)
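For reference, the RDD route described in the parenthetical above looks roughly like this sketch; the key type, partition count, and paths are assumptions:

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Sketch of the DataFrame -> RDD -> partitionBy -> DataFrame sequence.
val spark = SparkSession.builder.appName("rdd-route").getOrCreate()
val df = spark.read.parquet("userData")
val schema = df.schema

val keyed = df.rdd                              // deserialize-to-object step
  .keyBy(row => row.getAs[Long]("UserId"))      // assumes UserId is a long
  .partitionBy(new HashPartitioner(200))        // co-locate each user's rows
  .values

val result = spark.createDataFrame(keyed, schema)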
Is there a way to avoid the repartition and instead leverage the input folder structure to place each user's data in a single partition?
Upvotes: 3
Views: 902
Reputation: 844
SparkSession.read.parquet should automatically infer partitioning information from your file paths. You can find more information in the Spark SQL documentation on partition discovery.
If your file paths are:
userData/
  UserId=1/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953b.c000.snappy.parquet
  UserId=2/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  UserId=3/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
When you invoke SparkSession.read.parquet("/path/to/userData"), it will partition by UserId.
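As a minimal sketch of what this answer describes (reusing the placeholder path from above), partition discovery surfaces UserId as a column on read:

import org.apache.spark.sql.SparkSession

// Sketch: Hive-style partition discovery adds UserId back as a column.
val spark = SparkSession.builder.appName("read-userdata").getOrCreate()
val df = spark.read.parquet("/path/to/userData")
df.printSchema()  // UserId appears as an inferred partition column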
Upvotes: 0