1"""An asyncio-based implementation of the file lock."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7import logging
8import os
9import time
10from dataclasses import dataclass
11from inspect import iscoroutinefunction
12from threading import local
13from typing import TYPE_CHECKING, Any, NoReturn, cast
14
15from ._api import BaseFileLock, FileLockContext, FileLockMeta
16from ._error import Timeout
17from ._soft import SoftFileLock
18from ._unix import UnixFileLock
19from ._windows import WindowsFileLock
20
21if TYPE_CHECKING:
22 import sys
23 from collections.abc import Callable
24 from concurrent import futures
25 from types import TracebackType
26
27 if sys.version_info >= (3, 11): # pragma: no cover (py311+)
28 from typing import Self
29 else: # pragma: no cover (<py311)
30 from typing_extensions import Self
31
32
33_LOGGER = logging.getLogger("filelock")
34
35
36@dataclass
37class AsyncFileLockContext(FileLockContext):
38 """A dataclass which holds the context for a ``BaseAsyncFileLock`` object."""
39
40 #: Whether run in executor
41 run_in_executor: bool = True
42
43 #: The executor
44 executor: futures.Executor | None = None
45
46 #: The loop
47 loop: asyncio.AbstractEventLoop | None = None
48
49
50class AsyncThreadLocalFileContext(AsyncFileLockContext, local):
51 """A thread local version of the ``FileLockContext`` class."""
52
53
54class AsyncAcquireReturnProxy:
55 """A context-aware object that will release the lock file when exiting."""
56
57 def __init__(self, lock: BaseAsyncFileLock) -> None: # noqa: D107
58 self.lock = lock
59
60 async def __aenter__(self) -> BaseAsyncFileLock: # noqa: D105
61 return self.lock
62
63 async def __aexit__( # noqa: D105
64 self,
65 exc_type: type[BaseException] | None,
66 exc_value: BaseException | None,
67 traceback: TracebackType | None,
68 ) -> None:
69 await self.lock.release()
70
71
72class AsyncFileLockMeta(FileLockMeta):
73 def __call__( # type: ignore[override] # noqa: PLR0913
74 cls, # noqa: N805
75 lock_file: str | os.PathLike[str],
76 timeout: float = -1,
77 mode: int = 0o644,
78 thread_local: bool = False, # noqa: FBT001, FBT002
79 *,
80 blocking: bool = True,
81 is_singleton: bool = False,
82 loop: asyncio.AbstractEventLoop | None = None,
83 run_in_executor: bool = True,
84 executor: futures.Executor | None = None,
85 ) -> BaseAsyncFileLock:
86 if thread_local and run_in_executor:
87 msg = "run_in_executor is not supported when thread_local is True"
88 raise ValueError(msg)
89 instance = super().__call__(
90 lock_file=lock_file,
91 timeout=timeout,
92 mode=mode,
93 thread_local=thread_local,
94 blocking=blocking,
95 is_singleton=is_singleton,
96 loop=loop,
97 run_in_executor=run_in_executor,
98 executor=executor,
99 )
100 return cast("BaseAsyncFileLock", instance)
101
102
103class BaseAsyncFileLock(BaseFileLock, metaclass=AsyncFileLockMeta):
104 """Base class for asynchronous file locks."""
105
106 def __init__( # noqa: PLR0913
107 self,
108 lock_file: str | os.PathLike[str],
109 timeout: float = -1,
110 mode: int = 0o644,
111 thread_local: bool = False, # noqa: FBT001, FBT002
112 *,
113 blocking: bool = True,
114 is_singleton: bool = False,
115 loop: asyncio.AbstractEventLoop | None = None,
116 run_in_executor: bool = True,
117 executor: futures.Executor | None = None,
118 ) -> None:
119 """
120 Create a new lock object.
121
122 :param lock_file: path to the file
123 :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \
124 the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \
125 to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
126 :param mode: file permissions for the lockfile
127 :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \
128 ``False`` then the lock will be reentrant across threads.
129 :param blocking: whether the lock should be blocking or not
130 :param is_singleton: If this is set to ``True`` then only one instance of this class will be created \
131 per lock file. This is useful if you want to use the lock object for reentrant locking without needing \
132 to pass the same object around.
133 :param loop: The event loop to use. If not specified, the running event loop will be used.
134 :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor.
135 :param executor: The executor to use. If not specified, the default executor will be used.
136
137 """
138 self._is_thread_local = thread_local
139 self._is_singleton = is_singleton
140
141 # Create the context. Note that external code should not work with the context directly and should instead use
142 # properties of this class.
143 kwargs: dict[str, Any] = {
144 "lock_file": os.fspath(lock_file),
145 "timeout": timeout,
146 "mode": mode,
147 "blocking": blocking,
148 "loop": loop,
149 "run_in_executor": run_in_executor,
150 "executor": executor,
151 }
152 self._context: AsyncFileLockContext = (AsyncThreadLocalFileContext if thread_local else AsyncFileLockContext)(
153 **kwargs
154 )
155
156 @property
157 def run_in_executor(self) -> bool:
158 """::return: whether run in executor."""
159 return self._context.run_in_executor
160
161 @property
162 def executor(self) -> futures.Executor | None:
163 """::return: the executor."""
164 return self._context.executor
165
166 @executor.setter
167 def executor(self, value: futures.Executor | None) -> None: # pragma: no cover
168 """
169 Change the executor.
170
171 :param value: the new executor or ``None``
172 :type value: futures.Executor | None
173
174 """
175 self._context.executor = value
176
177 @property
178 def loop(self) -> asyncio.AbstractEventLoop | None:
179 """::return: the event loop."""
180 return self._context.loop
181
182 async def acquire( # type: ignore[override]
183 self,
184 timeout: float | None = None,
185 poll_interval: float = 0.05,
186 *,
187 blocking: bool | None = None,
188 ) -> AsyncAcquireReturnProxy:
189 """
190 Try to acquire the file lock.
191
192 :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default
193 :attr:`~BaseFileLock.timeout` is and if ``timeout < 0``, there is no timeout and
194 this method will block until the lock could be acquired
195 :param poll_interval: interval of trying to acquire the lock file
196 :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the
197 first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired.
198 :raises Timeout: if fails to acquire lock within the timeout period
199 :return: a context object that will unlock the file when the context is exited
200
201 .. code-block:: python
202
203 # You can use this method in the context manager (recommended)
204 with lock.acquire():
205 pass
206
207 # Or use an equivalent try-finally construct:
208 lock.acquire()
209 try:
210 pass
211 finally:
212 lock.release()
213
214 """
215 # Use the default timeout, if no timeout is provided.
216 if timeout is None:
217 timeout = self._context.timeout
218
219 if blocking is None:
220 blocking = self._context.blocking
221
222 # Increment the number right at the beginning. We can still undo it, if something fails.
223 self._context.lock_counter += 1
224
225 lock_id = id(self)
226 lock_filename = self.lock_file
227 start_time = time.perf_counter()
228 try:
229 while True:
230 if not self.is_locked:
231 _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
232 await self._run_internal_method(self._acquire)
233 if self.is_locked:
234 _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename)
235 break
236 if blocking is False:
237 _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename)
238 raise Timeout(lock_filename) # noqa: TRY301
239 if 0 <= timeout < time.perf_counter() - start_time:
240 _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename)
241 raise Timeout(lock_filename) # noqa: TRY301
242 msg = "Lock %s not acquired on %s, waiting %s seconds ..."
243 _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
244 await asyncio.sleep(poll_interval)
245 except BaseException: # Something did go wrong, so decrement the counter.
246 self._context.lock_counter = max(0, self._context.lock_counter - 1)
247 raise
248 return AsyncAcquireReturnProxy(lock=self)
249
250 async def release(self, force: bool = False) -> None: # type: ignore[override] # noqa: FBT001, FBT002
251 """
252 Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0.
253 Also note, that the lock file itself is not automatically deleted.
254
255 :param force: If true, the lock counter is ignored and the lock is released in every case/
256
257 """
258 if self.is_locked:
259 self._context.lock_counter -= 1
260
261 if self._context.lock_counter == 0 or force:
262 lock_id, lock_filename = id(self), self.lock_file
263
264 _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
265 await self._run_internal_method(self._release)
266 self._context.lock_counter = 0
267 _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)
268
269 async def _run_internal_method(self, method: Callable[[], Any]) -> None:
270 if iscoroutinefunction(method):
271 await method()
272 elif self.run_in_executor:
273 loop = self.loop or asyncio.get_running_loop()
274 await loop.run_in_executor(self.executor, method)
275 else:
276 method()
277
278 def __enter__(self) -> NoReturn:
279 """
280 Replace old __enter__ method to avoid using it.
281
282 NOTE: DO NOT USE `with` FOR ASYNCIO LOCKS, USE `async with` INSTEAD.
283
284 :return: none
285 :rtype: NoReturn
286 """
287 msg = "Do not use `with` for asyncio locks, use `async with` instead."
288 raise NotImplementedError(msg)
289
290 async def __aenter__(self) -> Self:
291 """
292 Acquire the lock.
293
294 :return: the lock object
295
296 """
297 await self.acquire()
298 return self
299
300 async def __aexit__(
301 self,
302 exc_type: type[BaseException] | None,
303 exc_value: BaseException | None,
304 traceback: TracebackType | None,
305 ) -> None:
306 """
307 Release the lock.
308
309 :param exc_type: the exception type if raised
310 :param exc_value: the exception value if raised
311 :param traceback: the exception traceback if raised
312
313 """
314 await self.release()
315
316 def __del__(self) -> None:
317 """Called when the lock object is deleted."""
318 with contextlib.suppress(RuntimeError):
319 loop = self.loop or asyncio.get_running_loop()
320 if not loop.is_running(): # pragma: no cover
321 loop.run_until_complete(self.release(force=True))
322 else:
323 loop.create_task(self.release(force=True))
324
325
326class AsyncSoftFileLock(SoftFileLock, BaseAsyncFileLock):
327 """Simply watches the existence of the lock file."""
328
329
330class AsyncUnixFileLock(UnixFileLock, BaseAsyncFileLock):
331 """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems."""
332
333
334class AsyncWindowsFileLock(WindowsFileLock, BaseAsyncFileLock):
335 """Uses the :func:`msvcrt.locking` to hard lock the lock file on windows systems."""
336
337
338__all__ = [
339 "AsyncAcquireReturnProxy",
340 "AsyncSoftFileLock",
341 "AsyncUnixFileLock",
342 "AsyncWindowsFileLock",
343 "BaseAsyncFileLock",
344]