Reputation: 3103
I'm reading a file with a SparkSession, splitting it into words, and counting the occurrences of each word. I need to show the counts in descending order.
SparkSession sparkSession = SparkSession
        .builder()
        .appName("Java Spark SQL basic example")
        .config("spark.master", "local")
        .getOrCreate();
// Split each line on whitespace or '.', strip commas, and wrap every remaining token in a Word bean.
JavaRDD<Word> textFile = sparkSession
        .read()
        .textFile("/Users/myname/Documents/README.txt")
        .javaRDD()
        .flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
        .map(w -> w.replace(",", ""))
        // split() returns "" for blank lines and between adjacent separators (e.g. ". "),
        // and removing commas can empty a token; drop those so "" is not counted as a word.
        .filter(w -> !w.isEmpty())
        .map(w -> {
            Word word = new Word();
            word.setWord(w);
            return word;
        });
Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
// Count occurrences per word and list the most frequent words first.
df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();
Using `org.apache.spark.sql.functions.col("count")` works fine, but I can't get the form documented at https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String) to work:
df.sort(asc("dept"), desc("age"))
The answers to "How to sort by column in descending order in Spark SQL?" didn't work either; I guess they are for Scala. What is the equivalent in Java?
Upvotes: 3
Views: 11935
Reputation: 107
I use Spark 2.4.0. Either disable `spark.kryo.registrationRequired` (set it to `false`), OR register the following classes with Kryo:
// Kryo registrations for Spark SQL classes used when sorting, as reported against Spark 2.4.0.
// NOTE(review): presumably only needed when spark.kryo.registrationRequired is true
// (Kryo then rejects unregistered classes) — confirm for your Spark version.
// NOTE(review): Kryo typically assigns registration IDs in call order, so keep this
// order stable if serialized data is exchanged between JVMs — confirm.
kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);
// The remaining classes are looked up by name at runtime. Class.forName throws the checked
// ClassNotFoundException, so the enclosing method must catch or declare it.
// The `$$anon` / `$$anonfun` names are compiler-generated and their numbering may differ
// between Scala/Spark builds — verify against the jars you actually run.
kryo.register(Class.forName("scala.math.Ordering$$anon$4"));
kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8"));
kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));
private static SparkSession session;

/**
 * Builds two small bean-backed DataFrames, inner-joins them on the person id, and orders
 * the projected result by that id ascending, printing every intermediate Dataset.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    // Sample people, built with a three-argument Person constructor.
    List<Person> personsList = Arrays.asList(
            new Person(1, "[email protected]", "nom_1"),
            new Person(2, "[email protected]", "nom_2"),
            new Person(3, "[email protected]", "nom_3"),
            new Person(4, "[email protected]", "nom_4"));

    // Sample professions: (id, personId, profession); a person may have several.
    List<Profession> professionList = Arrays.asList(
            new Profession(1, 2, "profession_4"),
            new Profession(2, 1, "profession_2"),
            new Profession(3, 1, "profession_5"),
            new Profession(4, 2, "profession_2"),
            new Profession(5, 2, "profession_5"),
            new Profession(6, 3, "profession_7"),
            new Profession(7, 3, "profession_2"),
            new Profession(8, 4, "profession_2"),
            new Profession(9, 4, "profession_7"));

    // Session initialisation.
    session = SparkUtils.initSession("test jointure");

    // Java lists -> Spark DataFrames, renaming the key columns so the join condition is explicit.
    Dataset<Row> rowPerson = session.createDataFrame(personsList, Person.class);
    printAndShow("rowPerson.show();", rowPerson);

    Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
    printAndShow("personRenamed.show();", personRenamed);

    Dataset<Row> rowProfession = session.createDataFrame(professionList, Profession.class);
    printAndShow("rowProfession.show();", rowProfession);

    Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
    printAndShow("professionRenamed.show();", professionRenamed);

    // Inner join: person.personId == profession.personFk.
    Column joinCondition = personRenamed.col("personId").equalTo(professionRenamed.col("personFk"));
    Dataset<Row> innerJoinData = personRenamed.join(professionRenamed, joinCondition);
    printAndShow("innerJoinData.show();", innerJoinData);

    // Project, sort ascending by person id, and view the rows as Jointure beans.
    Dataset<Jointure> joinResult = innerJoinData
            .select("personId", "nom", "courriel", "id", "profession")
            .orderBy(org.apache.spark.sql.functions.col("personId").asc())
            .as(Encoders.bean(Jointure.class));
    printAndShow("joinResult.show();", joinResult);

    System.out.println("joinResult.printSchema();");
    joinResult.printSchema();
    System.exit(0);
}

/** Prints {@code label} on its own line, then calls {@link Dataset#show()} on {@code dataset}. */
private static void printAndShow(String label, Dataset<?> dataset) {
    System.out.println(label);
    dataset.show();
}
/**
 * JavaBean describing a person, intended as the bean class passed to
 * {@code SparkSession.createDataFrame(List, Class)}.
 *
 * <p>Spark infers the DataFrame schema from the bean's getter/setter pairs, so every field
 * is exposed through a public getter and setter (the original snippet elided them).
 *
 * <p>NOTE(review): {@code main} calls {@code new Person(long, String, String)} and the printed
 * schema shows columns {@code courriel, id, nom}; this class declares neither that constructor
 * nor an {@code id} property — confirm which Person definition the example actually used.
 */
