Sergio Ivanuzzo

Reputation: 1922

Python3 asyncio - callback for add_done_callback does not update self variable in server class

I have two servers, each created with asyncio.start_server(self.handle_connection, host = host, port = port), and both run in the same event loop:

loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()

I'm using asyncio.Queue to communicate between the servers. Messages that Server2 adds via queue.put(msg) are successfully received by queue.get() in Server1. I schedule queue.get() with asyncio.ensure_future and register a method of Server1 as the callback via add_done_callback:

def callback(self, future):
    msg = future.result()
    self.msg = msg

But this callback does not work as expected: self.msg is not updated. What am I doing wrong?

UPDATE: additional code to show a more complete example:

class Queue(object):

    def __init__(self, loop, maxsize: int):
        self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)

    async def put(self, data):
        await self.instance.put(data)

    async def get(self):
        data = await self.instance.get()
        self.instance.task_done()
        return data

    @staticmethod
    def get_instance():
        return Queue(loop = asyncio.get_event_loop(), maxsize = 10)

Server class:

    class BaseServer(object):

        def __init__(self, host, port):
            self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)

        async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
            pass

        def get_instance(self):
            return self.instance

        @staticmethod
        def create():
            return BaseServer(None, None)

Next I'm running the servers:

loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()

In the handle_connection of server2 I call queue.put(msg); in the handle_connection of server1 I register queue.get() as a task:

task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)

The process_queue method of server1:

    def process_queue(self, future):
        msg = future.result()
        self.msg = msg

The handle_connection method of server1:

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
    task_queue = asyncio.ensure_future(queue.get())
    task_queue.add_done_callback(self.process_queue)

    while self.msg != SPECIAL_VALUE:
        # doing something

Although task_queue is done and self.process_queue is called, self.msg is never updated.

Upvotes: 2

Views: 1464

Answers (1)

Sraw

Reputation: 20264

Since you are already using an asynchronous structure, I think you can await the result directly:

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
    msg = await queue.get()
    self.process_queue(msg)  # change it to accept the real value instead of a future
    # do something
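
To make the idea concrete, here is a minimal, self-contained sketch of the same pattern without the network part. It is an illustration under assumptions, not your actual servers: the Receiver class, the sender coroutine, and the "stop" value used for SPECIAL_VALUE are hypothetical stand-ins for Server1, Server2, and your sentinel.

```python
import asyncio

SPECIAL_VALUE = "stop"  # hypothetical sentinel, stands in for the one in the question


class Receiver:
    """Hypothetical stand-in for Server1: keeps the latest message on self."""

    def __init__(self, queue):
        self.queue = queue
        self.msg = None
        self.seen = []

    def process_queue(self, msg):
        # Receives the real value, not a future.
        self.msg = msg
        self.seen.append(msg)

    async def handle(self):
        # Each await hands control back to the event loop until a message arrives.
        while self.msg != SPECIAL_VALUE:
            msg = await self.queue.get()
            self.queue.task_done()
            self.process_queue(msg)


async def sender(queue):
    # Hypothetical stand-in for Server2: pushes a few messages, then the sentinel.
    for msg in ("hello", "world", SPECIAL_VALUE):
        await queue.put(msg)


async def demo():
    queue = asyncio.Queue(maxsize=10)
    receiver = Receiver(queue)
    await asyncio.gather(receiver.handle(), sender(queue))
    return receiver


receiver = asyncio.run(demo())
print(receiver.msg, receiver.seen)
```

Because the loop awaits queue.get() on every iteration, self.msg is updated inside the same coroutine that checks it, so no done-callback is needed.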

Upvotes: 3
