Reputation: 630
After using Python's multiprocessing Pool.map()
, I do not get my memory back.
Over 1 GB of memory is still occupied, although the function containing the Pool
has returned, everything is closed, and I even try to delete the Pool variable
and explicitly call the garbage collector.
When, in the code shown below, the two lines above the pool.map()
call are un-commented (and the pool.map()
line is commented out), everything looks fine, but as soon as multiprocessing
is used the memory does not seem to be freed again after leaving the function.
Because the real-world code calls several other functions that use multiprocessing
, this even stacks up, eventually consuming all the memory.
(Unfortunately I cannot produce a minimal example for that second, minor case of the memory stacking up, but as soon as the main problem is solved that one should be gone as well.)
This is Python 3.7.3 on Linux, and any help in at least explaining, or even solving, this issue is very welcome.
Minimal example code:
import gc
from time import sleep
from memory_profiler import profile
import numpy as np

def waitat(where, t):
    # print and wait, gives chance to see live memory usage in some task manager program
    print(where)
    sleep(t)

@profile
def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    from skimage.color import rgb2hsv
    import multiprocessing as mp
    print("going parallel")
    pool = mp.Pool()
    try:
        # images_converted = []  # there is no memory problem when using commented lines below, instead of pool.map(…) line
        # for img in imgs:
        #     images_converted.append(rgb2hsv(img))
        images_converted = pool.map(rgb2hsv, imgs)
    except KeyboardInterrupt:
        pool.terminate()
    waitat("after pool.map", 5)

    pool.close()
    pool.join()

    waitat("before del pool", 5)
    pool = None
    del pool  # memory should now be freed here?
    mp = None
    rgb2hsv = None

    waitat("after del pool", 5)
    print("copying over")
    res = np.array(images_converted)
    waitat("before del image_hsv in function", 5)
    images_converted = None
    del images_converted
    return res

@profile
def doit():
    print("create random images")
    max_images = 700
    images = np.random.rand(max_images, 300, 300, 3)

    waitat("before going parallel", 5)
    images_converted = parallel_convert_all_to_hsv(images)
    print("images_converted has %i bytes" % images_converted.nbytes)
    # how to clean up Pool's memory at latest here?

    waitat("before deleting original images", 5)
    images = None
    del images
    waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes, 10)
    images_converted = None
    del images_converted
    waitat("nearly end, memory should be as before", 15)
    gc.collect(2)
    waitat("end, memory should be as before", 15)

doit()
Output when using memory_profiler, showing the problem:
$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
11 1481.2 MiB 1481.2 MiB @profile
12 def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
13 1487.2 MiB 6.0 MiB from skimage.color import rgb2hsv
14 1487.2 MiB 0.0 MiB import multiprocessing as mp
15 1487.2 MiB 0.0 MiB print("going parallel")
16 1488.6 MiB 1.4 MiB pool = mp.Pool()
17 1488.6 MiB 0.0 MiB try:
18 # images_converted = [] # there is no memory problem when using commented lines below, instead of pool.map(…) line
19 # for img in imgs:
20 # images_converted.append(rgb2hsv(img))
21 2930.9 MiB 1442.3 MiB images_converted = pool.map(rgb2hsv, imgs)
22 except KeyboardInterrupt:
23 pool.terminate()
24 2930.9 MiB 0.0 MiB waitat("after pool.map",5)
25
26 2930.9 MiB 0.0 MiB pool.close()
27 2931.0 MiB 0.1 MiB pool.join()
28
29 2931.0 MiB 0.0 MiB waitat("before del pool",5)
30 2931.0 MiB 0.0 MiB pool = None
31 2931.0 MiB 0.0 MiB del pool # memory should now be freed here?
32 2931.0 MiB 0.0 MiB mp = None
33 2931.0 MiB 0.0 MiB rgb2hsv = None
34
35 2931.0 MiB 0.0 MiB waitat("after del pool",5)
36 2931.0 MiB 0.0 MiB print("copying over")
37 4373.0 MiB 1441.9 MiB res = np.array(images_converted)
38 4373.0 MiB 0.0 MiB waitat("before del image_hsv in function",5)
39 4016.6 MiB 0.0 MiB images_converted = None
40 4016.6 MiB 0.0 MiB del images_converted
41 4016.6 MiB 0.0 MiB return res
images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
43 39.1 MiB 39.1 MiB @profile
44 def doit():
45 39.1 MiB 0.0 MiB print("create random images")
46 39.1 MiB 0.0 MiB max_images = 700
47 1481.2 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
48
49 1481.2 MiB 0.0 MiB waitat("before going parallel",5)
50 4016.6 MiB 2535.4 MiB images_converted = parallel_convert_all_to_hsv(images)
51 4016.6 MiB 0.0 MiB print("images_converted has %i bytes" % images_converted.nbytes)
52 # how to clean up Pool's memory at latest here?
53
54 4016.6 MiB 0.0 MiB waitat("before deleting original images",5)
55 2574.6 MiB 0.0 MiB images = None
56 2574.6 MiB 0.0 MiB del images
57 2574.6 MiB 0.0 MiB waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
58 1132.7 MiB 0.0 MiB images_converted = None
59 1132.7 MiB 0.0 MiB del images_converted
60 1132.7 MiB 0.0 MiB waitat("nearly end, memory should be as before" ,15)
61 1132.7 MiB 0.0 MiB gc.collect(2)
62 1132.7 MiB 0.0 MiB waitat("end, memory should be as before" ,15)
Output of non-parallel code (where the problem does not occur):
$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
11 1481.3 MiB 1481.3 MiB @profile
12 def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
13 1488.1 MiB 6.8 MiB from skimage.color import rgb2hsv
14 1488.1 MiB 0.0 MiB import multiprocessing as mp
15 1488.1 MiB 0.0 MiB print("going parallel")
16 1488.7 MiB 0.6 MiB pool = mp.Pool()
17 1488.7 MiB 0.0 MiB try:
18 1488.7 MiB 0.0 MiB images_converted = [] # there is no memory problem when using commented lines below, instead of pool.map(…) line
19 2932.6 MiB 0.0 MiB for img in imgs:
20 2932.6 MiB 2.2 MiB images_converted.append(rgb2hsv(img))
21 # images_converted = pool.map(rgb2hsv, imgs)
22 except KeyboardInterrupt:
23 pool.terminate()
24 2932.6 MiB 0.0 MiB waitat("after pool.map",5)
25
26 2932.6 MiB 0.0 MiB pool.close()
27 2932.8 MiB 0.2 MiB pool.join()
28
29 2932.8 MiB 0.0 MiB waitat("before del pool",5)
30 2932.8 MiB 0.0 MiB pool = None
31 2932.8 MiB 0.0 MiB del pool # memory should now be freed here?
32 2932.8 MiB 0.0 MiB mp = None
33 2932.8 MiB 0.0 MiB rgb2hsv = None
34
35 2932.8 MiB 0.0 MiB waitat("after del pool",5)
36 2932.8 MiB 0.0 MiB print("copying over")
37 4373.3 MiB 1440.5 MiB res = np.array(images_converted)
38 4373.3 MiB 0.0 MiB waitat("before del image_hsv in function",5)
39 2929.6 MiB 0.0 MiB images_converted = None
40 2929.6 MiB 0.0 MiB del images_converted
41 2929.6 MiB 0.0 MiB return res
images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
43 39.2 MiB 39.2 MiB @profile
44 def doit():
45 39.2 MiB 0.0 MiB print("create random images")
46 39.2 MiB 0.0 MiB max_images = 700
47 1481.3 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
48
49 1481.3 MiB 0.0 MiB waitat("before going parallel",5)
50 2929.6 MiB 1448.3 MiB images_converted = parallel_convert_all_to_hsv(images)
51 2929.6 MiB 0.0 MiB print("images_converted has %i bytes" % images_converted.nbytes)
52 # how to clean up Pool's memory at latest here?
53
54 2929.6 MiB 0.0 MiB waitat("before deleting original images",5)
55 1487.7 MiB 0.0 MiB images = None
56 1487.7 MiB 0.0 MiB del images
57 1487.7 MiB 0.0 MiB waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
58 45.7 MiB 0.0 MiB images_converted = None
59 45.7 MiB 0.0 MiB del images_converted
60 45.7 MiB 0.0 MiB waitat("nearly end, memory should be as before" ,15)
61 45.7 MiB 0.0 MiB gc.collect(2)
62 45.7 MiB 0.0 MiB waitat("end, memory should be as before" ,15)
Upvotes: 17
Views: 5706
Reputation: 594
Since multiprocessing.Pool
is not able to free up that roughly 1 GB of memory, I also tried replacing it with ThreadPool
(sketched below), but that was no better. I am still puzzled by this memory-leak problem inside the pools.
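For reference, a minimal sketch of what that ThreadPool replacement looks like (multiprocessing.pool.ThreadPool offers the same map/close/join interface as Pool; the function name here is made up, and this is not the exact code that was timed):

from multiprocessing.pool import ThreadPool

import numpy as np
from skimage.color import rgb2hsv

def convert_all_to_hsv_with_threadpool(imgs: np.ndarray) -> np.ndarray:
    # thread-based pool with the same interface as multiprocessing.Pool
    pool = ThreadPool()
    try:
        images_converted = pool.map(rgb2hsv, imgs)
    finally:
        pool.close()
        pool.join()
    return np.array(images_converted)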
The following may not be the best solution, but it can serve as a workaround.
Instead of using ThreadPool
or a process Pool
, I create the threads (or processes) manually and give each one a single image to convert to HSV. I have commented out the line p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
because spawning a new process for every image conversion would be overkill and much more expensive than threads. Obviously, creating threads manually takes more time (about 9 s, without the memory leak) than ThreadPool
(about 4 s, but with the leak), yet as you can see the memory usage stays almost flat.
Here is my code:
import multiprocessing
import os
import threading
import time

from memory_profiler import profile
import numpy as np
from skimage.color import rgb2hsv

def do_hsv(img, shared_list):
    shared_list.append(rgb2hsv(img))
    # print("Converted by process {} having parent process {}".format(os.getpid(), os.getppid()))

@profile
def parallel_convert_all_to_hsv(imgs, shared_list):
    cores = os.cpu_count()
    starttime = time.time()
    for i in range(0, len(imgs), cores):
        # print("i :", i)
        jobs = []; pipes = []
        end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i:-1]) + 1
        # print("end :", end)
        for j in range(i, end):
            # print("j :", j)
            # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
            p = threading.Thread(target=do_hsv, args=(imgs[j], shared_list))
            jobs.append(p)
        for p in jobs: p.start()
        for proc in jobs:
            proc.join()
    # note: starttime - time.time() is negative, so the elapsed time is printed with its sign flipped
    print("Took {} seconds to complete ".format(starttime - time.time()))
    return 1

@profile
def doit():
    print("create random images")
    max_images = 700
    images = np.random.rand(max_images, 300, 300, 3)
    # images = [x for x in range(0, 10000)]
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    parallel_convert_all_to_hsv(images, shared_list)
    del images
    del shared_list
    print()

doit()
Here is the Output:
create random images
Took -9.085552453994751 seconds to complete
Filename: MemoryNotFreed.py
Line # Mem usage Increment Line Contents
================================================
15 1549.1 MiB 1549.1 MiB @profile
16 def parallel_convert_all_to_hsv(imgs, shared_list):
17
18 1549.1 MiB 0.0 MiB cores = os.cpu_count()
19
20 1549.1 MiB 0.0 MiB starttime = time.time()
21
22 1566.4 MiB 0.0 MiB for i in range(0, len(imgs), cores):
23
24 # print("i :", i)
25
26 1566.4 MiB 0.0 MiB jobs = []; pipes = []
27
28 1566.4 MiB 0.0 MiB end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i : -1]) + 1
29
30 # print("end :", end)
31
32 1566.4 MiB 0.0 MiB for j in range(i, end):
33 # print("j :", j)
34
35 # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
36 1566.4 MiB 0.0 MiB p = threading.Thread(target= do_hsv, args=(imgs[j], shared_list))
37
38 1566.4 MiB 0.0 MiB jobs.append(p)
39
40 1566.4 MiB 0.8 MiB for p in jobs: p.start()
41
42 1574.9 MiB 1.0 MiB for proc in jobs:
43 1574.9 MiB 13.5 MiB proc.join()
44
45 1563.5 MiB 0.0 MiB print("Took {} seconds to complete ".format(starttime - time.time()))
46 1563.5 MiB 0.0 MiB return 1
Filename: MemoryNotFreed.py
Line # Mem usage Increment Line Contents
================================================
48 106.6 MiB 106.6 MiB @profile
49 def doit():
50
51 106.6 MiB 0.0 MiB print("create random images")
52
53 106.6 MiB 0.0 MiB max_images = 700
54
55 1548.7 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
56
57 # images = [x for x in range(0, 10000)]
58 1549.0 MiB 0.3 MiB manager = multiprocessing.Manager()
59 1549.1 MiB 0.0 MiB shared_list = manager.list()
60
61 1563.5 MiB 14.5 MiB parallel_convert_all_to_hsv(images, shared_list)
62
63 121.6 MiB 0.0 MiB del images
64
65 121.6 MiB 0.0 MiB del shared_list
66
67 121.6 MiB 0.0 MiB print()
Upvotes: 0
Reputation: 169
Indeed, there is a leak problem, and it appears only for certain, seemingly arbitrary parameter values. I could not figure out why, but the leak can be reduced by passing a list to pool.map instead of an ndarray:
images_converted = pool.map(rgb2hsv, [i for i in imgs])
This consistently reduces the memory leak in my tests.
OLD ANSWER:
It does not seem that there is a problem in the pool. You should not expect "del pool" on line 31 to free your memory, since what is occupying it are the variables "imgs" and "images_converted". These live in the scope of the function "parallel_convert_all_to_hsv", not in the scope of "rgb2hsv", so "del pool" is not related to them.
The memory is correctly released after deleting "images" and "images_converted" on lines 56 and 59.
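A minimal sketch of that point, with made-up names unrelated to the pool machinery: deleting one local name drops only that single reference, so a large array still bound to another name in the same function keeps its memory.

import numpy as np

def scope_demo():
    imgs = np.zeros((700, 300, 300, 3))   # ~1.5 GB, like the arrays in the question
    helper = object()                     # stands in for the pool object
    del helper                            # releases only the tiny helper object
    # imgs is still referenced by this frame, so its ~1.5 GB cannot be freed yet
    return imgs.nbytes

print(scope_demo())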
Upvotes: 0
Reputation: 1455
The garbage collector's generation thresholds may be getting in the way; take a look at gc.get_threshold()
.
Try including gc.disable()
.
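A minimal sketch of how this could be checked and applied (placing it around the pool work is just one possible choice):

import gc

print(gc.get_threshold())   # defaults to (700, 10, 10) in CPython

gc.disable()                # switch off automatic generational collection
try:
    pass                    # ... run the Pool / conversion work here ...
finally:
    gc.enable()             # restore automatic collection afterwards
    gc.collect()            # and force a full collection once the work is done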
Upvotes: 4