BHa

Reputation: 68

Python Multiprocessing on Iterator

I am trying to use multiprocessing to handle csv files in excess of 2 GB. The problem is that the input is only being consumed by one process while the others appear to be idling.

The following recreates the problem I am encountering. Is it possible to use multiprocessing with an iterator? Consuming the full input into memory is not desired.

import csv
import multiprocessing
import time

def something(row):
    # print row[0]
    # pass
    return row

def main():
    start = time.time()
    i = open("input.csv")
    reader = csv.reader(i, delimiter='\t')

    print reader.next()

    p = multiprocessing.Pool(16)
    print "Starting processes"
    j = p.imap(something, reader, chunksize=10000)

    for row in j:
        print row

    print time.time() - start


if __name__ == '__main__':
    main()

Upvotes: 2

Views: 7764

Answers (1)

chapelo

Reputation: 2562

I think you are confusing "processes" with "processors".

Your program is definitely spawning multiple processes at the same time, as you can verify in the system or resource monitor while it is running. How many processors or CPU cores are actually busy depends mainly on the OS, and has a lot to do with how CPU-intensive the task you are delegating to each process is.
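
As a quick way to see this from the program itself rather than from a monitor, here is a minimal sketch (the pid-tagged return value and the in-memory sample rows are only for illustration, they are not part of your code) that makes each worker report which process handled a row:

import multiprocessing
import os
import time

def something(row):
    time.sleep(.4)               # simulate some work so rows spread across workers
    return (os.getpid(), row)    # tag the result with the worker's process id

def main():
    rows = iter([[str(n)] for n in range(12)])   # stands in for the csv.reader iterator
    with multiprocessing.Pool(4) as p:
        for pid, row in p.imap(something, rows):
            print(pid, row)      # several distinct pids should show up

if __name__ == '__main__':
    main()

If more than one pid appears in the output, more than one process is consuming the iterator.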

Make a small modification to your something function to introduce a sleep that simulates the work being done in the function:

def something(row):
    time.sleep(.4)
    return row

Now, first run your function sequentially on each row of your file, and notice that the results come one by one, every 400 ms:

def main():
    with open("input.csv") as i:
        reader = csv.reader(i)
        print (next(reader))

        # SEQUENTIALLY:
        for row in reader:
            result = something(row)
            print (result)

Now try with the pool of workers. Keep it at a low number, say 4 workers, and you will see that results still arrive every 400 ms, but now in groups of 4 (or roughly the number of workers in the pool):

def main():
    with open("input.csv") as i:
        reader = csv.reader(i)
        print (next(reader))

        # IN PARALLEL 
        print ("Starting processes")
        p = multiprocessing.Pool(4)
        results = p.imap(something, reader)
        for result in results:
            print(result)  # results arrive in bursts of ~4 rows (one per worker)

While it is running in parallel, check the system monitor and look at how many "python" processes are being executed. It should be one plus the number of workers.
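
You can also count them from inside the program with multiprocessing.active_children(), which lists the live child processes of the current process. A minimal sketch, assuming placeholder data in place of your csv reader:

import multiprocessing
import time

def something(row):
    time.sleep(.4)
    return row

def main():
    p = multiprocessing.Pool(4)
    # The pool's workers are children of this parent process, so the
    # total number of python processes is this count plus one.
    print(len(multiprocessing.active_children()))  # typically 4
    for result in p.imap(something, range(8)):
        print(result)
    p.close()
    p.join()

if __name__ == '__main__':
    main()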

I hope this explanation is useful.

Upvotes: 4
