1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7from contextlib import asynccontextmanager
8from typing import TYPE_CHECKING
9
10from ._read_write import ReadWriteLock
11
12if TYPE_CHECKING:
13 import os
14 from collections.abc import AsyncGenerator, Callable
15 from concurrent import futures
16 from types import TracebackType
17
18
class AsyncAcquireReadWriteReturnProxy:
    """
    Async context manager returned by the ``acquire_*`` coroutines of :class:`AsyncReadWriteLock`.

    Entering the context hands back the wrapped lock; leaving it awaits one ``release()`` call on that lock.

    :param lock: the async read/write lock to release when the context exits

    """

    def __init__(self, lock: AsyncReadWriteLock) -> None:
        # Kept as a public attribute so callers can reach the lock through the proxy.
        self.lock = lock

    async def __aenter__(self) -> AsyncReadWriteLock:
        """:returns: the wrapped lock; nothing is acquired here."""
        wrapped = self.lock
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Release one level of the wrapped lock; the exception details are not inspected."""
        owner = self.lock
        await owner.release()
35
36
class AsyncReadWriteLock:
    """
    ``asyncio``-friendly facade over :class:`ReadWriteLock`.

    Python's :mod:`sqlite3` module offers no async API, so every blocking call into the wrapped lock is handed to
    ``loop.run_in_executor()`` instead of running on the event loop. Reentrancy, upgrade/downgrade rules, and
    singleton behavior are not implemented here; they are delegated to the underlying :class:`ReadWriteLock`.

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
    :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
    :param executor: executor for ``run_in_executor``; ``None`` uses the default executor

    .. versionadded:: 3.21.0

    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        # Where the work is dispatched to (resolved lazily in ``_run``).
        self._loop = loop
        self._executor = executor
        # The synchronous lock that does the actual locking.
        self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)

    @property
    def lock_file(self) -> str:
        """:returns: the lock file path reported by the wrapped lock."""
        wrapped = self._lock
        return wrapped.lock_file

    @property
    def timeout(self) -> float:
        """:returns: the default timeout reported by the wrapped lock."""
        wrapped = self._lock
        return wrapped.timeout

    @property
    def blocking(self) -> bool:
        """:returns: the default blocking flag reported by the wrapped lock."""
        wrapped = self._lock
        return wrapped.blocking

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the configured event loop, or ``None`` when the running loop is used."""
        return self._loop

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the configured executor, or ``None`` when the loop's default executor is used."""
        return self._executor

    async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
        """
        Call ``func(*args, **kwargs)`` through ``run_in_executor`` and return its result.

        :param func: the blocking callable to execute
        :param args: positional arguments forwarded to *func*
        :param kwargs: keyword arguments forwarded to *func*

        :returns: whatever *func* returns

        """
        # NOTE(review): cancelling the awaiting task presumably does not interrupt a job that is already running
        # in the executor -- confirm how a lock acquired after cancellation should be handled.
        target_loop = self._loop or asyncio.get_running_loop()
        # ``run_in_executor`` forwards positional arguments only, so bind everything up front.
        bound_call = functools.partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(self._executor, bound_call)

    def _effective_options(self, timeout: float | None, blocking: bool | None) -> tuple[float, bool]:
        """Replace ``None`` in *timeout* / *blocking* with the wrapped lock's defaults."""
        return (
            self._lock.timeout if timeout is None else timeout,
            self._lock.blocking if blocking is None else blocking,
        )

    async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Take a shared read lock.

        The call is forwarded to :meth:`ReadWriteLock.acquire_read`; see that method for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy usable as an async context manager that releases the lock on exit

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        acquire = self._lock.acquire_read
        await self._run(acquire, timeout, blocking=blocking)
        proxy = AsyncAcquireReadWriteReturnProxy(lock=self)
        return proxy

    async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Take an exclusive write lock.

        The call is forwarded to :meth:`ReadWriteLock.acquire_write`; see that method for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy usable as an async context manager that releases the lock on exit

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        acquire = self._lock.acquire_write
        await self._run(acquire, timeout, blocking=blocking)
        proxy = AsyncAcquireReadWriteReturnProxy(lock=self)
        return proxy

    async def release(self, *, force: bool = False) -> None:
        """
        Give up one level of the currently held lock.

        The call is forwarded to :meth:`ReadWriteLock.release`; see that method for full semantics.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        release_sync = self._lock.release
        await self._run(release_sync, force=force)

    @asynccontextmanager
    async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold a shared read lock for the duration of an ``async with`` block.

        ``None`` for *timeout* or *blocking* means "use the instance default".

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        timeout, blocking = self._effective_options(timeout, blocking)
        await self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            # Runs whether the body finished normally or raised.
            await self.release()

    @asynccontextmanager
    async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold an exclusive write lock for the duration of an ``async with`` block.

        ``None`` for *timeout* or *blocking* means "use the instance default".

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        timeout, blocking = self._effective_options(timeout, blocking)
        await self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            # Runs whether the body finished normally or raised.
            await self.release()

    async def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        The instance must not be used again once this has been called.

        """
        close_sync = self._lock.close
        await self._run(close_sync)
198
199
# Names exported by ``from <module> import *``.
__all__ = ["AsyncAcquireReadWriteReturnProxy", "AsyncReadWriteLock"]