I am new to Python asyncio and have a slightly convoluted requirement. I have been going through the asyncio documentation but have not found the right solution yet, because I am unable to understand some aspects.
I have a script, KafkaScript.py, that reads from a Kafka topic (streaming) and calls an API endpoint for each record. The API calls a method, method_A, in another script, ProcessingScript.py, in parallel for each incoming request/Kafka record.
method_A calls an asynchronous method_B, which uses a globally defined ThreadPoolExecutor to call another asynchronous method, method_C, via run_in_executor(). method_C returns a string to method_B, which in turn must return it to method_A.
Here is the code:
KafkaScript.py:
import requests

if __name__ == '__main__':
    for i in Kafka-Topic:
        # Use a new thread to make an API request for each Kafka record
        response = requests.get(<url generated>)  # i is passed as a param
The API:
from flask import Flask, request
import ProcessingScript

app = Flask(__name__)

@app.route('/v1/generate', methods=['GET'])
def generate():
    data = request.args.get('data', type=str)
    response = ProcessingScript.method_A(data)
    return response
ProcessingScript.py:
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)

async def method_C(val):
    # Calls playwright to get content from a page, which is a blocking call per thread
    return "processed:" + val

async def method_B(val):
    loop = asyncio.get_running_loop()
    # For each parallel thread, call method_C and get the response without blocking other threads.
    # Note: without await, this returns a Future rather than the string.
    response = loop.run_in_executor(executor, asyncio.run, method_C(val))
    return response

def method_A(val):
    response = asyncio.run(method_B(val))
    return response
As I understand it, method_B can only receive the string from method_C if I await the call, like this:
response = await loop.run_in_executor(executor, asyncio.run, method_C(val))
The problem with the await here is that, although multiple threads are calling method_A, the await makes all of them wait for the first thread to get its response.
How can I do this in a thread-safe manner, so that none of the parallel requests to method_A are blocked by other threads waiting for their responses?
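To check the premise, here is a minimal, self-contained sketch (not the real code from the question) in which method_C is made synchronous and a time.sleep(0.5) call stands in for the blocking Playwright work; that delay and the simulate_requests helper are assumptions for illustration only. Each simulated request thread runs its own event loop through asyncio.run, and method_B awaits run_in_executor on the shared pool. If the awaits serialized the threads, five requests would take about 2.5 s; with five workers they should overlap.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Shared pool, as in the question (max_workers=5).
executor = ThreadPoolExecutor(max_workers=5)

def method_C(val):
    # Assumption: stand-in for the blocking Playwright call (~0.5 s of blocking work).
    time.sleep(0.5)
    return "processed:" + val

async def method_B(val):
    loop = asyncio.get_running_loop()
    # Awaiting suspends only this thread's coroutine; other threads keep running.
    return await loop.run_in_executor(executor, method_C, val)

def method_A(val):
    # Each calling thread (e.g. each request thread) gets its own event loop.
    return asyncio.run(method_B(val))

def simulate_requests(n):
    # Hypothetical helper: call method_A from n threads at once and time it.
    results = [None] * n

    def handle(i):
        results[i] = method_A(str(i))

    threads = [threading.Thread(target=handle, args=(i,)) for i in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = simulate_requests(5)
    print(results)
    print(f"elapsed: {elapsed:.2f}s")  # expected to be well under 2.5 s
```

The await only suspends the coroutine in the thread that issued it, so awaiting the executor future is safe here; the effective limit on parallelism is the pool's max_workers.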
This project needs to be reorganized. My suggestion would be to bring everything under a single asyncio loop (which is the way asyncio is meant to be used, actually), including the code that listens for Kafka messages.
The listener would be a 3-4 line function running in a continuous loop (in its own thread) that posts the needed data into an internal queue. Another async function with a continuous loop can pick up these messages and launch an asyncio task for each one; that task can then call loop.run_in_executor for the final, non-asynchronous code for each payload, if needed.
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

import other_module

# if this does nothing async, it should not be an async method
def method_C(val):
    # Calls playwright to get content from a page, which is a blocking call per thread
    return "processed:" + val

# your original method_B serves no purpose.
# def method_B(...): ...

async def method_A(val, executor):
    loop = asyncio.get_running_loop()
    # run the blocking method_C in the thread pool without blocking the event loop
    response = await loop.run_in_executor(executor, method_C, val)
    return response
###

interval = 0.1

async def main():
    executor = ThreadPoolExecutor(max_workers=5)
    tqueue = queue.Queue()
    aqueue = asyncio.Queue()
    responses = []
    tasks = set()
    # start the kafka listener thread, plus the task that moves its messages into aqueue
    # (keep a reference to the task so it is not garbage-collected)
    other_module.start_kafka_worker(tqueue)
    broker = asyncio.create_task(other_module.kafkabroker(tqueue, aqueue))
    try:
        while True:
            # you might want to put code to fetch more than one payload per cycle here
            task = asyncio.create_task(method_A(await aqueue.get(), executor))
            tasks.add(task)
            done, pending_tasks = await asyncio.wait(tasks, timeout=interval)
            for task in done:
                # you might want to check the task for exceptions here.
                responses.append(task.result())
                # and you might also want to do other things with the response instead
                # of simply adding it to a list for consuming later.
            tasks = pending_tasks
            # you might want to add some condition/mechanism to stop the process.
    finally:
        executor.shutdown()
        other_module.stop_kafka_worker(tqueue)
        print(responses)

if __name__ == "__main__":
    asyncio.run(main())
# ---------------------------------------------------
# kafka related code: "other_module"
import asyncio
import queue
import threading

interval = 0.05  # wait 0.05 seconds before polling the kafka queue again; change as desired.
_sentinel = object()
stop_kafka = False
kafka_worker_thread = None
async def kafkabroker(tqueue, aqueue):
    # moves messages from the thread-safe queue.Queue into the asyncio.Queue
    while True:
        try:
            payload = tqueue.get_nowait()
        except queue.Empty:
            pass
        else:
            if payload is _sentinel:
                break
            await aqueue.put(payload)
        await asyncio.sleep(interval)
def kafka_fetcher(tqueue):
    # you may want to add some mechanism for shutting down this listener -
    # the "for x in message_topic" loop you wrote will have to be changed to something else.
    for msg in kafka_topic:  # kafka_topic: placeholder for your Kafka topic iterable, as in the question
        if stop_kafka:
            break  # this will stop, but is only checked when a new kafka msg arrives
        tqueue.put(msg)
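def start_kafka_worker(tqueue):
    global kafka_worker_thread
    # pass the function and its arguments separately (do not call it here);
    # daemon=True so a listener blocked on kafka does not prevent interpreter exit
    kafka_worker_thread = threading.Thread(target=kafka_fetcher, args=(tqueue,), daemon=True)
    kafka_worker_thread.start()

def stop_kafka_worker(tqueue):
    global stop_kafka
    stop_kafka = True
    tqueue.put(_sentinel)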
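A compact, self-contained sketch of the single-loop layout described in this answer follows. It is not a drop-in replacement: fake_kafka_source, blocking_work and the _STOP sentinel are hypothetical stand-ins, and instead of polling a queue.Queue with asyncio.sleep, as kafkabroker does, the listener thread hands each message to the loop with loop.call_soon_threadsafe(aqueue.put_nowait, msg). That is a swapped-in technique which avoids the polling delay.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_STOP = object()  # hypothetical sentinel marking the end of the stream

def blocking_work(val):
    # Assumption: stand-in for the blocking per-record work (e.g. a Playwright call).
    time.sleep(0.1)
    return "processed:" + val

def fake_kafka_source():
    # Hypothetical stand-in for iterating over a Kafka topic.
    for i in range(10):
        yield f"record-{i}"

def producer(loop, aqueue):
    # Runs in a plain thread; schedules each put on the event loop thread-safely.
    for msg in fake_kafka_source():
        loop.call_soon_threadsafe(aqueue.put_nowait, msg)
    loop.call_soon_threadsafe(aqueue.put_nowait, _STOP)

async def process(val, executor):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_work, val)

async def main():
    loop = asyncio.get_running_loop()
    aqueue = asyncio.Queue()
    tasks = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        threading.Thread(target=producer, args=(loop, aqueue), daemon=True).start()
        while True:
            msg = await aqueue.get()
            if msg is _STOP:
                break
            # One task per record; no task waits for the others.
            tasks.append(asyncio.create_task(process(msg, executor)))
        results = await asyncio.gather(*tasks)
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))
```

asyncio.Queue is not thread-safe, which is why the producer thread never calls its methods directly and schedules put_nowait on the loop instead.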