StormsEdge

Reputation: 885

Trouble with confluent-kafka consumer working with asyncio

I am trying to integrate asyncio functionality into my kafka topic listener and am running into issues (pretty new to async programming in python).

I have a confluent-kafka consumer created that is listening to a topic. The topic receives messages frequently and performance is of the utmost importance (hence the introduction of asyncio).

The main() function looks like this:

def main(self):
    try:
        while True:
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue

            asyncio.ensure_future(handle_message(msg))
    finally:
        consumer.close()

Essentially I want to pull messages off the topic in a linear fashion, but the handling of each message should be asynchronous, meaning that any database I/O etc. that happens in handle_message is dealt with asynchronously (I have the awaits etc. set up in that function properly). The problem is that the task scheduled with asyncio.ensure_future() never seems to start executing. How do I continuously add tasks to a running event loop as I pull messages from the kafka topic? I'm using confluent-kafka==1.5.0.

Upvotes: 0

Views: 3385

Answers (1)

Russell Owen

Reputation: 447

One problem is that msg = consumer.poll(timeout=5.0) is a blocking call, so it will block the event loop. One way to avoid this is to run it in a concurrent.futures.ThreadPoolExecutor via loop.run_in_executor (create the executor once, before the loop, rather than creating a new one for each iteration of the loop).
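A minimal sketch of that idea, assuming `consumer` and `handle_message` behave as in your question (`consume_loop` is just an illustrative name):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def consume_loop(consumer, handle_message):
    loop = asyncio.get_running_loop()
    # Create the executor once, outside the polling loop.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        while True:
            # poll() blocks, so run it in a worker thread to keep
            # the event loop free to run handle_message tasks.
            msg = await loop.run_in_executor(executor, consumer.poll, 5.0)
            if msg is None:
                continue
            asyncio.ensure_future(handle_message(msg))
    finally:
        consumer.close()
        executor.shutdown(wait=False)
```

You would then start the whole thing with something like `asyncio.run(consume_loop(consumer, handle_message))`, which is what was missing in your version: ensure_future only schedules a task, and nothing runs it unless an event loop is actually running.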

Another issue is that you are not handling the case where data arrives faster than you can process it. I suggest you consider some way to limit the number of running tasks. One technique is to put each newly created task into a set or similar collection. Before adding an item, purge the collection of finished tasks. Then examine its size to see if you can start another task, and if not, wait for one to finish.
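The bounded-task idea could be sketched like this (`submit_bounded` and the limit of 100 are illustrative, not from any library):

```python
import asyncio

async def submit_bounded(tasks, coro, max_tasks=100):
    """Schedule coro as a task, waiting first if too many are in flight."""
    # Purge finished tasks from the set.
    tasks.difference_update({t for t in tasks if t.done()})
    if len(tasks) >= max_tasks:
        # At the limit: wait for at least one task to finish.
        _done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED
        )
        tasks.intersection_update(pending)
    tasks.add(asyncio.ensure_future(coro))
```

In your polling loop you would keep one set alive across iterations and call `await submit_bounded(tasks, handle_message(msg))` instead of calling ensure_future directly, which gives you backpressure against the topic when handlers fall behind.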

Upvotes: 0

Related Questions