Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpcore/_backends/base.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

25 statements  

1from __future__ import annotations 

2 

3import ssl 

4import time 

5import typing 

6 

# Type alias for a single socket option, expressed as a tuple in one of three
# shapes: (int, int, int), (int, int, bytes-like) or (int, int, None, int).
# NOTE(review): these shapes mirror the argument forms of
# ``socket.setsockopt`` (level, optname, value / None, optlen); presumably
# concrete backends unpack each tuple into that call - confirm there.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

12 

13 

class NetworkStream:
    """Base interface for a synchronous network stream.

    Every I/O method defined here raises ``NotImplementedError``; only
    ``get_extra_info`` has a working default (it returns ``None``).
    Concrete streams are expected to subclass this and override the
    methods - no implementation is visible in this file.
    """

    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
        """Not implemented in the base class.

        NOTE(review): judging by the signature, presumably returns at most
        ``max_bytes`` bytes and honours ``timeout`` - confirm in subclasses.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def write(self, buffer: bytes, timeout: float | None = None) -> None:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def close(self) -> None:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: str | None = None,
        timeout: float | None = None,
    ) -> NetworkStream:
        """Not implemented in the base class.

        The annotation says a ``NetworkStream`` is returned; whether that is
        a new object or ``self`` is up to the subclass (not shown here).

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return ``None`` for every ``info`` key (base-class default)."""
        return None  # pragma: nocover

34 

35 

class NetworkBackend:
    """Base interface for a synchronous network backend.

    The two ``connect_*`` methods raise ``NotImplementedError`` here and are
    meant to be overridden; ``sleep`` has a working default that blocks via
    ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: float | None = None,
        local_address: str | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> NetworkStream:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def connect_unix_socket(
        self,
        path: str,
        timeout: float | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> NetworkStream:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Block the calling thread for ``seconds`` using ``time.sleep``."""
        time.sleep(seconds)  # pragma: nocover

57 

58 

class AsyncNetworkStream:
    """Base interface for an asynchronous network stream.

    The coroutine methods all raise ``NotImplementedError`` when awaited.
    ``get_extra_info`` is a plain (non-async) method whose default returns
    ``None``. Concrete streams are expected to subclass and override - no
    implementation is visible in this file.
    """

    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
        """Not implemented in the base class.

        NOTE(review): judging by the signature, presumably returns at most
        ``max_bytes`` bytes and honours ``timeout`` - confirm in subclasses.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def aclose(self) -> None:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: str | None = None,
        timeout: float | None = None,
    ) -> AsyncNetworkStream:
        """Not implemented in the base class.

        The annotation says an ``AsyncNetworkStream`` is returned; whether
        that is a new object or ``self`` is up to the subclass.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return ``None`` for every ``info`` key (base-class default)."""
        return None  # pragma: nocover

79 

80 

class AsyncNetworkBackend:
    """Base interface for an asynchronous network backend.

    Every coroutine here - including ``sleep`` - raises
    ``NotImplementedError`` when awaited, so subclasses must supply all
    three. (The synchronous ``NetworkBackend`` in this file differs: its
    ``sleep`` has a ``time.sleep`` default.)
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: float | None = None,
        local_address: str | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> AsyncNetworkStream:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def connect_unix_socket(
        self,
        path: str,
        timeout: float | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> AsyncNetworkStream:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Not implemented in the base class.

        Raises:
            NotImplementedError: always, once awaited.
        """
        raise NotImplementedError()  # pragma: nocover