Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_app.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

253 statements  

1import asyncio 

2import logging 

3import warnings 

4from collections.abc import ( 

5 AsyncIterator, 

6 Awaitable, 

7 Callable, 

8 Iterable, 

9 Iterator, 

10 Mapping, 

11 MutableMapping, 

12 Sequence, 

13) 

14from contextlib import AbstractAsyncContextManager, asynccontextmanager 

15from functools import lru_cache, partial, update_wrapper 

16from typing import Any, TypeVar, cast, final, overload 

17 

18from aiosignal import Signal 

19from frozenlist import FrozenList 

20 

21from . import hdrs 

22from .helpers import AppKey 

23from .log import web_logger 

24from .typedefs import Handler, Middleware 

25from .web_exceptions import NotAppKeyWarning 

26from .web_middlewares import _fix_request_current_app 

27from .web_request import Request 

28from .web_response import StreamResponse 

29from .web_routedef import AbstractRouteDef 

30from .web_urldispatcher import ( 

31 AbstractResource, 

32 AbstractRoute, 

33 Domain, 

34 MaskDomain, 

35 MatchedSubAppResource, 

36 PrefixedSubAppResource, 

37 SystemRoute, 

38 UrlDispatcher, 

39) 

40 

# Names exported by ``from <module> import *``.
__all__ = ("Application", "CleanupError")

# Type aliases for the signals and collections stored on an Application.
_AppSignal = Signal["Application"]
_RespPrepareSignal = Signal[Request, StreamResponse]
_Middlewares = FrozenList[Middleware]
_MiddlewaresHandlers = Sequence[Middleware]
_Subapps = list["Application"]

# Generic parameters: _T/_U type the AppKey-based lookups and their
# defaults; _Resource is the resource type produced by the factory that
# is handed to Application._add_subapp.
_T = TypeVar("_T")
_U = TypeVar("_U")
_Resource = TypeVar("_Resource", bound=AbstractResource)

52 

53 

def _build_middlewares(
    handler: Handler, apps: tuple["Application", ...]
) -> Callable[[Request], Awaitable[StreamResponse]]:
    """Wrap *handler* in the middleware handlers of every app in *apps*.

    Each middleware ``m`` is bound with ``partial(m, handler=<inner>)``,
    so calling the result with a request invokes
    ``m(request, handler=<inner>)``.  ``update_wrapper`` copies the
    metadata of the wrapped callable (name, docstring, ...) onto every
    layer.

    Args:
        handler: The innermost callable to wrap.
        apps: Applications whose ``_middlewares_handlers`` are applied.
            Every app must already be pre-frozen.

    Returns:
        The wrapped handler, or *handler* itself when no app contributes
        a middleware.
    """
    # Walk the apps back to front so the first app in *apps* ends up as
    # the outermost layer, i.e. middlewares run in the order the apps
    # were added.
    for app in reversed(apps):
        assert app.pre_frozen, "middleware handlers are not ready"
        for middleware in app._middlewares_handlers:
            handler = update_wrapper(partial(middleware, handler=handler), handler)
    return handler


# Memoised variant of _build_middlewares: results are cached per
# (handler, apps) argument pair, keeping at most 1024 entries.
_cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares)

68 

69 

