Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_backends/anyio.py: 30%
54 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1import ssl
2import typing
4import anyio
6from .._exceptions import (
7 ConnectError,
8 ConnectTimeout,
9 ReadError,
10 ReadTimeout,
11 WriteError,
12 WriteTimeout,
13 map_exceptions,
14)
15from .._utils import is_socket_readable
16from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
class AnyIOStream(AsyncNetworkStream):
    """Adapt an `anyio` byte stream to the `AsyncNetworkStream` interface.

    Each operation is delegated to the wrapped stream, and the exceptions it
    raises are translated with `map_exceptions` into this package's own
    exception types.
    """

    def __init__(self, stream: anyio.abc.ByteStream) -> None:
        """Store *stream* as the underlying transport."""
        self._stream = stream

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        """Receive at most *max_bytes* bytes from the stream.

        *timeout* is handed to `anyio.fail_after`. Returns ``b""`` when the
        wrapped stream signals `anyio.EndOfStream`.

        Raises:
            ReadTimeout: the `anyio.fail_after` deadline expired.
            ReadError: the wrapped stream was broken or already closed.
        """
        exc_map = {
            TimeoutError: ReadTimeout,
            anyio.BrokenResourceError: ReadError,
            anyio.ClosedResourceError: ReadError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                try:
                    return await self._stream.receive(max_bytes=max_bytes)
                except anyio.EndOfStream:  # pragma: nocover
                    return b""

    async def write(
        self, buffer: bytes, timeout: typing.Optional[float] = None
    ) -> None:
        """Send *buffer* over the stream; an empty buffer is a no-op.

        *timeout* is handed to `anyio.fail_after`.

        Raises:
            WriteTimeout: the `anyio.fail_after` deadline expired.
            WriteError: the wrapped stream was broken or already closed.
        """
        if not buffer:
            return

        exc_map = {
            TimeoutError: WriteTimeout,
            anyio.BrokenResourceError: WriteError,
            anyio.ClosedResourceError: WriteError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                await self._stream.send(item=buffer)

    async def aclose(self) -> None:
        """Close the wrapped stream."""
        await self._stream.aclose()

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> AsyncNetworkStream:
        """Wrap the current stream in client-side TLS and return the new stream.

        If wrapping fails with any `Exception`, this stream is closed before
        the error propagates.

        Raises:
            ConnectTimeout: the `anyio.fail_after` deadline expired.
            ConnectError: the TLS wrap failed (broken stream, unexpected end
                of stream, or an `ssl.SSLError`).
        """
        exc_map = {
            TimeoutError: ConnectTimeout,
            anyio.BrokenResourceError: ConnectError,
            # Handshake failures may also surface as `anyio.EndOfStream` or
            # `ssl.SSLError`; map them too so callers only ever see this
            # package's connection errors rather than backend-specific ones.
            anyio.EndOfStream: ConnectError,
            ssl.SSLError: ConnectError,
        }
        with map_exceptions(exc_map):
            try:
                with anyio.fail_after(timeout):
                    ssl_stream = await anyio.streams.tls.TLSStream.wrap(
                        self._stream,
                        ssl_context=ssl_context,
                        hostname=server_hostname,
                        standard_compatible=False,
                        server_side=False,
                    )
            except Exception:  # pragma: nocover
                await self.aclose()
                # Bare `raise` re-raises the active exception unchanged.
                raise
        return AnyIOStream(ssl_stream)

    def get_extra_info(self, info: str) -> typing.Any:
        """Return transport details identified by *info*.

        Recognised keys are ``"ssl_object"``, ``"client_addr"``,
        ``"server_addr"``, ``"socket"`` and ``"is_readable"``. The first four
        are looked up as typed attributes on the wrapped stream and yield
        ``None`` when the attribute is absent. ``"is_readable"`` returns the
        result of `is_socket_readable` on the raw socket. Any other key
        returns ``None``.
        """
        if info == "ssl_object":
            return self._stream.extra(anyio.streams.tls.TLSAttribute.ssl_object, None)
        if info == "client_addr":
            return self._stream.extra(anyio.abc.SocketAttribute.local_address, None)
        if info == "server_addr":
            return self._stream.extra(anyio.abc.SocketAttribute.remote_address, None)
        if info == "socket":
            return self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
        if info == "is_readable":
            sock = self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
            return is_socket_readable(sock)
        return None
class AnyIOBackend(AsyncNetworkBackend):
    """Network backend that opens connections through `anyio`."""

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Open a TCP connection to *host*:*port* and wrap it in `AnyIOStream`.

        *local_address* is passed to `anyio.connect_tcp` as ``local_host``.
        Each entry of *socket_options* is unpacked into ``setsockopt`` on the
        connection's raw socket. *timeout* is handed to `anyio.fail_after`.

        Raises:
            ConnectTimeout: the `anyio.fail_after` deadline expired.
            ConnectError: an `OSError` or `anyio.BrokenResourceError` was
                raised while connecting or applying the socket options.
        """
        if socket_options is None:
            socket_options = []  # pragma: no cover
        exc_map = {
            TimeoutError: ConnectTimeout,
            OSError: ConnectError,
            anyio.BrokenResourceError: ConnectError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                stream: anyio.abc.ByteStream = await anyio.connect_tcp(
                    remote_host=host,
                    remote_port=port,
                    local_host=local_address,
                )
                # By default TCP sockets opened in `asyncio` include TCP_NODELAY.
                for option in socket_options:
                    # Use the public typed-attribute lookup instead of the
                    # private `_raw_socket`; it stays inside the loop so the
                    # socket is only touched when there is an option to set.
                    raw_socket = stream.extra(  # pragma: no cover
                        anyio.abc.SocketAttribute.raw_socket
                    )
                    raw_socket.setsockopt(*option)  # pragma: no cover
        return AnyIOStream(stream)

    async def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:  # pragma: nocover
        """Open a UNIX domain socket at *path* and wrap it in `AnyIOStream`.

        Each entry of *socket_options* is unpacked into ``setsockopt`` on the
        connection's raw socket. *timeout* is handed to `anyio.fail_after`.

        Raises:
            ConnectTimeout: the `anyio.fail_after` deadline expired.
            ConnectError: an `OSError` or `anyio.BrokenResourceError` was
                raised while connecting or applying the socket options.
        """
        if socket_options is None:
            socket_options = []
        exc_map = {
            TimeoutError: ConnectTimeout,
            OSError: ConnectError,
            anyio.BrokenResourceError: ConnectError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                stream: anyio.abc.ByteStream = await anyio.connect_unix(path)
                for option in socket_options:
                    # Public typed-attribute lookup instead of the private
                    # `_raw_socket`; only reached when an option is present.
                    raw_socket = stream.extra(  # pragma: no cover
                        anyio.abc.SocketAttribute.raw_socket
                    )
                    raw_socket.setsockopt(*option)  # pragma: no cover
        return AnyIOStream(stream)

    async def sleep(self, seconds: float) -> None:
        """Suspend the current task for *seconds* via `anyio.sleep`."""
        await anyio.sleep(seconds)  # pragma: nocover