Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/sync.py: 29%

56 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import socket 

2import ssl 

3import sys 

4import typing 

5 

6from .._exceptions import ( 

7 ConnectError, 

8 ConnectTimeout, 

9 ExceptionMapping, 

10 ReadError, 

11 ReadTimeout, 

12 WriteError, 

13 WriteTimeout, 

14 map_exceptions, 

15) 

16from .._utils import is_socket_readable 

17from .base import SOCKET_OPTION, NetworkBackend, NetworkStream 

18 

19 

class SyncStream(NetworkStream):
    """Blocking `NetworkStream` implementation backed by a `socket.socket`.

    Every socket operation runs inside a `map_exceptions` context that is
    given a mapping from `socket.timeout` / `OSError` to the matching
    exception class imported from `.._exceptions`.
    """

    def __init__(self, sock: socket.socket) -> None:
        """Wrap an existing socket object (plain or TLS-wrapped)."""
        self._sock = sock

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Receive up to `max_bytes` bytes from the socket.

        `timeout` is applied with `settimeout()` before the `recv()` call;
        `None` leaves the socket in blocking mode. The exception mapping
        used is `socket.timeout -> ReadTimeout`, `OSError -> ReadError`.
        """
        exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError}
        with map_exceptions(exc_map):
            self._sock.settimeout(timeout)
            return self._sock.recv(max_bytes)

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Send all of `buffer`, looping until every byte has been accepted.

        An empty buffer is a no-op. The exception mapping used is
        `socket.timeout -> WriteTimeout`, `OSError -> WriteError`.
        """
        if not buffer:
            return

        exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError}
        with map_exceptions(exc_map):
            # The timeout does not change between iterations, so set it once.
            self._sock.settimeout(timeout)
            # Slice a memoryview rather than the bytes object so a partial
            # send does not copy the unsent tail on every iteration.
            remaining = memoryview(buffer)
            while remaining:
                n = self._sock.send(remaining)
                remaining = remaining[n:]

    def close(self) -> None:
        """Close the underlying socket."""
        self._sock.close()

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> NetworkStream:
        """Wrap the socket with `ssl_context` and return a new stream for it.

        `timeout` is applied to the socket before `wrap_socket()` is called.
        If anything raises, this stream is closed and the exception is
        re-raised. The exception mapping used is
        `socket.timeout -> ConnectTimeout`, `OSError -> ConnectError`.
        """
        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(exc_map):
            try:
                self._sock.settimeout(timeout)
                sock = ssl_context.wrap_socket(
                    self._sock, server_hostname=server_hostname
                )
            except Exception:  # pragma: nocover
                # Do not leave a half-upgraded socket open; a bare `raise`
                # keeps the original traceback intact.
                self.close()
                raise
        return SyncStream(sock)

    def get_extra_info(self, info: str) -> typing.Any:
        """Return transport details for the given `info` key.

        Recognised keys:

        * "ssl_object": the socket's `_sslobj`, only when the socket is an
          `ssl.SSLSocket`.
        * "client_addr": result of `getsockname()`.
        * "server_addr": result of `getpeername()`.
        * "socket": the wrapped socket object itself.
        * "is_readable": result of `is_socket_readable()` for the socket.

        Any other key (or "ssl_object" on a non-TLS socket) yields `None`.
        """
        if info == "ssl_object" and isinstance(self._sock, ssl.SSLSocket):
            # `_sslobj` is a private attribute of SSLSocket, hence the ignore.
            return self._sock._sslobj  # type: ignore
        if info == "client_addr":
            return self._sock.getsockname()
        if info == "server_addr":
            return self._sock.getpeername()
        if info == "socket":
            return self._sock
        if info == "is_readable":
            return is_socket_readable(self._sock)
        return None

77 

78 

class SyncBackend(NetworkBackend):
    """Blocking `NetworkBackend` that opens sockets and wraps them in `SyncStream`.

    Both connect methods run inside a `map_exceptions` context given the
    mapping `socket.timeout -> ConnectTimeout`, `OSError -> ConnectError`.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Open a TCP connection to `(host, port)` and return it as a stream.

        `timeout` is passed to `socket.create_connection()`. When
        `local_address` is given, the connection uses
        `(local_address, 0)` as its source address. Each entry of
        `socket_options` is unpacked into `setsockopt()`. If applying an
        option fails, the socket is closed before the error propagates.
        """
        # Note that we automatically include `TCP_NODELAY`
        # in addition to any other custom socket options.
        if socket_options is None:
            socket_options = []  # pragma: no cover
        address = (host, port)
        source_address = None if local_address is None else (local_address, 0)
        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }

        with map_exceptions(exc_map):
            sock = socket.create_connection(
                address,
                timeout,
                source_address=source_address,
            )
            try:
                for option in socket_options:
                    sock.setsockopt(*option)  # pragma: no cover
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            except BaseException:  # pragma: nocover
                # The connection is already open here; close it so a failed
                # setsockopt() does not leak the socket.
                sock.close()
                raise
        return SyncStream(sock)

    def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:  # pragma: nocover
        """Connect to the UNIX domain socket at `path` and return it as a stream.

        Raises `RuntimeError` when `sys.platform` is "win32". Each entry of
        `socket_options` is unpacked into `setsockopt()` and `timeout` is
        applied with `settimeout()` before connecting. If any of those steps
        or the connect itself fails, the socket is closed before the error
        propagates.
        """
        if sys.platform == "win32":
            raise RuntimeError(
                "Attempted to connect to a UNIX socket on a Windows system."
            )
        if socket_options is None:
            socket_options = []

        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(exc_map):
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                for option in socket_options:
                    sock.setsockopt(*option)
                sock.settimeout(timeout)
                sock.connect(path)
            except BaseException:
                # Release the file descriptor instead of leaving it to the
                # garbage collector when setup or connect fails.
                sock.close()
                raise
        return SyncStream(sock)