abhinavkulkarni

Reputation: 2409

RDD creation and variable binding

I have a very simple piece of code:

def fun(x, n):
    return (x, n)

rdds = []
for i in range(2):
    rdd = sc.parallelize(range(5*i, 5*(i+1)))
    rdd = rdd.map(lambda x: fun(x, i))
    rdds.append(rdd)

a = sc.union(rdds)
print(a.collect())

I had expected the output to be the following:

[(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

However, the output is the following:

[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

This is bewildering, to say the least.

It seems, due to lazy evaluation of RDDs, the value of i that is being used to create RDDs is the one it bears when collect() is called, which is 1 (from the last run of the for loop).

Now, both elements of the tuple are derived from i.

But it seems that for the first element of the tuple i takes the values 0 and 1, while for the second element of the tuple i only takes the value 1.

Can somebody please explain what's happening?

Thanks.

Upvotes: 0

Views: 653

Answers (2)

Mohammad Yusuf

Reputation: 17074

sc.parallelize() is not lazy with respect to its argument: Python evaluates range(5*i, 5*(i+1)) immediately, when the call is made. So both values of i, i.e. 0 and 1, are used.

But in the case of rdd.map(), only the last value of i is used, because the lambda runs later, when you call collect().

rdd = sc.parallelize(range(5*i, 5*(i+1)))
rdd = rdd.map(lambda x: fun(x, i))

Here rdd.map() does not transform the RDD right away. It only adds a step to the DAG (directed acyclic graph), i.e. the lambda function is not yet applied to the elements of the RDD.

When you call collect(), the lambda function is finally called, but by that time i has the value 1. If you reassign i = 10 before calling collect(), that value of i will be used instead.
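
The same late-binding effect can be reproduced without Spark. The following is only a plain-Python sketch, not PySpark code: the function name late_binding_demo and the list callbacks are made up for illustration, and the list of lambdas stands in for the deferred map() step.

```python
def late_binding_demo():
    callbacks = []
    for i in range(2):
        # Each lambda refers to the variable i itself, not to its current value.
        callbacks.append(lambda x: (x, i))

    # The loop has finished, so every lambda now sees i == 1.
    late = [f(10) for f in callbacks]

    # Rebinding i before the call changes what the lambdas see again.
    i = 10
    rebound = [f(10) for f in callbacks]
    return late, rebound
```

Calling late_binding_demo() gives the pair (10, 1) twice for the first list and (10, 10) twice for the second, which mirrors what happens when the lambda passed to map() is only evaluated at collect() time.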

Upvotes: 0

Zhang Tong

Reputation: 4719

Just change

rdd = rdd.map(lambda x: fun(x, i))

to

rdd = rdd.map(lambda x, i=i: (x, i))

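This is purely a Python matter, not something specific to Spark: a default argument value is evaluated once, when the lambda is defined, so i=i captures the current value of i. See: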

https://docs.python.org/2.7/tutorial/controlflow.html#default-argument-values
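
As a rough illustration in plain Python (no Spark involved), here are two ways to freeze the current value of i inside the loop. The name early_binding_demo is hypothetical, and functools.partial is shown as an alternative I am assuming would serve the same purpose; it is not part of the fix above.

```python
from functools import partial


def fun(x, n):
    return (x, n)


def early_binding_demo():
    by_default = []
    by_partial = []
    for i in range(2):
        # i=i evaluates i now and stores it as the lambda's default value.
        by_default.append(lambda x, i=i: fun(x, i))
        # partial also captures the current value of i, as the keyword n.
        by_partial.append(partial(fun, n=i))
    # Call the functions only after the loop has finished.
    return [f(10) for f in by_default], [f(10) for f in by_partial]
```

Both lists come out as [(10, 0), (10, 1)], so each function keeps the value of i from its own iteration. The same idea should carry over to the map() call in the question, e.g. rdd.map(partial(fun, n=i)).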

Upvotes: 2
