Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/socket.py: 29%

98 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import errno 

2import io 

3import socket 

4from io import SEEK_END 

5from typing import Optional, Union 

6 

7from ..exceptions import ConnectionError, TimeoutError 

8from ..utils import SSL_AVAILABLE 

9 

# Maps each exception type that a non-blocking read may raise to the errno
# value that is treated as "no data available yet" for that type.
NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {BlockingIOError: errno.EWOULDBLOCK}

if SSL_AVAILABLE:
    import ssl

    # NOTE(review): the value 2 presumably corresponds to
    # ssl.SSL_ERROR_WANT_READ; the want-write class is mapped to the same
    # number -- confirm against the ssl module's error codes.
    if hasattr(ssl, "SSLWantReadError"):
        # Dedicated want-read / want-write exception classes are available.
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS.update(
            {ssl.SSLWantReadError: 2, ssl.SSLWantWriteError: 2}
        )
    else:
        # Fall back to the generic SSL error class.
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS.update({ssl.SSLError: 2})

# Tuple form of the registered exception types, usable in an ``except`` clause.
NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS)

22 

SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."
# Unique marker used to tell "no timeout argument passed" apart from None.
SENTINEL = object()

SYM_CRLF = b"\r\n"


class SocketBuffer:
    """
    In-memory read buffer layered on top of a socket.

    Bytes received from the socket are appended to an ``io.BytesIO`` and
    consumed from its current position via ``read`` / ``readline``.
    """

    def __init__(
        self, socket: socket.socket, socket_read_size: int, socket_timeout: float
    ):
        """
        Store the socket, the per-``recv`` chunk size and the timeout that
        is restored on the socket after a read with a custom timeout.
        """
        self._sock = socket
        self.socket_read_size = socket_read_size
        self.socket_timeout = socket_timeout
        self._buffer = io.BytesIO()

    def unread_bytes(self) -> int:
        """
        Remaining unread length of buffer
        """
        pos = self._buffer.tell()
        end = self._buffer.seek(0, SEEK_END)
        # Restore the read position that the seek-to-end just moved.
        self._buffer.seek(pos)
        return end - pos

    def _read_from_socket(
        self,
        length: Optional[int] = None,
        timeout: Union[float, object] = SENTINEL,
        raise_on_timeout: Optional[bool] = True,
    ) -> bool:
        """
        Receive data from the socket and append it to the buffer.

        Keeps receiving until at least ``length`` bytes have been appended;
        when ``length`` is None a single successful ``recv`` is enough.
        The buffer's read position is left where it was on entry.

        If ``timeout`` is given it is applied to the socket for this call
        and ``self.socket_timeout`` is restored afterwards.

        Returns True once data was read. Returns False when
        ``raise_on_timeout`` is false and the read timed out or raised a
        non-blocking "no data" error.

        Raises ConnectionError when ``recv`` returns empty bytes or on any
        other non-blocking error, and TimeoutError on a socket timeout when
        ``raise_on_timeout`` is true.
        """
        sock = self._sock
        socket_read_size = self.socket_read_size
        marker = 0
        custom_timeout = timeout is not SENTINEL

        buf = self._buffer
        current_pos = buf.tell()
        # New data is always appended at the end of the buffer.
        buf.seek(0, SEEK_END)
        if custom_timeout:
            sock.settimeout(timeout)
        try:
            while True:
                data = sock.recv(socket_read_size)
                # an empty string indicates the server shutdown the socket
                if isinstance(data, bytes) and len(data) == 0:
                    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
                buf.write(data)
                data_length = len(data)
                marker += data_length

                if length is not None and length > marker:
                    continue
                return True
        except socket.timeout:
            if raise_on_timeout:
                raise TimeoutError("Timeout reading from socket")
            return False
        except NONBLOCKING_EXCEPTIONS as ex:
            # if we're in nonblocking mode and the recv raises a
            # blocking error, simply return False indicating that
            # there's no data to be read. otherwise raise the
            # original exception.
            allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
            if not raise_on_timeout and ex.errno == allowed:
                return False
            raise ConnectionError(f"Error while reading from socket: {ex.args}")
        finally:
            # Always put the read position back and undo a custom timeout.
            buf.seek(current_pos)
            if custom_timeout:
                sock.settimeout(self.socket_timeout)

    def can_read(self, timeout: float) -> bool:
        """
        Return True if buffered data is unread or the socket yields data
        within ``timeout``; timeouts are reported as False, not raised.
        """
        return bool(self.unread_bytes()) or self._read_from_socket(
            timeout=timeout, raise_on_timeout=False
        )

    def read(self, length: int) -> bytes:
        """
        Read ``length`` payload bytes plus the two-byte terminator and
        return the payload with the terminator stripped.
        """
        length = length + 2  # make sure to read the \r\n terminator
        # BufferIO will return less than requested if buffer is short
        data = self._buffer.read(length)
        missing = length - len(data)
        if missing:
            # fill up the buffer and read the remainder
            self._read_from_socket(missing)
            data += self._buffer.read(missing)
        return data[:-2]

    def readline(self) -> bytes:
        """
        Read up to and including the next CRLF and return the line with
        the CRLF stripped, pulling from the socket until one is seen.
        """
        buf = self._buffer
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # there's more data in the socket that we need
            self._read_from_socket()
            data += buf.readline()

        return data[:-2]

    def get_pos(self) -> int:
        """
        Get current read position
        """
        return self._buffer.tell()

    def rewind(self, pos: int) -> None:
        """
        Rewind the buffer to a specific position, to re-start reading
        """
        self._buffer.seek(pos)

    def purge(self) -> None:
        """
        After a successful read, purge the read part of buffer

        The buffer is only emptied when everything in it has been read;
        if unread bytes remain this is a no-op.
        """
        # Only if we have read all of the buffer do we truncate, to
        # reduce the amount of memory thrashing.  This heuristic
        # can be changed or removed later.
        if self.unread_bytes() > 0:
            return

        # Nothing left to keep: drop the consumed data and start over.
        self._buffer.truncate(0)
        self._buffer.seek(0)

    def close(self) -> None:
        """
        Close the buffer (ignoring errors) and drop the buffer and socket
        references. The socket itself is not closed here.
        """
        try:
            self._buffer.close()
        except Exception:
            # issue #633 suggests the purge/close somehow raised a
            # BadFileDescriptor error. Perhaps the client ran out of
            # memory or something else? It's probably OK to ignore
            # any error being raised from purge/close since we're
            # removing the reference to the instance below.
            pass
        self._buffer = None
        self._sock = None

162 self._sock = None