Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/base_protocol.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2from typing import TYPE_CHECKING, Any, cast
4from .client_exceptions import ClientConnectionResetError
5from .helpers import set_exception
6from .tcp_helpers import tcp_nodelay
8if TYPE_CHECKING:
9 from .http_parser import HttpParser
12class BaseProtocol(asyncio.Protocol):
13 __slots__ = (
14 "_loop",
15 "_paused",
16 "_parser",
17 "_drain_waiter",
18 "_connection_lost",
19 "_reading_paused",
20 "_upgraded",
21 "transport",
22 )
24 def __init__(
25 self, loop: asyncio.AbstractEventLoop, parser: "HttpParser[Any] | None" = None
26 ) -> None:
27 self._loop: asyncio.AbstractEventLoop = loop
28 self._paused = False
29 self._drain_waiter: asyncio.Future[None] | None = None
30 self._reading_paused = False
31 self._parser = parser
32 self._upgraded = False
34 self.transport: asyncio.Transport | None = None
36 @property
37 def connected(self) -> bool:
38 """Return True if the connection is open."""
39 return self.transport is not None
41 @property
42 def writing_paused(self) -> bool:
43 return self._paused
45 def pause_writing(self) -> None:
46 assert not self._paused
47 self._paused = True
49 def resume_writing(self) -> None:
50 assert self._paused
51 self._paused = False
53 waiter = self._drain_waiter
54 if waiter is not None:
55 self._drain_waiter = None
56 if not waiter.done():
57 waiter.set_result(None)
59 def pause_reading(self) -> None:
60 self._reading_paused = True
61 # Parser shouldn't be paused on websockets.
62 if not self._upgraded:
63 assert self._parser is not None
64 self._parser.pause_reading()
65 if self.transport is not None:
66 try:
67 self.transport.pause_reading()
68 except (AttributeError, NotImplementedError, RuntimeError):
69 pass
71 def resume_reading(self, resume_parser: bool = True) -> None:
72 self._reading_paused = False
74 # This will resume parsing any unprocessed data from the last pause.
75 if not self._upgraded and resume_parser:
76 self.data_received(b"")
78 # Reading may have been paused again in the above call if there was a lot of
79 # compressed data still pending.
80 if not self._reading_paused and self.transport is not None:
81 try:
82 self.transport.resume_reading()
83 except (AttributeError, NotImplementedError, RuntimeError):
84 pass
85 self._reading_paused = False
87 def connection_made(self, transport: asyncio.BaseTransport) -> None:
88 tr = cast(asyncio.Transport, transport)
89 tcp_nodelay(tr, True)
90 self.transport = tr
92 def connection_lost(self, exc: BaseException | None) -> None:
93 # Wake up the writer if currently paused.
94 self.transport = None
95 if not self._paused:
96 return
97 waiter = self._drain_waiter
98 if waiter is None:
99 return
100 self._drain_waiter = None
101 if waiter.done():
102 return
103 if exc is None:
104 waiter.set_result(None)
105 else:
106 set_exception(
107 waiter,
108 ConnectionError("Connection lost"),
109 exc,
110 )
112 async def _drain_helper(self) -> None:
113 if self.transport is None:
114 raise ClientConnectionResetError("Connection lost")
115 if not self._paused:
116 return
117 waiter = self._drain_waiter
118 if waiter is None:
119 waiter = self._loop.create_future()
120 self._drain_waiter = waiter
121 await waiter