Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/flask/ctx.py: 27%

150 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import contextvars 

2import sys 

3import typing as t 

4from functools import update_wrapper 

5from types import TracebackType 

6 

7from werkzeug.exceptions import HTTPException 

8 

9from . import typing as ft 

10from .globals import _cv_app 

11from .globals import _cv_request 

12from .signals import appcontext_popped 

13from .signals import appcontext_pushed 

14 

15if t.TYPE_CHECKING: # pragma: no cover 

16 from .app import Flask 

17 from .sessions import SessionMixin 

18 from .wrappers import Request 

19 

20 

21# a singleton sentinel value for parameter defaults 

22_sentinel = object() 

23 

24 

class _AppCtxGlobals:
    """Attribute namespace that stores data for the duration of an
    application context.

    An instance is created together with every app context and is
    reachable through the :data:`g` proxy.

    .. describe:: 'key' in g

        Check whether an attribute is present.

        .. versionadded:: 0.10

    .. describe:: iter(g)

        Return an iterator over the attribute names.

        .. versionadded:: 0.10
    """

    # The attribute hooks are written out explicitly so that mypy treats
    # instances as namespaces accepting arbitrary attribute names.

    def __getattr__(self, name: str) -> t.Any:
        namespace = vars(self)
        try:
            return namespace[name]
        except KeyError:
            # Report a missing name as an attribute error, without the
            # internal dict lookup failure chained onto it.
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: t.Any) -> None:
        vars(self)[name] = value

    def __delattr__(self, name: str) -> None:
        namespace = vars(self)
        try:
            del namespace[name]
        except KeyError:
            raise AttributeError(name) from None

    def get(self, name: str, default: t.Optional[t.Any] = None) -> t.Any:
        """Look up an attribute by name, falling back to ``default``.
        Mirrors :meth:`dict.get`.

        :param name: Name of attribute to get.
        :param default: Value to return if the attribute is not present.

        .. versionadded:: 0.10
        """
        return vars(self).get(name, default)

    def pop(self, name: str, default: t.Any = _sentinel) -> t.Any:
        """Remove an attribute by name and return its value. Mirrors
        :meth:`dict.pop`.

        :param name: Name of attribute to pop.
        :param default: Value to return if the attribute is not present,
            instead of raising a ``KeyError``.

        .. versionadded:: 0.11
        """
        namespace = vars(self)

        # No default supplied: let a missing name raise ``KeyError``.
        if default is _sentinel:
            return namespace.pop(name)

        return namespace.pop(name, default)

    def setdefault(self, name: str, default: t.Any = None) -> t.Any:
        """Return the attribute's value when it is present; otherwise
        store ``default`` under that name and return it. Mirrors
        :meth:`dict.setdefault`.

        :param name: Name of attribute to get.
        :param default: Value to set and return if the attribute is not
            present.

        .. versionadded:: 0.11
        """
        return vars(self).setdefault(name, default)

    def __contains__(self, item: str) -> bool:
        return item in vars(self)

    def __iter__(self) -> t.Iterator[str]:
        return iter(vars(self))

    def __repr__(self) -> str:
        active_ctx = _cv_app.get(None)

        # Without an active app context there is no app name to show.
        if active_ctx is None:
            return object.__repr__(self)

        return f"<flask.g of '{active_ctx.app.name}'>"

111 

112 

def after_this_request(f: ft.AfterRequestCallable) -> ft.AfterRequestCallable:
    """Register a function to run after the current request. This is
    useful for modifying response objects. The function is passed the
    response object and has to return the same or a new one.

    Example::

        @app.route('/')
        def index():
            @after_this_request
            def add_header(response):
                response.headers['X-Foo'] = 'Parachute'
                return response
            return 'Hello World!'

    This is most useful when something other than the view function
    wants to modify the response, for instance a decorator that adds
    headers without converting the return value into a response object.

    :param f: The callable to append to the active request context's
        list of after-request functions.
    :return: ``f`` itself, so this can be used as a decorator.
    :raises RuntimeError: If no request context is active.

    .. versionadded:: 0.9
    """
    request_ctx = _cv_request.get(None)

    if request_ctx is not None:
        request_ctx._after_request_functions.append(f)
        return f

    raise RuntimeError(
        "'after_this_request' can only be used when a request"
        " context is active, such as in a view function."
    )

144 

145 

