Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_app.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import logging
3import warnings
4from collections.abc import (
5 AsyncIterator,
6 Awaitable,
7 Callable,
8 Iterable,
9 Iterator,
10 Mapping,
11 MutableMapping,
12 Sequence,
13)
14from contextlib import AbstractAsyncContextManager, asynccontextmanager
15from functools import lru_cache, partial, update_wrapper
16from typing import Any, TypeVar, cast, final, overload
18from aiosignal import Signal
19from frozenlist import FrozenList
21from . import hdrs
22from .helpers import AppKey
23from .log import web_logger
24from .typedefs import Handler, Middleware
25from .web_exceptions import NotAppKeyWarning
26from .web_middlewares import _fix_request_current_app
27from .web_request import Request
28from .web_response import StreamResponse
29from .web_routedef import AbstractRouteDef
30from .web_urldispatcher import (
31 AbstractResource,
32 AbstractRoute,
33 Domain,
34 MaskDomain,
35 MatchedSubAppResource,
36 PrefixedSubAppResource,
37 SystemRoute,
38 UrlDispatcher,
39)
# Names exported by ``from <module> import *``.
__all__ = ("Application", "CleanupError")

# Type aliases for the signal and middleware containers used by Application.
# "Application" is quoted because the class is defined further down.
_AppSignal = Signal["Application"]
_RespPrepareSignal = Signal[Request, StreamResponse]
_Middlewares = FrozenList[Middleware]
_MiddlewaresHandlers = Sequence[Middleware]
_Subapps = list["Application"]

# Type variables: _T/_U are used by the AppKey-aware overloads on Application,
# _Resource by Application._add_subapp to return the concrete resource type.
_T = TypeVar("_T")
_U = TypeVar("_U")
_Resource = TypeVar("_Resource", bound=AbstractResource)
def _build_middlewares(
    handler: Handler, apps: tuple["Application", ...]
) -> Callable[[Request], Awaitable[StreamResponse]]:
    """Wrap ``handler`` in the middleware handlers of every app in ``apps``.

    Each middleware is bound to the handler built so far through
    ``functools.partial(middleware, handler=...)``, and the metadata of the
    wrapped callable is copied onto the new one with ``update_wrapper``.
    When no app contributes a middleware the original ``handler`` is
    returned unchanged.

    Every app must already be pre-frozen, because that is when
    ``_middlewares_handlers`` gets populated.
    """
    wrapped = handler
    # Walk the apps back to front so that, once wrapped, the middlewares end
    # up being applied in the order in which they were added.
    for application in reversed(apps):
        assert application.pre_frozen, "middleware handlers are not ready"
        for middleware in application._middlewares_handlers:
            bound = partial(middleware, handler=wrapped)
            wrapped = update_wrapper(bound, wrapped)
    return wrapped


