Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/local.py: 72%

275 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

1from __future__ import annotations 

2 

3import copy 

4import math 

5import operator 

6import typing as t 

7from contextvars import ContextVar 

8from functools import partial 

9from functools import update_wrapper 

10from operator import attrgetter 

11 

12from .wsgi import ClosingIterator 

13 

14if t.TYPE_CHECKING: 

15 from _typeshed.wsgi import StartResponse 

16 from _typeshed.wsgi import WSGIApplication 

17 from _typeshed.wsgi import WSGIEnvironment 

18 

19T = t.TypeVar("T") 

20F = t.TypeVar("F", bound=t.Callable[..., t.Any]) 

21 

22 

23def release_local(local: Local | LocalStack) -> None: 

24 """Release the data for the current context in a :class:`Local` or 

25 :class:`LocalStack` without using a :class:`LocalManager`. 

26 

27 This should not be needed for modern use cases, and may be removed 

28 in the future. 

29 

30 .. versionadded:: 0.6.1 

31 """ 

32 local.__release_local__() 

33 

34 

35class Local: 

36 """Create a namespace of context-local data. This wraps a 

37 :class:`ContextVar` containing a :class:`dict` value. 

38 

39 This may incur a performance penalty compared to using individual 

40 context vars, as it has to copy data to avoid mutating the dict 

41 between nested contexts. 

42 

43 :param context_var: The :class:`~contextvars.ContextVar` to use as 

44 storage for this local. If not given, one will be created. 

45 Context vars not created at the global scope may interfere with 

46 garbage collection. 

47 

48 .. versionchanged:: 2.0 

49 Uses ``ContextVar`` instead of a custom storage implementation. 

50 """ 

51 

52 __slots__ = ("__storage",) 

53 

54 def __init__(self, context_var: ContextVar[dict[str, t.Any]] | None = None) -> None: 

55 if context_var is None: 

56 # A ContextVar not created at global scope interferes with 

57 # Python's garbage collection. However, a local only makes 

58 # sense defined at the global scope as well, in which case 

59 # the GC issue doesn't seem relevant. 

60 context_var = ContextVar(f"werkzeug.Local<{id(self)}>.storage") 

61 

62 object.__setattr__(self, "_Local__storage", context_var) 

63 

64 def __iter__(self) -> t.Iterator[tuple[str, t.Any]]: 

65 return iter(self.__storage.get({}).items()) 

66 

67 def __call__(self, name: str, *, unbound_message: str | None = None) -> LocalProxy: 

68 """Create a :class:`LocalProxy` that access an attribute on this 

69 local namespace. 

70 

71 :param name: Proxy this attribute. 

72 :param unbound_message: The error message that the proxy will 

73 show if the attribute isn't set. 

74 """ 

75 return LocalProxy(self, name, unbound_message=unbound_message) 

76 

77 def __release_local__(self) -> None: 

78 self.__storage.set({}) 

79 

80 def __getattr__(self, name: str) -> t.Any: 

81 values = self.__storage.get({}) 

82 

83 if name in values: 

84 return values[name] 

85 

86 raise AttributeError(name) 

87 

88 def __setattr__(self, name: str, value: t.Any) -> None: 

89 values = self.__storage.get({}).copy() 

90 values[name] = value 

91 self.__storage.set(values) 

92 

93 def __delattr__(self, name: str) -> None: 

94 values = self.__storage.get({}) 

95 

96 if name in values: 

97 values = values.copy() 

98 del values[name] 

99 self.__storage.set(values) 

100 else: 

101 raise AttributeError(name) 

102 

103 

104class LocalStack(t.Generic[T]): 

105 """Create a stack of context-local data. This wraps a 

106 :class:`ContextVar` containing a :class:`list` value. 

107 

108 This may incur a performance penalty compared to using individual 

109 context vars, as it has to copy data to avoid mutating the list 

110 between nested contexts. 

111 

112 :param context_var: The :class:`~contextvars.ContextVar` to use as 

113 storage for this local. If not given, one will be created. 

114 Context vars not created at the global scope may interfere with 

115 garbage collection. 

116 

117 .. versionchanged:: 2.0 

118 Uses ``ContextVar`` instead of a custom storage implementation. 

119 

120 .. versionadded:: 0.6.1 

121 """ 

