Bob Fang

Reputation: 7381

How to synchronize between multiple async processes in Python?

I have an async HTTP web service built with FastAPI. I run multiple instances of the same service on one server, each on a different port, with an nginx server in front so that I can utilise them all. There is a particular resource I need to protect so that only one client accesses it at a time.

@app.get("/do_something")
async def do_something():
    critical_section_here()

I tried to protect this critical section using a file lock like this:

@app.get("/do_something")
async def do_something():
    with FileLock("dosomething.lock"):
        critical_section()

This prevents multiple processes from entering the critical section at the same time. But I found that it actually deadlocks. Consider the following sequence of events:

  1. Client 1 connects to port 8000 and enters the critical section.
  2. While client 1 is still using the resource, client 2 is routed to the same port 8000 and tries to acquire the file lock. It cannot, so it keeps trying, and that blocks the execution of client 1. Client 1 is then never able to release the file lock, which means that not only this process but every other server instance is locked as well.
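The scenario above can be reproduced in a single process. The following is a minimal sketch, not the original service: a threading.Lock stands in for the file lock, the handler and client names are hypothetical, and a timeout is added only so that the demonstration terminates instead of hanging. It shows that a blocking acquire inside a coroutine stalls the event loop, so the lock holder cannot make progress until the waiter gives up.

```python
import asyncio
import threading

# Stand-in for the blocking file lock (assumption: any blocking lock behaves the same way here).
lock = threading.Lock()


async def handler(name, events):
    # Blocking acquire: while this call waits, the whole event loop is frozen.
    # The timeout exists only so that the demo ends; without it the loop would hang forever.
    if not lock.acquire(timeout=0.5):
        events.append(f"{name}: timed out")
        return
    try:
        events.append(f"{name}: entered")
        await asyncio.sleep(0.1)  # yields to the event loop while holding the lock
        events.append(f"{name}: left")
    finally:
        lock.release()


async def main():
    events = []
    await asyncio.gather(handler("client 1", events), handler("client 2", events))
    return events


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Client 1 can only leave the critical section after client 2 has stopped blocking the loop, which is why the version without a timeout never finishes.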

Is there a way to coordinate these processes so that only one of them accesses the critical section at a time? I thought about adding a timeout to the file lock, but I really don't want to reject users; I just want each request to wait until it is its turn to enter the critical section.

Upvotes: 5

Views: 2421

Answers (2)

alex_noname

Reputation: 32163

You could use aioredlock.

It lets you create distributed locks that are shared between workers (processes). For more information about its usage, see the aioredlock documentation.

The redlock algorithm is a distributed lock implementation for Redis. There are many implementations of it in several languages. In this case, this is the asyncio-compatible implementation for Python 3.5+.
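To make the idea concrete, here is a simplified in-memory model of the core of Redlock: set a key only if it is absent, with an expiry and a unique token, on several independent nodes, and treat the lock as held only if a majority of nodes agreed. This is an illustrative sketch, not aioredlock's implementation. FakeNode, acquire and release are hypothetical names, no Redis is involved, and the real algorithm also accounts for the time spent acquiring, which is omitted here.

```python
import time
import uuid


class FakeNode:
    """In-memory stand-in for one Redis instance (hypothetical, for illustration only)."""

    def __init__(self):
        self.store = {}  # resource -> (token, expiry timestamp)

    def set_if_absent(self, resource, token, ttl):
        # Succeeds only if the key is free or its previous holder has expired.
        now = time.monotonic()
        current = self.store.get(resource)
        if current is not None and current[1] > now:
            return False
        self.store[resource] = (token, now + ttl)
        return True

    def delete_if_owner(self, resource, token):
        # Only the holder of the matching token may release the key.
        current = self.store.get(resource)
        if current is not None and current[0] == token:
            del self.store[resource]


def release(nodes, resource, token):
    for node in nodes:
        node.delete_if_owner(resource, token)


def acquire(nodes, resource, ttl=10.0):
    """Return a token if a majority of nodes granted the lock, otherwise None."""
    token = uuid.uuid4().hex
    granted = sum(node.set_if_absent(resource, token, ttl) for node in nodes)
    if granted >= len(nodes) // 2 + 1:
        return token
    release(nodes, resource, token)  # roll back a partial acquisition
    return None
```

The random token is what prevents one worker from releasing a lock that another worker currently holds.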

Example of usage:

from aioredlock import Aioredlock, LockError

# With no arguments the manager uses the library's default Redis connection.
lock_manager = Aioredlock()


async def do_something():
    # Use the lock as an async context manager:
    try:
        async with await lock_manager.lock("resource_name") as lock:
            assert lock.valid is True
            # Do your stuff having the lock
            await lock.extend()  # alias for lock_manager.extend(lock)
            # Do more stuff having the lock
        assert lock.valid is False  # lock will be released by context manager
    except LockError:
        print('Lock not acquired')
        raise

Upvotes: 0

HTF

Reputation: 7270

You can try something like this:

import asyncio
import fcntl
from contextlib import asynccontextmanager

from fastapi import FastAPI

app = FastAPI()


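def acquire_lock():
    # Blocks the calling thread until an exclusive lock on the file is granted.
    f = open("/tmp/test.lock", "w")
    fcntl.flock(f, fcntl.LOCK_EX)
    return f


@asynccontextmanager
async def lock():
    loop = asyncio.get_running_loop()
    # Run the blocking flock call in the default executor so the event loop stays responsive.
    f = await loop.run_in_executor(None, acquire_lock)
    try:
        yield
    finally:
        # Closing the file releases the lock.
        f.close()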


@app.get("/test/")
async def test():
    async with lock():
        print("Enter critical section")
        await asyncio.sleep(5)
        print("End critical section")

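This serializes all requests across every server process, because they all lock the same file. The blocking flock call runs in the default thread-pool executor, so the event loop keeps serving other coroutines while a request waits for its turn.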
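A possible variation, sketched here under assumptions rather than taken from the answer above: instead of parking one executor thread per waiting request, try a non-blocking flock and sleep on the event loop between attempts. The name polling_lock and the interval parameter are hypothetical, and the sketch assumes a POSIX system where fcntl is available.

```python
import asyncio
import fcntl
from contextlib import asynccontextmanager


@asynccontextmanager
async def polling_lock(path="/tmp/test.lock", interval=0.05):
    f = open(path, "w")
    try:
        while True:
            try:
                # LOCK_NB makes flock fail immediately instead of blocking the thread.
                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError:
                # Lock is held elsewhere: give control back to the event loop, then retry.
                await asyncio.sleep(interval)
        yield
    finally:
        # Closing the file releases the lock (or just discards the handle if it was never acquired).
        f.close()


async def worker(name, log, path):
    async with polling_lock(path):
        log.append(f"{name} in")
        await asyncio.sleep(0.1)
        log.append(f"{name} out")
```

The trade-off is latency versus threads: a waiter notices a released lock only at its next poll, and polling gives no fairness guarantee about which waiter gets the lock next.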

Upvotes: 5
