I am making a project which collects data from clients' sensors, processes the gathered data and sends it back to the clients. There can be multiple clients asking to receive some data from our server at the same time, so I had to implement multiprocessing. I can't use Threads because there are certain variables that must be client independent. If I did, my code would probably get very complicated to read and upgrade, and I don't want that. So I decided to use Processes, but now there is some data that needs to be shared between parent and child Processes. After some research, I found that Pipe communication would satisfy my requirements.
The following code successfully sends data from the parent to the child Process; the child updates the data and sends it back to the parent. But it only works because of the sleep() call, which stops the parent from using the pipe at the same time as the child.
How can it be changed so that it does the same without the sleep() call, which I believe will most probably cause problems in the future?
from multiprocessing import Process, Pipe
import time

def update_data(pipe):
    p_out, p_in = pipe
    L = []
    while True:
        message = p_out.recv()
        if message=='FINISHED':
            break
        L.append(message)
    L.append(['new data']) #updating received data
    writer(L, p_in) #sending received data to parent Process
    p_in.close()

def writer(i, p_in):
    p_in.send(i)
    p_in.send('FINISHED')

L = ['0' for i in range(10)] #current data

if __name__=='__main__':
    p_out, p_in = Pipe()
    update_data_process = Process(target=update_data, args=((p_out, p_in),))
    update_data_process.start()
    writer(L, p_in) #sending current data to child Process
    time.sleep(3) #needs to be changed
    while True:
        message = p_out.recv()
        if message != 'FINISHED':
            L = message
        else:
            break
    print(L)
    p_in.close()
    update_data_process.join()
Upvotes: 4
Views: 11377
You have the issue because you are treating the connections as if they were simplex, but Pipe() by default returns duplex (two-way) connections.
This means that when you call parent_conn, child_conn = Pipe(), you get one connection object that only the parent should use for both reads and writes, and another such connection object for the child. Parent and child each operate only on their own connection object.
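To make the duplex behaviour concrete, here is a minimal single-process sketch (the names duplex_demo, conn_a and conn_b are made up for illustration). It shows that each end can both send and receive, and that a message sent on one end arrives at the other end. In the question's code both processes send on p_in and receive on p_out, so without the sleep() the parent can end up reading back its own message before the child does.

```python
from multiprocessing import Pipe

def duplex_demo():
    """Show that each end of a default Pipe() can both send and receive."""
    conn_a, conn_b = Pipe()  # duplex=True is the default

    # Whatever is sent on one end arrives at the *other* end.
    conn_a.send('from a')
    got_on_b = conn_b.recv()

    # The same pipe also works in the opposite direction.
    conn_b.send('from b')
    got_on_a = conn_a.recv()

    conn_a.close()
    conn_b.close()
    return got_on_b, got_on_a

if __name__ == '__main__':
    print(duplex_demo())
```

Both calls run in one process here only to keep the sketch short; in real use one end would be handed to the child process.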
from multiprocessing import Process, Pipe
from datetime import datetime

SENTINEL = 'SENTINEL'

def update_data(child_conn):
    result = []
    for msg in iter(child_conn.recv, SENTINEL):
        print(f'{datetime.now()} child received {msg}')
        result.append(msg)
    print(f'{datetime.now()} child received sentinel')
    result.append(['new data'])
    writer(child_conn, result)
    child_conn.close()

def writer(conn, data):
    conn.send(data)
    conn.send(SENTINEL)

if __name__=='__main__':
    parent_conn, child_conn = Pipe()  # default is duplex!
    update_data_process = Process(target=update_data, args=(child_conn,))
    update_data_process.start()
    data = ['0' for i in range(3)]
    writer(parent_conn, data)
    for msg in iter(parent_conn.recv, SENTINEL):
        print(f'{datetime.now()} parent received {msg}')
    print(f'{datetime.now()} parent received sentinel')
    parent_conn.close()
    update_data_process.join()
Example Output:
2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel
Process finished with exit code 0
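If you really do want one-way connections, Pipe() accepts duplex=False. The sketch below is only an illustration (simplex_demo, recv_end and send_end are hypothetical names): the first returned connection can only receive and the second can only send, and using an end in the wrong direction raises OSError. A parent/child round trip built this way would need two such pipes, one per direction.

```python
from multiprocessing import Pipe

def simplex_demo():
    """Show that Pipe(duplex=False) yields a receive-only and a send-only end."""
    recv_end, send_end = Pipe(duplex=False)

    # Data flows in one direction only: send_end -> recv_end.
    send_end.send('hello')
    received = recv_end.recv()

    # Using an end in the wrong direction raises OSError.
    try:
        recv_end.send('not allowed')
        wrong_way_failed = False
    except OSError:
        wrong_way_failed = True

    recv_end.close()
    send_end.close()
    return received, wrong_way_failed

if __name__ == '__main__':
    print(simplex_demo())
```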
In case you are unfamiliar with the use of iter(callable, sentinel), see the Python documentation for the built-in iter() function.
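As a quick illustration of the two-argument form, here is a small sketch that does not involve pipes at all (drain_until and the deque are assumptions made for the example). iter(callable, sentinel) keeps calling the callable and stops as soon as it returns a value equal to the sentinel, which is what the loops over conn.recv rely on.

```python
from collections import deque

def drain_until(messages, sentinel):
    """Collect items from a queue until the sentinel value shows up."""
    queue = deque(messages)
    # queue.popleft is called repeatedly; iteration stops when it returns
    # the sentinel, and the sentinel itself is not yielded.
    return list(iter(queue.popleft, sentinel))

if __name__ == '__main__':
    print(drain_until(['a', 'b', 'STOP', 'c'], 'STOP'))
```

Anything after the sentinel ('c' here) is simply left unread, just as a connection would leave later messages in the pipe.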
Upvotes: 6