122 

123 __slots__ = ("_storage",) 

124 

125 def __init__(self, context_var: ContextVar[list[T]] | None = None) -> None: 

126 if context_var is None: 

127 # A ContextVar not created at global scope interferes with 

128 # Python's garbage collection. However, a local only makes 

129 # sense defined at the global scope as well, in which case 

130 # the GC issue doesn't seem relevant. 

131 context_var = ContextVar(f"werkzeug.LocalStack<{id(self)}>.storage") 

132 

133 self._storage = context_var 

134 

135 def __release_local__(self) -> None: 

136 self._storage.set([]) 

137 

138 def push(self, obj: T) -> list[T]: 

139 """Add a new item to the top of the stack.""" 

140 stack = self._storage.get([]).copy() 

141 stack.append(obj) 

142 self._storage.set(stack) 

143 return stack 

144 

145 def pop(self) -> T | None: 

146 """Remove the top item from the stack and return it. If the 

147 stack is empty, return ``None``. 

148 """ 

149 stack = self._storage.get([]) 

150 

151 if len(stack) == 0: 

152 return None 

153 

154 rv = stack[-1] 

155 self._storage.set(stack[:-1]) 

156 return rv 

157 

158 @property 

159 def top(self) -> T | None: 

160 """The topmost item on the stack. If the stack is empty, 

161 `None` is returned. 

162 """ 

163 stack = self._storage.get([]) 

164 

165 if len(stack) == 0: 

166 return None 

167 

168 return stack[-1] 

169 

170 def __call__( 

171 self, name: str | None = None, *, unbound_message: str | None = None 

172 ) -> LocalProxy: 

173 """Create a :class:`LocalProxy` that accesses the top of this 

174 local stack. 

175 

176 :param name: If given, the proxy access this attribute of the 

177 top item, rather than the item itself. 

178 :param unbound_message: The error message that the proxy will 

179 show if the stack is empty. 

180 """ 

181 return LocalProxy(self, name, unbound_message=unbound_message) 

182 

183 

184class LocalManager: 

185 """Manage releasing the data for the current context in one or more 

186 :class:`Local` and :class:`LocalStack` objects. 

187 

188 This should not be needed for modern use cases, and may be removed 

189 in the future. 

190 

191 :param locals: A local or list of locals to manage. 

192 

193 .. versionchanged:: 2.1 

194 The ``ident_func`` was removed. 

195 

196 .. versionchanged:: 0.7 

197 The ``ident_func`` parameter was added. 

198 

199 .. versionchanged:: 0.6.1 

200 The :func:`release_local` function can be used instead of a 

201 manager. 

202 """ 

203 

204 __slots__ = ("locals",) 

205 

206 def __init__( 

207 self, 

208 locals: None | (Local | LocalStack | t.Iterable[Local | LocalStack]) = None, 

209 ) -> None: 

210 if locals is None: 

211 self.locals = [] 

212 elif isinstance(locals, Local): 

213 self.locals = [locals] 

214 else: 

215 self.locals = list(locals) # type: ignore[arg-type] 

216 

217 def cleanup(self) -> None: 

218 """Release the data in the locals for this context. Call this at 

219 the end of each request or use :meth:`make_middleware`. 

220 """ 

221 for local in self.locals: 

222 release_local(local) 

223 

224 def make_middleware(self, app: WSGIApplication) -> WSGIApplication: 

225 """Wrap a WSGI application so that local data is released 

226 automatically after the response has been sent for a request. 

227 """ 

228 

229 def application( 

230 environ: WSGIEnvironment, start_response: StartResponse 

231 ) -> t.Iterable[bytes]: 

232 return ClosingIterator(app(environ, start_response), self.cleanup) 

233 

234 return application 

235 

236 def middleware(self, func: WSGIApplication) -> WSGIApplication: 

237 """Like :meth:`make_middleware` but used as a decorator on the 

238 WSGI application function. 

239 

240 .. code-block:: python 

241 

242 @manager.middleware 

243 def application(environ, start_response): 

244 ... 

245 """ 

246 return update_wrapper(self.make_middleware(func), func) 

247 

248 def __repr__(self) -> str: 

