Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/wrapt/synchronization.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

202 statements  

1"""Synchronization decorators and calling-convention markers/bridges. 

2 

3Provides ``synchronized`` for thread and async locking, ``mark_as_sync`` 

4and ``mark_as_async`` for declaring the effective calling convention of a 

5wrapped callable (without converting it), and ``async_to_sync`` / 

6``sync_to_async`` for bridging between the two. 

7""" 

8 

9import asyncio 

10import sys 

11from functools import partial 

12from inspect import ( 

13 CO_ASYNC_GENERATOR, 

14 CO_COROUTINE, 

15 CO_GENERATOR, 

16 CO_ITERABLE_COROUTINE, 

17 iscoroutinefunction, 

18) 

19from threading import Lock, RLock 

20 

21from .__wrapt__ import BoundFunctionWrapper, CallableObjectProxy, FunctionWrapper 

22from .decorators import decorator 

23 

# Calling-convention marker wrappers. These manipulate __code__.co_flags
# so that inspect.iscoroutinefunction() reports the intended calling
# convention, which lets stdlib code and the synchronized() decorator
# auto-select the correct sync or async wrapping behaviour even when
# stacked decorators change the effective convention (for example an
# inner decorator that invokes an async def via asyncio.run()).

30 

31 

class _SyncCodeProxy(CallableObjectProxy):
    # Proxy around a code object that overrides only ``co_flags`` so the
    # flags read as those of a synchronous callable.

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Tri-state selector: True forces CO_GENERATOR on, False forces it
        # off, any other value (None by default) means "auto".
        self._self_generator = generator

    @property
    def co_flags(self):
        """Return the wrapped ``co_flags`` with the async bits removed."""
        source_flags = self.__wrapped__.co_flags

        # A synchronous callable is neither a coroutine function, an async
        # generator, nor a types.coroutine-style iterable coroutine.
        async_bits = CO_COROUTINE | CO_ASYNC_GENERATOR | CO_ITERABLE_COROUTINE
        result = source_flags & ~async_bits

        choice = self._self_generator
        if choice is True:
            return result | CO_GENERATOR
        if choice is False:
            return result & ~CO_GENERATOR

        # Auto mode: an async generator maps onto a sync generator; in every
        # other case CO_GENERATOR keeps whatever value the wrapped flags had.
        if source_flags & CO_ASYNC_GENERATOR:
            result |= CO_GENERATOR
        return result

55 

56 

class _SyncFunctionSurrogate(CallableObjectProxy):
    # Proxy around a function object whose ``__code__`` is presented through
    # a _SyncCodeProxy, carrying the same generator selector.

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Forwarded unchanged to the code proxy on every ``__code__`` access.
        self._self_generator = generator

    @property
    def __code__(self):
        """Return the wrapped function's code object behind a sync proxy."""
        real_code = self.__wrapped__.__code__
        return _SyncCodeProxy(real_code, self._self_generator)

66 

67 

class _BoundSyncFunctionWrapper(BoundFunctionWrapper):
    # Bound-method counterpart of _SyncFunctionWrapper.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Marker read by _synchronized_is_async_callable() to stop walking
        # the __wrapped__ chain and report "not async".
        self._self_is_not_coroutine = True

    @property
    def __func__(self):
        """Return the underlying function behind a sync-reporting surrogate."""
        underlying = self.__wrapped__.__func__
        # The generator selector lives on the parent (unbound) wrapper.
        selector = self._self_parent._self_generator
        return _SyncFunctionSurrogate(underlying, selector)

79 

80 

class _SyncFunctionWrapper(FunctionWrapper):
    # Function wrapper whose ``__code__`` reports synchronous ``co_flags``.

    # Wrapper class used when this wrapper is bound to an instance or class.
    __bound_function_wrapper__ = _BoundSyncFunctionWrapper

    def __init__(self, wrapped, wrapper, generator=None):
        super().__init__(wrapped, wrapper)
        # Tri-state generator selector handed to _SyncCodeProxy.
        self._self_generator = generator
        # Marker read by _synchronized_is_async_callable() to stop walking
        # the __wrapped__ chain and report "not async".
        self._self_is_not_coroutine = True

    @property
    def __code__(self):
        """Return the wrapped callable's code object behind a sync proxy."""
        real_code = self.__wrapped__.__code__
        return _SyncCodeProxy(real_code, self._self_generator)

