susjam

Reputation: 23

Multiprocessing pool to read lines in a file

I'm trying to read multiple lines of a file at once in order to separate those lines into two separate lists. The cleanLine function takes the line it is fed and cleans it, returning a line with no whitespace. Right now my code runs and returns the same results it did without multiprocessing; however, the overall runtime of the script has not improved, so I am unsure whether it is actually spawning multiple processes at once or still doing one at a time. In this specific case I'm not really sure how to tell if it's actually creating multiple processes or just one (a minimal pid-logging check is sketched after the snippet below). Is there any reason this portion of the script does not run any faster, or am I doing this incorrectly? Any help or feedback would be greatly appreciated.

Snippet of the code:

import multiprocessing
from multiprocessing import Pool

filediff = open("sample.txt", "r", encoding="latin-1")
filediffline = filediff.readlines()

pos = []
neg = []
cpuCores = multiprocessing.cpu_count() - 1
pool = Pool(processes=cpuCores)

for line in filediffline:
    result = pool.apply_async(cleanLine, [line]).get()

    if line.startswith("+"):
        pos.append(result)

    elif line.startswith("-"):
        neg.append(result)

pool.close()
pool.join()
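
One quick way to check whether the work actually lands in other processes (a small standalone sketch, not part of the script above) is to log os.getpid() inside the worker and compare it with the parent's pid:

import os
import multiprocessing
from multiprocessing import Pool

def cleanLine(line):
    # report which process handled the line; differing pids mean real workers
    print(os.getpid(), "cleaned a line")
    return line.strip()

if __name__ == "__main__":
    print("parent pid:", os.getpid())
    with Pool(processes=2) as pool:
        pool.map(cleanLine, ["+ a line\n", "- another line\n"])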

Upvotes: 2

Views: 1841

Answers (3)

AdamF

Reputation: 2950

You are working with I/O, and I am not sure whether your processing is CPU-bound or I/O-bound. If you read the whole file into a list first, all of the I/O already sits in RAM (be careful with this if the file is very large), and the processing done on that list is where multiprocessing can give you a boost, depending on the list size. Only in that case, where you have a big enough list in RAM, would I recommend using the concurrent.futures module, see below:

import concurrent.futures

def process_line(line):
    return line.strip()

def execute(filename, encoding="latin-1"):
    # read all lines into memory first, then hand them to the pool
    with open(filename, encoding=encoding) as fp:
        lines = fp.readlines()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit takes the callable and its argument separately;
        # writing process_line(line) here would run the work in this process
        futures = [executor.submit(process_line, line) for line in lines]
    return [f.result() for f in futures]

Upvotes: 0

tdelaney

Reputation: 77387

As mentioned, result = pool.apply_async(cleanLine, [line]).get() sends a single line to the subprocess and then waits for it to return, so the parent never has more than one line in flight. This is slower than just doing the processing in the parent process. Even if you rework that bit, it's unlikely that anything will speed up unless the preprocessing is CPU intensive.
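
A sketch of that rework, for reference: pool.map hands lines to the workers in batches instead of round-tripping them one at a time. Here cleanLine is a stand-in that just strips whitespace (the real function isn't shown in the question), and the chunksize is an arbitrary value to tune:

import multiprocessing
from multiprocessing import Pool

def cleanLine(line):
    # stand-in for the question's cleanLine
    return line.strip()

if __name__ == "__main__":
    with open("sample.txt", "r", encoding="latin-1") as f:
        filediffline = f.readlines()

    with Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        # one blocking map call batches the lines out to the workers
        results = pool.map(cleanLine, filediffline, chunksize=1024)

    # map preserves order, so results line up with the original lines
    pos = [r for line, r in zip(filediffline, results) if line.startswith("+")]
    neg = [r for line, r in zip(filediffline, results) if line.startswith("-")]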

An alternative is to build a pipeline, either by putting the preprocessing into a separate file and executing it with subprocess.Popen, or by using multiprocessing.Pipe. With this method, the file read and the line processing are both done in a separate process.

This has the advantage that the file read and preprocessing overlap with the work of the main process. But if the preprocessing is trivial compared to the cost of serializing each line to get it from one process to another, you won't see any speedup.

import multiprocessing as mp

pos = []
neg = []

def line_cleaner(line):
    return line.strip()

def cleaner(filename, encoding, pipe):
    try:
        with open(filename, encoding=encoding) as fp:
            for line in fp:
                line = line_cleaner(line)
                if line:
                    pipe.send(line)
    finally:
        pipe.close()

if __name__ == "__main__":
    receiver, sender = mp.Pipe(duplex=False)
    process = mp.Process(target=cleaner,
                         args=("sample.txt", "latin-1", sender))
    process.start()
    sender.close()  # so the child holds the only reference to the send end
    try:
        while True:
            line = receiver.recv()
            if line.startswith("+"):
                pos.append(line)
            elif line.startswith("-"):
                neg.append(line)
    except EOFError:
        pass  # the child closed its end of the pipe
    finally:
        process.join()

    print(pos, neg)

Upvotes: 1

alex_noname

Reputation: 32233

Using apply_async().get() is equivalent to the blocking call apply(). For asynchronous processing, try using apply_async with the callback parameter to process the result. Keep in mind that the callback is invoked in a separate thread of the parent process.
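
A minimal sketch of that pattern, assuming the same role for cleanLine as in the question (the startswith dispatch here is illustrative):

from multiprocessing import Pool

pos = []
neg = []

def cleanLine(line):
    return line.strip()

def collect(result):
    # invoked in a helper thread of the parent process as each result arrives
    if result.startswith("+"):
        pos.append(result)
    elif result.startswith("-"):
        neg.append(result)

if __name__ == "__main__":
    with open("sample.txt", "r", encoding="latin-1") as f:
        lines = f.readlines()

    pool = Pool()
    for line in lines:
        # no .get() here; the callback consumes each result asynchronously
        pool.apply_async(cleanLine, [line], callback=collect)
    pool.close()
    pool.join()

    print(len(pos), len(neg))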

Upvotes: 0