public class Person implements Serializable {

    /** Explicit serialization version so the serialized form stays stable across edits. */
    private static final long serialVersionUID = 7327130742162877288L;

    private long personId;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;
    private String ville;

    /**
     * Creates a fully populated person.
     *
     * @param personId   person identifier
     * @param nom        last name
     * @param prenom     first name
     * @param courriel   e-mail address
     * @param profession profession label
     * @param ville      city
     */
    public Person(long personId, String nom, String prenom, String courriel,
            String profession, String ville) {
        this.personId = personId;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
        this.ville = ville;
    }

    /** Public no-argument constructor, as required by the JavaBean convention. */
    public Person() {
    }

    public long getPersonId() {
        return personId;
    }

    public void setPersonId(long personId) {
        this.personId = personId;
    }

    public String getNom() {
        return nom;
    }

    public void setNom(String nom) {
        this.nom = nom;
    }

    public String getPrenom() {
        return prenom;
    }

    public void setPrenom(String prenom) {
        this.prenom = prenom;
    }

    public String getCourriel() {
        return courriel;
    }

    public void setCourriel(String courriel) {
        this.courriel = courriel;
    }

    public String getProfession() {
        return profession;
    }

    public void setProfession(String profession) {
        this.profession = profession;
    }

    public String getVille() {
        return ville;
    }

    public void setVille(String ville) {
        this.ville = ville;
    }
}
/**
 * JavaBean linking a profession to a person, intended as the bean class passed to
 * {@code SparkSession.createDataFrame(List, Class)}.
 *
 * <p>Spark infers the DataFrame schema from the bean's getter/setter pairs, so every field
 * is exposed through a public getter and setter (the original snippet elided them).
 */
public class Profession implements Serializable {

    /** Explicit serialization version so the serialized form stays stable across edits. */
    private static final long serialVersionUID = 7845266779357094461L;

    private long id;
    private long personId;
    private String profession;

    /**
     * Creates a profession entry.
     *
     * @param id         profession row identifier
     * @param personId   identifier of the person this profession belongs to
     * @param profession profession label
     */
    public Profession(long id, long personId, String profession) {
        this.id = id;
        this.personId = personId;
        this.profession = profession;
    }

    /** Public no-argument constructor, as required by the JavaBean convention. */
    public Profession() {
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public long getPersonId() {
        return personId;
    }

    public void setPersonId(long personId) {
        this.personId = personId;
    }

    public String getProfession() {
        return profession;
    }

    public void setProfession(String profession) {
        this.profession = profession;
    }
}
/**
 * JavaBean describing one row of the person/profession join, intended as the target of
 * {@code Dataset.as(Encoders.bean(Jointure.class))}.
 *
 * <p>Spark maps bean properties (getter/setter pairs) to columns by name, so every field
 * is exposed through a public getter and setter (the original snippet elided them).
 *
 * <p>NOTE(review): {@code main} selects {@code personId, nom, courriel, id, profession} before
 * calling {@code as(...)}; that selection has no {@code prenom} column, which Spark may reject
 * when resolving this bean's {@code prenom} property — confirm against the Spark version used.
 */
public class Jointure implements Serializable {

    /** Explicit serialization version so the serialized form stays stable across edits. */
    private static final long serialVersionUID = 4341834876589947018L;

    private long id;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;

