Reputation: 347
I am a newbie to Spark and I am trying to perform a group-by and count using the following Spark functions:
Dataset<Row> result = dataset
.groupBy("column1", "column2")
.count();
But I read here that using group by is not a good idea since it does not have a combiner, which in turn affects the Spark job's runtime efficiency; instead, one should use the reduceByKey function for aggregation operations.
So I tried using the reduceByKey function, but it is not available for Dataset. Instead, Datasets use reduce(ReduceFunction<Row> func).
Since I could not find an example of performing group and count with the reduce function, I tried converting the Dataset to a JavaRDD and used reduceByKey:
//map each row to 1 and then group them by key
JavaPairRDD<String[], Integer> mapOnes;
try {
    mapOnes = dailySummary.javaRDD().mapToPair(
        new PairFunction<Row, String[], Integer>() {
            @Override
            public Tuple2<String[], Integer> call(Row t) throws Exception {
                return new Tuple2<String[], Integer>(
                    new String[]{t.getAs("column1"), t.getAs("column2")}, 1);
            }
        });
} catch (Exception e) {
    log.error("exception in mapping ones: " + e);
    throw new Exception();
}
JavaPairRDD<String[], Integer> rowCount;
try {
    rowCount = mapOnes.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
} catch (Exception e) {
    log.error("exception in reduce by key: " + e);
    throw new Exception();
}
But this also throws an exception, org.apache.spark.SparkException: Task not serializable, for the mapToPair function.
Can anyone suggest a better way to group and count using the Dataset's reduce and map functions? Any help is appreciated.
Upvotes: 2
Views: 2657
Reputation: 2091
This is based on a dataset containing two columns: one with the name of the county and the other with the state in the US.
Desired output:
reduce()
Autauga County, Alabama, Baldwin County, Alabama, Barbour County, Alabama, Bibb County, Alabama, Blount County, Alabama, Bullock County, Alabama, Butler County, Alabama, Calhoun County, Alabama, Chambers County, Alabama, Cherokee County, Alabama, Chilton County,
…
Usage:
System.out.println("reduce()");
String listOfCountyStateDs = countyStateDs
.reduce(
new CountyStateConcatenatorUsingReduce());
System.out.println(listOfCountyStateDs);
Implementation:
private final class CountyStateConcatenatorUsingReduce
        implements ReduceFunction<String> {
    private static final long serialVersionUID = 12859L;

    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + ", " + v2;
    }
}
However, you will have to write your own logic, which may be time-consuming, and you'd probably prefer using groupBy anyway. A sketch of what such custom logic could look like is shown below.
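As a minimal sketch (my illustration, not part of the original answer), custom group-and-count logic could use the typed groupByKey/mapGroups API. The column names column1 and column2 are taken from the question, and the "|" separator is an arbitrary choice:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Build a composite string key, then count each group's rows by hand.
Dataset<Tuple2<String, Integer>> counts = dataset
    .groupByKey(
        (MapFunction<Row, String>) row ->
            row.getAs("column1") + "|" + row.getAs("column2"),
        Encoders.STRING())
    .mapGroups(
        (MapGroupsFunction<String, Row, Tuple2<String, Integer>>) (key, rows) -> {
            int n = 0;
            while (rows.hasNext()) {
                rows.next();
                n++;
            }
            return new Tuple2<>(key, n);
        },
        Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Note that mapGroups materializes each group's iterator without partial aggregation, so for a plain count the built-in count() described in the other answer remains the better choice.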
Upvotes: 1
Reputation: 13001
The groupBy in the link you added refers to RDDs. In RDD semantics, groupBy basically shuffles all the data according to the key, i.e. it brings ALL values relating to a key to one place.
This is why reduceByKey is suggested: reduceByKey first performs the reduce operation locally on each partition, and only the reduced value is shuffled, which means a lot less traffic (and prevents the out-of-memory issues that come with bringing everything for a key to one partition).
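To illustrate (a sketch of mine, not from the original answer), the question's RDD attempt can be made to work along these lines: use a key with value-based equals/hashCode such as Tuple2 instead of String[] (arrays compare by reference, so identical column values would never be grouped together), and use lambdas, which do not capture the enclosing non-serializable class and therefore avoid the Task not serializable error:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Key each row by (column1, column2); Tuple2 compares by value, unlike String[].
JavaPairRDD<Tuple2<String, String>, Integer> ones = dataset.javaRDD().mapToPair(
    (PairFunction<Row, Tuple2<String, String>, Integer>) row -> {
        String c1 = row.getAs("column1");
        String c2 = row.getAs("column2");
        return new Tuple2<>(new Tuple2<>(c1, c2), 1);
    });

// Sum the ones per key; the reduce runs within each partition first, combiner-style.
JavaPairRDD<Tuple2<String, String>, Integer> counts = ones.reduceByKey(
    (Function2<Integer, Integer, Integer>) (a, b) -> a + b);

Even with those fixes, though, the Dataset route below is simpler and gives you the partial aggregation for free.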
In Datasets, groupBy behaves differently. It does not return a Dataset but a RelationalGroupedDataset (and the typed groupByKey similarly returns a KeyValueGroupedDataset). When you call count on this object (or the more generic agg), it defines an aggregation that works very much like reduceByKey: partial aggregation runs on each partition before the shuffle.
This means there is no need for a separate reduceByKey method (the Dataset groupBy is effectively a form of reduceByKey).
Stick with the original groupBy(...).count().
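To make that concrete, here is a brief sketch (my addition; the column names come from the question) of the recommended untyped call next to its typed counterpart:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Untyped API: groupBy returns a RelationalGroupedDataset; count() adds a
// "count" column and aggregates partially on each partition before shuffling.
Dataset<Row> result = dataset
    .groupBy("column1", "column2")
    .count();
result.show();

// Typed API: groupByKey returns a KeyValueGroupedDataset whose count() behaves
// the same way; "|" is just an arbitrary separator for the composite key.
dataset
    .groupByKey(
        (MapFunction<Row, String>) row ->
            row.getAs("column1") + "|" + row.getAs("column2"),
        Encoders.STRING())
    .count()
    .show();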
Upvotes: 3