Reputation: 2409
I have some very simple code:
```python
# sc is the SparkContext (e.g. the one provided by the pyspark shell)
def fun(x, n):
    return (x, n)

rdds = []
for i in range(2):
    rdd = sc.parallelize(range(5*i, 5*(i+1)))
    rdd = rdd.map(lambda x: fun(x, i))
    rdds.append(rdd)

a = sc.union(rdds)
print(a.collect())
```
I had expected the output to be the following:
[(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]
However, the output is the following:
[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]
This is bewildering, to say the least.
It seems that, due to the lazy evaluation of RDDs, the value of `i` used to create the RDDs is the one it holds when `collect()` is called, which is 1 (from the last iteration of the `for` loop).
Now, both elements of the tuple are derived from `i`. But it seems that for the first element of the tuple, `i` takes the values 0 and 1, while for the second element `i` takes only the value 1.
Can somebody please explain what's happening?
Thanks.
Upvotes: 0
Views: 653
Reputation: 17074
The argument to `sc.parallelize()`, `range(5*i, 5*(i+1))`, is evaluated immediately when the call is made, so both values of `i` (0 and 1) are used.
But in the case of `rdd.map()`, only the last value of `i` will be used when you call `collect()` later:
```python
rdd = sc.parallelize(range(5*i, 5*(i+1)))
rdd = rdd.map(lambda x: fun(x, i))
```
Here `rdd.map` won't transform the RDD right away; it only adds a step to the DAG (directed acyclic graph), i.e. the lambda function is not yet applied to the elements of the RDD.
When you call `collect()`, the lambda function is finally called, but by that time `i` has the value 1. If you reassigned `i = 10` before calling `collect()`, that value of `i` would be used instead.
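The same split between eager and deferred evaluation can be reproduced without a cluster. The sketch below uses a toy, hypothetical `LazyList` class (not part of PySpark) that, like `sc.parallelize()`, captures its source data immediately, but only records `map()` functions and applies them in `collect()`. It ignores everything else Spark does (partitioning, pickling the function and shipping it to executors); it only models when `i` is looked up.

```python
class LazyList(object):
    """Toy stand-in for an RDD (hypothetical; not a PySpark API)."""

    def __init__(self, data, funcs=None):
        self.data = list(data)      # source data is materialized immediately
        self.funcs = funcs or []    # transformations are only recorded

    def map(self, f):
        # Like rdd.map(): record f in a new object; f is not called yet
        return LazyList(self.data, self.funcs + [f])

    def collect(self):
        # Like an action: only now are the recorded functions applied
        out = []
        for x in self.data:
            for f in self.funcs:
                x = f(x)
            out.append(x)
        return out


def fun(x, n):
    return (x, n)

parts = []
for i in range(2):
    part = LazyList(range(5 * i, 5 * (i + 1)))   # range uses i = 0, then i = 1
    parts.append(part.map(lambda x: fun(x, i)))  # i is looked up only when called

before = [y for p in parts for y in p.collect()]
print(before)
# [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

i = 10   # reassign before "collecting" again
print(parts[0].collect())
# [(0, 10), (1, 10), (2, 10), (3, 10), (4, 10)]
```

The first element of each tuple comes from data built while the loop was running, whereas the second comes from a lambda body that reads `i` only when `collect()` runs, which is exactly the asymmetry in the question's output.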
Upvotes: 0
Reputation: 4719
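Just change

```python
rdd = rdd.map(lambda x: fun(x, i))
```

to

```python
rdd = rdd.map(lambda x, i=i: (x, i))
```

This is purely a Python issue: a closure looks up `i` when it is called, whereas a default argument value is evaluated once, when the function is defined. See https://docs.python.org/2.7/tutorial/controlflow.html#default-argument-values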
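A plain-Python sketch of why the default-argument trick works, next to two other common ways of binding the value early. The helper name `make_mapper` is illustrative only; `functools.partial` is from the standard library. Any of these functions could be passed to `rdd.map()` in place of the original lambda.

```python
import functools

def fun(x, n):
    return (x, n)

def make_mapper(n):
    # Hypothetical helper: each call creates a new scope, so n is fixed per call
    return lambda x: fun(x, n)

late, default_arg, factory, partial_fn = [], [], [], []
for i in range(2):
    late.append(lambda x: fun(x, i))                # looks up i when called
    default_arg.append(lambda x, i=i: fun(x, i))    # default i evaluated now
    factory.append(make_mapper(i))                  # n bound per factory call
    partial_fn.append(functools.partial(fun, n=i))  # n bound now

print([f(100) for f in late])         # [(100, 1), (100, 1)]
print([f(100) for f in default_arg])  # [(100, 0), (100, 1)]
print([f(100) for f in factory])      # [(100, 0), (100, 1)]
print([f(100) for f in partial_fn])   # [(100, 0), (100, 1)]
```

One caveat of the default-argument approach is that a caller passing a second positional argument would override the bound value; `rdd.map()` calls the function with a single element, so it is safe there.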
Upvotes: 2