unixak

Reputation: 3

Threading with asyncio issue

I want to run a few threads, each with its own independent asyncio event loop that processes a list of async routines.

Each thread creates its own local instance of the class 'data', but in practice it behaves like an object shared between threads. I don't understand why this happens.

So, my questions are:

  1. Why does this happen? Each thread should have its own unique local instance of 'data'.
  2. How can I solve this? Synchronizing the 'data' object across threads is not needed.

Here is the code. Don't worry about exceptions, thread joining, etc.; it's simplified as an example.

Expected output:

id=1, list a: ['1', '1', '1']

Real Output:

id=1, list a: ['1', '3', '2', '1', '3', '2', '3', '2', '1']

Data processing:

import asyncio
import threading

class data:
    id = 0
    a = []
    b = []

    def __init__(self, id):
        self.id = id

    async def load_0(self):
        for i in range(0, 3):
            self.a.append(str(self.id))
            await asyncio.sleep(0.1)

    async def load_n(self):
        for i in range(0, 3):
            self.b.append(str(self.id))
            await asyncio.sleep(0.1)

Run asyncio tasks in thread:

async def thread_loop(loop, id):
    tasks = []

    d = data(id)

    # 0 .. n tasks
    tasks.append(asyncio.create_task(d.load_0()))
    tasks.append(asyncio.create_task(d.load_n()))

    await asyncio.gather(*tasks, return_exceptions = True)

    if id == 1:
        print('id=' + str(d.id) + ', list a: ' + str(d.a))

New event loop in thread:

def thread_main(id):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(thread_loop(loop, id))

Create and start threads:

async def start(threads):
    threads.append(threading.Thread(target = thread_main, args = (1,)))
    threads.append(threading.Thread(target = thread_main, args = (2,)))

    for thread in threads:
        thread.start()

    while True:
        await asyncio.sleep(0.1)

Main:

if __name__ == '__main__':
    threads = []
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start(threads))

Upvotes: 0

Views: 370

Answers (1)

dirn

Reputation: 20709

Each of your threads does have its own instance of data; you get that with d = data(id). The reason d.a and d.b look shared is that those two lists belong to the class, so every instance (and therefore every thread) appends to the same list objects. This isn't related to threads or asyncio; it's the way you define your class.

When you assign mutable objects to class-level attributes, these objects are shared across all instances of the class.

>>> class C:
...     l = []
...
>>> c1 = C()
>>> c2 = C()
>>>
>>> c1.l.append(1)
>>> c2.l
[1]
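One way to confirm what is going on is to check where the attribute actually lives. The sketch below reuses the class C from the example above; the variable names instance_has_own_l and same_object are only illustrative. It also shows that assigning to the attribute, as opposed to mutating it, creates a separate per-instance attribute.

```python
class C:
    l = []  # class attribute: a single list object stored on the class


c1 = C()
c2 = C()

# Neither instance has its own 'l'; attribute lookup falls back to the class.
instance_has_own_l = "l" in vars(c1)
same_object = c1.l is c2.l is C.l

c1.l.append(1)  # mutates the one shared list in place
c2.l = [99]     # assignment creates an instance attribute on c2 only

print(instance_has_own_l)  # False
print(same_object)         # True
print(C.l, c1.l, c2.l)     # [1] [1] [99]
```

This is why self.id = id in __init__ behaves as expected in the question while self.a.append(...) does not: the first rebinds a name on the instance, the second mutates the list found on the class.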

The way to fix this is to move the assignment of the initial value to __init__.

>>> class C:
...     def __init__(self):
...         self.l = []
...
>>> c1 = C()
>>> c2 = C()
>>>
>>> c1.l.append(1)
>>> c2.l
[]

In your case that would be

class data:
    id = 0

    def __init__(self, id):
        self.id = id
        self.a = []
        self.b = []

You can even remove id = 0 from the class's definition since you assign a value in __init__.

class data:
    def __init__(self, id):
        self.id = id
        self.a = []
        self.b = []
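For reference, here is a minimal, self-contained sketch of the question's setup with that fix applied. It makes a few assumptions that are not in the original code: it uses asyncio.run inside each thread instead of creating the loop by hand, joins the threads, shortens the sleeps, and collects each thread's list in a hypothetical results dict so the output can be inspected afterwards.

```python
import asyncio
import threading


class data:
    def __init__(self, id):
        self.id = id
        self.a = []  # created per instance, so nothing is shared
        self.b = []

    async def load_0(self):
        for _ in range(3):
            self.a.append(str(self.id))
            await asyncio.sleep(0.01)

    async def load_n(self):
        for _ in range(3):
            self.b.append(str(self.id))
            await asyncio.sleep(0.01)


async def thread_loop(id, results):
    d = data(id)
    # gather() schedules both coroutines on this thread's event loop
    await asyncio.gather(d.load_0(), d.load_n())
    results[id] = d.a  # 'results' is a hypothetical dict used only for inspection


def thread_main(id, results):
    # asyncio.run() creates and closes a fresh event loop for this thread
    asyncio.run(thread_loop(id, results))


results = {}
threads = [
    threading.Thread(target=thread_main, args=(i, results)) for i in (1, 2, 3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('id=1, list a: ' + str(results[1]))  # id=1, list a: ['1', '1', '1']
```

Each thread now appends only to the lists of its own instance, which gives the expected output from the question.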

This may be more than you need, especially without knowing what your real code looks like, but you could also consider using a dataclass.

from dataclasses import dataclass, field

@dataclass
class data:
    id: int
    a: list[str] = field(default_factory=list)
    b: list[str] = field(default_factory=list)

Note: Using list[str] requires either Python 3.9 or later, or from __future__ import annotations (available since Python 3.7). Otherwise you'll need to use typing.List[str] instead.
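A short sketch of how the dataclass version behaves, written with typing.List so it also runs on older Python versions. The class named broken and the variable rejected are only there for illustration: they show that dataclasses refuse a bare mutable default, which is the same mistake as in the original class.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class data:
    id: int
    # default_factory calls list() once per instance
    a: List[str] = field(default_factory=list)
    b: List[str] = field(default_factory=list)


d1 = data(1)
d2 = data(2)
d1.a.append("1")

print(d1.a, d2.a)  # ['1'] []

# A bare mutable default is rejected when the class is created.
try:
    @dataclass
    class broken:
        a: list = []
    rejected = False
except ValueError:
    rejected = True

print(rejected)  # True
```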

Upvotes: 2
