user641887

Reputation: 1576

Apache Spark - reduceByKey - Java

I am trying to understand how reduceByKey works in Spark, using Java as the programming language.

Say I have a sentence "I am who I am". I break the sentence into words and store it as a list [I, am, who, I, am].
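In code, the words RDD can be built, for example, like this (sc here is a JavaSparkContext, and java.util.Arrays is used to build the list; the exact construction is just for illustration):

// Example construction of the words RDD from the sentence.
JavaRDD<String> words = sc.parallelize(Arrays.asList("I am who I am".split(" ")));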

Now this function assigns 1 to each word:

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

So the output is something like this:

(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

Now if I have 3 reducers running, each reducer will get a key and the values associated with that key:

reducer 1:
    (I,1)
    (I,1)

reducer 2:
    (am,1)
    (am,1)

reducer 3:
    (who,1)

I wanted to know:

a. What exactly happens in the function below?
b. What do the type parameters in new Function2<Integer, Integer, Integer> mean?
c. Basically, how is the resulting JavaPairRDD formed?

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

Upvotes: 5

Views: 18845

Answers (4)

Ajay Kr Choudhary

Reputation: 1352

reduceByKey, as the name suggests, applies a reduce operation to the values of the JavaPairRDD that share the same key. If you refer to the documentation, it says that reduceByKey will

Merge the values for each key using an associative and commutative reduce function.

reduceByKey needs an implementation of the Function2 interface. Its signature is Function2<T1, T2, R>: the input arguments are of types T1 and T2, and the return value is of type R.

Let's understand this with the example that you mentioned

The JavaPairRDD on which you want to apply reduceByKey is:

(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

In your JavaPairRDD, the key is the first element (the word in this case) and the value is the second element (the 1 assigned to each word). You want to apply reduceByKey in order to find out how many times each word has occurred. Whenever the same word is seen again, we want to add up its values. So, to add up the values, you need two input arguments, and the return is a single value.

Therefore, the first two Integer, Integer in the declaration refer to the inputs, and the third Integer refers to the output. Relating this to the signature of the Function2 interface, T1 and T2 are Integer and R is also Integer.

To answer question (c):

The JavaPairRDD finally formed by the reduceByKey operation keeps the original keys of the JavaPairRDD on which reduceByKey is applied, and each key's value is the final reduced value computed by the implementation of the Function2 interface.
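As a minimal sketch (the JavaSparkContext variable sc and the final collect are only for illustration), building the pairs from the question and applying reduceByKey would look like this:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

// The five (word, 1) pairs from the question; sc is an assumed JavaSparkContext.
List<Tuple2<String, Integer>> pairs = Arrays.asList(
        new Tuple2<>("I", 1), new Tuple2<>("am", 1), new Tuple2<>("who", 1),
        new Tuple2<>("I", 1), new Tuple2<>("am", 1));
JavaPairRDD<String, Integer> ones = sc.parallelizePairs(pairs);

// Same Function2 as in the question: merge two counts for the same key into one.
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

// counts.collect() would contain (am,2), (I,2), (who,1), in no guaranteed order.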

If you get confused by the type parameters of these functional interfaces in general, you can use this rule: in the interface declaration, the input argument types come first, followed by the output type.

The input argument(s) appear in the parentheses of the call method, and the output type is the one declared before the method name.

For example:

  1. Look at the declaration of PairFunction in your question. It is PairFunction<String, String, Integer> and the corresponding call method is Tuple2<String, Integer> call(String s). So the input is a String and the output is a Tuple2 formed from a String and an Integer.
  2. Look at the declaration of the Function2 interface. It is Function2<Integer, Integer, Integer> and the corresponding call method is Integer call(Integer i1, Integer i2). So the inputs are two Integers and the output is one Integer.
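To make the input/output positions concrete: both interfaces have a single abstract method, so with Java 8 the same two functions can also be written as lambdas, where the lambda parameters are the inputs and the expression result is the output (this is just an alternative syntax to the anonymous classes in the question):

// PairFunction<String, String, Integer>: input String, output Tuple2<String, Integer>
PairFunction<String, String, Integer> toPair = s -> new Tuple2<>(s, 1);

// Function2<Integer, Integer, Integer>: inputs i1 and i2, output their sum (an Integer)
Function2<Integer, Integer, Integer> sum = (i1, i2) -> i1 + i2;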

I hope it helps.

Upvotes: 0

Naveen Kumar

Reputation: 41

In short, consider this:

Input: {(a:1),(b:2),(c:2),(a:3),(b:2),(c:3)}

Pass it to reduceByKey.

Output: {(a:4),(b:4),(c:5)}
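A minimal sketch of that (assuming a JavaSparkContext sc and Java 8 lambdas; the variable names are illustrative):

// Build the input pairs from the example above.
JavaPairRDD<String, Integer> input = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("c", 2),
        new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 3)));

// Sum the values per key: (a,1)+(a,3) -> (a,4), (b,2)+(b,2) -> (b,4), (c,2)+(c,3) -> (c,5)
JavaPairRDD<String, Integer> output = input.reduceByKey((x, y) -> x + y);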

Upvotes: -3

napster

Reputation: 193

reduceByKey works as follows:

In an RDD, if Spark finds elements having the same key, it takes their values, performs a certain operation on those values, and returns a value of the same type. For example, suppose you have an RDD with the elements:

[K, V1], [K, V2], where V1 and V2 are of the same type. Then the three type arguments to new Function2() are:

  1. The value part of the first (K, V) pair, i.e. V1.
  2. The value part of the second (K, V) pair, i.e. V2.
  3. The return type of the overridden call method, which is again of the same type as V1 and V2 (it is the result of the operation provided in the call method).

Also note that, as RDDs are distributed across nodes, each node performs its own local reduce operation on the data it holds, and the partial results are then combined (after a shuffle) into the final value for each key.
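As a plain-Java illustration of that two-stage idea (the split of values across two partitions below is hypothetical), reducing locally and then merging the partial results gives the same answer as reducing everything in one place, because the supplied function (here, addition) is associative and commutative:

// Hypothetical values for one key, split across two partitions: {1, 1} and {1}.
int partial1 = 1 + 1;               // local reduce on partition 1 -> 2
int partial2 = 1;                   // partition 2 holds a single value, nothing to combine
int merged = partial1 + partial2;   // final merge of the partial results -> 3

// Same result as reducing all values at once: 1 + 1 + 1 == 3.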

I guess this explains your query.

Upvotes: 6

Sean Owen

Reputation: 66876

I think your questions revolve around the reduce function here, which is a function of 2 arguments returning 1, whereas in a Reducer, you implement a function of many-to-many.

This API is simpler if less general. Here you provide an associative operation that can reduce any 2 values down to 1 (e.g. two integers sum to one). This is used to reduce all values for each key to 1. It's not necessary to provide an N-to-1 function since it can be accomplished with a 2-to-1 function. Here, you can't emit multiple values for one key.

The result is one (key, reduced value) pair for each (key, bunch of values).
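For example, the key I above has the two values (1, 1); a single call(1, 1) returns 2, giving (I,2). If there were a third 1, it would simply be folded in with another call: call(2, 1) = 3.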

The Mapper and Reducer in classic Hadoop MapReduce were actually both quite similar (just that one takes a collection of values rather than a single value per key) and let you implement a lot of patterns. In a way that's good, in a way it was wasteful and complex.

You can still reproduce what Mappers and Reducers do, but the method in Spark is mapPartitions, possibly paired with groupByKey. These are the most general operations you might consider, and I'm not saying you should emulate MapReduce this way in Spark. In fact it's unlikely to be efficient. But it is possible.
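For instance, here is a hedged sketch of emulating the Reducer side of word count with groupByKey, assuming the ones pair RDD from the question. It is less efficient than reduceByKey because every value is shuffled; also, a real Reducer may emit several records per key, which mapValues cannot do:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

// Group all values for each key (like a Reducer's input), then sum them.
JavaPairRDD<String, Integer> counts = ones.groupByKey().mapValues(
        new Function<Iterable<Integer>, Integer>() {
            @Override
            public Integer call(Iterable<Integer> values) {
                int sum = 0;
                for (Integer v : values) {
                    sum += v;  // fold the whole collection of values for this key
                }
                return sum;
            }
        });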

Upvotes: 6
