MWyatt
MWyatt

Reputation: 23

PySpark's reduceByKey not working as expected

I'm writing a large PySpark program and I've recently run into trouble when using reduceByKey on an RDD. I've been able to recreate the problem with a simple test program. The code is:

from pyspark import SparkConf, SparkContext

APP_NAME = 'Test App'

def main(sc):
    test = [(0, [i]) for i in xrange(100)]
    test = sc.parallelize(test)
    test = test.reduceByKey(method)
    print test.collect()

def method(x, y):
    x.append(y[0])
    return x

if __name__ == '__main__':
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster('local[*]')
    sc = SparkContext(conf=conf)

    main(sc)

I would expect the output to be (0, [0,1,2,3,4,...,98,99]) based on the Spark documentation. Instead, I get the following output:

[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 36, 48, 60, 72, 84])] 

Could someone please help me understand why this output is being generated?

As a side note, when I use

def method(x, y):
    x = x + y
    return x

I get the expected output.

Upvotes: 2

Views: 2137

Answers (1)

zero323
zero323

Reputation: 330073

First of all it looks like you actually want groupByKey not reduceByKey:

rdd = sc.parallelize([(0, i) for i in xrange(100)])
grouped = rdd.groupByKey()
k, vs = grouped.first()
assert len(list(vs)) == 100

Could someone please help me understand why this output is being generated?

reduceByKey assumes that f is associative and your method is clearly not. Depending on the order of operations the output is different. Lets say you start with following data for a certain key:

[1], [2], [3], [4]

Now add lets add some parentheses:

  1. ((([1], [2]), [3]), [4])
  2. (([1, 2], [3]), [4])
  3. ([1, 2, 3], [4])
  4. [1, 2, 3, 4]

and with another set of parentheses

  1. (([1], ([2], [3])), [4])
  2. (([1], [2, 3]), [4])
  3. ([1, 2], [4])
  4. [1, 2, 4]

When you rewrite it as follows:

method = lambda x, y: x + y

or simply

from operator import add
method = add

you get an associative function and it works as expected.

Generally speaking for reduce* operations you want functions which are both associative and commutative.

Upvotes: 1

Related Questions