Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_async_read_write.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

73 statements  

1"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import functools 

7from contextlib import asynccontextmanager 

8from typing import TYPE_CHECKING 

9 

10from ._read_write import ReadWriteLock 

11 

12if TYPE_CHECKING: 

13 import os 

14 from collections.abc import AsyncGenerator, Callable 

15 from concurrent import futures 

16 from types import TracebackType 

17 

18 

19class AsyncAcquireReadWriteReturnProxy: 

20 """Context-aware object that releases the async read/write lock on exit.""" 

21 

22 def __init__(self, lock: AsyncReadWriteLock) -> None: 

23 self.lock = lock 

24 

25 async def __aenter__(self) -> AsyncReadWriteLock: 

26 return self.lock 

27 

28 async def __aexit__( 

29 self, 

30 exc_type: type[BaseException] | None, 

31 exc_value: BaseException | None, 

32 traceback: TracebackType | None, 

33 ) -> None: 

34 await self.lock.release() 

35 

36 

37class AsyncReadWriteLock: 

38 """ 

39 Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications. 

40 

41 Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread 

42 pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated 

43 to the underlying :class:`ReadWriteLock`. 

44 

45 :param lock_file: path to the SQLite database file used as the lock 

46 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

47 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

48 :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path 

49 :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop 

50 :param executor: executor for ``run_in_executor``; ``None`` uses the default executor 

51 

52 .. versionadded:: 3.21.0 

53 

54 """ 

55 

56 def __init__( # noqa: PLR0913 

57 self, 

58 lock_file: str | os.PathLike[str], 

59 timeout: float = -1, 

60 *, 

61 blocking: bool = True, 

62 is_singleton: bool = True, 

63 loop: asyncio.AbstractEventLoop | None = None, 

64 executor: futures.Executor | None = None, 

65 ) -> None: 

66 self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) 

67 self._loop = loop 

68 self._executor = executor 

69 

70 @property 

71 def lock_file(self) -> str: 

72 """:returns: the path to the lock file.""" 

73 return self._lock.lock_file 

74 

75 @property 

76 def timeout(self) -> float: 

77 """:returns: the default timeout.""" 

78 return self._lock.timeout 

79 

80 @property 

81 def blocking(self) -> bool: 

82 """:returns: whether blocking is enabled by default.""" 

83 return self._lock.blocking 

84 

85 @property 

86 def loop(self) -> asyncio.AbstractEventLoop | None: 

87 """:returns: the event loop (or ``None`` for the running loop).""" 

88 return self._loop 

89 

90 @property 

91 def executor(self) -> futures.Executor | None: 

92 """:returns: the executor (or ``None`` for the default).""" 

93 return self._executor 

94 

95 async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object: 

96 loop = self._loop or asyncio.get_running_loop() 

97 return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs)) 

98 

99 async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy: 

100 """ 

101 Acquire a shared read lock. 

102 

103 See :meth:`ReadWriteLock.acquire_read` for full semantics. 

104 

105 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

106 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

107 

108 :returns: a proxy that can be used as an async context manager to release the lock 

109 

110 :raises RuntimeError: if a write lock is already held on this instance 

111 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

112 

113 """ 

114 await self._run(self._lock.acquire_read, timeout, blocking=blocking) 

115 return AsyncAcquireReadWriteReturnProxy(lock=self) 

116 

117 async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy: 

118 """ 

119 Acquire an exclusive write lock. 

120 

121 See :meth:`ReadWriteLock.acquire_write` for full semantics. 

122 

123 :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely 

124 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable 

125 

126 :returns: a proxy that can be used as an async context manager to release the lock 

127 

128 :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread 

129 :raises Timeout: if the lock cannot be acquired within *timeout* seconds 

130 

131 """ 

132 await self._run(self._lock.acquire_write, timeout, blocking=blocking) 

133 return AsyncAcquireReadWriteReturnProxy(lock=self) 

134 

135 async def release(self, *, force: bool = False) -> None: 

136 """ 

137 Release one level of the current lock. 

138 

139 See :meth:`ReadWriteLock.release` for full semantics. 

140 

141 :param force: if ``True``, release the lock completely regardless of the current lock level 

142 

143 :raises RuntimeError: if no lock is currently held and *force* is ``False`` 

144 

145 """ 

146 await self._run(self._lock.release, force=force) 

147 

148 @asynccontextmanager 

149 async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]: 

150 """ 

151 Async context manager that acquires and releases a shared read lock. 

152 

153 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

154 

155 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

156 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

157 

158 """ 

159 if timeout is None: 

160 timeout = self._lock.timeout 

161 if blocking is None: 

162 blocking = self._lock.blocking 

163 await self.acquire_read(timeout, blocking=blocking) 

164 try: 

165 yield 

166 finally: 

167 await self.release() 

168 

169 @asynccontextmanager 

170 async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]: 

171 """ 

172 Async context manager that acquires and releases an exclusive write lock. 

173 

174 Falls back to instance defaults for *timeout* and *blocking* when ``None``. 

175 

176 :param timeout: maximum wait time in seconds, or ``None`` to use the instance default 

177 :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default 

178 

179 """ 

180 if timeout is None: 

181 timeout = self._lock.timeout 

182 if blocking is None: 

183 blocking = self._lock.blocking 

184 await self.acquire_write(timeout, blocking=blocking) 

185 try: 

186 yield 

187 finally: 

188 await self.release() 

189 

190 async def close(self) -> None: 

191 """ 

192 Release the lock (if held) and close the underlying SQLite connection. 

193 

194 After calling this method, the lock instance is no longer usable. 

195 

196 """ 

197 await self._run(self._lock.close) 

198 

199 

200__all__ = [ 

201 "AsyncAcquireReadWriteReturnProxy", 

202 "AsyncReadWriteLock", 

203]