Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_server.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Low level HTTP server."""
3import asyncio
4import warnings
5from collections.abc import Awaitable, Callable
6from typing import Any, Generic, TypeVar, overload
8from .abc import AbstractStreamWriter
9from .http_parser import RawRequestMessage
10from .streams import StreamReader
11from .web_protocol import RequestHandler
12from .web_request import BaseRequest
13from .web_response import StreamResponse
# Only the server class is part of this module's public API.
__all__ = ("Server",)

# Concrete request type a given Server instance produces.
_Request = TypeVar("_Request", bound=BaseRequest)

# Signature of a callable that builds a request object; the positional
# arguments mirror the ones Server._make_request receives.
_RequestFactory = Callable[
    [
        RawRequestMessage,  # message
        StreamReader,  # payload
        "RequestHandler[_Request]",  # protocol
        AbstractStreamWriter,  # writer
        "asyncio.Task[None]",  # task
    ],
    _Request,
]
class Server(Generic[_Request]):
    """Factory for ``RequestHandler`` protocols that tracks live connections.

    Calling an instance builds a new ``RequestHandler`` bound to this server.
    ``connection_made`` records a handler together with its transport and
    ``connection_lost`` drops it again; ``pre_shutdown`` and ``shutdown``
    act on every handler that is currently recorded.
    """

    request_factory: _RequestFactory[_Request]

    @overload
    def __init__(
        self: "Server[BaseRequest]",
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,  # TODO(PY311): Use Unpack to define kwargs from RequestHandler
    ) -> None: ...
    @overload
    def __init__(
        self,
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        request_factory: _RequestFactory[_Request] | None,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,
    ) -> None: ...
    def __init__(
        self,
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        request_factory: _RequestFactory[_Request] | None = None,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,
    ) -> None:
        """Set up the server state on the currently running event loop.

        Args:
            handler: Coroutine function that turns a request into a
                response; stored as ``request_handler``.
            request_factory: Optional callable used to build request
                objects. When falsy, ``_make_request`` is used instead.
            debug: Deprecated no-op. Any value other than ``None`` emits a
                ``DeprecationWarning``.
            handler_cancellation: Stored as-is on the instance.
            **kwargs: Kept and forwarded to ``RequestHandler`` each time
                the server is called.

        Raises:
            RuntimeError: If no event loop is running
                (from ``asyncio.get_running_loop``).
        """
        # Emit the deprecation before anything else; stacklevel=2 attributes
        # the warning to whoever constructed the server.
        if debug is not None:
            warnings.warn(
                "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
                DeprecationWarning,
                stacklevel=2,
            )

        self._loop = asyncio.get_running_loop()
        self._kwargs = kwargs
        self._connections: dict[RequestHandler[_Request], asyncio.Transport] = {}

        self.request_handler = handler
        self.handler_cancellation = handler_cancellation
        factory = request_factory or self._make_request
        self.request_factory = factory  # type: ignore[assignment]

        # requests_count is the number of requests being processed by the server
        # for the lifetime of the server.
        # NOTE(review): nothing in this class updates it; presumably the
        # protocol increments it -- confirm against RequestHandler.
        self.requests_count = 0

    @property
    def connections(self) -> list[RequestHandler[_Request]]:
        """Return a fresh list of the handlers currently being tracked."""
        return [*self._connections]

    def connection_made(
        self, handler: RequestHandler[_Request], transport: asyncio.Transport
    ) -> None:
        """Record ``handler`` together with the ``transport`` it runs on."""
        self._connections[handler] = transport

    def connection_lost(
        self, handler: RequestHandler[_Request], exc: BaseException | None = None
    ) -> None:
        """Stop tracking ``handler``; unknown handlers are ignored.

        ``exc`` is accepted but not used here.
        """
        if handler not in self._connections:
            return

        task = handler._task_handler
        if not task:
            del self._connections[handler]
            return

        # The handler still has a task: keep it tracked until that task is
        # done. pop() with a default tolerates the entry having been removed
        # in the meantime (for example by shutdown() clearing the mapping).
        task.add_done_callback(lambda _fut: self._connections.pop(handler, None))

    def _make_request(
        self,
        message: RawRequestMessage,
        payload: StreamReader,
        protocol: RequestHandler[BaseRequest],
        writer: AbstractStreamWriter,
        task: "asyncio.Task[None]",
    ) -> BaseRequest:
        """Default request factory: build a ``BaseRequest`` on this server's loop."""
        return BaseRequest(message, payload, protocol, writer, task, self._loop)

    def pre_shutdown(self) -> None:
        """Call ``close()`` on every tracked handler."""
        for handler in self._connections:
            handler.close()

    async def shutdown(self, timeout: float | None = None) -> None:
        """Shut down all tracked handlers concurrently, then forget them.

        Args:
            timeout: Passed unchanged to each handler's ``shutdown``.
        """
        pending = [handler.shutdown(timeout) for handler in self._connections]
        await asyncio.gather(*pending)
        self._connections.clear()

    def __call__(self) -> RequestHandler[_Request]:
        """Create a new ``RequestHandler`` bound to this server.

        The handler is first built with every stored keyword argument. If
        that raises ``TypeError``, it is rebuilt with only the ``debug`` and
        ``access_log_class`` entries, when present.
        """
        try:
            return RequestHandler(self, loop=self._loop, **self._kwargs)
        except TypeError:
            # Failsafe creation: remove all custom handler_args
            fallback_keys = ("debug", "access_log_class")
            safe_kwargs = {
                name: self._kwargs[name]
                for name in self._kwargs
                if name in fallback_keys
            }
            return RequestHandler(self, loop=self._loop, **safe_kwargs)