Reputation: 3
I am trying to create an asynchronous function that listens to an event stream (SSE) indefinitely, until the user presses CTRL+C to stop listening or until a specific condition is met. This asynchronous function must also return a list of processed data. Returned list must be empty if cancelled by KeyboardInterrupt.
My approach was to use the asyncio event loop and handle the KeyboardInterrupt exception but the loop/task does'nt seem to be closed after function has returned?
async def async_stream(ip, port):
key_list = []
url = f"http://{ip}:{port}/ilovepython/stream"
headers = {
"Accept": "text/event-stream"
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as resp:
while True:
if resp.status != 200:
print(f"Error : {resp.status} {resp.reason}")
return []
b64_data = await resp.content.readline()
# do things with b64_data
# fill key_list as b64_data is being processed
# return when condition is met
except HTTPError as http_err:
printl(f'HTTP error occurred: {http_err}', logging.ERROR)
except Exception as err:
printl(f'Other error occurred: {err}', logging.ERROR)
def stream(ip, port):
key_list = []
loop = asyncio.get_event_loop()
try:
key_list = loop.run_until_complete(async_subscribe_stream(ip, port))
except KeyboardInterrupt:
return []
return key_list
The result of this code is that the session correctly listens and does things such as print the data. When CTRL+C is pressed, data is no longer printed as expected. However when this function is called again later the data is printed twice (because the loop was never stopped?).
Trying to catch a KeyboardInterrupt within async_stream() doesn't seem to work.
Closing the event loop afterwords using finally: loop.close()
yields errors when I call the function again at a later time.
Please note that I am working with Python 3.6 so asyncio.run() is off the table.
I would love to understand why this is happening and how to fix it thanks.
Upvotes: 0
Views: 112