Reputation: 311
With a simple example I try to demonstrate a typical multiprocessing setup with two processes:
I want to terminate the first process by a keyboard keypress, which will send None
to a queue, which then stops the program.
I use the keyboard package for detecting if a key is pressed.
import multiprocessing
import keyboard
import numpy as np
def getData(queue):
KEY_PRESSED = False
while KEY_PRESSED is False:
if keyboard.is_pressed("a"):
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
FLAG_STOP = True
else:
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
If I debug the code, the pressed key is actually detected, but simultaneously random data from the while loop is put into the queue. Which makes it very difficult debugging the code.
Additionally I tried the pynput package, which creates a thread to detect a pressed key. Using this method however the same problem occured, the program did not savely terminated the execution by sending out None
to the other process.
I would be very happy if someone could point to the error in the described method, or propose another way of savely detecting keypress within a processs.
Upvotes: 0
Views: 542
Reputation: 44108
I am not sure what problem you are describing: savely is not an English word. You say that the pressed key is actually detected. If that is the case and if you have print("STOP...")
statements in both functions, then if you just run the code from a command prompt and getData
detects that an a
is being pressed, then I don't see how both print statements won't eventually be executed and the two processes terminate.
If the problem is that the program does not terminate for a very long while, then I think what you are missing is that unless the call to keyboard.is_pressed("a")
is a particularly slow function to execute, by time you get around to pressing a
on the keyboard, function getData
will have already written thousands of records to the queue before it writes None
out. That means that processData
has to first read those thousands of records and print them before it finally gets to the None
record. Since it has to also print the numbers, there is no way the processData
can keep up with getData
. So long after getData
has written its None
record, processData
still has thousands of records to read.
This can be demonstrated in a variation of your code where function getData
does not wait for keyboard input but simply writes random numbers to the output queue for 5 seconds before writing its None
record and terminating (this simulates your program where you wait 5 seconds before pressing a
). Function processData
prints the number of records it has read before it gets to the None
record and the elapsed time it took to read and print those records:
import multiprocessing
import numpy as np
import time
def getData(queue):
KEY_PRESSED = False
expiration = time.time() + 5
# Run for 5 seconds:
while KEY_PRESSED is False:
if time.time() > expiration:
queue.put(None)
print("STOP in getData")
KEY_PRESSED=True
else:
data = np.random.random([8, 250])
queue.put(data)
def processData(queue):
FLAG_STOP = False
t = time.time()
cnt = 0
while FLAG_STOP is False:
data = queue.get() # # ch, samples
if data is None:
print("STOP in processData")
print('Number of items read from queue:', cnt, 'elapsed_time:', time.time() - t)
FLAG_STOP = True
else:
cnt += 1
print("Processing Data")
print(str(data[0,0]))
if __name__ == "__main__":
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(target=getData, args=(queue,)),
multiprocessing.Process(target=processData, args=(queue,))
]
for p in processes:
p.start()
for p in processes:
p.join()
Prints:
...
Processing Data
0.21449036510257957
Processing Data
0.27058883046461824
Processing Data
0.5433716680659376
STOP in processData
Number of items read from queue: 128774 elapsed_time: 35.389172077178955
Even though getData
wrote numbers for only 5 seconds, it took 35 seconds for processData
to read and print them.
The problem can be resolved by putting a limit on the number of messages that can be on the Queue instance:
queue = multiprocessing.Queue(1)
This will block getData
from putting the next value on the queue until processData
has read the value.
Prints:
...
Processing Data
0.02822635996071321
Processing Data
0.05390434023333657
Processing Data
STOP in getData
0.9030600225686444
STOP in processData
Number of items read from queue: 16342 elapsed_time: 5.000030040740967
So if you use a queue with a maxsize of 1, your program should terminate immediately upon pressing the a
key.
Upvotes: 1