Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_app.py: 38%
253 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import logging
3import warnings
4from functools import partial, update_wrapper
5from typing import ( # noqa
6 TYPE_CHECKING,
7 Any,
8 AsyncIterator,
9 Awaitable,
10 Callable,
11 Dict,
12 Iterable,
13 Iterator,
14 List,
15 Mapping,
16 MutableMapping,
17 Optional,
18 Sequence,
19 Tuple,
20 Type,
21 TypeVar,
22 Union,
23 cast,
24 overload,
25)
27from aiosignal import Signal
28from frozenlist import FrozenList
29from typing_extensions import final
31from . import hdrs
32from .helpers import AppKey
33from .log import web_logger
34from .typedefs import Middleware
35from .web_middlewares import _fix_request_current_app
36from .web_request import Request
37from .web_response import StreamResponse
38from .web_routedef import AbstractRouteDef
39from .web_urldispatcher import (
40 AbstractResource,
41 AbstractRoute,
42 Domain,
43 MaskDomain,
44 MatchedSubAppResource,
45 PrefixedSubAppResource,
46 UrlDispatcher,
47)
49__all__ = ("Application", "CleanupError")
52if TYPE_CHECKING: # pragma: no cover
53 _AppSignal = Signal[Callable[["Application"], Awaitable[None]]]
54 _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]]
55 _Middlewares = FrozenList[Middleware]
56 _MiddlewaresHandlers = Sequence[Middleware]
57 _Subapps = List["Application"]
58else:
59 # No type checker mode, skip types
60 _AppSignal = Signal
61 _RespPrepareSignal = Signal
62 _Handler = Callable
63 _Middlewares = FrozenList
64 _MiddlewaresHandlers = Sequence
65 _Subapps = List
67_T = TypeVar("_T")
68_U = TypeVar("_U")
71@final
72class Application(MutableMapping[Union[str, AppKey[Any]], Any]):
73 __slots__ = (
74 "logger",
75 "_debug",
76 "_router",
77 "_loop",
78 "_handler_args",
79 "_middlewares",
80 "_middlewares_handlers",
81 "_run_middlewares",
82 "_state",
83 "_frozen",
84 "_pre_frozen",
85 "_subapps",
86 "_on_response_prepare",
87 "_on_startup",
88 "_on_shutdown",
89 "_on_cleanup",
90 "_client_max_size",
91 "_cleanup_ctx",
92 )
94 def __init__(
95 self,
96 *,
97 logger: logging.Logger = web_logger,
98 middlewares: Iterable[Middleware] = (),
99 handler_args: Optional[Mapping[str, Any]] = None,
100 client_max_size: int = 1024**2,
101 debug: Any = ..., # mypy doesn't support ellipsis
102 ) -> None:
103 if debug is not ...:
104 warnings.warn(
105 "debug argument is no-op since 4.0 " "and scheduled for removal in 5.0",
106 DeprecationWarning,
107 stacklevel=2,
108 )
109 self._router = UrlDispatcher()
110 self._handler_args = handler_args
111 self.logger = logger
113 self._middlewares: _Middlewares = FrozenList(middlewares)
115 # initialized on freezing
116 self._middlewares_handlers: _MiddlewaresHandlers = tuple()
117 # initialized on freezing
118 self._run_middlewares: Optional[bool] = None
120 self._state: Dict[Union[AppKey[Any], str], object] = {}
121 self._frozen = False
122 self._pre_frozen = False
123 self._subapps: _Subapps = []
125 self._on_response_prepare: _RespPrepareSignal = Signal(self)
126 self._on_startup: _AppSignal = Signal(self)
127 self._on_shutdown: _AppSignal = Signal(self)
128 self._on_cleanup: _AppSignal = Signal(self)
129 self._cleanup_ctx = CleanupContext()
130 self._on_startup.append(self._cleanup_ctx._on_startup)
131 self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
132 self._client_max_size = client_max_size
134 def __init_subclass__(cls: Type["Application"]) -> None:
135 raise TypeError(
136 "Inheritance class {} from web.Application "
137 "is forbidden".format(cls.__name__)
138 )
140 # MutableMapping API
142 def __eq__(self, other: object) -> bool:
143 return self is other
145 @overload # type: ignore[override]
146 def __getitem__(self, key: AppKey[_T]) -> _T:
147 ...
149 @overload
150 def __getitem__(self, key: str) -> Any:
151 ...
153 def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
154 return self._state[key]
156 def _check_frozen(self) -> None:
157 if self._frozen:
158 raise RuntimeError(
159 "Changing state of started or joined " "application is forbidden"
160 )
162 @overload # type: ignore[override]
163 def __setitem__(self, key: AppKey[_T], value: _T) -> None:
164 ...
166 @overload
167 def __setitem__(self, key: str, value: Any) -> None:
168 ...
170 def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None:
171 self._check_frozen()
172 if not isinstance(key, AppKey):
173 warnings.warn(
174 "It is recommended to use web.AppKey instances for keys.\n"
175 + "https://docs.aiohttp.org/en/stable/web_advanced.html"
176 + "#application-s-config"
177 )
178 self._state[key] = value
180 def __delitem__(self, key: Union[str, AppKey[_T]]) -> None:
181 self._check_frozen()
182 del self._state[key]
184 def __len__(self) -> int:
185 return len(self._state)
187 def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
188 return iter(self._state)
190 @overload # type: ignore[override]
191 def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]:
192 ...
194 @overload
195 def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]:
196 ...
198 @overload
199 def get(self, key: str, default: Any = ...) -> Any:
200 ...
202 def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
203 return self._state.get(key, default)
205 ########
206 def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
207 warnings.warn(
208 "_set_loop() is no-op since 4.0 " "and scheduled for removal in 5.0",
209 DeprecationWarning,
210 stacklevel=2,
211 )
213 @property
214 def pre_frozen(self) -> bool:
215 return self._pre_frozen
217 def pre_freeze(self) -> None:
218 if self._pre_frozen:
219 return
221 self._pre_frozen = True
222 self._middlewares.freeze()
223 self._router.freeze()
224 self._on_response_prepare.freeze()
225 self._cleanup_ctx.freeze()
226 self._on_startup.freeze()
227 self._on_shutdown.freeze()
228 self._on_cleanup.freeze()
229 self._middlewares_handlers = tuple(self._prepare_middleware())
231 # If current app and any subapp do not have middlewares avoid run all
232 # of the code footprint that it implies, which have a middleware
233 # hardcoded per app that sets up the current_app attribute. If no
234 # middlewares are configured the handler will receive the proper
235 # current_app without needing all of this code.
236 self._run_middlewares = True if self.middlewares else False
238 for subapp in self._subapps:
239 subapp.pre_freeze()
240 self._run_middlewares = self._run_middlewares or subapp._run_middlewares
242 @property
243 def frozen(self) -> bool:
244 return self._frozen
246 def freeze(self) -> None:
247 if self._frozen:
248 return
250 self.pre_freeze()
251 self._frozen = True
252 for subapp in self._subapps:
253 subapp.freeze()
255 @property
256 def debug(self) -> bool:
257 warnings.warn(
258 "debug property is deprecated since 4.0" "and scheduled for removal in 5.0",
259 DeprecationWarning,
260 stacklevel=2,
261 )
262 return asyncio.get_event_loop().get_debug()
264 def _reg_subapp_signals(self, subapp: "Application") -> None:
265 def reg_handler(signame: str) -> None:
266 subsig = getattr(subapp, signame)
268 async def handler(app: "Application") -> None:
269 await subsig.send(subapp)
271 appsig = getattr(self, signame)
272 appsig.append(handler)
274 reg_handler("on_startup")
275 reg_handler("on_shutdown")
276 reg_handler("on_cleanup")
278 def add_subapp(self, prefix: str, subapp: "Application") -> AbstractResource:
279 if not isinstance(prefix, str):
280 raise TypeError("Prefix must be str")
281 prefix = prefix.rstrip("/")
282 if not prefix:
283 raise ValueError("Prefix cannot be empty")
284 factory = partial(PrefixedSubAppResource, prefix, subapp)
285 return self._add_subapp(factory, subapp)
287 def _add_subapp(
288 self, resource_factory: Callable[[], AbstractResource], subapp: "Application"
289 ) -> AbstractResource:
290 if self.frozen:
291 raise RuntimeError("Cannot add sub application to frozen application")
292 if subapp.frozen:
293 raise RuntimeError("Cannot add frozen application")
294 resource = resource_factory()
295 self.router.register_resource(resource)
296 self._reg_subapp_signals(subapp)
297 self._subapps.append(subapp)
298 subapp.pre_freeze()
299 return resource
301 def add_domain(self, domain: str, subapp: "Application") -> AbstractResource:
302 if not isinstance(domain, str):
303 raise TypeError("Domain must be str")
304 elif "*" in domain:
305 rule: Domain = MaskDomain(domain)
306 else:
307 rule = Domain(domain)
308 factory = partial(MatchedSubAppResource, rule, subapp)
309 return self._add_subapp(factory, subapp)
311 def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
312 return self.router.add_routes(routes)
314 @property
315 def on_response_prepare(self) -> _RespPrepareSignal:
316 return self._on_response_prepare
318 @property
319 def on_startup(self) -> _AppSignal:
320 return self._on_startup
322 @property
323 def on_shutdown(self) -> _AppSignal:
324 return self._on_shutdown
326 @property
327 def on_cleanup(self) -> _AppSignal:
328 return self._on_cleanup
330 @property
331 def cleanup_ctx(self) -> "CleanupContext":
332 return self._cleanup_ctx
334 @property
335 def router(self) -> UrlDispatcher:
336 return self._router
338 @property
339 def middlewares(self) -> _Middlewares:
340 return self._middlewares
342 async def startup(self) -> None:
343 """Causes on_startup signal
345 Should be called in the event loop along with the request handler.
346 """
347 await self.on_startup.send(self)
349 async def shutdown(self) -> None:
350 """Causes on_shutdown signal
352 Should be called before cleanup()
353 """
354 await self.on_shutdown.send(self)
356 async def cleanup(self) -> None:
357 """Causes on_cleanup signal
359 Should be called after shutdown()
360 """
361 if self.on_cleanup.frozen:
362 await self.on_cleanup.send(self)
363 else:
364 # If an exception occurs in startup, ensure cleanup contexts are completed.
365 await self._cleanup_ctx._on_cleanup(self)
367 def _prepare_middleware(self) -> Iterator[Middleware]:
368 yield from reversed(self._middlewares)
369 yield _fix_request_current_app(self)
371 async def _handle(self, request: Request) -> StreamResponse:
372 match_info = await self._router.resolve(request)
373 match_info.add_app(self)
374 match_info.freeze()
376 resp = None
377 request._match_info = match_info
378 expect = request.headers.get(hdrs.EXPECT)
379 if expect:
380 resp = await match_info.expect_handler(request)
381 await request.writer.drain()
383 if resp is None:
384 handler = match_info.handler
386 if self._run_middlewares:
387 for app in match_info.apps[::-1]:
388 assert app.pre_frozen, "middleware handlers are not ready"
389 for m in app._middlewares_handlers:
390 handler = update_wrapper(partial(m, handler=handler), handler)
392 resp = await handler(request)
394 return resp
396 def __call__(self) -> "Application":
397 """gunicorn compatibility"""
398 return self
400 def __repr__(self) -> str:
401 return f"<Application 0x{id(self):x}>"
403 def __bool__(self) -> bool:
404 return True
407class CleanupError(RuntimeError):
408 @property
409 def exceptions(self) -> List[BaseException]:
410 return cast(List[BaseException], self.args[1])
413if TYPE_CHECKING: # pragma: no cover
414 _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]]
415else:
416 _CleanupContextBase = FrozenList
419class CleanupContext(_CleanupContextBase):
420 def __init__(self) -> None:
421 super().__init__()
422 self._exits: List[AsyncIterator[None]] = []
424 async def _on_startup(self, app: Application) -> None:
425 for cb in self:
426 it = cb(app).__aiter__()
427 await it.__anext__()
428 self._exits.append(it)
430 async def _on_cleanup(self, app: Application) -> None:
431 errors = []
432 for it in reversed(self._exits):
433 try:
434 await it.__anext__()
435 except StopAsyncIteration:
436 pass
437 except Exception as exc:
438 errors.append(exc)
439 else:
440 errors.append(RuntimeError(f"{it!r} has more than one 'yield'"))
441 if errors:
442 if len(errors) == 1:
443 raise errors[0]
444 else:
445 raise CleanupError("Multiple errors on cleanup stage", errors)