thomasbartnik

Reputation: 71

uasyncio V3 exception detection

How do I correctly implement the following code for uasyncio V3? Exception detection does not work in the code below.

async def main():
    tasks = (s1, s2)
    try:

        res = await uasyncio.funcs.gather(*tasks, return_exceptions=True)

    except uasyncio.CancelledError:
        print("Cancelled.")
    except Exception as e:
        print("error")


try:
    uasyncio.run(main())
except KeyboardInterrupt as e:  # this works
    print("Caught keyboard interrupt. Canceling tasks...")
    uasyncio.new_event_loop()
except Exception as e:  # this is never reached?
    print("error: " + str(e))

Upvotes: 0

Views: 297

Answers (1)

David R Knowles

Reputation: 11

Not sure whether you are still looking for a solution, but in V3 the event loop works slightly differently, so exceptions need to be handled slightly differently too: register a handler on the loop with set_exception_handler.

If you want a decent tutorial, check out Peter Hinch's GitHub repository: https://github.com/peterhinch/micropython-async/blob/master/v3/docs/TUTORIAL.md

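import sys
import uasyncio as asyncio

def set_exception():
    # Install a loop-wide handler for exceptions that tasks leave unhandled.
    def handle_exception(loop, context):
        # context["exception"] holds the exception raised inside the task.
        sys.print_exception(context["exception"])
        sys.exit()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_exception()
    # Start your own tasks here, for example:
    #asyncio.create_task(task)
    while True:
        await asyncio.sleep_ms(0)  # yield to the scheduler

def run():
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        # Reset the loop state so the next run starts clean.
        asyncio.new_event_loop()

run()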
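
One more thing that may explain why the except Exception branch in the question never fires: when gather is called with return_exceptions=True, exceptions raised by the tasks are returned in the result list instead of being raised, so they have to be checked for explicitly. Below is a minimal sketch of that check. The coroutines good and bad are made up for illustration, and the import falls back to CPython's asyncio only so that the sketch can also be tried on a desktop.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback, only so the sketch runs on a desktop

async def good():
    # Hypothetical task that completes normally.
    await asyncio.sleep(0)
    return "ok"

async def bad():
    # Hypothetical task that fails.
    await asyncio.sleep(0)
    raise ValueError("boom")

async def main():
    # With return_exceptions=True, gather does not raise for a failed task;
    # the exception object is placed in the result list instead.
    results = await asyncio.gather(good(), bad(), return_exceptions=True)
    errors = [r for r in results if isinstance(r, Exception)]
    return results, errors

results, errors = asyncio.run(main())
for e in errors:
    print("error:", repr(e))
```

If you would rather have the first failure propagate into your try/except, leave out return_exceptions=True; gather then raises the exception at the await.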

Upvotes: 1