93 

94 

class _AsyncCodeProxy(CallableObjectProxy):
    # Proxy around a code object that overrides only ``co_flags`` so the
    # flags read as those of a coroutine function or an async generator.

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Tri-state selector: True -> async generator, False -> coroutine
        # function, any other value (None by default) means "auto".
        self._self_generator = generator

    @property
    def co_flags(self):
        """Return the wrapped ``co_flags`` rewritten for the async convention."""
        source_flags = self.__wrapped__.co_flags

        # Clear all four calling-convention bits, then set exactly one.
        convention_bits = (
            CO_GENERATOR | CO_COROUTINE | CO_ITERABLE_COROUTINE | CO_ASYNC_GENERATOR
        )
        cleared = source_flags & ~convention_bits

        choice = self._self_generator
        if choice is True:
            as_async_generator = True
        elif choice is False:
            as_async_generator = False
        else:
            # Auto mode: any kind of generator (sync or async) becomes an
            # async generator; everything else becomes a coroutine function.
            as_async_generator = bool(
                source_flags & (CO_GENERATOR | CO_ASYNC_GENERATOR)
            )

        if as_async_generator:
            return cleared | CO_ASYNC_GENERATOR
        return cleared | CO_COROUTINE

120 

121 

class _AsyncFunctionSurrogate(CallableObjectProxy):
    # Proxy around a function object whose ``__code__`` is presented through
    # an _AsyncCodeProxy, carrying the same generator selector.

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Forwarded unchanged to the code proxy on every ``__code__`` access.
        self._self_generator = generator

    @property
    def __code__(self):
        """Return the wrapped function's code object behind an async proxy."""
        real_code = self.__wrapped__.__code__
        return _AsyncCodeProxy(real_code, self._self_generator)

131 

132 

class _BoundAsyncFunctionWrapper(BoundFunctionWrapper):
    # Bound-method counterpart of _AsyncFunctionWrapper.

    @property
    def __func__(self):
        """Return the underlying function behind an async-reporting surrogate."""
        underlying = self.__wrapped__.__func__
        # The generator selector lives on the parent (unbound) wrapper.
        selector = self._self_parent._self_generator
        return _AsyncFunctionSurrogate(underlying, selector)

140 

141 

class _AsyncFunctionWrapper(FunctionWrapper):
    # Function wrapper whose ``__code__`` reports asynchronous ``co_flags``.

    # Wrapper class used when this wrapper is bound to an instance or class.
    __bound_function_wrapper__ = _BoundAsyncFunctionWrapper

    def __init__(self, wrapped, wrapper, generator=None):
        super().__init__(wrapped, wrapper)
        # Tri-state generator selector handed to _AsyncCodeProxy.
        self._self_generator = generator

    @property
    def __code__(self):
        """Return the wrapped callable's code object behind an async proxy."""
        real_code = self.__wrapped__.__code__
        return _AsyncCodeProxy(real_code, self._self_generator)

153 

154 

def mark_as_sync(wrapped=None, /, *, generator=None):
    """Declare that a callable follows the synchronous calling convention.

    The result is a pass-through wrapper: calls are forwarded unchanged,
    but the wrapper's code flags are rewritten so that
    `inspect.iscoroutinefunction()` returns False even if the underlying
    callable was written with `async def`. This suits the case where an
    inner decorator has already turned an async function into a blocking
    one (for example by driving it with `asyncio.run()`).

    Can be applied directly (`@mark_as_sync`) or with keyword arguments
    (`@mark_as_sync(generator=...)`); in the latter form a decorator is
    returned.

    `generator` controls the `CO_GENERATOR` bit reported by the wrapper:

    - `None` (default): auto. An async generator input is reported as a
      sync generator; for any other input CO_GENERATOR is left as it was.
    - `True`: CO_GENERATOR is forced on (reports as a sync generator).
    - `False`: CO_GENERATOR is forced off (reports as a plain function,
      even when the input had CO_GENERATOR set).

    Whatever `generator` is, CO_COROUTINE, CO_ASYNC_GENERATOR and
    CO_ITERABLE_COROUTINE are always cleared.
    """

    def _decorator(wrapped):
        # Forward the call untouched; only the reported flags differ.
        def _wrapper(wrapped, instance, args, kwargs):
            return wrapped(*args, **kwargs)

        return _SyncFunctionWrapper(wrapped, _wrapper, generator=generator)

    # Without a positional argument we are being configured, not applied.
    return _decorator if wrapped is None else _decorator(wrapped)

