Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_app.py: 38%
261 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2import logging
3import warnings
4from functools import partial, update_wrapper
5from typing import (
6 TYPE_CHECKING,
7 Any,
8 AsyncIterator,
9 Awaitable,
10 Callable,
11 Dict,
12 Iterable,
13 Iterator,
14 List,
15 Mapping,
16 MutableMapping,
17 Optional,
18 Sequence,
19 Type,
20 TypeVar,
21 Union,
22 cast,
23 final,
24 overload,
25)
27from aiosignal import Signal
28from frozenlist import FrozenList
30from . import hdrs
31from .helpers import AppKey
32from .log import web_logger
33from .typedefs import Middleware
34from .web_exceptions import NotAppKeyWarning
35from .web_middlewares import _fix_request_current_app
36from .web_request import Request
37from .web_response import StreamResponse
38from .web_routedef import AbstractRouteDef
39from .web_urldispatcher import (
40 AbstractResource,
41 AbstractRoute,
42 Domain,
43 MaskDomain,
44 MatchedSubAppResource,
45 PrefixedSubAppResource,
46 UrlDispatcher,
47)
# Names re-exported when this module is star-imported.
__all__ = ("Application", "CleanupError")


if TYPE_CHECKING:
    # Fully parametrised aliases; this branch is only seen by type checkers.
    _Subapps = List["Application"]
    _Middlewares = FrozenList[Middleware]
    _MiddlewaresHandlers = Sequence[Middleware]
    _AppSignal = Signal[Callable[["Application"], Awaitable[None]]]
    _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]]
else:
    # Runtime branch: bind the same names to the bare (unsubscripted) classes.
    _AppSignal = _RespPrepareSignal = Signal
    _Handler = Callable
    _Middlewares = FrozenList
    _MiddlewaresHandlers = Sequence
    _Subapps = List


