Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_async_read_write.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import functools 

7from concurrent.futures import ThreadPoolExecutor 

8from contextlib import asynccontextmanager 

9from typing import TYPE_CHECKING 

10 

11from ._read_write import ReadWriteLock 

12 

13if TYPE_CHECKING: 

14 import os 

15 from collections.abc import AsyncGenerator, Callable 

16 from concurrent import futures 

17 from types import TracebackType 

18 

19 

20class AsyncAcquireReadWriteReturnProxy: 

21 """Context-aware object that releases the async read/write lock on exit.""" 

22 

23 def __init__(self, lock: AsyncReadWriteLock) -> None: 

24 self.lock = lock 

25 

26 async def __aenter__(self) -> AsyncReadWriteLock: 

27 return self.lock 

28 

29 async def __aexit__( 

30 self, 

31 exc_type: type[BaseException] | None, 

32 exc_value: BaseException | None, 

33 traceback: TracebackType | None, 

34 ) -> None: 

35 await self.lock.release() 

36 

37 

38class AsyncReadWriteLock: 

39 """ 

40 Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications. 

41 

42 Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread 

43 pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated 

44 to the underlying :class:`ReadWriteLock`. 

45 

46 :param lock_file: path to the SQLite database file used as the lock 

47 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

48 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

49 :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path 

50 :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop 

51 :param executor: executor for ``run_in_executor``. When ``None`` a dedicated single-thread executor is created 

52 and owned by this lock, ensuring every operation runs on the same thread (required for SQLite affinity); it 

53 is shut down by :meth:`close`. A caller-supplied executor is used as-is and never shut down here, so when no 

54 executor is passed remember to call :meth:`close` to release the owned one. 

55 

56 .. versionadded:: 3.21.0 

57 

58 """ 

59 

60 def __init__( # noqa: PLR0913 

61 self, 

62 lock_file: str | os.PathLike[str], 

63 timeout: float = -1, 

64 *, 

65 blocking: bool = True, 

66 is_singleton: bool = True, 

67 loop: asyncio.AbstractEventLoop | None = None, 

68 executor: futures.Executor | None = None, 

69 ) -> None: 

70 self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) 

71 self._loop = loop 

72 self._owns_executor = executor is None 

73 self._executor = executor or ThreadPoolExecutor(max_workers=1) 

74 

75 @property 

76 def lock_file(self) -> str: 

77 """:returns: the path to the lock file.""" 

78 return self._lock.lock_file 

79 

80 @property 

81 def timeout(self) -> float: 

82 """:returns: the default timeout.""" 

83 return self._lock.timeout 

84 

85 @property 

86 def blocking(self) -> bool: 

87 """:returns: whether blocking is enabled by default.""" 

88 return self._lock.blocking 

89 

90 @property 

91 def loop(self) -> asyncio.AbstractEventLoop | None: 

92 """:returns: the event loop (or ``None`` for the running loop).""" 

93 return self._loop 

94 

95 @property 

96 def executor(self) -> futures.Executor: 

97 """:returns: the executor used for ``run_in_executor`` (a dedicated single-thread one if none was supplied).""" 

98 return self._executor 

99 

100 async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object: 

101 loop = self._loop or asyncio.get_running_loop() 

102 return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs)) 

103 

104 async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy: 

105 """ 

106 Acquire a shared read lock. 

107 

108 See :meth:`ReadWriteLock.acquire_read` for full semantics. 

109 

110 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

111 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

112 

113 :returns: a proxy that can be used as an async context manager to release the lock 

114 

115 :raises RuntimeError: if a write lock is already held on this instance 

116 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

117 

118 """ 

119 await self._run(self._lock.acquire_read, timeout, blocking=blocking) 

120 return AsyncAcquireReadWriteReturnProxy(lock=self) 

121 

122 async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy: 

123 """ 

124 Acquire an exclusive write lock. 

125 

126 See :meth:`ReadWriteLock.acquire_write` for full semantics. 

127 

128 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

129 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

130 

131 :returns: a proxy that can be used as an async context manager to release the lock 

132 

133 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread 

134 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

135 

136 """ 

137 await self._run(self._lock.acquire_write, timeout, blocking=blocking) 

138 return AsyncAcquireReadWriteReturnProxy(lock=self) 

139 

140 async def release(self, *, force: bool = False) -> None: 

141 """ 

142 Release one level of the current lock. 

143 

144 See :meth:`ReadWriteLock.release` for full semantics. 

145 

146 :param force: if ``True``, release the lock completely regardless of the current lock level 

147 

148 :raises RuntimeError: if no lock is currently held and *force* is ``False`` 

149 

150 """ 

151 await self._run(self._lock.release, force=force) 

152 

153 @asynccontextmanager 

154 async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]: 

155 """ 

156 Async context manager that acquires and releases a shared read lock. 

157 

158 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

159 

160 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

161 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

162 

163 """ 

164 if timeout is None: 

165 timeout = self._lock.timeout 

166 if blocking is None: 

167 blocking = self._lock.blocking 

168 await self.acquire_read(timeout, blocking=blocking) 

169 try: 

170 yield 

171 finally: 

172 await self.release() 

173 

174 @asynccontextmanager 

175 async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]: 

176 """ 

177 Async context manager that acquires and releases an exclusive write lock. 

178 

179 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

180 

181 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

182 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

183 

184 """ 

185 if timeout is None: 

186 timeout = self._lock.timeout 

187 if blocking is None: 

188 blocking = self._lock.blocking 

189 await self.acquire_write(timeout, blocking=blocking) 

190 try: 

191 yield 

192 finally: 

193 await self.release() 

194 

195 async def close(self) -> None: 

196 """ 

197 Release the lock (if held) and close the underlying SQLite connection. 

198 

199 After calling this method, the lock instance is no longer usable. 

200 

201 """ 

202 await self._run(self._lock.close) 

203 if self._owns_executor: 

204 self._executor.shutdown(wait=False) 

205 

206 def __del__(self) -> None: 

207 # Safety net: if close() was never called, still shut down the executor we created so its worker thread does 

208 # not outlive the lock. A caller-supplied executor is left untouched. shutdown(wait=False) never blocks. 

209 if getattr(self, "_owns_executor", False): 

210 self._executor.shutdown(wait=False) 

211 

212 

213__all__ = [ 

214 "AsyncAcquireReadWriteReturnProxy", 

215 "AsyncReadWriteLock", 

216]