Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/base.py: 34%

120 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import sys 

2from abc import ABC 

3from asyncio import IncompleteReadError, StreamReader, TimeoutError 

4from typing import List, Optional, Union 

5 

6if sys.version_info.major >= 3 and sys.version_info.minor >= 11: 

7 from asyncio import timeout as async_timeout 

8else: 

9 from async_timeout import timeout as async_timeout 

10 

11from ..exceptions import ( 

12 AuthenticationError, 

13 AuthenticationWrongNumberOfArgsError, 

14 BusyLoadingError, 

15 ConnectionError, 

16 ExecAbortError, 

17 ModuleError, 

18 NoPermissionError, 

19 NoScriptError, 

20 OutOfMemoryError, 

21 ReadOnlyError, 

22 RedisError, 

23 ResponseError, 

24) 

25from ..typing import EncodableT 

26from .encoders import Encoder 

27from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer 

28 

29MODULE_LOAD_ERROR = "Error loading the extension. " "Please check the server logs." 

30NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name" 

31MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not " "possible." 

32MODULE_EXPORTS_DATA_TYPES_ERROR = ( 

33 "Error unloading module: the module " 

34 "exports one or more module-side data " 

35 "types, can't unload" 

36) 

37# user send an AUTH cmd to a server without authorization configured 

38NO_AUTH_SET_ERROR = { 

39 # Redis >= 6.0 

40 "AUTH <password> called without any password " 

41 "configured for the default user. Are you sure " 

42 "your configuration is correct?": AuthenticationError, 

43 # Redis < 6.0 

44 "Client sent AUTH, but no password is set": AuthenticationError, 

45} 

46 

47 

48class BaseParser(ABC): 

49 EXCEPTION_CLASSES = { 

50 "ERR": { 

51 "max number of clients reached": ConnectionError, 

52 "invalid password": AuthenticationError, 

53 # some Redis server versions report invalid command syntax 

54 # in lowercase 

55 "wrong number of arguments " 

56 "for 'auth' command": AuthenticationWrongNumberOfArgsError, 

57 # some Redis server versions report invalid command syntax 

58 # in uppercase 

59 "wrong number of arguments " 

60 "for 'AUTH' command": AuthenticationWrongNumberOfArgsError, 

61 MODULE_LOAD_ERROR: ModuleError, 

62 MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError, 

63 NO_SUCH_MODULE_ERROR: ModuleError, 

64 MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError, 

65 **NO_AUTH_SET_ERROR, 

66 }, 

67 "OOM": OutOfMemoryError, 

68 "WRONGPASS": AuthenticationError, 

69 "EXECABORT": ExecAbortError, 

70 "LOADING": BusyLoadingError, 

71 "NOSCRIPT": NoScriptError, 

72 "READONLY": ReadOnlyError, 

73 "NOAUTH": AuthenticationError, 

74 "NOPERM": NoPermissionError, 

75 } 

76 

77 @classmethod 

78 def parse_error(cls, response): 

79 "Parse an error response" 

80 error_code = response.split(" ")[0] 

81 if error_code in cls.EXCEPTION_CLASSES: 

82 response = response[len(error_code) + 1 :] 

83 exception_class = cls.EXCEPTION_CLASSES[error_code] 

84 if isinstance(exception_class, dict): 

85 exception_class = exception_class.get(response, ResponseError) 

86 return exception_class(response) 

87 return ResponseError(response) 

88 

89 def on_disconnect(self): 

90 raise NotImplementedError() 

91 

92 def on_connect(self, connection): 

93 raise NotImplementedError() 

94 

95 

96class _RESPBase(BaseParser): 

97 """Base class for sync-based resp parsing""" 

98 

99 def __init__(self, socket_read_size): 

100 self.socket_read_size = socket_read_size 

101 self.encoder = None 

102 self._sock = None 

103 self._buffer = None 

104 

105 def __del__(self): 

106 try: 

107 self.on_disconnect() 

108 except Exception: 

109 pass 

110 

111 def on_connect(self, connection): 

112 "Called when the socket connects" 

113 self._sock = connection._sock 

114 self._buffer = SocketBuffer( 

115 self._sock, self.socket_read_size, connection.socket_timeout 

116 ) 

117 self.encoder = connection.encoder 

118 

119 def on_disconnect(self): 

120 "Called when the socket disconnects" 

121 self._sock = None 

122 if self._buffer is not None: 

123 self._buffer.close() 

124 self._buffer = None 

125 self.encoder = None 

126 

127 def can_read(self, timeout): 

128 return self._buffer and self._buffer.can_read(timeout) 

129 

130 

131class AsyncBaseParser(BaseParser): 

132 """Base parsing class for the python-backed async parser""" 

133 

134 __slots__ = "_stream", "_read_size" 

135 

136 def __init__(self, socket_read_size: int): 

137 self._stream: Optional[StreamReader] = None 

138 self._read_size = socket_read_size 

139 

140 async def can_read_destructive(self) -> bool: 

141 raise NotImplementedError() 

142 

143 async def read_response( 

144 self, disable_decoding: bool = False 

145 ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]: 

146 raise NotImplementedError() 

147 

148 

149class _AsyncRESPBase(AsyncBaseParser): 

150 """Base class for async resp parsing""" 

151 

152 __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") 

153 

154 def __init__(self, socket_read_size: int): 

155 super().__init__(socket_read_size) 

156 self.encoder: Optional[Encoder] = None 

157 self._buffer = b"" 

158 self._chunks = [] 

159 self._pos = 0 

160 

161 def _clear(self): 

162 self._buffer = b"" 

163 self._chunks.clear() 

164 

165 def on_connect(self, connection): 

166 """Called when the stream connects""" 

167 self._stream = connection._reader 

168 if self._stream is None: 

169 raise RedisError("Buffer is closed.") 

170 self.encoder = connection.encoder 

171 self._clear() 

172 self._connected = True 

173 

174 def on_disconnect(self): 

175 """Called when the stream disconnects""" 

176 self._connected = False 

177 

178 async def can_read_destructive(self) -> bool: 

179 if not self._connected: 

180 raise RedisError("Buffer is closed.") 

181 if self._buffer: 

182 return True 

183 try: 

184 async with async_timeout(0): 

185 return self._stream.at_eof() 

186 except TimeoutError: 

187 return False 

188 

189 async def _read(self, length: int) -> bytes: 

190 """ 

191 Read `length` bytes of data. These are assumed to be followed 

192 by a '\r\n' terminator which is subsequently discarded. 

193 """ 

194 want = length + 2 

195 end = self._pos + want 

196 if len(self._buffer) >= end: 

197 result = self._buffer[self._pos : end - 2] 

198 else: 

199 tail = self._buffer[self._pos :] 

200 try: 

201 data = await self._stream.readexactly(want - len(tail)) 

202 except IncompleteReadError as error: 

203 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error 

204 result = (tail + data)[:-2] 

205 self._chunks.append(data) 

206 self._pos += want 

207 return result 

208 

209 async def _readline(self) -> bytes: 

210 """ 

211 read an unknown number of bytes up to the next '\r\n' 

212 line separator, which is discarded. 

213 """ 

214 found = self._buffer.find(b"\r\n", self._pos) 

215 if found >= 0: 

216 result = self._buffer[self._pos : found] 

217 else: 

218 tail = self._buffer[self._pos :] 

219 data = await self._stream.readline() 

220 if not data.endswith(b"\r\n"): 

221 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

222 result = (tail + data)[:-2] 

223 self._chunks.append(data) 

224 self._pos += len(result) + 2 

225 return result