Reputation: 2762
I am trying to use the shared_memory module with Pool from Python's multiprocessing.
In the documentation about shared memory, the argument buf (the memoryview) is not clear to me, perhaps because I don't understand the concept of a memoryview: is it a pointer?
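From some quick poking, buf seems to behave like a memoryview over the shared block rather than a raw pointer (a small experiment, so I may be misreading it):
from multiprocessing import shared_memory

# Small experiment: what is buf, really?
shm = shared_memory.SharedMemory(create=True, size=16)
print(type(shm.buf))   # <class 'memoryview'>
shm.buf[0] = 255       # writing through the view changes the shared block
print(shm.buf[0])      # 255
shm.close()
shm.unlink()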
I want to use this shared memory across different processes. Below is my example, based on the documentation:
import numpy as np
from multiprocessing import shared_memory

a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# Do I need to create existing_shm, or can I keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)
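As a side experiment (within one process, so it may not prove anything about separate processes), both handles appear to expose the same underlying buffer:
c1 = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
c1[:] = a[:]  # copy the data into the shared block through the first handle
c2 = np.ndarray(a.shape, dtype=a.dtype, buffer=existing_shm.buf)
print(c2)     # [1 1 2 3 5 8]: the second handle sees the same data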
Now comes my first problem. I define the function that will use the array in the shared memory:
def test_function(Input):
    c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
    c[1] = 100
    print(c)
This is incorrect, but I don't know how it should be.
Then the main block. Does the if __name__ == '__main__' guard play a role in making this work?
if __name__ == '__main__':
    with Pool(os.cpu_count()) as p:
        p.map(test_function, range(12))
It doesn't work.
Do I have to define c in every process? Or can I define it in main and use it across all processes? I assume that c is a Python object and therefore can't be shared between processes because of the GIL?
Thank you very much!
Upvotes: 2
Views: 1836
Reputation: 2762
This works. I don't have a clear understanding of all the facts yet, though.
1- The shared memory object is declared:
shm = shared_memory.SharedMemory(create=True, size=10000000*4)
2- An object (a numpy array, in this case) is declared with the shared block as its buffer:
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
3- The numpy array is populated by copying data into it:
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
Then, all the function executed on many CPUs needs is the name of the shared memory object; repeating step 2 inside the function maps the same shared memory, which was populated earlier.
It's essential that each process closes the shared memory object after accessing it, and that you unlink it once at the very end.
import numpy as np
from multiprocessing import shared_memory, Pool
import os

def test_function(args):
    Input, shm_name, shape = args
    # Attach to the existing shared memory block by name and map it as an array
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    d = np.ndarray(shape, dtype=np.int32, buffer=existing_shm.buf)
    # print(Input, d[Input-1:Input+2])
    d[Input] = -20
    # print(Input, d[Input-1:Input+2])
    existing_shm.close()  # each process closes its own handle
    print(Input, 'parent process:', os.getppid())
    print(Input, 'process id:', os.getpid())

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=10000000*4)  # room for 10M int32 values
    b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
    b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
    # Each worker gets its input value plus the shared memory name and the array shape
    inputs = [(i, shm.name, b.shape) for i in range(1, 14)]
    with Pool(os.cpu_count()) as p:
        p.map(test_function, inputs)
    print(b[:20])
    # Clean up from within the first Python shell
    shm.close()
    shm.unlink()  # Free and release the shared memory block at the very end
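One caveat I'd add: if the worker raises before reaching close(), the handle stays open until the process exits. A minimal sketch of a more defensive worker, same logic as above and assuming the same imports, just wrapped in try/finally:
def test_function(args):
    Input, shm_name, shape = args
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    try:
        d = np.ndarray(shape, dtype=np.int32, buffer=existing_shm.buf)
        d[Input] = -20
    finally:
        existing_shm.close()  # runs even if the work above raises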
Upvotes: 2