Reputation: 41
It is the first time that I post here so sorry if everything is not perfect.
The situation is the following : I send some bursts of NMEA sentences from a client to a server over a TCP connection using socket in python.
I want to avoid any latency for sending and receiving NMEA sentences. Therefore, I want the receiving of NMEA sentences from the server side to be the priority, and use the time between the bursts of data to process the sentences and store it in a database.
Therefore I wanted to use asyncio (python 3.8) to continuously add the NMEA sentences to a asyncio.queue and let the process of these sentences to happen while the function to receive data from the socket is waiting for new data.
The pseudo-code of my main loop inside my class is the following :
#Producer
async def _read_comms(self,queue):
""" """
while True:
comm_nmea = await self.DataPort.receive_line_async()
await queue.put(comm_nmea)
#Consumers
async def _process_comms(self,queue):
""" """
while True:
comm_nmea = await queue.get()
data = await self.Parser.parse_comm_nmea(comm_nmea=comm_nmea)
await self._data_to_db(data)
#main
async def receive(self):
""" """
self.Logger.info("Start collecting")
while(True):
queue = asyncio.Queue()
await self._read_comms(queue)
await self._process_comms(queue)
if self.update_rate != 0:
time.sleep(self.update_rate)
I must say that I am really new to asyncio (I started for this project). And I am pretty sure that I do a lot of stupid stuff there.
The problem with this code, is that I never get outside of the first loop in process_comms(). Which was to be expected but I am looking for an solution for what I am trying to do here.
Which is as a summary to have two concurrent loop that use the Queue as a buffer. One to feed the queue, the other one to process it.
Upvotes: 2
Views: 1175
Reputation: 41
Never mind, solved my problem !
For those that might encounter same problem :
I solved that by using :
queue = asyncio.Queue()
await asyncio.gather(self._read_comms(queue),self._process_comms(queue))
Upvotes: 2