Reputation: 23
I am trying to create two processes that run forever, each running an asyncio loop inside of them.
I want to understand if the logic is correct. Is there a better way to do the same thing?
import asyncio
import multiprocessing


async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(my_async_func(topic), loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.stop()


def main():
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]
    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()


if __name__ == '__main__':
    main()
Upvotes: 1
Views: 1937
Reputation: 5156
I am trying to create two processes that run forever, each running an asyncio loop inside of them.
I assume you know why you want to spread part of your code across several (virtual) cores with multiprocessing and run the rest concurrently inside each process with asyncio.
Then I think you did it right: you spawn two processes, and each of them has its own asyncio loop. The only improvement I could find is to use loop.run_until_complete, which saves one line of code :)
import os
import asyncio
import multiprocessing


async def my_async_func(topic):
    while True:
        await asyncio.sleep(5)
        print(topic)


def create_aio_loop(topic):
    process_name = "[Process %s, topic %s]" % (os.getpid(), topic)
    print("%s Started " % process_name)
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(my_async_func(topic))
    except KeyboardInterrupt:
        print("%s Loop interrupted" % process_name)
        loop.stop()
    print("%s terminating" % process_name)


if __name__ == '__main__':
    topic_a = 'New to Asyncio'
    topic_b = 'New to Multiprocessing'
    process_a = multiprocessing.Process(target=create_aio_loop, args=(topic_a, ))
    process_b = multiprocessing.Process(target=create_aio_loop, args=(topic_b, ))
    processes = [process_a, process_b]
    try:
        for process in processes:
            process.start()
    except KeyboardInterrupt:
        for process in processes:
            process.terminate()
            process.join()
I also suggest prefixing every message with the process id, which makes multiprocessing code much easier to debug. The start/terminate print messages in the code above show an example.
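If you prefer not to build the prefix by hand, the standard logging module can add the process id for you. This is only a minimal sketch: the helper name get_topic_logger and the constant PREFIX_FORMATTER are made up for illustration, while %(process)d and %(name)s are standard LogRecord attributes.

```python
import logging

# %(process)d is filled in by logging with the id of the emitting process.
PREFIX_FORMATTER = logging.Formatter(
    "[Process %(process)d, topic %(name)s] %(message)s"
)


def get_topic_logger(topic):
    """Return a logger named after the topic that prints the pid prefix.

    Hypothetical helper, not part of the original script.
    """
    logger = logging.getLogger(topic)
    if not logger.handlers:  # do not attach a second handler on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(PREFIX_FORMATTER)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # avoid duplicate lines via the root logger
    return logger
```

Inside create_aio_loop you would call log = get_topic_logger(topic) once and then log.info("Started"), which prints a line of the form [Process <pid>, topic New to Asyncio] Started.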
Running my version of the script (saved as tmp_asyncio.py) yields:
>python tmp_asyncio.py
[Process 11456, topic New to Multiprocessing] Started
[Process 18396, topic New to Asyncio] Started
New to Asyncio
New to Multiprocessing
(here I pressed Ctrl+C)
[Process 11456, topic New to Multiprocessing] Loop interrupted
[Process 11456, topic New to Multiprocessing] terminating
[Process 18396, topic New to Asyncio] Loop interrupted
[Process 18396, topic New to Asyncio] terminating
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\util.py", line 310, in _exit_function
p.join()
File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\process.py", line 121, in join
res = self._popen.wait(timeout)
File "C:\Miniconda3\envs\baseenv\lib\multiprocessing\popen_spawn_win32.py", line 81, in wait
res = _winapi.WaitForSingleObject(int(self._handle), msecs)
KeyboardInterrupt
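The traceback at the end comes from the parent process: start() returns immediately, so the script falls off the end and the parent waits for its children in multiprocessing's exit handler (p.join() in _exit_function), which is where the Ctrl+C lands. One possible way to avoid it, as a sketch rather than a definitive fix, is to join the children explicitly inside the try block. The version below also swaps get_event_loop/run_until_complete for asyncio.run (Python 3.7+). The names run_worker, interval and repeat are my own additions; repeat only exists so the coroutine can be exercised without running forever.

```python
import asyncio
import multiprocessing
import os


async def my_async_func(topic, interval=5, repeat=None):
    """Print the topic every `interval` seconds; repeat=None means forever."""
    count = 0
    while repeat is None or count < repeat:
        await asyncio.sleep(interval)
        print("[Process %s] %s" % (os.getpid(), topic))
        count += 1
    return count


def run_worker(topic):
    """Process entry point: asyncio.run creates and closes the loop itself."""
    try:
        asyncio.run(my_async_func(topic))
    except KeyboardInterrupt:
        print("[Process %s] interrupted" % os.getpid())


def main():
    topics = ['New to Asyncio', 'New to Multiprocessing']
    processes = [
        multiprocessing.Process(target=run_worker, args=(topic,))
        for topic in topics
    ]
    for process in processes:
        process.start()
    try:
        # Block here so that Ctrl+C is raised inside this try block
        # instead of inside the interpreter's exit handler.
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        # Children started from the same console usually receive Ctrl+C too,
        # so give them a moment to exit before forcing termination.
        for process in processes:
            process.join(timeout=5)
            if process.is_alive():
                process.terminate()
                process.join()
```

To run it as a script, call main() under the usual if __name__ == '__main__': guard, which multiprocessing needs on Windows because child processes re-import the module.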
Upvotes: 2