Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/base.py: 34%
120 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import sys
2from abc import ABC
3from asyncio import IncompleteReadError, StreamReader, TimeoutError
4from typing import List, Optional, Union
# asyncio.timeout() exists from Python 3.11 onwards; older interpreters use
# the third-party ``async_timeout`` backport instead.  The version is compared
# as a tuple so that the check stays correct for any future major version
# whose minor number is below 11 (testing major and minor separately would
# wrongly select the backport there).
if sys.version_info >= (3, 11):
    from asyncio import timeout as async_timeout
else:
    from async_timeout import timeout as async_timeout
11from ..exceptions import (
12 AuthenticationError,
13 AuthenticationWrongNumberOfArgsError,
14 BusyLoadingError,
15 ConnectionError,
16 ExecAbortError,
17 ModuleError,
18 NoPermissionError,
19 NoScriptError,
20 OutOfMemoryError,
21 ReadOnlyError,
22 RedisError,
23 ResponseError,
24)
25from ..typing import EncodableT
26from .encoders import Encoder
27from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
# Message texts of module-related error replies.  They serve as exact-match
# lookup keys under the "ERR" code in BaseParser.EXCEPTION_CLASSES, where each
# one maps to ModuleError.
MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
MODULE_EXPORTS_DATA_TYPES_ERROR = (
    "Error unloading module: the module "
    "exports one or more module-side data "
    "types, can't unload"
)
# Message texts a server replies with when a client sends AUTH although no
# password is configured.  Every variant maps to AuthenticationError.
NO_AUTH_SET_ERROR = dict.fromkeys(
    (
        # Redis >= 6.0
        "AUTH <password> called without any password "
        "configured for the default user. Are you sure "
        "your configuration is correct?",
        # Redis < 6.0
        "Client sent AUTH, but no password is set",
    ),
    AuthenticationError,
)
class BaseParser(ABC):
    """Parser base class that turns error replies into exception objects."""

    # Lookup table keyed by error code, i.e. the first space-delimited word of
    # an error reply.  A value is either the exception class for that code or,
    # for "ERR", a nested table keyed by the exact remaining message text.
    EXCEPTION_CLASSES = {
        "ERR": {
            "max number of clients reached": ConnectionError,
            "invalid password": AuthenticationError,
            # some Redis server versions report invalid command syntax
            # in lowercase
            "wrong number of arguments for 'auth' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            # some Redis server versions report invalid command syntax
            # in uppercase
            "wrong number of arguments for 'AUTH' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            MODULE_LOAD_ERROR: ModuleError,
            MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError,
            NO_SUCH_MODULE_ERROR: ModuleError,
            MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError,
            **NO_AUTH_SET_ERROR,
        },
        "OOM": OutOfMemoryError,
        "WRONGPASS": AuthenticationError,
        "EXECABORT": ExecAbortError,
        "LOADING": BusyLoadingError,
        "NOSCRIPT": NoScriptError,
        "READONLY": ReadOnlyError,
        "NOAUTH": AuthenticationError,
        "NOPERM": NoPermissionError,
    }

    @classmethod
    def parse_error(cls, response):
        """Return (not raise) the exception matching an error reply.

        The first space-delimited word of ``response`` is the error code.
        An unknown code yields ``ResponseError`` built from the full text.
        A known code yields the mapped exception built from the text that
        follows the code; when the code maps to a nested table, the
        remaining text selects the class and ``ResponseError`` is the
        fallback.
        """
        code, _, message = response.partition(" ")
        if code not in cls.EXCEPTION_CLASSES:
            return ResponseError(response)

        target = cls.EXCEPTION_CLASSES[code]
        if isinstance(target, dict):
            # Nested table: the message text itself picks the exception.
            target = target.get(message, ResponseError)
        return target(message)

    def on_disconnect(self):
        """Disconnect hook; subclasses must override it."""
        raise NotImplementedError()

    def on_connect(self, connection):
        """Connect hook; subclasses must override it."""
        raise NotImplementedError()
