sclee1
sclee1

Reputation: 1281

Parameters in ReduceByKey in Spark

While coding in Java in Spark, I have been facing the problems with parameters in reduceByKey in Spark. I didn't understand the parameters used in the reduceByKey function. I know that what reduceByKey means and the way it works. However, the codes below are a little different from the basic spark code examples (ex. word count example)

As you can see, there are two parameters in reduceByKey which are new KrukalReducer(numPoints) and numSubGraphs. numSubGraphs is integer value and the KruskalReducer is java class.

 mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
                    new KruskalReducer(numPoints), numSubGraphs);

I did't understand why such integer variables are used for reduceByKey. I tried to connect two parameters to the concept with ReduceByKey but failed to get it.

I attached the java class for your information.

 public static final class KruskalReducer implements Function2<Iterable<Edge>, Iterable<Edge>, Iterable<Edge>>{
        private static final long serialVersionUID = 1L;
        private transient UnionFind uf = null;
        private final int numPoints;

        public KruskalReducer(int numPoints) {
            this.numPoints = numPoints;
        }

        // merge sort
        @Override
        public Iterable<Edge> call(Iterable<Edge> leftEdges, Iterable<Edge> rightEdges) throws Exception{
            uf = new UnionFind(numPoints);
            List<Edge> edges = Lists.newArrayList();
            Iterator<Edge> leftEdgesIterator = leftEdges.iterator();
            Iterator<Edge> rightEdgesIterator = rightEdges.iterator();
            Edge leftEdge = leftEdgesIterator.next();
            Edge rightEdge = rightEdgesIterator.next();
            Edge minEdge;
            boolean isLeft;
            Iterator<Edge> minEdgeIterator;
            final int numEdges = numPoints - 1;
            do {
                if (leftEdge.getWeight() < rightEdge.getWeight()) {
                    minEdgeIterator = leftEdgesIterator;
                    minEdge = leftEdge;
                    isLeft = true;
                } else {
                    minEdgeIterator = rightEdgesIterator;
                    minEdge = rightEdge;
                    isLeft = false;
                }
                if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                    edges.add(minEdge);
                }
                minEdge = minEdgeIterator.hasNext() ? minEdgeIterator.next() : null;
                if (isLeft) {
                    leftEdge = minEdge;
                } else {
                    rightEdge = minEdge;
                }
            }while (minEdge != null && edges.size() < numEdges);
            minEdge = isLeft ? rightEdge : leftEdge;
            minEdgeIterator = isLeft ? rightEdgesIterator : leftEdgesIterator;

            while (edges.size() < numEdges && minEdgeIterator.hasNext()) {
                if (uf.unify(minEdge.getLeft(), minEdge.getRight())) {
                    edges.add(minEdge);
                }
                minEdge = minEdgeIterator.next();
            }
            return edges;
        }
    }

Additionally, the full related codes are shown as below. (You can skip this code if you get confused)

   JavaPairRDD<Integer, Iterable<Edge>> mstToBeMerged = partitions.combineByKey(new CreateCombiner(),
                    new Merger(), new KruskalReducer(numPoints));


JavaPairRDD<Integer, Iterable<Edge>> mstToBeMergedResult = null;
while (numSubGraphs > 1){
     numSubGraphs = (numSubGraphs + (K - 1)) / K;
     mstToBeMergedResult = mstToBeMerged.mapToPair(new SetPartitionIdFunction(K)).reduceByKey(
              new KruskalReducer(numPoints), numSubGraphs);
     mstToBeMerged = mstToBeMergedResult;
     displayResults(mstToBeMerged);
}


private static class CreateCombiner implements Function<Edge, Iterable<Edge>>{

        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<Edge> call(Edge edge) throws Exception {
            List<Edge> edgeList = Lists.newArrayListWithCapacity(1);
            edgeList.add(edge);
            return edgeList;
        }
    }

    private static class Merger implements Function2<Iterable<Edge>, Edge, Iterable<Edge>>{

        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<Edge> call(Iterable<Edge> list, Edge edge) throws Exception {
            List<Edge> mergeList = Lists.newArrayList(list);
            mergeList.add(edge);
            return mergeList;
        }
    }

Upvotes: 0

Views: 828

Answers (1)

Yuval Itzchakov
Yuval Itzchakov

Reputation: 149518

I did't understand why such integer variables are used for reduceByKey. I tried to connect two parameters to the concept with ReduceByKey but failed to get it.

If I'm reading the right overload:

def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
  fromRDD(rdd.reduceByKey(func, numPartitions))

Then the number you're passing is the number of partitions in the underlying RDD. Because reduceByKey is a shuffle boundary operation, data will get re-partitioned and passing that numbers allows you to control how many partitions will be allocated.

Upvotes: 1

Related Questions