|
import os |
|
import multiprocessing |
|
import threading |
|
import platform |
|
from enum import Enum, unique |
|
|
|
from pathlib import Path |
|
# ``fcntl`` is only imported on non-Windows platforms. On Windows the name is
# bound to ``None`` instead, so later code can check ``fcntl is None``.
if platform.system().lower() == 'windows':
    fcntl = None
else:
    import fcntl
|
|
|
|
|
@unique
class LockContextType(Enum):
    """
    Overview:
        Enumeration naming the kind of lock to create.
    """
    # Selects a lock for use between threads.
    THREAD_LOCK = 1
    # Selects a lock for use between processes.
    PROCESS_LOCK = 2
|
|
|
|
|
# Maps each ``LockContextType`` member to the zero-argument factory that
# builds the corresponding lock object.
_LOCK_TYPE_MAPPING = {
    LockContextType.THREAD_LOCK: threading.Lock,
    LockContextType.PROCESS_LOCK: multiprocessing.Lock,
}
|
|
|
|
|
class LockContext(object):
    """
    Overview:
        Context manager around a thread lock or a process lock, selected by a ``LockContextType`` value.

    Interfaces:
        ``__init__``, ``acquire``, ``release``, ``__enter__``, ``__exit__``.

    Example:
        >>> with LockContext() as lock:
        >>>     print("Do something here.")
    """

    def __init__(self, type_: LockContextType = LockContextType.THREAD_LOCK):
        """
        Overview:
            Init the lock according to the given type.

        Arguments:
            - type_ (:obj:`LockContextType`): The type of lock to be used. Defaults to \
                ``LockContextType.THREAD_LOCK``.

        Raises:
            - KeyError: If ``type_`` is not a key of ``_LOCK_TYPE_MAPPING``.
        """
        # Look up the factory for the requested lock type and instantiate it.
        self.lock = _LOCK_TYPE_MAPPING[type_]()

    def acquire(self):
        """
        Overview:
            Acquires the lock.
        """
        self.lock.acquire()

    def release(self):
        """
        Overview:
            Releases the lock.
        """
        self.lock.release()

    def __enter__(self) -> "LockContext":
        """
        Overview:
            Enters the context and acquires the lock.

        Returns:
            - (:obj:`LockContext`): This object, so that ``with LockContext() as lock`` binds a usable value.
        """
        self.lock.acquire()
        # Without an explicit return, the ``as`` target of the ``with`` statement would be ``None``.
        return self

    def __exit__(self, *args, **kwargs):
        """
        Overview:
            Exits the context and releases the lock.

        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        self.lock.release()
|
|
|
|
|
# Registry of reader-writer locks keyed by lock name; entries are added lazily by ``get_rw_file_lock``.
rw_lock_mapping = {}
|
|
|
|
|
def get_rw_file_lock(name: str, op: str):
    """
    Overview:
        Get a read or write lock handle from the reader-writer lock registered under ``name``.
        The underlying ``RWLockFairD`` is created the first time a name is requested and is then
        reused from ``rw_lock_mapping`` for every later request with the same name.
    Arguments:
        - name (:obj:`str`): Lock's name.
        - op (:obj:`str`): Assigned operator, i.e. ``read`` or ``write``.
    Returns:
        - lock: The result of ``gen_rlock()`` when ``op`` is ``read``, or of ``gen_wlock()`` when ``op`` \
            is ``write``, called on the ``RWLockFairD`` stored under ``name``.
    Raises:
        - AssertionError: If ``op`` is neither ``read`` nor ``write``.
        - SystemExit: If the ``readerwriterlock`` package cannot be imported (exit code 1, after a warning).
    """
    assert op in ['read', 'write']
    try:
        from readerwriterlock import rwlock
    except ImportError:
        import sys
        from ditk import logging
        logging.warning("Please install readerwriterlock first, such as `pip3 install readerwriterlock`.")
        sys.exit(1)
    lock = rw_lock_mapping.get(name)
    if lock is None:
        # ``setdefault`` keeps whichever lock was stored first, so callers racing on the same
        # new name all end up with one shared lock instead of each creating their own.
        lock = rw_lock_mapping.setdefault(name, rwlock.RWLockFairD())
    if op == 'read':
        return lock.gen_rlock()
    elif op == 'write':
        return lock.gen_wlock()
|
|
|
|
|
class FcntlContext:
    """
    Overview:
        A context manager that acquires an exclusive lock on a file using fcntl.
        This is useful for preventing multiple processes from running the same code.

    Interfaces:
        ``__init__``, ``__enter__``, ``__exit__``.

    Example:
        >>> lock_path = "/path/to/lock/file"
        >>> with FcntlContext(lock_path) as lock:
        >>>     # Perform operations while the lock is held
        >>>     pass
    """

    def __init__(self, lock_path: str) -> None:
        """
        Overview:
            Initialize the context with the path of the lock file. No file is opened yet.

        Arguments:
            - lock_path (:obj:`str`): The path to the lock file.
        """
        self.lock_path = lock_path
        # Open file object while the lock is held, ``None`` otherwise.
        self.f = None

    def __enter__(self) -> "FcntlContext":
        """
        Overview:
            Opens the lock file in write mode (creating it if it does not exist) and takes an
            exclusive ``flock`` on it.

        Returns:
            - (:obj:`FcntlContext`): This object, so that ``with FcntlContext(path) as lock`` binds a usable value.

        Raises:
            - AssertionError: If this context is already entered (``self.f`` is not ``None``).
            - OSError: If the lock file cannot be opened or locked.
        """
        assert self.f is None, self.lock_path
        self.f = open(self.lock_path, 'w')
        try:
            fcntl.flock(self.f.fileno(), fcntl.LOCK_EX)
        except BaseException:
            # Locking failed or was interrupted: close the file and reset the state so the
            # descriptor is not leaked and this object can be entered again later.
            self.f.close()
            self.f = None
            raise
        return self

    def __exit__(self, *args, **kwargs) -> None:
        """
        Overview:
            Closes the lock file and resets the internal state so the context can be entered again.

        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        try:
            self.f.close()
        finally:
            # Reset even if ``close`` raises, otherwise the next ``__enter__`` would fail its assertion.
            self.f = None
|
|
|
|
|
def get_file_lock(name: str, op: str) -> FcntlContext:
    """
    Overview:
        Get a lock object guarding the file ``name``. When ``fcntl`` is available this is a
        ``FcntlContext`` on the companion file ``name + '.lock'``; otherwise the call is forwarded
        to ``get_rw_file_lock(name, op)``.

    Arguments:
        - name (:obj:`str`): The name of the file.
        - op (:obj:`str`): The operation to perform on the file lock. Only used on the \
            ``get_rw_file_lock`` fallback path; the ``FcntlContext`` path ignores it.

    Returns:
        - lock: A ``FcntlContext`` for ``name + '.lock'`` when ``fcntl`` is available, otherwise whatever \
            ``get_rw_file_lock(name, op)`` returns.
    """
    if fcntl is None:
        return get_rw_file_lock(name, op)

    lock_name = name + '.lock'
    if not os.path.isfile(lock_name):
        try:
            Path(lock_name).touch()
        except OSError:
            # Best effort only: ``FcntlContext`` opens this path in write mode when entered,
            # so a genuine problem with the path is reported at that point.
            pass
    return FcntlContext(lock_name)
|
|