Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/filelock/asyncio.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""An asyncio-based implementation of the file lock.""" # noqa: A005
3from __future__ import annotations
5import asyncio
6import contextlib
7import logging
8import os
9import time
10from dataclasses import dataclass
11from threading import local
12from typing import TYPE_CHECKING, Any, Callable, NoReturn, cast
14from ._api import BaseFileLock, FileLockContext, FileLockMeta
15from ._error import Timeout
16from ._soft import SoftFileLock
17from ._unix import UnixFileLock
18from ._windows import WindowsFileLock
20if TYPE_CHECKING:
21 import sys
22 from concurrent import futures
23 from types import TracebackType
25 if sys.version_info >= (3, 11): # pragma: no cover (py311+)
26 from typing import Self
27 else: # pragma: no cover (<py311)
28 from typing_extensions import Self
31_LOGGER = logging.getLogger("filelock")
34@dataclass
35class AsyncFileLockContext(FileLockContext):
36 """A dataclass which holds the context for a ``BaseAsyncFileLock`` object."""
38 #: Whether run in executor
39 run_in_executor: bool = True
41 #: The executor
42 executor: futures.Executor | None = None
44 #: The loop
45 loop: asyncio.AbstractEventLoop | None = None
48class AsyncThreadLocalFileContext(AsyncFileLockContext, local):
49 """A thread local version of the ``FileLockContext`` class."""
52class AsyncAcquireReturnProxy:
53 """A context-aware object that will release the lock file when exiting."""
55 def __init__(self, lock: BaseAsyncFileLock) -> None: # noqa: D107
56 self.lock = lock
58 async def __aenter__(self) -> BaseAsyncFileLock: # noqa: D105
59 return self.lock
61 async def __aexit__( # noqa: D105
62 self,
63 exc_type: type[BaseException] | None,
64 exc_value: BaseException | None,
65 traceback: TracebackType | None,
66 ) -> None:
67 await self.lock.release()
70class AsyncFileLockMeta(FileLockMeta):
71 def __call__( # type: ignore[override] # noqa: PLR0913
72 cls, # noqa: N805
73 lock_file: str | os.PathLike[str],
74 timeout: float = -1,
75 mode: int = 0o644,
76 thread_local: bool = False, # noqa: FBT001, FBT002
77 *,
78 blocking: bool = True,
79 is_singleton: bool = False,
80 loop: asyncio.AbstractEventLoop | None = None,
81 run_in_executor: bool = True,
82 executor: futures.Executor | None = None,
83 ) -> BaseAsyncFileLock:
84 if thread_local and run_in_executor:
85 msg = "run_in_executor is not supported when thread_local is True"
86 raise ValueError(msg)
87 instance = super().__call__(
88 lock_file=lock_file,
89 timeout=timeout,
90 mode=mode,
91 thread_local=thread_local,
92 blocking=blocking,
93 is_singleton=is_singleton,
94 loop=loop,
95 run_in_executor=run_in_executor,
96 executor=executor,
97 )
98 return cast(BaseAsyncFileLock, instance)
101class BaseAsyncFileLock(BaseFileLock, metaclass=AsyncFileLockMeta):
102 """Base class for asynchronous file locks."""
104 def __init__( # noqa: PLR0913
105 self,
106 lock_file: str | os.PathLike[str],
107 timeout: float = -1,
108 mode: int = 0o644,
109 thread_local: bool = False, # noqa: FBT001, FBT002
110 *,
111 blocking: bool = True,
112 is_singleton: bool = False,
113 loop: asyncio.AbstractEventLoop | None = None,
114 run_in_executor: bool = True,
115 executor: futures.Executor | None = None,
116 ) -> None:
117 """
118 Create a new lock object.
120 :param lock_file: path to the file
121 :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \
122 the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \
123 to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
124 :param mode: file permissions for the lockfile
125 :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \
126 ``False`` then the lock will be reentrant across threads.
127 :param blocking: whether the lock should be blocking or not
128 :param is_singleton: If this is set to ``True`` then only one instance of this class will be created \
129 per lock file. This is useful if you want to use the lock object for reentrant locking without needing \
130 to pass the same object around.
131 :param loop: The event loop to use. If not specified, the running event loop will be used.
132 :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor.
133 :param executor: The executor to use. If not specified, the default executor will be used.
135 """
136 self._is_thread_local = thread_local
137 self._is_singleton = is_singleton
139 # Create the context. Note that external code should not work with the context directly and should instead use
140 # properties of this class.
141 kwargs: dict[str, Any] = {
142 "lock_file": os.fspath(lock_file),
143 "timeout": timeout,
144 "mode": mode,
145 "blocking": blocking,
146 "loop": loop,
147 "run_in_executor": run_in_executor,
148 "executor": executor,
149 }
150 self._context: AsyncFileLockContext = (AsyncThreadLocalFileContext if thread_local else AsyncFileLockContext)(
151 **kwargs
152 )
154 @property
155 def run_in_executor(self) -> bool:
156 """::return: whether run in executor."""
157 return self._context.run_in_executor
159 @property
160 def executor(self) -> futures.Executor | None:
161 """::return: the executor."""
162 return self._context.executor
164 @executor.setter
165 def executor(self, value: futures.Executor | None) -> None: # pragma: no cover
166 """
167 Change the executor.
169 :param value: the new executor or ``None``
170 :type value: futures.Executor | None
172 """
173 self._context.executor = value
175 @property
176 def loop(self) -> asyncio.AbstractEventLoop | None:
177 """::return: the event loop."""
178 return self._context.loop
180 async def acquire( # type: ignore[override]
181 self,
182 timeout: float | None = None,
183 poll_interval: float = 0.05,
184 *,
185 blocking: bool | None = None,
186 ) -> AsyncAcquireReturnProxy:
187 """
188 Try to acquire the file lock.
190 :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default
191 :attr:`~BaseFileLock.timeout` is and if ``timeout < 0``, there is no timeout and
192 this method will block until the lock could be acquired
193 :param poll_interval: interval of trying to acquire the lock file
194 :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the
195 first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired.
196 :raises Timeout: if fails to acquire lock within the timeout period
197 :return: a context object that will unlock the file when the context is exited
199 .. code-block:: python
201 # You can use this method in the context manager (recommended)
202 with lock.acquire():
203 pass
205 # Or use an equivalent try-finally construct:
206 lock.acquire()
207 try:
208 pass
209 finally:
210 lock.release()
212 """
213 # Use the default timeout, if no timeout is provided.
214 if timeout is None:
215 timeout = self._context.timeout
217 if blocking is None:
218 blocking = self._context.blocking
220 # Increment the number right at the beginning. We can still undo it, if something fails.
221 self._context.lock_counter += 1
223 lock_id = id(self)
224 lock_filename = self.lock_file
225 start_time = time.perf_counter()
226 try:
227 while True:
228 if not self.is_locked:
229 _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
230 await self._run_internal_method(self._acquire)
231 if self.is_locked:
232 _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename)
233 break
234 if blocking is False:
235 _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename)
236 raise Timeout(lock_filename) # noqa: TRY301
237 if 0 <= timeout < time.perf_counter() - start_time:
238 _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename)
239 raise Timeout(lock_filename) # noqa: TRY301
240 msg = "Lock %s not acquired on %s, waiting %s seconds ..."
241 _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
242 await asyncio.sleep(poll_interval)
243 except BaseException: # Something did go wrong, so decrement the counter.
244 self._context.lock_counter = max(0, self._context.lock_counter - 1)
245 raise
246 return AsyncAcquireReturnProxy(lock=self)
248 async def release(self, force: bool = False) -> None: # type: ignore[override] # noqa: FBT001, FBT002
249 """
250 Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0.
251 Also note, that the lock file itself is not automatically deleted.
253 :param force: If true, the lock counter is ignored and the lock is released in every case/
255 """
256 if self.is_locked:
257 self._context.lock_counter -= 1
259 if self._context.lock_counter == 0 or force:
260 lock_id, lock_filename = id(self), self.lock_file
262 _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
263 await self._run_internal_method(self._release)
264 self._context.lock_counter = 0
265 _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)
267 async def _run_internal_method(self, method: Callable[[], Any]) -> None:
268 if asyncio.iscoroutinefunction(method):
269 await method()
270 elif self.run_in_executor:
271 loop = self.loop or asyncio.get_running_loop()
272 await loop.run_in_executor(self.executor, method)
273 else:
274 method()
276 def __enter__(self) -> NoReturn:
277 """
278 Replace old __enter__ method to avoid using it.
280 NOTE: DO NOT USE `with` FOR ASYNCIO LOCKS, USE `async with` INSTEAD.
282 :return: none
283 :rtype: NoReturn
284 """
285 msg = "Do not use `with` for asyncio locks, use `async with` instead."
286 raise NotImplementedError(msg)
288 async def __aenter__(self) -> Self:
289 """
290 Acquire the lock.
292 :return: the lock object
294 """
295 await self.acquire()
296 return self
298 async def __aexit__(
299 self,
300 exc_type: type[BaseException] | None,
301 exc_value: BaseException | None,
302 traceback: TracebackType | None,
303 ) -> None:
304 """
305 Release the lock.
307 :param exc_type: the exception type if raised
308 :param exc_value: the exception value if raised
309 :param traceback: the exception traceback if raised
311 """
312 await self.release()
314 def __del__(self) -> None:
315 """Called when the lock object is deleted."""
316 with contextlib.suppress(RuntimeError):
317 loop = self.loop or asyncio.get_running_loop()
318 if not loop.is_running(): # pragma: no cover
319 loop.run_until_complete(self.release(force=True))
320 else:
321 loop.create_task(self.release(force=True))
324class AsyncSoftFileLock(SoftFileLock, BaseAsyncFileLock):
325 """Simply watches the existence of the lock file."""
328class AsyncUnixFileLock(UnixFileLock, BaseAsyncFileLock):
329 """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems."""
332class AsyncWindowsFileLock(WindowsFileLock, BaseAsyncFileLock):
333 """Uses the :func:`msvcrt.locking` to hard lock the lock file on windows systems."""
336__all__ = [
337 "AsyncAcquireReturnProxy",
338 "AsyncSoftFileLock",
339 "AsyncUnixFileLock",
340 "AsyncWindowsFileLock",
341 "BaseAsyncFileLock",
342]