Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_async_read_write.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

77 statements  

1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import functools 

7from concurrent.futures import ThreadPoolExecutor 

8from contextlib import asynccontextmanager 

9from typing import TYPE_CHECKING 

10 

11from ._read_write import ReadWriteLock 

12 

13if TYPE_CHECKING: 

14 import os 

15 from collections.abc import AsyncGenerator, Callable 

16 from concurrent import futures 

17 from types import TracebackType 

18 

19 

class AsyncAcquireReadWriteReturnProxy:
    """
    Async context manager returned by the ``acquire_*`` coroutines of :class:`AsyncReadWriteLock`.

    Entering the context yields the wrapped lock unchanged (the proxy itself acquires nothing); leaving the context
    awaits :meth:`AsyncReadWriteLock.release` once, whether or not the body raised.

    """

    def __init__(self, lock: AsyncReadWriteLock) -> None:
        # Public attribute: the lock whose ``release()`` is awaited on exit.
        self.lock = lock

    async def __aenter__(self) -> AsyncReadWriteLock:
        """:returns: the wrapped lock, so ``async with ... as lock`` binds the lock rather than the proxy."""
        return self.lock

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Release the wrapped lock; the exception details are ignored and never suppressed."""
        held = self.lock
        await held.release()

36 

37 

class AsyncReadWriteLock:
    """
    Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.

    Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to an
    executor via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
    to the underlying :class:`ReadWriteLock`.

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
    :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
    :param executor: executor for ``run_in_executor``; ``None`` creates a private single-worker
        :class:`~concurrent.futures.ThreadPoolExecutor` that :meth:`close` shuts down. A caller-supplied executor is
        never shut down by this class.

    .. versionadded:: 3.21.0

    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
        self._loop = loop
        # Ownership and the fallback must use the same test (``is None``); otherwise a falsy executor object would
        # be swapped for a private pool that ``close()`` never shuts down.
        self._owns_executor = executor is None
        # NOTE(review): a single worker presumably keeps every call on one thread, which the underlying lock's
        # thread-sensitive write semantics seem to require -- confirm against ReadWriteLock.
        self._executor = executor if executor is not None else ThreadPoolExecutor(max_workers=1)

    @property
    def lock_file(self) -> str:
        """:returns: the path to the lock file."""
        return self._lock.lock_file

    @property
    def timeout(self) -> float:
        """:returns: the default timeout."""
        return self._lock.timeout

    @property
    def blocking(self) -> bool:
        """:returns: whether blocking is enabled by default."""
        return self._lock.blocking

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the event loop (or ``None`` for the running loop)."""
        return self._loop

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the executor running the blocking calls (the private pool when none was supplied)."""
        return self._executor

    async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
        """
        Call ``func(*args, **kwargs)`` in the executor and await its result.

        :param func: blocking callable to execute off the event loop
        :param args: positional arguments forwarded to *func*
        :param kwargs: keyword arguments forwarded to *func*

        :returns: whatever *func* returns; exceptions raised by *func* propagate to the awaiting caller

        """
        # Resolve the loop per call so an instance created outside a running loop still works.
        loop = self._loop or asyncio.get_running_loop()
        # ``run_in_executor`` forwards positional arguments only, hence the ``partial`` for keyword arguments.
        return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))

    async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Acquire a shared read lock.

        See :meth:`ReadWriteLock.acquire_read` for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as an async context manager to release the lock

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        await self._run(self._lock.acquire_read, timeout, blocking=blocking)
        return AsyncAcquireReadWriteReturnProxy(lock=self)

    async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Acquire an exclusive write lock.

        See :meth:`ReadWriteLock.acquire_write` for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy that can be used as an async context manager to release the lock

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        await self._run(self._lock.acquire_write, timeout, blocking=blocking)
        return AsyncAcquireReadWriteReturnProxy(lock=self)

    async def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        See :meth:`ReadWriteLock.release` for full semantics.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        await self._run(self._lock.release, force=force)

    @asynccontextmanager
    async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Async context manager that acquires and releases a shared read lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self._lock.timeout
        if blocking is None:
            blocking = self._lock.blocking
        # Acquire outside the ``try`` so a failed acquisition is not followed by a release.
        await self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            await self.release()

    @asynccontextmanager
    async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Async context manager that acquires and releases an exclusive write lock.

        Falls back to instance defaults for *timeout* and *blocking* when ``None``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        if timeout is None:
            timeout = self._lock.timeout
        if blocking is None:
            blocking = self._lock.blocking
        # Acquire outside the ``try`` so a failed acquisition is not followed by a release.
        await self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            await self.release()

    async def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        After calling this method, the lock instance is no longer usable. When the executor was created by this
        instance it is shut down (without waiting) even if closing the underlying lock fails, so the worker thread
        is not leaked; a caller-supplied executor is left running.

        """
        try:
            await self._run(self._lock.close)
        finally:
            # Runs on failure or cancellation too; ``wait=False`` avoids blocking the event loop.
            if self._owns_executor:
                self._executor.shutdown(wait=False)

202 

203 

# Public API of this module: the async lock and the context-manager proxy its ``acquire_*`` coroutines return.
__all__ = ["AsyncAcquireReadWriteReturnProxy", "AsyncReadWriteLock"]