risraelsen

Reputation: 253

Python: Writing to a single file with queue while using multiprocessing Pool

I have hundreds of thousands of text files that I want to parse in various ways. I want to save the output to a single file without synchronization problems. I have been using a multiprocessing Pool to save time, but I can't figure out how to combine Pool and Queue.

The following code writes the input file name along with the length of the longest run of consecutive "x"s in that file. However, I want all processes to save their results to the same file, not to different files as in my example. Any help on this would be greatly appreciated.

import multiprocessing
import re

with open('infilenamess.txt') as f:
    filenames = f.read().splitlines()

def mp_worker(filename):
    with open(filename, 'r') as f:
        text = f.read()
    m = re.findall("x+", text)
    # default='' avoids a ValueError when the file contains no "x"
    count = len(max(m, key=len, default=''))
    # Each worker still appends to its own per-input results file
    with open(filename + '_results.txt', 'a') as outfile:
        outfile.write(str(filename) + '|' + str(count) + '\n')

def mp_handler():
    p = multiprocessing.Pool(32)
    p.map(mp_worker, filenames)

if __name__ == '__main__':
    mp_handler()

Upvotes: 23

Views: 27764

Answers (3)

Iain Hunter

Reputation: 5057

Here's my approach using a multiprocessing Manager object. The nice thing about it is that when execution leaves the manager's with block in the run_multi() function, the file writer's queue is closed automatically, which keeps the code easy to read and spares you the hassle of telling the listener to stop.

from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default the pool is sized to the number of available cores
        message_queue = manager.Queue()  # Queue for sending messages to the file writer listener
        # Start the file listener ahead of the work; it occupies one pool worker for the whole run
        pool.apply_async(file_writer, (message_queue, ))
        pool.map(partial(worker, message_queue=message_queue), values)  # partial lets map divide the workload

def worker(value: int, message_queue: Queue):
    message_queue.put(value * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            report.write(f"Value is: {message_queue.get()}\n")

if __name__ == "__main__":
    run_multi()
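
For comparison, here is a minimal sketch of the explicit-shutdown variant that the approach above avoids: the parent puts a sentinel on the queue once map has returned, and then waits for the writer to finish. The STOP sentinel, the path and processes parameters, and the returned line count are assumptions added for illustration and are not part of the original answer. The pool needs at least two processes, because the writer occupies one of them for the whole run.

```python
import multiprocessing
import os
import tempfile
from functools import partial

STOP = None  # hypothetical sentinel; workers must never put this value themselves


def worker(value, message_queue):
    message_queue.put(value * 10)


def file_writer(path, message_queue):
    """Drain the queue into path until the sentinel arrives; return lines written."""
    written = 0
    with open(path, "a") as report:
        # iter(callable, sentinel) keeps calling get() until it returns STOP
        for message in iter(message_queue.get, STOP):
            report.write(f"Value is: {message}\n")
            written += 1
    return written


def run_multi(values, path, processes=3):
    with multiprocessing.Manager() as manager, multiprocessing.Pool(processes) as pool:
        message_queue = manager.Queue()
        writer = pool.apply_async(file_writer, (path, message_queue))
        pool.map(partial(worker, message_queue=message_queue), values)
        message_queue.put(STOP)  # every worker has returned, so nothing follows the sentinel
        return writer.get()      # blocks until the writer has closed the file


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_multi(range(1, 11), os.path.join(tmp, "demo.txt")))
```

Waiting on writer.get() before leaving the with block means the file is known to be complete when run_multi() returns, at the cost of one extra line to send the sentinel.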

Upvotes: 4

Raj

Reputation: 3908

I took the accepted answer and simplified it for my own understanding of how this works. I am posting it here in case it helps someone else.

import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__=='__main__':
    mp_handler()
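
One detail worth spelling out: imap yields results in the order of the inputs, even when the workers finish out of order, so the lines in results.txt follow the order of numbers. A small sketch of that behaviour, assuming a made-up delay on even inputs to force out-of-order completion (the ordered_results helper is hypothetical):

```python
import multiprocessing
import time


def mp_worker(number):
    # Even inputs take longer, so tasks tend to finish out of order.
    if number % 2 == 0:
        time.sleep(0.05)
    return number + 1


def ordered_results(numbers, processes=4):
    with multiprocessing.Pool(processes) as pool:
        # imap hands results back in input order, regardless of completion order
        return list(pool.imap(mp_worker, numbers))


if __name__ == "__main__":
    print(ordered_results(range(6)))  # [1, 2, 3, 4, 5, 6]
```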

Upvotes: 13

tdelaney

Reputation: 77407

Multiprocessing pools implement a queue for you. Just use a pool method that returns the worker return value to the caller. imap works well:

import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    # default='' keeps max() from raising ValueError when the file has no "x"
    count = len(max(m, key=len, default=''))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()
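
With hundreds of thousands of small files, a variant using imap_unordered and a chunksize may reduce per-task overhead, provided the order of lines in the output does not matter. The sketch below is an illustration under those assumptions rather than a tuned implementation: the names longest_x_run, count_file and write_results are hypothetical, the chunksize of 64 is an arbitrary starting point, and the filename|count format is borrowed from the question.

```python
import multiprocessing
import os
import re
import tempfile

X_RUN = re.compile(r"x+")


def longest_x_run(text):
    """Length of the longest run of consecutive 'x' characters (0 if none)."""
    return max((len(run) for run in X_RUN.findall(text)), default=0)


def count_file(filename):
    """Worker: runs in a pool process and only returns data, never writes."""
    with open(filename) as f:
        return filename, longest_x_run(f.read())


def write_results(filenames, out_path, processes=4, chunksize=64):
    """The parent process is the only writer; workers feed it through the pool."""
    with multiprocessing.Pool(processes) as pool, open(out_path, "w") as out:
        # Results arrive as soon as they are ready, in no particular order.
        results = pool.imap_unordered(count_file, filenames, chunksize)
        for filename, count in results:
            out.write(f"{filename}|{count}\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i, text in enumerate(["axxb", "abc", "xxxx x"]):
            path = os.path.join(tmp, f"in{i}.txt")
            with open(path, "w") as f:
                f.write(text)
            paths.append(path)
        out_path = os.path.join(tmp, "results.txt")
        write_results(paths, out_path, processes=2, chunksize=1)
        with open(out_path) as f:
            print(f.read(), end="")
```

Because only the parent ever opens the output file, no lock or explicit queue is needed; the pool's own result channel does the synchronization.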

Upvotes: 45
