Harish Pathak

Reputation: 1607

How to use aggregateByKey on a JavaPairRDD in Java?

I have searched a lot, but I haven't found any examples of using aggregateByKey in Java code.

I want to find the count of rows in a JavaPairRDD, reduced by key.

I read that aggregateByKey is the best way to do it, but I am using Java instead of Scala, and I have not been able to get it working in Java.

Please help!!!

For example:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]

I want to do exactly what the example shows: add an extra column to each output row holding the count of the rows that were reduced into it.

Thanks!!!
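For the record, the per-key counting part of the example can be traced in plain Java, without Spark (the keys are the ones from the input above; the row payloads are omitted since only the count matters here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CountByKey {
    public static void main(String[] args) {
        // Keys from the example input: key1 appears twice, key2 once.
        String[] keys = {"key1", "key1", "key2"};

        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String k : keys) {
            // merge() plays the role of aggregateByKey's functions here:
            // start a key at 1, add 1 on every further occurrence.
            counts.merge(k, 1, Integer::sum);
        }
        System.out.println(counts); // {key1=2, key2=1}
    }
}
```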

Upvotes: 0

Views: 8030

Answers (3)

Shekhar

Reputation: 19

Data file: average.txt

student_Name,subject,marks

ss,english,80

ss,maths,60

GG,english,180

PP,english,80

PI,english,80

GG,maths,100

PP,maths,810

PI,maths,800

The problem is to find the subject-wise average using the aggregateByKey Spark transformation in Java 8.

And here is one approach:

    // jsc is an existing JavaSparkContext; the Avg class is defined below.
    JavaRDD<String> baseRDD = jsc.textFile("average.txt")
            .filter(line -> !line.startsWith("student_Name"));  // skip the header row

    JavaPairRDD<String, Integer> studentRDD = baseRDD.mapToPair(
            s -> new Tuple2<String, Integer>(s.split(",")[1], Integer.parseInt(s.split(",")[2])));

    JavaPairRDD<String, Avg> avgRDD = studentRDD.aggregateByKey(
            new Avg(0, 0),                                       // zero value
            (v, x) -> new Avg(v.getSum() + x, v.getNum() + 1),   // fold one mark into a partial
            (v1, v2) -> new Avg(v1.getSum() + v2.getSum(), v1.getNum() + v2.getNum())); // merge partials

    Map<String, Avg> mapAvg = avgRDD.collectAsMap();

    for (Entry<String, Avg> entry : mapAvg.entrySet()) {
        System.out.println(entry.getKey() + "::" + entry.getValue().getAvg());
    }



    import java.io.Serializable;

    public class Avg implements Serializable {

        private static final long serialVersionUID = 1L;

        private int sum;
        private int num;

        public Avg(int sum, int num) {
            this.sum = sum;
            this.num = num;
        }

        // Cast before dividing so the average is not truncated to an int.
        public double getAvg() { return (double) this.sum / this.num; }

        public int getSum() { return this.sum; }

        public int getNum() { return this.num; }
    }
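As a quick sanity check of the two functions passed to aggregateByKey above, the same fold-then-merge can be traced in plain Java with no Spark context (this only simulates what Spark does; the two-partition split is made up for illustration):

```java
public class AvgSimulation {
    public static void main(String[] args) {
        // The four english marks from average.txt: ss, GG, PP, PI.
        int[] englishMarks = {80, 180, 80, 80};

        // Pretend the records land in two partitions. Each partition
        // starts from the zero value (sum=0, num=0) and folds with the
        // seq function (v, x) -> new Avg(v.getSum()+x, v.getNum()+1):
        int sum1 = 0, num1 = 0;
        for (int i = 0; i < 2; i++) { sum1 += englishMarks[i]; num1++; }

        int sum2 = 0, num2 = 0;
        for (int i = 2; i < 4; i++) { sum2 += englishMarks[i]; num2++; }

        // The comb function then merges the per-partition partials:
        int sum = sum1 + sum2;  // 420
        int num = num1 + num2;  // 4
        System.out.println((double) sum / num); // 105.0
    }
}
```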

Upvotes: 1

Harish Pathak

Reputation: 1607

Here is an example of how I did aggregateByKey in Java.

// schemaSize, matchKey, finalSchema, idIndex, arr, dataSchemaHashMap and the
// CommonConstant/Utils helpers come from the surrounding application code.
JavaPairRDD<String, Row> result = inputDataFrame.javaRDD().mapToPair(new PairFunction<Row, String, Row>() {
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Row> call(Row tblRow) throws Exception {
        String strID= CommonConstant.BLANKSTRING;
        Object[] newRow = new Object[schemaSize];
        for(String s: matchKey)
        {
            if(tblRow.apply(finalSchema.get(s))!=null){
                strID+= tblRow.apply(finalSchema.get(s)).toString().trim().toLowerCase();
            }                           
        }   
        int rowSize=    tblRow.length();
        for (int itr = 0; itr < rowSize; itr++)
        {
            if(tblRow.apply(itr)!=null)
            {
                newRow[itr] = tblRow.apply(itr);
            }
        }
        newRow[idIndex]= Utils.generateKey(strID);
        return new Tuple2<String, Row>(strID,RowFactory.create(newRow));
    }
}).aggregateByKey(RowFactory.create(arr), new Function2<Row,Row,Row>(){

    private static final long serialVersionUID = 1L;

    public Row call(Row argRow1, Row argRow2) throws Exception {

        Object[] newRow = new Object[schemaSize];
        int rowSize = argRow1.length();

        for (int itr = 0; itr < rowSize; itr++)
        {
            if(argRow1!=null && argRow2!=null)
            {
                if(argRow1.apply(itr)!=null && argRow2.apply(itr)!=null)
                {
                    if(itr==rowSize-1){
                        newRow[itr] = Integer.parseInt(argRow1.apply(itr).toString())+Integer.parseInt(argRow2.apply(itr).toString());
                    }else{
                        newRow[itr] = argRow2.apply(itr);
                    }
                }
            }
        }

        return RowFactory.create(newRow);

    }

}, new Function2<Row,Row,Row>(){
    private static final long serialVersionUID = 1L;

    public Row call(Row v1, Row v2) throws Exception {
        // Merge partials from different partitions the same way as the
        // seq function: sum the trailing count column, keep the latest
        // value for every other column. (Returning v1 here would drop
        // the counts accumulated on the other partition.)
        Object[] newRow = new Object[schemaSize];
        int rowSize = v1.length();
        for (int itr = 0; itr < rowSize; itr++) {
            if (v1.apply(itr) != null && v2.apply(itr) != null) {
                if (itr == rowSize - 1) {
                    newRow[itr] = Integer.parseInt(v1.apply(itr).toString())
                            + Integer.parseInt(v2.apply(itr).toString());
                } else {
                    newRow[itr] = v2.apply(itr);
                }
            }
        }
        return RowFactory.create(newRow);
    }
});

JavaRDD<Row> result1 = result.map(new Function<Tuple2<String,Row>, Row>() {
    private static final long serialVersionUID = -5480405270683046298L;
    public Row call(Tuple2<String, Row> rddRow) throws Exception {
        return rddRow._2();
    }
});
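Stripped of the Spark types, the merge above reduces to one rule: keep the latest value for every column except the last, and sum the last (count) column. A plain-Java sketch of that rule, using Object arrays in place of Rows (a simulation for illustration, not the original code):

```java
public class RowMerge {
    // Mirrors the reduce step above: sum the trailing count column,
    // take the other columns from the second row.
    static Object[] merge(Object[] row1, Object[] row2) {
        Object[] out = new Object[row1.length];
        for (int i = 0; i < row1.length; i++) {
            if (i == row1.length - 1) {
                out[i] = Integer.parseInt(row1[i].toString())
                       + Integer.parseInt(row2[i].toString());
            } else {
                out[i] = row2[i];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two duplicate rows for the same key, each carrying count 1.
        Object[] a = {"abc", "def", "ghi", 1};
        Object[] b = {"abc", "def", "ghi", 1};
        System.out.println(merge(a, b)[3]); // 2
    }
}
```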

Upvotes: 4

Srinivasarao Daruna

Reputation: 3374

I am not sure exactly what you are trying to do, but I can provide a solution that produces the output you need. aggregateByKey is not quite what you are expecting here: on an RDD it is just a way of combining values per key, whereas on a DataFrame the behaviour is closer to what you describe. In any case, the code below gives the required output.

// pairs is the input JavaPairRDD<String, String>.
JavaPairRDD<String, Iterable<String>> groups = pairs.groupByKey();

JavaPairRDD<String, String> counts = groups.mapToPair(
        new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {

    public Tuple2<String, String> call(Tuple2<String, Iterable<String>> arg0) throws Exception {
        // Count how many times each value occurs within this key's group.
        HashMap<String, Integer> valueCounts = new HashMap<String, Integer>();
        Iterator<String> itr = arg0._2.iterator();
        while (itr.hasNext()) {
            String val = itr.next();
            if (valueCounts.get(val) == null) {
                valueCounts.put(val, 1);
            } else {
                valueCounts.put(val, valueCounts.get(val) + 1);
            }
        }
        return new Tuple2<String, String>(arg0._1, valueCounts.toString());
    }
});

You can try it and let me know. And mind you, this is not really combining in the aggregateByKey sense, as a combiner does not do this kind of thing.

Upvotes: -1
