Marc Le Bihan

Reputation: 3304

Apache Spark: make a multi-column groupBy on a Dataset work within partitions

I compute the number of enterprises and employees per activity sector, city by city:

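+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                       |nombreEntreprises|nombreSalaries|nombreActifs|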
+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|84.11Z      |O      |Administration publique générale                                                                                                 |3                |153.5         |169.5       |
|14654      |Saint-Pierre-en-Auge|16.24Z      |C      |Fabrication d'emballages en bois                                                                                                 |1                |149.5         |150.5       |
|14654      |Saint-Pierre-en-Auge|10.11Z      |C      |Transformation et conservation de la viande de boucherie                                                                         |1                |149.5         |150.5       |

with a grouping level (regroupement below) that is set by the user:

+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                                |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|10          |C      |Industries alimentaires                                                                                                                   |16               |208.0         |225.0       |
|14654      |Saint-Pierre-en-Auge|86          |Q      |Activités pour la santé humaine                                                                                                           |46               |169.5         |218.5       |
|14654      |Saint-Pierre-en-Auge|84          |O      |Administration publique et défense ; sécurité sociale obligatoire                                                                         |5                |153.5         |171.5       |

The job works as follows:

  1. From a Dataset of enterprises and establishments, partitioned by department code (roughly the first two characters of a city code), these columns are selected:

    • city_code,
    • city_name,
    • grouping (the part of the activity code we are retaining: 84.11Z or 84),
    • section (a code summarizing the sector of an activity: industrial, commercial, etc.),
    • activity_description,
    • siren (the enterprise identifier: an enterprise might have many establishments),
    • number_of_workers,
    • number_of_actives_people
  2. A groupBy is then performed:

RelationalGroupedDataset group = enterprisesAndEstablishments
   .groupBy("city_code", "city_name", "grouping", "section", "activity_description");

  3. Finally, I compute the results with an aggregation:

// countDistinct and sum are static imports from org.apache.spark.sql.functions
group.agg(countDistinct("siren").as("nombreEntreprises"),
   sum("number_of_workers").as("nombreSalaries"),
   sum("number_of_actives_people").as("nombreActifs"));

My problem is that the groupBy method ignores the Dataset's partitioning: it gathers its data from every partition of enterprisesAndEstablishments and sorts a massive amount of data globally,
when working on a single partition would be more efficient: all the activities in this sample are in the partition [codeDepartement=14].

I would like it to respect these partitions and perform the groupBy at their level, to avoid a shuffle.

I'm looking for the groupBy counterpart of sortWithinPartitions, something that would be called groupWithinPartitions, but I can't find it.

What is the best way to achieve this,
or, if no such tool exists, what alternative should I choose?

Upvotes: 1

Views: 895

Answers (1)

Manoj

Reputation: 296

You can achieve this with the low-level RDD function aggregateByKey, one of the aggregate functions available in Spark (the others are reduceByKey and groupByKey). One difference makes it the most powerful of the three.

aggregateByKey does not need its result to have the same datatype as the input values, and it can apply one aggregation (maximum, minimum, average, sum or count) within a partition and a different aggregation between partitions.
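To make that last point concrete, here is a minimal plain-Java sketch with no Spark dependency. It only imitates the two phases by hand: the class and method names (CountThenSum, countWithinPartition, mergeAcrossPartitions) are made up for the illustration and are not part of any Spark API. The values are Strings, the accumulator is a long, and the within-partition operation (count) differs from the between-partition operation (sum).

```java
import java.util.List;

final class CountThenSum {
    // Within a partition: fold each String value into a long accumulator.
    // The input type (String) and the accumulator type (long) differ.
    static long countWithinPartition(List<String> partition) {
        long accumulator = 0L; // zero value, neutral for counting
        for (String ignored : partition) {
            accumulator += 1;
        }
        return accumulator;
    }

    // Between partitions: the partial counts are summed, a different operation.
    static long mergeAcrossPartitions(List<Long> partialCounts) {
        long total = 0L;
        for (long partial : partialCounts) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions holding values for the same key.
        List<String> partition0 = List.of("A1", "A1");
        List<String> partition1 = List.of("A1");
        long total = mergeAcrossPartitions(List.of(
            countWithinPartition(partition0),
            countWithinPartition(partition1)));
        System.out.println(total); // prints 3
    }
}
```

In Spark itself the two functions are simply passed to aggregateByKey, which decides where each one runs.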

case class EnterpriseEmp(
    city_code: Long,
    city_name: String,
    grouping: Int,
    section: String,
    activity_description: String,
    siren: String,
    number_of_workers: Long,
    number_of_actives_people: Long
)

val empList =
      Array(
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 100, 100),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 150, 200),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 200, 300),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1000, 1001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1050, 2001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 2000, 3001)
      )
// sc is the SparkContext (predefined in spark-shell)
val data = sc.parallelize(empList)
val keyed = data.keyBy(key =>
  (
    key.city_code,
    key.city_name,
    key.grouping,
    key.section,
    key.activity_description
  )
)

aggregateByKey takes three main inputs:

  1. zeroValue: the initial value of the accumulator. It must be neutral for the aggregation (0 for a sum) so that it does not affect the result.
  2. Combiner function (seqOp in the Spark API): accepts two parameters, the accumulator and a value, and merges the second into the first. It combines values within a single partition.
  3. Reduce/merge function (combOp in the Spark API): also accepts two parameters, both accumulators, and merges them into one across RDD partitions.

// Accumulator: (sum of number_of_workers, sum of number_of_actives_people, number of rows).
// The last field counts rows per key; it is not a distinct count of siren.
val init_value = (0L, 0L, 0L)
val combinerFunc = (inter: (Long, Long, Long), value: EnterpriseEmp) => {
  (
    inter._1 + value.number_of_workers,
    inter._2 + value.number_of_actives_people,
    inter._3 + 1
  )
}
val reduceFunc = (p1: (Long, Long, Long), p2: (Long, Long, Long)) => {
  (p1._1 + p2._1, p1._2 + p2._2, p1._3 + p2._3)
}
val output = keyed.aggregateByKey(init_value)(combinerFunc, reduceFunc)

Output:

output.collect.foreach(println)
((14654,Saint-Pierre-en-Auge,86,Q,Activités pour la santé humaine),(4050,6003,3))
((14654,Saint-Pierre-en-Auge,10,C,Industries alimentaires),(450,600,3))
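
Note that the third field above is a row count (3 for each key), whereas the question uses countDistinct("siren"). A distinct count fits the same two-phase shape if the accumulator is a set of identifiers instead of a number. The following plain-Java sketch imitates that by hand, without Spark: the class name DistinctSirenSketch, its methods, the shortened keys "10" and "86", and the extra siren "A2" are all assumptions made for the illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DistinctSirenSketch {
    // Within a partition: collect the distinct siren values seen for each key.
    // Each entry is (grouping key, siren).
    static Map<String, Set<String>> combineWithinPartition(List<Map.Entry<String, String>> partition) {
        Map<String, Set<String>> accumulator = new HashMap<>();
        for (Map.Entry<String, String> row : partition) {
            accumulator.computeIfAbsent(row.getKey(), k -> new HashSet<>()).add(row.getValue());
        }
        return accumulator;
    }

    // Between partitions: union the per-key sets, then report their sizes.
    static Map<String, Integer> mergeAcrossPartitions(List<Map<String, Set<String>>> partials) {
        Map<String, Set<String>> merged = new HashMap<>();
        for (Map<String, Set<String>> partial : partials) {
            partial.forEach((key, sirens) ->
                merged.computeIfAbsent(key, k -> new HashSet<>()).addAll(sirens));
        }
        Map<String, Integer> counts = new HashMap<>();
        merged.forEach((key, sirens) -> counts.put(key, sirens.size()));
        return counts;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions; "A1" appears in both, "A2" only in the second.
        List<Map.Entry<String, String>> partition0 =
            List.of(Map.entry("10", "A1"), Map.entry("10", "A1"), Map.entry("86", "B1"));
        List<Map.Entry<String, String>> partition1 =
            List.of(Map.entry("10", "A1"), Map.entry("10", "A2"), Map.entry("86", "B1"));

        Map<String, Integer> counts = mergeAcrossPartitions(List.of(
            combineWithinPartition(partition0),
            combineWithinPartition(partition1)));

        System.out.println(counts.get("10")); // prints 2
        System.out.println(counts.get("86")); // prints 1
    }
}
```

The trade-off is memory: the accumulator grows with the number of distinct identifiers per key, which a plain counter does not.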

Upvotes: 2
