Reputation: 27806
AFAIK this code can be used to lock a directory:
class LockDirectory(object):
    def __init__(self, directory):
        assert os.path.exists(directory)
        self.directory = directory

    def __enter__(self):
        self.dir_fd = os.open(self.directory, os.O_RDONLY)
        try:
            fcntl.flock(self.dir_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as ex:
            if ex.errno != errno.EAGAIN:
                raise
            raise Exception('Somebody else is locking %r - quitting.' % self.directory)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dir_fd.close()
But according to the answers to this question, locking a directory is not possible: Python: Lock a directory
What is wrong with the above code?
I only need to support current Linux versions. No Windows, Mac or other Unix.
Upvotes: 5
Views: 12818
Reputation: 334
I created this directory locker to manage directory access from multiple runtimes. I had multiple programs running in different runtimes that needed semaphore-like behavior when accessing a directory, because one of the programs was a backup restore service, meaning the entire directory had to be replaced while other programs were simultaneously reading or editing it.
Note that speed was not the main concern with this implementation, as reading from and editing within the directory was not done at very high frequency. If you have very many files in the folder above the directory you wish to lock, this implementation is probably not well suited.
import os
import threading
import time
import re
import uuid
import signal
from enum import Enum
from random import uniform
from typing import Optional

from common.custom_exceptions import DirectoryLockError


class LockUser(Enum):
    SLAVE = 0   # Multiple slaves allowed
    MASTER = 1  # Locks slave access, only one master at a time


class GlobalDirectoryLock:
    """
    Masters always have priority over slaves.
    Slaves can access simultaneously while no master lock is active.
    Only one master has access at a time.
    Can be used across multiple runtimes/threads simultaneously.
    Do not store the lock or share it between threads.
    Initialize it when you need one and discard it.

    Usage:
        with GlobalDirectoryLock(path, lock_type):
            # Do stuff
    """
    MASTER_LOCKFILE = ".master_lock"           # Master lock blocks directory
    MASTER_PENDING_LOCKFILE = ".pending_lock"  # Marks a pending master
    SLAVE_LOCKFILE = ".slave_lock"             # Marks directory as in use by non-master
    MASTER_REGEX = f"^{MASTER_LOCKFILE}"
    MASTER_PENDING_REGEX = f"^{MASTER_PENDING_LOCKFILE}"
    SLAVE_REGEX = f"^{SLAVE_LOCKFILE}"
    WAIT_TIME = 0.001  # seconds

    def __init__(
            self,
            filesystem_mount_path: str,
            lock_type: LockUser = LockUser.SLAVE,
    ):
        """
        :param filesystem_mount_path: The path to the directory locking target
        :param lock_type: The type of lock to use (slave or master)
        """
        self.__path = filesystem_mount_path
        self.__lock_type = lock_type
        self.__slave_lock: Optional[str] = None
        self.__master_lock: Optional[str] = None
        self.__pending_lock: Optional[str] = None
        self.__pending_time: Optional[int] = None
        self.__pending_uuid: Optional[str] = None

    @staticmethod
    def cleanup(path):
        """
        Clean up any lock files left over after a SIGKILL.
        It is advisable to run this as a pre-startup job before any other programs start.
        """
        for filename in os.listdir(path):
            if re.search(GlobalDirectoryLock.MASTER_PENDING_REGEX, filename):
                os.remove(f"{path}/{filename}")
            elif re.search(GlobalDirectoryLock.SLAVE_REGEX, filename):
                os.remove(f"{path}/{filename}")
            elif re.search(GlobalDirectoryLock.MASTER_REGEX, filename):
                os.remove(f"{path}/{filename}")

    def __has_master(self) -> bool:
        """:returns: True if directory is currently locked by a master"""
        for filename in os.listdir(self.__path):
            if re.search(GlobalDirectoryLock.MASTER_REGEX, filename):
                return True
        return False

    def __has_slaves(self) -> bool:
        """:returns: True if directory currently has active slaves"""
        for filename in os.listdir(self.__path):
            if re.search(GlobalDirectoryLock.SLAVE_REGEX, filename):
                return True
        return False

    def __has_master_waiting(self) -> bool:
        """:returns: True if there are masters waiting for directory access"""
        for filename in os.listdir(self.__path):
            if re.search(GlobalDirectoryLock.MASTER_PENDING_REGEX, filename):
                return True
        return False

    def __is_next_master(self) -> bool:
        """:returns: True if this master is next in line for directory access"""
        for filename in os.listdir(self.__path):
            if re.search(GlobalDirectoryLock.MASTER_PENDING_REGEX, filename):
                f_name, uid, ns_time = filename.split("-")
                ns_time = int(ns_time)
                # If the other pending file was created before this one
                if ns_time < self.__pending_time:
                    return False
                # If the times are exactly the same, compare uuids
                elif uid != self.__pending_uuid and ns_time == self.__pending_time:
                    return int(uid, base=16) < int(self.__pending_uuid, base=16)
        return True

    def __slave_acquire(self):
        if self.__slave_lock:
            raise DirectoryLockError(
                "SlaveLock: Do not reuse directory locks."
            )
        self.__slave_lock = \
            f"{self.__path}/" \
            f"{GlobalDirectoryLock.SLAVE_LOCKFILE}-" \
            f"{uuid.uuid4().hex}"
        while self.__has_master() or self.__has_master_waiting():
            time.sleep(GlobalDirectoryLock.WAIT_TIME)
        open(self.__slave_lock, 'w').close()

    def __master_acquire(self):
        if self.__pending_uuid:
            raise DirectoryLockError(
                "MasterLock: Do not reuse directory locks."
            )
        self.__pending_time = time.time_ns()
        self.__pending_uuid = uuid.uuid4().hex
        self.__pending_lock = \
            f"{self.__path}/" \
            f"{GlobalDirectoryLock.MASTER_PENDING_LOCKFILE}-" \
            f"{self.__pending_uuid}-" \
            f"{self.__pending_time}"
        self.__master_lock = \
            f"{self.__path}/" \
            f"{GlobalDirectoryLock.MASTER_LOCKFILE}-" \
            f"{self.__pending_uuid}"
        open(self.__pending_lock, 'w').close()
        while self.__has_master() or not self.__is_next_master():
            time.sleep(GlobalDirectoryLock.WAIT_TIME)
        # Create the master lock before removing the pending marker,
        # otherwise slaves might slip through before the master lock file is created
        open(self.__master_lock, 'x').close()
        os.remove(self.__pending_lock)
        while self.__has_slaves():
            time.sleep(GlobalDirectoryLock.WAIT_TIME)

    def __master_release(self):
        if self.__master_lock and os.path.exists(self.__master_lock):
            os.remove(self.__master_lock)

    def __pending_release(self):
        if self.__pending_lock and os.path.exists(self.__pending_lock):
            os.remove(self.__pending_lock)

    def __slave_release(self):
        if self.__slave_lock and os.path.exists(self.__slave_lock):
            os.remove(self.__slave_lock)

    def __enter__(self):
        if self.__lock_type == LockUser.MASTER:
            self.__master_acquire()
        else:
            self.__slave_acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.__lock_type == LockUser.MASTER:
            self.__master_release()
            self.__pending_release()
        else:
            self.__slave_release()
        # Always re-raise errors if any occur
        return False
Here's a test I made to run it in multiple threads; I ran it for an hour or so without problems. Maybe someone could come up with a better one.
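The test references a ThreadState helper that isn't shown here; a minimal stand-in, assuming it only needs a shared failed flag plus a slot for the first error (reusing the threading import from above), could look like this:

class ThreadState:
    def __init__(self):
        self.failed = False            # set by any worker that hits an error
        self._error = None
        self._lock = threading.Lock()

    def set_error(self, err):
        # Keep only the first error so the test can re-raise it at the end
        with self._lock:
            if self._error is None:
                self._error = err

    def get_error(self):
        with self._lock:
            return self._error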
class GracefulKiller:
    def __init__(self):
        self.kill_now = False
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args):
        self.kill_now = True


def lock_master(path: str, state: ThreadState):
    print("Request master")
    try:
        with GlobalDirectoryLock(path, LockUser.MASTER):
            random_sleep = uniform(0.0, 0.25)
            print(f"Master locked: sleep {random_sleep}")
            time.sleep(random_sleep)
    except Exception as err:
        state.failed = True
        state.set_error(err)
        return
    print("Master unlocked")


def lock_slave(path: str, state: ThreadState):
    print("Request slave")
    try:
        with GlobalDirectoryLock(path, LockUser.SLAVE):
            random_sleep = uniform(0.0, 0.25)
            print(f"Slave lock: sleep {random_sleep}")
            time.sleep(random_sleep)
    except Exception as err:
        state.failed = True
        state.set_error(err)
    print("Slave unlocked")


def while_loop_lock_slave(path: str, state: ThreadState):
    while not state.failed:
        time.sleep(uniform(0.0, 0.25))
        lock_slave(path, state)


def while_loop_lock_master(path: str, state: ThreadState):
    while not state.failed:
        time.sleep(uniform(1, 2.5))
        lock_master(path, state)


def threaded_test(path: str):
    slaves = 6
    masters = 3
    threads = []
    state = ThreadState()
    for i in range(slaves):
        threads.append(
            threading.Thread(
                target=while_loop_lock_slave,
                args=(path, state)
            )
        )
    for i in range(masters):
        threads.append(
            threading.Thread(
                target=while_loop_lock_master,
                args=(path, state)
            )
        )
    for thread in threads:
        thread.start()

    killer = GracefulKiller()
    all_done = False
    while not all_done:
        if killer.kill_now:
            state.failed = True
        all_done = True
        for thread in threads:
            if not thread.is_alive():
                thread.join()
            else:
                all_done = False

    err = state.get_error()
    if err:
        raise err


if __name__ == '__main__':
    # Path to some directory that you want to lock
    dir_path = "../localstorage"
    # Probably best to do cleanup in a dedicated startup job
    GlobalDirectoryLock.cleanup(dir_path)
    # Run test
    threaded_test(dir_path)
Upvotes: 0
Reputation: 3027
I changed your code a bit: add return self, like most context managers do. Then, with dup(), the second context manager will fail, because closing the original descriptor does not release the lock while a duplicated descriptor is still open. The solution is simple: uncomment fcntl.flock(self.dir_fd, fcntl.LOCK_UN).
The mode used to open the file doesn't matter to flock.
And you cannot rely on flock over NFS.
import os
import fcntl
import time


class LockDirectory(object):
    def __init__(self, directory):
        assert os.path.exists(directory)
        self.directory = directory

    def __enter__(self):
        self.dir_fd = os.open(self.directory, os.O_RDONLY)
        try:
            fcntl.flock(self.dir_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as ex:
            raise Exception('Somebody else is locking %r - quitting.' % self.directory)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # fcntl.flock(self.dir_fd, fcntl.LOCK_UN)
        os.close(self.dir_fd)


def main():
    with LockDirectory("test") as lock:
        newfd = os.dup(lock.dir_fd)
    with LockDirectory("test") as lock2:
        pass


if __name__ == '__main__':
    main()
Upvotes: 5
Reputation: 27806
I found an answer here: Python: Lock directory
It is possible to lock a directory with this:
fcntl.flock(self.dir_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
Of course, this is a lock that every piece of code playing in this game needs to check first.
AFAIK this is called an "advisory lock".
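For illustration, the cooperative check could look roughly like this (the function name is just an example, not from the original):

import errno
import fcntl
import os

def try_lock(path):
    """Try to take the advisory lock on path; return the locked fd,
    or None if another cooperating process already holds the lock."""
    fd = os.open(path, os.O_RDONLY)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError as ex:
        os.close(fd)
        if ex.errno == errno.EAGAIN:
            return None  # somebody else holds the lock
        raise
    # Keep fd open while working; release with
    # fcntl.flock(fd, fcntl.LOCK_UN) and os.close(fd).
    return fd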
Upvotes: 0
Reputation: 1121774
If all you need is a read lock, then there is only a minor error in the code you have. It is perfectly feasible to get a read lock on a directory.
You'll need to alter your __exit__ function to use os.close() to close the file descriptor; a file descriptor is just an integer, and integers have no .close() method:
def __exit__(self, exc_type, exc_val, exc_tb):
    os.close(self.dir_fd)
The usual confusion for people who think you can't lock a directory comes from trying to do it with the open() function. Python won't let you open a directory node with that function because there is no point in creating a Python file object for a directory. Or perhaps there is an assumption that you wanted the OS to enforce access to the directory via the lock (as opposed to an advisory lock that a cooperating set of processes agree to obtain before attempting access).
So no, there is nothing wrong with the code if all you want is an advisory lock, and are fine with this only working on Linux.
I'd drop the directory distinction from the code. The lock will work on any path that you have read access to. It is not exclusive to directories.
The downside of locking the directory is that this doesn't give you a place to store lock metadata. While lsof can give you the PID of the current owner of the lock, you may want to communicate some other information with the lock to help troubleshoot or automate lock breaking. A .lock file or symlink would let you record additional information. For example, Mercurial will create a symlink with the hostname, the PID namespace identifier (Linux only) and the PID in the target name; you can create such a symlink atomically, while writing that data to a file would require creating a file under a temp name followed by a rename.
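A rough sketch of that symlink approach (the host:pid format and names here are illustrative, not Mercurial's exact scheme):

import os
import socket

def acquire_lock(lockpath):
    # os.symlink() is atomic; it raises FileExistsError if the lock is already held.
    # The metadata lives in the link target, so no separate write step is needed.
    os.symlink(f"{socket.gethostname()}:{os.getpid()}", lockpath)

def lock_owner(lockpath):
    # Read back who holds the lock, e.g. for troubleshooting or lock breaking.
    return os.readlink(lockpath)

def release_lock(lockpath):
    os.remove(lockpath)  # removes the symlink itself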
Upvotes: 2
Reputation: 1323
I would suggest you go with a simple lock file. As the question in the comment (How to lock a directory between python processes in linux?) suggests, there is no locking mechanism for directories, as opposed to files.
Lock files are used left and right on Linux; they are very transparent and easy to debug, so I would just go with that.
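A minimal sketch of that approach, assuming all processes agree on one lock path (the path and names below are just examples):

import os

LOCKFILE = "/tmp/mydir.lock"  # example path; every process must use the same one

def acquire():
    try:
        # O_CREAT | O_EXCL makes the creation atomic: exactly one process succeeds.
        fd = os.open(LOCKFILE, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # somebody else holds the lock
    os.write(fd, str(os.getpid()).encode())  # record the owner; easy to inspect when debugging
    os.close(fd)
    return True

def release():
    os.remove(LOCKFILE)

One caveat of plain lock files: if the holder is killed, a stale file stays behind, so pair this with a staleness check or a startup cleanup job.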
I am waiting to be challenged on this however!
Upvotes: -1