class _RESPBase(BaseParser):
    """Base class for sync-based resp parsing"""

    def __init__(self, socket_read_size):
        """Store the read size.

        The socket, buffer and encoder are left unset until ``on_connect``
        is called.
        """
        self.socket_read_size = socket_read_size
        self.encoder = None
        self._sock = None
        self._buffer = None

    def __del__(self):
        # Best-effort cleanup: errors raised while tearing down during
        # garbage collection are deliberately ignored.
        try:
            self.on_disconnect()
        except Exception:
            pass

    def on_connect(self, connection):
        """Called when the socket connects.

        Takes the socket and encoder from ``connection`` and wraps the
        socket in a ``SocketBuffer`` configured with this parser's read size
        and the connection's socket timeout.
        """
        self._sock = connection._sock
        self._buffer = SocketBuffer(
            self._sock, self.socket_read_size, connection.socket_timeout
        )
        self.encoder = connection.encoder

    def on_disconnect(self):
        """Called when the socket disconnects.

        Drops the socket and encoder references and closes the buffer if
        one exists.
        """
        self._sock = None
        if self._buffer is not None:
            self._buffer.close()
            self._buffer = None
        self.encoder = None

    def can_read(self, timeout):
        """Report whether the buffer can be read within ``timeout``.

        Returns ``False`` when no buffer exists (not connected); otherwise
        returns whatever the buffer's ``can_read(timeout)`` reports.
        """
        # Test for presence explicitly instead of relying on the buffer
        # object's truthiness, and return False rather than None when
        # disconnected.
        if self._buffer is None:
            return False
        return self._buffer.can_read(timeout)
class AsyncBaseParser(BaseParser):
    """Base parsing class for the python-backed async parser"""

    __slots__ = ("_stream", "_read_size")

    def __init__(self, socket_read_size: int):
        """Record the read size; no stream is attached yet."""
        self._read_size = socket_read_size
        self._stream: Optional[StreamReader] = None

    async def can_read_destructive(self) -> bool:
        """Readability check; subclasses must override it."""
        raise NotImplementedError()

    async def read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
        """Read one response; subclasses must override it."""
        raise NotImplementedError()
class _AsyncRESPBase(AsyncBaseParser):
    """Base class for async resp parsing"""

    __slots__ = AsyncBaseParser.__slots__ + (
        "encoder",
        "_buffer",
        "_pos",
        "_chunks",
        "_connected",
    )

    def __init__(self, socket_read_size: int):
        """Start with an empty buffer, no chunks, and a disconnected state."""
        super().__init__(socket_read_size)
        self.encoder: Optional[Encoder] = None
        self._buffer = b""
        self._chunks = []
        self._pos = 0
        # Initialised here so that can_read_destructive() raises RedisError,
        # not AttributeError, when called before on_connect().
        self._connected = False

    def _clear(self):
        """Discard buffered bytes and collected chunks."""
        self._buffer = b""
        self._chunks.clear()

    def on_connect(self, connection):
        """Called when the stream connects.

        Takes the reader and encoder from ``connection``, clears any
        buffered data and marks the parser as connected.

        Raises:
            RedisError: if the connection has no reader.
        """
        self._stream = connection._reader
        if self._stream is None:
            raise RedisError("Buffer is closed.")
        self.encoder = connection.encoder
        self._clear()
        self._connected = True

    def on_disconnect(self):
        """Called when the stream disconnects"""
        self._connected = False

    async def can_read_destructive(self) -> bool:
        """Check for readable data.

        Returns ``True`` when buffered bytes are present.  Otherwise returns
        the stream's ``at_eof()`` result, or ``False`` if the zero-second
        timeout expires first.

        Raises:
            RedisError: if the parser is not connected.
        """
        if not self._connected:
            raise RedisError("Buffer is closed.")
        if self._buffer:
            return True
        try:
            async with async_timeout(0):
                return self._stream.at_eof()
        except TimeoutError:
            return False

    async def _read(self, length: int) -> bytes:
        """
        Read `length` bytes of data.  These are assumed to be followed
        by a two-byte CRLF terminator which is subsequently discarded.

        Raises ConnectionError if the stream ends before enough bytes
        arrive.
        """
        want = length + 2
        end = self._pos + want
        if len(self._buffer) >= end:
            # Everything needed is already buffered.
            result = self._buffer[self._pos : end - 2]
        else:
            # Combine the unread tail of the buffer with exactly the number
            # of bytes still missing from the stream.
            tail = self._buffer[self._pos :]
            try:
                data = await self._stream.readexactly(want - len(tail))
            except IncompleteReadError as error:
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
            result = (tail + data)[:-2]
            # Newly read bytes are kept in _chunks, not appended to _buffer.
            self._chunks.append(data)
        self._pos += want
        return result

    async def _readline(self) -> bytes:
        """
        Read an unknown number of bytes up to the next CRLF line
        separator, which is discarded.

        Raises ConnectionError if the stream yields a line that does not
        end with the separator.
        """
        found = self._buffer.find(b"\r\n", self._pos)
        if found >= 0:
            # A complete line is already buffered.
            result = self._buffer[self._pos : found]
        else:
            tail = self._buffer[self._pos :]
            data = await self._stream.readline()
            if not data.endswith(b"\r\n"):
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
            result = (tail + data)[:-2]
            # Newly read bytes are kept in _chunks, not appended to _buffer.
            self._chunks.append(data)
        self._pos += len(result) + 2
        return result