Reputation: 1
I am trying out FastAPI and Docker, and I want to do some parallelization. I use multiprocessing for it, as it's really simple to pause and resume the processes if I want to. I made an endpoint that creates a new multiprocessing process, which then processes the data (and some other things as well). It runs perfectly fine and does the job. However, at the end it leaves semaphore files behind in the container's /dev/shm folder, and after a lot of processes the folder fills up, so the program fails because no more processes can be created.
I created 2 files:
import asyncio
from fastapi import FastAPI
from multiprocessing import Queue

from example_script import create_thread

app = FastAPI(
    title="Example title",
    description="Example description"
)

names = Queue()


@app.on_event("startup")
async def startup_event():
    for i in range(15):
        names.put(f"process_{i}")


@app.post("/example")
async def example_endpoint(var_1: int):
    '''
    Creates a process upon calling this endpoint
    '''
    # unnecessary code before
    # ...
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, lambda: handle_creation_example(var_1))
    return


def handle_creation_example(var_1: int):
    # unnecessary code before
    # ...
    create_thread(var_1, names)
# example_script.py
import asyncio
import multiprocessing


def create_thread(var_1, names):
    '''
    Creates a process for the given work.
    '''
    # unnecessary code before, var_2 comes from here
    # ...
    name = names.get()
    process = multiprocessing.Process(
        target=do_job, args=[var_1, var_2, names, name], name=name, daemon=True
    )
    process.start()


def do_job(var_1, var_2, names, name):
    '''
    Processes the data and does the job on the process
    '''
    asyncio.run(process_data(var_1, var_2))
    names.put(name)
I created a multiprocessing.Queue on the server containing the possible names for each process, so I can identify them more easily. There is code that checks whether any process names are free (i.e. fewer than 15 processes are running); I just stripped the code to the very basics so it's easier to see how the processes are created. This might make you wonder why I create unnecessary-looking functions, but it helped me separate some things.
I read about Pool, Manager, etc., but as far as I know they can only be created in the if __name__ == "__main__" block, which I cannot use with FastAPI. I created the Queue to store the possible names because I didn't find any other way to share this kind of data between the processes.
I also read that calling join() would solve this issue, but that blocks the endpoint while the process is running, which defeats the whole point of the parallelization.
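Just to illustrate what I mean, this is the blocking variant as I understand it, sketched inside create_thread:

# inside create_thread: joining right after start() is what I read would fix the leak
process.start()
process.join()  # but this blocks until do_job finishes, so the endpoint waits too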
What's the best way to handle these left-behind semaphore files? I couldn't figure out which ones were still in use, so simply deleting them doesn't seem like the way to go. I could increase the default size of /dev/shm from 64 MB, but that just postpones the problem, so that's not the right direction either.
For clarification, I ran ls -la in the server container's /dev/shm folder:
ls -la
total 36
drwxrwxrwt 2 root root 220 Jan 2 13:08 .
drwxr-xr-x 5 root root 340 Jan 2 13:08 ..
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-2osm5fnv
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-7vxm99oo
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-9nekb51r
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-cocmzp9h
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-jn8pjtcg
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-k1hsyy39
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-k3z69zd4
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-oncnkurj
-rw------- 1 root root 32 Jan 2 13:08 sem.mp-r1bvmrsk
These are the files that pile up over time.
Upvotes: 0
Views: 90
Reputation: 18090
Have a daemon thread join the process.
import threading

process.start()
# join the finished process from a background thread so the request is not blocked
joiner = threading.Thread(target=process.join, daemon=True)
joiner.start()
Daemon threads aren't tracked, so they don't leak memory.
You are likely creating a queue per process, and processes are tracked until they are joined (so that they get terminated on exit). If they are not joined, the queue is leaked, because it is saved as an argument in the leaked process object.
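As a minimal, self-contained sketch of the same pattern applied to the question's create_thread/do_job structure (the time.sleep is only a stand-in for the real work, and var_2 is dropped for brevity):

import multiprocessing
import threading
import time


def do_job(var_1, names, name):
    # stand-in for the real asyncio.run(process_data(...)) work from the question
    time.sleep(var_1)
    # give the name back so a later process can reuse it
    names.put(name)


def create_thread(var_1, names):
    name = names.get()
    process = multiprocessing.Process(
        target=do_job, args=[var_1, names, name], name=name, daemon=True
    )
    process.start()
    # join from a daemon thread: the caller returns immediately, but the
    # finished process is still reaped instead of lingering un-joined
    threading.Thread(target=process.join, daemon=True).start()


if __name__ == "__main__":
    names = multiprocessing.Queue()
    for i in range(15):
        names.put(f"process_{i}")
    create_thread(1, names)
    time.sleep(2)  # give the worker time to finish before the demo exits

The joiner thread does the waiting instead of the endpoint, so the request stays non-blocking while finished processes are still cleaned up.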
Upvotes: 1