Marc Le Bihan
Marc Le Bihan

Reputation: 3304

The <K> class in a groupByKey(...) has a Map among its members. The groupByKey operation fails on an "un-comparable" problem

I have a class Entreprise that have primives data types and a Map on another class : Etablissement that is only made of primitive data types.

public class Entreprise implements Comparable<Entreprise> {
   /** Liste des établissements de l'entreprise. */
   private Map<String, Etablissement> etablissements = new HashMap<>();

   /** Sigle de l'entreprise */
   private String sigle;

   /** Nom de naissance */
   private String nomNaissance;

   /** Nom d'usage */
   private String nomUsage;
 ...
   @Override
   public int compareTo(Entreprise o) {
      return getSiren().compareTo(o.getSiren());
   }
}

In an EntrepriseDataset I created this schema :

public StructType schemaEntreprise() {
   StructType schema = new StructType()
      .add("sigle", StringType, true)
      .add("nomNaissance", StringType, true)
      .add("nomUsage", StringType, true)
       ...

   // Ajouter au Dataset des entreprises la liaison avec les établissements.
   MapType mapEtablissements = new MapType(StringType, this.datasetEtablissement.schemaEtablissement(), true);
   StructField etablissements = new StructField("etablissements", mapEtablissements, true, Metadata.empty());
   schema.add(etablissements);

  return schema;
}

