Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/local.py: 81%

249 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1import copy 

2import math 

3import operator 

4import typing as t 

5from contextvars import ContextVar 

6from functools import partial 

7from functools import update_wrapper 

8 

9from .wsgi import ClosingIterator 

10 

11if t.TYPE_CHECKING: 

12 from _typeshed.wsgi import StartResponse 

13 from _typeshed.wsgi import WSGIApplication 

14 from _typeshed.wsgi import WSGIEnvironment 

15 

16F = t.TypeVar("F", bound=t.Callable[..., t.Any]) 

17 

18 

def release_local(local: t.Union["Local", "LocalStack"]) -> None:
    """Drop everything ``local`` holds for the current context.

    The work is delegated to the object's ``__release_local__`` hook,
    which makes it possible to clean up locals without a manager.

    Example::

        >>> loc = Local()
        >>> loc.foo = 42
        >>> release_local(loc)
        >>> hasattr(loc, 'foo')
        False

    Both :class:`Local` and :class:`LocalStack` objects can be passed
    in. Data held by proxies cannot be released this way; keep a
    reference to the underlying local object so that it can be handed
    to this function.

    .. versionadded:: 0.6.1
    """
    release_hook = local.__release_local__
    release_hook()

40 

41 

class Local:
    """Attribute namespace whose contents are kept in a :class:`ContextVar`.

    Attribute reads, writes and deletes are routed to a dict stored in
    the context variable. Writes and deletes never mutate the stored
    dict; they store a modified copy instead.
    """

    __slots__ = ("_storage",)

    def __init__(self) -> None:
        # ``__setattr__`` is overridden below to write into the context
        # dict, so the slot itself has to be set through ``object``.
        object.__setattr__(self, "_storage", ContextVar("local_storage"))

    def __iter__(self) -> t.Iterator[t.Tuple[str, t.Any]]:
        """Iterate over ``(attribute name, value)`` pairs of the current
        context.
        """
        return iter(self._storage.get({}).items())

    def __call__(self, proxy: str) -> "LocalProxy":
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self) -> None:
        """Replace the stored values with an empty dict."""
        self._storage.set({})

    def __getattr__(self, name: str) -> t.Any:
        """Return the stored value for ``name``.

        :raises AttributeError: if nothing is stored under ``name``.
        """
        values = self._storage.get({})
        try:
            return values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        """Store ``value`` under ``name`` in a fresh copy of the dict."""
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)

    def __delattr__(self, name: str) -> None:
        """Remove ``name`` from a fresh copy of the dict.

        :raises AttributeError: if nothing is stored under ``name``.
        """
        values = self._storage.get({}).copy()
        try:
            del values[name]
            self._storage.set(values)
        except KeyError:
            raise AttributeError(name) from None

77 

78 

