Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_soft_rw/_async.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1"""Async wrapper around :class:`SoftReadWriteLock` for use with ``asyncio``.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import functools 

7from contextlib import asynccontextmanager 

8from typing import TYPE_CHECKING 

9 

10from ._sync import SoftReadWriteLock 

11 

12if TYPE_CHECKING: 

13 import os 

14 from collections.abc import AsyncGenerator, Callable 

15 from concurrent import futures 

16 from types import TracebackType 

17 

18 

class AsyncAcquireSoftReadWriteReturnProxy:
    """Async context-aware object that releases an :class:`AsyncSoftReadWriteLock` on exit."""

    def __init__(self, lock: AsyncSoftReadWriteLock) -> None:
        # Keep a reference to the lock so ``__aexit__`` can release it later.
        self.lock = lock

    async def __aenter__(self) -> AsyncSoftReadWriteLock:
        """:returns: the wrapped lock; entering the context performs no acquisition of its own."""
        return self.lock

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Release the wrapped lock once; returns ``None`` so an in-flight exception is not suppressed."""
        await self.lock.release()

35 

36 

class AsyncSoftReadWriteLock:
    """
    Async wrapper around :class:`SoftReadWriteLock` for ``asyncio`` applications.

    The sync class's blocking filesystem operations run on a thread pool via ``loop.run_in_executor()``.
    Reentrancy, upgrade/downgrade rules, fork handling, heartbeat and TTL stale detection, and singleton
    behavior are delegated to the underlying :class:`SoftReadWriteLock`.

    :param lock_file: path to the lock file; sidecar state/write/readers live next to it
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
    :param is_singleton: if ``True``, reuse existing :class:`SoftReadWriteLock` instances per resolved path
    :param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
    :param stale_threshold: seconds of mtime inactivity before a marker is stale; defaults to ``3 * heartbeat_interval``
    :param poll_interval: seconds between acquire retries under contention; default 0.25 s
    :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
    :param executor: executor for ``run_in_executor``; ``None`` uses the default executor

    .. versionadded:: 3.27.0

    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
        heartbeat_interval: float = 30.0,
        stale_threshold: float | None = None,
        poll_interval: float = 0.25,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        # All locking state lives in the sync lock; this wrapper only adds the loop/executor plumbing.
        self._lock = SoftReadWriteLock(
            lock_file,
            timeout,
            blocking=blocking,
            is_singleton=is_singleton,
            heartbeat_interval=heartbeat_interval,
            stale_threshold=stale_threshold,
            poll_interval=poll_interval,
        )
        self._loop = loop
        self._executor = executor

    @property
    def lock_file(self) -> str:
        """:returns: the path to the lock file passed to the constructor."""
        return self._lock.lock_file

    @property
    def timeout(self) -> float:
        """:returns: the default timeout applied when ``acquire_read`` / ``acquire_write`` is called without one."""
        return self._lock.timeout

    @property
    def blocking(self) -> bool:
        """:returns: whether ``acquire_*`` defaults to blocking; ``False`` makes contention raise immediately."""
        return self._lock.blocking

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the event loop used for ``run_in_executor``, or ``None`` for the running loop."""
        return self._loop

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the executor used for ``run_in_executor``, or ``None`` for the default executor."""
        return self._executor

    async def acquire_read(
        self, timeout: float | None = None, *, blocking: bool | None = None
    ) -> AsyncAcquireSoftReadWriteReturnProxy:
        """
        Acquire a shared read lock.

        See :meth:`SoftReadWriteLock.acquire_read` for the full reentrancy / upgrade / fork semantics. The blocking
        work runs inside ``run_in_executor`` so other coroutines on the same loop continue to progress while this
        call waits.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :returns: a proxy usable as an async context manager to release the lock

        :raises RuntimeError: if a write lock is already held, if this instance was invalidated by
            :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        await self._run(self._lock.acquire_read, timeout, blocking=blocking)
        return AsyncAcquireSoftReadWriteReturnProxy(lock=self)

    async def acquire_write(
        self, timeout: float | None = None, *, blocking: bool | None = None
    ) -> AsyncAcquireSoftReadWriteReturnProxy:
        """
        Acquire an exclusive write lock.

        See :meth:`SoftReadWriteLock.acquire_write` for the two-phase writer-preferring semantics. The blocking
        work runs inside ``run_in_executor``.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :returns: a proxy usable as an async context manager to release the lock

        :raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if
            this instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        await self._run(self._lock.acquire_write, timeout, blocking=blocking)
        return AsyncAcquireSoftReadWriteReturnProxy(lock=self)

    async def release(self, *, force: bool = False) -> None:
        """
        Release one level of the current lock.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        await self._run(self._lock.release, force=force)

    @asynccontextmanager
    async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Async context manager that acquires and releases a shared read lock.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # Acquire outside the ``try`` so a failed acquire never triggers a release.
        await self.acquire_read(timeout, blocking=blocking)
        try:
            yield
        finally:
            await self.release()

    @asynccontextmanager
    async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Async context manager that acquires and releases an exclusive write lock.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        # Acquire outside the ``try`` so a failed acquire never triggers a release.
        await self.acquire_write(timeout, blocking=blocking)
        try:
            yield
        finally:
            await self.release()

    async def close(self) -> None:
        """Release any held lock and release the underlying filesystem resources. Idempotent."""
        await self._run(self._lock.close)

    async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
        """
        Run a blocking callable on the configured executor and await its result.

        :param func: the synchronous callable to execute off the event loop
        :param args: positional arguments forwarded to *func*
        :param kwargs: keyword arguments forwarded to *func*

        :returns: whatever *func* returns

        """
        # Fall back to the running loop only when no loop was supplied to the constructor.
        loop = self._loop if self._loop is not None else asyncio.get_running_loop()
        # ``run_in_executor`` forwards positional arguments only, so bind everything with ``partial``.
        return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))

208 

209 

# Names exported by ``from <module> import *``.
__all__ = [
    "AsyncAcquireSoftReadWriteReturnProxy",
    "AsyncSoftReadWriteLock",
]