Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/base_protocol.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1import asyncio 

2from typing import TYPE_CHECKING, Any, cast 

3 

4from .client_exceptions import ClientConnectionResetError 

5from .helpers import set_exception 

6from .tcp_helpers import tcp_nodelay 

7 

8if TYPE_CHECKING: 

9 from .http_parser import HttpParser 

10 

11# Raised by transport.pause_reading()/resume_reading() when the transport 

12# does not support flow control; safe to ignore. 

13# NOTE: Catch these with a plain try/except/pass, never contextlib.suppress(): 

14# pause/resume run on the hot read path and suppress() is ~6x slower than 

15# try/except here (it builds a context manager and unpacks this tuple per call). 

16PAUSE_RESUME_READING_ERRORS = (AttributeError, NotImplementedError, RuntimeError) 

17 

18 

19class BaseProtocol(asyncio.Protocol): 

20 __slots__ = ( 

21 "_loop", 

22 "_paused", 

23 "_parser", 

24 "_drain_waiter", 

25 "_connection_lost", 

26 "_reading_paused", 

27 "_upgraded", 

28 "transport", 

29 ) 

30 

31 def __init__( 

32 self, loop: asyncio.AbstractEventLoop, parser: "HttpParser[Any] | None" = None 

33 ) -> None: 

34 self._loop: asyncio.AbstractEventLoop = loop 

35 self._paused = False 

36 self._drain_waiter: asyncio.Future[None] | None = None 

37 self._reading_paused = False 

38 self._parser = parser 

39 self._upgraded = False 

40 

41 self.transport: asyncio.Transport | None = None 

42 

43 @property 

44 def connected(self) -> bool: 

45 """Return True if the connection is open.""" 

46 return self.transport is not None 

47 

48 @property 

49 def writing_paused(self) -> bool: 

50 return self._paused 

51 

52 def pause_writing(self) -> None: 

53 assert not self._paused 

54 self._paused = True 

55 

56 def resume_writing(self) -> None: 

57 assert self._paused 

58 self._paused = False 

59 

60 waiter = self._drain_waiter 

61 if waiter is not None: 

62 self._drain_waiter = None 

63 if not waiter.done(): 

64 waiter.set_result(None) 

65 

66 def pause_reading(self) -> None: 

67 self._reading_paused = True 

68 # Parser shouldn't be paused on websockets. 

69 if not self._upgraded: 

70 assert self._parser is not None 

71 self._parser.pause_reading() 

72 if self.transport is not None: 

73 try: 

74 self.transport.pause_reading() 

75 except PAUSE_RESUME_READING_ERRORS: 

76 # Transport lacks flow control; nothing to pause. Intentionally 

77 # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress). 

78 pass 

79 

80 def _reading_paused_for_msg_queue(self) -> bool: 

81 """Keep the transport paused for protocol-specific reasons (overridden).""" 

82 return False 

83 

84 def resume_reading(self, resume_parser: bool = True) -> None: 

85 self._reading_paused = False 

86 

87 # This will resume parsing any unprocessed data from the last pause. 

88 if not self._upgraded and resume_parser: 

89 self.data_received(b"") 

90 

91 # Reading may have been paused again in the above call if there was a lot of 

92 # compressed data still pending. 

93 if ( 

94 not self._reading_paused 

95 and not self._reading_paused_for_msg_queue() 

96 and self.transport is not None 

97 ): 

98 try: 

99 self.transport.resume_reading() 

100 except PAUSE_RESUME_READING_ERRORS: 

101 # Transport lacks flow control; nothing to resume. Intentionally 

102 # ignored (see PAUSE_RESUME_READING_ERRORS; do not use suppress). 

103 pass 

104 self._reading_paused = False 

105 

106 def connection_made(self, transport: asyncio.BaseTransport) -> None: 

107 tr = cast(asyncio.Transport, transport) 

108 tcp_nodelay(tr, True) 

109 self.transport = tr 

110 

111 def connection_lost(self, exc: BaseException | None) -> None: 

112 # Wake up the writer if currently paused. 

113 self.transport = None 

114 if not self._paused: 

115 return 

116 waiter = self._drain_waiter 

117 if waiter is None: 

118 return 

119 self._drain_waiter = None 

120 if waiter.done(): 

121 return 

122 if exc is None: 

123 waiter.set_result(None) 

124 else: 

125 set_exception( 

126 waiter, 

127 ConnectionError("Connection lost"), 

128 exc, 

129 ) 

130 

131 async def _drain_helper(self) -> None: 

132 if self.transport is None: 

133 raise ClientConnectionResetError("Connection lost") 

134 if not self._paused: 

135 return 

136 waiter = self._drain_waiter 

137 if waiter is None: 

138 waiter = self._loop.create_future() 

139 self._drain_waiter = waiter 

140 await waiter