Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_server.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

55 statements  

1"""Low level HTTP server.""" 

2 

3import asyncio 

4import warnings 

5from collections.abc import Awaitable, Callable 

6from typing import Any, Generic, TypeVar, overload 

7 

8from .abc import AbstractStreamWriter 

9from .http_parser import RawRequestMessage 

10from .streams import StreamReader 

11from .web_protocol import RequestHandler 

12from .web_request import BaseRequest 

13from .web_response import StreamResponse 

14 

# Names exported by ``from <module> import *``.
__all__ = ("Server",)

# Concrete request type a server instance works with; always a BaseRequest
# or a subclass of it.
_Request = TypeVar("_Request", bound=BaseRequest)

# Callable that builds a request object from, in order: the parsed raw
# message, the payload stream, the owning protocol handler, the stream
# writer, and the asyncio task associated with the request.
_RequestFactory = Callable[
    [
        RawRequestMessage,
        StreamReader,
        "RequestHandler[_Request]",
        AbstractStreamWriter,
        "asyncio.Task[None]",
    ],
    _Request,
]

28 

29 

class Server(Generic[_Request]):
    """Factory for ``RequestHandler`` protocols that tracks live connections.

    Calling an instance returns a new ``RequestHandler`` bound to this
    server.  Handlers report back through :meth:`connection_made` and
    :meth:`connection_lost`, which maintain the handler -> transport
    registry used by :meth:`pre_shutdown` and :meth:`shutdown`.
    """

    request_factory: _RequestFactory[_Request]

    @overload
    def __init__(
        self: "Server[BaseRequest]",
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,  # TODO(PY311): Use Unpack to define kwargs from RequestHandler
    ) -> None: ...
    @overload
    def __init__(
        self,
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        request_factory: _RequestFactory[_Request] | None,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,
    ) -> None: ...
    def __init__(
        self,
        handler: Callable[[_Request], Awaitable[StreamResponse]],
        *,
        request_factory: _RequestFactory[_Request] | None = None,
        debug: bool | None = None,
        handler_cancellation: bool = False,
        **kwargs: Any,
    ) -> None:
        """Set up the server; must be called while an event loop is running.

        Args:
            handler: Coroutine function that turns a request into a response.
            request_factory: Callable used to build request objects; when
                omitted (or falsy) ``self._make_request`` is used instead.
            debug: Deprecated no-op; passing any non-``None`` value emits a
                ``DeprecationWarning``.
            handler_cancellation: Stored as-is on the instance.
            **kwargs: Kept and forwarded to ``RequestHandler`` each time the
                server is called.

        Raises:
            RuntimeError: From ``asyncio.get_running_loop()`` when no event
                loop is running.
        """
        # The warning is issued directly from __init__ so that stacklevel=2
        # points at the code that instantiated the server.
        if debug is not None:
            warnings.warn(
                "debug argument is no-op since 4.0 and scheduled for removal in 5.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._loop = asyncio.get_running_loop()

        self.request_handler = handler
        self.request_factory = request_factory or self._make_request  # type: ignore[assignment]
        self.handler_cancellation = handler_cancellation
        self._kwargs = kwargs

        # Registry of active protocol handlers and the transport each uses.
        self._connections: dict[RequestHandler[_Request], asyncio.Transport] = {}
        # Count of requests processed by this server over its whole lifetime.
        self.requests_count = 0

    @property
    def connections(self) -> list[RequestHandler[_Request]]:
        """Return a new list of the currently registered handlers."""
        return [*self._connections]

    def connection_made(
        self, handler: RequestHandler[_Request], transport: asyncio.Transport
    ) -> None:
        """Register *handler* together with the *transport* it is using."""
        self._connections[handler] = transport

    def connection_lost(
        self, handler: RequestHandler[_Request], exc: BaseException | None = None
    ) -> None:
        """Unregister *handler*; *exc* is accepted but not used here.

        Unknown handlers are ignored.  If the handler still has a task
        handler, the registry entry is dropped only once that task is done;
        otherwise it is dropped immediately.
        """
        if handler not in self._connections:
            return

        pending_task = handler._task_handler
        if not pending_task:
            del self._connections[handler]
            return

        def _forget(_fut: Any) -> None:
            # pop() with a default: the entry may already be gone (for
            # example after shutdown() cleared the registry).
            self._connections.pop(handler, None)

        pending_task.add_done_callback(_forget)

    def _make_request(
        self,
        message: RawRequestMessage,
        payload: StreamReader,
        protocol: RequestHandler[BaseRequest],
        writer: AbstractStreamWriter,
        task: "asyncio.Task[None]",
    ) -> BaseRequest:
        """Default request factory: build a plain ``BaseRequest``."""
        loop = self._loop
        return BaseRequest(message, payload, protocol, writer, task, loop)

    def pre_shutdown(self) -> None:
        """Call ``close()`` on every registered handler."""
        for handler in self._connections:
            handler.close()

    async def shutdown(self, timeout: float | None = None) -> None:
        """Shut down all registered handlers concurrently, then forget them.

        Each handler's ``shutdown(timeout)`` is awaited via
        ``asyncio.gather``; the registry is cleared once gather returns.
        """
        pending = [handler.shutdown(timeout) for handler in self._connections]
        await asyncio.gather(*pending)
        self._connections.clear()

    def __call__(self) -> RequestHandler[_Request]:
        """Create a new ``RequestHandler`` bound to this server.

        The stored keyword arguments are passed through.  If that raises
        ``TypeError``, construction is retried with only the ``debug`` and
        ``access_log_class`` entries of those arguments.
        """
        try:
            return RequestHandler(self, loop=self._loop, **self._kwargs)
        except TypeError:
            # Failsafe creation: remove all custom handler_args
            retained = ("debug", "access_log_class")
            fallback_kwargs = {
                name: value
                for name, value in self._kwargs.items()
                if name in retained
            }
            return RequestHandler(self, loop=self._loop, **fallback_kwargs)

119 except TypeError: 

120 # Failsafe creation: remove all custom handler_args 

121 kwargs = { 

122 k: v 

123 for k, v in self._kwargs.items() 

124 if k in ["debug", "access_log_class"] 

125 } 

126 return RequestHandler(self, loop=self._loop, **kwargs)