class LocalStack:
    """This class works similar to a :class:`Local` but keeps a stack
    of objects instead. This is best explained with an example::

        >>> ls = LocalStack()
        >>> ls.push(42)
        >>> ls.top
        42
        >>> ls.push(23)
        >>> ls.top
        23
        >>> ls.pop()
        23
        >>> ls.top
        42

    They can be force released by using a :class:`LocalManager` or with
    the :func:`release_local` function but the correct way is to pop the
    item from the stack after using. When the stack is empty it will
    no longer be bound to the current context (and as such released).

    By calling the stack without arguments it returns a proxy that resolves to
    the topmost item on the stack.

    .. versionadded:: 0.6.1
    """

    def __init__(self) -> None:
        # The stack is kept as the ``stack`` attribute of a private Local.
        self._local = Local()

    def __release_local__(self) -> None:
        """Discard the stack held for the current context."""
        self._local.__release_local__()

    def __call__(self) -> "LocalProxy":
        """Return a proxy that resolves to :attr:`top` on every access.

        The proxy raises :exc:`RuntimeError` while :attr:`top` is ``None``.
        """

        def _lookup() -> t.Any:
            rv = self.top
            if rv is None:
                raise RuntimeError("object unbound")
            return rv

        return LocalProxy(_lookup)

    def push(self, obj: t.Any) -> t.List[t.Any]:
        """Pushes a new item to the stack and returns the new stack list.

        The previously stored list is copied, never mutated.
        """
        rv = getattr(self._local, "stack", []).copy()
        rv.append(obj)
        self._local.stack = rv
        return rv

    def pop(self) -> t.Any:
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, "stack", None)
        if not stack:
            # Either nothing was ever pushed or the stored list is empty.
            return None

        rv = stack[-1]
        if len(stack) == 1:
            # Removing the last item releases the underlying local.
            self._local.__release_local__()
        else:
            # Store a shortened copy instead of calling ``stack.pop()``:
            # like ``push``, this leaves the previously stored list (which
            # callers of ``push`` may still hold) untouched.
            self._local.stack = stack[:-1]
        return rv

    @property
    def top(self) -> t.Any:
        """The topmost item on the stack.  If the stack is empty,
        `None` is returned.
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

150 

151 

class LocalManager:
    """Local objects cannot manage themselves. For that you need a local
    manager. You can pass a local manager multiple locals or add them
    later by appending them to `manager.locals`. Every time the manager
    cleans up, it will clean up all the data left in the locals for this
    context.

    :param locals: Nothing, a single releasable object (anything with a
        ``__release_local__`` method, such as a :class:`Local` or a
        :class:`LocalStack`), or an iterable of such objects.

    .. versionchanged:: 2.0
        ``ident_func`` is deprecated and will be removed in Werkzeug
        2.1.

    .. versionchanged:: 0.6.1
        The :func:`release_local` function can be used instead of a
        manager.

    .. versionchanged:: 0.7
        The ``ident_func`` parameter was added.
    """

    def __init__(
        self,
        locals: t.Optional[
            t.Union["Local", "LocalStack", t.Iterable[t.Union["Local", "LocalStack"]]]
        ] = None,
    ) -> None:
        if locals is None:
            self.locals = []
        elif hasattr(locals, "__release_local__"):
            # A single releasable object. Checking for the hook (the same
            # test LocalProxy uses) covers LocalStack as well as Local,
            # and keeps an iterable Local from being expanded by list().
            self.locals = [locals]
        else:
            self.locals = list(locals)

    def cleanup(self) -> None:
        """Manually clean up the data in the locals for this context. Call
        this at the end of the request or use `make_middleware()`.
        """
        for local in self.locals:
            local.__release_local__()

    def make_middleware(self, app: "WSGIApplication") -> "WSGIApplication":
        """Wrap a WSGI application so that cleaning up happens after
        request end.
        """

        def application(
            environ: "WSGIEnvironment", start_response: "StartResponse"
        ) -> t.Iterable[bytes]:
            # ``cleanup`` is handed to ClosingIterator as its callback.
            return ClosingIterator(app(environ, start_response), self.cleanup)

        return application

    def middleware(self, func: "WSGIApplication") -> "WSGIApplication":
        """Like `make_middleware` but for decorating functions.

        Example usage::

            @manager.middleware
            def application(environ, start_response):
                ...

        The difference to `make_middleware` is that the function passed
        will have all the arguments copied from the inner application
        (name, docstring, module).
        """
        return update_wrapper(self.make_middleware(func), func)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} storages: {len(self.locals)}>"

217 

218 

class _ProxyLookup:
    """Descriptor implementing proxied attribute access for
    :class:`LocalProxy`.

    :param f: The built-in function this attribute is accessed through.
        Rather than looking up the special method, the function call is
        redone on the proxied object.
    :param fallback: Function to use when the proxy is unbound, instead
        of letting the :exc:`RuntimeError` propagate.
    :param is_attr: The proxied name is an attribute rather than a
        function, so the fallback is called right away to produce the
        value.
    :param class_value: What to return when the name is accessed on the
        ``LocalProxy`` class itself. Used for ``__doc__`` so building
        docs still works.
    """

    __slots__ = ("bind_f", "fallback", "is_attr", "class_value", "name")

    def __init__(
        self,
        f: t.Optional[t.Callable] = None,
        fallback: t.Optional[t.Callable] = None,
        class_value: t.Optional[t.Any] = None,
        is_attr: bool = False,
    ) -> None:
        binder: t.Optional[t.Callable[["LocalProxy", t.Any], t.Callable]]

        if f is None:
            # No function given: __get__ falls back to plain getattr,
            # which already produces a bound method.
            binder = None
        elif hasattr(f, "__get__"):
            # A Python function; its descriptor protocol turns it into a
            # method bound to the proxied object.

            def binder(instance: "LocalProxy", obj: t.Any) -> t.Callable:
                return f.__get__(obj, type(obj))  # type: ignore

        else:
            # A C function; bind the first argument with partial.

            def binder(instance: "LocalProxy", obj: t.Any) -> t.Callable:
                return partial(f, obj)  # type: ignore

        self.bind_f = binder
        self.fallback = fallback
        self.class_value = class_value
        self.is_attr = is_attr

    def __set_name__(self, owner: "LocalProxy", name: str) -> None:
        # Remember the attribute name this descriptor was assigned to.
        self.name = name

    def __get__(self, instance: "LocalProxy", owner: t.Optional[type] = None) -> t.Any:
        # Accessed on the class: hand out class_value if one was given,
        # otherwise the descriptor itself.
        if instance is None:
            return self if self.class_value is None else self.class_value

        try:
            target = instance._get_current_object()
        except RuntimeError:
            if self.fallback is None:
                raise

            bound_fallback = self.fallback.__get__(instance, owner)
            # __class__ and __doc__ are attributes, not methods, so for
            # those the fallback is called to get the value itself.
            return bound_fallback() if self.is_attr else bound_fallback

        if self.bind_f is None:
            return getattr(target, self.name)

        return self.bind_f(instance, target)

    def __repr__(self) -> str:
        return f"proxy {self.name}"

    def __call__(self, instance: "LocalProxy", *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Allow calling the unbound method through the class. This
        happens for example with ``copy.copy``, which does
        ``type(x).__copy__(x)``. ``type(x)`` can't be proxied, so it
        yields the proxy type and this descriptor.
        """
        bound = self.__get__(instance, type(instance))
        return bound(*args, **kwargs)