# Generic placeholders used by the typed ``AppKey`` overloads of Application.
_T = TypeVar("_T")
_U = TypeVar("_U")
@final
class Application(MutableMapping[Union[str, AppKey[Any]], Any]):
    """Web application object.

    Bundles a ``UrlDispatcher`` router, a list of middlewares, the
    response-prepare/startup/shutdown/cleanup signals, the registered
    sub-applications and a key/value state that is exposed through the
    ``MutableMapping`` interface (keys are ``AppKey`` instances or strings).

    The class cannot be subclassed: ``__init_subclass__`` raises ``TypeError``.
    """

    __slots__ = (
        "logger",
        "_debug",
        "_router",
        "_loop",
        "_handler_args",
        "_middlewares",
        "_middlewares_handlers",
        "_run_middlewares",
        "_state",
        "_frozen",
        "_pre_frozen",
        "_subapps",
        "_on_response_prepare",
        "_on_startup",
        "_on_shutdown",
        "_on_cleanup",
        "_client_max_size",
        "_cleanup_ctx",
    )

    def __init__(
        self,
        *,
        logger: logging.Logger = web_logger,
        middlewares: Iterable[Middleware] = (),
        handler_args: Optional[Mapping[str, Any]] = None,
        client_max_size: int = 1024**2,
        debug: Any = ...,  # mypy doesn't support ellipsis
    ) -> None:
        """Create a new, not yet frozen application.

        logger -- stored on the ``logger`` attribute.
        middlewares -- copied into a ``FrozenList``.
        handler_args -- stored as ``_handler_args``.
        client_max_size -- stored as ``_client_max_size``.
        debug -- deprecated no-op; passing any value emits a
            ``DeprecationWarning``.
        """
        if debug is not ...:
            warnings.warn(
                "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._router = UrlDispatcher()
        self._handler_args = handler_args
        self.logger = logger

        self._middlewares: _Middlewares = FrozenList(middlewares)

        # initialized on freezing
        self._middlewares_handlers: _MiddlewaresHandlers = tuple()
        # initialized on freezing
        self._run_middlewares: Optional[bool] = None

        self._state: Dict[Union[AppKey[Any], str], object] = {}
        self._frozen = False
        self._pre_frozen = False
        self._subapps: _Subapps = []

        self._on_response_prepare: _RespPrepareSignal = Signal(self)
        self._on_startup: _AppSignal = Signal(self)
        self._on_shutdown: _AppSignal = Signal(self)
        self._on_cleanup: _AppSignal = Signal(self)
        # The cleanup context hooks itself into startup and cleanup so that
        # its callbacks run as part of those signals.
        self._cleanup_ctx = CleanupContext()
        self._on_startup.append(self._cleanup_ctx._on_startup)
        self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
        self._client_max_size = client_max_size

    def __init_subclass__(cls: Type["Application"]) -> None:
        """Reject every attempt to subclass ``Application`` with ``TypeError``."""
        raise TypeError(
            f"Inheritance class {cls.__name__} from web.Application is forbidden"
        )

    # MutableMapping API

    def __eq__(self, other: object) -> bool:
        """Compare by identity rather than by mapping contents."""
        return self is other

    @overload  # type: ignore[override]
    def __getitem__(self, key: AppKey[_T]) -> _T:
        ...

    @overload
    def __getitem__(self, key: str) -> Any:
        ...

    def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
        """Return the state value stored under *key* (``KeyError`` if absent)."""
        return self._state[key]

    def _check_frozen(self) -> None:
        """Raise ``RuntimeError`` if the application is already frozen."""
        if self._frozen:
            raise RuntimeError(
                "Changing state of started or joined application is forbidden"
            )

    @overload  # type: ignore[override]
    def __setitem__(self, key: AppKey[_T], value: _T) -> None:
        ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None:
        ...

    def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None:
        """Store *value* under *key*.

        Raises ``RuntimeError`` when the application is frozen.  A key that
        is not an ``AppKey`` still works but emits ``NotAppKeyWarning``.
        """
        self._check_frozen()
        if not isinstance(key, AppKey):
            warnings.warn(
                "It is recommended to use web.AppKey instances for keys.\n"
                + "https://docs.aiohttp.org/en/stable/web_advanced.html"
                + "#application-s-config",
                category=NotAppKeyWarning,
                stacklevel=2,
            )
        self._state[key] = value

    def __delitem__(self, key: Union[str, AppKey[_T]]) -> None:
        """Remove *key* from the state; forbidden once the app is frozen."""
        self._check_frozen()
        del self._state[key]

    def __len__(self) -> int:
        """Return the number of entries in the state."""
        return len(self._state)

    def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
        """Iterate over the state keys."""
        return iter(self._state)

    @overload  # type: ignore[override]
    def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]:
        ...

    @overload
    def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]:
        ...

    @overload
    def get(self, key: str, default: Any = ...) -> Any:
        ...

    def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
        """Return the state value for *key*, or *default* when it is missing."""
        return self._state.get(key, default)

    ########
    def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
        """Deprecated no-op: ignores *loop* and emits ``DeprecationWarning``."""
        warnings.warn(
            "_set_loop() is no-op since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )

    @property
    def pre_frozen(self) -> bool:
        """``True`` once ``pre_freeze()`` has run."""
        return self._pre_frozen

    def pre_freeze(self) -> None:
        """Freeze middlewares, router and signals and build middleware handlers.

        Does nothing when called again.  Sub-applications are pre-frozen
        recursively.  The state mapping stays writable until ``freeze()``.
        """
        if self._pre_frozen:
            return

        self._pre_frozen = True
        self._middlewares.freeze()
        self._router.freeze()
        self._on_response_prepare.freeze()
        self._cleanup_ctx.freeze()
        self._on_startup.freeze()
        self._on_shutdown.freeze()
        self._on_cleanup.freeze()
        self._middlewares_handlers = tuple(self._prepare_middleware())

        # If neither the current app nor any subapp has middlewares, skip the
        # whole middleware code path, which includes a middleware hardcoded
        # per app that sets up the current_app attribute. If no middlewares
        # are configured the handler will receive the proper current_app
        # without needing all of this code.
        self._run_middlewares = bool(self.middlewares)

        for subapp in self._subapps:
            subapp.pre_freeze()
            self._run_middlewares = self._run_middlewares or subapp._run_middlewares

    @property
    def frozen(self) -> bool:
        """``True`` once ``freeze()`` has run."""
        return self._frozen

    def freeze(self) -> None:
        """Pre-freeze, then forbid state changes on this app and its subapps.

        Does nothing when the application is already frozen.
        """
        if self._frozen:
            return

        self.pre_freeze()
        self._frozen = True
        for subapp in self._subapps:
            subapp.freeze()

    @property
    def debug(self) -> bool:
        """Deprecated: return the debug flag of the current asyncio event loop.

        Emits ``DeprecationWarning`` on every access.
        """
        warnings.warn(
            # The two halves used to be joined without a separating space
            # ("4.0and scheduled"); the message now matches its siblings.
            "debug property is deprecated since 4.0 and scheduled for removal in 5.0",
            DeprecationWarning,
            stacklevel=2,
        )
        return asyncio.get_event_loop().get_debug()

    def _reg_subapp_signals(self, subapp: "Application") -> None:
        """Forward this app's startup/shutdown/cleanup signals to *subapp*."""

        def reg_handler(signame: str) -> None:
            subsig = getattr(subapp, signame)

            async def handler(app: "Application") -> None:
                # The parent app passed in is ignored; the subapp's own
                # signal is sent with the subapp as its argument.
                await subsig.send(subapp)

            appsig = getattr(self, signame)
            appsig.append(handler)

        reg_handler("on_startup")
        reg_handler("on_shutdown")
        reg_handler("on_cleanup")

    def add_subapp(self, prefix: str, subapp: "Application") -> AbstractResource:
        """Mount *subapp* under the URL *prefix* and return the new resource.

        Trailing slashes are stripped from *prefix*.  Raises ``TypeError``
        when *prefix* is not a ``str`` and ``ValueError`` when nothing is
        left after stripping.
        """
        if not isinstance(prefix, str):
            raise TypeError("Prefix must be str")
        prefix = prefix.rstrip("/")
        if not prefix:
            raise ValueError("Prefix cannot be empty")
        factory = partial(PrefixedSubAppResource, prefix, subapp)
        return self._add_subapp(factory, subapp)

    def _add_subapp(
        self, resource_factory: Callable[[], AbstractResource], subapp: "Application"
    ) -> AbstractResource:
        """Register *subapp* through the resource built by *resource_factory*.

        Raises ``RuntimeError`` when either this app or *subapp* is frozen.
        On success the subapp's signals are hooked up and it is pre-frozen.
        """
        if self.frozen:
            raise RuntimeError("Cannot add sub application to frozen application")
        if subapp.frozen:
            raise RuntimeError("Cannot add frozen application")
        resource = resource_factory()
        self.router.register_resource(resource)
        self._reg_subapp_signals(subapp)
        self._subapps.append(subapp)
        subapp.pre_freeze()
        return resource

    def add_domain(self, domain: str, subapp: "Application") -> AbstractResource:
        """Mount *subapp* for requests matching *domain*.

        A domain containing ``*`` is matched with ``MaskDomain``, any other
        with ``Domain``.  Raises ``TypeError`` when *domain* is not a ``str``.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        elif "*" in domain:
            rule: Domain = MaskDomain(domain)
        else:
            rule = Domain(domain)
        factory = partial(MatchedSubAppResource, rule, subapp)
        return self._add_subapp(factory, subapp)

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
        """Shortcut for ``self.router.add_routes(routes)``."""
        return self.router.add_routes(routes)

    @property
    def on_response_prepare(self) -> _RespPrepareSignal:
        """Signal holding the response-prepare callbacks."""
        return self._on_response_prepare

    @property
    def on_startup(self) -> _AppSignal:
        """Signal sent by ``startup()``."""
        return self._on_startup

    @property
    def on_shutdown(self) -> _AppSignal:
        """Signal sent by ``shutdown()``."""
        return self._on_shutdown

    @property
    def on_cleanup(self) -> _AppSignal:
        """Signal sent by ``cleanup()``."""
        return self._on_cleanup

    @property
    def cleanup_ctx(self) -> "CleanupContext":
        """The ``CleanupContext`` wired into ``on_startup`` and ``on_cleanup``."""
        return self._cleanup_ctx

    @property
    def router(self) -> UrlDispatcher:
        """The application's ``UrlDispatcher``."""
        return self._router

    @property
    def middlewares(self) -> _Middlewares:
        """The ``FrozenList`` of middlewares given at construction time."""
        return self._middlewares

    async def startup(self) -> None:
        """Causes on_startup signal

        Should be called in the event loop along with the request handler.
        """
        await self.on_startup.send(self)

    async def shutdown(self) -> None:
        """Causes on_shutdown signal

        Should be called before cleanup()
        """
        await self.on_shutdown.send(self)

    async def cleanup(self) -> None:
        """Causes on_cleanup signal

        Should be called after shutdown()
        """
        if self.on_cleanup.frozen:
            await self.on_cleanup.send(self)
        else:
            # If an exception occurs in startup, ensure cleanup contexts are completed.
            await self._cleanup_ctx._on_cleanup(self)

    def _prepare_middleware(self) -> Iterator[Middleware]:
        """Yield the middlewares in reverse order, then the current-app fixer.

        ``_handle`` wraps the handler in iteration order, so the item yielded
        last ends up as the outermost wrapper for this application.
        """
        yield from reversed(self._middlewares)
        yield _fix_request_current_app(self)

    async def _handle(self, request: Request) -> StreamResponse:
        """Resolve *request* with the router and run the matched handler."""
        match_info = await self._router.resolve(request)
        match_info.add_app(self)
        match_info.freeze()

        resp = None
        request._match_info = match_info
        expect = request.headers.get(hdrs.EXPECT)
        if expect:
            # The expect handler may already produce the response.
            resp = await match_info.expect_handler(request)
            await request.writer.drain()

        if resp is None:
            handler = match_info.handler

            if self._run_middlewares:
                # Wrap the handler with the middleware handlers of every app
                # in the match, walking ``match_info.apps`` in reverse.
                for app in match_info.apps[::-1]:
                    assert app.pre_frozen, "middleware handlers are not ready"
                    for m in app._middlewares_handlers:
                        handler = update_wrapper(partial(m, handler=handler), handler)

            resp = await handler(request)

        return resp

    def __call__(self) -> "Application":
        """gunicorn compatibility"""
        return self

    def __repr__(self) -> str:
        """Return ``<Application 0x...>`` using the object's ``id`` in hex."""
        return f"<Application 0x{id(self):x}>"

    def __bool__(self) -> bool:
        """An application is always truthy, even when its state is empty."""
        return True
class CleanupError(RuntimeError):
    """Raised when more than one error was collected during cleanup.

    Instances are built as ``CleanupError(message, errors)``; the second
    positional argument is the list of collected exceptions.
    """

    @property
    def exceptions(self) -> List[BaseException]:
        """Return the collected exceptions (the second constructor argument)."""
        collected = self.args[1]
        return cast(List[BaseException], collected)
if TYPE_CHECKING:
    # Parametrised base, only evaluated by static type checkers.
    _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]]
