Reputation: 23
I'm writing a large PySpark program and I've recently run into trouble when using reduceByKey
on an RDD. I've been able to recreate the problem with a simple test program. The code is:
from pyspark import SparkConf, SparkContext

APP_NAME = 'Test App'

def main(sc):
    test = [(0, [i]) for i in xrange(100)]
    test = sc.parallelize(test)
    test = test.reduceByKey(method)
    print test.collect()

def method(x, y):
    x.append(y[0])
    return x

if __name__ == '__main__':
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster('local[*]')
    sc = SparkContext(conf=conf)
    main(sc)
I would expect the output to be (0, [0,1,2,3,4,...,98,99])
based on the Spark documentation. Instead, I get the following output:
[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 36, 48, 60, 72, 84])]
Could someone please help me understand why this output is being generated?
As a side note, when I use
def method(x, y):
    x = x + y
    return x
I get the expected output.
Upvotes: 2
Views: 2137
Reputation: 330073
First of all, it looks like you actually want groupByKey, not reduceByKey:
rdd = sc.parallelize([(0, i) for i in xrange(100)])
grouped = rdd.groupByKey()
k, vs = grouped.first()
assert len(list(vs)) == 100
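If you want each value back as a plain Python list rather than a grouped iterable, a minimal sketch (reusing the rdd defined above) is to add mapValues(list):

grouped_lists = rdd.groupByKey().mapValues(list)
print grouped_lists.collect()  # a single pair whose value is the list of all 100 elements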
Could someone please help me understand why this output is being generated?
reduceByKey assumes that the function f you pass is associative, and your method clearly is not. Depending on the order of operations, the output is different. Let's say you start with the following data for a certain key:
[1], [2], [3], [4]
Now let's add some parentheses:
((([1], [2]), [3]), [4])
(([1, 2], [3]), [4])
([1, 2, 3], [4])
[1, 2, 3, 4]
and with another set of parentheses:
(([1], ([2], [3])), [4])
(([1], [2, 3]), [4])
([1, 2], [4])
[1, 2, 4]
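You can see the same order dependence directly by controlling the number of partitions. This is just a sketch assuming the same method as in the question; the exact wrong output will vary with how the data is split:

def method(x, y):
    x.append(y[0])
    return x

data = [(0, [i]) for i in xrange(100)]

# With a single partition every value is folded left to right, so nothing is lost.
print sc.parallelize(data, 1).reduceByKey(method).collect()

# With several partitions the per-partition results are merged with the same
# non-associative function, which keeps only the first element of each merged
# list, so most values disappear.
print sc.parallelize(data, 8).reduceByKey(method).collect()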
When you rewrite it as follows:
method = lambda x, y: x + y
or simply
from operator import add
method = add
you get an associative function and it works as expected.
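For completeness, a small sketch of the corrected pipeline from the question, assuming the same (0, [i]) input data:

rdd = sc.parallelize([(0, [i]) for i in xrange(100)])
# List concatenation is associative, so the grouping of operations no longer matters.
result = rdd.reduceByKey(lambda x, y: x + y).collect()
# result now contains a single pair whose value holds all 100 elements.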
Generally speaking, for reduce* operations you want functions which are both associative and commutative.
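If you want a quick, Spark-free way to see the difference, a sketch in plain Python is to compare two groupings by hand:

def method(x, y):
    x.append(y[0])
    return x

# Associativity check: the original method gives different answers for the
# two groupings, while list concatenation does not.
assert method(method([1], [2]), [3]) == [1, 2, 3]
assert method([1], method([2], [3])) == [1, 2]

assert ([1] + [2]) + [3] == [1] + ([2] + [3]) == [1, 2, 3]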
Upvotes: 1