ikamen
ikamen

Reputation: 3485

Coroutine that is guaranteed to exit its context managers

I'd like to use a context manager within a coroutine. This coroutine should handle unknown number of steps. However, due to unknown number of steps, it's unclear when should the context manager exit. I'd like it to exit when the co-routine goes out of scope / is garbage collected; however this seems not to happen in the example below:

import contextlib


@contextlib.contextmanager
def cm():
    print("STARTED")
    yield
    print("ENDED")


def coro(a: str):
    with cm():
        print(a)
        while True:
            val1, val2 = yield
            print(val1, val2)


c = coro("HI")


c.send(None)
print("---")
c.send((1, 2))
print("---!")

Output of this program:

STARTED
HI
---
1 2
---!

How can I make a coroutine that will support any number of steps, and be guaranteed to exit gracefully? I don't want to make this a responsibility of the caller.

Upvotes: 1

Views: 1366

Answers (2)

Kyle Parsons
Kyle Parsons

Reputation: 1525

TLDR: So the issue is that when an exception is raised (and not handled) inside a with block. The __exit__ method of the context manager is called with that exception. For contextmanager-decorated generators, this causes the exception to be thrown to the generator. cm does not handle this exception and thus the cleanup code is not run. When coro is garbage collected, its close method is called which throws a GeneratorExit to coro (which then gets thrown to cm). What follows is a detailed description of the above steps.

The close method throws a GeneratorExit to coro which means a GeneratorExit is raised at the point of yield. coro doesn't handle the GeneratorExit so it exits the context via an error. This causes the __exit__ method of the context to be called with an error and error information. What does the __exit__ method from a contextmanager-decorated generator do? If it is called with an exception, it throws that exception to the underlying generator.

At this point the a GeneratorExit is raised from the yield statement in the body of our context manager. That unhandled exception causes the cleanup code to not be run. That unhandled exception is raised by context manager and is passed back to the __exit__ of the contextmanager decorator. Being the same error that was thrown, __exit__ returns False to indicate the original error sent to __exit__ was unhandled.

Finally, this continues the GeneratorExit's propagation outside of the with block inside coro where it continues to be unhandled. However, not handling GeneratorExits is regular for generators, so the original close method suppresses the GeneratorExit.

See this part of the yield documentation:

If the generator is not resumed before it is finalized (by reaching a zero reference count or by being garbage collected), the generator-iterator’s close() method will be called, allowing any pending finally clauses to execute.

Looking at the close documentation we see:

Raises a GeneratorExit at the point where the generator function was paused. If the generator function then exits gracefully, is already closed, or raises GeneratorExit (by not catching the exception), close returns to its caller.

This part of the with statement documentation:

  1. The suite is executed.

  2. The context manager’s exit() method is invoked. If an exception caused the suite to be exited, its type, value, and traceback are passed as arguments to exit(). Otherwise, three None arguments are supplied.

And the code of the __exit__ method for the contextmanager decorator.

So with all this context (rim-shot), the easiest way we can get the desired behavior is with a try-except-finally in the definition of our context manager. This is the suggested method from the contextlib docs. And all their examples follow this form.

Thus, you can use a try…except…finally statement to trap the error (if any), or ensure that some cleanup takes place.

import contextlib


@contextlib.contextmanager
def cm():
    try:
        print("STARTED")
        yield
    except Exception:
        raise
    finally:
        print("ENDED")


def coro(a: str):
    with cm():
        print(a)
        while True:
            val1, val2 = yield
            print(val1, val2)


c = coro("HI")


c.send(None)
print("---")
c.send((1, 2))
print("---!")

The output is now:

STARTED
HI
---
1 2
---!
ENDED

as desired.

We could also define our context manager in the traditional manner: as a class with an __enter__ and __exit__ method and still gotten the correct behavior:

class CM:
    def __enter__(self):
        print('STARTED')

    def __exit__(self, exc_type, exc_value, traceback):
        print('ENDED')
        return False

The situation is somewhat simpler, because we can see exactly what the __exit__ method is without having to go to the source code. The GeneratorExit gets sent (as a parameter) to __exit__ where __exit__ happily runs its cleanup code and then returns False. This is not strictly necessary as otherwise None (another Falsey value) would have been returned, but it indicates that any exception that was sent to __exit__ was not handled. (The return value of __exit__ doesn't matter if there was no exception).

Upvotes: 1

martineau
martineau

Reputation: 123473

You can do it by telling the coroutine to shutdown by sending it something the will cause it to break out of the loop and return as illustrated below. Doing so will cause a StopIteration exception to be raised where this is done, so I added another context manager to allow it to be suppressed. Note I have also added a coroutine decorator to make them start-up automatically when first called, but that part is strictly optional.

import contextlib
from typing import Callable


QUIT = 'quit'

def coroutine(func: Callable):
    """ Decorator to make coroutines automatically start when called. """
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@contextlib.contextmanager
def ignored(*exceptions):
    try:
        yield
    except exceptions:
        pass


@contextlib.contextmanager
def cm():
    print("STARTED")
    yield
    print("ENDED")

@coroutine
def coro(a: str):
    with cm():
        print(a)
        while True:
            value = (yield)
            if value == QUIT:
                break
            val1, val2 = value
            print(val1, val2)

print("---")
with ignored(StopIteration):
    c = coro("HI")
    #c.send(None)  # No longer needed.

    c.send((1, 2))
    c.send((3, 5))
    c.send(QUIT)  # Tell coroutine to clean itself up and exit.
print("---!")

Output:

STARTED
HI
---
1 2
3 5
ENDED
---!

Upvotes: 0

Related Questions