Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/asyncio.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

119 statements  

1"""An asyncio-based implementation of the file lock.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import contextlib 

7import logging 

8import os 

9import time 

10from dataclasses import dataclass 

11from inspect import iscoroutinefunction 

12from threading import local 

13from typing import TYPE_CHECKING, Any, NoReturn, cast 

14 

15from ._api import _UNSET_FILE_MODE, BaseFileLock, FileLockContext, FileLockMeta 

16from ._error import Timeout 

17from ._soft import SoftFileLock 

18from ._unix import UnixFileLock 

19from ._windows import WindowsFileLock 

20 

21if TYPE_CHECKING: 

22 import sys 

23 from collections.abc import Callable 

24 from concurrent import futures 

25 from types import TracebackType 

26 

27 if sys.version_info >= (3, 11): # pragma: no cover (py311+) 

28 from typing import Self 

29 else: # pragma: no cover (<py311) 

30 from typing_extensions import Self 

31 

32 

# Module-level logger obtained under the name "filelock"; the debug messages in this module go through it.
_LOGGER = logging.getLogger("filelock")

34 

35 

@dataclass
class AsyncFileLockContext(FileLockContext):
    """Dataclass holding the state of a ``BaseAsyncFileLock``: the ``FileLockContext`` fields plus asyncio settings."""

    #: ``True`` if blocking lock operations should be dispatched through an executor
    run_in_executor: bool = True

    #: Executor to dispatch to, or ``None`` when no explicit executor was configured
    executor: futures.Executor | None = None

    #: Event loop to use, or ``None`` when no explicit loop was configured
    loop: asyncio.AbstractEventLoop | None = None

48 

49 

class AsyncThreadLocalFileContext(AsyncFileLockContext, local):
    """An ``AsyncFileLockContext`` that additionally derives from ``threading.local``."""

52 

53 

class AsyncAcquireReturnProxy:
    """Async context manager wrapping a lock; leaving the ``async with`` block awaits the lock's ``release``."""

    def __init__(self, lock: BaseAsyncFileLock) -> None:  # noqa: D107
        self.lock = lock

    async def __aenter__(self) -> BaseAsyncFileLock:  # noqa: D105
        # Nothing is acquired here; the wrapped lock is simply handed back to the ``as`` target.
        return self.lock

    async def __aexit__(  # noqa: D105
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # The exception details are not inspected: release happens however the block ended,
        # and returning ``None`` lets any in-flight exception propagate.
        wrapped = self.lock
        await wrapped.release()

70 

71 

class AsyncFileLockMeta(FileLockMeta):
    """Metaclass for async locks: validates the asyncio options, then defers construction to ``FileLockMeta``."""

    def __call__(  # ty: ignore[invalid-method-override]  # noqa: PLR0913
        cls,  # noqa: N805
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        mode: int = _UNSET_FILE_MODE,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        is_singleton: bool = False,
        poll_interval: float = 0.05,
        lifetime: float | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> BaseAsyncFileLock:
        """
        Build a lock instance, passing every argument to the parent metaclass by keyword.

        :raises ValueError: if both ``thread_local`` and ``run_in_executor`` are truthy

        """
        # This combination is rejected up front, before any instance is created.
        if run_in_executor and thread_local:
            msg = "run_in_executor is not supported when thread_local is True"
            raise ValueError(msg)

        forwarded: dict[str, Any] = {
            "lock_file": lock_file,
            "timeout": timeout,
            "mode": mode,
            "thread_local": thread_local,
            "blocking": blocking,
            "is_singleton": is_singleton,
            "poll_interval": poll_interval,
            "lifetime": lifetime,
            "loop": loop,
            "run_in_executor": run_in_executor,
            "executor": executor,
        }
        created = super().__call__(**forwarded)
        return cast("BaseAsyncFileLock", created)

105 

106 

class BaseAsyncFileLock(BaseFileLock, metaclass=AsyncFileLockMeta):
    """
    Base class for asynchronous file locks.

    .. versionadded:: 3.15.0

    """

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        mode: int = _UNSET_FILE_MODE,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        is_singleton: bool = False,
        poll_interval: float = 0.05,
        lifetime: float | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> None:
        """
        Create a new lock object.

        :param lock_file: path to the file
        :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in the
            acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it to a
            negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
        :param mode: file permissions for the lockfile. When not specified, the OS controls permissions via umask and
            default ACLs, preserving POSIX default ACL inheritance in shared directories.
        :param thread_local: Whether this object's internal context should be thread local or not. If this is set to
            ``False`` then the lock will be reentrant across threads.
        :param blocking: whether the lock should be blocking or not
        :param is_singleton: If this is set to ``True`` then only one instance of this class will be created per lock
            file. This is useful if you want to use the lock object for reentrant locking without needing to pass the
            same object around.
        :param poll_interval: default interval for polling the lock file, in seconds. It will be used as fallback value
            in the acquire method, if no poll_interval value (``None``) is given.
        :param lifetime: maximum time in seconds a lock can be held before it is considered expired. When set, a waiting
            process will break a lock whose file modification time is older than ``lifetime`` seconds. ``None`` (the
            default) means locks never expire.
        :param loop: The event loop to use. If not specified, the running event loop will be used.
        :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor.
        :param executor: The executor to use. If not specified, the default executor will be used.

        """
        self._is_thread_local = thread_local
        self._is_singleton = is_singleton

        # Create the context. Note that external code should not work with the context directly and should instead use
        # properties of this class.
        kwargs: dict[str, Any] = {
            "lock_file": os.fspath(lock_file),
            "timeout": timeout,
            "mode": mode,
            "blocking": blocking,
            "poll_interval": poll_interval,
            "lifetime": lifetime,
            "loop": loop,
            "run_in_executor": run_in_executor,
            "executor": executor,
        }
        self._context: AsyncFileLockContext = (AsyncThreadLocalFileContext if thread_local else AsyncFileLockContext)(
            **kwargs
        )

    @property
    def run_in_executor(self) -> bool:
        """:returns: whether run in executor."""
        return self._context.run_in_executor

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the executor."""
        return self._context.executor

    @executor.setter
    def executor(self, value: futures.Executor | None) -> None:  # pragma: no cover
        """
        Change the executor.

        :param futures.Executor | None value: the new executor or ``None``

        """
        self._context.executor = value

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the event loop."""
        return self._context.loop

    async def acquire(  # ty: ignore[invalid-method-override]
        self,
        timeout: float | None = None,
        poll_interval: float | None = None,
        *,
        blocking: bool | None = None,
        cancel_check: Callable[[], bool] | None = None,
    ) -> AsyncAcquireReturnProxy:
        """
        Try to acquire the file lock.

        :param timeout: maximum wait time for acquiring the lock. ``None`` means use the default
            :attr:`~BaseFileLock.timeout`; if ``timeout < 0``, there is no timeout and this method will wait until the
            lock could be acquired
        :param poll_interval: interval of trying to acquire the lock file, ``None`` means use the default
            :attr:`~BaseFileLock.poll_interval`
        :param blocking: ``None`` means use the default configured on the lock. If False, function will return
            immediately if it cannot obtain a lock on the first attempt. Otherwise, this method will wait until the
            timeout expires or the lock is acquired.
        :param cancel_check: a callable returning ``True`` when the acquisition should be canceled. Checked on each poll
            iteration. When triggered, raises :class:`~Timeout` just like an expired timeout.

        :returns: a context object that will unlock the file when the context is exited

        :raises Timeout: if fails to acquire lock within the timeout period

        .. code-block:: python

            # You can use this method in the async context manager (recommended)
            async with await lock.acquire():
                pass

            # Or use an equivalent try-finally construct:
            await lock.acquire()
            try:
                pass
            finally:
                await lock.release()

        """
        # Use the default timeout, if no timeout is provided.
        if timeout is None:
            timeout = self._context.timeout

        if blocking is None:
            blocking = self._context.blocking

        if poll_interval is None:
            poll_interval = self._context.poll_interval

        # Increment the number right at the beginning. We can still undo it, if something fails.
        self._context.lock_counter += 1

        lock_id = id(self)
        lock_filename = self.lock_file
        start_time = time.perf_counter()
        try:
            while True:
                if not self.is_locked:
                    self._try_break_expired_lock()
                    _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
                    await self._run_internal_method(self._acquire)
                if self.is_locked:
                    _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename)
                    break
                if self._check_give_up(
                    lock_id,
                    lock_filename,
                    blocking=blocking,
                    cancel_check=cancel_check,
                    timeout=timeout,
                    start_time=start_time,
                ):
                    raise Timeout(lock_filename)  # noqa: TRY301
                msg = "Lock %s not acquired on %s, waiting %s seconds ..."
                _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
                # Yield to the event loop between attempts instead of blocking the thread.
                await asyncio.sleep(poll_interval)
        except BaseException:  # Something did go wrong, so decrement the counter.
            # ``BaseException`` so that task cancellation also undoes the increment; clamp so it never goes negative.
            self._context.lock_counter = max(0, self._context.lock_counter - 1)
            raise
        return AsyncAcquireReturnProxy(lock=self)

    async def release(self, force: bool = False) -> None:  # ty: ignore[invalid-method-override]  # noqa: FBT001, FBT002
        """
        Release the file lock. The lock is only completely released when the lock counter reaches 0. The lock file
        itself is not automatically deleted.

        Calling this while the lock is not held does nothing.

        :param force: If true, the lock counter is ignored and the lock is released in every case.

        """
        if self.is_locked:
            self._context.lock_counter -= 1

            if self._context.lock_counter == 0 or force:
                lock_id, lock_filename = id(self), self.lock_file

                _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
                await self._run_internal_method(self._release)
                # A forced release can happen with a non-zero counter, so reset it explicitly.
                self._context.lock_counter = 0
                _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)

    async def _run_internal_method(self, method: Callable[[], Any]) -> None:
        """
        Run one of the low-level lock primitives in the way this lock is configured.

        A coroutine function is awaited directly. Any other callable is dispatched through ``loop.run_in_executor``
        when :attr:`run_in_executor` is true, and is otherwise called inline on the current thread.

        :param method: the zero-argument callable to run; its return value is discarded

        """
        if iscoroutinefunction(method):
            await method()
        elif self.run_in_executor:
            # Fall back to the currently running loop when none was configured on the lock.
            loop = self.loop or asyncio.get_running_loop()
            await loop.run_in_executor(self.executor, method)
        else:
            method()

    def __enter__(self) -> NoReturn:
        """
        Replace old __enter__ method to avoid using it.

        NOTE: DO NOT USE `with` FOR ASYNCIO LOCKS, USE `async with` INSTEAD.

        :raises NotImplementedError: always

        """
        msg = "Do not use `with` for asyncio locks, use `async with` instead."
        raise NotImplementedError(msg)

    async def __aenter__(self) -> Self:
        """
        Acquire the lock.

        :returns: the lock object

        """
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Release the lock.

        :param exc_type: the exception type if raised
        :param exc_value: the exception value if raised
        :param traceback: the exception traceback if raised

        """
        await self.release()

    def __del__(self) -> None:
        """
        Called when the lock object is deleted; makes a best-effort attempt to force-release the lock.

        ``RuntimeError`` (for example no running event loop, or a closed loop) is swallowed because a finalizer has
        no caller to report it to.
        """
        # ``__init__`` may have raised before the context was created; in that case there is nothing to release and
        # touching ``self.loop`` would raise ``AttributeError`` from inside the finalizer.
        if getattr(self, "_context", None) is None:
            return

        pending = None
        try:
            loop = self.loop or asyncio.get_running_loop()
            pending = self.release(force=True)
            if loop.is_running():
                loop.create_task(pending)
            else:  # pragma: no cover
                loop.run_until_complete(pending)
        except RuntimeError:
            # If the loop refused the coroutine it was never started; close it so it does not trigger a
            # "coroutine ... was never awaited" warning. Closing an already finished coroutine is a no-op.
            if pending is not None:
                pending.close()

356 

357 

class AsyncSoftFileLock(SoftFileLock, BaseAsyncFileLock):
    """Asynchronous soft lock: simply watches the existence of the lock file."""

360 

361 

class AsyncUnixFileLock(UnixFileLock, BaseAsyncFileLock):
    """Asynchronous hard lock for unix systems, backed by :func:`fcntl.flock` on the lock file."""

364 

365 

class AsyncWindowsFileLock(WindowsFileLock, BaseAsyncFileLock):
    """Asynchronous hard lock for windows systems, backed by :func:`msvcrt.locking` on the lock file."""

368 

369 

# Public names of this module; the context and metaclass helpers defined above are not listed here.
__all__ = [
    "AsyncAcquireReturnProxy",
    "AsyncSoftFileLock",
    "AsyncUnixFileLock",
    "AsyncWindowsFileLock",
    "BaseAsyncFileLock",
]