Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_server.py: 45%
40 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1"""Low level HTTP server."""
2import asyncio
3import warnings
4from typing import Any, Awaitable, Callable, Dict, List, Optional # noqa
6from .abc import AbstractStreamWriter
7from .http_parser import RawRequestMessage
8from .streams import StreamReader
9from .web_protocol import RequestHandler, _RequestFactory, _RequestHandler
10from .web_request import BaseRequest
12__all__ = ("Server",)
15class Server:
16 def __init__(
17 self,
18 handler: _RequestHandler,
19 *,
20 request_factory: Optional[_RequestFactory] = None,
21 debug: Optional[bool] = None,
22 handler_cancellation: bool = False,
23 **kwargs: Any,
24 ) -> None:
25 if debug is not None:
26 warnings.warn(
27 "debug argument is no-op since 4.0 " "and scheduled for removal in 5.0",
28 DeprecationWarning,
29 stacklevel=2,
30 )
31 self._loop = asyncio.get_running_loop()
32 self._connections: Dict[RequestHandler, asyncio.Transport] = {}
33 self._kwargs = kwargs
34 self.requests_count = 0
35 self.request_handler = handler
36 self.request_factory = request_factory or self._make_request
37 self.handler_cancellation = handler_cancellation
39 @property
40 def connections(self) -> List[RequestHandler]:
41 return list(self._connections.keys())
43 def connection_made(
44 self, handler: RequestHandler, transport: asyncio.Transport
45 ) -> None:
46 self._connections[handler] = transport
48 def connection_lost(
49 self, handler: RequestHandler, exc: Optional[BaseException] = None
50 ) -> None:
51 if handler in self._connections:
52 del self._connections[handler]
54 def _make_request(
55 self,
56 message: RawRequestMessage,
57 payload: StreamReader,
58 protocol: RequestHandler,
59 writer: AbstractStreamWriter,
60 task: "asyncio.Task[None]",
61 ) -> BaseRequest:
62 return BaseRequest(message, payload, protocol, writer, task, self._loop)
64 async def shutdown(self, timeout: Optional[float] = None) -> None:
65 coros = [conn.shutdown(timeout) for conn in self._connections]
66 await asyncio.gather(*coros)
67 self._connections.clear()
69 def __call__(self) -> RequestHandler:
70 try:
71 return RequestHandler(self, loop=self._loop, **self._kwargs)
72 except TypeError:
73 # Failsafe creation: remove all custom handler_args
74 kwargs = {
75 k: v
76 for k, v in self._kwargs.items()
77 if k in ["debug", "access_log_class"]
78 }
79 return RequestHandler(self, loop=self._loop, **kwargs)