Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/sync.py: 30%

53 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import socket 

2import ssl 

3import sys 

4import typing 

5 

6from .._exceptions import ( 

7 ConnectError, 

8 ConnectTimeout, 

9 ExceptionMapping, 

10 ReadError, 

11 ReadTimeout, 

12 WriteError, 

13 WriteTimeout, 

14 map_exceptions, 

15) 

16from .._utils import is_socket_readable 

17from .base import NetworkBackend, NetworkStream 

18 

19 

20class SyncStream(NetworkStream): 

21 def __init__(self, sock: socket.socket) -> None: 

22 self._sock = sock 

23 

24 def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes: 

25 exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError} 

26 with map_exceptions(exc_map): 

27 self._sock.settimeout(timeout) 

28 return self._sock.recv(max_bytes) 

29 

30 def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None: 

31 if not buffer: 

32 return 

33 

34 exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError} 

35 with map_exceptions(exc_map): 

36 while buffer: 

37 self._sock.settimeout(timeout) 

38 n = self._sock.send(buffer) 

39 buffer = buffer[n:] 

40 

41 def close(self) -> None: 

42 self._sock.close() 

43 

44 def start_tls( 

45 self, 

46 ssl_context: ssl.SSLContext, 

47 server_hostname: typing.Optional[str] = None, 

48 timeout: typing.Optional[float] = None, 

49 ) -> NetworkStream: 

50 exc_map: ExceptionMapping = { 

51 socket.timeout: ConnectTimeout, 

52 OSError: ConnectError, 

53 } 

54 with map_exceptions(exc_map): 

55 try: 

56 self._sock.settimeout(timeout) 

57 sock = ssl_context.wrap_socket( 

58 self._sock, server_hostname=server_hostname 

59 ) 

60 except Exception as exc: # pragma: nocover 

61 self.close() 

62 raise exc 

63 return SyncStream(sock) 

64 

65 def get_extra_info(self, info: str) -> typing.Any: 

66 if info == "ssl_object" and isinstance(self._sock, ssl.SSLSocket): 

67 return self._sock._sslobj # type: ignore 

68 if info == "client_addr": 

69 return self._sock.getsockname() 

70 if info == "server_addr": 

71 return self._sock.getpeername() 

72 if info == "socket": 

73 return self._sock 

74 if info == "is_readable": 

75 return is_socket_readable(self._sock) 

76 return None 

77 

78 

79class SyncBackend(NetworkBackend): 

80 def connect_tcp( 

81 self, 

82 host: str, 

83 port: int, 

84 timeout: typing.Optional[float] = None, 

85 local_address: typing.Optional[str] = None, 

86 ) -> NetworkStream: 

87 address = (host, port) 

88 source_address = None if local_address is None else (local_address, 0) 

89 exc_map: ExceptionMapping = { 

90 socket.timeout: ConnectTimeout, 

91 OSError: ConnectError, 

92 } 

93 with map_exceptions(exc_map): 

94 sock = socket.create_connection( 

95 address, timeout, source_address=source_address 

96 ) 

97 return SyncStream(sock) 

98 

99 def connect_unix_socket( 

100 self, path: str, timeout: typing.Optional[float] = None 

101 ) -> NetworkStream: # pragma: nocover 

102 if sys.platform == "win32": 

103 raise RuntimeError( 

104 "Attempted to connect to a UNIX socket on a Windows system." 

105 ) 

106 

107 exc_map: ExceptionMapping = { 

108 socket.timeout: ConnectTimeout, 

109 OSError: ConnectError, 

110 } 

111 with map_exceptions(exc_map): 

112 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

113 sock.settimeout(timeout) 

114 sock.connect(path) 

115 return SyncStream(sock)