else:
    _CleanupContextBase = FrozenList


class CleanupContext(_CleanupContextBase):
    """Frozen list of callbacks, each returning an async iterator for an app.

    On startup every iterator is advanced once; on cleanup the started
    iterators are advanced once more, in reverse order, and are expected to
    be exhausted by that second step.
    """

    def __init__(self) -> None:
        super().__init__()
        # Iterators already advanced by ``_on_startup``, in start order.
        self._exits: List[AsyncIterator[None]] = []

    async def _on_startup(self, app: Application) -> None:
        """Call each registered callback with *app* and advance it once."""
        for factory in self:
            agen = factory(app).__aiter__()
            await agen.__anext__()
            # Only remembered after the first step succeeded.
            self._exits.append(agen)

    async def _on_cleanup(self, app: Application) -> None:
        """Advance every started iterator once more, newest first.

        Exceptions derived from ``Exception`` are collected so that the
        remaining iterators still run.  A single collected error is
        re-raised as is; several are wrapped in ``CleanupError``.
        """
        failures: List[Exception] = []
        for it in reversed(self._exits):
            try:
                await it.__anext__()
            except StopAsyncIteration:
                # Expected outcome: the iterator ended after its single yield.
                continue
            except Exception as exc:
                failures.append(exc)
            else:
                failures.append(RuntimeError(f"{it!r} has more than one 'yield'"))

        if not failures:
            return
        if len(failures) == 1:
            raise failures[0]
        raise CleanupError("Multiple errors on cleanup stage", failures)
447 raise CleanupError("Multiple errors on cleanup stage", errors)