Reputation: 3304
I have a class Entreprise
that have primives data types and a Map on another class : Etablissement
that is only made of primitive data types.
public class Entreprise implements Comparable<Entreprise> {
/** Liste des établissements de l'entreprise. */
private Map<String, Etablissement> etablissements = new HashMap<>();
/** Sigle de l'entreprise */
private String sigle;
/** Nom de naissance */
private String nomNaissance;
/** Nom d'usage */
private String nomUsage;
...
@Override
public int compareTo(Entreprise o) {
return getSiren().compareTo(o.getSiren());
}
}
In an EntrepriseDataset
I created this schema :
public StructType schemaEntreprise() {
StructType schema = new StructType()
.add("sigle", StringType, true)
.add("nomNaissance", StringType, true)
.add("nomUsage", StringType, true)
...
// Ajouter au Dataset des entreprises la liaison avec les établissements.
MapType mapEtablissements = new MapType(StringType, this.datasetEtablissement.schemaEtablissement(), true);
StructField etablissements = new StructField("etablissements", mapEtablissements, true, Metadata.empty());
schema.add(etablissements);
return schema;
}
I can do convenients joinWith
such as :
Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
.joinWith(dsEtablissements,
dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");
It's the beginning of some operations that should lead me to a Dataset of Entreprise
objects having their Etablissement
objects in their maps
Enterprise : {{834935512, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, nombre d'établissements : 1, catégorie entreprise : null (null), catégorie juridique : 1000, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00014, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : LOHIER, Nom d'usage : null, prénom usuel : ROGER, autres prénoms : ROGER, pseudonyme : null, sexe : M, purgée : null, date de création : Dec 26, 2017}
Etablishment : {{83493551200014, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2017-12-26, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : null, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 74, répétition : null, type de voie : BD, libellé de voie : DE LA PORTELETTE, complément d'adresse : 74-78, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Enterprise : {{178001111, Activité principale : 84.23Z (NAFRev2), effectif salarié : 41 (2016, employeur : null), active : null, dernier traitement : Jan 17, 2019, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 5}, nombre d'établissements : 3, catégorie entreprise : ETI (2,016), catégorie juridique : 7171, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00016, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : null, Nom d'usage : null, prénom usuel : null, autres prénoms : null, pseudonyme : null, sexe : null, purgée : null, date de création : Jan 1, 1978}
Etablishment : {{17800111100396, Activité principale : 84.23Z (NAFRev2), effectif salarié : 11 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1983-01-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL D'INSTANCE D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP A8, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100743, Activité principale : 84.23Z (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Sep 1, 2008, historisation débutée le Dec 25, 2007, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2007-12-25, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL PARITAIRE BAUX RURAUX, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP 330, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100503, Activité principale : 84.23Z (NAFRev2), effectif salarié : 02 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1982-07-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : CONSEIL DE PRUD'HOMMES D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 9, répétition : null, type de voie : AV, libellé de voie : DU GENERAL LECLERC, complément d'adresse : null, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
But what is going wrong is that I cannot execute a groupByKey
:
KeyValueGroupedDataset<Entreprise, Tuple2<Entreprise, Etablissement>> dsK =
ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, Entreprise>) f -> {
Entreprise entreprise = f._1();
Etablissement etablissement = f._2();
entreprise.ajouterEtablissement(etablissement);
return entreprise;
},
Encoders.bean(Entreprise.class));
Dataset<Entreprise> dsEntreprisesAvecEtablissements =
dsK.mapGroups(new MapGroupsFunction<Entreprise, Tuple2<Entreprise, Etablissement>, Entreprise>() {
@Override
public Entreprise call(Entreprise key, Iterator<Tuple2<Entreprise, Etablissement>> values) {
while(values.hasNext()) {
Etablissement etablissement = values.next()._2();
key.ajouterEtablissement(etablissement);
}
return key;
}
},
Encoders.bean(Entreprise.class));
That groupByKey(...)
(or more accurately the first action taken on dsEntreprisesAvecEtablissements
dataset) fails with a message :
java.lang.IllegalArgumentException: cannot generate compare code for un-comparable type: map<string,struct<activiteArtisanRegistreDesMetiers:string,activitePrincipale:string,anneeValiditeEffectifSalarie:int,cedex:string,cedexSecondaire:string,codeCommune:string,codeCommuneSecondaire:string,codePaysEtranger:string,codePaysEtrangerSecondaire:string,codePostal:string,codePostalSecondaire:string,complementAdresse:string,complementAdresseSecondaire:string,dateCreationEtablissement:string,dateDebutHistorisation:string,dateDernierTraitement:string,denominationEtablissement:string,distributionSpeciale:string,distributionSpecialeSecondaire:string,enseigne1:string,enseigne2:string,enseigne3:string,indiceRepetition:string,indiceRepetitionSecondaire:string,libelleCedex:string,libelleCedexSecondaire:string,libelleVoie:string,libelleVoieSecondaire:string,nomCommune:string,nomCommuneSecondaire:string,nomPaysEtranger:string,nomPaysEtrangerSecondaire:string,nombrePeriodes:int,nomenclatureActivitePrincipale:string,numeroVoie:string,numeroVoieSecondaire:string,siege:boolean,siret:string,trancheEffectifSalarie:string,typeDeVoie:string,typeDeVoieSecondaire:string>>
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.genComp(CodeGenerator.scala:700) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.$anonfun$genComparisons$3(GenerateOrdering.scala:121) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]
I find no other groupByKey method allowing to give a alternative comparison method, that the equals(Map) the groupByKey seems to execute here. It doesn't consider that my Java Pojo targets of Encoders.bean(..)
are Comparable
. So, eventually, I have to end with this clumsy code :
Entreprises entreprises = new Entreprises();
List<Tuple2<Entreprise, Etablissement>> tuples = ds.collectAsList();
Iterator<Tuple2<Entreprise, Etablissement>> itTuples = tuples.iterator();
while(itTuples.hasNext()) {
Tuple2<Entreprise, Etablissement> tuple = itTuples.next();
Entreprise entreprise = entreprises.get(tuple._1().getSiren());
Etablissement etablissement = tuple._2();
if (entreprise == null) {
entreprise = tuple._1();
entreprises.add(entreprise);
}
entreprise.ajouterEtablissement(etablissement);
}
return entreprises;
But a better way might exist to solve my problem more nicely. What should I have done to have at the end of my operations a Dataset<Entreprise>
where each Entreprise
has a Map of their Etablissement
on it ?
Upvotes: 2
Views: 560
Reputation: 1280
Spark does not allow comparison of MapType
. You can do things a bit differently.
The important thing regarding your code is the key for joining and grouping by. It is the same for both of these operations. This makes things much easier.
You can try one of the following:
Enterprise
to siren: String
. And collecting all the Etablissement
records in mapGroups
. This may cause issues in case of duplicate keys in Enterprise
.siren
in Etablissement stream before join, and collecting them in the mapGroups
function. The resulting stream is joined with the Enterprise
stream followed by a map. First Solution
Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
.joinWith(dsEtablissements,
dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");
KeyValueGroupedDataset<String, Tuple2<Entreprise, Etablissement>> dsK = ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, String>)
value -> value._1.siren, Encoders.STRING());
dsK.mapGroups((MapGroupsFunction<String, Tuple2<Entreprise, Etablissement>, Entreprise>) (key, values) -> {
Entreprise e = null;
while (values.hasNext()) {
Tuple2<Entreprise, Etablissement> tuple = values.next();
if (e == null) {
e = tuple._1;
}
e.ajouterEtablissement(tuple._2);
}
return e;
}, Encoders.bean(Entreprise.class))
.foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));
Second Solution This solution is better because it deals with the case of duplicate keys in enterprise gracefully. It will also be efficient because it will reduce the number of records which are going to be joined.
KeyValueGroupedDataset<String, Etablissement> ets = dsEtablissements.groupByKey((MapFunction<Etablissement, String>) value -> value.siren, Encoders.STRING());
Dataset<EtablissementList> etm = ets.mapGroups((MapGroupsFunction<String, Etablissement, EtablissementList>) (key, values) -> {
Map<String, Etablissement> map = new HashMap<>();
while (values.hasNext()) {
Etablissement etablissement = values.next();
map.put(etablissement.getId(), etablissement);
}
return new EtablissementList(map, key);
}, Encoders.bean(EtablissementList.class));
Dataset<Tuple2<Entreprise, EtablissementList>> dx = dsEntreprises.joinWith(etm, dsEntreprises.col("siren").equalTo(etm.col("siren")), "inner");
Dataset<Entreprise> finalDs = dx.map((MapFunction<Tuple2<Entreprise, EtablissementList>, Entreprise>) value -> {
value._1.etablissements = value._2.etablissements;
return value._1;
}, Encoders.bean(Entreprise.class));
finalDs.foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));
POJO
public static class EtablissementList {
private Map<String, Etablissement> etablissements = new ConcurrentHashMap<>();
private String siren;
public EtablissementList() {
}
public EtablissementList(Map<String, Etablissement> etablissements, String siren) {
this.etablissements = etablissements;
this.siren = siren;
}
public Map<String, Etablissement> getEtablissements() {
return etablissements;
}
public void setEtablissements(Map<String, Etablissement> etablissements) {
this.etablissements = etablissements;
}
public String getSiren() {
return siren;
}
public void setSiren(String siren) {
this.siren = siren;
}
}
Upvotes: 1