Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/base.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import ssl 

2import time 

3import typing 

4 

5 

class NetworkStream:
    """Base interface for a synchronous network stream.

    Every I/O method defined here raises ``NotImplementedError``; only
    ``get_extra_info`` has a working default (it returns ``None``).
    Presumably concrete backends subclass this and override the methods --
    confirm against the backend implementations.
    """

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Placeholder for reading from the stream.

        Args:
            max_bytes: Presumably the upper bound on the number of bytes
                returned -- confirm against concrete backends.
            timeout: Optional timeout; units are not established here
                (presumably seconds).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Placeholder for writing ``buffer`` to the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def close(self) -> None:
        """Placeholder for closing the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "NetworkStream":
        """Placeholder for upgrading the stream to TLS.

        The annotation says a ``NetworkStream`` is returned; whether that is
        a new object or ``self`` is up to the implementation.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return extra information keyed by ``info``.

        The base implementation ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover

26 

27 

class NetworkBackend:
    """Base interface for a synchronous network backend.

    The ``connect_*`` methods raise ``NotImplementedError`` here and are
    annotated as returning a ``NetworkStream``. ``sleep`` has a working
    default that blocks via ``time.sleep``.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> NetworkStream:
        """Placeholder for opening a TCP connection to ``host``:``port``.

        Args:
            host: Target host.
            port: Target port.
            timeout: Optional timeout; units are not established here
                (presumably seconds).
            local_address: Optional local address; presumably the source
                address to bind to -- confirm against concrete backends.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def connect_unix_socket(
        self, path: str, timeout: typing.Optional[float] = None
    ) -> NetworkStream:
        """Placeholder for connecting to the Unix domain socket at ``path``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def sleep(self, seconds: float) -> None:
        """Block the calling thread for ``seconds`` using ``time.sleep``."""
        time.sleep(seconds)  # pragma: nocover

45 

46 

class AsyncNetworkStream:
    """Base interface for an asynchronous network stream.

    The coroutine methods defined here raise ``NotImplementedError`` when
    awaited. ``get_extra_info`` is a plain (non-async) method that returns
    ``None``.
    """

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        """Placeholder for reading from the stream.

        Args:
            max_bytes: Presumably the upper bound on the number of bytes
                returned -- confirm against concrete backends.
            timeout: Optional timeout; units are not established here
                (presumably seconds).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def write(
        self, buffer: bytes, timeout: typing.Optional[float] = None
    ) -> None:
        """Placeholder for writing ``buffer`` to the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def aclose(self) -> None:
        """Placeholder for closing the stream.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> "AsyncNetworkStream":
        """Placeholder for upgrading the stream to TLS.

        The annotation says an ``AsyncNetworkStream`` is returned; whether
        that is a new object or ``self`` is up to the implementation.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    def get_extra_info(self, info: str) -> typing.Any:
        """Return extra information keyed by ``info``.

        Synchronous even though the rest of the class is async. The base
        implementation ignores ``info`` and returns ``None``.
        """
        return None  # pragma: nocover

71 

72 

class AsyncNetworkBackend:
    """Base interface for an asynchronous network backend.

    Every coroutine defined here, including ``sleep``, raises
    ``NotImplementedError`` when awaited. The ``connect_*`` methods are
    annotated as returning an ``AsyncNetworkStream``.
    """

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> AsyncNetworkStream:
        """Placeholder for opening a TCP connection to ``host``:``port``.

        Args:
            host: Target host.
            port: Target port.
            timeout: Optional timeout; units are not established here
                (presumably seconds).
            local_address: Optional local address; presumably the source
                address to bind to -- confirm against concrete backends.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def connect_unix_socket(
        self, path: str, timeout: typing.Optional[float] = None
    ) -> AsyncNetworkStream:
        """Placeholder for connecting to the Unix domain socket at ``path``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover

    async def sleep(self, seconds: float) -> None:
        """Placeholder for an asynchronous sleep.

        Unlike the synchronous backend's ``time.sleep`` default, there is no
        default implementation here.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()  # pragma: nocover