1import ssl
2import time
3import typing
4
# Type alias for one entry of a ``socket_options`` iterable accepted by the
# backend ``connect_*`` methods. Three tuple shapes are allowed.
# NOTE(review): the shapes look like the argument forms of
# ``socket.setsockopt`` (level, optname, value[, optlen]) -- confirm against
# the concrete backends before relying on that reading.
SOCKET_OPTION = typing.Union[
    # (int, int, int): two integers followed by an integer value.
    typing.Tuple[int, int, int],
    # (int, int, buffer): two integers followed by a bytes-like value.
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    # (int, int, None, int): two integers, an explicit None, then an integer.
    typing.Tuple[int, int, None, int],
]
10
11
class NetworkStream:
    """Base class describing the interface of a synchronous network stream.

    Every I/O method here raises ``NotImplementedError``; subclasses are
    expected to override them. Only ``get_extra_info`` has a working default,
    which returns ``None``.
    """

    def read(
        self,
        max_bytes: int,
        timeout: typing.Optional[float] = None,
    ) -> bytes:
        """Read up to ``max_bytes`` from the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def write(
        self,
        buffer: bytes,
        timeout: typing.Optional[float] = None,
    ) -> None:
        """Write ``buffer`` to the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def close(self) -> None:
        """Close the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "NetworkStream":
        """Return a TLS-wrapped ``NetworkStream`` (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Look up extra information by the key ``info``.

        The base implementation knows nothing and returns ``None`` for
        every key.
        """
        return None  # pragma: nocover
32
33
class NetworkBackend:
    """Base class describing the interface of a synchronous network backend.

    The ``connect_*`` methods raise ``NotImplementedError`` and must be
    overridden by subclasses. ``sleep`` has a working default built on
    ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Open a TCP connection to ``host``:``port`` (not implemented here).

        Returns:
            A ``NetworkStream`` in concrete subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Connect to the Unix socket at ``path`` (not implemented here).

        Returns:
            A ``NetworkStream`` in concrete subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Pause for ``seconds`` by delegating to ``time.sleep``."""
        time.sleep(seconds)  # pragma: nocover
55
56
class AsyncNetworkStream:
    """Base class describing the interface of an asynchronous network stream.

    The coroutine methods all raise ``NotImplementedError`` when awaited;
    subclasses are expected to override them. ``get_extra_info`` is a plain
    (non-async) method whose default returns ``None``.
    """

    async def read(
        self,
        max_bytes: int,
        timeout: typing.Optional[float] = None,
    ) -> bytes:
        """Read up to ``max_bytes`` from the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def write(
        self,
        buffer: bytes,
        timeout: typing.Optional[float] = None,
    ) -> None:
        """Write ``buffer`` to the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def aclose(self) -> None:
        """Close the stream (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "AsyncNetworkStream":
        """Return a TLS-wrapped ``AsyncNetworkStream`` (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Look up extra information by the key ``info``.

        The base implementation knows nothing and returns ``None`` for
        every key.
        """
        return None  # pragma: nocover
81
82
class AsyncNetworkBackend:
    """Base class describing the interface of an asynchronous network backend.

    Every coroutine here, including ``sleep``, raises ``NotImplementedError``
    when awaited; subclasses must override them all.
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Open a TCP connection to ``host``:``port`` (not implemented here).

        Returns:
            An ``AsyncNetworkStream`` in concrete subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> AsyncNetworkStream:
        """Connect to the Unix socket at ``path`` (not implemented here).

        Returns:
            An ``AsyncNetworkStream`` in concrete subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Pause for ``seconds`` (not implemented here).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover