Reputation: 1106
I have an API set up with FastAPI, and I am hitting one endpoint from 5 processes via multiprocessing. Here is the reproducible setup:
```python
import uvicorn
from fastapi import FastAPI
import PIL.Image as Image
import requests
from loguru import logger
import sys

log_format = "{level} {process}-{thread} {time} {name}:{line} - {message}"
logger.remove()
logger.add(sys.stderr, format=log_format, backtrace=True, diagnose=True)
logger.add("logs/" + "t_{time}.log", format=log_format, colorize=True, backtrace=True, diagnose=True)

Image.MAX_IMAGE_PIXELS = None


def get_the_image_from_net():
    a = requests.get(
        "https://eoimages.gsfc.nasa.gov/images/imagerecords/73000/73751/world.topo.bathy.200407.3x5400x2700.png",
        verify=False,
    )
    return True


app = FastAPI()


@app.get("/expectoPatronum")
async def get_image_of_the_new_world():
    """
    Gets Image of the World
    """
    logger.info("Received request for getting Image of the world")
    image_bytes: bytes = get_the_image_from_net()
    logger.info("Image has been downloaded and processed as bytes array")
    return True


if __name__ == '__main__':
    uvicorn.run("image_downloader_api:app", host="0.0.0.0", port=10009)
```
```python
import time
from multiprocessing import Pool, Manager, Process
import requests
from loguru import logger
import sys

log_format = "{level} {process}-{thread} {time} {name}:{line} - {message}"
logger.remove()
logger.add(sys.stderr, format=log_format, backtrace=True, diagnose=True, enqueue=True)
logger.add("logs/" + "t_{time}.log", format=log_format, colorize=True, backtrace=True, diagnose=True, enqueue=True)

manager = Manager()
process_dictionary = manager.dict()
process_completion_race_list = manager.list()


def test_function(index_iterator: int):
    logger.info(f"STARTING REQUEST FOR {index_iterator}")
    response = requests.get("http://localhost:10009/expectoPatronum")
    logger.info(f"ENDING REQUEST FOR {index_iterator}")


# Variant 1: five explicit Processes
p1 = Process(target=test_function, args=(1,))
p2 = Process(target=test_function, args=(2,))
p3 = Process(target=test_function, args=(3,))
p4 = Process(target=test_function, args=(4,))
p5 = Process(target=test_function, args=(5,))
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
p1.join()
p2.join()
p3.join()
p4.join()
p5.join()

# Variant 2: a Pool of five workers
pool = Pool(5)
pool.map(test_function, [i for i in range(1, 6)])
pool.close()
```
However, in all cases the problem is the same:
Expectation: Since it is a sync API, the first request should be handled, return its response, then the second request should be handled, return its response, and so on. Basically, everything should happen sequentially.

Observation and Problem: Only the first request takes some x amount of time to return its response; all the other requests return their responses at the same time:
```
INFO 13607-140018755471168 2022-07-13T10:23:16.958178+0530 __main__:20 - STARTING REQUEST FOR 1
INFO 13608-140018755471168 2022-07-13T10:23:16.962500+0530 __main__:20 - STARTING REQUEST FOR 2
INFO 13609-140018755471168 2022-07-13T10:23:16.966071+0530 __main__:20 - STARTING REQUEST FOR 3
INFO 13613-140018755471168 2022-07-13T10:23:16.966112+0530 __main__:20 - STARTING REQUEST FOR 5
INFO 13611-140018755471168 2022-07-13T10:23:16.966230+0530 __main__:20 - STARTING REQUEST FOR 4
INFO 13607-140018755471168 2022-07-13T10:23:31.631877+0530 __main__:26 - ENDING REQUEST FOR 1
INFO 13613-140018755471168 2022-07-13T10:24:24.807898+0530 __main__:26 - ENDING REQUEST FOR 5
INFO 13608-140018755471168 2022-07-13T10:24:24.807911+0530 __main__:26 - ENDING REQUEST FOR 2
INFO 13611-140018755471168 2022-07-13T10:24:24.808056+0530 __main__:26 - ENDING REQUEST FOR 4
INFO 13609-140018755471168 2022-07-13T10:24:24.808055+0530 __main__:26 - ENDING REQUEST FOR 3
```
As you can see, `ENDING REQUEST FOR 1` is logged at 10:23:31, which is normal, as one request takes about 7-10 seconds. However, all the other responses are returned at roughly the same time, 10:24:24. How is this even possible? Ideally, each subsequent request should have taken 7-10 seconds longer than the previous one.
```
INFO 12950-140191386634048 2022-07-13T10:23:16.966509+0530 image_downloader_api:28 - Received request for getting Image of the world
INFO 12950-140191386634048 2022-07-13T10:23:31.630237+0530 image_downloader_api:30 - Image has been downloaded and processed as bytes array
INFO 12950-140191386634048 2022-07-13T10:23:31.631287+0530 image_downloader_api:28 - Received request for getting Image of the world
INFO 12950-140191386634048 2022-07-13T10:23:53.571931+0530 image_downloader_api:30 - Image has been downloaded and processed as bytes array
INFO 12950-140191386634048 2022-07-13T10:23:53.572622+0530 image_downloader_api:28 - Received request for getting Image of the world
INFO 12950-140191386634048 2022-07-13T10:24:05.753152+0530 image_downloader_api:30 - Image has been downloaded and processed as bytes array
INFO 12950-140191386634048 2022-07-13T10:24:05.753887+0530 image_downloader_api:28 - Received request for getting Image of the world
INFO 12950-140191386634048 2022-07-13T10:24:15.155202+0530 image_downloader_api:30 - Image has been downloaded and processed as bytes array
INFO 12950-140191386634048 2022-07-13T10:24:15.155954+0530 image_downloader_api:28 - Received request for getting Image of the world
INFO 12950-140191386634048 2022-07-13T10:24:24.806016+0530 image_downloader_api:30 - Image has been downloaded and processed as bytes array
```
These logs prove that, yes, the requests were handled one by one, and only the last request took until 10:24:24.
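That one-by-one handling on the server is exactly what a blocking call inside an `async def` handler produces: while the synchronous `requests.get` runs, the event loop cannot switch to any other request. A minimal stdlib sketch of the effect, with `time.sleep` standing in for the network call:

```python
import asyncio
import time


async def blocking_handler(i: int) -> int:
    # time.sleep() blocks the whole event loop, just like a synchronous
    # requests.get() inside an `async def` endpoint would.
    time.sleep(0.1)
    return i


async def main() -> float:
    start = time.monotonic()
    # Five "concurrent" coroutines are gathered at once...
    await asyncio.gather(*(blocking_handler(i) for i in range(5)))
    return time.monotonic() - start


elapsed = asyncio.run(main())
# ...yet the total is ~5 x 0.1s, because each handler blocks the loop in turn.
print(f"{elapsed:.2f}s")
```

This explains the sequential server logs; the remaining puzzle is why the client-side responses all arrive together.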
Upvotes: 0
Views: 241
Reputation: 3797
Adding an answer since comment space is limited.
Try running this code:
```python
import time
import multiprocessing
from multiprocessing import Pool, Manager, Process
import requests
from loguru import logger
import sys

log_format = "{level} {process}-{thread} {time} {name}:{line} - {message}"
logger.remove()
logger.add(sys.stderr, format=log_format, backtrace=True, diagnose=True, enqueue=True)
logger.add("logs/" + "t_{time}.log", format=log_format, colorize=True, backtrace=True, diagnose=True, enqueue=True)


def test_function(index_iterator: int):
    logger.info(f"STARTING REQUEST FOR {index_iterator}")
    response = requests.get("http://localhost:10009/expectoPatronum")
    logger.info(f"ENDING REQUEST FOR {index_iterator}")


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn", force=True)
    p1 = Process(target=test_function, args=(1,))
    p2 = Process(target=test_function, args=(2,))
    p3 = Process(target=test_function, args=(3,))
    p4 = Process(target=test_function, args=(4,))
    p5 = Process(target=test_function, args=(5,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()
```
If this runs with the expected output, then you can be sure it's the `fork` start method that's causing this. Additionally, I am not familiar with `loguru`, and it is possible that the logs are getting mixed up rather than the code itself misbehaving. Try using a queue (or just plain `print` statements) to "log" instead, just to make sure it's not `loguru` that's causing this.
My output:

```
INFO 92324-54540 2022-07-14T13:16:52.969779+0530 __mp_main__:15 - STARTING REQUEST FOR 1
INFO 98628-30892 2022-07-14T13:16:52.969779+0530 __mp_main__:15 - STARTING REQUEST FOR 3
INFO 95024-78576 2022-07-14T13:16:52.981287+0530 __mp_main__:15 - STARTING REQUEST FOR 2
INFO 38264-96180 2022-07-14T13:16:52.996923+0530 __mp_main__:15 - STARTING REQUEST FOR 4
INFO 93336-112648 2022-07-14T13:16:52.996923+0530 __mp_main__:15 - STARTING REQUEST FOR 5
INFO 98628-30892 2022-07-14T13:17:20.219388+0530 __mp_main__:17 - ENDING REQUEST FOR 3
INFO 92324-54540 2022-07-14T13:17:34.634835+0530 __mp_main__:17 - ENDING REQUEST FOR 1
INFO 95024-78576 2022-07-14T13:17:46.080381+0530 __mp_main__:17 - ENDING REQUEST FOR 2
INFO 38264-96180 2022-07-14T13:18:05.049615+0530 __mp_main__:17 - ENDING REQUEST FOR 4
INFO 93336-112648 2022-07-14T13:18:14.804244+0530 __mp_main__:17 - ENDING REQUEST FOR 5
```
Upvotes: 0