1from __future__ import annotations
2
3import ssl
4import time
5import typing
6
# Type alias describing a single socket option entry. Three tuple shapes are
# accepted.
# NOTE(review): these shapes appear to mirror the argument forms accepted by
# ``socket.setsockopt`` -- confirm against the code that applies the options.
SOCKET_OPTION = typing.Union[
    # Three integers.
    typing.Tuple[int, int, int],
    # Two integers followed by a bytes-like value.
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    # Two integers, an explicit ``None``, then a trailing integer.
    typing.Tuple[int, int, None, int],
]
12
13
class NetworkStream:
    """Base interface for a synchronous network stream.

    Every I/O method defined here raises ``NotImplementedError``; a concrete
    stream is expected to override them. Only ``get_extra_info`` has a
    working default, which reports that no information is available.
    """

    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
        """Read data from the stream.

        Args:
            max_bytes: Size limit for the read (presumably an upper bound on
                the number of bytes returned -- confirm with implementations).
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def write(self, buffer: bytes, timeout: float | None = None) -> None:
        """Write ``buffer`` to the stream.

        Args:
            buffer: The bytes to send.
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def close(self) -> None:
        """Close the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: str | None = None,
        timeout: float | None = None,
    ) -> NetworkStream:
        """Start TLS on this stream and return a ``NetworkStream``.

        Args:
            ssl_context: The SSL context to use.
            server_hostname: Optional server hostname; ``None`` by default.
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Look up extra information about the stream by name.

        The base implementation ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover
34
35
class NetworkBackend:
    """Base interface for a synchronous network backend.

    The connection methods raise ``NotImplementedError`` and are expected to
    be overridden by a concrete backend. ``sleep`` has a working default
    built on ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: float | None = None,
        local_address: str | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> NetworkStream:
        """Open a TCP connection and return it as a ``NetworkStream``.

        Args:
            host: Host to connect to.
            port: Port to connect to.
            timeout: Optional time limit for the operation; ``None`` by
                default.
            local_address: Optional local address; ``None`` by default.
            socket_options: Optional iterable of ``SOCKET_OPTION`` tuples;
                ``None`` by default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def connect_unix_socket(
        self,
        path: str,
        timeout: float | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> NetworkStream:
        """Connect to a Unix socket and return it as a ``NetworkStream``.

        Args:
            path: Path of the Unix socket.
            timeout: Optional time limit for the operation; ``None`` by
                default.
            socket_options: Optional iterable of ``SOCKET_OPTION`` tuples;
                ``None`` by default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Block the calling thread for ``seconds`` using ``time.sleep``."""
        time.sleep(seconds)  # pragma: nocover
57
58
class AsyncNetworkStream:
    """Base interface for an asynchronous network stream.

    The coroutine methods defined here raise ``NotImplementedError`` when
    awaited; a concrete stream is expected to override them. Only
    ``get_extra_info`` has a working default, and it is a regular
    (non-async) method.
    """

    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
        """Read data from the stream.

        Args:
            max_bytes: Size limit for the read (presumably an upper bound on
                the number of bytes returned -- confirm with implementations).
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
        """Write ``buffer`` to the stream.

        Args:
            buffer: The bytes to send.
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    async def aclose(self) -> None:
        """Close the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: str | None = None,
        timeout: float | None = None,
    ) -> AsyncNetworkStream:
        """Start TLS on this stream and return an ``AsyncNetworkStream``.

        Args:
            ssl_context: The SSL context to use.
            server_hostname: Optional server hostname; ``None`` by default.
            timeout: Optional time limit for the operation; ``None`` by
                default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Look up extra information about the stream by name.

        The base implementation ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover
79
80
class AsyncNetworkBackend:
    """Base interface for an asynchronous network backend.

    Every coroutine method here, including ``sleep``, raises
    ``NotImplementedError`` when awaited; a concrete backend is expected to
    override them.
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: float | None = None,
        local_address: str | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> AsyncNetworkStream:
        """Open a TCP connection and return it as an ``AsyncNetworkStream``.

        Args:
            host: Host to connect to.
            port: Port to connect to.
            timeout: Optional time limit for the operation; ``None`` by
                default.
            local_address: Optional local address; ``None`` by default.
            socket_options: Optional iterable of ``SOCKET_OPTION`` tuples;
                ``None`` by default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    async def connect_unix_socket(
        self,
        path: str,
        timeout: float | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> AsyncNetworkStream:
        """Connect to a Unix socket and return it as an ``AsyncNetworkStream``.

        Args:
            path: Path of the Unix socket.
            timeout: Optional time limit for the operation; ``None`` by
                default.
            socket_options: Optional iterable of ``SOCKET_OPTION`` tuples;
                ``None`` by default.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Pause for ``seconds``.

        Unlike a blocking ``time.sleep`` call, this base implementation does
        no waiting at all.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: nocover