@final
class Application(MutableMapping[str | AppKey[Any], Any]):
    """Web application object that doubles as a mutable key/value store.

    The mapping interface (``app[key]``, ``len(app)``, iteration, ``get``)
    is backed by the private ``_state`` dict.  Besides that state the
    application owns a ``UrlDispatcher`` router, a list of middlewares,
    the ``on_response_prepare``/``on_startup``/``on_shutdown``/
    ``on_cleanup`` signals, a ``CleanupContext`` and any sub-applications
    attached through ``add_subapp`` or ``add_domain``.

    Subclassing is rejected when the subclass is created (see
    ``__init_subclass__``).
    """

    __slots__ = (
        "logger",
        "_router",
        "_loop",
        "_handler_args",
        "_middlewares",
        "_middlewares_handlers",
        "_run_middlewares",
        "_state",
        "_frozen",
        "_pre_frozen",
        "_subapps",
        "_on_response_prepare",
        "_on_startup",
        "_on_shutdown",
        "_on_cleanup",
        "_client_max_size",
        "_cleanup_ctx",
    )

    def __init__(
        self,
        *,
        logger: logging.Logger = web_logger,
        middlewares: Iterable[Middleware] = (),
        handler_args: Mapping[str, Any] | None = None,
        client_max_size: int = 1024**2,
        debug: Any = ...,  # mypy doesn't support ellipsis
    ) -> None:
        """Create an application with a fresh router and empty state.

        Args:
            logger: Logger stored on the instance as ``logger``.
            middlewares: Initial middlewares, copied into a ``FrozenList``.
            handler_args: Stored unchanged in ``_handler_args``.
            client_max_size: Stored unchanged in ``_client_max_size``.
            debug: Ignored.  Passing any value emits a
                ``DeprecationWarning``.
        """
        if debug is not ...:
            warnings.warn(
                "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._router = UrlDispatcher()
        self._handler_args = handler_args
        self.logger = logger

        self._middlewares: _Middlewares = FrozenList(middlewares)

        # initialized on freezing
        self._middlewares_handlers: _MiddlewaresHandlers = tuple()
        # initialized on freezing
        self._run_middlewares: bool | None = None

        self._state: dict[AppKey[Any] | str, object] = {}
        self._frozen = False
        self._pre_frozen = False
        self._subapps: _Subapps = []

        self._on_response_prepare: _RespPrepareSignal = Signal(self)
        self._on_startup: _AppSignal = Signal(self)
        self._on_shutdown: _AppSignal = Signal(self)
        self._on_cleanup: _AppSignal = Signal(self)
        # The cleanup context hooks into the startup and cleanup signals
        # so its callbacks are entered and exited together with the app.
        self._cleanup_ctx = CleanupContext()
        self._on_startup.append(self._cleanup_ctx._on_startup)
        self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
        self._client_max_size = client_max_size

    def __init_subclass__(cls: type["Application"]) -> None:
        """Reject every attempt to subclass ``Application``.

        Raises:
            TypeError: Always.
        """
        raise TypeError(
            f"Inheritance class {cls.__name__} from web.Application is forbidden"
        )

    # MutableMapping API

    def __eq__(self, other: object) -> bool:
        """Compare by identity rather than by mapping contents."""
        return self is other

    @overload  # type: ignore[override]
    def __getitem__(self, key: AppKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: str | AppKey[_T]) -> Any:
        """Return the value stored under *key*.

        Raises:
            KeyError: If *key* is not present in the state dict.
        """
        return self._state[key]

    def _check_frozen(self) -> None:
        """Raise ``RuntimeError`` if the application is already frozen."""
        if self._frozen:
            raise RuntimeError(
                "Changing state of started or joined application is forbidden"
            )

    @overload  # type: ignore[override]
    def __setitem__(self, key: AppKey[_T], value: _T) -> None: ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None: ...

    def __setitem__(self, key: str | AppKey[_T], value: Any) -> None:
        """Store *value* under *key*.

        Keys that are not ``AppKey`` instances are accepted, but a
        ``NotAppKeyWarning`` is emitted for them.

        Raises:
            RuntimeError: If the application is frozen.
        """
        self._check_frozen()
        if not isinstance(key, AppKey):
            warnings.warn(
                "It is recommended to use web.AppKey instances for keys.\n"
                + "https://docs.aiohttp.org/en/stable/web_advanced.html"
                + "#application-s-config",
                category=NotAppKeyWarning,
                stacklevel=2,
            )
        self._state[key] = value

    def __delitem__(self, key: str | AppKey[_T]) -> None:
        """Remove *key* from the state.

        Raises:
            RuntimeError: If the application is frozen.
            KeyError: If *key* is not present.
        """
        self._check_frozen()
        del self._state[key]

    def __len__(self) -> int:
        """Return the number of stored state entries."""
        return len(self._state)

    def __iter__(self) -> Iterator[str | AppKey[Any]]:
        """Iterate over the stored keys."""
        return iter(self._state)

    def __hash__(self) -> int:
        """Hash by identity, consistent with ``__eq__``."""
        return id(self)

    @overload  # type: ignore[override]
    def get(self, key: AppKey[_T], default: None = ...) -> _T | None: ...

    @overload
    def get(self, key: AppKey[_T], default: _U) -> _T | _U: ...

    @overload
    def get(self, key: str, default: Any = ...) -> Any: ...

    def get(self, key: str | AppKey[_T], default: Any = None) -> Any:
        """Return the value for *key*, or *default* if it is missing."""
        return self._state.get(key, default)

    ########
    def _set_loop(self, loop: asyncio.AbstractEventLoop | None) -> None:
        """Do nothing except emit a ``DeprecationWarning``; *loop* is unused."""
        warnings.warn(
            "_set_loop() is no-op since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )

    @property
    def pre_frozen(self) -> bool:
        """Whether ``pre_freeze()`` has already run."""
        return self._pre_frozen

    def pre_freeze(self) -> None:
        """Freeze middlewares, router and signals; build middleware handlers.

        Idempotent: a second call returns immediately.  Sub-applications
        are pre-frozen as well.  The state mapping stays writable until
        ``freeze()`` is called.
        """
        if self._pre_frozen:
            return

        self._pre_frozen = True
        self._middlewares.freeze()
        self._router.freeze()
        self._on_response_prepare.freeze()
        self._cleanup_ctx.freeze()
        self._on_startup.freeze()
        self._on_shutdown.freeze()
        self._on_cleanup.freeze()
        self._middlewares_handlers = tuple(self._prepare_middleware())

        # If current app and any subapp do not have middlewares avoid run all
        # of the code footprint that it implies, which have a middleware
        # hardcoded per app that sets up the current_app attribute. If no
        # middlewares are configured the handler will receive the proper
        # current_app without needing all of this code.
        self._run_middlewares = bool(self.middlewares)

        for subapp in self._subapps:
            subapp.pre_freeze()
            self._run_middlewares = self._run_middlewares or subapp._run_middlewares

    @property
    def frozen(self) -> bool:
        """Whether ``freeze()`` has already run."""
        return self._frozen

    def freeze(self) -> None:
        """Pre-freeze, then lock the state of this app and its sub-apps.

        Idempotent: a second call returns immediately.
        """
        if self._frozen:
            return

        self.pre_freeze()
        self._frozen = True
        for subapp in self._subapps:
            subapp.freeze()

    @property
    def debug(self) -> bool:
        """Deprecated: return the debug flag of the current event loop.

        Emits a ``DeprecationWarning`` on every access.
        """
        warnings.warn(
            "debug property is deprecated since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )
        return asyncio.get_event_loop().get_debug()

    def _reg_subapp_signals(self, subapp: "Application") -> None:
        """Forward this app's life-cycle signals to *subapp*.

        For each of ``on_startup``, ``on_shutdown`` and ``on_cleanup`` a
        handler is appended to this app's signal which sends the
        sub-application's signal of the same name.
        """

        def reg_handler(signame: str) -> None:
            subsig = getattr(subapp, signame)

            async def handler(app: "Application") -> None:
                # The parent app passed in is ignored on purpose: the
                # sub-application's signal is sent with the sub-app itself.
                await subsig.send(subapp)

            appsig = getattr(self, signame)
            appsig.append(handler)

        reg_handler("on_startup")
        reg_handler("on_shutdown")
        reg_handler("on_cleanup")

    def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource:
        """Attach *subapp* under the URL *prefix*.

        Trailing slashes are stripped from *prefix* before use.

        Returns:
            The ``PrefixedSubAppResource`` registered in the router.

        Raises:
            TypeError: If *prefix* is not a ``str``.
            ValueError: If *prefix* is empty once trailing slashes are
                removed.
            RuntimeError: If this app or *subapp* is already frozen.
        """
        if not isinstance(prefix, str):
            raise TypeError("Prefix must be str")
        prefix = prefix.rstrip("/")
        if not prefix:
            raise ValueError("Prefix cannot be empty")
        factory = partial(PrefixedSubAppResource, prefix, subapp)
        return self._add_subapp(factory, subapp)

    def _add_subapp(
        self, resource_factory: Callable[[], _Resource], subapp: "Application"
    ) -> _Resource:
        """Register the resource built by *resource_factory* for *subapp*.

        The resource is added to the router, the sub-app's life-cycle
        signals are chained to this app's, and the sub-app is pre-frozen.

        Raises:
            RuntimeError: If this app or *subapp* is already frozen.
        """
        if self.frozen:
            raise RuntimeError("Cannot add sub application to frozen application")
        if subapp.frozen:
            raise RuntimeError("Cannot add frozen application")
        resource = resource_factory()
        self.router.register_resource(resource)
        self._reg_subapp_signals(subapp)
        self._subapps.append(subapp)
        subapp.pre_freeze()
        return resource

    def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource:
        """Attach *subapp* behind a domain rule.

        A *domain* containing ``*`` is wrapped in ``MaskDomain``; any
        other string is wrapped in ``Domain``.

        Returns:
            The ``MatchedSubAppResource`` registered in the router.

        Raises:
            TypeError: If *domain* is not a ``str``.
            RuntimeError: If this app or *subapp* is already frozen.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        elif "*" in domain:
            rule: Domain = MaskDomain(domain)
        else:
            rule = Domain(domain)
        factory = partial(MatchedSubAppResource, rule, subapp)
        return self._add_subapp(factory, subapp)

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> list[AbstractRoute]:
        """Delegate to ``router.add_routes`` and return its result."""
        return self.router.add_routes(routes)

    @property
    def on_response_prepare(self) -> _RespPrepareSignal:
        """The response-prepare signal of this application."""
        return self._on_response_prepare

    @property
    def on_startup(self) -> _AppSignal:
        """The signal sent by ``startup()``."""
        return self._on_startup

    @property
    def on_shutdown(self) -> _AppSignal:
        """The signal sent by ``shutdown()``."""
        return self._on_shutdown

    @property
    def on_cleanup(self) -> _AppSignal:
        """The signal sent by ``cleanup()``."""
        return self._on_cleanup

    @property
    def cleanup_ctx(self) -> "CleanupContext":
        """The ``CleanupContext`` holding the cleanup-context callbacks."""
        return self._cleanup_ctx

    @property
    def router(self) -> UrlDispatcher:
        """The ``UrlDispatcher`` that resolves requests for this app."""
        return self._router

    @property
    def middlewares(self) -> _Middlewares:
        """The ``FrozenList`` of middlewares configured on this app."""
        return self._middlewares

    async def startup(self) -> None:
        """Causes on_startup signal

        Should be called in the event loop along with the request handler.
        """
        await self.on_startup.send(self)

    async def shutdown(self) -> None:
        """Causes on_shutdown signal

        Should be called before cleanup()
        """
        await self.on_shutdown.send(self)

    async def cleanup(self) -> None:
        """Causes on_cleanup signal

        Should be called after shutdown()
        """
        if self.on_cleanup.frozen:
            await self.on_cleanup.send(self)
        else:
            # If an exception occurs in startup, ensure cleanup contexts are completed.
            await self._cleanup_ctx._on_cleanup(self)

    def _prepare_middleware(self) -> Iterator[Middleware]:
        """Yield the middlewares in reverse order, then the current-app fixer.

        ``_build_middlewares`` wraps the handler with these in iteration
        order, so the item yielded last becomes the outermost layer.
        """
        yield from reversed(self._middlewares)
        yield _fix_request_current_app(self)

    async def _handle(self, request: Request) -> StreamResponse:
        """Resolve *request* through the router and run the matched handler.

        The resolved match info gets this app added, is frozen and is
        stored on the request.  If the request carries an ``Expect``
        header the route's expect handler runs first and its response,
        when not ``None``, is returned directly.  When middlewares are
        enabled the handler is wrapped before being awaited.
        """
        match_info = await self._router.resolve(request)
        match_info.add_app(self)
        match_info.freeze()

        request._match_info = match_info

        if request.headers.get(hdrs.EXPECT):
            resp = await match_info.expect_handler(request)
            await request.writer.drain()
            if resp is not None:
                return resp

        handler = match_info.handler

        if self._run_middlewares:
            # If its a SystemRoute, don't cache building the middlewares since
            # they are constructed for every MatchInfoError as a new handler
            # is made each time.
            if isinstance(match_info.route, SystemRoute):
                handler = _build_middlewares(handler, match_info.apps)
            else:
                handler = _cached_build_middleware(handler, match_info.apps)

        return await handler(request)

    def __call__(self) -> "Application":
        """gunicorn compatibility"""
        return self

    def __repr__(self) -> str:
        """Return ``<Application 0x...>`` with the object's id in hex."""
        return f"<Application 0x{id(self):x}>"

    def __bool__(self) -> bool:
        """Always truthy, even while the state mapping is empty."""
        return True

401 

402 

class CleanupError(RuntimeError):
    """Error carrying several exceptions collected during cleanup.

    Instances are created as ``CleanupError(message, exceptions)``, so the
    collected exceptions are the second positional argument.
    """

    @property
    def exceptions(self) -> list[BaseException]:
        """Return the exception list passed as the second argument."""
        return cast(list[BaseException], self.args[1])

407 

408 

# A cleanup-context callback takes the application and returns either an
# async context manager or an async iterator.
_CleanupContextCallable = (
    Callable[[Application], AbstractAsyncContextManager[None]]
    | Callable[[Application], AsyncIterator[None]]
)

413 

414 

class CleanupContext(FrozenList[_CleanupContextCallable]):
    """List of cleanup-context callbacks that can be frozen.

    ``_on_startup`` enters the context produced by every callback and
    ``_on_cleanup`` exits the entered contexts in reverse order.
    """

    def __init__(self) -> None:
        super().__init__()
        # Contexts that were entered successfully, in entry order.
        self._exits: list[AbstractAsyncContextManager[None]] = []

    async def _on_startup(self, app: Application) -> None:
        """Enter the context of every callback, in list order.

        A context is recorded for later exit only after its
        ``__aenter__`` has completed.
        """
        for cb in self:
            ctx = cb(app)

            if not isinstance(ctx, AbstractAsyncContextManager):
                # The callback did not return a context manager, so wrap
                # the callback itself and call it again to obtain one.
                ctx = asynccontextmanager(cb)(app)  # type: ignore[arg-type]

            await ctx.__aenter__()
            self._exits.append(ctx)

    async def _on_cleanup(self, app: Application) -> None:
        """Exit all entered contexts in reverse order of entry.

        An error raised by one context does not stop the remaining
        contexts from being exited.

        Raises:
            BaseException: The original error, if exactly one context
                failed.
            CleanupError: If more than one context failed; the errors
                are available through ``CleanupError.exceptions``.
        """
        errors: list[BaseException] = []
        for ctx in reversed(self._exits):
            try:
                await ctx.__aexit__(None, None, None)
            except (Exception, asyncio.CancelledError) as exc:
                errors.append(exc)
        if not errors:
            return
        if len(errors) == 1:
            raise errors[0]
        raise CleanupError("Multiple errors on cleanup stage", errors)