249 return f"<{type(self).__name__} storages: {len(self.locals)}>" 

250 

251 

252class _ProxyLookup: 

253 """Descriptor that handles proxied attribute lookup for 

254 :class:`LocalProxy`. 

255 

256 :param f: The built-in function this attribute is accessed through. 

257 Instead of looking up the special method, the function call 

258 is redone on the object. 

259 :param fallback: Return this function if the proxy is unbound 

260 instead of raising a :exc:`RuntimeError`. 

261 :param is_attr: This proxied name is an attribute, not a function. 

262 Call the fallback immediately to get the value. 

263 :param class_value: Value to return when accessed from the 

264 ``LocalProxy`` class directly. Used for ``__doc__`` so building 

265 docs still works. 

266 """ 

267 

268 __slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name") 

269 

270 def __init__( 

271 self, 

272 f: t.Callable | None = None, 

273 fallback: t.Callable | None = None, 

274 class_value: t.Any | None = None, 

275 is_attr: bool = False, 

276 ) -> None: 

277 bind_f: t.Callable[[LocalProxy, t.Any], t.Callable] | None 

278 

279 if hasattr(f, "__get__"): 

280 # A Python function, can be turned into a bound method. 

281 

282 def bind_f(instance: LocalProxy, obj: t.Any) -> t.Callable: 

283 return f.__get__(obj, type(obj)) # type: ignore 

284 

285 elif f is not None: 

286 # A C function, use partial to bind the first argument. 

287 

288 def bind_f(instance: LocalProxy, obj: t.Any) -> t.Callable: 

289 return partial(f, obj) 

290 

291 else: 

292 # Use getattr, which will produce a bound method. 

293 bind_f = None 

294 

295 self.bind_f = bind_f 

296 self.fallback = fallback 

297 self.class_value = class_value 

298 self.is_attr = is_attr 

299 

300 def __set_name__(self, owner: LocalProxy, name: str) -> None: 

301 self.name = name 

302 

303 def __get__(self, instance: LocalProxy, owner: type | None = None) -> t.Any: 

304 if instance is None: 

305 if self.class_value is not None: 

306 return self.class_value 

307 

308 return self 

309 

310 try: 

311 obj = instance._get_current_object() 

312 except RuntimeError: 

313 if self.fallback is None: 

314 raise 

315 

316 fallback = self.fallback.__get__(instance, owner) 

317 

318 if self.is_attr: 

319 # __class__ and __doc__ are attributes, not methods. 

320 # Call the fallback to get the value. 

321 return fallback() 

322 

323 return fallback 

324 

325 if self.bind_f is not None: 

326 return self.bind_f(instance, obj) 

327 

328 return getattr(obj, self.name) 

329 

330 def __repr__(self) -> str: 

331 return f"proxy {self.name}" 

332 

333 def __call__(self, instance: LocalProxy, *args: t.Any, **kwargs: t.Any) -> t.Any: 

334 """Support calling unbound methods from the class. For example, 

335 this happens with ``copy.copy``, which does 

336 ``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it 

337 returns the proxy type and descriptor. 

338 """ 

339 return self.__get__(instance, type(instance))(*args, **kwargs) 

340 

341 

342class _ProxyIOp(_ProxyLookup): 

343 """Look up an augmented assignment method on a proxied object. The 

344 method is wrapped to return the proxy instead of the object. 

345 """ 

346 

347 __slots__ = () 

348 

349 def __init__( 

350 self, f: t.Callable | None = None, fallback: t.Callable | None = None 

351 ) -> None: 

352 super().__init__(f, fallback) 

353 

354 def bind_f(instance: LocalProxy, obj: t.Any) -> t.Callable: 

355 def i_op(self: t.Any, other: t.Any) -> LocalProxy: 

356 f(self, other) # type: ignore 

357 return instance 

358 

359 return i_op.__get__(obj, type(obj)) # type: ignore 

360 

361 self.bind_f = bind_f 

362 

363 

364def _l_to_r_op(op: F) -> F: 

365 """Swap the argument order to turn an l-op into an r-op.""" 

366 

367 def r_op(obj: t.Any, other: t.Any) -> t.Any: 

368 return op(other, obj) 

369 

370 return t.cast(F, r_op) 

371 

372 

