Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/base.py: 100%
24 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import ssl
2import time
3import typing
# Type alias for one entry of the ``socket_options`` iterables accepted by the
# backend ``connect_tcp`` / ``connect_unix_socket`` methods in this module.
# NOTE(review): the three tuple shapes presumably mirror the argument forms of
# ``socket.setsockopt`` (int value, bytes-like value, ``None`` plus an option
# length) -- confirm against the concrete backends that consume them.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]
class NetworkStream:
    """Interface for a synchronous network stream.

    Every I/O method here raises ``NotImplementedError``; only
    ``get_extra_info`` has a default implementation, which returns ``None``.
    """

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Read from the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Write ``buffer`` to the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def close(self) -> None:
        """Close the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "NetworkStream":
        """Start TLS on the stream. Not implemented in this base class.

        The annotated return type is another ``NetworkStream``.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return extra information identified by ``info``.

        The base implementation ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover
class NetworkBackend:
    """Interface for a synchronous network backend.

    The connection methods raise ``NotImplementedError`` here; ``sleep`` has
    a default implementation that delegates to ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Open a TCP connection. Not implemented in this base class.

        The annotated return type is a ``NetworkStream``.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Open a Unix socket connection. Not implemented in this base class.

        The annotated return type is a ``NetworkStream``.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Block the calling thread by calling ``time.sleep(seconds)``."""
        time.sleep(seconds)  # pragma: nocover
class AsyncNetworkStream:
    """Interface for an asynchronous network stream.

    Every coroutine method here raises ``NotImplementedError`` when awaited;
    only the synchronous ``get_extra_info`` has a default implementation,
    which returns ``None``.
    """

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        """Read from the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def write(
        self, buffer: bytes, timeout: typing.Optional[float] = None
    ) -> None:
        """Write ``buffer`` to the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def aclose(self) -> None:
        """Close the stream. Not implemented in this base class.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "AsyncNetworkStream":
        """Start TLS on the stream. Not implemented in this base class.

        The annotated return type is another ``AsyncNetworkStream``.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return extra information identified by ``info``.

        This is a plain (non-coroutine) method. The base implementation
        ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover
class AsyncNetworkBackend:
    """Interface for an asynchronous network backend.

    Every coroutine method here, including ``sleep``, raises
    ``NotImplementedError`` when awaited.
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Open a TCP connection. Not implemented in this base class.

        The annotated return type is an ``AsyncNetworkStream``.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Open a Unix socket connection. Not implemented in this base class.

        The annotated return type is an ``AsyncNetworkStream``.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Sleep for ``seconds``. Not implemented in this base class.

        Raises:
            NotImplementedError: always, when awaited.
        """
        raise NotImplementedError()  # pragma: nocover