Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_app.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import logging
3import warnings
4from functools import lru_cache, partial, update_wrapper
5from typing import (
6 TYPE_CHECKING,
7 Any,
8 AsyncIterator,
9 Awaitable,
10 Callable,
11 Dict,
12 Iterable,
13 Iterator,
14 List,
15 Mapping,
16 MutableMapping,
17 Optional,
18 Sequence,
19 Tuple,
20 Type,
21 TypeVar,
22 Union,
23 cast,
24 final,
25 overload,
26)
28from aiosignal import Signal
29from frozenlist import FrozenList
31from . import hdrs
32from .helpers import AppKey
33from .log import web_logger
34from .typedefs import Handler, Middleware
35from .web_exceptions import NotAppKeyWarning
36from .web_middlewares import _fix_request_current_app
37from .web_request import Request
38from .web_response import StreamResponse
39from .web_routedef import AbstractRouteDef
40from .web_urldispatcher import (
41 AbstractResource,
42 AbstractRoute,
43 Domain,
44 MaskDomain,
45 MatchedSubAppResource,
46 PrefixedSubAppResource,
47 SystemRoute,
48 UrlDispatcher,
49)
51__all__ = ("Application", "CleanupError")
54if TYPE_CHECKING:
55 _AppSignal = Signal[Callable[["Application"], Awaitable[None]]]
56 _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]]
57 _Middlewares = FrozenList[Middleware]
58 _MiddlewaresHandlers = Sequence[Middleware]
59 _Subapps = List["Application"]
60else:
61 # No type checker mode, skip types
62 _AppSignal = Signal
63 _RespPrepareSignal = Signal
64 _Handler = Callable
65 _Middlewares = FrozenList
66 _MiddlewaresHandlers = Sequence
67 _Subapps = List
69_T = TypeVar("_T")
70_U = TypeVar("_U")
71_Resource = TypeVar("_Resource", bound=AbstractResource)
74def _build_middlewares(
75 handler: Handler, apps: Tuple["Application", ...]
76) -> Callable[[Request], Awaitable[StreamResponse]]:
77 """Apply middlewares to handler."""
78 # The slice is to reverse the order of the apps
79 # so they are applied in the order they were added
80 for app in apps[::-1]:
81 assert app.pre_frozen, "middleware handlers are not ready"
82 for m in app._middlewares_handlers:
83 handler = update_wrapper(partial(m, handler=handler), handler)
84 return handler
87_cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares)
90@final
91class Application(MutableMapping[Union[str, AppKey[Any]], Any]):
92 __slots__ = (
93 "logger",
94 "_router",
95 "_loop",
96 "_handler_args",
97 "_middlewares",
98 "_middlewares_handlers",
99 "_run_middlewares",
100 "_state",
101 "_frozen",
102 "_pre_frozen",
103 "_subapps",
104 "_on_response_prepare",
105 "_on_startup",
106 "_on_shutdown",
107 "_on_cleanup",
108 "_client_max_size",
109 "_cleanup_ctx",
110 )
112 def __init__(
113 self,
114 *,
115 logger: logging.Logger = web_logger,
116 middlewares: Iterable[Middleware] = (),
117 handler_args: Optional[Mapping[str, Any]] = None,
118 client_max_size: int = 1024**2,
119 debug: Any = ..., # mypy doesn't support ellipsis
120 ) -> None:
121 if debug is not ...:
122 warnings.warn(
123 "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
124 DeprecationWarning,
125 stacklevel=2,
126 )
127 self._router = UrlDispatcher()
128 self._handler_args = handler_args
129 self.logger = logger
131 self._middlewares: _Middlewares = FrozenList(middlewares)
133 # initialized on freezing
134 self._middlewares_handlers: _MiddlewaresHandlers = tuple()
135 # initialized on freezing
136 self._run_middlewares: Optional[bool] = None
138 self._state: Dict[Union[AppKey[Any], str], object] = {}
139 self._frozen = False
140 self._pre_frozen = False
141 self._subapps: _Subapps = []
143 self._on_response_prepare: _RespPrepareSignal = Signal(self)
144 self._on_startup: _AppSignal = Signal(self)
145 self._on_shutdown: _AppSignal = Signal(self)
146 self._on_cleanup: _AppSignal = Signal(self)
147 self._cleanup_ctx = CleanupContext()
148 self._on_startup.append(self._cleanup_ctx._on_startup)
149 self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
150 self._client_max_size = client_max_size
152 def __init_subclass__(cls: Type["Application"]) -> None:
153 raise TypeError(
154 "Inheritance class {} from web.Application "
155 "is forbidden".format(cls.__name__)
156 )
158 # MutableMapping API
160 def __eq__(self, other: object) -> bool:
161 return self is other
163 @overload # type: ignore[override]
164 def __getitem__(self, key: AppKey[_T]) -> _T: ...
166 @overload
167 def __getitem__(self, key: str) -> Any: ... # type: ignore[misc]
169 def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
170 return self._state[key]
172 def _check_frozen(self) -> None:
173 if self._frozen:
174 raise RuntimeError(
175 "Changing state of started or joined application is forbidden"
176 )
178 @overload # type: ignore[override]
179 def __setitem__(self, key: AppKey[_T], value: _T) -> None: ...
181 @overload
182 def __setitem__(self, key: str, value: Any) -> None: ... # type: ignore[misc]
184 def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None:
185 self._check_frozen()
186 if not isinstance(key, AppKey):
187 warnings.warn(
188 "It is recommended to use web.AppKey instances for keys.\n"
189 + "https://docs.aiohttp.org/en/stable/web_advanced.html"
190 + "#application-s-config",
191 category=NotAppKeyWarning,
192 stacklevel=2,
193 )
194 self._state[key] = value
196 def __delitem__(self, key: Union[str, AppKey[_T]]) -> None:
197 self._check_frozen()
198 del self._state[key]
200 def __len__(self) -> int:
201 return len(self._state)
203 def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
204 return iter(self._state)
206 def __hash__(self) -> int:
207 return id(self)
209 @overload # type: ignore[override]
210 def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...
212 @overload
213 def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]: ...
215 @overload
216 def get(self, key: str, default: Any = ...) -> Any: ... # type: ignore[misc]
218 def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
219 return self._state.get(key, default)
221 ########
222 def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
223 warnings.warn(
224 "_set_loop() is no-op since 4.0 and scheduled for removal in 5.0",
225 DeprecationWarning,
226 stacklevel=2,
227 )
229 @property
230 def pre_frozen(self) -> bool:
231 return self._pre_frozen
233 def pre_freeze(self) -> None:
234 if self._pre_frozen:
235 return
237 self._pre_frozen = True
238 self._middlewares.freeze()
239 self._router.freeze()
240 self._on_response_prepare.freeze()
241 self._cleanup_ctx.freeze()
242 self._on_startup.freeze()
243 self._on_shutdown.freeze()
244 self._on_cleanup.freeze()
245 self._middlewares_handlers = tuple(self._prepare_middleware())
247 # If current app and any subapp do not have middlewares avoid run all
248 # of the code footprint that it implies, which have a middleware
249 # hardcoded per app that sets up the current_app attribute. If no
250 # middlewares are configured the handler will receive the proper
251 # current_app without needing all of this code.
252 self._run_middlewares = True if self.middlewares else False
254 for subapp in self._subapps:
255 subapp.pre_freeze()
256 self._run_middlewares = self._run_middlewares or subapp._run_middlewares
258 @property
259 def frozen(self) -> bool:
260 return self._frozen
262 def freeze(self) -> None:
263 if self._frozen:
264 return
266 self.pre_freeze()
267 self._frozen = True
268 for subapp in self._subapps:
269 subapp.freeze()
271 @property
272 def debug(self) -> bool:
273 warnings.warn(
274 "debug property is deprecated since 4.0 and scheduled for removal in 5.0",
275 DeprecationWarning,
276 stacklevel=2,
277 )
278 return asyncio.get_event_loop().get_debug()
280 def _reg_subapp_signals(self, subapp: "Application") -> None:
281 def reg_handler(signame: str) -> None:
282 subsig = getattr(subapp, signame)
284 async def handler(app: "Application") -> None:
285 await subsig.send(subapp)
287 appsig = getattr(self, signame)
288 appsig.append(handler)
290 reg_handler("on_startup")
291 reg_handler("on_shutdown")
292 reg_handler("on_cleanup")
294 def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource:
295 if not isinstance(prefix, str):
296 raise TypeError("Prefix must be str")
297 prefix = prefix.rstrip("/")
298 if not prefix:
299 raise ValueError("Prefix cannot be empty")
300 factory = partial(PrefixedSubAppResource, prefix, subapp)
301 return self._add_subapp(factory, subapp)
303 def _add_subapp(
304 self, resource_factory: Callable[[], _Resource], subapp: "Application"
305 ) -> _Resource:
306 if self.frozen:
307 raise RuntimeError("Cannot add sub application to frozen application")
308 if subapp.frozen:
309 raise RuntimeError("Cannot add frozen application")
310 resource = resource_factory()
311 self.router.register_resource(resource)
312 self._reg_subapp_signals(subapp)
313 self._subapps.append(subapp)
314 subapp.pre_freeze()
315 return resource
317 def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource:
318 if not isinstance(domain, str):
319 raise TypeError("Domain must be str")
320 elif "*" in domain:
321 rule: Domain = MaskDomain(domain)
322 else:
323 rule = Domain(domain)
324 factory = partial(MatchedSubAppResource, rule, subapp)
325 return self._add_subapp(factory, subapp)
327 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
328 return self.router.add_routes(routes)
330 @property
331 def on_response_prepare(self) -> _RespPrepareSignal:
332 return self._on_response_prepare
334 @property
335 def on_startup(self) -> _AppSignal:
336 return self._on_startup
338 @property
339 def on_shutdown(self) -> _AppSignal:
340 return self._on_shutdown
342 @property
343 def on_cleanup(self) -> _AppSignal:
344 return self._on_cleanup
346 @property
347 def cleanup_ctx(self) -> "CleanupContext":
348 return self._cleanup_ctx
350 @property
351 def router(self) -> UrlDispatcher:
352 return self._router
354 @property
355 def middlewares(self) -> _Middlewares:
356 return self._middlewares
358 async def startup(self) -> None:
359 """Causes on_startup signal
361 Should be called in the event loop along with the request handler.
362 """
363 await self.on_startup.send(self)
365 async def shutdown(self) -> None:
366 """Causes on_shutdown signal
368 Should be called before cleanup()
369 """
370 await self.on_shutdown.send(self)
372 async def cleanup(self) -> None:
373 """Causes on_cleanup signal
375 Should be called after shutdown()
376 """
377 if self.on_cleanup.frozen:
378 await self.on_cleanup.send(self)
379 else:
380 # If an exception occurs in startup, ensure cleanup contexts are completed.
381 await self._cleanup_ctx._on_cleanup(self)
383 def _prepare_middleware(self) -> Iterator[Middleware]:
384 yield from reversed(self._middlewares)
385 yield _fix_request_current_app(self)
387 async def _handle(self, request: Request) -> StreamResponse:
388 match_info = await self._router.resolve(request)
389 match_info.add_app(self)
390 match_info.freeze()
392 request._match_info = match_info
394 if request.headers.get(hdrs.EXPECT):
395 resp = await match_info.expect_handler(request)
396 await request.writer.drain()
397 if resp is not None:
398 return resp
400 handler = match_info.handler
402 if self._run_middlewares:
403 # If its a SystemRoute, don't cache building the middlewares since
404 # they are constructed for every MatchInfoError as a new handler
405 # is made each time.
406 if isinstance(match_info.route, SystemRoute):
407 handler = _build_middlewares(handler, match_info.apps)
408 else:
409 handler = _cached_build_middleware(handler, match_info.apps)
411 return await handler(request)
413 def __call__(self) -> "Application":
414 """gunicorn compatibility"""
415 return self
417 def __repr__(self) -> str:
418 return f"<Application 0x{id(self):x}>"
420 def __bool__(self) -> bool:
421 return True
424class CleanupError(RuntimeError):
425 @property
426 def exceptions(self) -> List[BaseException]:
427 return cast(List[BaseException], self.args[1])
430if TYPE_CHECKING:
431 _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]]
432else:
433 _CleanupContextBase = FrozenList
436class CleanupContext(_CleanupContextBase):
437 def __init__(self) -> None:
438 super().__init__()
439 self._exits: List[AsyncIterator[None]] = []
441 async def _on_startup(self, app: Application) -> None:
442 for cb in self:
443 it = cb(app).__aiter__()
444 await it.__anext__()
445 self._exits.append(it)
447 async def _on_cleanup(self, app: Application) -> None:
448 errors = []
449 for it in reversed(self._exits):
450 try:
451 await it.__anext__()
452 except StopAsyncIteration:
453 pass
454 except (Exception, asyncio.CancelledError) as exc:
455 errors.append(exc)
456 else:
457 errors.append(RuntimeError(f"{it!r} has more than one 'yield'"))
458 if errors:
459 if len(errors) == 1:
460 raise errors[0]
461 else:
462 raise CleanupError("Multiple errors on cleanup stage", errors)