def copy_current_request_context(f: t.Callable) -> t.Callable:
    """Decorate a function so that it keeps the current request context.
    This is useful when working with greenlets. A copy of the request
    context is made at the moment the function is decorated, and that
    copy is pushed every time the function is called. The current
    session is also included in the copied request context.

    Example::

        import gevent
        from flask import copy_current_request_context

        @app.route('/')
        def index():
            @copy_current_request_context
            def do_some_work():
                # do some work here, it can access flask.request or
                # flask.session like you would otherwise in the view function.
                ...
            gevent.spawn(do_some_work)
            return 'Regular response'

    :param f: The function to wrap.
    :return: A wrapper carrying ``f``'s metadata that runs ``f`` inside
        the copied request context.
    :raises RuntimeError: If no request context is active when
        decorating.

    .. versionadded:: 0.10
    """
    active_ctx = _cv_request.get(None)

    if active_ctx is None:
        raise RuntimeError(
            "'copy_current_request_context' can only be used when a"
            " request context is active, such as in a view function."
        )

    # Snapshot the context once, at decoration time; every call of the
    # wrapper re-enters this same copy.
    copied_ctx = active_ctx.copy()

    def wrapper(*args, **kwargs):
        with copied_ctx:
            runner = copied_ctx.app.ensure_sync(f)
            return runner(*args, **kwargs)

    return update_wrapper(wrapper, f)

185 

186 

def has_request_context() -> bool:
    """Report whether a request context is currently active.

    Use this in code that wants to take advantage of request information
    when the request object is available, but fail silently when it is
    not.

    ::

        class User(db.Model):

            def __init__(self, username, remote_addr=None):
                self.username = username
                if remote_addr is None and has_request_context():
                    remote_addr = request.remote_addr
                self.remote_addr = remote_addr

    Alternatively you can also just test any of the context bound objects
    (such as :class:`request` or :class:`g`) for truthiness::

        class User(db.Model):

            def __init__(self, username, remote_addr=None):
                self.username = username
                if remote_addr is None and request:
                    remote_addr = request.remote_addr
                self.remote_addr = remote_addr

    :return: ``True`` if a request context is bound, ``False`` otherwise.

    .. versionadded:: 0.7
    """
    current_ctx = _cv_request.get(None)
    return current_ctx is not None

217 

218 

def has_app_context() -> bool:
    """Report whether an application context is currently active.

    Works like :func:`has_request_context` but for the application
    context. You can also just do a boolean check on the
    :data:`current_app` object instead.

    :return: ``True`` if an app context is bound, ``False`` otherwise.

    .. versionadded:: 0.9
    """
    current_ctx = _cv_app.get(None)
    return current_ctx is not None

227 

228 

