1"""An asyncio-based implementation of the file lock."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7import logging
8import os
9import time
10from dataclasses import dataclass
11from inspect import iscoroutinefunction
12from threading import local
13from typing import TYPE_CHECKING, Any, Callable, NoReturn, cast
14
15from ._api import BaseFileLock, FileLockContext, FileLockMeta
16from ._error import Timeout
17from ._soft import SoftFileLock
18from ._unix import UnixFileLock
19from ._windows import WindowsFileLock
20
21if TYPE_CHECKING:
22 import sys
23 from concurrent import futures
24 from types import TracebackType
25
26 if sys.version_info >= (3, 11): # pragma: no cover (py311+)
27 from typing import Self
28 else: # pragma: no cover (<py311)
29 from typing_extensions import Self
30
31
# Module-level logger registered under the name "filelock"; the lock classes below emit their debug messages
# (acquire attempts, successes, timeouts, releases) through it.
_LOGGER = logging.getLogger("filelock")
33
34
@dataclass
class AsyncFileLockContext(FileLockContext):
    """State container for a ``BaseAsyncFileLock``: the ``FileLockContext`` fields plus asyncio-specific settings."""

    #: When true, non-coroutine lock operations are handed to ``loop.run_in_executor`` instead of being called inline
    run_in_executor: bool = True

    #: Executor passed to ``loop.run_in_executor``; ``None`` is forwarded as-is
    executor: futures.Executor | None = None

    #: Event loop to use; when ``None`` the running loop is looked up at the time it is needed
    loop: asyncio.AbstractEventLoop | None = None
47
48
class AsyncThreadLocalFileContext(AsyncFileLockContext, local):
    """Variant of ``AsyncFileLockContext`` that also derives from ``threading.local`` (per-thread attribute storage)."""
51
52
class AsyncAcquireReturnProxy:
    """Async context manager wrapping a lock: entering yields the lock, exiting releases it."""

    def __init__(self, lock: BaseAsyncFileLock) -> None:
        """
        Remember the lock this proxy stands for.

        :param lock: the lock object handed out on entry and released on exit

        """
        self.lock = lock

    async def __aenter__(self) -> BaseAsyncFileLock:
        """
        Enter the ``async with`` block.

        :return: the wrapped lock object

        """
        wrapped = self.lock
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Leave the ``async with`` block by awaiting ``release()`` on the wrapped lock.

        The exception details are not inspected, and an exception raised inside the block is not suppressed.

        :param exc_type: the exception type if raised
        :param exc_value: the exception value if raised
        :param traceback: the exception traceback if raised

        """
        pending_release = self.lock.release()
        await pending_release
