1"""Async wrapper around :class:`SoftReadWriteLock` for use with ``asyncio``."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7from contextlib import asynccontextmanager
8from typing import TYPE_CHECKING
9
10from ._sync import SoftReadWriteLock
11
12if TYPE_CHECKING:
13 import os
14 from collections.abc import AsyncGenerator, Callable
15 from concurrent import futures
16 from types import TracebackType
17
18
19class AsyncAcquireSoftReadWriteReturnProxy:
20 """Async context-aware object that releases an :class:`AsyncSoftReadWriteLock` on exit."""
21
22 def __init__(self, lock: AsyncSoftReadWriteLock) -> None:
23 self.lock = lock
24
25 async def __aenter__(self) -> AsyncSoftReadWriteLock:
26 return self.lock
27
28 async def __aexit__(
29 self,
30 exc_type: type[BaseException] | None,
31 exc_value: BaseException | None,
32 traceback: TracebackType | None,
33 ) -> None:
34 await self.lock.release()
35
36
37class AsyncSoftReadWriteLock:
38 """
39 Async wrapper around :class:`SoftReadWriteLock` for ``asyncio`` applications.
40
41 The sync class's blocking filesystem operations run on a thread pool via ``loop.run_in_executor()``.
42 Reentrancy, upgrade/downgrade rules, fork handling, heartbeat and TTL stale detection, and singleton
43 behavior are delegated to the underlying :class:`SoftReadWriteLock`.
44
45 :param lock_file: path to the lock file; sidecar state/write/readers live next to it
46 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
47 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
48 :param is_singleton: if ``True``, reuse existing :class:`SoftReadWriteLock` instances per resolved path
49 :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
50 :param stale_threshold: seconds of mtime inactivity before a marker is stale; defaults to ``3 * heartbeat_interval``
51 :param poll_interval: seconds between acquire retries under contention; default 0.25 s
52 :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
53 :param executor: executor for ``run_in_executor``; ``None`` uses the default executor
54
55 .. versionadded:: 3.27.0
56
57 """
58
59 def __init__( # noqa: PLR0913
60 self,
61 lock_file: str | os.PathLike[str],
62 timeout: float = -1,
63 *,
64 blocking: bool = True,
65 is_singleton: bool = True,
66 heartbeat_interval: float = 30.0,
67 stale_threshold: float | None = None,
68 poll_interval: float = 0.25,
69 loop: asyncio.AbstractEventLoop | None = None,
70 executor: futures.Executor | None = None,
71 ) -> None:
72 self._lock = SoftReadWriteLock(
73 lock_file,
74 timeout,
75 blocking=blocking,
76 is_singleton=is_singleton,
77 heartbeat_interval=heartbeat_interval,
78 stale_threshold=stale_threshold,
79 poll_interval=poll_interval,
80 )
81 self._loop = loop
82 self._executor = executor
83
84 @property
85 def lock_file(self) -> str:
86 """:returns: the path to the lock file passed to the constructor."""
87 return self._lock.lock_file
88
89 @property
90 def timeout(self) -> float:
91 """:returns: the default timeout applied when ``acquire_read`` / ``acquire_write`` is called without one."""
92 return self._lock.timeout
93
94 @property
95 def blocking(self) -> bool:
96 """:returns: whether ``acquire_*`` defaults to blocking; ``False`` makes contention raise immediately."""
97 return self._lock.blocking
98
99 @property
100 def loop(self) -> asyncio.AbstractEventLoop | None:
101 """:returns: the event loop used for ``run_in_executor``, or ``None`` for the running loop."""
102 return self._loop
103
104 @property
105 def executor(self) -> futures.Executor | None:
106 """:returns: the executor used for ``run_in_executor``, or ``None`` for the default executor."""
107 return self._executor
108
109 async def acquire_read(
110 self, timeout: float | None = None, *, blocking: bool | None = None
111 ) -> AsyncAcquireSoftReadWriteReturnProxy:
112 """
113 Acquire a shared read lock.
114
115 See :meth:`SoftReadWriteLock.acquire_read` for the full reentrancy / upgrade / fork semantics. The blocking
116 work runs inside ``run_in_executor`` so other coroutines on the same loop continue to progress while this
117 call waits.
118
119 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
120 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
121
122 :returns: a proxy usable as an async context manager to release the lock
123
124 :raises RuntimeError: if a write lock is already held, if this instance was invalidated by
125 :func:`os.fork`, or if :meth:`close` was called
126 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
127
128 """
129 await self._run(self._lock.acquire_read, timeout, blocking=blocking)
130 return AsyncAcquireSoftReadWriteReturnProxy(lock=self)
131
132 async def acquire_write(
133 self, timeout: float | None = None, *, blocking: bool | None = None
134 ) -> AsyncAcquireSoftReadWriteReturnProxy:
135 """
136 Acquire an exclusive write lock.
137
138 See :meth:`SoftReadWriteLock.acquire_write` for the two-phase writer-preferring semantics. The blocking
139 work runs inside ``run_in_executor``.
140
141 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
142 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
143
144 :returns: a proxy usable as an async context manager to release the lock
145
146 :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if
147 this instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
148 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
149
150 """
151 await self._run(self._lock.acquire_write, timeout, blocking=blocking)
152 return AsyncAcquireSoftReadWriteReturnProxy(lock=self)
153
154 async def release(self, *, force: bool = False) -> None:
155 """
156 Release one level of the current lock.
157
158 :param force: if ``True``, release the lock completely regardless of the current lock level
159
160 :raises RuntimeError: if no lock is currently held and *force* is ``False``
161
162 """
163 await self._run(self._lock.release, force=force)
164
165 @asynccontextmanager
166 async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
167 """
168 Async context manager that acquires and releases a shared read lock.
169
170 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
171 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
172
173 :raises RuntimeError: if a write lock is already held on this instance
174 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
175
176 """
177 await self.acquire_read(timeout, blocking=blocking)
178 try:
179 yield
180 finally:
181 await self.release()
182
183 @asynccontextmanager
184 async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
185 """
186 Async context manager that acquires and releases an exclusive write lock.
187
188 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
189 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
190
191 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
192 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
193
194 """
195 await self.acquire_write(timeout, blocking=blocking)
196 try:
197 yield
198 finally:
199 await self.release()
200
201 async def close(self) -> None:
202 """Release any held lock and release the underlying filesystem resources. Idempotent."""
203 await self._run(self._lock.close)
204
205 async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
206 loop = self._loop or asyncio.get_running_loop()
207 return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
208
209
210__all__ = [
211 "AsyncAcquireSoftReadWriteReturnProxy",
212 "AsyncSoftReadWriteLock",
213]