Reputation: 1922
I have two servers, created with asyncio.start_server:
asyncio.start_server(self.handle_connection, host = host, port = port)
and running in one loop:
loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()
I'm using asyncio.Queue to communicate between servers. Messages from Server2, added via queue.put(msg)
successfully receives by queue.get()
in Server1. I'm running queue.get()
by asyncio.ensure_future
and using as callback for
add_done_callback
method from Server1:
def callback(self, future):
msg = future.result()
self.msg = msg
But this callback
not working as expected - self.msg do not updates. What am I doing wrong?
UPDATED with additional code to show max full example:
class Queue(object):
def __init__(self, loop, maxsize: int):
self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)
async def put(self, data):
await self.instance.put(data)
async def get(self):
data = await self.instance.get()
self.instance.task_done()
return data
@staticmethod
def get_instance():
return Queue(loop = asyncio.get_event_loop(), maxsize = 10)
Server class:
class BaseServer(object):
def __init__(self, host, port):
self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
pass
def get_instance(self):
return self.instance
@staticmethod
def create():
return BaseServer(None, None)
Next I'm running the servers:
loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()
In the handle_connection
of server2 I'm calling queue.put(msg)
, in the handle_connection
of server1 I'm registered queue.get()
as task:
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
The process_queue
method of server1:
def process_queue(self, future):
msg = future.result()
self.msg = msg
The handle_connection
method of server1:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
while self.msg != SPECIAL_VALUE:
# doing something
Although task_queue
is done, self.process_queue
called, self.msg
never updates.
Upvotes: 2
Views: 1464
Reputation: 20264
Basically as you are using asynchronous structure, I think you can directly await the result:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
msg = await queue.get()
process_queue(msg) # change it to accept real value instead of a future.
# do something
Upvotes: 3