307 

308 

class _ProxyIOp(_ProxyLookup):
    """Descriptor for augmented assignment methods on a proxied object.

    The looked-up method is wrapped so that it returns the proxy rather
    than the proxied object.
    """

    __slots__ = ()

    def __init__(
        self, f: t.Optional[t.Callable] = None, fallback: t.Optional[t.Callable] = None
    ) -> None:
        super().__init__(f, fallback)

        def bind_in_place(instance: "LocalProxy", obj: t.Any) -> t.Callable:
            def i_op(target: t.Any, operand: t.Any) -> "LocalProxy":
                # The result of ``f`` is discarded; the proxy itself is
                # what the augmented assignment evaluates to.
                f(target, operand)  # type: ignore
                return instance

            # Bind ``i_op`` to the proxied object like a regular method.
            return i_op.__get__(obj, type(obj))  # type: ignore

        # Replace whatever binder the base class selected.
        self.bind_f = bind_in_place

329 

330 

def _l_to_r_op(op: F) -> F:
    """Build the reflected (r-op) form of a binary operator function by
    swapping the order of its two arguments.
    """

    def r_op(obj: t.Any, other: t.Any) -> t.Any:
        # ``other`` becomes the first argument of ``op``, ``obj`` the second.
        return op(other, obj)

    reflected = t.cast(F, r_op)
    return reflected

338 

339 

