Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/base_protocol.py: 32%

66 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2from typing import Optional, cast 

3 

4from .tcp_helpers import tcp_nodelay 

5 

6 

class BaseProtocol(asyncio.Protocol):
    """Base asyncio protocol with write/read flow-control bookkeeping.

    Tracks whether writing is currently paused, keeps a single
    "drain waiter" future that writers can await until writing is resumed
    (or the connection is lost), and remembers whether reading has been
    paused on the underlying transport.
    """

    __slots__ = (
        "_loop",
        "_paused",
        "_drain_waiter",
        # NOTE(review): declared here but never assigned anywhere in this
        # class -- presumably kept for subclasses; confirm before removing.
        "_connection_lost",
        "_reading_paused",
        "transport",
    )

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Store *loop* and reset all flow-control state.

        The protocol starts disconnected (``transport`` is ``None``) with
        neither writing nor reading paused.
        """
        self._loop: asyncio.AbstractEventLoop = loop
        self._paused = False
        self._drain_waiter: Optional[asyncio.Future[None]] = None
        self._reading_paused = False

        self.transport: Optional[asyncio.Transport] = None

    @property
    def connected(self) -> bool:
        """Return True if the connection is open."""
        return self.transport is not None

    def pause_writing(self) -> None:
        """Mark writing as paused.

        Must not be called while writing is already paused (checked with
        an ``assert``).
        """
        assert not self._paused
        self._paused = True

    def resume_writing(self) -> None:
        """Mark writing as resumed and release a pending drain waiter.

        Must only be called while writing is paused (checked with an
        ``assert``).
        """
        assert self._paused
        self._paused = False

        waiter = self._drain_waiter
        if waiter is not None:
            # Detach the waiter first so the next drain creates a fresh one.
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def pause_reading(self) -> None:
        """Ask the transport to stop reading; no-op if already paused.

        Does nothing when there is no transport.
        """
        if not self._reading_paused and self.transport is not None:
            try:
                self.transport.pause_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                # Best effort: a failure here is deliberately ignored.
                pass
            # The flag is set even when the transport call failed.
            self._reading_paused = True

    def resume_reading(self) -> None:
        """Ask the transport to resume reading; no-op if not paused.

        Does nothing when there is no transport.
        """
        if self._reading_paused and self.transport is not None:
            try:
                self.transport.resume_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                # Best effort: a failure here is deliberately ignored.
                pass
            # The flag is cleared even when the transport call failed.
            self._reading_paused = False

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Remember *transport* as the active connection.

        Before storing it, ``tcp_nodelay(tr, True)`` is called on the
        transport (presumably enabling TCP_NODELAY -- see ``tcp_helpers``).
        """
        tr = cast(asyncio.Transport, transport)
        tcp_nodelay(tr, True)
        self.transport = tr

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Drop the transport and wake up a writer blocked in a drain.

        If writing is paused and an unfinished drain waiter exists, it is
        completed with ``None`` when *exc* is ``None``, otherwise it is
        failed with *exc*.
        """
        # Wake up the writer if currently paused.
        self.transport = None
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self) -> None:
        """Wait until writing is no longer paused.

        Returns immediately when writing is not paused.  Otherwise awaits
        the shared drain waiter, creating it on first use.

        Raises:
            ConnectionResetError: if there is no transport (not connected).
        """
        if not self.connected:
            raise ConnectionResetError("Connection lost")
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            waiter = self._loop.create_future()
            self._drain_waiter = waiter
        # Shield the shared waiter so that cancelling this coroutine does
        # not cancel the future itself.
        await asyncio.shield(waiter)

90 await asyncio.shield(waiter)