Reputation: 6363
I'm trying to run an asyncio loop in a separate thread from my main thread. While the thread and loop are running, I would like to add new tasks to it. I have the following code:
class Craft:
# [...]
async def exec_subscription(self, session, subscription: str, variable_values: dict, callback: Callable) -> None:
"""Execute a subscription on the GraphQL API."""
async for response in session.subscribe(gql(subscription), variable_values=variable_values):
callback(response)
def subscribe_job(self, job_id: int, callback: Callable) -> Union[bool, None]:
"""Subscribe to a job, receive the job information every time it is updated and pass it to a callback function."""
async def _schedule_subscription_task(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Schedule subscription task in asyncio loop using existing websocket connection."""
async with self._ws_client as session:
task = asyncio.create_task(self.exec_subscription(session, subscribe_job, variable_values, callback))
await asyncio.gather(task)
def _run_subscription_loop(subscribe_job: str, variable_values: dict, callback: Callable) -> None:
"""Run asyncio loop."""
asyncio.run(_schedule_subscription_task(subscribe_job, variable_values, callback))
# Build GraphQL subscription
subscribe_job = """
subscription jobSubscription($jobId: Int!) {
jobSubscriptionById(id: $jobId) {
job {...}
}
}
"""
# Build variables dictionary
variable_values = {
'jobId': job_id
}
# Check if subscription thread is running
thread_name = 'CraftSubscriptionThread'
for thread in threading.enumerate():
# If thread is found, add subscription to existing asyncio loop
if thread.name == thread_name:
# Add task to asyncio loop
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(_schedule_subscription_task(subscribe_job, variable_values, callback), loop)
return True
# Else create new event loop and new thread
thread = threading.Thread(name=thread_name, daemon=True, target=_run_subscription_loop, args=(subscribe_job, variable_values, callback))
thread.start()
return True
In a Python terminal, I run the main method subscribe_job
with the following callback function:
from craft import Craft
global updated_job
updated_job = "0"
def print_job(job):
global updated_job
updated_job = job
print(updated_job)
craft.subscribe_job(1561, print_job)
This works well and when the subscription receives a message, it prints the job in the terminal:
>>> {'jobSubscriptionById': {'job': {'id': 1561, 'spaceId': 1, 'applicationId': 47, 'configurationId': 139, 'title': 'Job 1631357928', 'description': None, 'status': 'failed', 'step': 1, 'progress': '1.00'}}}
However, when I trigger another subscription to add a new task to my loop, it seems nothing happens. I simply trigger a new subscription for a different job as below, it supposedly call run_coroutine_threadsafe
:
craft.subscribe_job(1561, print_job)
Upvotes: 1
Views: 3091
Reputation: 11009
As far as I can tell from the code you provided, subscribe_job()
is running in the main thread. Therefore, when you execute this line:
loop = asyncio.get_event_loop()
the loop you are retrieving is the event loop in the main thread. That's not what you want; you want the loop in the secondary thread. Since you say that nothing happens when you call asyncio.run_coroutine_threadsafe
in the next line, I will venture to guess that your program has not actually started the event loop in the main thread. Otherwise, you would see the function actually run - but it would be executing in the main thread.
It takes a bit of work to fix this. You need to retrieve the secondary thread's event loop and store it in a variable. I'm guessing a little bit about how the rest of your program is written, but one approach would be to create a member variable in the class's constructor:
def __init__(self):
self.secondary_loop = None
Now in your nested function async def schedule_subscription_task(...)
add this line:
async def schedule_subscription_task(...):
self.secondary_loop = asyncio.get_running_loop()
Back in the main part of subscribe_job
, change your call to run_coroutine_threadsafe
to this:
asyncio.run_coroutine_threadsafe(..., self.secondary_loop)
Admittedly there is a race condition here. The variable self.secondary_loop doesn't get set immediately upon launching the secondary thread. If you call subscribe_job
twice in rapid succession, that variable may still be None even though you have actually started the secondary thread. You might need to add a small time delay after thread creation, which will only run once and shouldn't affect noticeably the performance of the program.
Upvotes: 1
Reputation: 2743
The main problem I see with your code is the line loop = asyncio.get_event_loop()
. This method gets the currently running event loop in the current thread and if one does not exist, it creates one. The problem is the second time you call subscribe
you're calling it from your main thread and you've only ever started an event loop in your CraftSubscriptionThread
thread, therefore get_event_loop
creates a new event loop which isn't even running (print loop.is_running()
after you call get_event_loop()
- it will return False
), explaining why nothing happens.
Also note that asyncio.run
creates a new event loop every time you call it, which based on your description does not sound like what you want. I would avoid looping through to see if the CraftSubscriptionThread
thread is running and trying to get the event loop. Instead, create a thread with one explicit event loop and then start it with run_forever
. Then, when you submit to your threaded event loop you can call run_coroutine_threadsafe
as you were before. Here is a simplified version of your code that should illustrate how to solve this problem:
class ThreadedEventLoop(threading.Thread):
def __init__(self):
super().__init__()
self._loop = asyncio.new_event_loop()
self.daemon = True
async def exec_subscription(self, callback: Callable) -> None:
while True: #Simulate the callback firing every two seconds.
await asyncio.sleep(2)
callback('hello!')
async def schedule_subscription_task(self, callback):
await self.exec_subscription(callback)
def submit(self, callback):
asyncio.run_coroutine_threadsafe(self.schedule_subscription_task(callback), self._loop)
def run(self) -> None:
self._loop.run_forever()
def print_job(job):
print('job1: updated_job')
def print_job2(job):
print('job2: updated_job')
threaded_loop = ThreadedEventLoop()
threaded_loop.start()
threaded_loop.submit(print_job)
threaded_loop.submit(print_job2)
threaded_loop.join()
Upvotes: 2