File size: 4,871 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import os
import multiprocessing
import threading
import platform
from enum import Enum, unique

from pathlib import Path
if platform.system().lower() != 'windows':
    import fcntl
else:
    fcntl = None


@unique
class LockContextType(Enum):
    """
    Overview:
        Selector for the kind of lock a ``LockContext`` is built on.

    Members:
        - THREAD_LOCK: Mapped to ``threading.Lock`` by ``_LOCK_TYPE_MAPPING``.
        - PROCESS_LOCK: Mapped to ``multiprocessing.Lock`` by ``_LOCK_TYPE_MAPPING``.
    """
    # ``@unique`` rejects two members sharing one value, so the values below must stay distinct.
    THREAD_LOCK = 1
    PROCESS_LOCK = 2


# Maps each ``LockContextType`` member to the zero-argument callable that
# ``LockContext.__init__`` invokes to create the underlying lock object.
_LOCK_TYPE_MAPPING = {
    LockContextType.THREAD_LOCK: threading.Lock,
    LockContextType.PROCESS_LOCK: multiprocessing.Lock,
}


class LockContext(object):
    """
    Overview:
        Context manager around a single lock object. The lock is a ``threading.Lock`` or a
        ``multiprocessing.Lock`` depending on the ``LockContextType`` given at construction.
        The lock is acquired on entering the ``with`` block and released on leaving it.

    Interfaces:
        ``__init__``, ``acquire``, ``release``, ``__enter__``, ``__exit__``.

    Example:
        >>> with LockContext() as lock:
        >>>     print("Do something here.")
    """

    def __init__(self, type_: LockContextType = LockContextType.THREAD_LOCK):
        """
        Overview:
            Create the underlying lock according to the given type.

        Arguments:
           - type_ (:obj:`LockContextType`): The type of lock to be used. Defaults to LockContextType.THREAD_LOCK.

        Raises:
           - KeyError: If ``type_`` is not a key of ``_LOCK_TYPE_MAPPING``.
        """
        self.lock = _LOCK_TYPE_MAPPING[type_]()

    def acquire(self):
        """
        Overview:
            Acquire the underlying lock, blocking until it is available.
        """
        self.lock.acquire()

    def release(self):
        """
        Overview:
            Release the underlying lock.
        """
        self.lock.release()

    def __enter__(self):
        """
        Overview:
            Enter the context and acquire the lock.
        Returns:
            - (:obj:`LockContext`): This object, so that ``with LockContext() as lock`` binds a usable \
                value instead of ``None``.
        """
        self.lock.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        """
        Overview:
            Exit the context and release the lock. Nothing is returned, so an exception raised inside
            the ``with`` block is not suppressed.
        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        self.lock.release()


# Module-level registry of reader-writer locks keyed by lock name; filled lazily by ``get_rw_file_lock``.
rw_lock_mapping = {}


def get_rw_file_lock(name: str, op: str):
    """
    Overview:
        Get a reader lock or a writer lock generated from the reader-writer lock registered under ``name``.
        The underlying ``RWLockFairD`` is created on first use and stored in ``rw_lock_mapping``, so every
        call with the same ``name`` works on the same lock.
    Arguments:
        - name (:obj:`str`): Lock's name.
        - op (:obj:`str`): Assigned operator, i.e. ``read`` or ``write``.
    Returns:
        - lock: The object returned by ``gen_rlock()`` when ``op`` is ``read``, or by ``gen_wlock()`` \
            when ``op`` is ``write``.
    Raises:
        - AssertionError: If ``op`` is neither ``read`` nor ``write``.
        - SystemExit: If ``readerwriterlock`` cannot be imported; a warning is logged and the exit code is 1.
    """
    assert op in ['read', 'write']
    try:
        from readerwriterlock import rwlock
    except ImportError:
        import sys
        from ditk import logging
        logging.warning("Please install readerwriterlock first, such as `pip3 install readerwriterlock`.")
        sys.exit(1)
    lock = rw_lock_mapping.get(name)
    if lock is None:
        # A separate "not in" check followed by an assignment let two threads racing on a new name each
        # store and then use their own lock. ``setdefault`` keeps whichever lock was stored first and
        # hands that same object back to every caller.
        lock = rw_lock_mapping.setdefault(name, rwlock.RWLockFairD())
    if op == 'read':
        return lock.gen_rlock()
    elif op == 'write':
        return lock.gen_wlock()


class FcntlContext:
    """
    Overview:
        A context manager that acquires an exclusive lock on a file using fcntl. \
        This is useful for preventing multiple processes from running the same code.

    Interfaces:
        ``__init__``, ``__enter__``, ``__exit__``.

    Example:
        >>> lock_path = "/path/to/lock/file"
        >>> with FcntlContext(lock_path) as lock:
        >>>    # Perform operations while the lock is held

    """

    def __init__(self, lock_path: str) -> None:
        """
        Overview:
            Store the lock file path. The file is not opened until the context is entered.

        Arguments:
            - lock_path (:obj:`str`): The path to the lock file.
        """
        self.lock_path = lock_path
        # Open lock file while the context is held; ``None`` whenever it is not held.
        self.f = None

    def __enter__(self) -> "FcntlContext":
        """
        Overview:
            Open the lock file in write mode (creating it if it does not exist) and take an
            exclusive ``flock`` on it.
        Returns:
            - (:obj:`FcntlContext`): This object, so that ``with FcntlContext(path) as lock`` binds a \
                usable value instead of ``None``.
        Raises:
            - AssertionError: If this context is already entered.
        """
        assert self.f is None, self.lock_path
        self.f = open(self.lock_path, 'w')
        try:
            fcntl.flock(self.f.fileno(), fcntl.LOCK_EX)
        except BaseException:
            # ``__exit__`` is not called when ``__enter__`` raises, so clean up here. Otherwise the
            # file handle leaks and the assertion above rejects every later attempt to enter.
            self.f.close()
            self.f = None
            raise
        return self

    def __exit__(self, *args, **kwargs) -> None:
        """
        Overview:
            Close the lock file and reset the context so it can be entered again. No explicit unlock is
            issued; the lock is dropped by closing the file.
        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        try:
            self.f.close()
        finally:
            # Reset even if ``close`` raises, so the object does not stay stuck in the "entered" state.
            self.f = None


def get_file_lock(name: str, op: str) -> FcntlContext:
    """
    Overview:
        Build a lock guarding the file ``name``. When ``fcntl`` is available, the lock is a
        ``FcntlContext`` on the sibling file ``name + '.lock'``, which is created here if it is
        missing. When ``fcntl`` is unavailable (the module sets it to ``None`` on Windows), the
        lock comes from ``get_rw_file_lock`` instead.

    Arguments:
        - name (:obj:`str`): The name of the file.
        - op (:obj:`str`): The operation to perform on the file lock, ``read`` or ``write``. It is only \
            used by the ``get_rw_file_lock`` fallback; the ``fcntl`` lock is always exclusive.
    Returns:
        - lock: A ``FcntlContext`` for the lock file, or the lock returned by ``get_rw_file_lock`` \
            when ``fcntl`` is unavailable.
    """
    if fcntl is None:
        return get_rw_file_lock(name, op)

    lock_name = name + '.lock'
    if not os.path.isfile(lock_name):
        try:
            Path(lock_name).touch()
        except OSError:
            # Best effort only: ``FcntlContext`` opens the path in write mode on entry, so a real
            # problem (missing directory, no permission) is reported there.
            pass
    return FcntlContext(lock_name)