I'm working on a Python application where I need to consume messages from a Kafka topic, process them by making an async API request, and produce a response to an outbound Kafka topic. Since the Kafka client I'm using is synchronous (confluent-kafka), I decided to use ThreadPoolExecutor to run the consumer in a separate thread and launch async tasks in the main event loop for I/O-bound operations.
The code works, but I'm facing a race condition when two requests arrive simultaneously. The acknowledgment is sent for both requests, but the actual API request (inside fetch_response_from_rest_service) is sent for only one of the requests, twice. This issue happens in the section of the code where I’ve marked a comment.
Here’s the relevant code:
import json
import confluent_kafka as kafka
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging as logger

async def run_prediction(inbound_topics, outbound_topics):
    consumer_args = {'bootstrap.servers': config.BOOTSTRAP_SERVERS,
                     'group.id': config.APPLICATION_ID,
                     'default.topic.config': {'auto.offset.reset': config.AUTO_OFFSET_RESET},
                     'enable.auto.commit': config.ENABLE_AUTO_COMMIT,
                     'max.poll.interval.ms': config.MAX_POLL_INTERVAL_MS}
    training_consumer = kafka.Consumer(consumer_args)
    training_consumer.subscribe(inbound_topics)
    outbound_producer = kafka.Producer({'bootstrap.servers': config.BOOTSTRAP_SERVERS})
    logger.info(f"Listening to inbound topic {inbound_topics}")
    while True:
        msg = training_consumer.poll(timeout=config.KAFKA_POLL_TIMEOUT)
        if not msg:
            continue
        if msg.error():
            logger.info(f"Consumer error: {str(msg.error())}")
            continue
        try:
            send_ack(msg.value(), "MESSAGE_RECEIVED")
            loop = asyncio.get_event_loop()
            loop.run_in_executor(executor, lambda: asyncio.run(
                fetch_response_from_rest_service(message=msg.value().decode(),
                                                 callback=kafka_status_callback(msg, outbound_producer,
                                                                                outbound_topics))))
        except Exception as ex:
            logger.exception(ex)

async def fetch_response_from_rest_service(message, callback):
    # race condition happens at this point message variable when two requests come at same time
    message = json.loads(message)
    url = "SOME_ENDPOINT"
    headers = {
        "Content-Type": "application/json"
    }
    response = None
    try:
        logger.info(f"sending request to {url} for payload {message}")
        response = await async_request("POST", url, headers, data=json.dumps(message),
                                       timeout=10)
        response = json.loads(response)
    except Exception as ex:
        logger.exception(f"All retries failed. Error: {ex}")
    finally:
        callback(response)

asyncio.run(run_prediction(["INBOUND_TOPIC"], ["OUTBOUND_TOPIC"]))
The send_ack method just sends an acknowledgment message to a Kafka topic to report that processing has started.
I identified the race condition when multiple messages are processed at the same time. The fetch_response_from_rest_service function seems to use the wrong message for one of the requests, so the same message is sent twice and the other one gets dropped.
I tried solving this by locking the section of the code that processes the message variable:
async def fetch_response_from_rest_service(message, callback):
    message_copy_lock = asyncio.Lock()
    async with message_copy_lock:
        logger.info(f"Got message : {json.loads(message)['conversationRequest']['requestId']}")
        message = json.loads(message)
    url = "SOME_ENDPOINT"
    headers = {
        "Content-Type": "application/json"
    }
    response = None
    try:
        logger.info(f"sending request to {url} for payload {message}")
        response = await async_request("POST", url, headers, data=json.dumps(message),
                                       timeout=10)
        response = json.loads(response)
    except Exception as ex:
        logger.exception(f"All retries failed. Error: {ex}")
    finally:
        callback(response)
However, this did not fix the issue.
My constraints:
asyncio.create_task()
if I was using an async Kafka client, where I could await the response from the API call and still go ahead with polling requests, but I want to avoid that path due to project limitations.
Questions:
Any help or suggestions would be appreciated!
You are making a real mess in this code, and it is actually surprising that it works at all, even in some special cases.
It all comes down to how you organize your tasks, so let's look at your questions at the end first:
Because you are scheduling a lambda function to run in the off-thread executor, but this function uses the nonlocal variable msg, and by the time it runs (in the other thread), the contents of msg have changed and it actually runs with the next message.
(A first hint: there is no way the problem could be inside fetch_response_from_rest_service, as inside it message is a plain local variable, and those are scoped to each function call in Python. The only way the same message can be replied to twice is if the function is called twice with the same message.)
By understanding how asyncio and multithreading are supposed to be combined. (Hint: there is no need to run an asyncio loop in every thread; we use other threads precisely because the code running there is synchronous.)
OK, that is a much-needed clean-up for your code, but it is not what is actually wrong. The real fix is: do not use msg as a nonlocal variable in the function to be run in the executor; instead, pass it as an eagerly evaluated parameter.
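A minimal standard-library sketch of the difference: the nested function lazy behaves exactly like the lambda in the question, looking msg up only when it runs in the worker thread, while executor.submit(handle, gate, msg) evaluates msg at submission time. The names handle and dispatch are hypothetical, and the gate Event only simulates worker threads that happen to run after the poll loop has already moved on.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def handle(gate, message):
    gate.wait()  # block until the "poll loop" below has finished
    return message


def dispatch(messages, eager):
    gate = threading.Event()  # simulates worker threads that only get scheduled "late"
    futures = []
    with ThreadPoolExecutor(max_workers=len(messages)) as executor:
        for msg in messages:
            if eager:
                # msg is evaluated right now and handed to the worker as an argument.
                futures.append(executor.submit(handle, gate, msg))
            else:
                # Same as the question's lambda: msg is looked up only when lazy() runs.
                def lazy():
                    gate.wait()
                    return msg
                futures.append(executor.submit(lazy))
        gate.set()  # the loop is done, so msg now holds the last message
        return [f.result() for f in futures]


print(dispatch(["request-1", "request-2"], eager=False))  # ['request-2', 'request-2']
print(dispatch(["request-1", "request-2"], eager=True))   # ['request-1', 'request-2']
```

With loop.run_in_executor the same rule applies: pass the values as positional arguments (or bind them with functools.partial) instead of capturing them in a lambda.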
As simply as possible: the idea behind both a ThreadPoolExecutor and an async loop is the same. Your work is divided into tasks, and tasks run concurrently. Async tasks all run in the same thread and are managed by the asyncio loop, while synchronous tasks are submitted to different worker threads and managed by the executor's machinery: new tasks (plain function calls) are scheduled and executed, their return values are "sent back", and then the thread is ready to be reused for a new task.
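To illustrate, here is a small sketch where a single event loop drives two coroutines and two blocking calls at the same time. The functions blocking_io and async_io are hypothetical placeholders: the coroutines run as tasks on the loop's own thread, the blocking calls run in a ThreadPoolExecutor via loop.run_in_executor, and asyncio.gather awaits all four.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(name, seconds):
    time.sleep(seconds)  # synchronous call: would block the event loop if called directly
    return f"sync {name}"


async def async_io(name, seconds):
    await asyncio.sleep(seconds)  # cooperative: yields to the loop while waiting
    return f"async {name}"


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        start = time.perf_counter()
        results = await asyncio.gather(
            async_io("a", 0.2),                                # task on this thread
            async_io("b", 0.2),                                # task on this thread
            loop.run_in_executor(executor, blocking_io, "c", 0.2),  # worker thread
            loop.run_in_executor(executor, blocking_io, "d", 0.2),  # worker thread
        )
        elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # ['async a', 'async b', 'sync c', 'sync d']
print(f"took about {elapsed:.1f}s")
```

All four waits overlap, so the whole thing takes roughly one sleep's duration rather than four, and no second event loop is involved.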
There is no sense in running multiple event loops in other threads, unless you really have workloads that benefit from managing a group of async tasks in another thread. That can make sense in a free-threaded (no GIL) build of Python (3.13t onwards), but not up to Python 3.12.
The main idea is correct. The way you are doing it is not: there is no need (or sense) in calling asyncio.run in each thread. And what is actually wrong is a side effect of using nonlocal variables in your target function (the lambda).
Oh, on a second read, when I went to write the working version: fetch_response_from_rest_service actually uses async code, so it should run in the asyncio loop on the main thread, not in an executor. AND you don't say what kafka_status_callback does or should return, but you are aware it is called eagerly, aren't you? Since you mentioned that your code works, even if only sometimes, I am assuming it is being used correctly, and that calling it immediately returns a Python callable which will work with callback(response) later on.
In fixing your code, I will assume this callback is the synchronous, I/O-bound function that acts on Kafka, but I can't be sure that is correct.
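For reference, a callback factory matching that assumption could look like the hypothetical sketch below. It takes the raw message bytes instead of the confluent_kafka Message object, and RecordingProducer is a stand-in exposing only a produce(topic, value=...) method, so the sketch runs without a broker (a real confluent_kafka.Producer also needs periodic poll() or flush() calls). The point is that everything the inner callback needs is bound when the factory is called, so a later message cannot leak into it.

```python
import json


class RecordingProducer:
    """Hypothetical stand-in for confluent_kafka.Producer that records produce() calls."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, value=None):
        self.sent.append((topic, value))


def kafka_status_callback(msg_value, producer, outbound_topics):
    # This outer call runs immediately, where the caller writes kafka_status_callback(...),
    # so msg_value, producer and outbound_topics are bound right now.
    def callback(response):
        # This runs later, once the REST call has finished (response is None if it failed).
        payload = json.dumps({"request": msg_value.decode(), "response": response})
        for topic in outbound_topics:
            producer.produce(topic, value=payload)
    return callback


producer = RecordingProducer()
cb = kafka_status_callback(b'{"requestId": "a"}', producer, ["OUTBOUND_TOPIC"])
cb({"status": "ok"})  # later, e.g. from the executor, with the REST response
print(producer.sent)
```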
Only the Kafka-related code should run in the executor.
No. Let's just fix some of the confusing mess and things will work. However, since you mention it, your idea of how to use the Lock is incorrect, and I suggest you read a bit about it. TL;DR: you created the lock as a local variable, so the only code that ever uses it is that single call. Nothing else is blocked while this lock is held: any other call that could be running in parallel and mess things up gets a Lock instance of its own. (Also, asyncio.Lock is not thread-safe and only coordinates tasks within one event loop, so it could not serialize the separate loops that asyncio.run starts in each worker thread even if it were shared.)
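The sketch below (all names hypothetical) counts how many calls are inside the "locked" section at the same moment. With a fresh asyncio.Lock per call, as in the question, all five overlap; with one shared lock they are serialized. Note that even the shared lock would not have fixed this particular bug, since the wrong message had already been picked before the coroutine started.

```python
import asyncio


async def critical_section(state):
    state["inside"] += 1
    state["peak"] = max(state["peak"], state["inside"])
    await asyncio.sleep(0.01)  # yield to the loop, as an awaited HTTP call would
    state["inside"] -= 1


async def handle_with_local_lock(state):
    lock = asyncio.Lock()  # a brand-new lock per call, as in the question
    async with lock:
        await critical_section(state)


async def handle_with_shared_lock(state, lock):
    async with lock:  # every call contends on the same lock object
        await critical_section(state)


async def peak_concurrency(shared):
    state = {"inside": 0, "peak": 0}
    lock = asyncio.Lock()
    if shared:
        calls = [handle_with_shared_lock(state, lock) for _ in range(5)]
    else:
        calls = [handle_with_local_lock(state) for _ in range(5)]
    await asyncio.gather(*calls)
    return state["peak"]


print(asyncio.run(peak_concurrency(shared=False)))  # 5: the local locks never block anything
print(asyncio.run(peak_concurrency(shared=True)))   # 1: the shared lock serializes the calls
```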
Now, onto a working version:
import json
import confluent_kafka as kafka
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial  # Needed to be able to pass kwargs to
                               # a function to be called with `run_in_executor`.
import logging as logger

# `config`, `send_ack`, `async_request` and `kafka_status_callback` come from your own code.
executor = ThreadPoolExecutor()  # one shared pool for the blocking Kafka calls


async def run_prediction(inbound_topics, outbound_topics):
    consumer_args = {'bootstrap.servers': config.BOOTSTRAP_SERVERS,
                     'group.id': config.APPLICATION_ID,
                     'default.topic.config': {'auto.offset.reset': config.AUTO_OFFSET_RESET},
                     'enable.auto.commit': config.ENABLE_AUTO_COMMIT,
                     'max.poll.interval.ms': config.MAX_POLL_INTERVAL_MS}
    training_consumer = kafka.Consumer(consumer_args)
    training_consumer.subscribe(inbound_topics)
    outbound_producer = kafka.Producer({'bootstrap.servers': config.BOOTSTRAP_SERVERS})
    logger.info(f"Listening to inbound topic {inbound_topics}")
    running_tasks = set()  # all asyncio tasks created to run concurrently
                           # must be gathered somewhere: at some point
                           # they have to be _awaited_
    ack_tasks = set()
    loop = asyncio.get_running_loop()  # you only need one reference to this;
                                       # no point in making this call inside the while loop
                                       # (also, note that in modern asyncio code you should call
                                       # get_running_loop, not get_event_loop)
    while True:
        # consumer.poll is a synchronous, I/O-bound call, isn't it?
        # It should run in an executor! Otherwise it blocks the async loop.
        # Your code never declared `executor`, so it is defined at module level above.
        # You could also just pass `None` and asyncio will use its default executor.
        msg = await loop.run_in_executor(  # your code was not awaiting the `run_in_executor` tasks
            executor,
            partial(training_consumer.poll, timeout=config.KAFKA_POLL_TIMEOUT)
            # partial returns the function as a callable with the
            # named arguments bound, so it can be called lazily
            # in the worker thread.
        )
        if msg and msg.error():
            logger.info(f"Consumer error: {str(msg.error())}")
        elif msg:
            # No try/except here: exceptions will be raised when we _await_ the running tasks.
            # send_ack is sync and I/O-bound, right? We can run it in the executor, but
            # this time, instead of awaiting here, we fire it and await it later.
            ack_tasks.add(
                loop.run_in_executor(
                    executor,
                    send_ack, msg.value(), "MESSAGE_RECEIVED"
                )
            )
            # fetch_response_from_rest_service is an async function,
            # which should run on _this_ thread!
            running_tasks.add(
                asyncio.create_task(
                    fetch_response_from_rest_service(
                        msg.value().decode(),
                        callback=kafka_status_callback(
                            msg, outbound_producer, outbound_topics
                        )  # Assuming `kafka_status_callback` is __non__-blocking itself and returns a blocking callable.
                    )  # fetch_response_from_rest_service(...) is called eagerly, before `create_task`:
                       # its _immediate_ return value is a coroutine, which is wrapped in the task
                       # that will run concurrently. The important part: without the `lambda`,
                       # which made this a lazy call to run in another thread,
                       # the `msg` variable is used _synchronously_, right now,
                       # and the race condition is avoided.
                )
            )
        # The `continue`s were removed so that, even when no message is received,
        # we still check on the tasks running concurrently, so they can be resolved.
        # (asyncio.wait raises ValueError on an empty set, hence the `if` guards.)
        # First, consume the finished "send_ack" futures:
        if ack_tasks:
            done, ack_tasks = await asyncio.wait(ack_tasks, timeout=0)
            # If you want to check for exceptions in the acks, this is the place:
            # loop through the `done` futures and check their `.exception()` method.
            # Otherwise just go on.
        # Now, check the finished fetch_response_from_rest_service tasks!
        if running_tasks:
            done, running_tasks = await asyncio.wait(running_tasks, timeout=0)
            for task in done:
                if error := task.exception():
                    # feel free to add more info about the task identity, etc.
                    logger.error("fetch task failed", exc_info=error)


async def fetch_response_from_rest_service(message, callback):
    # Now `message` is this call's own value, so your previous error won't take place
    # (we are no longer called from a lambda that might run just when
    # the local variable `msg` in `run_prediction` already held the next message).
    # WAS: race condition happens at this point message variable when two requests come at same time
    message_dict = json.loads(message)  # do you need this?
                                        # in your snippet you decode the JSON
                                        # just to encode it back.
    url = "SOME_ENDPOINT"
    headers = {
        "Content-Type": "application/json"
    }
    response = None
    try:
        logger.info(f"sending request to {url} for payload {message}")  # use `message_dict` if you want
                                                                        # the decoded JSON just for logging.
        response = await async_request("POST", url, headers, data=message,
                                       timeout=10)
        response = json.loads(response)
    except Exception as ex:
        logger.exception(f"All retries failed. Error: {ex}")
    finally:
        # And I am assuming this callback is synchronous and I/O-bound!
        # We run _it_, and _only_ it, in another thread, and await the result.
        await asyncio.get_running_loop().run_in_executor(executor, callback, response)


asyncio.run(run_prediction(["INBOUND_TOPIC"], ["OUTBOUND_TOPIC"]))
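If you want to try the same structure without a broker or an HTTP endpoint, here is a self-contained sketch: a blocking poll() run in the executor, one asyncio task per message with the message passed eagerly, and the blocking "produce" step sent back to the pool. FakeMessage, FakeConsumer and fake_rest_call are hypothetical stand-ins for confluent_kafka.Message, confluent_kafka.Consumer and your async_request, and the loop stops after max_polls iterations so the example terminates.

```python
import asyncio
import json
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message (only value() and error())."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value

    def error(self):
        return None


class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer with a blocking poll()."""

    def __init__(self, payloads):
        self._messages = [FakeMessage(json.dumps(p).encode()) for p in payloads]

    def poll(self, timeout):
        time.sleep(0.01)  # pretend to block on the network
        return self._messages.pop(0) if self._messages else None


async def fake_rest_call(payload):
    # Hypothetical async HTTP call: echoes the request id back after a delay.
    await asyncio.sleep(0.05)
    return {"requestId": payload["requestId"]}


async def handle(raw, sent, executor):
    payload = json.loads(raw)  # raw is this task's own argument, fixed at create_task time
    response = await fake_rest_call(payload)
    loop = asyncio.get_running_loop()
    # The blocking "produce" step goes back to the thread pool.
    await loop.run_in_executor(executor, sent.append, response["requestId"])


async def consume(consumer, max_polls):
    loop = asyncio.get_running_loop()
    sent, tasks = [], set()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for _ in range(max_polls):
            msg = await loop.run_in_executor(executor, partial(consumer.poll, timeout=1.0))
            if msg is not None and msg.error() is None:
                tasks.add(asyncio.create_task(handle(msg.value().decode(), sent, executor)))
        await asyncio.gather(*tasks)  # a long-running service would reap finished tasks inside the loop
    return sent


consumer = FakeConsumer([{"requestId": "a"}, {"requestId": "b"}, {"requestId": "c"}])
sent = asyncio.run(consume(consumer, max_polls=5))
print(sorted(sent))  # ['a', 'b', 'c']: every request answered exactly once
```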