Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/base.py: 100%
23 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import ssl
2import time
3import typing
class NetworkStream:
    """Base class declaring the synchronous stream operations.

    ``read``, ``write``, ``close`` and ``start_tls`` raise
    ``NotImplementedError`` here, so they only do something useful once a
    subclass overrides them. ``get_extra_info`` has a default that returns
    ``None``.
    """

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Unimplemented in this base class; always raises ``NotImplementedError``."""
        raise NotImplementedError()  # pragma: nocover

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Unimplemented in this base class; always raises ``NotImplementedError``."""
        raise NotImplementedError()  # pragma: nocover

    def close(self) -> None:
        """Unimplemented in this base class; always raises ``NotImplementedError``."""
        raise NotImplementedError()  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "NetworkStream":
        """Unimplemented in this base class; always raises ``NotImplementedError``.

        The annotated return type is another ``NetworkStream``.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return ``None`` for every ``info`` key (base-class default)."""
        return None  # pragma: nocover
class NetworkBackend:
    """Base class declaring the synchronous backend operations.

    The two ``connect_*`` methods raise ``NotImplementedError`` here and are
    annotated as returning a ``NetworkStream``. ``sleep`` has a working
    default built on ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> NetworkStream:
        """Unimplemented in this base class; always raises ``NotImplementedError``."""
        raise NotImplementedError()  # pragma: nocover

    def connect_unix_socket(
        self, path: str, timeout: typing.Optional[float] = None
    ) -> NetworkStream:
        """Unimplemented in this base class; always raises ``NotImplementedError``."""
        raise NotImplementedError()  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Block the calling thread by delegating to ``time.sleep(seconds)``."""
        time.sleep(seconds)  # pragma: nocover
class AsyncNetworkStream:
    """Base class declaring the asynchronous stream operations.

    ``read``, ``write``, ``aclose`` and ``start_tls`` are coroutines that
    raise ``NotImplementedError`` once awaited. ``get_extra_info`` is a plain
    (non-async) method whose default returns ``None``.
    """

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover

    async def write(
        self, buffer: bytes, timeout: typing.Optional[float] = None
    ) -> None:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover

    async def aclose(self) -> None:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "AsyncNetworkStream":
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited.

        The annotated return type is another ``AsyncNetworkStream``.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return ``None`` for every ``info`` key (base-class default)."""
        return None  # pragma: nocover
class AsyncNetworkBackend:
    """Base class declaring the asynchronous backend operations.

    All three methods are coroutines that raise ``NotImplementedError`` once
    awaited. That includes ``sleep``, which has no default implementation in
    this class. The ``connect_*`` methods are annotated as returning an
    ``AsyncNetworkStream``.
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> AsyncNetworkStream:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover

    async def connect_unix_socket(
        self, path: str, timeout: typing.Optional[float] = None
    ) -> AsyncNetworkStream:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Unimplemented in this base class; raises ``NotImplementedError`` when awaited."""
        raise NotImplementedError()  # pragma: nocover