69
70
class AsyncFileLockMeta(FileLockMeta):
    """Metaclass for the async locks: checks the async-only options, then lets ``FileLockMeta`` build the instance."""

    def __call__(  # type: ignore[override]  # noqa: PLR0913
        cls,  # noqa: N805
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        is_singleton: bool = False,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> BaseAsyncFileLock:
        """
        Create (or obtain) a lock instance, forwarding every argument by keyword to ``FileLockMeta.__call__``.

        :param lock_file: path to the file
        :param timeout: default timeout when acquiring the lock, in seconds
        :param mode: file permissions for the lockfile
        :param thread_local: whether the lock's internal context should be thread local
        :param blocking: whether the lock should be blocking or not
        :param is_singleton: forwarded unchanged to the parent metaclass
        :param loop: the event loop to use
        :param run_in_executor: whether lock operations run in an executor
        :param executor: the executor to use
        :raises ValueError: if ``thread_local`` and ``run_in_executor`` are both true
        :return: the object produced by the parent metaclass, typed as ``BaseAsyncFileLock``

        """
        # The two options are mutually exclusive; reject the combination before anything is constructed.
        if run_in_executor and thread_local:
            msg = "run_in_executor is not supported when thread_local is True"
            raise ValueError(msg)

        async_options: dict[str, Any] = {
            "loop": loop,
            "run_in_executor": run_in_executor,
            "executor": executor,
        }
        created = super().__call__(
            lock_file=lock_file,
            timeout=timeout,
            mode=mode,
            thread_local=thread_local,
            blocking=blocking,
            is_singleton=is_singleton,
            **async_options,
        )
        return cast("BaseAsyncFileLock", created)
100
101
class BaseAsyncFileLock(BaseFileLock, metaclass=AsyncFileLockMeta):
    """
    Base class for asynchronous file locks.

    Acquiring and releasing are coroutines; use ``async with`` (the plain ``with`` statement is rejected by
    :meth:`__enter__`).
    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        is_singleton: bool = False,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> None:
        """
        Create a new lock object.

        :param lock_file: path to the file
        :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \
            the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \
            to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
        :param mode: file permissions for the lockfile
        :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \
            ``False`` then the lock will be reentrant across threads.
        :param blocking: whether the lock should be blocking or not
        :param is_singleton: If this is set to ``True`` then only one instance of this class will be created \
            per lock file. This is useful if you want to use the lock object for reentrant locking without needing \
            to pass the same object around.
        :param loop: The event loop to use. If not specified, the running event loop will be used.
        :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor.
        :param executor: The executor to use. If not specified, the default executor will be used.

        """
        self._is_thread_local = thread_local
        self._is_singleton = is_singleton

        # Create the context. Note that external code should not work with the context directly and should instead use
        # properties of this class.
        kwargs: dict[str, Any] = {
            "lock_file": os.fspath(lock_file),
            "timeout": timeout,
            "mode": mode,
            "blocking": blocking,
            "loop": loop,
            "run_in_executor": run_in_executor,
            "executor": executor,
        }
        self._context: AsyncFileLockContext = (AsyncThreadLocalFileContext if thread_local else AsyncFileLockContext)(
            **kwargs
        )

    @property
    def run_in_executor(self) -> bool:
        """:return: whether run in executor."""
        return self._context.run_in_executor

    @property
    def executor(self) -> futures.Executor | None:
        """:return: the executor."""
        return self._context.executor

    @executor.setter
    def executor(self, value: futures.Executor | None) -> None:  # pragma: no cover
        """
        Change the executor.

        :param value: the new executor or ``None``
        :type value: futures.Executor | None

        """
        self._context.executor = value

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:return: the event loop."""
        return self._context.loop

    async def acquire(  # type: ignore[override]
        self,
        timeout: float | None = None,
        poll_interval: float = 0.05,
        *,
        blocking: bool | None = None,
    ) -> AsyncAcquireReturnProxy:
        """
        Try to acquire the file lock.

        :param timeout: maximum wait time for acquiring the lock; ``None`` means use the default
            :attr:`~BaseFileLock.timeout`, and if ``timeout < 0`` there is no timeout and
            this method will block until the lock could be acquired
        :param poll_interval: interval of trying to acquire the lock file
        :param blocking: ``None`` means use the lock's default. If False, function will return immediately if it
            cannot obtain a lock on the first attempt. Otherwise, this method will block until the timeout expires or
            the lock is acquired.
        :raises Timeout: if fails to acquire lock within the timeout period
        :return: a context object that will unlock the file when the context is exited

        .. code-block:: python

            # You can use this method in the context manager (recommended)
            async with await lock.acquire():
                pass

            # Or use an equivalent try-finally construct:
            await lock.acquire()
            try:
                pass
            finally:
                await lock.release()

        """
        # Use the default timeout, if no timeout is provided.
        if timeout is None:
            timeout = self._context.timeout

        if blocking is None:
            blocking = self._context.blocking

        # Increment the number right at the beginning. We can still undo it, if something fails.
        self._context.lock_counter += 1

        lock_id = id(self)
        lock_filename = self.lock_file
        start_time = time.perf_counter()
        try:
            while True:
                # Only attempt the low-level acquire when the lock is not already held by this object.
                if not self.is_locked:
                    _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
                    await self._run_internal_method(self._acquire)
                if self.is_locked:
                    _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename)
                    break
                if blocking is False:
                    _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename)
                    raise Timeout(lock_filename)  # noqa: TRY301
                # A negative timeout never satisfies ``0 <= timeout`` and therefore waits indefinitely.
                if 0 <= timeout < time.perf_counter() - start_time:
                    _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename)
                    raise Timeout(lock_filename)  # noqa: TRY301
                msg = "Lock %s not acquired on %s, waiting %s seconds ..."
                _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
                await asyncio.sleep(poll_interval)
        except BaseException:  # Something did go wrong, so decrement the counter.
            self._context.lock_counter = max(0, self._context.lock_counter - 1)
            raise
        return AsyncAcquireReturnProxy(lock=self)

    async def release(self, force: bool = False) -> None:  # type: ignore[override]  # noqa: FBT001, FBT002
        """
        Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0.
        Also note, that the lock file itself is not automatically deleted.

        :param force: If true, the lock counter is ignored and the lock is released in every case.

        """
        if self.is_locked:
            self._context.lock_counter -= 1

            if self._context.lock_counter == 0 or force:
                lock_id, lock_filename = id(self), self.lock_file

                _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
                await self._run_internal_method(self._release)
                self._context.lock_counter = 0
                _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)

    async def _run_internal_method(self, method: Callable[[], Any]) -> None:
        """
        Run one of the low-level lock operations in the way that suits it.

        A coroutine function is awaited directly. Any other callable is run through ``loop.run_in_executor`` when
        :attr:`run_in_executor` is true, and called inline otherwise.

        :param method: the zero-argument callable to run

        """
        if iscoroutinefunction(method):
            await method()
        elif self.run_in_executor:
            # Prefer the loop given at construction; otherwise use the loop running this coroutine.
            loop = self.loop or asyncio.get_running_loop()
            await loop.run_in_executor(self.executor, method)
        else:
            method()

    def __enter__(self) -> NoReturn:
        """
        Replace old __enter__ method to avoid using it.

        NOTE: DO NOT USE `with` FOR ASYNCIO LOCKS, USE `async with` INSTEAD.

        :raises NotImplementedError: always
        :return: none
        :rtype: NoReturn
        """
        msg = "Do not use `with` for asyncio locks, use `async with` instead."
        raise NotImplementedError(msg)

    async def __aenter__(self) -> Self:
        """
        Acquire the lock.

        :return: the lock object

        """
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Release the lock.

        :param exc_type: the exception type if raised
        :param exc_value: the exception value if raised
        :param traceback: the exception traceback if raised

        """
        await self.release()

    def __del__(self) -> None:
        """
        Called when the lock object is deleted; makes a best-effort attempt to force-release the lock.

        Nothing is done when no event loop is available (none was given at construction and none is running).
        """
        # ``__del__`` also runs for an instance whose ``__init__`` raised before ``_context`` was assigned (for
        # example when ``os.fspath(lock_file)`` rejects the path). There is nothing to release in that case, and
        # reading ``self.loop`` would only raise ``AttributeError`` and produce an "Exception ignored in" report.
        if getattr(self, "_context", None) is None:
            return
        with contextlib.suppress(RuntimeError):
            loop = self.loop or asyncio.get_running_loop()
            if not loop.is_running():  # pragma: no cover
                loop.run_until_complete(self.release(force=True))
            else:
                loop.create_task(self.release(force=True))
323
324
325class AsyncSoftFileLock(SoftFileLock, BaseAsyncFileLock):
326 """Simply watches the existence of the lock file."""
327
328
329class AsyncUnixFileLock(UnixFileLock, BaseAsyncFileLock):
330 """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems."""
331
332
333class AsyncWindowsFileLock(WindowsFileLock, BaseAsyncFileLock):
334 """Uses the :func:`msvcrt.locking` to hard lock the lock file on windows systems."""
335
336
# Names exported by a star-import of this module.
__all__ = [
    "AsyncAcquireReturnProxy",
    "AsyncSoftFileLock",
    "AsyncUnixFileLock",
    "AsyncWindowsFileLock",
    "BaseAsyncFileLock",
]