Dinkar Thakur

Reputation: 3103

How to order by desc in an Apache Spark Dataset using the Java API?

I'm reading a file with a SparkSession, splitting it into words, and counting the occurrences of each word. I need to show the data in descending order of count.

SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();

JavaRDD<Word> textFile = sparkSession
            .read()
            .textFile("/Users/myname/Documents/README.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
            .map(w -> {
                Word word = new Word();
                word.setWord(w.replace(",", ""));
                return word;
            });

Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
df.groupBy("word")
        .count()
        .orderBy(org.apache.spark.sql.functions.col("count").desc())
        .show();

Using org.apache.spark.sql.functions.col("count") works fine, but I'm not able to do it the way it is defined in https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String)

df.sort(asc("dept"), desc("age"))

Also, "How to sort by column in descending order in Spark SQL?" didn't work; I guess that one is for Scala. What is the equivalent of this in Java?

Upvotes: 3

Views: 11935

Answers (3)

Merzouk MENHOUR

Reputation: 107

I use Spark 2.4.0. If you hit Kryo registration errors when sorting, either:

  1. set spark.kryo.registrationRequired to false (a minimal sketch of this option follows the registration list below),

OR

  2. register the sort-related classes with Kryo:


    kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
    kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
    kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);
    kryo.register(Class.forName("scala.math.Ordering$$anon$4"));
    kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"));
    kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8"));
    kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));
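
A minimal sketch of option 1, assuming the session is built directly with the builder (the app name is illustrative; the local master is taken from the question):

    SparkSession session = SparkSession
            .builder()
            .appName("kryo sort example")  // illustrative name
            .config("spark.master", "local")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryo.registrationRequired", "false")  // do not require explicit registration
            .getOrCreate();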

private static SparkSession session;

    public static void main(String[] args) {
        /* DUMMY DATA creation */
        List<Person> personsList = Arrays.asList(
            new Person(1, "[email protected]", "nom_1"),
            new Person(2, "[email protected]", "nom_2"), 
            new Person(3, "[email protected]", "nom_3"),
            new Person(4, "[email protected]", "nom_4")
        );

        List<Profession> professionList = Arrays.asList(
            new Profession(1, 2, "profession_4"),
            new Profession(2, 1, "profession_2"), 
            new Profession(3, 1, "profession_5"),
            new Profession(4, 2, "profession_2"), 
            new Profession(5, 2, "profession_5"),
            new Profession(6, 3, "profession_7"), 
            new Profession(7, 3, "profession_2"),
            new Profession(8, 4, "profession_2"), 
            new Profession(9, 4, "profession_7")
        );

        // Initialize the Spark session (SparkUtils.initSession is the author's helper)
        session = SparkUtils.initSession("test jointure");

        /* Convert from Java list to Spark Dataset */
        Dataset<Row> rowPerson = session.createDataFrame(personsList, Person.class);
        System.out.println("rowPerson.show();");
        rowPerson.show();

        Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
        System.out.println("personRenamed.show();");
        personRenamed.show();

        Dataset<Row> rowProfession = session.createDataFrame(professionList, Profession.class);
        System.out.println("rowProfession.show();");
        rowProfession.show();

        Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
        System.out.println("professionRenamed.show();");
        professionRenamed.show();


        /* INNER JOIN IN Spark Java */
        Dataset<Row> innerJoinData = personRenamed.join(professionRenamed,
                    personRenamed.col("personId").equalTo(professionRenamed.col("personFk")));

        System.out.println("innerJoinData.show();");
        innerJoinData.show();

        Dataset<Jointure> joinResult = innerJoinData.select("personId", "nom", "courriel", "id", "profession")
                                                        .orderBy(org.apache.spark.sql.functions.col("personId").asc()) 
                                                        .as(Encoders.bean(Jointure.class));
        System.out.println("joinResult.show();");
        joinResult.show();
        System.out.println("joinResult.printSchema();");
        joinResult.printSchema();

        System.exit(0);

 }


 public class Person implements Serializable {

    private static final long serialVersionUID = 7327130742162877288L;

    // fields match the columns shown in rowPerson.show(): courriel, id, nom
    private long id;
    private String courriel;
    private String nom;

    public Person(long id, String courriel, String nom) {
        super();
        this.id = id;
        this.courriel = courriel;
        this.nom = nom;
    }

    public Person() {
        super();
    }
    //getter and setter
  }

   public class Profession implements Serializable {

    private static final long serialVersionUID = 7845266779357094461L;

    private long id;
    private long personId;
    private String profession;

    public Profession(long id, long personId, String profession) {
        super();
        this.id = id;
        this.personId = personId;
        this.profession = profession;
    }

    public Profession() {
        super();
    }
    //getter and setter
   }

   public class Jointure implements Serializable {

    private static final long serialVersionUID = 4341834876589947018L;

    // fields match the selected columns and the schema printed below
    private long personId;
    private String nom;
    private String courriel;
    private long id;
    private String profession;

    public Jointure(long personId, String nom, String courriel, long id, String profession) {
        super();
        this.personId = personId;
        this.nom = nom;
        this.courriel = courriel;
        this.id = id;
        this.profession = profession;
    }

    public Jointure() {
        super();
    }
    //getter and setter
   }
    rowPerson.show();
    +--------------------+---+-----+
    |            courriel| id|  nom|
    +--------------------+---+-----+
    |[email protected]|  1|nom_1|
    |[email protected]|  2|nom_2|
    |[email protected]|  3|nom_3|
    |[email protected]|  4|nom_4|
    +--------------------+---+-----+

    personRenamed.show();
    +--------------------+--------+-----+
    |            courriel|personId|  nom|
    +--------------------+--------+-----+
    |[email protected]|       1|nom_1|
    |[email protected]|       2|nom_2|
    |[email protected]|       3|nom_3|
    |[email protected]|       4|nom_4|
    +--------------------+--------+-----+

    rowProfession.show();
    +---+--------+------------+
    | id|personId|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    professionRenamed.show();
    +---+--------+------------+
    | id|personFk|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    innerJoinData.show();
    +--------------------+--------+-----+---+--------+------------+
    |            courriel|personId|  nom| id|personFk|  profession|
    +--------------------+--------+-----+---+--------+------------+
    |[email protected]|       2|nom_2|  1|       2|profession_4|
    |[email protected]|       1|nom_1|  2|       1|profession_2|
    |[email protected]|       1|nom_1|  3|       1|profession_5|
    |[email protected]|       2|nom_2|  4|       2|profession_2|
    |[email protected]|       2|nom_2|  5|       2|profession_5|
    |[email protected]|       3|nom_3|  6|       3|profession_7|
    |[email protected]|       3|nom_3|  7|       3|profession_2|
    |[email protected]|       4|nom_4|  8|       4|profession_2|
    |[email protected]|       4|nom_4|  9|       4|profession_7|
    +--------------------+--------+-----+---+--------+------------+

    joinResult.show();
    +--------+-----+--------------------+---+------------+
    |personId|  nom|            courriel| id|  profession|
    +--------+-----+--------------------+---+------------+
    |       1|nom_1|[email protected]|  3|profession_5|
    |       1|nom_1|[email protected]|  2|profession_2|
    |       2|nom_2|[email protected]|  4|profession_2|
    |       2|nom_2|[email protected]|  5|profession_5|
    |       2|nom_2|[email protected]|  1|profession_4|
    |       3|nom_3|[email protected]|  7|profession_2|
    |       3|nom_3|[email protected]|  6|profession_7|
    |       4|nom_4|[email protected]|  8|profession_2|
    |       4|nom_4|[email protected]|  9|profession_7|
    +--------+-----+--------------------+---+------------+

    joinResult.printSchema();
    root
     |-- personId: long (nullable = false)
     |-- nom: string (nullable = true)
     |-- courriel: string (nullable = true)
     |-- id: long (nullable = false)
     |-- profession: string (nullable = true)




Upvotes: 0

koiralo

Reputation: 23109

Your code should work as per the Spark Java docs, but you haven't posted your import statements. desc() and asc() are static methods of the functions class, so you need to either qualify them fully, e.g. org.apache.spark.sql.functions.asc("dept") and org.apache.spark.sql.functions.desc("age"),

or add a static import: import static org.apache.spark.sql.functions.*;
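
Applied to the word-count DataFrame from the question, a minimal sketch using the fully qualified call (column names taken from the question):

    df.groupBy("word")
            .count()
            .orderBy(org.apache.spark.sql.functions.desc("count"))
            .show();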

Upvotes: 2

Umberto Griffo

Reputation: 931

In Java you have to import the functions statically:

import static org.apache.spark.sql.functions.*;
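
With that static import in place, the example from the question's linked doc works as plain method calls; a minimal usage sketch:

    df.sort(asc("dept"), desc("age"));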

Upvotes: 12