373def _identity(o: T) -> T: 

374 return o 

375 

376 

377class LocalProxy(t.Generic[T]): 

378 """A proxy to the object bound to a context-local object. All 

379 operations on the proxy are forwarded to the bound object. If no 

380 object is bound, a ``RuntimeError`` is raised. 

381 

382 :param local: The context-local object that provides the proxied 

383 object. 

384 :param name: Proxy this attribute from the proxied object. 

385 :param unbound_message: The error message to show if the 

386 context-local object is unbound. 

387 

388 Proxy a :class:`~contextvars.ContextVar` to make it easier to 

389 access. Pass a name to proxy that attribute. 

390 

391 .. code-block:: python 

392 

393 _request_var = ContextVar("request") 

394 request = LocalProxy(_request_var) 

395 session = LocalProxy(_request_var, "session") 

396 

397 Proxy an attribute on a :class:`Local` namespace by calling the 

398 local with the attribute name: 

399 

400 .. code-block:: python 

401 

402 data = Local() 

403 user = data("user") 

404 

405 Proxy the top item on a :class:`LocalStack` by calling the local. 

406 Pass a name to proxy that attribute. 

407 

408 .. code-block:: 

409 

410 app_stack = LocalStack() 

411 current_app = app_stack() 

412 g = app_stack("g") 

413 

414 Pass a function to proxy the return value from that function. This 

415 was previously used to access attributes of local objects before 

416 that was supported directly. 

417 

418 .. code-block:: python 

419 

420 session = LocalProxy(lambda: request.session) 

421 

422 ``__repr__`` and ``__class__`` are proxied, so ``repr(x)`` and 

423 ``isinstance(x, cls)`` will look like the proxied object. Use 

424 ``issubclass(type(x), LocalProxy)`` to check if an object is a 

425 proxy. 

426 

427 .. code-block:: python 

428 

429 repr(user) # <User admin> 

430 isinstance(user, User) # True 

431 issubclass(type(user), LocalProxy) # True 

432 

433 .. versionchanged:: 2.2.2 

434 ``__wrapped__`` is set when wrapping an object, not only when 

435 wrapping a function, to prevent doctest from failing. 

436 

437 .. versionchanged:: 2.2 

438 Can proxy a ``ContextVar`` or ``LocalStack`` directly. 

439 

440 .. versionchanged:: 2.2 

441 The ``name`` parameter can be used with any proxied object, not 

442 only ``Local``. 

443 

444 .. versionchanged:: 2.2 

445 Added the ``unbound_message`` parameter. 

446 

447 .. versionchanged:: 2.0 

448 Updated proxied attributes and methods to reflect the current 

449 data model. 

450 

451 .. versionchanged:: 0.6.1 

452 The class can be instantiated with a callable. 

453 """ 

454 

455 __slots__ = ("__wrapped", "_get_current_object") 

456 

457 _get_current_object: t.Callable[[], T] 

458 """Return the current object this proxy is bound to. If the proxy is 

459 unbound, this raises a ``RuntimeError``. 

460 

461 This should be used if you need to pass the object to something that 

462 doesn't understand the proxy. It can also be useful for performance 

463 if you are accessing the object multiple times in a function, rather 

464 than going through the proxy multiple times. 

465 """ 

466 

467 def __init__( 

468 self, 

469 local: ContextVar[T] | Local | LocalStack[T] | t.Callable[[], T], 

470 name: str | None = None, 

471 *, 

472 unbound_message: str | None = None, 

473 ) -> None: 

474 if name is None: 

475 get_name = _identity 

476 else: 

477 get_name = attrgetter(name) # type: ignore[assignment] 

478 

479 if unbound_message is None: 

480 unbound_message = "object is not bound" 

481 

482 if isinstance(local, Local): 

483 if name is None: 

484 raise TypeError("'name' is required when proxying a 'Local' object.") 

485 

486 def _get_current_object() -> T: 

487 try: 

488 return get_name(local) # type: ignore[return-value] 

489 except AttributeError: 

490 raise RuntimeError(unbound_message) from None 

491 

492 elif isinstance(local, LocalStack): 

493 

494 def _get_current_object() -> T: 

495 obj = local.top 

496 

497 if obj is None: 

