Reputation: 111
I want to write data from a directory to partitions dynamically using Spark. Here is the sample code:
val input_DF = spark.read.parquet("input path")
input_DF.write.mode("overwrite").partitionBy("colname").parquet("output path...")
As shown below, the number of records per key varies, and one key (NA) is heavily skewed:
input_DF.groupBy($"colname").agg(count("colname")).show()
+-------+--------------+
|colname|count(colname)|
+-------+--------------+
|     NA|      14859816| --> far more records than the other keys
|      A|       2907930|
|      D|       1118504|
|      B|        485151|
|      C|        435305|
|      F|        370095|
|      G|        170060|
+-------+--------------+
Because of this skew, the job fails when each executor is given a reasonable amount of memory (8 GB). It completes successfully with high memory per executor (15 GB), but then takes too long to finish.
I tried repartition, expecting it to distribute the data evenly across partitions. But since it uses the default HashPartitioner, all records of a key go to a single partition:
repartition(numPartitions, $"colname") --> creates a HashPartitioner on colname
This produces as many part files as requested in repartition, but moves all records of a key into a single partition (every record with colname = NA lands in one part file). The remaining part files contain no records, only Parquet metadata (38,634 bytes each), as shown in the listing below; a fuller sketch of this attempt follows the listing.
-rw-r--r-- 2 hadoop hadoop 0 2017-11-20 14:29 /user/hadoop/table/_SUCCESS
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00000-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00001-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00002-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00003-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:07 /user/hadoop/table/part-r-00004-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00005-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00006-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 1038264502 2017-11-20 13:20 /user/hadoop/table/part-r-00007-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00008-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00009-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00010-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00011-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00012-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00013-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00014-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 128212247 2017-11-20 13:09 /user/hadoop/table/part-r-00015-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00016-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00017-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00018-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 117142244 2017-11-20 13:08 /user/hadoop/table/part-r-00019-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00020-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 347033731 2017-11-20 13:11 /user/hadoop/table/part-r-00021-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00022-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00023-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00024-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 100306686 2017-11-20 13:08 /user/hadoop/table/part-r-00025-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 36961707 2017-11-20 13:07 /user/hadoop/table/part-r-00026-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00027-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00028-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00029-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00030-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00031-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00032-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00033-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00034-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00035-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00036-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:07 /user/hadoop/table/part-r-00037-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00038-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00039-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00040-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00041-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 68859 2017-11-20 13:06 /user/hadoop/table/part-r-00042-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 4031720288 2017-11-20 14:29 /user/hadoop/table/part-r-00043-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00044-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00045-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00046-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00047-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00048-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00049-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
-rw-r--r-- 2 hadoop hadoop 38634 2017-11-20 13:06 /user/hadoop/table/part-r-00050-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
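For reference, the attempt above looked roughly like this (the partition count is a placeholder, not the exact value that produced the listing):

input_DF
  .repartition(50, $"colname")   // hash-partitions on colname: all NA records land in one partition
  .write
  .mode("overwrite")
  .partitionBy("colname")
  .parquet("output path...")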
I would like to know:
Is there a way to write records of the same key to different DataFrame/RDD partitions? Perhaps a custom partitioner that sends every Nth record to the Nth partition, for example:
(1st rec to partition 1)
(2nd rec to partition 2)
(3rd rec to partition 3)
(4th rec to partition 4)
(5th rec to partition 1)
(6th rec to partition 2)
(7th rec to partition 3)
(8th rec to partition 4)
If yes, can this be controlled with parameters such as a maximum number of bytes per DataFrame/RDD partition?
Since the expected result is simply writing data into different sub-directories (Hive partitions) based on a key, I would like to distribute the records of each key across multiple tasks, with each task writing its own part file under that key's sub-directory.
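Roughly, something like the sketch below is what I have in mind. As far as I understand, repartition without a column round-robins the rows across tasks, so every task would hold a slice of the skewed NA key and write its own part file under colname=NA (the partition count here is only an example):

input_DF
  .repartition(200)          // round-robin: rows spread evenly, regardless of key
  .write
  .mode("overwrite")
  .partitionBy("colname")    // dynamic partitioning still creates the colname=... sub-directories
  .parquet("output path...")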
Upvotes: 0
Views: 142
Reputation: 111
The problem was resolved by repartitioning on a unique key instead of the key used in "partitionBy". If the DataFrame is missing a unique key for some reason, a pseudo column can be added using
df.withColumn("Unique_ID", monotonicallyIncreasingId)
and then repartitioning on "Unique_ID" distributes the data evenly across multiple partitions. To improve performance further, the data can be sorted within each DataFrame partition on a key that is later used for joins, grouping, or partitioning.
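A minimal sketch of this approach (assuming Spark 2.x; the partition count and paths are placeholders):

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

val input_DF = spark.read.parquet("input path")

input_DF
  // Pseudo unique key: values are unique per row, though not consecutive
  .withColumn("Unique_ID", monotonically_increasing_id())
  // Shuffle on the unique key, so records of the skewed NA value
  // are spread across many tasks instead of landing in a single one
  .repartition(200, $"Unique_ID")
  // Optional: sort within each partition on the key used later for
  // joins/grouping, which also tends to improve Parquet compression
  .sortWithinPartitions($"colname")
  .drop("Unique_ID")
  .write
  .mode("overwrite")
  .partitionBy("colname")
  .parquet("output path...")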
Upvotes: 0