Alexis.Rolland

Reputation: 6363

Add task to asyncio loop running in separate thread

I'm trying to run an asyncio loop in a separate thread from my main thread. While the thread and loop are running, I would like to add new tasks to it. I have the following code:

import asyncio
import threading
from typing import Callable, Union

from gql import gql


class Craft:
    # [...]

    async def exec_subscription(self, session, subscription: str, variable_values: dict, callback: Callable) -> None:
        """Execute a subscription on the GraphQL API."""
        async for response in session.subscribe(gql(subscription), variable_values=variable_values):
            callback(response)

    def subscribe_job(self, job_id: int, callback: Callable) -> Union[bool, None]:
        """Subscribe to a job, receive the job information every time it is updated and pass it to a callback function."""

        async def _schedule_subscription_task(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
            """Schedule subscription task in asyncio loop using existing websocket connection."""
            async with self._ws_client as session:
                task = asyncio.create_task(self.exec_subscription(session, subscribe_job, variable_values, callback))
                await asyncio.gather(task)
 
        def _run_subscription_loop(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
            """Run asyncio loop."""
            asyncio.run(_schedule_subscription_task(subscribe_job, variable_values, callback))

        # Build GraphQL subscription
        subscribe_job = """
            subscription jobSubscription($jobId: Int!) {
                jobSubscriptionById(id: $jobId) {
                    job {...}
                }
            }
        """

        # Build variables dictionary
        variable_values = {
            'jobId': job_id
        }

        # Check if subscription thread is running
        thread_name = 'CraftSubscriptionThread'
        for thread in threading.enumerate():
            # If thread is found, add subscription to existing asyncio loop
            if thread.name == thread_name:
                # Add task to asyncio loop
                loop = asyncio.get_event_loop()
                asyncio.run_coroutine_threadsafe(_schedule_subscription_task(subscribe_job, variable_values, callback), loop)
                return True

        # Else create new event loop and new thread
        thread = threading.Thread(name=thread_name, daemon=True, target=_run_subscription_loop, args=(subscribe_job, variable_values, callback))
        thread.start()

        return True

In a Python terminal, I run the main method subscribe_job with the following callback function:

from craft import Craft

updated_job = "0"

def print_job(job):
    global updated_job
    updated_job = job
    print(updated_job)

craft = Craft()  # constructor arguments are not shown in the question

craft.subscribe_job(1561, print_job)

This works: whenever the subscription receives a message, the job is printed in the terminal:

>>> {'jobSubscriptionById': {'job': {'id': 1561, 'spaceId': 1, 'applicationId': 47, 'configurationId': 139, 'title': 'Job 1631357928', 'description': None, 'status': 'failed', 'step': 1, 'progress': '1.00'}}}

However, when I trigger another subscription to add a new task to my loop, nothing seems to happen. I simply trigger a new subscription for a different job as shown below, which should call run_coroutine_threadsafe:

craft.subscribe_job(1561, print_job)

Upvotes: 1

Views: 3091

Answers (2)

Paul Cornelius

Reputation: 11009

As far as I can tell from the code you provided, subscribe_job() is running in the main thread. Therefore, when you execute this line:

loop = asyncio.get_event_loop()

the loop you retrieve is the main thread's event loop. That's not what you want; you want the loop running in the secondary thread. Since you say that nothing happens when you call asyncio.run_coroutine_threadsafe on the next line, I'd venture a guess that your program never actually started an event loop in the main thread. If it had, you would see the coroutine run, but it would be executing in the main thread rather than the secondary one.

It takes a bit of work to fix this. You need to retrieve the secondary thread's event loop and store it somewhere the main thread can reach it. I'm guessing a little about how the rest of your program is written, but one approach is to create an instance attribute in the class's constructor:

def __init__(self):
    self.secondary_loop = None

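Then, in your nested function _schedule_subscription_task, add this line at the top:

async def _schedule_subscription_task(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
    self.secondary_loop = asyncio.get_running_loop()
    # ... rest of the function unchanged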

Back in the main part of subscribe_job, change your call to run_coroutine_threadsafe to this:

asyncio.run_coroutine_threadsafe(_schedule_subscription_task(subscribe_job, variable_values, callback), self.secondary_loop)

Admittedly, there is a race condition here: self.secondary_loop is not set the instant the secondary thread is launched. If you call subscribe_job twice in rapid succession, the variable may still be None even though the secondary thread has already started. You might need to add a small delay after creating the thread; it only runs once, so it shouldn't noticeably affect the program's performance.
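Instead of a fixed delay, a threading.Event can make the main thread wait until the secondary loop has actually been captured. This is a minimal sketch, assuming a stripped-down class: LoopHolder, _loop_ready, _main, start, submit and fake_subscription are hypothetical names, and the coroutine only sleeps in place of the real GraphQL subscription.

```python
import asyncio
import threading
from typing import Optional


class LoopHolder:
    """Hypothetical stand-in for Craft that only shows the loop hand-off."""

    def __init__(self) -> None:
        self.secondary_loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_ready = threading.Event()

    async def _main(self) -> None:
        # Runs inside the secondary thread: record its loop, then signal the main thread.
        self.secondary_loop = asyncio.get_running_loop()
        self._loop_ready.set()
        # Keep the loop alive so later submissions have somewhere to run.
        await asyncio.Event().wait()

    def start(self) -> None:
        thread = threading.Thread(target=asyncio.run, args=(self._main(),), daemon=True)
        thread.start()
        # Block until secondary_loop is set, instead of sleeping for a guessed delay.
        self._loop_ready.wait()

    def submit(self, coro):
        # Safe to call from the main thread once start() has returned.
        return asyncio.run_coroutine_threadsafe(coro, self.secondary_loop)


async def fake_subscription(job_id: int) -> str:
    # Hypothetical placeholder for exec_subscription.
    await asyncio.sleep(0.1)
    return f"job {job_id} updated"


holder = LoopHolder()
holder.start()
print(holder.submit(fake_subscription(1)).result(timeout=2))  # job 1 updated
```

The Event is set from inside a coroutine that is already executing on the secondary loop, so by the time start() returns, the stored loop is guaranteed to be running.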

Upvotes: 1

Matt Fowler

Reputation: 2743

The main problem I see with your code is the line loop = asyncio.get_event_loop(). This function returns the running event loop of the current thread; if no loop is running, it falls back to the loop set for that thread and, in the main thread, creates one if none exists (recent Python versions deprecate this implicit creation). The second time you call subscribe_job, you are calling it from your main thread, but you have only ever started an event loop in your CraftSubscriptionThread thread. get_event_loop therefore gives you a new event loop that isn't even running (print loop.is_running() after calling get_event_loop() and it returns False), which explains why nothing happens.
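The symptom can be reproduced without GraphQL. In this sketch, ping is a hypothetical placeholder coroutine; one loop runs forever in a worker thread (playing the role of CraftSubscriptionThread), while a second loop is created but never started. asyncio.new_event_loop() is used for the idle loop rather than get_event_loop() so the example does not depend on the deprecated implicit-creation behaviour.

```python
import asyncio
import concurrent.futures
import threading


async def ping() -> str:
    return "pong"


# A loop that really runs, in its own thread (like CraftSubscriptionThread).
worker_loop = asyncio.new_event_loop()
threading.Thread(target=worker_loop.run_forever, daemon=True).start()

# A loop that exists but was never started (what the main thread ends up with).
idle_loop = asyncio.new_event_loop()
print(idle_loop.is_running())  # False

# Submitting to the running loop works.
result = asyncio.run_coroutine_threadsafe(ping(), worker_loop).result(timeout=1)
print(result)  # pong

# Submitting to the idle loop only queues the coroutine; nothing ever runs it.
coro = ping()
stuck = asyncio.run_coroutine_threadsafe(coro, idle_loop)
try:
    stuck.result(timeout=0.5)
except concurrent.futures.TimeoutError:
    print("timed out: the idle loop never ran the coroutine")

# Clean up the never-started coroutine and the unused loop.
coro.close()
idle_loop.close()
```

run_coroutine_threadsafe raises no error for a loop that is not running, which is why the original code fails silently.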

Also note that asyncio.run creates a new event loop every time you call it, which, based on your description, is not what you want. Rather than looping over the running threads to check whether CraftSubscriptionThread exists and then trying to fetch its event loop, create a thread that owns one explicit event loop and start that loop with run_forever. You can then submit work to the threaded event loop with run_coroutine_threadsafe, as you were doing before. Here is a simplified version of your code that illustrates how to solve this problem:

import asyncio
import threading
from typing import Callable


class ThreadedEventLoop(threading.Thread):
    def __init__(self):
        super().__init__()
        self._loop = asyncio.new_event_loop()
        self.daemon = True

    async def exec_subscription(self, callback: Callable) -> None:
        while True:  # Simulate the callback firing every two seconds.
            await asyncio.sleep(2)
            callback('hello!')

    async def schedule_subscription_task(self, callback: Callable) -> None:
        await self.exec_subscription(callback)

    def submit(self, callback: Callable):
        # Thread-safe: hands the coroutine to the loop running in this thread.
        return asyncio.run_coroutine_threadsafe(self.schedule_subscription_task(callback), self._loop)

    def run(self) -> None:
        # Make the loop current for this thread, then run it until stopped.
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()


def print_job(job):
    print(f'job1: {job}')


def print_job2(job):
    print(f'job2: {job}')


threaded_loop = ThreadedEventLoop()
threaded_loop.start()
threaded_loop.submit(print_job)
threaded_loop.submit(print_job2)
threaded_loop.join()  # Blocks forever: run_forever() never returns in this demo.
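Because run_coroutine_threadsafe returns a concurrent.futures.Future, the main thread can also wait for a coroutine's result, and the loop can be stopped from outside with call_soon_threadsafe(loop.stop), since loop.stop() itself is not thread-safe. This is a sketch under those assumptions: fetch_job and stop are hypothetical names, and the coroutine sleeps briefly instead of talking to the GraphQL API.

```python
import asyncio
import threading


class ThreadedEventLoop(threading.Thread):
    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._loop = asyncio.new_event_loop()

    def run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def submit(self, coro):
        # Returns a concurrent.futures.Future usable from the calling thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        # Schedule loop.stop() on the loop's own thread, wait for run() to return, then close.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self.join()
        self._loop.close()


async def fetch_job(job_id: int) -> dict:
    await asyncio.sleep(0.1)  # hypothetical placeholder for a real network call
    return {"id": job_id}


threaded_loop = ThreadedEventLoop()
threaded_loop.start()
futures = [threaded_loop.submit(fetch_job(i)) for i in (1561, 1562)]
print([f.result(timeout=2) for f in futures])  # [{'id': 1561}, {'id': 1562}]
threaded_loop.stop()
```

Waiting on the futures before calling stop() ensures no tasks are still pending when the loop is closed.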

Upvotes: 2
