Reputation: 23
I'm trying to read multiple lines of a file at once in order to separate those lines into two separate lists. The cleanLine function takes the line it is fed, cleans it, and returns a line with no whitespace. Right now my code compiles and returns the same results it did without multiprocessing; however, the overall runtime of the script has not improved, so I am unsure whether it is actually spawning multiple processes at once or still doing one at a time. In this specific case I'm not really sure how to tell if it's actually creating multiple processes or just one. Is there any reason this portion of the script does not run any faster, or am I doing this incorrectly? Any help or feedback would be greatly appreciated.
Snippet of the code:
import multiprocessing
from multiprocessing import Pool
filediff = open("sample.txt", "r", encoding ="latin-1")
filediffline = filediff.readlines()
pos = []
neg = []
cpuCores = multiprocessing.cpu_count() - 1
pool = Pool(processes = cpuCores)
for line in filediffline:
    result = pool.apply_async(cleanLine, [line]).get()
    if line.startswith("+"):
        pos.append(result)
    elif line.startswith("-"):
        neg.append(result)

pool.close()
pool.join()
Upvotes: 2
Views: 1841
Reputation: 2950
You are working with I/O here, and it is not clear whether your processing is CPU-bound or I/O-bound. As mentioned before, if you read the whole file into a list, all the data you read is already sitting in RAM (consider file.read() for this, but beware it has side effects if your data or file is too big), and all further processing happens on that in-memory list. Only in that case, where the in-memory list is large enough to be worth parallelizing, will you see some boost in performance (it depends on the list size). In that case I recommend the concurrent.futures module, see below:
import concurrent.futures

def process_line(line):
    return line.strip()

def execute(filename, encoding="latin-1"):
    with open(filename, encoding=encoding) as fp:
        lines = fp.readlines()  # read() returns one string; we want a list of lines
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # pass the callable and its argument; submit(process_line(line)) would
        # call the function in the parent and submit its result instead
        futures = [executor.submit(process_line, line) for line in lines]
        return [future.result() for future in futures]
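With per-line work this small, submitting one future per line spends most of its time on pickling and scheduling. A hedged variant of the same idea uses executor.map with a chunksize (the value 5000 is an arbitrary guess; tune it to your data):

import concurrent.futures

def process_line(line):
    return line.strip()

def execute(filename, encoding="latin-1"):
    with open(filename, encoding=encoding) as fp:
        lines = fp.readlines()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # chunksize batches many lines into each task, cutting IPC overhead
        return list(executor.map(process_line, lines, chunksize=5000))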
Upvotes: 0
Reputation: 77387
As mentioned, result = pool.apply_async(cleanLine, [line]).get()
sends a single line to the subprocess and waits for it to return. This is slower than just doing the processing in the parent process. Even if you rework that bit, it's unlikely that anything will speed up unless that preprocessing is CPU-intensive.
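For completeness, a minimal sketch of what that rework could look like, using Pool.imap so lines are cleaned in parallel instead of blocking on a .get() per line (cleanLine below is just a stand-in for the questioner's function, and the chunksize is an arbitrary guess):

from multiprocessing import Pool

def cleanLine(line):
    return line.strip()  # stand-in for the questioner's cleaner

if __name__ == "__main__":
    with open("sample.txt", encoding="latin-1") as fp:
        filediffline = fp.readlines()

    pos, neg = [], []
    with Pool() as pool:
        # imap returns results in input order, so zip pairs each raw line
        # with its cleaned counterpart
        for line, cleaned in zip(filediffline,
                                 pool.imap(cleanLine, filediffline, chunksize=1000)):
            if line.startswith("+"):
                pos.append(cleaned)
            elif line.startswith("-"):
                neg.append(cleaned)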
An alternative is to build a pipeline, either by putting the preprocessing into a separate file and executing it with subprocess.Popen
(a sketch of that variant follows the Pipe example below) or by using multiprocessing.Pipe
. With this method, the file read and line processing are both done in a separate process.
This has the advantage that the file read and preprocessing overlap the work of the main process. But if that preprocessing is trivial compared to the cost of serializing the object to get it from one process to another, you won't see any speedup.
import multiprocessing as mp
pos = []
neg = []

def line_cleaner(line):
    return line.strip()

def cleaner(filename, encoding, pipe):
    try:
        with open(filename, encoding=encoding) as fp:
            for line in fp:
                line = line_cleaner(line)
                if line:
                    pipe.send(line)
    finally:
        pipe.close()

if __name__ == "__main__":
    receiver, sender = mp.Pipe(duplex=False)
    process = mp.Process(target=cleaner,
                         args=("sample.txt", "latin-1", sender))
    process.start()
    sender.close()  # so child holds only reference
    try:
        while True:
            line = receiver.recv()
            if line.startswith("+"):
                pos.append(line)
            elif line.startswith("-"):
                neg.append(line)
    except EOFError:
        pass  # child exit
    finally:
        process.join()
    print(pos, neg)
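For the subprocess.Popen variant mentioned above, a rough sketch under the same assumptions (the helper script name clean_lines.py is hypothetical): the cleaning lives in a separate script that writes cleaned lines to stdout.

# clean_lines.py (hypothetical helper script): clean lines and write them to stdout
import sys

if __name__ == "__main__":
    with open(sys.argv[1], encoding=sys.argv[2]) as fp:
        for line in fp:
            line = line.strip()
            if line:
                print(line)

The parent then starts that script and classifies lines as they arrive on its stdout:

import subprocess
import sys

pos = []
neg = []
with subprocess.Popen(
        [sys.executable, "clean_lines.py", "sample.txt", "latin-1"],
        stdout=subprocess.PIPE, text=True) as proc:
    for line in proc.stdout:
        line = line.rstrip("\n")
        if line.startswith("+"):
            pos.append(line)
        elif line.startswith("-"):
            neg.append(line)
print(pos, neg)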
Upvotes: 1
Reputation: 32233
Using apply_async().get()
is equivalent to the blocking call apply()
. For asynchronous processing, try apply_async
with the callback parameter to process results. Keep in mind that the callback is invoked in a separate thread.
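A minimal sketch of that pattern, assuming the question's sample.txt and a stand-in cleanLine; the pos/neg classification moves into the callback, which the pool invokes on its result-handler thread in the parent process (note it classifies the cleaned line rather than the raw one):

import multiprocessing
from multiprocessing import Pool

def cleanLine(line):
    return line.strip()  # stand-in for the questioner's cleaner

pos = []
neg = []

def collect(cleaned):
    # invoked on the pool's result-handler thread in the parent process
    if cleaned.startswith("+"):
        pos.append(cleaned)
    elif cleaned.startswith("-"):
        neg.append(cleaned)

if __name__ == "__main__":
    with open("sample.txt", encoding="latin-1") as fp:
        lines = fp.readlines()
    pool = Pool(processes=multiprocessing.cpu_count() - 1)
    for line in lines:
        pool.apply_async(cleanLine, [line], callback=collect)
    pool.close()
    pool.join()
    print(len(pos), len(neg))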
Upvotes: 0