186 

187 

def mark_as_async(wrapped=None, /, *, generator=None):
    """Mark a callable as asynchronous from the perspective of calling
    convention detection. The returned wrapper is a pass-through that
    reports `inspect.iscoroutinefunction()` as True regardless of whether
    the underlying callable is declared `async def`. Useful when a stacked
    decorator returns a coroutine from a plain `def` wrapper.

    The wrapper does not convert anything: whatever the underlying
    callable returns (a coroutine, an async generator, ...) is handed
    back to the caller as-is, so a single `await` / `async for` on the
    result behaves exactly as it would on the unmarked callable.

    Can be applied directly (`@mark_as_async`) or with keyword arguments
    (`@mark_as_async(generator=...)`); in the latter form a decorator is
    returned.

    The `generator` keyword chooses between coroutine function and
    async generator reporting. Tri-state:

    - `None` (default): auto. If the input was a sync or async
      generator, the wrapper reports as an async generator
      (`CO_ASYNC_GENERATOR`); otherwise it reports as a coroutine
      function (`CO_COROUTINE`).
    - `True`: force async generator reporting (`CO_ASYNC_GENERATOR` set,
      `CO_COROUTINE` cleared).
    - `False`: force coroutine function reporting (`CO_COROUTINE` set,
      `CO_ASYNC_GENERATOR` cleared).

    CO_GENERATOR and CO_ITERABLE_COROUTINE are always cleared (the
    async path does not use either).
    """

    def _decorator(wrapped):
        # Deliberately a plain ``def``. An ``async def`` pass-through that
        # returns ``wrapped(...)`` without awaiting would wrap the result
        # in an extra coroutine: callers would need to await twice, and an
        # async generator could no longer be used with ``async for``.
        def _wrapper(wrapped, instance, args, kwargs):
            return wrapped(*args, **kwargs)

        return _AsyncFunctionWrapper(wrapped, _wrapper, generator=generator)

    if wrapped is None:
        return _decorator
    return _decorator(wrapped)

221 

222 

def async_to_sync(wrapped):
    """Return a blocking adapter for an async callable.

    Every call invokes `wrapped` and drives the resulting coroutine to
    completion with `asyncio.run()`, returning its result. The adapter
    reports as synchronous to `inspect.iscoroutinefunction()`. The name
    follows the asgiref convention.
    """

    def wrapper(wrapped, instance, args, kwargs):
        pending = wrapped(*args, **kwargs)
        return asyncio.run(pending)

    return _SyncFunctionWrapper(wrapped, wrapper)

234 

235 

def sync_to_async(wrapped):
    """Return an awaitable adapter for a synchronous callable.

    Every call hands the synchronous work to the running loop's default
    executor through `loop.run_in_executor()` and awaits the outcome. The
    adapter reports as asynchronous to `inspect.iscoroutinefunction()`.
    The name follows the asgiref convention.
    """

    async def wrapper(wrapped, instance, args, kwargs):
        running_loop = asyncio.get_running_loop()
        # run_in_executor() takes positional arguments only, so the call
        # (including keyword arguments) is frozen into a partial first.
        bound_call = partial(wrapped, *args, **kwargs)
        # Passing None selects the loop's default executor.
        future = running_loop.run_in_executor(None, bound_call)
        return await future

    return _AsyncFunctionWrapper(wrapped, wrapper)

248 

249 

250def _synchronized_is_async_lock(obj): 

251 return iscoroutinefunction(getattr(obj, "acquire", None)) 

252 

253 

254def _synchronized_is_async_callable(obj): 

255 # Walk the __wrapped__ chain, returning True as soon as any layer 

256 # declares itself a coroutine function. A sync marker wrapper can 

257 # carry an authoritative `_self_is_not_coroutine` attribute that 

258 # short-circuits the walk before it descends into a genuinely 

