Ram

Reputation: 111

How to write records of the same key to multiple files (custom partitioner)

I want to write data from a directory to partitions dynamically using Spark. Here is the sample code.

val input_DF = spark.read.parquet("input path")
input_DF.write.mode("overwrite").partitionBy("colname").parquet("output path...")

As shown below, the number of records per key varies widely, and one key (NA) is heavily skewed:

input_DF.groupBy($"colname").agg(count("colname")).show()

+-----------------+------------------------+
|colname          |count(colname)          |
+-----------------+------------------------+
|               NA|                14859816|  --> most records (skewed key)
|                A|                 2907930|
|                D|                 1118504|
|                B|                  485151|
|                C|                  435305|
|                F|                  370095|
|                G|                  170060|
+-----------------+------------------------+

Because of this skew, the job fails when each executor is given a reasonable amount of memory (8 GB). It completes successfully with a high memory setting (15 GB per executor), but takes too long to finish.

I have tried repartition, expecting it to distribute the data evenly across partitions. But since it uses the default HashPartitioner, all records of a key go to a single partition.

input_DF.repartition(numPartitions, $"colname")  --> uses hash partitioning on colname

This creates as many part files as the number of partitions passed to repartition, but still moves all records of a key into a single partition (for example, every record with value NA goes to one partition). The remaining part files hold no records, only Parquet metadata (38634 bytes):

        -rw-r--r--   2 hadoop hadoop          0 2017-11-20 14:29 /user/hadoop/table/_SUCCESS
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00000-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00001-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00002-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00003-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:07 /user/hadoop/table/part-r-00004-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00005-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00006-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop 1038264502 2017-11-20 13:20 /user/hadoop/table/part-r-00007-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00008-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00009-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00010-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00011-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00012-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00013-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00014-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop  128212247 2017-11-20 13:09 /user/hadoop/table/part-r-00015-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00016-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00017-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00018-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop  117142244 2017-11-20 13:08 /user/hadoop/table/part-r-00019-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00020-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop  347033731 2017-11-20 13:11 /user/hadoop/table/part-r-00021-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00022-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00023-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00024-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop  100306686 2017-11-20 13:08 /user/hadoop/table/part-r-00025-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop   36961707 2017-11-20 13:07 /user/hadoop/table/part-r-00026-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00027-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00028-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00029-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00030-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00031-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00032-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00033-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00034-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00035-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00036-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:07 /user/hadoop/table/part-r-00037-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00038-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00039-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00040-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00041-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      68859 2017-11-20 13:06 /user/hadoop/table/part-r-00042-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop 4031720288 2017-11-20 14:29 /user/hadoop/table/part-r-00043-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00044-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00045-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00046-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00047-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00048-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00049-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
        -rw-r--r--   2 hadoop hadoop      38634 2017-11-20 13:06 /user/hadoop/table/part-r-00050-5736c707-fad4-4ba7-ab38-226cfbc3bf10.snappy.parquet
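The same imbalance can be confirmed directly on the DataFrame partitions. Below is a hedged sketch, not part of the original post, that counts rows per partition after the hash repartition; the partition count of 50 is an arbitrary placeholder and input_DF is the DataFrame loaded above.

    // Rows per partition after hash-repartitioning on the skewed key.
    // Only as many partitions as there are distinct key values get any rows.
    input_DF
      .repartition(50, $"colname")
      .rdd                                   // keeps the shuffled partitioning
      .mapPartitionsWithIndex { case (id, rows) => Iterator((id, rows.size)) }
      .collect()
      .foreach { case (id, n) => println(s"partition $id -> $n rows") }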

I would like to know

  1. Is there a way to write records of the same key to different partitions of a DataFrame/RDD? For example, a custom partitioner that sends every Nth record to the Nth partition:

    (1st rec to partition 1)
    (2nd rec to partition 2)
    (3rd rec to partition 3)
    (4th rec to partition 4)
    (5th rec to partition 1)
    (6th rec to partition 2)
    (7th rec to partition 3)
    (8th rec to partition 4) 
    
  2. If yes, can it be controlled with parameters such as a maximum number of bytes per DataFrame/RDD partition?

As the expected result is simply to write data into different sub-directories (Hive partitions) based on a key, I would like to distribute the records of each key across multiple tasks, with every task writing its own part file under the key's sub-directory.
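For reference, one way to get roughly the round-robin behaviour described in point 1 is to repartition by count only (no column), which in Spark 2.x spreads rows evenly regardless of key, and then let partitionBy on the write place them into the correct sub-directories. This is only a hedged sketch with a placeholder partition count and the same placeholder paths as above; it is not something tried in the post.

    // Sketch only: round-robin repartition, then dynamic partitioning on write.
    val input_DF = spark.read.parquet("input path")

    input_DF
      .repartition(200)                  // no column => round-robin, no key skew
      .write
      .mode("overwrite")
      .partitionBy("colname")            // still one sub-directory per key value
      .parquet("output path...")

The trade-off is file count: each of the 200 tasks can write a part file into every key sub-directory it sees, so small keys end up split across many small files.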

Upvotes: 0

Views: 142

Answers (1)

Ram

Reputation: 111

The problem was resolved by repartitioning on a unique key instead of the key used in "partitionBy". If the DataFrame has no unique column for some reason, a surrogate column can be added using

df.withColumn("Unique_ID", monotonically_increasing_id())

and then repartitioning on "Unique_ID". This way the data is distributed evenly across partitions. To improve performance further, the data can be sorted within each DataFrame partition on a key used for joins, grouping, or partitioning, as sketched below.
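A minimal sketch of that flow, assuming Spark 2.x; the partition count of 200 and the paths are placeholders rather than values from the answer.

    import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

    // Repartition on a surrogate unique key for an even spread, optionally
    // sort within partitions on the real key, then write with partitionBy.
    val df = spark.read.parquet("input path")

    df.withColumn("Unique_ID", monotonically_increasing_id())
      .repartition(200, col("Unique_ID"))    // hash on unique IDs => no skew
      .sortWithinPartitions(col("colname"))  // optional: sort on the partition key
      .drop("Unique_ID")
      .write
      .mode("overwrite")
      .partitionBy("colname")
      .parquet("output path...")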

Upvotes: 0
