Cliff Hill

Reputation: 131

Python 3.5 asyncio execute coroutine on event loop from synchronous code in different thread

I am hoping someone can help me here.

I have an object whose attributes can return coroutine objects. This works beautifully; however, I have a situation where I need to get the result of a coroutine object from synchronous code running in a separate thread while the event loop is running. The code I came up with is:

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
    """
    Get an attribute synchronously and safely.

    Note:
        This does nothing special if an attribute is synchronous. It only
        really has a use for asynchronous attributes. It processes
        asynchronous attributes synchronously, blocking everything until
        the attribute is processed. This helps when running SQL code that
        cannot run asynchronously in coroutines.

    Args:
        key (str): The Config object's attribute name, as a string.
        default (Any): The value to use if the Config object does not have
            the given attribute. Defaults to None.

    Returns:
        Any: The value of the Config object's attribute, or the default
        value if the Config object does not have the given attribute.
    """
    ret = self.get(key, default)

    if asyncio.iscoroutine(ret):
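        if loop.is_running():  # `loop` is the module-level event loop
            # Run the coroutine on a second, freshly created event loop.
            loop2 = asyncio.new_event_loop()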
            try:
                ret = loop2.run_until_complete(ret)

            finally:
                loop2.close()
        else:
            ret = loop.run_until_complete(ret)

    return ret

What I am looking for is a safe way to synchronously get the result of a coroutine object in a multithreaded environment. self.get() can return a coroutine object for attributes I have configured to provide one. The main issue I have found is whether or not the event loop is running. After searching for a few hours on Stack Overflow and a few other sites, I arrived at the (broken) solution above: if the loop is running, I create a new event loop and run my coroutine in it. This works, except that the code hangs forever on the ret = loop2.run_until_complete(ret) line.

Right now, I have the following scenarios with results:

  1. The result of self.get() is not a coroutine
    • Returns the result. [Good]
  2. The result of self.get() is a coroutine and the event loop is not running (basically in the same thread as the event loop)
    • Returns the result. [Good]
  3. The result of self.get() is a coroutine and the event loop is running (basically in a different thread than the event loop)
    • Hangs forever waiting for the result. [Bad]

Does anyone know how I can go about fixing the bad result so I can get the value I need? Thanks.

I hope I made some sense here.

I do have a good and valid reason to be using threads: I am using SQLAlchemy, which is not async, and I punt the SQLAlchemy code to a ThreadPoolExecutor to handle it safely. However, I need to be able to query these asynchronous attributes from within those threads so the SQLAlchemy code can get certain configuration values safely. And no, I won't switch away from SQLAlchemy to another system just to accomplish this, so please do not offer alternatives to it. The project is too far along to switch something so fundamental.

I tried using asyncio.run_coroutine_threadsafe() and loop.call_soon_threadsafe(), and both failed. So far, the approach above has gotten the farthest toward working; I feel like I am just missing something obvious.

When I get a chance, I will write some code that provides an example of the problem.

Ok, I implemented an example case, and it worked the way I would expect, so my problem is likely elsewhere in the code. I am leaving this open and will update the question to fit my real problem if needed.

Does anyone have any possible ideas as to why a concurrent.futures.Future from asyncio.run_coroutine_threadsafe() would hang forever rather than return a result?

My example code, which unfortunately does not reproduce my error, is below:

import asyncio
import typing

loop = asyncio.get_event_loop()

class ConfigSimpleAttr:
    __slots__ = ('value', '_is_async')

    def __init__(
        self,
        value: typing.Any,
        is_async: bool=False
    ):
        self.value = value
        self._is_async = is_async

    async def _get_async(self):
        return self.value

    def __get__(self, inst, cls):
        if self._is_async and loop.is_running():
            return self._get_async()
        else:
            return self.value

class BaseConfig:
    __slots__ = ()

    attr1 = ConfigSimpleAttr(10, True)
    attr2 = ConfigSimpleAttr(20, True)    

    def get(self, key: str, default: typing.Any=None) -> typing.Any:
        return getattr(self, key, default)

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
        ret = self.get(key, default)

        if asyncio.iscoroutine(ret):
            if loop.is_running():
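                # Called from an executor thread: schedule the coroutine on the
                # running loop and block this thread until it finishes.
                fut = asyncio.run_coroutine_threadsafe(ret, loop)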
                print(fut, fut.running())
                ret = fut.result()
            else:
                ret = loop.run_until_complete(ret)

        return ret

config = BaseConfig()

def example_func():
    return config.get_sync('attr1')

async def main():
    a1 = await loop.run_in_executor(None, example_func)
    a2 = await config.attr2
    val = a1 + a2
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val))
    return val

loop.run_until_complete(main())

This is a stripped-down version of exactly what my code is doing, and the example works even though my actual application doesn't. I am stuck as to where to look for answers. Suggestions on where to track down my "stuck forever" problem are welcome, even though the code above doesn't reproduce it.
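One common way for a run_coroutine_threadsafe() future to hang, which the example above cannot hit because example_func runs in an executor thread, is calling fut.result() on the event loop's own thread: the loop is stuck inside result(), so it never gets to run the coroutine. The sketch below is an illustration, not a diagnosis of the real application; get_value and main are hypothetical names, and it passes a timeout so it terminates. It shows the blocking call timing out and then the same future completing once it is awaited instead.

```python
import asyncio
import concurrent.futures


async def get_value():
    # Stand-in for an async attribute (hypothetical).
    return 10


async def main(loop):
    fut = asyncio.run_coroutine_threadsafe(get_value(), loop)
    try:
        # WRONG on the loop thread: the loop is stuck inside result(), so
        # get_value() cannot start; without a timeout this waits forever.
        result = fut.result(timeout=0.2)
    except concurrent.futures.TimeoutError:
        result = None
    # On the loop thread, await the future instead of blocking on it.
    late = await asyncio.wrap_future(fut)
    return result, late


loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(main(loop)))  # (None, 10)
finally:
    loop.close()
```

If a get_sync() call can ever be reached from code running on the loop thread (directly or through a synchronous helper), this is the deadlock it produces.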

Upvotes: 6

Views: 2983

Answers (2)

Cliff Hill

Reputation: 131

Ok, I got my code working by taking a different approach. The problem was tied to something that did file IO, which I was converting into a coroutine by using loop.run_in_executor() on the file IO components. I was then trying to use this in a sync function called from another thread, which was itself run through another loop.run_in_executor() call. This is a very important routine in my code (called probably a million times or more during the execution of my short-running code), and I decided that my logic was just getting too complicated. So... I uncomplicated it. Now, if I want to use the file IO components asynchronously, I explicitly call my "get_async()" method; otherwise, I use normal attribute access.
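A minimal sketch of the simplified design described above, assuming a hypothetical Config with one file-backed setting (db_url and _read_setting are made up for illustration; only the get_async() name comes from the answer). Plain attribute access stays synchronous, and coroutine code opts in through an explicit get_async() that offloads the same blocking read to the default executor, so no coroutine object ever reaches synchronous code.

```python
import asyncio


def _read_setting(name):
    # Hypothetical blocking file read; returns canned values for the sketch.
    return {"db_url": "sqlite://"}[name]


class Config:
    # Normal attribute access is always synchronous, so it is safe to use
    # from executor threads (e.g. inside SQLAlchemy code).
    @property
    def db_url(self):
        return _read_setting("db_url")

    # Coroutine code opts in explicitly; the same blocking read is pushed
    # to the default executor so the event loop is not blocked.
    async def get_async(self, name, loop):
        return await loop.run_in_executor(None, getattr, self, name)


config = Config()
loop = asyncio.new_event_loop()
try:
    print(config.db_url)  # sqlite://
    print(loop.run_until_complete(config.get_async("db_url", loop)))  # sqlite://
finally:
    loop.close()
```

Because the sync path never schedules anything on the loop, a worker thread reading config.db_url cannot wait on the loop or on another executor job.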

Removing that complexity made the code cleaner and easier to understand and, more importantly, it actually works. I am not 100% certain of the root cause. I believe a thread processing an attribute started another thread that tried to read the attribute before it was processed, causing something like a race condition that halted my code, but I could never reproduce the error outside my application to prove it. Either way, I was able to get past it and continue with development.
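The answer above suspects a race condition. Another mechanism that produces the same "stuck forever" symptom with the nested run_in_executor() pattern it describes is thread-pool starvation; this is an assumption, not something the author confirmed. The outer job occupies a worker while blocking on fut.result(), and the coroutine's inner file IO job is queued behind it in the same pool. The sketch uses a one-worker pool to make the effect deterministic (the default executor is larger but finite, so enough concurrent callers can exhaust it); read_config_file and the other names are hypothetical, and a timeout replaces the infinite wait.

```python
import asyncio
import concurrent.futures


def read_config_file():
    # Hypothetical stand-in for the blocking file IO behind an attribute.
    return 42


async def get_async(loop, pool):
    # The "async" attribute: pushes the file IO into the same thread pool.
    return await loop.run_in_executor(pool, read_config_file)


def get_sync(loop, pool, timeout):
    # Runs inside a worker of `pool` and blocks that worker while the
    # coroutine (and therefore read_config_file) waits for a free worker.
    fut = asyncio.run_coroutine_threadsafe(get_async(loop, pool), loop)
    return fut.result(timeout)


async def main(loop, workers):
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
    try:
        return await loop.run_in_executor(pool, get_sync, loop, pool, 0.5)
    except (concurrent.futures.TimeoutError, asyncio.TimeoutError):
        # Without a timeout, fut.result() would block forever here.
        await asyncio.sleep(0.2)  # let the queued IO job drain before shutdown
        return "starved"
    finally:
        pool.shutdown(wait=True)


loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(main(loop, workers=1)))  # starved
    print(loop.run_until_complete(main(loop, workers=2)))  # 42
finally:
    loop.close()
```

Both asyncio.TimeoutError and concurrent.futures.TimeoutError are caught because some Python versions convert one into the other when the executor future is awaited.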

Upvotes: 0

Vincent

Reputation: 13415

It is very unlikely that you need to run several event loops at the same time, so this part looks quite wrong:

    if loop.is_running():
        loop2 = asyncio.new_event_loop()
        try:
            ret = loop2.run_until_complete(ret)

        finally:
            loop2.close()
    else:
        ret = loop.run_until_complete(ret)

Even testing whether the loop is running doesn't seem to be the right approach. It's probably better to pass the (only) running loop to get_sync explicitly and schedule the coroutine using run_coroutine_threadsafe:

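def get_sync(self, key, loop, default=None):
    ret = self.get(key, default)
    if not asyncio.iscoroutine(ret):
        return ret
    # Schedule the coroutine on the (only) running loop and block this
    # worker thread until it finishes; never call this from the loop's thread.
    future = asyncio.run_coroutine_threadsafe(ret, loop)
    return future.result()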
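A slightly more defensive variant of the get_sync() above, written as a standalone function so the sketch is self-contained: it refuses to block when called on the loop's own thread, where future.result() would deadlock. It assumes Python 3.7+ for asyncio.get_running_loop(), and Config is a hypothetical stand-in for the question's BaseConfig with a single async attribute.

```python
import asyncio


class Config:
    # Hypothetical stand-in for the question's BaseConfig: one async attribute.
    async def _attr1(self):
        return 10

    def get(self, key, default=None):
        if key == "attr1":
            return self._attr1()
        return default


def get_sync(config, key, loop, default=None, timeout=None):
    ret = config.get(key, default)
    if not asyncio.iscoroutine(ret):
        return ret
    try:
        running = asyncio.get_running_loop()  # Python 3.7+
    except RuntimeError:
        running = None  # no loop running in this thread
    if running is loop:
        ret.close()  # avoid a "coroutine was never awaited" warning
        raise RuntimeError("get_sync() called on the loop's own thread; await instead")
    return asyncio.run_coroutine_threadsafe(ret, loop).result(timeout)


async def main(loop):
    config = Config()
    # From a worker thread: the loop is free to run the coroutine.
    from_thread = await loop.run_in_executor(None, get_sync, config, "attr1", loop)
    # From the loop thread: refused instead of deadlocking.
    try:
        get_sync(config, "attr1", loop)
        from_loop = "no error"
    except RuntimeError:
        from_loop = "refused"
    return from_thread, from_loop


loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(main(loop)))  # (10, 'refused')
finally:
    loop.close()
```

Failing loudly turns a silent hang into a traceback that points at the offending call site.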

EDIT: Hanging problems can be caused by tasks being scheduled on the wrong loop (e.g. forgetting the optional loop argument when calling a coroutine). This kind of problem should be easier to debug with PR 303 (now merged): a RuntimeError is raised instead when the loop and the future don't match. So you might want to run your tests with the latest version of asyncio.
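In recent Python versions, awaiting a future that belongs to a different loop fails fast instead of hanging. A minimal sketch of that mismatch, with await_it as a hypothetical helper: the future is created on loop1 but awaited by a task running on loop2, which is what happens when a coroutine tied to the main loop is run with a second loop as in the question's loop2 approach.

```python
import asyncio


async def await_it(fut):
    # Hypothetical helper: simply awaits whatever future it is given.
    return await fut


loop1 = asyncio.new_event_loop()
loop2 = asyncio.new_event_loop()
try:
    fut = loop1.create_future()  # belongs to loop1
    try:
        loop2.run_until_complete(await_it(fut))  # awaited on the wrong loop
        outcome = "no error"
    except RuntimeError as exc:
        outcome = str(exc)
    print(outcome)
finally:
    loop1.close()
    loop2.close()
```

The printed RuntimeError message names both the task and the foreign future, which makes the wrong-loop mistake easy to spot.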

Upvotes: 1