# Memoised flavour of _build_middlewares, keyed on (handler, apps) and
# bounded to 1024 entries.
_cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares)
@final
class Application(MutableMapping[str | AppKey[Any], Any]):
    """Web application: router, middlewares, lifecycle signals and state.

    The instance is also a mutable mapping that stores application-wide
    state.  ``AppKey`` instances are the recommended keys; any other key
    still works but emits ``NotAppKeyWarning`` on assignment.  State can no
    longer be changed once the application is frozen.

    Subclassing is rejected at class-creation time by
    ``__init_subclass__``.
    """

    __slots__ = (
        "logger",
        "_router",
        "_loop",
        "_handler_args",
        "_middlewares",
        "_middlewares_handlers",
        "_run_middlewares",
        "_state",
        "_frozen",
        "_pre_frozen",
        "_subapps",
        "_on_response_prepare",
        "_on_startup",
        "_on_shutdown",
        "_on_cleanup",
        "_client_max_size",
        "_cleanup_ctx",
    )

    def __init__(
        self,
        *,
        logger: logging.Logger = web_logger,
        middlewares: Iterable[Middleware] = (),
        handler_args: Mapping[str, Any] | None = None,
        client_max_size: int = 1024**2,
        debug: Any = ...,  # mypy doesn't support ellipsis
    ) -> None:
        """Create an unfrozen application with an empty router and state.

        ``logger``, ``handler_args`` and ``client_max_size`` are stored on
        the instance as given.  ``middlewares`` is copied into a
        ``FrozenList``.  ``debug`` is a deprecated no-op: passing any value
        only emits a ``DeprecationWarning``.
        """
        if debug is not ...:
            warnings.warn(
                "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._router = UrlDispatcher()
        self._handler_args = handler_args
        self.logger = logger

        self._middlewares: _Middlewares = FrozenList(middlewares)

        # initialized on freezing
        self._middlewares_handlers: _MiddlewaresHandlers = ()
        # initialized on freezing
        self._run_middlewares: bool | None = None

        self._state: dict[AppKey[Any] | str, object] = {}
        self._frozen = False
        self._pre_frozen = False
        self._subapps: _Subapps = []

        self._on_response_prepare: _RespPrepareSignal = Signal(self)
        self._on_startup: _AppSignal = Signal(self)
        self._on_shutdown: _AppSignal = Signal(self)
        self._on_cleanup: _AppSignal = Signal(self)
        # The cleanup context hooks itself into startup and cleanup so its
        # callbacks are entered on startup and exited on cleanup.
        self._cleanup_ctx = CleanupContext()
        self._on_startup.append(self._cleanup_ctx._on_startup)
        self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
        self._client_max_size = client_max_size

    def __init_subclass__(cls: type["Application"]) -> None:
        """Forbid inheritance: always raises ``TypeError``."""
        raise TypeError(
            f"Inheritance class {cls.__name__} from web.Application is forbidden"
        )

    # MutableMapping API

    def __eq__(self, other: object) -> bool:
        """Compare by identity rather than by mapping contents."""
        return self is other

    @overload  # type: ignore[override]
    def __getitem__(self, key: AppKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: str | AppKey[_T]) -> Any:
        """Return the state value stored under ``key`` (``KeyError`` if absent)."""
        return self._state[key]

    def _check_frozen(self) -> None:
        """Raise ``RuntimeError`` if the application is already frozen."""
        if self._frozen:
            raise RuntimeError(
                "Changing state of started or joined application is forbidden"
            )

    @overload  # type: ignore[override]
    def __setitem__(self, key: AppKey[_T], value: _T) -> None: ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None: ...

    def __setitem__(self, key: str | AppKey[_T], value: Any) -> None:
        """Store ``value`` under ``key``.

        Raises ``RuntimeError`` when the application is frozen.  Keys that
        are not ``AppKey`` instances are accepted but emit
        ``NotAppKeyWarning``.
        """
        self._check_frozen()
        if not isinstance(key, AppKey):
            warnings.warn(
                "It is recommended to use web.AppKey instances for keys.\n"
                "https://docs.aiohttp.org/en/stable/web_advanced.html"
                "#application-s-config",
                category=NotAppKeyWarning,
                stacklevel=2,
            )
        self._state[key] = value

    def __delitem__(self, key: str | AppKey[_T]) -> None:
        """Remove ``key`` from the state; ``RuntimeError`` when frozen."""
        self._check_frozen()
        del self._state[key]

    def __len__(self) -> int:
        """Return the number of stored state entries."""
        return len(self._state)

    def __iter__(self) -> Iterator[str | AppKey[Any]]:
        """Iterate over the state keys."""
        return iter(self._state)

    def __hash__(self) -> int:
        """Hash by identity, consistent with the identity-based ``__eq__``."""
        return id(self)

    @overload  # type: ignore[override]
    def get(self, key: AppKey[_T], default: None = ...) -> _T | None: ...

    @overload
    def get(self, key: AppKey[_T], default: _U) -> _T | _U: ...

    @overload
    def get(self, key: str, default: Any = ...) -> Any: ...

    def get(self, key: str | AppKey[_T], default: Any = None) -> Any:
        """Return the value for ``key``, or ``default`` when it is missing."""
        return self._state.get(key, default)

    ########
    def _set_loop(self, loop: asyncio.AbstractEventLoop | None) -> None:
        """Deprecated no-op; only emits a ``DeprecationWarning``."""
        warnings.warn(
            "_set_loop() is no-op since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )

    @property
    def pre_frozen(self) -> bool:
        """Whether ``pre_freeze()`` has already run."""
        return self._pre_frozen

    def pre_freeze(self) -> None:
        """Freeze middlewares, router and signals, and build middleware handlers.

        Idempotent: returns immediately when already pre-frozen.  Sub
        applications are pre-frozen recursively.  The mapping state stays
        writable, since ``_frozen`` is not touched here.
        """
        if self._pre_frozen:
            return

        self._pre_frozen = True
        self._middlewares.freeze()
        self._router.freeze()
        self._on_response_prepare.freeze()
        self._cleanup_ctx.freeze()
        self._on_startup.freeze()
        self._on_shutdown.freeze()
        self._on_cleanup.freeze()
        self._middlewares_handlers = tuple(self._prepare_middleware())

        # If neither this app nor any subapp has middlewares, skip the whole
        # middleware machinery (which includes a per-app middleware that sets
        # the current_app attribute): without middlewares the handler already
        # receives the proper current_app.
        self._run_middlewares = bool(self.middlewares)

        for subapp in self._subapps:
            subapp.pre_freeze()
            self._run_middlewares = self._run_middlewares or subapp._run_middlewares

    @property
    def frozen(self) -> bool:
        """Whether ``freeze()`` has already run."""
        return self._frozen

    def freeze(self) -> None:
        """Pre-freeze, then lock the mapping state; recurse into sub applications.

        Idempotent: returns immediately when already frozen.
        """
        if self._frozen:
            return

        self.pre_freeze()
        self._frozen = True
        for subapp in self._subapps:
            subapp.freeze()

    @property
    def debug(self) -> bool:
        """Deprecated: debug flag of the loop from ``asyncio.get_event_loop()``.

        Emits a ``DeprecationWarning`` on every access.
        """
        warnings.warn(
            "debug property is deprecated since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )
        return asyncio.get_event_loop().get_debug()

    def _reg_subapp_signals(self, subapp: "Application") -> None:
        """Forward this app's startup/shutdown/cleanup signals to ``subapp``."""

        def reg_handler(signame: str) -> None:
            subsig = getattr(subapp, signame)

            async def handler(app: "Application") -> None:
                # The subapp's signal is sent with the subapp itself, not
                # with the parent app that triggered this handler.
                await subsig.send(subapp)

            appsig = getattr(self, signame)
            appsig.append(handler)

        reg_handler("on_startup")
        reg_handler("on_shutdown")
        reg_handler("on_cleanup")

    def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource:
        """Mount ``subapp`` under the URL ``prefix`` and return its resource.

        Trailing slashes are stripped from ``prefix``.  Raises ``TypeError``
        when ``prefix`` is not a ``str`` and ``ValueError`` when it is empty
        after stripping.
        """
        if not isinstance(prefix, str):
            raise TypeError("Prefix must be str")
        prefix = prefix.rstrip("/")
        if not prefix:
            raise ValueError("Prefix cannot be empty")
        factory = partial(PrefixedSubAppResource, prefix, subapp)
        return self._add_subapp(factory, subapp)

    def _add_subapp(
        self, resource_factory: Callable[[], _Resource], subapp: "Application"
    ) -> _Resource:
        """Register ``subapp`` through the resource built by ``resource_factory``.

        Raises ``RuntimeError`` when either this application or ``subapp``
        is already frozen.  The sub application is pre-frozen as part of the
        registration.
        """
        if self.frozen:
            raise RuntimeError("Cannot add sub application to frozen application")
        if subapp.frozen:
            raise RuntimeError("Cannot add frozen application")
        resource = resource_factory()
        self.router.register_resource(resource)
        self._reg_subapp_signals(subapp)
        self._subapps.append(subapp)
        subapp.pre_freeze()
        return resource

    def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource:
        """Mount ``subapp`` for requests matching ``domain``.

        A ``domain`` containing ``*`` is matched with ``MaskDomain``, any
        other with ``Domain``.  Raises ``TypeError`` when ``domain`` is not a
        ``str``.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        rule: Domain = MaskDomain(domain) if "*" in domain else Domain(domain)
        factory = partial(MatchedSubAppResource, rule, subapp)
        return self._add_subapp(factory, subapp)

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> list[AbstractRoute]:
        """Register ``routes`` on the router and return the created routes."""
        return self.router.add_routes(routes)

    @property
    def on_response_prepare(self) -> _RespPrepareSignal:
        """Signal object for response preparation."""
        return self._on_response_prepare

    @property
    def on_startup(self) -> _AppSignal:
        """Signal sent by ``startup()``."""
        return self._on_startup

    @property
    def on_shutdown(self) -> _AppSignal:
        """Signal sent by ``shutdown()``."""
        return self._on_shutdown

    @property
    def on_cleanup(self) -> _AppSignal:
        """Signal sent by ``cleanup()``."""
        return self._on_cleanup

    @property
    def cleanup_ctx(self) -> "CleanupContext":
        """Cleanup-context callbacks entered on startup and exited on cleanup."""
        return self._cleanup_ctx

    @property
    def router(self) -> UrlDispatcher:
        """The application's URL dispatcher."""
        return self._router

    @property
    def middlewares(self) -> _Middlewares:
        """The list of middlewares passed at construction (frozen on pre-freeze)."""
        return self._middlewares

    async def startup(self) -> None:
        """Causes on_startup signal

        Should be called in the event loop along with the request handler.
        """
        await self.on_startup.send(self)

    async def shutdown(self) -> None:
        """Causes on_shutdown signal

        Should be called before cleanup()
        """
        await self.on_shutdown.send(self)

    async def cleanup(self) -> None:
        """Causes on_cleanup signal

        Should be called after shutdown()
        """
        if self.on_cleanup.frozen:
            await self.on_cleanup.send(self)
        else:
            # If an exception occurs in startup, ensure cleanup contexts are completed.
            await self._cleanup_ctx._on_cleanup(self)

    def _prepare_middleware(self) -> Iterator[Middleware]:
        """Yield the middlewares in reverse order, then the current-app fixer."""
        yield from reversed(self._middlewares)
        yield _fix_request_current_app(self)

    async def _handle(self, request: Request) -> StreamResponse:
        """Resolve ``request``, apply middlewares if needed and run the handler."""
        match_info = await self._router.resolve(request)
        match_info.add_app(self)
        match_info.freeze()

        request._match_info = match_info

        if request.headers.get(hdrs.EXPECT):
            resp = await match_info.expect_handler(request)
            await request.writer.drain()
            # A response from the expect handler short-circuits the request.
            if resp is not None:
                return resp

        handler = match_info.handler

        if self._run_middlewares:
            # If its a SystemRoute, don't cache building the middlewares since
            # they are constructed for every MatchInfoError as a new handler
            # is made each time.
            if isinstance(match_info.route, SystemRoute):
                handler = _build_middlewares(handler, match_info.apps)
            else:
                handler = _cached_build_middleware(handler, match_info.apps)

        return await handler(request)

    def __call__(self) -> "Application":
        """gunicorn compatibility"""
        return self

    def __repr__(self) -> str:
        """Return ``<Application 0x...>`` with the instance id in hexadecimal."""
        return f"<Application 0x{id(self):x}>"

    def __bool__(self) -> bool:
        """Always truthy, even when ``__len__`` reports no state entries."""
        return True
class CleanupError(RuntimeError):
    """Raised when more than one cleanup step failed.

    Instances are created as ``CleanupError(message, errors)``, so the
    collected exceptions are the second positional argument.
    """

    @property
    def exceptions(self) -> list[BaseException]:
        """The list of exceptions passed as the second constructor argument."""
        collected = self.args[1]
        return cast(list[BaseException], collected)
# Type of a cleanup-context callback: it is called with the Application and
# returns either an async context manager or an async iterator.
_CleanupContextCallable = (
    Callable[[Application], AbstractAsyncContextManager[None]]
    | Callable[[Application], AsyncIterator[None]]
)
class CleanupContext(FrozenList[_CleanupContextCallable]):
    """List of cleanup-context callbacks with startup and cleanup hooks.

    On startup every callback is turned into an async context manager and
    entered; on cleanup the entered contexts are exited in reverse order.
    """

    def __init__(self) -> None:
        super().__init__()
        # Context managers that were entered successfully, in entry order.
        self._exits: list[AbstractAsyncContextManager[None]] = []

    async def _on_startup(self, app: Application) -> None:
        """Enter the context produced by each registered callback, in order."""
        for callback in self:
            context = callback(app)

            # A callback that does not return an async context manager is
            # wrapped with asynccontextmanager and called again.
            if not isinstance(context, AbstractAsyncContextManager):
                context = asynccontextmanager(callback)(app)  # type: ignore[arg-type]

            await context.__aenter__()
            self._exits.append(context)

    async def _on_cleanup(self, app: Application) -> None:
        """Exit all entered contexts in reverse order, collecting failures.

        Every context is exited even when an earlier one fails.  A single
        failure is re-raised as is; several are wrapped in ``CleanupError``.
        """
        failures: list[BaseException] = []
        for entered in reversed(self._exits):
            try:
                await entered.__aexit__(None, None, None)
            except (Exception, asyncio.CancelledError) as exc:
                failures.append(exc)

        if not failures:
            return
        if len(failures) == 1:
            raise failures[0]
        raise CleanupError("Multiple errors on cleanup stage", failures)