Jason Donnald

Reputation: 2316

Issue with creating a global list from map using PySpark

I have code that reads a file in IPython using PySpark. I am trying to add a piece to it that builds a list from a particular column of the file, but when I execute it the list comes out empty and nothing gets appended to it. My code is:

list1 = []

def file_read(line):

    list1.append(line[10])
    # bunch of other code that processes other column indexes on `line`

inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line)

column_val = (inputData
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line) >1 )
    .map(file_read))

When I execute this part of the code, list1 still comes out empty even though there is data in line[10], which I use in other parts of the same function. It seems as if nothing is being appended to the list. How can I build the list above?

Upvotes: 1

Views: 2667

Answers (1)

zero323

Reputation: 330193

Well, it actually does append to list1; the problem is just not the one you're thinking about. Every variable referenced in a closure is serialized and sent to the workers, and that applies to list1 as well.

Every partition receives its own copy of list1. When file_read is called, data is appended to that copy, and when a given map phase finishes, the copy goes out of scope and is discarded.

It is not a particularly elegant piece of code, but you can see that this is really what is happening here:

rdd = sc.parallelize(range(100), 5)

list1 = []

def file_read(line):
    list1.append(line)
    print len(list1)
    return line

xs = rdd.map(file_read).collect()
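If you run this (locally, so the worker prints are visible in the same console), you will see the per-partition counts grow, yet afterwards the driver-side list is still empty while the mapped data itself comes back fine. A quick check, assuming the snippet above has just been executed in the same session:

print(len(list1))  # 0 -- the driver's copy was never touched by the workers
print(len(xs))     # 100 -- the data returned through map/collect is intact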

Edit

Spark provides two types of shared variables: broadcast variables, which are read-only from the worker perspective, and accumulators, which are write-only from the worker perspective.
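As a quick illustration, a plain numeric accumulator used as a counter looks roughly like this (a minimal sketch; counter and count_lines are illustrative names, and sc is assumed to be an existing SparkContext):

counter = sc.accumulator(0)

def count_lines(line):
    counter.add(1)  # workers can only add; they cannot read the value
    return line

sc.parallelize(range(10)).map(count_lines).count()  # count() is an action, so the tasks run
counter.value  # 10 -- readable only on the driver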

By default accumulators support only numeric values and are intended to be used mostly as counters. It is possible to define custom accumulators, though. To do that you have to extend the AccumulatorParam class and provide custom zero and addInPlace implementations:

from pyspark.accumulators import AccumulatorParam

class ListParam(AccumulatorParam):
    def zero(self, v):
        # initial value for each per-task copy of the accumulator
        return []
    def addInPlace(self, acc1, acc2):
        # merge two partial results by extending one with the other
        acc1.extend(acc2)
        return acc1

Next you can redefine file_read (here as file_read1) as follows:

def file_read1(line):
    global list1 # Required otherwise the next line will fail
    list1 += [line]
    return line

Example usage:

list1 = sc.accumulator([], ListParam())

rdd = sc.parallelize(range(10)).map(file_read1).collect()
list1.value
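After collect() has run, list1.value on the driver should hold the ten mapped elements; the value is readable only on the driver and only reflects updates from tasks that have actually executed.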

Even though it is possible to use an accumulator like this, it is probably too expensive to be used in practice, and in the worst-case scenario it can crash the driver. Instead you can simply use another transformation:

tmp = (inputData
    .map(lambda line: line.split(","))
    .filter(lambda line: len(line) > 1))

def line_read2(line): return ... # just the core logic

list1 = tmp.map(lambda line: line[10])
column_val = tmp.map(line_read2)

Side note:

The code you've provided doesn't do anything by itself. Transformations in Spark are just descriptions of what has to be done; nothing is really executed until you call an action.
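For example, an action such as collect() is what actually triggers the computation (a minimal sketch, reusing the tmp and line_read2 names from above):

list1 = tmp.map(lambda line: line[10]).collect()  # now a plain Python list on the driver
column_val = tmp.map(line_read2).collect()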

Upvotes: 7
