Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/base_protocol.py: 23%
66 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2from typing import Optional, cast
4from .tcp_helpers import tcp_nodelay
7class BaseProtocol(asyncio.Protocol):
8 __slots__ = (
9 "_loop",
10 "_paused",
11 "_drain_waiter",
12 "_connection_lost",
13 "_reading_paused",
14 "transport",
15 )
17 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
18 self._loop: asyncio.AbstractEventLoop = loop
19 self._paused = False
20 self._drain_waiter: Optional[asyncio.Future[None]] = None
21 self._reading_paused = False
23 self.transport: Optional[asyncio.Transport] = None
25 @property
26 def connected(self) -> bool:
27 """Return True if the connection is open."""
28 return self.transport is not None
30 def pause_writing(self) -> None:
31 assert not self._paused
32 self._paused = True
34 def resume_writing(self) -> None:
35 assert self._paused
36 self._paused = False
38 waiter = self._drain_waiter
39 if waiter is not None:
40 self._drain_waiter = None
41 if not waiter.done():
42 waiter.set_result(None)
44 def pause_reading(self) -> None:
45 if not self._reading_paused and self.transport is not None:
46 try:
47 self.transport.pause_reading()
48 except (AttributeError, NotImplementedError, RuntimeError):
49 pass
50 self._reading_paused = True
52 def resume_reading(self) -> None:
53 if self._reading_paused and self.transport is not None:
54 try:
55 self.transport.resume_reading()
56 except (AttributeError, NotImplementedError, RuntimeError):
57 pass
58 self._reading_paused = False
60 def connection_made(self, transport: asyncio.BaseTransport) -> None:
61 tr = cast(asyncio.Transport, transport)
62 tcp_nodelay(tr, True)
63 self.transport = tr
65 def connection_lost(self, exc: Optional[BaseException]) -> None:
66 # Wake up the writer if currently paused.
67 self.transport = None
68 if not self._paused:
69 return
70 waiter = self._drain_waiter
71 if waiter is None:
72 return
73 self._drain_waiter = None
74 if waiter.done():
75 return
76 if exc is None:
77 waiter.set_result(None)
78 else:
79 waiter.set_exception(exc)
81 async def _drain_helper(self) -> None:
82 if not self.connected:
83 raise ConnectionResetError("Connection lost")
84 if not self._paused:
85 return
86 waiter = self._drain_waiter
87 if waiter is None:
88 waiter = self._loop.create_future()
89 self._drain_waiter = waiter
90 await asyncio.shield(waiter)