259 # async inner layer. Cycle / runaway-chain protection modelled on 

260 # inspect.unwrap(). 

261 

262 memo = {id(obj): obj} 

263 recursion_limit = sys.getrecursionlimit() 

264 target = obj 

265 

266 while True: 

267 if isinstance(target, (classmethod, staticmethod)): 

268 inner = getattr(target, "__wrapped__", None) 

269 if inner is None: 

270 inner = target.__func__ 

271 target = inner 

272 id_target = id(target) 

273 if id_target in memo or len(memo) >= recursion_limit: 

274 raise ValueError("wrapper loop when unwrapping {!r}".format(obj)) 

275 memo[id_target] = target 

276 continue 

277 

278 if getattr(target, "_self_is_not_coroutine", False): 

279 return False 

280 

281 if iscoroutinefunction(target): 

282 return True 

283 

284 next_target = getattr(target, "__wrapped__", None) 

285 if next_target is None or next_target is target: 

286 return False 

287 target = next_target 

288 id_target = id(target) 

289 if id_target in memo or len(memo) >= recursion_limit: 

290 raise ValueError("wrapper loop when unwrapping {!r}".format(obj)) 

291 memo[id_target] = target 

292 

293 

# Decorator for implementing thread synchronization. It can be used as a
# decorator, in which case the synchronization context is determined by
# what type of function is wrapped, or it can also be used as a context
# manager, where the user needs to supply the correct synchronization
# context. It is also possible to supply an object which appears to be a
# synchronization primitive of some sort, by virtue of having release()
# and acquire() methods. In that case that will be used directly as the
# synchronization primitive without creating a separate lock against the
# derived or supplied context.

303 

304 

305def synchronized(wrapped): 

306 """Depending on the nature of the `wrapped` object, will either return a 

307 decorator which can be used to wrap a function or method, or a context 

308 manager, both of which will act accordingly depending on how used, to 

309 synchronize access to calling of the wrapped function, or the block of 

310 code within the context manager. If it is an object which is a 

311 synchronization primitive, such as a threading Lock, RLock, Semaphore, 

312 Condition, or Event, then it is assumed that the object is to be used 

313 directly as the synchronization primitive, otherwise a lock is created 

314 automatically and attached to the wrapped object and used as the 

315 synchronization primitive. 

316 

317 Async functions are supported: if the wrapped callable is an async 

318 function, an `asyncio.Lock` is created for the context and the wrapper 

319 awaits the lock. The returned object also exposes `__aenter__` and 

320 `__aexit__` so it can be used with `async with` to synchronise a block 

321 of code using an independent per-context `asyncio.Lock`. If an object 

322 with coroutine `acquire`/`release` methods (such as an `asyncio.Lock`) 

323 is supplied directly, the returned decorator and context manager will 

324 use it via the async protocol. 

325 """ 

326 

327 # Determine if being passed an object which is a synchronization 

328 # primitive. We can't check by type for Lock, RLock, Semaphore etc, 

329 # as the means of creating them isn't the type. Therefore use the 

330 # existence of acquire() and release() methods. This is more 

331 # extensible anyway as it allows custom synchronization mechanisms. 

332 

333 if hasattr(wrapped, "acquire") and hasattr(wrapped, "release"): 

334 # We remember what the original lock is and then return a new 

335 # decorator which accesses and locks it. When returning the new 

336 # decorator we wrap it with an object proxy so we can override 

337 # the context manager methods in case it is being used to wrap 

338 # synchronized statements with a 'with' statement. 

339 

340 lock = wrapped 

341 

342 if _synchronized_is_async_lock(lock): 

343 

344 @decorator 

345 async def _synchronized(wrapped, instance, args, kwargs): 

346 async with lock: 

347 return await wrapped(*args, **kwargs) 

348 

349 class _AsyncSynchronizedLockProxy(CallableObjectProxy): 

350 

351 async def __aenter__(self): 

352 await lock.acquire() 

353 return lock 

354 

355 async def __aexit__(self, *args): 

356 lock.release() 

357 

358 return _AsyncSynchronizedLockProxy(wrapped=_synchronized) 

359 

360 @decorator 

361 def _synchronized(wrapped, instance, args, kwargs): 

