import os import multiprocessing import threading import platform from enum import Enum, unique from pathlib import Path if platform.system().lower() != 'windows': import fcntl else: fcntl = None @unique class LockContextType(Enum): """ Overview: Enum to express the type of the lock. """ THREAD_LOCK = 1 PROCESS_LOCK = 2 _LOCK_TYPE_MAPPING = { LockContextType.THREAD_LOCK: threading.Lock, LockContextType.PROCESS_LOCK: multiprocessing.Lock, } class LockContext(object): """ Overview: Generate a LockContext in order to make sure the thread safety. Interfaces: ``__init__``, ``__enter__``, ``__exit__``. Example: >>> with LockContext() as lock: >>> print("Do something here.") """ def __init__(self, type_: LockContextType = LockContextType.THREAD_LOCK): """ Overview: Init the lock according to the given type. Arguments: - type_ (:obj:`LockContextType`): The type of lock to be used. Defaults to LockContextType.THREAD_LOCK. """ self.lock = _LOCK_TYPE_MAPPING[type_]() def acquire(self): """ Overview: Acquires the lock. """ self.lock.acquire() def release(self): """ Overview: Releases the lock. """ self.lock.release() def __enter__(self): """ Overview: Enters the context and acquires the lock. """ self.lock.acquire() def __exit__(self, *args, **kwargs): """ Overview: Exits the context and releases the lock. Arguments: - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function. - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function. """ self.lock.release() rw_lock_mapping = {} def get_rw_file_lock(name: str, op: str): """ Overview: Get generated file lock with name and operator Arguments: - name (:obj:`str`): Lock's name. - op (:obj:`str`): Assigned operator, i.e. ``read`` or ``write``. Returns: - (:obj:`RWLockFairD`): Generated rwlock """ assert op in ['read', 'write'] try: from readerwriterlock import rwlock except ImportError: import sys from ditk import logging logging.warning("Please install readerwriterlock first, such as `pip3 install readerwriterlock`.") sys.exit(1) if name not in rw_lock_mapping: rw_lock_mapping[name] = rwlock.RWLockFairD() lock = rw_lock_mapping[name] if op == 'read': return lock.gen_rlock() elif op == 'write': return lock.gen_wlock() class FcntlContext: """ Overview: A context manager that acquires an exclusive lock on a file using fcntl. \ This is useful for preventing multiple processes from running the same code. Interfaces: ``__init__``, ``__enter__``, ``__exit__``. Example: >>> lock_path = "/path/to/lock/file" >>> with FcntlContext(lock_path) as lock: >>> # Perform operations while the lock is held """ def __init__(self, lock_path: str) -> None: """ Overview: Initialize the LockHelper object. Arguments: - lock_path (:obj:`str`): The path to the lock file. """ self.lock_path = lock_path self.f = None def __enter__(self) -> None: """ Overview: Acquires the lock and opens the lock file in write mode. \ If the lock file does not exist, it is created. """ assert self.f is None, self.lock_path self.f = open(self.lock_path, 'w') fcntl.flock(self.f.fileno(), fcntl.LOCK_EX) def __exit__(self, *args, **kwargs) -> None: """ Overview: Closes the file and releases any resources used by the lock_helper object. Arguments: - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function. - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function. """ self.f.close() self.f = None def get_file_lock(name: str, op: str) -> FcntlContext: """ Overview: Acquires a file lock for the specified file. \ Arguments: - name (:obj:`str`): The name of the file. - op (:obj:`str`): The operation to perform on the file lock. """ if fcntl is None: return get_rw_file_lock(name, op) else: lock_name = name + '.lock' if not os.path.isfile(lock_name): try: Path(lock_name).touch() except Exception as e: pass return FcntlContext(lock_name)