Reputation: 2316
I have this code where I am reading a file in IPython using PySpark. What I am trying to do is add a piece to it that forms a list based on a particular column read from the file, but when I execute it the list comes out empty and nothing gets appended to it. My code is:
list1 = []

def file_read(line):
    list1.append(line[10])
    # bunch of other code which processes other column indexes on `line`
inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line)
column_val = (inputData
              .map(lambda line: line.split(","))
              .filter(lambda line: len(line) > 1)
              .map(file_read))
When I execute this part of the code, list1 still comes out empty even though there is data in line[10] - I use it in other parts of the same function above. It seems as if it is just not appending to the list. How can I form the list above?
Upvotes: 1
Views: 2667
Reputation: 330193
Well, it actually does append to list1; the problem is just not the one you're thinking about. Every variable referenced in a closure is serialized and sent to the workers, and that applies to list1 as well.
Every partition receives its own copy of list1. When file_read is called, data is appended to that copy, and when a given map phase is finished the copy goes out of scope and is discarded.
It's not a particularly elegant piece of code, but you should see that this is really what happens here:
rdd = sc.parallelize(range(100), 5)
list1 = []

def file_read(line):
    list1.append(line)   # appends to this worker's copy of list1
    print len(list1)     # the copy grows on the worker side
    return line

xs = rdd.map(file_read).collect()
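Depending on your deployment, the prints may show up in the executor logs rather than in your shell; either way, if you check on the driver afterwards, the original list is still untouched:
print len(list1)  # -> 0, the driver's copy was never modified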
Edit
Spark provides two types of shared variables: broadcast variables, which are read-only from the worker perspective, and accumulators, which are write-only from the worker perspective.
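For context, here is a minimal sketch of both kinds, assuming the same sc as above (the names lookup, counter and count_known are made up for illustration):
lookup = sc.broadcast({"a": 1, "b": 2})  # read-only on the workers
counter = sc.accumulator(0)              # workers can only add to it

def count_known(x):
    if x in lookup.value:
        counter.add(1)

sc.parallelize(["a", "b", "c", "a"]).foreach(count_known)
print counter.value  # -> 3, readable only on the driver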
By default accumulators support only numeric values, like the counter above, and are intended to be used mostly as counters. It is possible to define custom accumulators though. To do that you have to extend the AccumulatorParam class and provide custom zero and addInPlace implementations:
from pyspark import AccumulatorParam

class ListParam(AccumulatorParam):
    def zero(self, v):
        # Initial value for each partition's local accumulator
        return []

    def addInPlace(self, acc1, acc2):
        # Merge two partial results
        acc1.extend(acc2)
        return acc1
Next you can redefine file_read as follows:
def file_read1(line):
    global list1  # Required, otherwise the next line will fail
    list1 += [line]
    return line
Example usage:
list1 = sc.accumulator([], ListParam())
rdd = sc.parallelize(range(10)).map(file_read1).collect()
list1.value
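After the action finishes, list1.value on the driver should contain all ten elements, though the order in which the partitions are merged is not guaranteed.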
Even if it is possible to use an accumulator like this, it is probably too expensive to be used in practice, and in the worst case it can crash the driver. Instead you can simply use another transformation:
tmp = (inputData
       .map(lambda line: line.split(","))
       .filter(lambda line: len(line) > 1))

def line_read2(line): return ...  # Just the core logic, without the append

line1 = tmp.map(lambda line: line[10])
column_val = tmp.map(line_read2)
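Note that line1 here is still an RDD. If what you actually need is a plain Python list on the driver (assuming the whole column fits in driver memory), you can materialize it with an action, for example:
list1 = line1.collect()  # pulls the column to the driver as a list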
Side note:
The code you've provided doesn't actually do anything. Transformations in Spark are just descriptions of what has to be done; nothing is really executed until you call an action.
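A minimal illustration, with made-up data that has nothing to do with your file:
doubled = sc.parallelize(range(5)).map(lambda x: x * 2)  # just a description, nothing runs yet
doubled.collect()  # the action triggers the computation -> [0, 2, 4, 6, 8]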
Upvotes: 7