    /**
     * Creates a fully populated join row.
     *
     * @param id         row identifier
     * @param nom        last name
     * @param prenom     first name
     * @param courriel   e-mail address
     * @param profession profession label
     */
    public Jointure(long id, String nom, String prenom, String courriel, String profession) {
        this.id = id;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
    }

    /** Public no-argument constructor, as required by the JavaBean convention. */
    public Jointure() {
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getNom() {
        return nom;
    }

    public void setNom(String nom) {
        this.nom = nom;
    }

    public String getPrenom() {
        return prenom;
    }

    public void setPrenom(String prenom) {
        this.prenom = prenom;
    }

    public String getCourriel() {
        return courriel;
    }

    public void setCourriel(String courriel) {
        this.courriel = courriel;
    }

    public String getProfession() {
        return profession;
    }

    public void setProfession(String profession) {
        this.profession = profession;
    }
}
rowPerson.show();
+--------------------+---+-----+
| courriel| id| nom|
+--------------------+---+-----+
|[email protected]| 1|nom_1|
|[email protected]| 2|nom_2|
|[email protected]| 3|nom_3|
|[email protected]| 4|nom_4|
+--------------------+---+-----+
personRenamed.show();
+--------------------+--------+-----+
| courriel|personId| nom|
+--------------------+--------+-----+
|[email protected]| 1|nom_1|
|[email protected]| 2|nom_2|
|[email protected]| 3|nom_3|
|[email protected]| 4|nom_4|
+--------------------+--------+-----+
rowProfession.show();
+---+--------+------------+
| id|personId| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
professionRenamed.show();
+---+--------+------------+
| id|personFk| profession|
+---+--------+------------+
| 1| 2|profession_4|
| 2| 1|profession_2|
| 3| 1|profession_5|
| 4| 2|profession_2|
| 5| 2|profession_5|
| 6| 3|profession_7|
| 7| 3|profession_2|
| 8| 4|profession_2|
| 9| 4|profession_7|
+---+--------+------------+
innerJoinData.show();
+--------------------+--------+-----+---+--------+------------+
| courriel|personId| nom| id|personFk| profession|
+--------------------+--------+-----+---+--------+------------+
|[email protected]| 2|nom_2| 1| 2|profession_4|
|[email protected]| 1|nom_1| 2| 1|profession_2|
|[email protected]| 1|nom_1| 3| 1|profession_5|
|[email protected]| 2|nom_2| 4| 2|profession_2|
|[email protected]| 2|nom_2| 5| 2|profession_5|
|[email protected]| 3|nom_3| 6| 3|profession_7|
|[email protected]| 3|nom_3| 7| 3|profession_2|
|[email protected]| 4|nom_4| 8| 4|profession_2|
|[email protected]| 4|nom_4| 9| 4|profession_7|
+--------------------+--------+-----+---+--------+------------+
joinResult.show();
+--------+-----+--------------------+---+------------+
|personId| nom| courriel| id| profession|
+--------+-----+--------------------+---+------------+
| 1|nom_1|[email protected]| 3|profession_5|
| 1|nom_1|[email protected]| 2|profession_2|
| 2|nom_2|[email protected]| 4|profession_2|
| 2|nom_2|[email protected]| 5|profession_5|
| 2|nom_2|[email protected]| 1|profession_4|
| 3|nom_3|[email protected]| 7|profession_2|
| 3|nom_3|[email protected]| 6|profession_7|
| 4|nom_4|[email protected]| 8|profession_2|
| 4|nom_4|[email protected]| 9|profession_7|
+--------+-----+--------------------+---+------------+
joinResult.printSchema();
root
|-- personId: long (nullable = false)
|-- nom: string (nullable = true)
|-- courriel: string (nullable = true)
|-- id: long (nullable = false)
|-- profession: string (nullable = true)
Upvotes: 0
Reputation: 23109
Your code should work as described in the Spark Java docs. You haven't posted your import statements; if you have not imported the `functions` class, that is the problem, because `desc()` and `asc()` are static methods of the `org.apache.spark.sql.functions` class.
So you need to use `org.apache.spark.sql.functions.asc("dept")` and `org.apache.spark.sql.functions.desc("age")`,
or add a static import: `import static org.apache.spark.sql.functions.*;`
Upvotes: 2
Reputation: 931
In Java you have to statically import the methods of the `functions` class:
import static org.apache.spark.sql.functions.*;
Upvotes: 12