1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7from concurrent.futures import ThreadPoolExecutor
8from contextlib import asynccontextmanager
9from typing import TYPE_CHECKING
10
11from ._read_write import ReadWriteLock
12
13if TYPE_CHECKING:
14 import os
15 from collections.abc import AsyncGenerator, Callable
16 from concurrent import futures
17 from types import TracebackType
18
19
20class AsyncAcquireReadWriteReturnProxy:
21 """Context-aware object that releases the async read/write lock on exit."""
22
23 def __init__(self, lock: AsyncReadWriteLock) -> None:
24 self.lock = lock
25
26 async def __aenter__(self) -> AsyncReadWriteLock:
27 return self.lock
28
29 async def __aexit__(
30 self,
31 exc_type: type[BaseException] | None,
32 exc_value: BaseException | None,
33 traceback: TracebackType | None,
34 ) -> None:
35 await self.lock.release()
36
37
38class AsyncReadWriteLock:
39 """
40 Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.
41
42 Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread
43 pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
44 to the underlying :class:`ReadWriteLock`.
45
46 :param lock_file: path to the SQLite database file used as the lock
47 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
48 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
49 :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
50 :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
51 :param executor: executor for ``run_in_executor``; ``None`` uses the default executor
52
53 .. versionadded:: 3.21.0
54
55 """
56
57 def __init__( # noqa: PLR0913
58 self,
59 lock_file: str | os.PathLike[str],
60 timeout: float = -1,
61 *,
62 blocking: bool = True,
63 is_singleton: bool = True,
64 loop: asyncio.AbstractEventLoop | None = None,
65 executor: futures.Executor | None = None,
66 ) -> None:
67 self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
68 self._loop = loop
69 self._owns_executor = executor is None
70 self._executor = executor or ThreadPoolExecutor(max_workers=1)
71
72 @property
73 def lock_file(self) -> str:
74 """:returns: the path to the lock file."""
75 return self._lock.lock_file
76
77 @property
78 def timeout(self) -> float:
79 """:returns: the default timeout."""
80 return self._lock.timeout
81
82 @property
83 def blocking(self) -> bool:
84 """:returns: whether blocking is enabled by default."""
85 return self._lock.blocking
86
87 @property
88 def loop(self) -> asyncio.AbstractEventLoop | None:
89 """:returns: the event loop (or ``None`` for the running loop)."""
90 return self._loop
91
92 @property
93 def executor(self) -> futures.Executor | None:
94 """:returns: the executor (or ``None`` for the default)."""
95 return self._executor
96
97 async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
98 loop = self._loop or asyncio.get_running_loop()
99 return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
100
101 async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
102 """
103 Acquire a shared read lock.
104
105 See :meth:`ReadWriteLock.acquire_read` for full semantics.
106
107 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
108 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
109
110 :returns: a proxy that can be used as an async context manager to release the lock
111
112 :raises RuntimeError: if a write lock is already held on this instance
113 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
114
115 """
116 await self._run(self._lock.acquire_read, timeout, blocking=blocking)
117 return AsyncAcquireReadWriteReturnProxy(lock=self)
118
119 async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
120 """
121 Acquire an exclusive write lock.
122
123 See :meth:`ReadWriteLock.acquire_write` for full semantics.
124
125 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
126 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
127
128 :returns: a proxy that can be used as an async context manager to release the lock
129
130 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
131 :raises Timeout: if the lock cannot be acquired within *timeout* seconds
132
133 """
134 await self._run(self._lock.acquire_write, timeout, blocking=blocking)
135 return AsyncAcquireReadWriteReturnProxy(lock=self)
136
137 async def release(self, *, force: bool = False) -> None:
138 """
139 Release one level of the current lock.
140
141 See :meth:`ReadWriteLock.release` for full semantics.
142
143 :param force: if ``True``, release the lock completely regardless of the current lock level
144
145 :raises RuntimeError: if no lock is currently held and *force* is ``False``
146
147 """
148 await self._run(self._lock.release, force=force)
149
150 @asynccontextmanager
151 async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
152 """
153 Async context manager that acquires and releases a shared read lock.
154
155 Falls back to instance defaults for *timeout* and *blocking* when ``None``.
156
157 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
158 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
159
160 """
161 if timeout is None:
162 timeout = self._lock.timeout
163 if blocking is None:
164 blocking = self._lock.blocking
165 await self.acquire_read(timeout, blocking=blocking)
166 try:
167 yield
168 finally:
169 await self.release()
170
171 @asynccontextmanager
172 async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
173 """
174 Async context manager that acquires and releases an exclusive write lock.
175
176 Falls back to instance defaults for *timeout* and *blocking* when ``None``.
177
178 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
179 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
180
181 """
182 if timeout is None:
183 timeout = self._lock.timeout
184 if blocking is None:
185 blocking = self._lock.blocking
186 await self.acquire_write(timeout, blocking=blocking)
187 try:
188 yield
189 finally:
190 await self.release()
191
192 async def close(self) -> None:
193 """
194 Release the lock (if held) and close the underlying SQLite connection.
195
196 After calling this method, the lock instance is no longer usable.
197
198 """
199 await self._run(self._lock.close)
200 if self._owns_executor:
201 self._executor.shutdown(wait=False)
202
203
204__all__ = [
205 "AsyncAcquireReadWriteReturnProxy",
206 "AsyncReadWriteLock",
207]