362 # Execute the wrapped function while the original supplied 

363 # lock is held. 

364 

365 with lock: 

366 return wrapped(*args, **kwargs) 

367 

368 class _SynchronizedLockProxy(CallableObjectProxy): 

369 

370 def __enter__(self): 

371 lock.acquire() 

372 return lock 

373 

374 def __exit__(self, *args): 

375 lock.release() 

376 

377 return _SynchronizedLockProxy(wrapped=_synchronized) 

378 

379 # Following only apply when the lock is being created automatically 

380 # based on the context of what was supplied. In this case we supply 

381 # a final decorator, but need to use FunctionWrapper directly as we 

382 # want to derive from it to add context manager methods in case it is 

383 # being used to wrap synchronized statements with a 'with' statement. 

384 

385 def _synchronized_lock(context): 

386 # Attempt to retrieve the lock for the specific context. 

387 

388 lock = vars(context).get("_synchronized_lock", None) 

389 

390 if lock is None: 

391 # There is no existing lock defined for the context we 

392 # are dealing with so we need to create one. This needs 

393 # to be done in a way to guarantee there is only one 

394 # created, even if multiple threads try and create it at 

395 # the same time. We can't always use the setdefault() 

396 # method on the __dict__ for the context. This is the 

397 # case where the context is a class, as __dict__ is 

398 # actually a dictproxy. What we therefore do is use a 

399 # meta lock on this wrapper itself, to control the 

400 # creation and assignment of the lock attribute against 

401 # the context. 

402 

403 with synchronized._synchronized_meta_lock: 

404 # We need to check again for whether the lock we want 

405 # exists in case two threads were trying to create it 

406 # at the same time and were competing to create the 

407 # meta lock. 

408 

409 lock = vars(context).get("_synchronized_lock", None) 

410 

411 if lock is None: 

412 lock = RLock() 

413 setattr(context, "_synchronized_lock", lock) 

414 

415 return lock 

416 

417 def _synchronized_async_lock(context): 

418 # Per-context asyncio.Lock, created lazily on first use. Created 

419 # under the shared meta lock so creation is safe across threads; 

420 # the meta lock is never held across an await. asyncio.Lock is 

421 # not reentrant. 

422 

423 lock = vars(context).get("_synchronized_async_lock", None) 

424 

425 if lock is None: 

426 with synchronized._synchronized_meta_lock: 

427 lock = vars(context).get("_synchronized_async_lock", None) 

428 

429 if lock is None: 

430 lock = asyncio.Lock() 

431 setattr(context, "_synchronized_async_lock", lock) 

432 

433 return lock 

434 

435 def _synchronized_wrapper(wrapped, instance, args, kwargs): 

436 # Execute the wrapped function while the lock for the 

437 # desired context is held. If instance is None then the 

438 # wrapped function is used as the context. 

439 

440 with _synchronized_lock(instance if instance is not None else wrapped): 

441 return wrapped(*args, **kwargs) 

442 

443 async def _synchronized_async_wrapper(wrapped, instance, args, kwargs): 

444 async with _synchronized_async_lock( 

445 instance if instance is not None else wrapped 

446 ): 

447 return await wrapped(*args, **kwargs) 

448 

449 class _SynchronizedFunctionWrapper(FunctionWrapper): 

450 

451 def __enter__(self): 

452 self._self_lock = _synchronized_lock(self.__wrapped__) 

453 self._self_lock.acquire() 

454 return self._self_lock 

455 

456 def __exit__(self, *args): 

457 self._self_lock.release() 

458 

459 async def __aenter__(self): 

460 self._self_async_lock = _synchronized_async_lock(self.__wrapped__) 

461 await self._self_async_lock.acquire() 

462 return self._self_async_lock 

463 

464 async def __aexit__(self, *args): 

465 self._self_async_lock.release() 

466 

467 if _synchronized_is_async_callable(wrapped): 

468 return _SynchronizedFunctionWrapper( 

469 wrapped=wrapped, wrapper=_synchronized_async_wrapper 

470 ) 

471 

472 return _SynchronizedFunctionWrapper(wrapped=wrapped, wrapper=_synchronized_wrapper) 

473 

474 

475synchronized._synchronized_meta_lock = Lock() # type: ignore[attr-defined]