Nathan

Reputation: 101

Python: updating one queue from several asyncio tasks

I want to update a single queue from several asyncio coroutines. I receive data from each of the providers A, B and C (each one uses a websocket and a "while True" loop), and I want every provider to put its data into the same queue. I know I may need multithreading or something else, but I can't find the right way.

if __name__ == '__main__':
    global_queue = queue.Queue()
    asyncio.run(A_Orderbook.data_stream(global_queue))
    asyncio.run(B_Orderbook.data_stream(global_queue))
    asyncio.run(C_Orderbook.data_stream(global_queue))
    print(global_queue.qsize())

Thanks

Upvotes: 1

Views: 396

Answers (1)

Artiom Kozyrev

Reputation: 3826

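Each asyncio.run() call blocks until its coroutine finishes, so with a "while True" loop inside data_stream the first call never returns and B and C never start. Instead, run all the producers inside one event loop and let them share a single asyncio.Queue. You can do it the following way: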

import asyncio


async def worker(worker_name: str, q: asyncio.Queue):
    """Produces tasks for consumer."""
    for i in range(1, 6):
        await asyncio.sleep(1)
        await q.put(f"{worker_name}-{i}")


async def consumer(q: asyncio.Queue):
    """Consumes tasks from workers."""
    while True:
        item = await q.get()
        await asyncio.sleep(1)
        print(item)
        # mark the item as processed so that q.join() can return
        q.task_done()


async def main_wrapper():
    """Main function - entry point of our async app."""
    q = asyncio.Queue()
    # keep a reference so the task is not garbage-collected;
    # it runs concurrently, so we do not await it here
    consumer_task = asyncio.create_task(consumer(q))
    await asyncio.gather(*[worker(f"w{i}", q) for i in range(1, 5)])  # run the four workers concurrently
    await q.join()  # wait until the consumer has processed every queued item
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly
    print("All DONE !")

if __name__ == '__main__':
    asyncio.run(main_wrapper())
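
Applied to the three providers from the question, the same pattern could look roughly like the sketch below. It is only an illustration under assumptions: data_stream here is a hypothetical stand-in for A_Orderbook.data_stream and the others, the websocket is simulated with asyncio.sleep, and the loop is bounded so the example terminates. The n_messages parameter and the consume helper are made up for the example.

```python
import asyncio


async def data_stream(name: str, q: asyncio.Queue, n_messages: int = 3) -> None:
    """Hypothetical stand-in for A_Orderbook.data_stream and friends.

    A real provider would loop with `while True` over websocket messages;
    a short bounded loop with asyncio.sleep simulates incoming data here.
    """
    for i in range(n_messages):
        await asyncio.sleep(0.01)  # simulates waiting for a websocket message
        await q.put((name, i))


async def consume(q: asyncio.Queue, received: list) -> None:
    """Drain the shared queue forever; main() cancels this task when done."""
    while True:
        item = await q.get()
        received.append(item)
        q.task_done()  # lets q.join() know the item was handled


async def main() -> list:
    q = asyncio.Queue()
    received = []
    consumer_task = asyncio.create_task(consume(q, received))
    # One event loop, three producers writing concurrently to the same queue
    await asyncio.gather(*(data_stream(name, q) for name in ("A", "B", "C")))
    await q.join()  # wait until every queued item has been consumed
    consumer_task.cancel()
    return received


if __name__ == "__main__":
    print(len(asyncio.run(main())))
```

With real endless streams, gather() would simply never return, and the consumer would keep processing items as they arrive. Because everything runs in one thread on one event loop, asyncio.Queue is enough and no threading is needed; queue.Queue is meant for sharing data between threads.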


Upvotes: 1
