Reputation: 3304
I compute the number of employees and enterprises per activity sector, city by city:
+-----------+--------------------+------------+-------+---------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                               |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+---------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|84.11Z      |O      |Administration publique générale                         |3                |153.5         |169.5       |
|14654      |Saint-Pierre-en-Auge|16.24Z      |C      |Fabrication d'emballages en bois                         |1                |149.5         |150.5       |
|14654      |Saint-Pierre-en-Auge|10.11Z      |C      |Transformation et conservation de la viande de boucherie |1                |149.5         |150.5       |
+-----------+--------------------+------------+-------+---------------------------------------------------------+-----------------+--------------+------------+
The grouping level (the regroupement column below) is set by the user:
+-----------+--------------------+------------+-------+------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                        |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|10          |C      |Industries alimentaires                                           |16               |208.0         |225.0       |
|14654      |Saint-Pierre-en-Auge|86          |Q      |Activités pour la santé humaine                                   |46               |169.5         |218.5       |
|14654      |Saint-Pierre-en-Auge|84          |O      |Administration publique et défense ; sécurité sociale obligatoire |5                |153.5         |171.5       |
+-----------+--------------------+------------+-------+------------------------------------------------------------------+-----------------+--------------+------------+
The job is done this way. From a Dataset of enterprises and establishments, partitioned by department code (roughly the first two characters of a city code), these columns are selected:

- city_code
- city_name
- grouping (the part of the activity code we retain: 84.11Z or 84)
- section (a code summarizing the sector of activity: industrial, commercial, etc.)
- activity_description
- siren (the enterprise identifier: an enterprise may have many establishments)
- number_of_workers
- number_of_actives_people

Then a groupBy is done:
import static org.apache.spark.sql.functions.*;

RelationalGroupedDataset group = enterprisesAndEstablishments
    .groupBy("city_code", "city_name", "grouping", "section", "activity_description");

Dataset<Row> result = group.agg(countDistinct("siren").as("nombreEntreprises"),
    sum("number_of_workers").as("nombreSalaries"),
    sum("number_of_actives_people").as("nombreActifs"));
My problem is that the groupBy method doesn't care about the dataset partitions: it gathers its data from every partition of enterprisesAndEstablishments and sorts a massive amount of data globally, when targeting only a part would be more efficient (all the activities in this sample are in the partition [codeDepartement=14]).

I would like it to respect these partitions and do the groupBy at their level, to avoid the shuffle. I'm looking for the groupBy counterpart of sortWithinPartitions, something that would be called groupWithinPartitions, but I can't find it.

What is the best way to achieve what I'm looking for, or, if no such tool exists, what alternative should I choose?
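For what it's worth, restricting the job to a single department already works through partition pruning (a minimal sketch in Scala for brevity, assuming the source exposes the codeDepartement partition column mentioned above), but that only narrows the input: the groupBy still shuffles whatever remains, and I would have to loop department by department:

import org.apache.spark.sql.functions.{col, countDistinct, sum}

// Partition pruning: the filter on the partition column keeps the read on
// [codeDepartement=14] only; the groupBy below still shuffles that subset.
val oneDepartment = enterprisesAndEstablishments
  .filter(col("codeDepartement") === "14")
  .groupBy("city_code", "city_name", "grouping", "section", "activity_description")
  .agg(
    countDistinct("siren").as("nombreEntreprises"),
    sum("number_of_workers").as("nombreSalaries"),
    sum("number_of_actives_people").as("nombreActifs"))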
Upvotes: 1
Views: 895
Reputation: 296
You can achieve the same thing with the low-level RDD function aggregateByKey, one of the aggregation functions available in Spark (the others being reduceByKey and groupByKey). One difference makes it the most powerful of the three: aggregateByKey does not need to return the same datatype as its input, and it can apply one aggregation (maximum, minimum, average, sum, count) within a partition and a different one between partitions.
// Sample records mirroring the question's columns.
case class EnterpriseEmp(
    city_code: Long,
    city_name: String,
    grouping: Int,
    section: String,
    activity_description: String,
    siren: String,
    number_of_workers: Long,
    number_of_actives_people: Long
)

val empList =
  Array(
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 100, 100),
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 150, 200),
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 200, 300),
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1000, 1001),
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1050, 2001),
    EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 2000, 3001)
  )

val data = sc.parallelize(empList)

// Key each record by the same columns as the groupBy in the question.
val keyed = data.keyBy(emp =>
  (
    emp.city_code,
    emp.city_name,
    emp.grouping,
    emp.section,
    emp.activity_description
  )
)
aggregateByKey requires three main inputs: a zero value, a combiner function applied within each partition, and a merge function applied between partitions.
// Zero value: (sum of number_of_workers, sum of number_of_actives_people, count of siren)
val init_value = (0L, 0L, 0L)

// Combiner, applied within a partition: folds one record into the running tuple.
val combinerFunc = (inter: (Long, Long, Long), value: EnterpriseEmp) => {
  (
    inter._1 + value.number_of_workers,
    inter._2 + value.number_of_actives_people,
    inter._3 + 1
  )
}

// Merge, applied between partitions: combines two partial tuples.
val reduceFunc = (p1: (Long, Long, Long), p2: (Long, Long, Long)) => {
  (p1._1 + p2._1, p1._2 + p2._2, p1._3 + p2._3)
}

val output = keyed.aggregateByKey(init_value)(combinerFunc, reduceFunc)
Output:
output.collect.foreach(println)
((14654,Saint-Pierre-en-Auge,86,Q,Activités pour la santé humaine),(4050,6003,3))
((14654,Saint-Pierre-en-Auge,10,C,Industries alimentaires),(450,600,3))
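If you need the result back as a DataFrame with the column names from the question, you can flatten the key/value tuples afterwards (a minimal sketch, assuming spark.implicits._ from an active SparkSession named spark is in scope, as in spark-shell):

import spark.implicits._

// Flatten ((grouping key), (aggregates)) back into named columns.
// sirenCount here is the plain record count accumulated above.
val resultDf = output
  .map { case ((cityCode, cityName, grouping, section, activityDescription),
               (workers, actives, sirenCount)) =>
    (cityCode, cityName, grouping, section, activityDescription, sirenCount, workers, actives)
  }
  .toDF("codeCommune", "nomCommune", "regroupement", "section", "libelleAPE",
        "nombreEntreprises", "nombreSalaries", "nombreActifs")

resultDf.show(false)

And if you need the distinct-siren count of the question (countDistinct("siren")) rather than a plain record count, this is exactly where the "different datatype" property of aggregateByKey helps: accumulate a Set[String] of sirens within each partition and union the sets between partitions (sketch under the same assumptions):

val distinctSirens = keyed.aggregateByKey(Set.empty[String])(
  (sirens, emp) => sirens + emp.siren, // within a partition: collect sirens
  (s1, s2) => s1 ++ s2                 // between partitions: union the sets
).mapValues(_.size)                    // number of distinct enterprises per key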
Upvotes: 2