1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7from concurrent.futures import ThreadPoolExecutor
8from contextlib import asynccontextmanager
9from typing import TYPE_CHECKING
10
11from ._read_write import ReadWriteLock
12
13if TYPE_CHECKING:
14 import os
15 from collections.abc import AsyncGenerator, Callable
16 from concurrent import futures
17 from types import TracebackType
18
19
20class AsyncAcquireReadWriteReturnProxy:
21 """Context-aware object that releases the async read/write lock on exit."""
22
23 def __init__(self, lock: AsyncReadWriteLock) -> None:
24 self.lock = lock
25
26 async def __aenter__(self) -> AsyncReadWriteLock:
27 return self.lock
28
29 async def __aexit__(
30 self,
31 exc_type: type[BaseException] | None,
32 exc_value: BaseException | None,
33 traceback: TracebackType | None,
34 ) -> None:
35 await self.lock.release()
36
37
38class AsyncReadWriteLock:
39 """
40 Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.
41
42 Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread
43 pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
44 to the underlying :class:`ReadWriteLock`.
45
46 :param lock_file: path to the SQLite database file used as the lock
47 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
48 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
49 :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
50 :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
51 :param executor: executor for ``run_in_executor``. When ``None`` a dedicated single-thread executor is created
52 and owned by this lock, ensuring every operation runs on the same thread (required for SQLite affinity); it
53 is shut down by :meth:`close`. A caller-supplied executor is used as-is and never shut down here, so when no
54 executor is passed remember to call :meth:`close` to release the owned one.
55
56 .. versionadded:: 3.21.0
57
58 """
59
60 def __init__( # noqa: PLR0913
61 self,
62 lock_file: str | os.PathLike[str],
63 timeout: float = -1,
64 *,
65 blocking: bool = True,
66 is_singleton: bool = True,
67 loop: asyncio.AbstractEventLoop | None = None,
68 executor: futures.Executor | None = None,
69 ) -> None:
70 self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
71 self._loop = loop
72 self._owns_executor = executor is None
73 self._executor = executor or ThreadPoolExecutor(max_workers=1)
74
75 @property
76 def lock_file(self) -> str:
77 """:returns: the path to the lock file."""
78 return self._lock.lock_file
79
80 @property
81 def timeout(self) -> float:
82 """:returns: the default timeout."""
83 return self._lock.timeout
84
85 @property
86 def blocking(self) -> bool:
87 """:returns: whether blocking is enabled by default."""
88 return self._lock.blocking
89
90 @property
91 def loop(self) -> asyncio.AbstractEventLoop | None:
92 """:returns: the event loop (or ``None`` for the running loop)."""
93 return self._loop
94
95 @property
96 def executor(self) -> futures.Executor:
97 """:returns: the executor used for ``run_in_executor`` (a dedicated single-thread one if none was supplied)."""
98 return self._executor
99
100 async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
101 loop = self._loop or asyncio.get_running_loop()
102 return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
103
104 async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
105 """
106 Acquire a shared read lock.
107
108 See :meth:`ReadWriteLock.acquire_read` for full semantics.
109
110 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
111 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
112
113 :returns: a proxy that can be used as an async context manager to release the lock
114
115 :raises RuntimeError: if a write lock is already held on this instance
116 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
117
118 """
119 await self._run(self._lock.acquire_read, timeout, blocking=blocking)
120 return AsyncAcquireReadWriteReturnProxy(lock=self)
121
122 async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
123 """
124 Acquire an exclusive write lock.
125
126 See :meth:`ReadWriteLock.acquire_write` for full semantics.
127
128 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
129 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
130
131 :returns: a proxy that can be used as an async context manager to release the lock
132
133 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
134 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
135
136 """
137 await self._run(self._lock.acquire_write, timeout, blocking=blocking)
138 return AsyncAcquireReadWriteReturnProxy(lock=self)
139
140 async def release(self, *, force: bool = False) -> None:
141 """
142 Release one level of the current lock.
143
144 See :meth:`ReadWriteLock.release` for full semantics.
145
146 :param force: if ``True``, release the lock completely regardless of the current lock level
147
148 :raises RuntimeError: if no lock is currently held and *force* is ``False``
149
150 """
151 await self._run(self._lock.release, force=force)
152
153 @asynccontextmanager
154 async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
155 """
156 Async context manager that acquires and releases a shared read lock.
157
158 Falls back to instance defaults for *timeout* and *blocking* when ``None``.
159
160 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
161 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
162
163 """
164 if timeout is None:
165 timeout = self._lock.timeout
166 if blocking is None:
167 blocking = self._lock.blocking
168 await self.acquire_read(timeout, blocking=blocking)
169 try:
170 yield
171 finally:
172 await self.release()
173
174 @asynccontextmanager
175 async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
176 """
177 Async context manager that acquires and releases an exclusive write lock.
178
179 Falls back to instance defaults for *timeout* and *blocking* when ``None``.
180
181 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
182 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
183
184 """
185 if timeout is None:
186 timeout = self._lock.timeout
187 if blocking is None:
188 blocking = self._lock.blocking
189 await self.acquire_write(timeout, blocking=blocking)
190 try:
191 yield
192 finally:
193 await self.release()
194
195 async def close(self) -> None:
196 """
197 Release the lock (if held) and close the underlying SQLite connection.
198
199 After calling this method, the lock instance is no longer usable.
200
201 """
202 await self._run(self._lock.close)
203 if self._owns_executor:
204 self._executor.shutdown(wait=False)
205
206 def __del__(self) -> None:
207 # Safety net: if close() was never called, still shut down the executor we created so its worker thread does
208 # not outlive the lock. A caller-supplied executor is left untouched. shutdown(wait=False) never blocks.
209 if getattr(self, "_owns_executor", False):
210 self._executor.shutdown(wait=False)
211
212
213__all__ = [
214 "AsyncAcquireReadWriteReturnProxy",
215 "AsyncReadWriteLock",
216]