class AppContext:
    """Holds application-specific information. An app context is created
    and pushed at the beginning of each request if one is not already
    active. An app context is also pushed when running CLI commands.
    """

    def __init__(self, app: "Flask") -> None:
        self.app = app
        self.url_adapter = app.create_url_adapter(None)
        self.g: _AppCtxGlobals = app.app_ctx_globals_class()
        # One context-var token per push(), consumed again by pop().
        self._cv_tokens: t.List[contextvars.Token] = []

    def push(self) -> None:
        """Bind this app context to the current context and send the
        ``appcontext_pushed`` signal.
        """
        token = _cv_app.set(self)
        self._cv_tokens.append(token)
        appcontext_pushed.send(self.app)

    def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None:  # type: ignore
        """Unbind this app context and send the ``appcontext_popped``
        signal.

        :param exc: Exception handed to the app's teardown handling. When
            omitted, the exception currently being handled (if any) is
            used.
        :raises AssertionError: If the context that was active is not
            this one.
        """
        try:
            # Teardown only runs for the pop that matches the last
            # remaining push.
            if len(self._cv_tokens) == 1:
                teardown_exc = sys.exc_info()[1] if exc is _sentinel else exc
                self.app.do_teardown_appcontext(teardown_exc)
        finally:
            # Read the active context before the reset so that it can be
            # compared against ``self`` afterwards.
            active_ctx = _cv_app.get()
            token = self._cv_tokens.pop()
            _cv_app.reset(token)

        if active_ctx is not self:
            raise AssertionError(
                f"Popped wrong app context. ({active_ctx!r} instead of {self!r})"
            )

        appcontext_popped.send(self.app)

    def __enter__(self) -> "AppContext":
        self.push()
        return self

    def __exit__(
        self,
        exc_type: t.Optional[type],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        # Forward whatever exception left the ``with`` body (or None).
        self.pop(exc_value)

276 

277 

class RequestContext:
    """Holds per-request information. The Flask app creates and pushes
    it at the beginning of the request, then pops it at the end of the
    request. It creates the URL adapter and request object for the WSGI
    environment provided.

    Do not attempt to use this class directly, instead use
    :meth:`~flask.Flask.test_request_context` and
    :meth:`~flask.Flask.request_context` to create this object.

    When the request context is popped, it will evaluate all the
    functions registered on the application for teardown execution
    (:meth:`~flask.Flask.teardown_request`).

    The request context is automatically popped at the end of the
    request. When using the interactive debugger, the context will be
    restored so ``request`` is still accessible. Similarly, the test
    client can preserve the context after the request ends. However,
    teardown functions may already have closed some resources such as
    database connections.
    """

    def __init__(
        self,
        app: "Flask",
        environ: dict,
        request: t.Optional["Request"] = None,
        session: t.Optional["SessionMixin"] = None,
    ) -> None:
        self.app = app

        # Build a request object from the environ unless the caller
        # supplied one (as ``copy`` does).
        if request is None:
            request = app.request_class(environ)
            request.json_module = app.json

        self.request: Request = request
        self.url_adapter = None

        try:
            self.url_adapter = app.create_url_adapter(self.request)
        except HTTPException as routing_error:
            # Keep the failure on the request instead of raising here.
            self.request.routing_exception = routing_error

        self.flashes: t.Optional[t.List[t.Tuple[str, str]]] = None
        self.session: t.Optional["SessionMixin"] = session

        # Functions that should be executed after the request on the
        # response object. These will be called before the regular
        # "after_request" functions.
        self._after_request_functions: t.List[ft.AfterRequestCallable] = []

        # One entry per push(): the request context-var token, plus the
        # app context that push() created (or None if it reused one).
        self._cv_tokens: t.List[t.Tuple[contextvars.Token, t.Optional[AppContext]]] = []

    def copy(self) -> "RequestContext":
        """Create a copy of this request context that shares the same
        request object. This can be used to move a request context to a
        different greenlet. Because the actual request object is the
        same, this cannot be used to move a request context to a
        different thread unless access to the request object is locked.

        .. versionadded:: 0.10

        .. versionchanged:: 1.1
           The current session object is used instead of reloading the original
           data. This prevents `flask.session` pointing to an out-of-date object.
        """
        ctx_class = self.__class__
        return ctx_class(
            self.app,
            environ=self.request.environ,
            request=self.request,
            session=self.session,
        )

    def match_request(self) -> None:
        """Match the request against the URL adapter and record the
        result on the request. Can be overridden by a subclass to hook
        into the matching of the request.
        """
        try:
            matched = self.url_adapter.match(return_rule=True)  # type: ignore
            url_rule, view_args = matched
            self.request.url_rule = url_rule
            self.request.view_args = view_args  # type: ignore
        except HTTPException as routing_error:
            self.request.routing_exception = routing_error

    def push(self) -> None:
        """Bind this request context, pushing an app context first when
        none is active for this app, then open the session and match the
        URL.
        """
        # Before we push the request context we have to ensure that there
        # is an application context.
        current_app_ctx = _cv_app.get(None)
        owned_app_ctx = None

        if current_app_ctx is None or current_app_ctx.app is not self.app:
            owned_app_ctx = self.app.app_context()
            owned_app_ctx.push()

        # Remember the app context only when it was created here, so
        # that pop() knows whether it has to pop it as well.
        self._cv_tokens.append((_cv_request.set(self), owned_app_ctx))

        # Open the session at the moment that the request context is available.
        # This allows a custom open_session method to use the request context.
        # Only open a new session if this is the first time the request was
        # pushed, otherwise stream_with_context loses the session.
        if self.session is None:
            interface = self.app.session_interface
            self.session = interface.open_session(self.app, self.request)

            if self.session is None:
                self.session = interface.make_null_session(self.app)

        # Match the request URL after loading the session, so that the
        # session is available in custom URL converters.
        if self.url_adapter is not None:
            self.match_request()

    def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None:  # type: ignore
        """Pop the request context, which unbinds it. This also triggers
        the execution of functions registered by the
        :meth:`~flask.Flask.teardown_request` decorator.

        :param exc: Exception handed to the teardown handling. When
            omitted, the exception currently being handled (if any) is
            used.
        :raises AssertionError: If the context that was active is not
            this one.

        .. versionchanged:: 0.9
           Added the `exc` argument.
        """
        # Only the pop matching the last remaining push tears down.
        is_final_pop = len(self._cv_tokens) == 1

        try:
            if is_final_pop:
                if exc is _sentinel:
                    exc = sys.exc_info()[1]
                self.app.do_teardown_request(exc)

                close_request = getattr(self.request, "close", None)
                if close_request is not None:
                    close_request()
        finally:
            active_ctx = _cv_request.get()
            token, owned_app_ctx = self._cv_tokens.pop()
            _cv_request.reset(token)

            # get rid of circular dependencies at the end of the request
            # so that we don't require the GC to be active.
            if is_final_pop:
                active_ctx.request.environ["werkzeug.request"] = None

            # Pop the app context that the matching push() created.
            if owned_app_ctx is not None:
                owned_app_ctx.pop(exc)

            if active_ctx is not self:
                raise AssertionError(
                    f"Popped wrong request context. ({active_ctx!r} instead of {self!r})"
                )

    def __enter__(self) -> "RequestContext":
        self.push()
        return self

    def __exit__(
        self,
        exc_type: t.Optional[type],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        # Forward whatever exception left the ``with`` body (or None).
        self.pop(exc_value)

    def __repr__(self) -> str:
        class_name = type(self).__name__
        url = self.request.url
        method = self.request.method
        app_name = self.app.name
        return f"<{class_name} {url!r} [{method}] of {app_name}>"