mang4521

Reputation: 802

Handle continuous parallel requests without blocking in Asyncio

I am new to Python asyncio and have a slightly convoluted requirement. I have been going through the asyncio documentation but have not found the right solution yet, as I am unable to understand some aspects.

I have a script KafkaScript.py that reads from a Kafka topic (streaming) and calls an API endpoint. The API calls a method method_A in another script ProcessingScript.py in parallel for each incoming request/Kafka record.

method_A calls an asynchronous method_B, which uses a globally defined ThreadPoolExecutor to call another asynchronous method_C via run_in_executor(). method_C returns a string to method_B, which in turn must respond back to method_A.

Here is the script:

KafkaScript.py:

import requests

if __name__ == '__main__':
    for i in kafka_topic:  # placeholder for the actual Kafka consumer loop
        # Use a new thread to make an API request for each Kafka record
        response = requests.get(<url generated>)  # i is passed as a param

The API:

import ProcessingScript

@app.route('/v1/generate', methods = ['GET'])
def generate():
    data = request.args.get('data', type = str)
    response = ProcessingScript.method_A(data)
    return response
        

ProcessingScript.py:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)


async def method_C(val):
    # Calls playwright to get content from a page, which is a blocking call per thread
    return "processed:" + val


async def method_B(val):
    loop = asyncio.get_event_loop()
    # For each parallel thread, call method_C and get the response without blocking other threads
    response = loop.run_in_executor(executor, asyncio.run, method_C(val))
    return response


def method_A(val):
    response = asyncio.run(method_B(val))
    return response

As I understand, method_B can receive the string from method_C only when I make a call such as this.

response = await loop.run_in_executor(executor, asyncio.run, method_C(val))

The problem with the await here is that, although multiple threads are calling method_A, the await makes all threads wait for the first thread to get its response.
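For reference, here is a minimal, self-contained sketch of the pattern described above; blocking_call and its time.sleep are hypothetical stand-ins for the playwright call:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)

def blocking_call(val):
    time.sleep(1)  # stand-in for the blocking playwright call
    return "processed:" + val

async def method_B(val):
    loop = asyncio.get_running_loop()
    # await suspends only this coroutine; the executor thread does the blocking work
    return await loop.run_in_executor(executor, blocking_call, val)

def method_A(val):
    # each request thread spins up its own short-lived event loop
    return asyncio.run(method_B(val))

for i in range(3):
    threading.Thread(target=lambda i=i: print(method_A(str(i)))).start()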

How do I ensure, in a thread-safe manner, that none of the parallel requests to method_A are blocked by other threads waiting for a response?

Upvotes: 0

Views: 325

Answers (1)

jsbueno

Reputation: 110591

This project needs to be reorganized. My suggestion would be to bring everything under a single asyncio loop (the way it is meant to be used, actually), including the code that listens for Kafka messages.

That would be a 3-4 line function in a continuous loop that posts the needed data to an internal queue. Another async method, also in a continuous loop, could pick up these messages and launch an asyncio task, which could then use run_in_executor for the final, non-asynchronous code for each payload, if needed.

import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

import other_module

# if this does nothing async, it should not be an async method
def method_C(val):
    # Calls playwright to get content from a page, which is a blocking call per thread
    return "processed:" + val

# your original method_B serves no purpose.
# def method_B(...): ...

async def method_A(val, executor):
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(executor, method_C, val)
    return response
    
###

interval = 0.1 

async def main():
    executor = ThreadPoolExecutor(max_workers=5)
    tqueue = queue.Queue()
    aqueue = asyncio.Queue()
    responses = []
    tasks = set()
    # start the kafka listener thread and the broker that bridges the two queues
    other_module.start_kafka_worker(tqueue)
    broker = asyncio.create_task(other_module.kafkabroker(tqueue, aqueue))
    try:
        while True:
            # you might want to put code to fetch more than one payload per cycle here
            task = asyncio.create_task(method_A(await aqueue.get(), executor))
            tasks.add(task)
            done, pending_tasks = await asyncio.wait(tasks, timeout=interval)

            for task in done:
                # you might want to check the task for Exceptions here.
                responses.append(task.result())
                # and you might also want to do other things with the responses instead
                # of simply adding them to a list for consuming later.

            tasks = pending_tasks

            # you might want to add some condition/mechanism to stop the process.
    finally:
        other_module.stop_kafka_worker(tqueue)
        await broker  # the broker exits once it sees the sentinel
        executor.shutdown()
    print(responses)
    

if __name__ == "__main__":
    asyncio.run(main())

# ---------------------------------------------------
# kafka related code: "other_module"
    
    

import asyncio
import queue
import threading

interval = 0.05  # wait 0.05 seconds before polling the kafka queue again. Change as desired.
_sentinel = object()
stop_kafka = False

async def kafkabroker(tqueue, aqueue):
    while True:
        try:
            payload = tqueue.get_nowait()
        except queue.Empty:
            pass
        else:
            if payload is _sentinel:
                break
            await aqueue.put(payload)
        await asyncio.sleep(interval)
        
def kafka_fetcher(tqueue):
    # you may want to add some mechanism for shutting down this listener -
    # the "for msg in kafka_topic" loop below is a placeholder and has to be
    # changed for your actual Kafka consumer.
    for msg in kafka_topic:
        if stop_kafka:
            break  # this will stop, but is only checked when a new kafka msg arrives
        tqueue.put(msg)

def start_kafka_worker(tqueue):
    global kafka_worker_thread
    kafka_worker_thread = threading.Thread(target=kafka_fetcher, args=(tqueue,))
    kafka_worker_thread.start()
    

def stop_kafka_worker(tqueue):
    global stop_kafka
    stop_kafka = True
    tqueue.put(_sentinel)
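
Since the bridge between the Kafka thread and the asyncio loop is the subtle part, here is a minimal smoke test of just the two queues, assuming the code above is saved as other_module.py (no real Kafka involved; the three strings are stand-ins for records):

import asyncio
import queue

import other_module

async def demo():
    tqueue = queue.Queue()    # thread-safe side, fed by the kafka thread
    aqueue = asyncio.Queue()  # asyncio side, drained by the task launcher
    broker = asyncio.create_task(other_module.kafkabroker(tqueue, aqueue))
    for msg in ("a", "b", "c"):  # stand-ins for kafka records
        tqueue.put(msg)
    for _ in range(3):
        print(await aqueue.get())
    other_module.stop_kafka_worker(tqueue)  # posts the sentinel
    await broker  # the broker exits once it sees the sentinel

asyncio.run(demo())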



Upvotes: 1
