Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/base_protocol.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1import asyncio 

2from typing import TYPE_CHECKING, Any, cast 

3 

4from .client_exceptions import ClientConnectionResetError 

5from .helpers import set_exception 

6from .tcp_helpers import tcp_nodelay 

7 

8if TYPE_CHECKING: 

9 from .http_parser import HttpParser 

10 

11 

12class BaseProtocol(asyncio.Protocol): 

13 __slots__ = ( 

14 "_loop", 

15 "_paused", 

16 "_parser", 

17 "_drain_waiter", 

18 "_connection_lost", 

19 "_reading_paused", 

20 "_upgraded", 

21 "transport", 

22 ) 

23 

24 def __init__( 

25 self, loop: asyncio.AbstractEventLoop, parser: "HttpParser[Any] | None" = None 

26 ) -> None: 

27 self._loop: asyncio.AbstractEventLoop = loop 

28 self._paused = False 

29 self._drain_waiter: asyncio.Future[None] | None = None 

30 self._reading_paused = False 

31 self._parser = parser 

32 self._upgraded = False 

33 

34 self.transport: asyncio.Transport | None = None 

35 

36 @property 

37 def connected(self) -> bool: 

38 """Return True if the connection is open.""" 

39 return self.transport is not None 

40 

41 @property 

42 def writing_paused(self) -> bool: 

43 return self._paused 

44 

45 def pause_writing(self) -> None: 

46 assert not self._paused 

47 self._paused = True 

48 

49 def resume_writing(self) -> None: 

50 assert self._paused 

51 self._paused = False 

52 

53 waiter = self._drain_waiter 

54 if waiter is not None: 

55 self._drain_waiter = None 

56 if not waiter.done(): 

57 waiter.set_result(None) 

58 

59 def pause_reading(self) -> None: 

60 self._reading_paused = True 

61 # Parser shouldn't be paused on websockets. 

62 if not self._upgraded: 

63 assert self._parser is not None 

64 self._parser.pause_reading() 

65 if self.transport is not None: 

66 try: 

67 self.transport.pause_reading() 

68 except (AttributeError, NotImplementedError, RuntimeError): 

69 pass 

70 

71 def resume_reading(self, resume_parser: bool = True) -> None: 

72 self._reading_paused = False 

73 

74 # This will resume parsing any unprocessed data from the last pause. 

75 if not self._upgraded and resume_parser: 

76 self.data_received(b"") 

77 

78 # Reading may have been paused again in the above call if there was a lot of 

79 # compressed data still pending. 

80 if not self._reading_paused and self.transport is not None: 

81 try: 

82 self.transport.resume_reading() 

83 except (AttributeError, NotImplementedError, RuntimeError): 

84 pass 

85 self._reading_paused = False 

86 

87 def connection_made(self, transport: asyncio.BaseTransport) -> None: 

88 tr = cast(asyncio.Transport, transport) 

89 tcp_nodelay(tr, True) 

90 self.transport = tr 

91 

92 def connection_lost(self, exc: BaseException | None) -> None: 

93 # Wake up the writer if currently paused. 

94 self.transport = None 

95 if not self._paused: 

96 return 

97 waiter = self._drain_waiter 

98 if waiter is None: 

99 return 

100 self._drain_waiter = None 

101 if waiter.done(): 

102 return 

103 if exc is None: 

104 waiter.set_result(None) 

105 else: 

106 set_exception( 

107 waiter, 

108 ConnectionError("Connection lost"), 

109 exc, 

110 ) 

111 

112 async def _drain_helper(self) -> None: 

113 if self.transport is None: 

114 raise ClientConnectionResetError("Connection lost") 

115 if not self._paused: 

116 return 

117 waiter = self._drain_waiter 

118 if waiter is None: 

119 waiter = self._loop.create_future() 

120 self._drain_waiter = waiter 

121 await waiter