Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/socket.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1import errno 

2import io 

3import socket 

4from io import SEEK_END 

5from typing import Optional, Union 

6 

7from ..exceptions import ConnectionError, TimeoutError 

8from ..utils import SENTINEL, SSL_AVAILABLE 

9 

# Exception classes that are handled as "non-blocking" errors when reading
# from the socket, each mapped to the errno value that is accepted for it.
NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {BlockingIOError: errno.EWOULDBLOCK}

if SSL_AVAILABLE:
    import ssl

    # NOTE(review): both Want* classes are registered with the value 2;
    # verify that against the errno each one actually reports (compare
    # ssl.SSL_ERROR_WANT_READ / ssl.SSL_ERROR_WANT_WRITE).
    if hasattr(ssl, "SSLWantReadError"):
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS.update(
            {ssl.SSLWantReadError: 2, ssl.SSLWantWriteError: 2}
        )
    else:
        # Fallback when the ssl module has no dedicated Want* classes.
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS.update({ssl.SSLError: 2})

# Tuple form of the classes registered above, usable in an ``except`` clause.
NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS)

# Message used when the peer closes the connection.
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."

# Line terminator expected at the end of every line/payload read.
SYM_CRLF = b"\r\n"

26 

27 

class SocketBuffer:
    """
    Read buffer sitting between a socket and the code that consumes
    CRLF-terminated data from it.

    Everything received from the socket is appended to an in-memory
    ``io.BytesIO``; the buffer's current position marks how much of that
    data has already been consumed.
    """

    def __init__(
        self, socket: socket.socket, socket_read_size: int, socket_timeout: float
    ):
        """
        :param socket: socket to receive data from
        :param socket_read_size: size passed to every ``recv`` call
        :param socket_timeout: timeout put back on the socket after a call
            that temporarily used a custom timeout
        """
        self._sock = socket
        self.socket_read_size = socket_read_size
        self.socket_timeout = socket_timeout
        self._buffer = io.BytesIO()

    def unread_bytes(self) -> int:
        """
        Remaining unread length of buffer
        """
        pos = self._buffer.tell()
        end = self._buffer.seek(0, SEEK_END)
        # Put the read position back where it was before measuring.
        self._buffer.seek(pos)
        return end - pos

    def _read_from_socket(
        self,
        length: Optional[int] = None,
        timeout: Union[float, object] = SENTINEL,
        raise_on_timeout: Optional[bool] = True,
    ) -> bool:
        """
        Receive from the socket and append the data to the end of the buffer.

        ``recv`` is called repeatedly until at least ``length`` bytes have
        been appended; when ``length`` is ``None`` a single successful
        ``recv`` is enough. The buffer's read position is left untouched.

        :param length: minimum number of bytes to append, or ``None``
        :param timeout: when given (anything other than ``SENTINEL``), it is
            set on the socket for this call and ``self.socket_timeout`` is
            restored afterwards
        :param raise_on_timeout: when falsy, a socket timeout or an accepted
            non-blocking error yields ``False`` instead of an exception
        :returns: ``True`` once the requested data was appended, ``False``
            when nothing could be read and ``raise_on_timeout`` is falsy
        :raises ConnectionError: if ``recv`` returns empty bytes, or for a
            non-blocking error that is not turned into ``False`` or a
            ``TimeoutError``
        :raises TimeoutError: on a socket timeout with ``raise_on_timeout``
            set, or on an accepted non-blocking error when ``timeout == 0``
        """
        sock = self._sock
        socket_read_size = self.socket_read_size
        marker = 0
        custom_timeout = timeout is not SENTINEL

        buf = self._buffer
        current_pos = buf.tell()
        # New data always goes to the end of the buffer.
        buf.seek(0, SEEK_END)
        if custom_timeout:
            sock.settimeout(timeout)
        try:
            while True:
                data = sock.recv(socket_read_size)
                # an empty string indicates the server shutdown the socket
                if isinstance(data, bytes) and len(data) == 0:
                    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
                buf.write(data)
                marker += len(data)

                if length is not None and length > marker:
                    continue
                return True
        except socket.timeout:
            if raise_on_timeout:
                raise TimeoutError("Timeout reading from socket")
            return False
        except NONBLOCKING_EXCEPTIONS as ex:
            # if we're in nonblocking mode and the recv raises a
            # blocking error, simply return False indicating that
            # there's no data to be read. otherwise raise the
            # original exception.
            allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
            if ex.errno == allowed:
                if not raise_on_timeout:
                    return False
                if timeout == 0:
                    raise TimeoutError("Timeout reading from socket")
            raise ConnectionError(f"Error while reading from socket: {ex.args}")
        finally:
            # Runs on every exit path: restore the read position and, if it
            # was overridden, the socket's regular timeout.
            buf.seek(current_pos)
            if custom_timeout:
                sock.settimeout(self.socket_timeout)

    def can_read(self, timeout: float = 0) -> bool:
        """
        Tell whether data is available, either already buffered or
        receivable from the socket within ``timeout``.
        """
        return bool(self.unread_bytes()) or self._read_from_socket(
            timeout=timeout, raise_on_timeout=False
        )

    def read(self, length: int, timeout: Union[float, object] = SENTINEL) -> bytes:
        """
        Consume ``length`` bytes plus the two-byte terminator that follows
        them and return the bytes without the terminator.

        :param length: payload size, not counting the terminator
        :param timeout: forwarded to ``_read_from_socket`` when the buffer
            does not yet hold enough data
        """
        length = length + 2  # make sure to read the \r\n terminator
        # BufferIO will return less than requested if buffer is short
        data = self._buffer.read(length)
        missing = length - len(data)
        if missing:
            # fill up the buffer and read the remainder
            self._read_from_socket(length=missing, timeout=timeout)
            data += self._buffer.read(missing)
        return data[:-2]

    def readline(self, timeout: Union[float, object] = SENTINEL) -> bytes:
        """
        Consume one CRLF-terminated line and return it without the CRLF,
        receiving from the socket until the terminator has arrived.

        :param timeout: forwarded to ``_read_from_socket``
        """
        buf = self._buffer
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # there's more data in the socket that we need
            self._read_from_socket(timeout=timeout)
            data += buf.readline()

        return data[:-2]

    def get_pos(self) -> int:
        """
        Get current read position
        """
        return self._buffer.tell()

    def rewind(self, pos: int) -> None:
        """
        Rewind the buffer to a specific position, to re-start reading
        """
        self._buffer.seek(pos)

    def purge(self) -> None:
        """
        After a successful read, purge the read part of buffer

        The buffer is only reset once everything in it has been consumed;
        while unread data remains this is a no-op.
        """
        unread = self.unread_bytes()

        # Only if we have read all of the buffer do we truncate, to
        # reduce the amount of memory thrashing.  This heuristic
        # can be changed or removed later.
        if unread > 0:
            return

        # Nothing is left to preserve here, so drop the consumed bytes and
        # start over at the beginning of the buffer.
        self._buffer.truncate(unread)
        self._buffer.seek(0)

    def close(self) -> None:
        """
        Close the internal buffer and drop the references to it and to the
        socket. Errors raised while closing the buffer are ignored.
        """
        try:
            self._buffer.close()
        except Exception:
            # issue #633 suggests the purge/close somehow raised a
            # BadFileDescriptor error. Perhaps the client ran out of
            # memory or something else? It's probably OK to ignore
            # any error being raised from purge/close since we're
            # removing the reference to the instance below.
            pass
        self._buffer = None
        self._sock = None

164 self._sock = None