Don Kirkby

Reputation: 56620

Memory usage steadily growing for multiprocessing.Pool.imap_unordered

I just noticed that my program is using more and more memory as it processes a large file. It's only processing one line at a time, though, so I couldn't figure out why it would keep using more memory.

After a lot of digging, I realised that the program has three parts:

  1. Load the data, one line at a time.
  2. Process each line in a multiprocessing.Pool using imap_unordered().
  3. Process each line in a single thread.

If steps 1 and 2 are faster than step 3, then the results from the pool workers will queue up, consuming memory.

How can I throttle the data that I feed into the pool for step 2, so it doesn't get ahead of the consumer in step 3?

This looks similar to another multiprocessing question, but it's not clear to me where the delay is in that question.

Here's a small example that demonstrates the problem:

import logging
import os
import multiprocessing
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1():
    data = 'a' * 100000
    for i in xrange(10000):
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    return data.upper()


def process_step3(up_data):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.


def main():
    pool = multiprocessing.Pool(processes=10)
    logger.info('Starting.')
    loader = process_step1()
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'rU') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

main()

When that runs, I see a steady increase in memory use until step 1 finishes. If I let it run long enough after that, the memory use will start to decrease.

2016-12-01 15:37:50,859:6414:139712380557056:Starting.
2016-12-01 15:37:50,861:6414:139712266237696:Producing 0.
2016-12-01 15:37:50,868:6414:139712380557056:Consuming 0, using 255.0 MB.
2016-12-01 15:37:52,054:6414:139712266237696:Producing 1000.
2016-12-01 15:37:53,244:6414:139712266237696:Producing 2000.
2016-12-01 15:37:53,421:6414:139712380557056:Consuming 500, using 383.0 MB.
2016-12-01 15:37:54,446:6414:139712266237696:Producing 3000.
2016-12-01 15:37:55,635:6414:139712266237696:Producing 4000.
2016-12-01 15:37:55,976:6414:139712380557056:Consuming 1000, using 511.2 MB.
2016-12-01 15:37:56,831:6414:139712266237696:Producing 5000.
2016-12-01 15:37:58,019:6414:139712266237696:Producing 6000.
2016-12-01 15:37:58,529:6414:139712380557056:Consuming 1500, using 703.2 MB.
2016-12-01 15:37:59,209:6414:139712266237696:Producing 7000.
2016-12-01 15:38:00,406:6414:139712266237696:Producing 8000.
2016-12-01 15:38:01,084:6414:139712380557056:Consuming 2000, using 831.5 MB.
2016-12-01 15:38:01,602:6414:139712266237696:Producing 9000.
2016-12-01 15:38:02,802:6414:139712266237696:Finished producing.
2016-12-01 15:38:03,640:6414:139712380557056:Consuming 2500, using 959.5 MB.
2016-12-01 15:38:06,199:6414:139712380557056:Consuming 3000, using 959.5 MB.

Upvotes: 10

Views: 5181

Answers (3)

TimelyR Huang

Reputation: 11

I ran into a similar issue, and I think the bottleneck effect described in this question (the producer running ahead of the consumer) was what I was seeing. I managed to reduce the memory build-up by increasing the chunksize in imap_unordered to more than 50, and it worked surprisingly well.
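
Roughly, this is where the chunksize goes; process_step2 here stands in for the real per-item work, and the 64 is just an example value:

import multiprocessing


def process_step2(data):
    # Stand-in for the real per-item processing.
    return data.upper()


def main():
    pool = multiprocessing.Pool(processes=10)
    lines = ('a' * 100000 for _ in range(10000))
    # chunksize batches the input, so each task sent to a worker carries
    # many items instead of one; values above 50 are what worked for me.
    for up_data in pool.imap_unordered(process_step2, lines, chunksize=64):
        pass  # consume each result here
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()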

I have also found that if you pass a slice of a large numpy array to the worker processes and use the slice directly instead of copying it first, the RAM builds up quickly.
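
A small sketch of that, with a made-up array and chunk size; the copy() gives each task its own small array rather than a view into the big one:

import multiprocessing

import numpy as np


def partial_sum(chunk):
    # Stand-in worker that only needs its own small chunk.
    return chunk.sum()


def main():
    big = np.zeros(10000000)  # made-up large array
    pool = multiprocessing.Pool(processes=4)
    # Copy each slice before handing it to the pool, so the task argument
    # is a standalone array rather than a view into `big`.
    chunks = (big[i:i + 1000].copy() for i in range(0, len(big), 1000))
    for total in pool.imap_unordered(partial_sum, chunks, chunksize=64):
        pass  # combine the partial results here
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()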

Upvotes: 0

Ali

Reputation: 500

I think there is no need for a semaphore.

You can limit the number of tasks per child process:

multiprocessing.Pool(maxtasksperchild=1)

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool. link
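
A minimal sketch of where the argument goes, reusing process_step2 from the question; note that replacing a worker after every task adds process start-up overhead:

import multiprocessing


def process_step2(data):
    return data.upper()


def main():
    # Each worker exits after a single task and is replaced by a fresh
    # process, releasing whatever memory it had accumulated.
    pool = multiprocessing.Pool(processes=10, maxtasksperchild=1)
    lines = ('a' * 100000 for _ in range(1000))
    for up_data in pool.imap_unordered(process_step2, lines):
        pass  # step 3 processing goes here
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()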

Upvotes: -1

Don Kirkby

Reputation: 56620

It seems like Pool.imap_unordered() launches a separate thread to iterate over the input sequence from step 1, so we need to throttle that thread from the main thread that runs step 3. The Semaphore class is designed for exactly this kind of throttling between threads: we call acquire() before producing each line and release() after consuming each line. If we start the semaphore at some arbitrary value like 100, the producer builds up a buffer of 100 lines and then blocks until the consumer catches up.

import logging
import os
import multiprocessing
from threading import Semaphore
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1(semaphore):
    data = 'a' * 100000
    for i in xrange(10000):
        semaphore.acquire()
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    return data.upper()


def process_step3(up_data, semaphore):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.
    semaphore.release()


def main():
    pool = multiprocessing.Pool(processes=10)
    semaphore = Semaphore(100)
    logger.info('Starting.')
    loader = process_step1(semaphore)
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data, semaphore)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'rU') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

main()

Now the memory usage is steady, because the producer doesn't get very far ahead of the consumer.

2016-12-01 15:52:13,833:6695:140124578850560:Starting.
2016-12-01 15:52:13,835:6695:140124535109376:Producing 0.
2016-12-01 15:52:13,841:6695:140124578850560:Consuming 0, using 255.0 MB.
2016-12-01 15:52:16,424:6695:140124578850560:Consuming 500, using 255.0 MB.
2016-12-01 15:52:18,498:6695:140124535109376:Producing 1000.
2016-12-01 15:52:19,015:6695:140124578850560:Consuming 1000, using 255.0 MB.
2016-12-01 15:52:21,602:6695:140124578850560:Consuming 1500, using 255.0 MB.
2016-12-01 15:52:23,675:6695:140124535109376:Producing 2000.
2016-12-01 15:52:24,192:6695:140124578850560:Consuming 2000, using 255.0 MB.
2016-12-01 15:52:26,776:6695:140124578850560:Consuming 2500, using 255.0 MB.
2016-12-01 15:52:28,846:6695:140124535109376:Producing 3000.
2016-12-01 15:52:29,362:6695:140124578850560:Consuming 3000, using 255.0 MB.
2016-12-01 15:52:31,951:6695:140124578850560:Consuming 3500, using 255.0 MB.
2016-12-01 15:52:34,022:6695:140124535109376:Producing 4000.
2016-12-01 15:52:34,538:6695:140124578850560:Consuming 4000, using 255.0 MB.
2016-12-01 15:52:37,128:6695:140124578850560:Consuming 4500, using 255.0 MB.
2016-12-01 15:52:39,193:6695:140124535109376:Producing 5000.
2016-12-01 15:52:39,704:6695:140124578850560:Consuming 5000, using 255.0 MB.
2016-12-01 15:52:42,291:6695:140124578850560:Consuming 5500, using 255.0 MB.
2016-12-01 15:52:44,361:6695:140124535109376:Producing 6000.
2016-12-01 15:52:44,878:6695:140124578850560:Consuming 6000, using 255.0 MB.
2016-12-01 15:52:47,465:6695:140124578850560:Consuming 6500, using 255.0 MB.

Update

If you're using multiprocessing.Pool, consider switching to concurrent.futures.ProcessPoolExecutor, because it handles killed workers better. It doesn't affect the throttling problem described in this question, though.
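
For reference, a rough sketch of one way the same pipeline could look with ProcessPoolExecutor. Its map() collects the whole input eagerly, so this sketch throttles with a bounded set of submitted futures instead of a semaphore; treat it as an illustration, not a drop-in replacement.

import concurrent.futures


def process_step2(data):
    return data.upper()


def main():
    lines = ('a' * 100000 for _ in range(10000))
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        pending = set()
        for data in lines:
            pending.add(executor.submit(process_step2, data))
            if len(pending) >= 100:  # cap the number of buffered results
                done, pending = concurrent.futures.wait(
                    pending,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    up_data = future.result()  # step 3 processing goes here
        for future in concurrent.futures.as_completed(pending):
            up_data = future.result()  # drain the remaining results


if __name__ == '__main__':
    main()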

Upvotes: 10
