Neethu Lalitha

Reputation: 3071

Spark combineByKey over reduceByKey using Java

I have the below dataset:

    key   value
---------------------------
    key1,CLASS-A,YES,1
    key2,CLASS-B,YES,2
    key2,CLASS-B,YES,1
    key1,CLASS-A,YES,4
    key3,CLASS-C,DEFAULT,1

OUTPUT should look like:

    key   value
---------------------------
key1,    CLASS-A,YES,5
key2,    CLASS-B,YES,3
key3,    CLASS-C,NO,1

While using reduceByKey to obtain the result, I found that whenever a key has only one value (key3 in this case), the reduce function is never called, because there is nothing to reduce against. So I get:

   key   value
---------------------------
key1,    CLASS-A,YES,5
key2,    CLASS-B,YES,3
key3,    CLASS-C,DEFAULT,1

Can I achieve this using combineByKey in Spark (Java)?
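The behaviour described above can be reproduced without Spark. The following is a minimal plain-Java sketch, not Spark code: it folds the sample rows per key with `Map.merge`, which, like the function passed to reduceByKey, only runs its merge function when a key already holds a value. The class and method names (`ReduceSkipsSingletons`, `mergeCallsPerKey`, `demo`) are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class ReduceSkipsSingletons {
    // Counts how often the merge function runs per key when values are
    // folded pairwise, analogous to the function handed to reduceByKey.
    static Map<String, Integer> mergeCallsPerKey(String[][] rows) {
        Map<String, String> reduced = new TreeMap<>();
        Map<String, Integer> calls = new TreeMap<>();
        for (String[] row : rows) {
            String key = row[0];
            calls.putIfAbsent(key, 0);
            // Map.merge stores the first value as-is and only calls the
            // function when the key already holds a value.
            reduced.merge(key, row[1], (a, b) -> {
                calls.merge(key, 1, Integer::sum);
                return a; // the merged value itself is irrelevant here
            });
        }
        return calls;
    }

    // Runs the count over the sample rows from the question.
    static Map<String, Integer> demo() {
        return mergeCallsPerKey(new String[][] {
            {"key1", "CLASS-A,YES,1"},
            {"key2", "CLASS-B,YES,2"},
            {"key2", "CLASS-B,YES,1"},
            {"key1", "CLASS-A,YES,4"},
            {"key3", "CLASS-C,DEFAULT,1"},
        });
    }
}
```

With the sample rows, `demo()` reports one merge call each for key1 and key2 and zero for key3, which is why any DEFAULT handling placed inside the reduce function never touches key3.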

What I tried so far:

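reduceByKey(
                    new Function2<String, String, String>() {

                        // Only invoked for keys that have at least two values,
                        // so a single-value key such as key3 never reaches this code.
                        @Override
                        public String call(String s1, String s2) throws Exception {
                            String[] vals1 = StringUtils.split(s1, ",");
                            String[] vals2 = StringUtils.split(s2, ",");

                            String jobStat1 = vals1[0];
                            String jobStat2 = vals2[0];

                            String reducedJobStat;

                            // Named isYes1/isYes2 so they do not clash with the parameters s1/s2.
                            boolean isYes1 = jobStat1.equals("YES");
                            boolean isYes2 = jobStat2.equals("YES");

                            // YES if either side is YES, otherwise NO.
                            if (isYes1 || isYes2) {
                                reducedJobStat = "YES";
                            } else {
                                reducedJobStat = "NO";
                            }

                            return reducedJobStat;
                        }
                    }
            )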

Upvotes: 1

Views: 518

Answers (2)

Neethu Lalitha

Reputation: 3071

So, I got an alternate solution using combineByKey. The code for reduceByKey looks simpler, but it needs a mapValues after the reduceByKey (see the question for the reason) to get the result.

combineByKey is fairly simple once you understand how it works internally.

Example using combineByKey

Input :

key   value

key1,CLASS-A,YES,1
key2,CLASS-B,YES,2
key2,CLASS-B,YES,1
key1,CLASS-A,YES,4
key3,CLASS-C,DEFAULT,1



CreateCombiner initialises the combined value the first time a key is seen in a partition. Let's divide the input into 2 partitions:

 Partition 1:                                Partition 2:

     key    value                                key    value
 ---------------------------                 ---------------------------
     key1,  CLASS-A,YES,1                        key1,  CLASS-A,YES,4
     key2,  CLASS-B,YES,2                        key3,  CLASS-C,DEFAULT,1
     key2,  CLASS-B,YES,1

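import org.apache.spark.api.java.function.Function;

public class CreateCombiner implements Function<String, String> {

    // Called the first time a key is seen in a partition, including keys
    // that only ever have one value.
    @Override
    public String call(String value) {
        // value is expected as "<jobStatus> <outCount> <lbdCount>" (space-separated),
        // the same layout the merge functions return.
        String[] fields = value.split(" ");
        String jobStatus = fields[0];

        // YES and DEFAULT both become YES; anything else becomes NO.
        if (jobStatus.equals("YES")
                || jobStatus.equals("DEFAULT")) {

            return "YES" + " " + fields[1] + " " + fields[2];
        } else {
            return "NO" + " " + fields[1] + " " + fields[2];
        }

    }
}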
When key1 is first encountered in the 1st partition, CreateCombiner initialises the combined value for that key. In my case it rewrites the status field (YES/NO/DEFAULT), because in my use case I want to change every "DEFAULT" to "YES": YES and DEFAULT both become YES, and anything else becomes NO. The same happens the first time key2 is seen in the 1st partition.
Because CreateCombiner runs even for a key that has only one value (key3 here), the status of that key is rewritten too, which reduceByKey on its own could not do.
When key2 is found again in the 1st partition, MergeValue is called and merges the new value into the existing combiner. Here key2 has 2 values (CLASS-B,YES,2 and CLASS-B,YES,1), so they are merged into (key2,CLASS-B,YES,3).

MergeCombiner takes the combiners created on each partition and merges them together. In my case the logic is the same as in MergeValue.


import org.apache.commons.lang3.StringUtils;  // assumed: Commons Lang 3; adjust to your classpath
import org.apache.spark.api.java.function.Function2;

public class MergeValue implements Function2<String, String, String> {

    // MergeValue decides the jobStatus and adds up outCount and lbdCount.
    // It merges a new value (v2) into the combiner already built for the key (v1).

    @Override
    public String call(String v1, String v2) throws Exception {

        // Both inputs are space-separated: "<jobStatus> <outCount> <lbdCount>".
        String[] vals1 = StringUtils.split(v1, " ");
        String[] vals2 = StringUtils.split(v2, " ");

        String jobStat1 = vals1[0];
        String jobStat2 = vals2[0];

        String reducedJobStat;

        boolean stat1Process = jobStat1.equals("YES")
                || jobStat1.equals("DEFAULT");

        boolean stat2Process = jobStat2.equals("YES")
                || jobStat2.equals("DEFAULT");

        // YES if either side is YES or DEFAULT, otherwise NO.
        if (stat1Process || stat2Process) {
            reducedJobStat = "YES";
        } else {
            reducedJobStat = "NO";
        }

        int outCount = Integer.parseInt(vals1[1]) + Integer.parseInt(vals2[1]);

        int lbdCount = Integer.parseInt(vals1[2]) + Integer.parseInt(vals2[2]);
        return reducedJobStat + " " + Integer.toString(outCount) + " " + Integer.toString(lbdCount);

    }

}



import org.apache.commons.lang3.StringUtils;  // assumed: Commons Lang 3; adjust to your classpath
import org.apache.spark.api.java.function.Function2;

public class MergeCombiner implements Function2<String, String, String> {

    // This function combines the merged values produced by MergeValue.
    // It takes the combiners produced at the partition level and combines them until we end up
    // with a single value per key.
    @Override
    public String call(String v1, String v2) throws Exception {

        // Both combiners are space-separated: "<jobStatus> <outCount> <lbdCount>".
        String[] vals1 = StringUtils.split(v1, " ");
        String[] vals2 = StringUtils.split(v2, " ");

        String jobStat1 = vals1[0];
        String jobStat2 = vals2[0];

        String reducedJobStat;

        // Decide the jobStatus from the 2 combiners: if either one is YES,
        // the status is marked as YES, otherwise NO (same rule as MergeValue).
        boolean stat1Process = jobStat1.equals("YES");

        boolean stat2Process = jobStat2.equals("YES");

        if (stat1Process || stat2Process) {

            reducedJobStat = "YES";
        } else {

            reducedJobStat = "NO";
        }

        int outCount = Integer.parseInt(vals1[1]) + Integer.parseInt(vals2[1]);

        int lbdCount = Integer.parseInt(vals1[2]) + Integer.parseInt(vals2[2]);

        return reducedJobStat + " " + Integer.toString(outCount) + " " + Integer.toString(lbdCount);

    }
}

Call to combineByKey

combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiner());
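To see how the three functions interact without a cluster, here is a rough plain-Java simulation of the two-partition walkthrough. It is a sketch under assumptions, not Spark code: the `combineByKey` helper below is a hand-written stand-in for the real method, all names in it are hypothetical, and it works directly on the `CLASS,STATUS,COUNT` rows from the example rather than on the space-separated values used by the classes above. It follows this answer's rule of turning DEFAULT into YES; swap the mapping in `normalise` if you want NO instead.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class CombineByKeySketch {
    // Hand-written stand-in for combineByKey over pre-partitioned pairs.
    static <K, V, C> Map<K, C> combineByKey(
            List<List<Map.Entry<K, V>>> partitions,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue,
            BinaryOperator<C> mergeCombiners) {
        Map<K, C> result = new TreeMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, C> local = new LinkedHashMap<>();
            for (Map.Entry<K, V> e : partition) {
                C existing = local.get(e.getKey());
                // First sighting in this partition: createCombiner; afterwards: mergeValue.
                local.put(e.getKey(), existing == null
                        ? createCombiner.apply(e.getValue())
                        : mergeValue.apply(existing, e.getValue()));
            }
            // Per-partition combiners are folded together with mergeCombiners.
            for (Map.Entry<K, C> e : local.entrySet()) {
                result.merge(e.getKey(), e.getValue(), mergeCombiners);
            }
        }
        return result;
    }

    // createCombiner: YES and DEFAULT become YES, anything else NO.
    static String normalise(String value) {
        String[] f = value.split(",");
        String status = (f[1].equals("YES") || f[1].equals("DEFAULT")) ? "YES" : "NO";
        return f[0] + "," + status + "," + f[2];
    }

    // Used for both mergeValue and mergeCombiners: OR the statuses, add the counts.
    static String merge(String combiner, String other) {
        String[] x = combiner.split(",");
        String[] y = normalise(other).split(",");
        String status = (x[1].equals("YES") || y[1].equals("YES")) ? "YES" : "NO";
        int count = Integer.parseInt(x[2]) + Integer.parseInt(y[2]);
        return x[0] + "," + status + "," + count;
    }

    static Map.Entry<String, String> kv(String k, String v) {
        return new AbstractMap.SimpleEntry<>(k, v);
    }

    // Runs the simulation on the two partitions from the walkthrough.
    static Map<String, String> demo() {
        List<List<Map.Entry<String, String>>> partitions = Arrays.asList(
                Arrays.asList(kv("key1", "CLASS-A,YES,1"),
                              kv("key2", "CLASS-B,YES,2"),
                              kv("key2", "CLASS-B,YES,1")),
                Arrays.asList(kv("key1", "CLASS-A,YES,4"),
                              kv("key3", "CLASS-C,DEFAULT,1")));
        return CombineByKeySketch.<String, String, String>combineByKey(
                partitions,
                CombineByKeySketch::normalise,
                CombineByKeySketch::merge,
                CombineByKeySketch::merge);
    }
}
```

In this sketch key3 only ever passes through `normalise`, yet its status still changes from DEFAULT to YES, while key1 and key2 end up with counts 5 and 3.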

The same code implemented using reduceByKey:

 reduceByKey(
                         new Function2<String, String, String>() {

                             @Override
                             public String call(String s1, String s2) throws Exception {
                                 String[] vals1 = StringUtils.split(s1, " ");
                                 String[] vals2 = StringUtils.split(s2, " ");

                                 String jobStat1 = vals1[0];
                                 String jobStat2 = vals2[0];

                                 String reducedJobStat;

                                 boolean stat1Process = jobStat1.equals("YES") ||
                                         jobStat1.equals("DEFAULT");

                                 boolean stat2Process = jobStat2.equals("YES") ||
                                         jobStat2.equals("DEFAULT");

                                 if (stat1Process || stat2Process) {

                                     reducedJobStat = "YES";
                                 } else {

                                     reducedJobStat = "NO";
                                 }

                                 int outCount = Integer.parseInt(vals1[1]) + Integer.parseInt(vals2[1]);

                                 int lbdCount = Integer.parseInt(vals1[2]) + Integer.parseInt(vals2[2]);

                                 return reducedJobStat + " " + Integer.toString(outCount) + " " + Integer.toString(lbdCount);
                             }
                         } ).mapValues(new Function<String, String>() {
                     @Override
                     public String call(String s) throws Exception {
                         String jobStatus = s.split(" ")[0];

                         if (jobStatus.equals("YES")
                                 || jobStatus.equals("DEFAULT")) {


                             return "YES" + " " + s.split(" ")[1] + " " + s.split(" ")[2];
                         } else {
                             return "NO" + " " + s.split(" ")[1] + " " + s.split(" ")[2];

                         }


                     }
                 });

Upvotes: 0

JaredS

Reputation: 53

The fundamental difference between reduceByKey and combineByKey in Spark is that reduceByKey requires a single function that takes a pair of values and returns one value of the same type, whereas combineByKey lets you transform your data at the same time and requires three functions. The first creates the new value type from the existing value type, the second merges a value of the existing type into a value of the new type, and the third merges two values of the new type.
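As a rough illustration of those three roles, here is a plain-Java sketch with no Spark dependency. The input values are `Integer` and the combined type is a `long[]` holding `{sum, count}`, so the result type differs from the input type, which a single reduce function cannot express. The class name, field names and the numbers 1, 4 and 7 are made up for the example.

```java
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class ThreeFunctionsSketch {
    // 1) Create the new type {sum, count} from one existing value.
    static final Function<Integer, long[]> createCombiner =
            v -> new long[] {v, 1};

    // 2) Merge one more existing-type value into a new-type value.
    static final BiFunction<long[], Integer, long[]> mergeValue =
            (c, v) -> new long[] {c[0] + v, c[1] + 1};

    // 3) Merge two new-type values, for example from different partitions.
    static final BinaryOperator<long[]> mergeCombiners =
            (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]};

    // One key: partition 1 sees 1 and then 4, partition 2 sees 7.
    static double demoAverage() {
        long[] p1 = mergeValue.apply(createCombiner.apply(1), 4);
        long[] p2 = createCombiner.apply(7);
        long[] total = mergeCombiners.apply(p1, p2);
        return (double) total[0] / total[1];
    }
}
```

Here `demoAverage()` combines a sum of 12 with a count of 3 and returns 4.0.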

The best example I have seen of combineByKey is at http://codingjunkie.net/spark-combine-by-key/

For your specific case, I'd recommend keeping it simple and using reduceByKey followed by mapValues to accomplish the desired transformation on key3. This might look something like:

reduced_rdd.mapValues(v => (v._1, if (v._2 == "DEFAULT") "NO" else v._2, v._3))
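Since the question is about Java, the same post-processing step might look roughly like the sketch below. It is plain Java with hypothetical names, and it assumes the values are still comma-separated strings such as `CLASS-C,DEFAULT,1` rather than tuples; in Spark's Java API a function like this would be what you hand to `mapValues` on the reduced pair RDD.

```java
class DefaultToNoSketch {
    // Rewrites a DEFAULT status to NO and leaves every other status untouched.
    static String defaultToNo(String value) {
        String[] f = value.split(",");
        String status = f[1].equals("DEFAULT") ? "NO" : f[1];
        return f[0] + "," + status + "," + f[2];
    }
}
```

Applied to the reduced values from the question, `CLASS-C,DEFAULT,1` becomes `CLASS-C,NO,1` while `CLASS-A,YES,5` passes through unchanged.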

Upvotes: 1