498 raise RuntimeError(unbound_message) 

499 

500 return get_name(obj) 

501 

502 elif isinstance(local, ContextVar): 

503 

504 def _get_current_object() -> T: 

505 try: 

506 obj = local.get() 

507 except LookupError: 

508 raise RuntimeError(unbound_message) from None 

509 

510 return get_name(obj) 

511 

512 elif callable(local): 

513 

514 def _get_current_object() -> T: 

515 return get_name(local()) 

516 

517 else: 

518 raise TypeError(f"Don't know how to proxy '{type(local)}'.") 

519 

520 object.__setattr__(self, "_LocalProxy__wrapped", local) 

521 object.__setattr__(self, "_get_current_object", _get_current_object) 

522 

523 __doc__ = _ProxyLookup( # type: ignore 

524 class_value=__doc__, fallback=lambda self: type(self).__doc__, is_attr=True 

525 ) 

526 __wrapped__ = _ProxyLookup( 

527 fallback=lambda self: self._LocalProxy__wrapped, is_attr=True 

528 ) 

529 # __del__ should only delete the proxy 

530 __repr__ = _ProxyLookup( # type: ignore 

531 repr, fallback=lambda self: f"<{type(self).__name__} unbound>" 

532 ) 

533 __str__ = _ProxyLookup(str) # type: ignore 

534 __bytes__ = _ProxyLookup(bytes) 

535 __format__ = _ProxyLookup() # type: ignore 

536 __lt__ = _ProxyLookup(operator.lt) 

537 __le__ = _ProxyLookup(operator.le) 

538 __eq__ = _ProxyLookup(operator.eq) # type: ignore 

539 __ne__ = _ProxyLookup(operator.ne) # type: ignore 

540 __gt__ = _ProxyLookup(operator.gt) 

541 __ge__ = _ProxyLookup(operator.ge) 

542 __hash__ = _ProxyLookup(hash) # type: ignore 

543 __bool__ = _ProxyLookup(bool, fallback=lambda self: False) 

544 __getattr__ = _ProxyLookup(getattr) 

545 # __getattribute__ triggered through __getattr__ 

546 __setattr__ = _ProxyLookup(setattr) # type: ignore 

547 __delattr__ = _ProxyLookup(delattr) # type: ignore 

548 __dir__ = _ProxyLookup(dir, fallback=lambda self: []) # type: ignore 

549 # __get__ (proxying descriptor not supported) 

550 # __set__ (descriptor) 

551 # __delete__ (descriptor) 

552 # __set_name__ (descriptor) 

553 # __objclass__ (descriptor) 

554 # __slots__ used by proxy itself 

555 # __dict__ (__getattr__) 

556 # __weakref__ (__getattr__) 

557 # __init_subclass__ (proxying metaclass not supported) 

558 # __prepare__ (metaclass) 

559 __class__ = _ProxyLookup( 

560 fallback=lambda self: type(self), is_attr=True 

561 ) # type: ignore 

562 __instancecheck__ = _ProxyLookup(lambda self, other: isinstance(other, self)) 

563 __subclasscheck__ = _ProxyLookup(lambda self, other: issubclass(other, self)) 

564 # __class_getitem__ triggered through __getitem__ 

565 __call__ = _ProxyLookup(lambda self, *args, **kwargs: self(*args, **kwargs)) 

566 __len__ = _ProxyLookup(len) 

567 __length_hint__ = _ProxyLookup(operator.length_hint) 

568 __getitem__ = _ProxyLookup(operator.getitem) 

569 __setitem__ = _ProxyLookup(operator.setitem) 

570 __delitem__ = _ProxyLookup(operator.delitem) 

571 # __missing__ triggered through __getitem__ 

572 __iter__ = _ProxyLookup(iter) 

573 __next__ = _ProxyLookup(next) 

574 __reversed__ = _ProxyLookup(reversed) 

575 __contains__ = _ProxyLookup(operator.contains) 

576 __add__ = _ProxyLookup(operator.add) 

577 __sub__ = _ProxyLookup(operator.sub) 

578 __mul__ = _ProxyLookup(operator.mul) 

579 __matmul__ = _ProxyLookup(operator.matmul) 

580 __truediv__ = _ProxyLookup(operator.truediv) 

