Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_backends/base.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

24 statements  

1import ssl 

2import time 

3import typing 

4 

5SOCKET_OPTION = typing.Union[ 

6 typing.Tuple[int, int, int], 

7 typing.Tuple[int, int, typing.Union[bytes, bytearray]], 

8 typing.Tuple[int, int, None, int], 

9] 

10 

11 

12class NetworkStream: 

13 def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes: 

14 raise NotImplementedError() # pragma: nocover 

15 

16 def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None: 

17 raise NotImplementedError() # pragma: nocover 

18 

19 def close(self) -> None: 

20 raise NotImplementedError() # pragma: nocover 

21 

22 def start_tls( 

23 self, 

24 ssl_context: ssl.SSLContext, 

25 server_hostname: typing.Optional[str] = None, 

26 timeout: typing.Optional[float] = None, 

27 ) -> "NetworkStream": 

28 raise NotImplementedError() # pragma: nocover 

29 

30 def get_extra_info(self, info: str) -> typing.Any: 

31 return None # pragma: nocover 

32 

33 

34class NetworkBackend: 

35 def connect_tcp( 

36 self, 

37 host: str, 

38 port: int, 

39 timeout: typing.Optional[float] = None, 

40 local_address: typing.Optional[str] = None, 

41 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, 

42 ) -> NetworkStream: 

43 raise NotImplementedError() # pragma: nocover 

44 

45 def connect_unix_socket( 

46 self, 

47 path: str, 

48 timeout: typing.Optional[float] = None, 

49 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, 

50 ) -> NetworkStream: 

51 raise NotImplementedError() # pragma: nocover 

52 

53 def sleep(self, seconds: float) -> None: 

54 time.sleep(seconds) # pragma: nocover 

55 

56 

57class AsyncNetworkStream: 

58 async def read( 

59 self, max_bytes: int, timeout: typing.Optional[float] = None 

60 ) -> bytes: 

61 raise NotImplementedError() # pragma: nocover 

62 

63 async def write( 

64 self, buffer: bytes, timeout: typing.Optional[float] = None 

65 ) -> None: 

66 raise NotImplementedError() # pragma: nocover 

67 

68 async def aclose(self) -> None: 

69 raise NotImplementedError() # pragma: nocover 

70 

71 async def start_tls( 

72 self, 

73 ssl_context: ssl.SSLContext, 

74 server_hostname: typing.Optional[str] = None, 

75 timeout: typing.Optional[float] = None, 

76 ) -> "AsyncNetworkStream": 

77 raise NotImplementedError() # pragma: nocover 

78 

79 def get_extra_info(self, info: str) -> typing.Any: 

80 return None # pragma: nocover 

81 

82 

83class AsyncNetworkBackend: 

84 async def connect_tcp( 

85 self, 

86 host: str, 

87 port: int, 

88 timeout: typing.Optional[float] = None, 

89 local_address: typing.Optional[str] = None, 

90 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, 

91 ) -> AsyncNetworkStream: 

92 raise NotImplementedError() # pragma: nocover 

93 

94 async def connect_unix_socket( 

95 self, 

96 path: str, 

97 timeout: typing.Optional[float] = None, 

98 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None, 

99 ) -> AsyncNetworkStream: 

100 raise NotImplementedError() # pragma: nocover 

101 

102 async def sleep(self, seconds: float) -> None: 

103 raise NotImplementedError() # pragma: nocover