I can do convenients joinWith such as :

Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
    .joinWith(dsEtablissements,
    dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");

It's the beginning of some operations that should lead me to a Dataset of Entreprise objects having their Etablissement objects in their maps

Enterprise : {{834935512, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, nombre d'établissements : 1, catégorie entreprise : null (null), catégorie juridique : 1000, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00014, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : LOHIER, Nom d'usage : null, prénom usuel : ROGER, autres prénoms : ROGER, pseudonyme : null, sexe : M, purgée : null, date de création : Dec 26, 2017}
Etablishment : {{83493551200014, Activité principale : 68.20A (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Jan 26, 2018, historisation débutée le Dec 26, 2017, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2017-12-26, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : null, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 74, répétition : null, type de voie : BD, libellé de voie : DE LA PORTELETTE, complément d'adresse : 74-78, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}

Enterprise : {{178001111, Activité principale : 84.23Z (NAFRev2), effectif salarié : 41 (2016, employeur : null), active : null, dernier traitement : Jan 17, 2019, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 5}, nombre d'établissements : 3, catégorie entreprise : ETI (2,016), catégorie juridique : 7171, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00016, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : null, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : null, Nom d'usage : null, prénom usuel : null, autres prénoms : null, pseudonyme : null, sexe : null, purgée : null, date de création : Jan 1, 1978}
Etablishment : {{17800111100396, Activité principale : 84.23Z (NAFRev2), effectif salarié : 11 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1983-01-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL D'INSTANCE D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP A8, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100743, Activité principale : 84.23Z (NAFRev2), effectif salarié : null (null, employeur : null), active : null, dernier traitement : Sep 1, 2008, historisation débutée le Dec 25, 2007, nombre de périodes sans changement : 1}, activité au registre des métiers : null, date de création de l'établissement : 2007-12-25, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : TRIBUNAL PARITAIRE BAUX RURAUX, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : BP 330, numéro dans la voie : 79, répétition : null, type de voie : RUE, libellé de voie : MARECHAL FOCH, complément d'adresse : null, code postal : 80100, cedex : 80103 - ABBEVILLE CEDEX, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}
Etablishment : {{17800111100503, Activité principale : 84.23Z (NAFRev2), effectif salarié : 02 (2016, employeur : null), active : null, dernier traitement : Sep 29, 2018, historisation débutée le Jan 1, 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1982-07-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : CONSEIL DE PRUD'HOMMES D'ABBEVILLE, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 9, répétition : null, type de voie : AV, libellé de voie : DU GENERAL LECLERC, complément d'adresse : null, code postal : 80100, cedex : null - null, commune : 80001 - ABBEVILLE, commune étrangère : null, pays : null - null}}

But what is going wrong is that I cannot execute a groupByKey :

KeyValueGroupedDataset<Entreprise, Tuple2<Entreprise, Etablissement>> dsK = 
ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, Entreprise>) f -> {
      Entreprise entreprise = f._1();
      Etablissement etablissement = f._2();
      entreprise.ajouterEtablissement(etablissement);

      return entreprise;
  }, 
  Encoders.bean(Entreprise.class));

Dataset<Entreprise> dsEntreprisesAvecEtablissements = 
dsK.mapGroups(new MapGroupsFunction<Entreprise, Tuple2<Entreprise, Etablissement>, Entreprise>() {
     @Override
     public Entreprise call(Entreprise key, Iterator<Tuple2<Entreprise, Etablissement>> values) {
        while(values.hasNext()) {
           Etablissement etablissement = values.next()._2();
           key.ajouterEtablissement(etablissement);
        }

        return key;
      }
   },
   Encoders.bean(Entreprise.class));

That groupByKey(...) (or more accurately the first action taken on dsEntreprisesAvecEtablissements dataset) fails with a message :

java.lang.IllegalArgumentException: cannot generate compare code for un-comparable type: map<string,struct<activiteArtisanRegistreDesMetiers:string,activitePrincipale:string,anneeValiditeEffectifSalarie:int,cedex:string,cedexSecondaire:string,codeCommune:string,codeCommuneSecondaire:string,codePaysEtranger:string,codePaysEtrangerSecondaire:string,codePostal:string,codePostalSecondaire:string,complementAdresse:string,complementAdresseSecondaire:string,dateCreationEtablissement:string,dateDebutHistorisation:string,dateDernierTraitement:string,denominationEtablissement:string,distributionSpeciale:string,distributionSpecialeSecondaire:string,enseigne1:string,enseigne2:string,enseigne3:string,indiceRepetition:string,indiceRepetitionSecondaire:string,libelleCedex:string,libelleCedexSecondaire:string,libelleVoie:string,libelleVoieSecondaire:string,nomCommune:string,nomCommuneSecondaire:string,nomPaysEtranger:string,nomPaysEtrangerSecondaire:string,nombrePeriodes:int,nomenclatureActivitePrincipale:string,numeroVoie:string,numeroVoieSecondaire:string,siege:boolean,siret:string,trancheEffectifSalarie:string,typeDeVoie:string,typeDeVoieSecondaire:string>>
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.genComp(CodeGenerator.scala:700) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.$anonfun$genComparisons$3(GenerateOrdering.scala:121) ~[spark-catalyst_2.12-2.4.3.jar:2.4.3]

I find no other groupByKey method allowing to give a alternative comparison method, that the equals(Map) the groupByKey seems to execute here. It doesn't consider that my Java Pojo targets of Encoders.bean(..) are Comparable. So, eventually, I have to end with this clumsy code :

Entreprises entreprises = new Entreprises();

List<Tuple2<Entreprise, Etablissement>> tuples = ds.collectAsList();
Iterator<Tuple2<Entreprise, Etablissement>> itTuples = tuples.iterator();

while(itTuples.hasNext()) {
   Tuple2<Entreprise, Etablissement> tuple = itTuples.next();
   Entreprise entreprise = entreprises.get(tuple._1().getSiren());
   Etablissement etablissement = tuple._2();

   if (entreprise == null) {
      entreprise = tuple._1();
      entreprises.add(entreprise);
   }

   entreprise.ajouterEtablissement(etablissement);
}

return entreprises;

But a better way might exist to solve my problem more nicely. What should I have done to have at the end of my operations a Dataset<Entreprise> where each Entreprise has a Map of their Etablissement on it ?

Upvotes: 2

Views: 560

Answers (1)

damjad
damjad

Reputation: 1280

Spark does not allow comparison of MapType. You can do things a bit differently.

The important thing regarding your code is the key for joining and grouping by. It is the same for both of these operations. This makes things much easier.

You can try one of the following:

  • Changing the key from Enterprise to siren: String. And collecting all the Etablissement records in mapGroups. This may cause issues in case of duplicate keys in Enterprise.
  • Group by siren in Etablissement stream before join, and collecting them in the mapGroups function. The resulting stream is joined with the Enterprise stream followed by a map.

First Solution

Dataset<Tuple2<Entreprise, Etablissement>> ds = dsEntreprises
                .joinWith(dsEtablissements,
                        dsEntreprises.col("siren").equalTo(dsEtablissements.col("siren")), "inner");

        KeyValueGroupedDataset<String, Tuple2<Entreprise, Etablissement>> dsK = ds.groupByKey((MapFunction<Tuple2<Entreprise, Etablissement>, String>)
                value -> value._1.siren, Encoders.STRING());

        dsK.mapGroups((MapGroupsFunction<String, Tuple2<Entreprise, Etablissement>, Entreprise>) (key, values) -> {
            Entreprise e = null;
            while (values.hasNext()) {
                Tuple2<Entreprise, Etablissement> tuple = values.next();
                if (e == null) {
                    e = tuple._1;
                }

                e.ajouterEtablissement(tuple._2);
            }

            return e;
        }, Encoders.bean(Entreprise.class))
                .foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));

Second Solution This solution is better because it deals with the case of duplicate keys in enterprise gracefully. It will also be efficient because it will reduce the number of records which are going to be joined.

KeyValueGroupedDataset<String, Etablissement> ets = dsEtablissements.groupByKey((MapFunction<Etablissement, String>) value -> value.siren, Encoders.STRING());

    Dataset<EtablissementList> etm = ets.mapGroups((MapGroupsFunction<String, Etablissement, EtablissementList>) (key, values) -> {
        Map<String, Etablissement> map = new HashMap<>();
        while (values.hasNext()) {
            Etablissement etablissement = values.next();
            map.put(etablissement.getId(), etablissement);
        }

        return new EtablissementList(map, key);
    }, Encoders.bean(EtablissementList.class));

    Dataset<Tuple2<Entreprise, EtablissementList>> dx = dsEntreprises.joinWith(etm, dsEntreprises.col("siren").equalTo(etm.col("siren")), "inner");
    Dataset<Entreprise> finalDs = dx.map((MapFunction<Tuple2<Entreprise, EtablissementList>, Entreprise>) value -> {
        value._1.etablissements = value._2.etablissements;
        return value._1;
    }, Encoders.bean(Entreprise.class));

    finalDs.foreach((ForeachFunction<Entreprise>) x -> System.out.println(x));

POJO

public static class EtablissementList {
        private Map<String, Etablissement> etablissements = new ConcurrentHashMap<>();
        private String siren;

        public EtablissementList() {
        }

        public EtablissementList(Map<String, Etablissement> etablissements, String siren) {
            this.etablissements = etablissements;
            this.siren = siren;
        }

        public Map<String, Etablissement> getEtablissements() {
            return etablissements;
        }

        public void setEtablissements(Map<String, Etablissement> etablissements) {
            this.etablissements = etablissements;
        }

        public String getSiren() {
            return siren;
        }

        public void setSiren(String siren) {
            this.siren = siren;
        }
    }

Upvotes: 1

Related Questions