Roberto

Reputation: 2194

Python multiprocessing EOF error on csv files

I am trying to implement a multiprocessing approach for reading and comparing two csv files. To get started, I began with the code example from embarrassingly parallel problems, which sums integers in a file. The problem is that the example will not run for me. (I am running Python 2.6 on Windows.)

I get the following EOF error:

File "C:\Python26\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError

At this line:

self.pin.start()

I found some examples suggesting the problem might be that the csv file needs to be opened in 'rb' mode. I tried that, but it did not work either.

Then I tried to simplify the code to reproduce the error at the most basic level. I got the same error, on the same line, even when I simplified it so that the parse_input_csv function does not read the file at all. (I am not sure how EOF can be triggered if the file is never read.)

import csv
import multiprocessing

class CSVWorker(object):
    def __init__(self, infile, outfile):
        #self.infile = open(infile)
        self.infile = open(infile, 'rb') #try rb for Windows

        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()    
        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())

        self.pin.start()
        self.pin.join()    
        self.infile.close()

    def parse_input_csv(self):
#         for i, row in enumerate(self.in_csvfile):
#             self.inq.put( (i, row) )

#         for row in self.in_csvfile:
#             print row
#             #self.inq.put( row )

        print 'yup'


if __name__ == '__main__':        
    c = CSVWorker('random_ints.csv', 'random_ints_sums.csv')
    print 'done' 

Finally, I tried pulling it all out of the class. This works if I do not iterate over the csv, but gives the same error if I do.

def manualCSVworker(infile, outfile):
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)        
    inq = multiprocessing.Queue()

    # this does not work (tries to read the csv, and fails with EOFError)
    pin = multiprocessing.Process(target=manual_parse_input_csv, args=(in_csvfile,))

    # this works (does not read the csv file)
    #pin = multiprocessing.Process(target=print_yup, args=())

    pin.start()
    pin.join()    
    f.close()

def print_yup():
    print 'yup'

def manual_parse_input_csv(csvReader):    
    for row in csvReader:
        print row

if __name__ == '__main__':        
    manualCSVworker('random_ints.csv', 'random_ints_sums.csv')
    print 'done' 

Can someone please help me identify the problem here?

EDIT: Just thought I would post the working code. I ended up dropping the Class implementation. As suggested by Tim Peters, I only pass filenames (not open files).

On 5 million rows x 2 columns, I noticed about a 20% time improvement with 2 processors vs 1. I expected a bit more, but I think the problem is the extra overhead of queueing each row individually. As per this thread, an improvement would probably be to queue records in blocks of 100 or more rather than one line at a time (a rough sketch of that idea follows the code below).

import csv
import multiprocessing
from datetime import datetime

NUM_PROCS = multiprocessing.cpu_count()

def main(numprocsrequested, infile, outfile):

    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()

    numprocs = min(numprocsrequested, NUM_PROCS)

    pin = multiprocessing.Process(target=parse_input_csv, args=(infile,numprocs,inq,))
    pout = multiprocessing.Process(target=write_output_csv, args=(outfile,numprocs,outq,))
    ps = [ multiprocessing.Process(target=sum_row, args=(inq,outq,)) for i in range(numprocs)]

    pin.start()
    pout.start()
    for p in ps:
        p.start()

    pin.join()
    for p in ps:
        p.join()
    pout.join()

def parse_input_csv(infile, numprocs, inq):
    """Parse the input CSV and put tuples on inq, with the zero-based index
    of the row as the first element and the integers of the row as the
    second element.

    The data is sent over inq for the workers to do their thing. At the
    end, the input process sends one 'STOP' message per worker.
    """
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)

    for i, row in enumerate(in_csvfile):
        row = [ int(entry) for entry in row ]
        inq.put( (i, row) )

    for i in range(numprocs):
        inq.put("STOP")

    f.close()

def sum_row(inq, outq):
    """
    Worker: consume (index, row) tuples from inq and put (index, sum) on outq.
    """
    for i, row in iter(inq.get, "STOP"):
        outq.put( (i, sum(row)) )
    outq.put("STOP")

def write_output_csv(outfile, numprocs, outq):
    """
    Open the outgoing csv file, then keep reading outq for answers.
    Since I chose to keep the output synchronized with the input order,
    there are some extra goodies to handle that. Because each answer
    already carries its original row number, this is not strictly required.
    """

    cur = 0
    buffer = {}
    # For some reason csv.writer works badly when shared across processes, so
    # open it, use it, and close it all in this one process, or else the last
    # several rows end up missing.
    f = open(outfile, 'wb')
    out_csvfile = csv.writer(f)

    #Keep running until we see numprocs STOP messages
    for works in range(numprocs):
        for i, val in iter(outq.get, "STOP"):
            # verify rows are in order, if not save in buffer
            if i != cur:
                buffer[i] = val
            else:
                # if it is the next expected row, write it out, then flush
                # any buffered rows that follow in order
                out_csvfile.writerow( [i, val] )
                cur += 1
                while cur in buffer:
                    out_csvfile.writerow([ cur, buffer[cur] ])
                    del buffer[cur]
                    cur += 1
    f.close()

if __name__ == '__main__':

    startTime = datetime.now()
    main(4, 'random_ints.csv', 'random_ints_sums.csv')
    print 'done'
    print(datetime.now()-startTime)
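
For reference, here is a rough, untested sketch of the batching idea: the reader puts lists of (index, row) pairs on the queue instead of single rows, and each worker loops over a whole block at a time. The names parse_input_csv_chunked, sum_row_chunked and the chunk size of 1000 are just placeholders I made up; they would be wired into main() in place of parse_input_csv and sum_row, and write_output_csv stays as-is because the workers still emit individual (index, sum) pairs.

CHUNK_SIZE = 1000  # arbitrary; tune to your data

def parse_input_csv_chunked(infile, numprocs, inq):
    """Like parse_input_csv, but queue rows in blocks to cut queue overhead."""
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)

    chunk = []
    for i, row in enumerate(in_csvfile):
        chunk.append( (i, [int(entry) for entry in row]) )
        if len(chunk) >= CHUNK_SIZE:
            inq.put(chunk)
            chunk = []
    if chunk:
        inq.put(chunk)  # flush the final partial block

    for i in range(numprocs):
        inq.put("STOP")

    f.close()

def sum_row_chunked(inq, outq):
    """Worker: consume blocks of (index, row) pairs, emit (index, sum) pairs."""
    for chunk in iter(inq.get, "STOP"):
        for i, row in chunk:
            outq.put( (i, sum(row)) )
    outq.put("STOP")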

Upvotes: 2

Views: 3495

Answers (1)

Tim Peters

Reputation: 70705

Passing an object across processes requires "pickling" it on the sending end (creating a string representation of the object) and "unpickling" it on the receiving end (recreating an isomorphic object from the string representation). Unless you know exactly what you're doing, you should stick to passing builtin Python types (strings, ints, floats, lists, dicts, ...) or types implemented by multiprocessing (Lock(), Queue(), ...). Otherwise chances are good the pickle-unpickle dance won't work.

There's no chance that passing an open file will ever work, let alone an open file wrapped inside yet another object (such as returned by csv.reader(f)). When I ran your code, I got an error message from pickle:

pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader

Didn't you? Never ignore errors - unless, again, you know exactly what you're doing.
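
You can reproduce that failure without multiprocessing at all by trying to pickle the reader by hand. A minimal check along these lines (it assumes random_ints.csv exists; the exact exception and message can vary with the pickle protocol):

import csv
import pickle

f = open('random_ints.csv', 'rb')
reader = csv.reader(f)

try:
    # multiprocessing has to pickle the Process target and args on Windows;
    # doing it by hand shows the reader object will not survive the trip
    pickle.dumps(reader, 2)
except Exception, e:
    print 'cannot pickle a csv reader:', e

f.close()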

The solution is simple: as I said in a comment, open the file in the worker process, just passing its string path. For example, use this instead:

def manual_parse_input_csv(csvfile):
    f = open(csvfile,'rb')
    in_csvfile = csv.reader(f)
    for row in in_csvfile:
        print row
    f.close()

and take all that code out of manualCSVworker, and change the process creation line to:

pin = multiprocessing.Process(target=manual_parse_input_csv, args=(infile,))

See? That passes the file path, a plain string. That works :-)
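
Putting the pieces together, the whole reproducer with that fix looks something like this (a minimal sketch; as in your version, outfile is accepted but not yet used):

import csv
import multiprocessing

def manual_parse_input_csv(csvfile):
    # open the file here, inside the worker process
    f = open(csvfile, 'rb')
    in_csvfile = csv.reader(f)
    for row in in_csvfile:
        print row
    f.close()

def manualCSVworker(infile, outfile):
    # pass only the path (a plain string); the worker opens the file itself
    pin = multiprocessing.Process(target=manual_parse_input_csv, args=(infile,))
    pin.start()
    pin.join()

if __name__ == '__main__':
    manualCSVworker('random_ints.csv', 'random_ints_sums.csv')
    print 'done'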

Upvotes: 4
