codegirl2000
codegirl2000

Reputation: 23

Asyncio event loop within a thread issue

Trying to create a event loop inside a thread, where the thread is initiated within the constructor of a class. I want to run multiple tasks within the event loop. However, having an issue whenever I try to run with the thread and get the error "NoneType object has no attribute create_task" Is there something I am doing wrong in calling it.

import asyncio
import threading 

Class Test():
  def __init__(self):
    self.loop = None
    self.th = threading.Thread(target=self.create)
    self.th.start()

  def __del__(self):
    self.loop.close()

  def self.create(self):
    self.loop = new_event_loop()
    asyncio.set_event_loop(self.loop)

  def fun(self):
    task = self.loop.create_task(coroutine)
    loop.run_until_complete(task)

  def fun2(self):
    task = self.loop.create_task(coroutine)
    loop.run_until_complete(task)

t = Test()
t.fun()
t.fun2()

Upvotes: 2

Views: 606

Answers (1)

Paul Cornelius
Paul Cornelius

Reputation: 11009

It is tricky to combine threading and asyncio, although it can be useful if done properly.

The code you gave has several syntax errors, so obviously it isn't the code you are actually running. Please, in the future, check your post carefully out of respect for the time of those who answer questions here. You'll get better and quicker answers if you spot these avoidable errors yourself.

  • The keyword "class" should not be capitalized.
  • The class definition does not need empty parenthesis.
  • The function definition for create should not have self. in front of it.
  • There is no variable named coroutine defined in the script.

The next problem is the launching of the secondary thread. The method threading.Thread.start() does not wait for the thread to actually start. The new thread is "pending" and will start sometime soon, but you don't have control over when that happens. So start() returns immediately; your __init__ method returns; and your call to t.fun() happens before the thread starts. At that point self.loop is in fact None, as the error message indicates.

An nice way to overcome this is with a threading.Barrier object, which can be used to insure that the thread has started before the __init__ method returns.

Your __del__ method is probably not necessary, and will normally only get executed during program shut down. If it runs under any other circumstances, you will get an error if you call loop.close on a loop that is still running. I think it's better to insure that the thread shuts down cleanly, so I've provided a Test.close method for that purpose.

Your functions fun and fun2 are written in a way that makes them not very useful. You start a task and then you immediately wait for it to finish. In that case, there's no good reason to use asyncio at all. The whole idea of asyncio is to run more than one task concurrently. Creating tasks one at a time and always waiting for each one to finish doesn't make a lot of sense.

Most asyncio functions are not threadsafe. You have to use the two important methods loop.call_soon_threadsafe and asyncio.run_coroutine_threadsafe if you want to run asyncio code across threads. The methods fun and fun2 execute in the main thread, so you should use run_coroutine_threadsafe to launch tasks in the secondary thread.

Finally, with such programs it's usually a good idea to provide a thread shutdown method. In the following listing, close obtains a list of all the running tasks, sends a cancel message to each, and then sends the stop command to the loop itself. Then it waits for the thread to really exit. The main thread will be blocked until the secondary thread is finished, so the program will shut down cleanly.

Here is a simple working program, with all the functionality that you seem to want:

import asyncio
import threading

async def coro(s):
    print(s)
    await asyncio.sleep(3.0)

class Test:
    def __init__(self):
        self.loop = None
        self.barrier = threading.Barrier(2)  # Added
        self.th = threading.Thread(target=self.create)
        self.th.start()
        self.barrier.wait()   # Blocks until the new thread is running

    def create(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.barrier.wait()
        print("Thread started")
        self.loop.run_forever() 
        print("Loop stopped")
        self.loop.close()  # Clean up loop resources
    
    def close(self):  # call this from main thread
        self.loop.call_soon_threadsafe(self._close)
        self.th.join()  # Wait for the thread to exit (insures loop is closed)
        
    def _close(self):  # Executes in thread self.th
        tasks = asyncio.all_tasks(self.loop)
        for task in tasks:
            task.cancel()
        self.loop.call_soon(self.loop.stop)

    def fun(self): 
        return asyncio.run_coroutine_threadsafe(coro("Hello 1"), self.loop)

    def fun2(self):
        return asyncio.run_coroutine_threadsafe(coro("Hello 2"), self.loop)

t = Test()
print("Test constructor complete")
t.fun()
fut = t.fun2()
# Comment out the next line if you don't want to wait here
# fut.result()  # Wait for fun2 to finish
print("Closing")
t.close()
print("Finished")

Upvotes: 1

Related Questions