class LocalProxy:
    """A proxy to the object bound to a :class:`Local`. All operations
    on the proxy are forwarded to the bound object. If no object is
    bound, a :exc:`RuntimeError` is raised.

    .. code-block:: python

        from werkzeug.local import Local
        l = Local()

        # a proxy to whatever l.user is set to
        user = l("user")

        from werkzeug.local import LocalStack
        _request_stack = LocalStack()

        # a proxy to _request_stack.top
        request = _request_stack()

        # a proxy to the session attribute of the request proxy
        session = LocalProxy(lambda: request.session)

    ``__repr__`` and ``__class__`` are forwarded, so ``repr(x)`` and
    ``isinstance(x, cls)`` will look like the proxied object. Use
    ``issubclass(type(x), LocalProxy)`` to check if an object is a
    proxy.

    .. code-block:: python

        repr(user)  # <User admin>
        isinstance(user, User)  # True
        issubclass(type(user), LocalProxy)  # True

    :param local: The :class:`Local` or callable that provides the
        proxied object.
    :param name: The attribute name to look up on a :class:`Local`. Not
        used if a callable is given.

    .. versionchanged:: 2.0
        Updated proxied attributes and methods to reflect the current
        data model.

    .. versionchanged:: 0.6.1
        The class can be instantiated with a callable.
    """

    # NOTE: the docstring above is passed to ``_ProxyLookup`` as
    # ``class_value`` further down, so it is also a runtime value.

    __slots__ = ("__local", "__name", "__wrapped__")

    def __init__(
        self,
        local: t.Union["Local", t.Callable[[], t.Any]],
        name: t.Optional[str] = None,
    ) -> None:
        # ``__setattr__`` is proxied to the bound object, so the proxy's
        # own slots are written through ``object`` using their mangled
        # names.
        for slot, value in (("_LocalProxy__local", local), ("_LocalProxy__name", name)):
            object.__setattr__(self, slot, value)

        # Objects exposing ``__release_local__`` are treated as locals;
        # any other callable is recorded as the wrapped function.
        is_plain_callable = callable(local) and not hasattr(local, "__release_local__")
        if is_plain_callable:
            object.__setattr__(self, "__wrapped__", local)

    def _get_current_object(self) -> t.Any:
        """Return the object currently behind the proxy. Handy when the
        real object is needed for performance reasons or has to be passed
        into a different context.

        :raises RuntimeError: if the local has no attribute under the
            proxied name.
        """
        source = self.__local  # type: ignore

        if hasattr(source, "__release_local__"):
            try:
                return getattr(source, self.__name)  # type: ignore
            except AttributeError:
                name = self.__name  # type: ignore
                raise RuntimeError(f"no object bound to {name}") from None

        # Not a local: call it to obtain the current object.
        return source()

    __doc__ = _ProxyLookup(  # type: ignore
        class_value=__doc__, fallback=lambda self: type(self).__doc__, is_attr=True
    )
    # __del__ should only delete the proxy
    __repr__ = _ProxyLookup(  # type: ignore
        repr, fallback=lambda self: f"<{type(self).__name__} unbound>"
    )
    __str__ = _ProxyLookup(str)  # type: ignore
    __bytes__ = _ProxyLookup(bytes)
    __format__ = _ProxyLookup()  # type: ignore
    __lt__ = _ProxyLookup(operator.lt)
    __le__ = _ProxyLookup(operator.le)
    __eq__ = _ProxyLookup(operator.eq)  # type: ignore
    __ne__ = _ProxyLookup(operator.ne)  # type: ignore
    __gt__ = _ProxyLookup(operator.gt)
    __ge__ = _ProxyLookup(operator.ge)
    __hash__ = _ProxyLookup(hash)  # type: ignore
    __bool__ = _ProxyLookup(bool, fallback=lambda self: False)
    __getattr__ = _ProxyLookup(getattr)
    # __getattribute__ triggered through __getattr__
    __setattr__ = _ProxyLookup(setattr)  # type: ignore
    __delattr__ = _ProxyLookup(delattr)  # type: ignore
    __dir__ = _ProxyLookup(dir, fallback=lambda self: [])  # type: ignore
    # __get__ (proxying descriptor not supported)
    # __set__ (descriptor)
    # __delete__ (descriptor)
    # __set_name__ (descriptor)
    # __objclass__ (descriptor)
    # __slots__ used by proxy itself
    # __dict__ (__getattr__)
    # __weakref__ (__getattr__)
    # __init_subclass__ (proxying metaclass not supported)
    # __prepare__ (metaclass)
    __class__ = _ProxyLookup(
        fallback=lambda self: type(self), is_attr=True
    )  # type: ignore
    __instancecheck__ = _ProxyLookup(lambda self, other: isinstance(other, self))
    __subclasscheck__ = _ProxyLookup(lambda self, other: issubclass(other, self))
    # __class_getitem__ triggered through __getitem__
    __call__ = _ProxyLookup(lambda self, *args, **kwargs: self(*args, **kwargs))
    __len__ = _ProxyLookup(len)
    __length_hint__ = _ProxyLookup(operator.length_hint)
    __getitem__ = _ProxyLookup(operator.getitem)
    __setitem__ = _ProxyLookup(operator.setitem)
    __delitem__ = _ProxyLookup(operator.delitem)
    # __missing__ triggered through __getitem__
    __iter__ = _ProxyLookup(iter)
    __next__ = _ProxyLookup(next)
    __reversed__ = _ProxyLookup(reversed)
    __contains__ = _ProxyLookup(operator.contains)
    __add__ = _ProxyLookup(operator.add)
    __sub__ = _ProxyLookup(operator.sub)
    __mul__ = _ProxyLookup(operator.mul)
    __matmul__ = _ProxyLookup(operator.matmul)
    __truediv__ = _ProxyLookup(operator.truediv)
    __floordiv__ = _ProxyLookup(operator.floordiv)
    __mod__ = _ProxyLookup(operator.mod)
    __divmod__ = _ProxyLookup(divmod)
    __pow__ = _ProxyLookup(pow)
    __lshift__ = _ProxyLookup(operator.lshift)
    __rshift__ = _ProxyLookup(operator.rshift)
    __and__ = _ProxyLookup(operator.and_)
    __xor__ = _ProxyLookup(operator.xor)
    __or__ = _ProxyLookup(operator.or_)
    __radd__ = _ProxyLookup(_l_to_r_op(operator.add))
    __rsub__ = _ProxyLookup(_l_to_r_op(operator.sub))
    __rmul__ = _ProxyLookup(_l_to_r_op(operator.mul))
    __rmatmul__ = _ProxyLookup(_l_to_r_op(operator.matmul))
    __rtruediv__ = _ProxyLookup(_l_to_r_op(operator.truediv))
    __rfloordiv__ = _ProxyLookup(_l_to_r_op(operator.floordiv))
    __rmod__ = _ProxyLookup(_l_to_r_op(operator.mod))
    __rdivmod__ = _ProxyLookup(_l_to_r_op(divmod))
    __rpow__ = _ProxyLookup(_l_to_r_op(pow))
    __rlshift__ = _ProxyLookup(_l_to_r_op(operator.lshift))
    __rrshift__ = _ProxyLookup(_l_to_r_op(operator.rshift))
    __rand__ = _ProxyLookup(_l_to_r_op(operator.and_))
    __rxor__ = _ProxyLookup(_l_to_r_op(operator.xor))
    __ror__ = _ProxyLookup(_l_to_r_op(operator.or_))
    __iadd__ = _ProxyIOp(operator.iadd)
    __isub__ = _ProxyIOp(operator.isub)
    __imul__ = _ProxyIOp(operator.imul)
    __imatmul__ = _ProxyIOp(operator.imatmul)
    __itruediv__ = _ProxyIOp(operator.itruediv)
    __ifloordiv__ = _ProxyIOp(operator.ifloordiv)
    __imod__ = _ProxyIOp(operator.imod)
    __ipow__ = _ProxyIOp(operator.ipow)
    __ilshift__ = _ProxyIOp(operator.ilshift)
    __irshift__ = _ProxyIOp(operator.irshift)
    __iand__ = _ProxyIOp(operator.iand)
    __ixor__ = _ProxyIOp(operator.ixor)
    __ior__ = _ProxyIOp(operator.ior)
    __neg__ = _ProxyLookup(operator.neg)
    __pos__ = _ProxyLookup(operator.pos)
    __abs__ = _ProxyLookup(abs)
    __invert__ = _ProxyLookup(operator.invert)
    __complex__ = _ProxyLookup(complex)
    __int__ = _ProxyLookup(int)
    __float__ = _ProxyLookup(float)
    __index__ = _ProxyLookup(operator.index)
    __round__ = _ProxyLookup(round)
    __trunc__ = _ProxyLookup(math.trunc)
    __floor__ = _ProxyLookup(math.floor)
    __ceil__ = _ProxyLookup(math.ceil)
    __enter__ = _ProxyLookup()
    __exit__ = _ProxyLookup()
    __await__ = _ProxyLookup()
    __aiter__ = _ProxyLookup()
    __anext__ = _ProxyLookup()
    __aenter__ = _ProxyLookup()
    __aexit__ = _ProxyLookup()
    __copy__ = _ProxyLookup(copy.copy)
    __deepcopy__ = _ProxyLookup(copy.deepcopy)
    # __getnewargs_ex__ (pickle through proxy not supported)
    # __getnewargs__ (pickle)
    # __getstate__ (pickle)
    # __setstate__ (pickle)
    # __reduce__ (pickle)
    # __reduce_ex__ (pickle)

532 # __reduce_ex__ (pickle)