581 __floordiv__ = _ProxyLookup(operator.floordiv) 

582 __mod__ = _ProxyLookup(operator.mod) 

583 __divmod__ = _ProxyLookup(divmod) 

584 __pow__ = _ProxyLookup(pow) 

585 __lshift__ = _ProxyLookup(operator.lshift) 

586 __rshift__ = _ProxyLookup(operator.rshift) 

587 __and__ = _ProxyLookup(operator.and_) 

588 __xor__ = _ProxyLookup(operator.xor) 

589 __or__ = _ProxyLookup(operator.or_) 

590 __radd__ = _ProxyLookup(_l_to_r_op(operator.add)) 

591 __rsub__ = _ProxyLookup(_l_to_r_op(operator.sub)) 

592 __rmul__ = _ProxyLookup(_l_to_r_op(operator.mul)) 

593 __rmatmul__ = _ProxyLookup(_l_to_r_op(operator.matmul)) 

594 __rtruediv__ = _ProxyLookup(_l_to_r_op(operator.truediv)) 

595 __rfloordiv__ = _ProxyLookup(_l_to_r_op(operator.floordiv)) 

596 __rmod__ = _ProxyLookup(_l_to_r_op(operator.mod)) 

597 __rdivmod__ = _ProxyLookup(_l_to_r_op(divmod)) 

598 __rpow__ = _ProxyLookup(_l_to_r_op(pow)) 

599 __rlshift__ = _ProxyLookup(_l_to_r_op(operator.lshift)) 

600 __rrshift__ = _ProxyLookup(_l_to_r_op(operator.rshift)) 

601 __rand__ = _ProxyLookup(_l_to_r_op(operator.and_)) 

602 __rxor__ = _ProxyLookup(_l_to_r_op(operator.xor)) 

603 __ror__ = _ProxyLookup(_l_to_r_op(operator.or_)) 

604 __iadd__ = _ProxyIOp(operator.iadd) 

605 __isub__ = _ProxyIOp(operator.isub) 

606 __imul__ = _ProxyIOp(operator.imul) 

607 __imatmul__ = _ProxyIOp(operator.imatmul) 

608 __itruediv__ = _ProxyIOp(operator.itruediv) 

609 __ifloordiv__ = _ProxyIOp(operator.ifloordiv) 

610 __imod__ = _ProxyIOp(operator.imod) 

611 __ipow__ = _ProxyIOp(operator.ipow) 

612 __ilshift__ = _ProxyIOp(operator.ilshift) 

613 __irshift__ = _ProxyIOp(operator.irshift) 

614 __iand__ = _ProxyIOp(operator.iand) 

615 __ixor__ = _ProxyIOp(operator.ixor) 

616 __ior__ = _ProxyIOp(operator.ior) 

617 __neg__ = _ProxyLookup(operator.neg) 

618 __pos__ = _ProxyLookup(operator.pos) 

619 __abs__ = _ProxyLookup(abs) 

620 __invert__ = _ProxyLookup(operator.invert) 

621 __complex__ = _ProxyLookup(complex) 

622 __int__ = _ProxyLookup(int) 

623 __float__ = _ProxyLookup(float) 

624 __index__ = _ProxyLookup(operator.index) 

625 __round__ = _ProxyLookup(round) 

626 __trunc__ = _ProxyLookup(math.trunc) 

627 __floor__ = _ProxyLookup(math.floor) 

628 __ceil__ = _ProxyLookup(math.ceil) 

629 __enter__ = _ProxyLookup() 

630 __exit__ = _ProxyLookup() 

631 __await__ = _ProxyLookup() 

632 __aiter__ = _ProxyLookup() 

633 __anext__ = _ProxyLookup() 

634 __aenter__ = _ProxyLookup() 

635 __aexit__ = _ProxyLookup() 

636 __copy__ = _ProxyLookup(copy.copy) 

637 __deepcopy__ = _ProxyLookup(copy.deepcopy) 

638 # __getnewargs_ex__ (pickle through proxy not supported) 

639 # __getnewargs__ (pickle) 

640 # __getstate__ (pickle) 

641 # __setstate__ (pickle) 

642 # __reduce__ (pickle) 

643 # __reduce_ex__ (pickle)