Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import sys
3from abc import ABC
4from asyncio import IncompleteReadError, StreamReader, TimeoutError
5from typing import Awaitable, Callable, List, Optional, Protocol, Union
7from redis.maint_notifications import (
8 MaintenanceNotification,
9 NodeFailedOverNotification,
10 NodeFailingOverNotification,
11 NodeMigratedNotification,
12 NodeMigratingNotification,
13 NodeMovingNotification,
14)
# Compare the version as a tuple: checking ``major`` and ``minor``
# independently would wrongly reject any future major release whose minor
# number is below 11 (e.g. 4.0).
if sys.version_info >= (3, 11):
    # asyncio provides its own ``timeout`` context manager from 3.11 on.
    from asyncio import timeout as async_timeout
else:
    # Older interpreters fall back to the ``async_timeout`` package.
    from async_timeout import timeout as async_timeout
21from ..exceptions import (
22 AskError,
23 AuthenticationError,
24 AuthenticationWrongNumberOfArgsError,
25 BusyLoadingError,
26 ClusterCrossSlotError,
27 ClusterDownError,
28 ConnectionError,
29 ExecAbortError,
30 ExternalAuthProviderError,
31 MasterDownError,
32 ModuleError,
33 MovedError,
34 NoPermissionError,
35 NoScriptError,
36 OutOfMemoryError,
37 ReadOnlyError,
38 RedisError,
39 ResponseError,
40 TryAgainError,
41)
42from ..typing import EncodableT
43from .encoders import Encoder
44from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
# Error texts for module load/unload failures.  They are used as keys of the
# "ERR" table in ``BaseParser.EXCEPTION_CLASSES``, where each maps to
# ``ModuleError``.
MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
MODULE_EXPORTS_DATA_TYPES_ERROR = (
    "Error unloading module: "
    "the module exports one or more module-side data types, "
    "can't unload"
)
# Error texts returned when a client sends AUTH to a server that has no
# authorization configured.  Both wordings map to AuthenticationError.
NO_AUTH_SET_ERROR = {
    # Wording used by Redis >= 6.0
    (
        "AUTH <password> called without any password configured for the "
        "default user. Are you sure your configuration is correct?"
    ): AuthenticationError,
    # Wording used by Redis < 6.0
    "Client sent AUTH, but no password is set": AuthenticationError,
}

# Error text mapped to ExternalAuthProviderError.
EXTERNAL_AUTH_PROVIDER_ERROR = {
    "problem with LDAP service": ExternalAuthProviderError,
}

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BaseParser(ABC):
    """Common parser base that maps error replies to exception instances."""

    # Keyed by the first space-delimited word of an error reply.  A value is
    # either an exception class or, for "ERR", a nested table keyed by the
    # remainder of the message.
    EXCEPTION_CLASSES = {
        "ERR": {
            "max number of clients reached": ConnectionError,
            "invalid password": AuthenticationError,
            # some Redis server versions report invalid command syntax
            # in lowercase
            "wrong number of arguments for 'auth' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            # some Redis server versions report invalid command syntax
            # in uppercase
            "wrong number of arguments for 'AUTH' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            MODULE_LOAD_ERROR: ModuleError,
            MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError,
            NO_SUCH_MODULE_ERROR: ModuleError,
            MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError,
            **NO_AUTH_SET_ERROR,
            **EXTERNAL_AUTH_PROVIDER_ERROR,
        },
        "OOM": OutOfMemoryError,
        "WRONGPASS": AuthenticationError,
        "EXECABORT": ExecAbortError,
        "LOADING": BusyLoadingError,
        "NOSCRIPT": NoScriptError,
        "READONLY": ReadOnlyError,
        "NOAUTH": AuthenticationError,
        "NOPERM": NoPermissionError,
        "ASK": AskError,
        "TRYAGAIN": TryAgainError,
        "MOVED": MovedError,
        "CLUSTERDOWN": ClusterDownError,
        "CROSSSLOT": ClusterCrossSlotError,
        "MASTERDOWN": MasterDownError,
    }

    @classmethod
    def parse_error(cls, response):
        """Return (not raise) the exception instance matching an error reply.

        The text before the first space is looked up in
        ``EXCEPTION_CLASSES``.  On a hit, the exception is built from the
        text after that space; a nested table is consulted with that
        remaining text and falls back to ``ResponseError``.  On a miss, a
        ``ResponseError`` carrying the whole, unmodified reply is returned.
        """
        prefix, _, detail = response.partition(" ")
        try:
            mapped = cls.EXCEPTION_CLASSES[prefix]
        except KeyError:
            return ResponseError(response)
        if isinstance(mapped, dict):
            mapped = mapped.get(detail, ResponseError)
        return mapped(detail)

    def on_disconnect(self):
        """Disconnect hook; this base implementation always raises."""
        raise NotImplementedError()

    def on_connect(self, connection):
        """Connect hook; this base implementation always raises."""
        raise NotImplementedError()
class _RESPBase(BaseParser):
    """Shared connection-state handling for the synchronous RESP parsers."""

    def __init__(self, socket_read_size):
        """Store the read size; socket, buffer and encoder start unset."""
        self._sock = None
        self._buffer = None
        self.encoder = None
        self.socket_read_size = socket_read_size

    def __del__(self):
        # Best-effort cleanup: errors raised while tearing down are
        # deliberately ignored.
        try:
            self.on_disconnect()
        except Exception:
            pass

    def on_connect(self, connection):
        """Attach to ``connection``: adopt its socket and encoder and wrap
        the socket in a new ``SocketBuffer``."""
        sock = connection._sock
        self._sock = sock
        self._buffer = SocketBuffer(
            sock, self.socket_read_size, connection.socket_timeout
        )
        self.encoder = connection.encoder

    def on_disconnect(self):
        """Drop the socket and encoder references and close the buffer, if any."""
        self._sock = None
        buffer = self._buffer
        if buffer is not None:
            buffer.close()
            self._buffer = None
        self.encoder = None

    def can_read(self, timeout):
        """Return the buffer's ``can_read(timeout)`` result.

        When the buffer is unset (or otherwise falsy) that falsy value itself
        is returned instead.
        """
        buffer = self._buffer
        if not buffer:
            return buffer
        return buffer.can_read(timeout)
class AsyncBaseParser(BaseParser):
    """Base parsing class for the python-backed async parser"""

    __slots__ = ("_stream", "_read_size")

    def __init__(self, socket_read_size: int):
        """Record the read size; no stream is attached yet."""
        self._read_size = socket_read_size
        self._stream: Optional[StreamReader] = None

    async def can_read_destructive(self) -> bool:
        """Not implemented in this base class; always raises."""
        raise NotImplementedError

    async def read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
        """Not implemented in this base class; always raises."""
        raise NotImplementedError
class MaintenanceNotificationsParser:
    """Static helpers that turn maintenance push messages into notifications.

    Every parser receives the push ``response`` as an indexable sequence
    whose element 0 is the message type and whose following elements are the
    positional fields described on each method.
    """

    @staticmethod
    def parse_maintenance_start_msg(response, notification_type):
        """Parse a ``<notification_type> <seq_number> <time>`` message.

        Returns ``notification_type(seq_number, time)``.
        """
        notification_id, ttl = response[1], response[2]
        return notification_type(notification_id, ttl)

    @staticmethod
    def parse_maintenance_completed_msg(response, notification_type):
        """Parse a ``<notification_type> <seq_number>`` message.

        Returns ``notification_type(seq_number)``.
        """
        return notification_type(response[1])

    @staticmethod
    def parse_moving_msg(response):
        """Parse a ``MOVING <seq_number> <time> <endpoint>`` message.

        Returns a ``NodeMovingNotification`` built from the sequence number,
        host, port and time.  Host and port are both ``None`` when the
        endpoint field is ``None``.

        Raises ``ValueError`` when the endpoint has no ``:`` separator or its
        port is not an integer.
        """
        notification_id = response[1]
        ttl = response[2]
        host, port = MaintenanceNotificationsParser._split_endpoint(response[3])
        return NodeMovingNotification(notification_id, host, port, ttl)

    @staticmethod
    def _split_endpoint(endpoint):
        """Split a ``host:port`` endpoint (str, bytes or None) into ``(host, port)``."""
        if endpoint is None:
            return None, None
        if isinstance(endpoint, bytes):
            endpoint = endpoint.decode()
        # Split on the LAST colon only, so hosts that themselves contain
        # colons (e.g. IPv6 literals) do not break the two-value unpacking.
        host, port = endpoint.rsplit(":", 1)
        return host, int(port)
212_INVALIDATION_MESSAGE = "invalidate"
213_MOVING_MESSAGE = "MOVING"
214_MIGRATING_MESSAGE = "MIGRATING"
215_MIGRATED_MESSAGE = "MIGRATED"
216_FAILING_OVER_MESSAGE = "FAILING_OVER"
217_FAILED_OVER_MESSAGE = "FAILED_OVER"
219_MAINTENANCE_MESSAGES = (
220 _MIGRATING_MESSAGE,
221 _MIGRATED_MESSAGE,
222 _FAILING_OVER_MESSAGE,
223 _FAILED_OVER_MESSAGE,
224)
# For each message type: (notification class, parser function).
# The start/completed parsers take ``(response, notification_type)``, whereas
# the MOVING parser takes ``response`` only.
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
    str, tuple[type[MaintenanceNotification], Callable]
] = {
    # "start" style messages: <type> <seq_number> <time>
    _MIGRATING_MESSAGE: (
        NodeMigratingNotification,
        MaintenanceNotificationsParser.parse_maintenance_start_msg,
    ),
    # "completed" style messages: <type> <seq_number>
    _MIGRATED_MESSAGE: (
        NodeMigratedNotification,
        MaintenanceNotificationsParser.parse_maintenance_completed_msg,
    ),
    _FAILING_OVER_MESSAGE: (
        NodeFailingOverNotification,
        MaintenanceNotificationsParser.parse_maintenance_start_msg,
    ),
    _FAILED_OVER_MESSAGE: (
        NodeFailedOverNotification,
        MaintenanceNotificationsParser.parse_maintenance_completed_msg,
    ),
    # MOVING <seq_number> <time> <endpoint>
    _MOVING_MESSAGE: (
        NodeMovingNotification,
        MaintenanceNotificationsParser.parse_moving_msg,
    ),
}
class PushNotificationsParser(Protocol):
    """Protocol defining RESP3-specific parsing functionality"""

    # Handler for every push message that is not specially routed below.
    pubsub_push_handler_func: Callable
    # Optional handlers; a message whose handler is unset is ignored.
    invalidation_push_handler_func: Optional[Callable] = None
    node_moving_push_handler_func: Optional[Callable] = None
    maintenance_push_handler_func: Optional[Callable] = None

    def handle_pubsub_push_response(self, response):
        """Handle pubsub push responses"""
        raise NotImplementedError()

    def handle_push_response(self, response, **kwargs):
        """Route a push ``response`` to the handler registered for its type.

        ``response[0]`` is the message type (decoded first when it is bytes).
        Types other than invalidation, MOVING and the maintenance messages go
        to ``pubsub_push_handler_func`` and its result is returned.  For the
        specially routed types the matching handler is called if one is set.
        Any exception raised while parsing or handling is logged and
        swallowed, and ``None`` is returned.
        """
        msg_type = response[0]
        if isinstance(msg_type, bytes):
            msg_type = msg_type.decode()

        specially_routed = (
            _INVALIDATION_MESSAGE,
            *_MAINTENANCE_MESSAGES,
            _MOVING_MESSAGE,
        )
        if msg_type not in specially_routed:
            return self.pubsub_push_handler_func(response)

        try:
            if (
                msg_type == _INVALIDATION_MESSAGE
                and self.invalidation_push_handler_func
            ):
                return self.invalidation_push_handler_func(response)

            if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
                _, parse = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[msg_type]
                notification = parse(response)
                return self.node_moving_push_handler_func(notification)

            if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
                notification_cls, parse = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
                    msg_type
                ]
                notification = parse(response, notification_cls)
                if notification is not None:
                    return self.maintenance_push_handler_func(notification)
        except Exception as e:
            logger.error(
                "Error handling {} message ({}): {}".format(msg_type, response, e)
            )

        return None

    def set_pubsub_push_handler(self, pubsub_push_handler_func):
        """Set the pubsub push handler function"""
        self.pubsub_push_handler_func = pubsub_push_handler_func

    def set_invalidation_push_handler(self, invalidation_push_handler_func):
        """Set the invalidation push handler function"""
        self.invalidation_push_handler_func = invalidation_push_handler_func

    def set_node_moving_push_handler(self, node_moving_push_handler_func):
        """Set the node-moving (MOVING) push handler function"""
        self.node_moving_push_handler_func = node_moving_push_handler_func

    def set_maintenance_push_handler(self, maintenance_push_handler_func):
        """Set the maintenance push handler function"""
        self.maintenance_push_handler_func = maintenance_push_handler_func
class AsyncPushNotificationsParser(Protocol):
    """Protocol defining async RESP3-specific parsing functionality"""

    # Handler for every push message that is not specially routed below.
    pubsub_push_handler_func: Callable
    # Optional handlers; a message whose handler is unset is ignored.
    invalidation_push_handler_func: Optional[Callable] = None
    node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
    maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None

    async def handle_pubsub_push_response(self, response):
        """Handle pubsub push responses asynchronously"""
        raise NotImplementedError()

    async def handle_push_response(self, response, **kwargs):
        """Handle push responses asynchronously.

        ``response[0]`` is the message type (decoded first when it is bytes).
        Types other than invalidation, MOVING and the maintenance messages are
        passed to ``pubsub_push_handler_func``, whose result is awaited and
        returned.  For the specially routed types the matching handler is
        awaited if one is set.  Any exception raised while parsing or
        handling is logged and swallowed, and ``None`` is returned.
        """
        msg_type = response[0]
        if isinstance(msg_type, bytes):
            # Decoded once here; every later comparison sees a str.
            msg_type = msg_type.decode()

        if msg_type not in (
            _INVALIDATION_MESSAGE,
            *_MAINTENANCE_MESSAGES,
            _MOVING_MESSAGE,
        ):
            return await self.pubsub_push_handler_func(response)

        try:
            if (
                msg_type == _INVALIDATION_MESSAGE
                and self.invalidation_push_handler_func
            ):
                return await self.invalidation_push_handler_func(response)

            if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
                parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
                    msg_type
                ][1]
                notification = parser_function(response)
                return await self.node_moving_push_handler_func(notification)

            if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
                notification_type, parser_function = (
                    MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[msg_type]
                )
                notification = parser_function(response, notification_type)

                if notification is not None:
                    return await self.maintenance_push_handler_func(notification)
        except Exception as e:
            logger.error(
                "Error handling {} message ({}): {}".format(msg_type, response, e)
            )

        return None

    def set_pubsub_push_handler(self, pubsub_push_handler_func):
        """Set the pubsub push handler function"""
        self.pubsub_push_handler_func = pubsub_push_handler_func

    def set_invalidation_push_handler(self, invalidation_push_handler_func):
        """Set the invalidation push handler function"""
        self.invalidation_push_handler_func = invalidation_push_handler_func

    def set_node_moving_push_handler(self, node_moving_push_handler_func):
        """Set the node-moving (MOVING) push handler function"""
        self.node_moving_push_handler_func = node_moving_push_handler_func

    def set_maintenance_push_handler(self, maintenance_push_handler_func):
        """Set the maintenance push handler function"""
        self.maintenance_push_handler_func = maintenance_push_handler_func
class _AsyncRESPBase(AsyncBaseParser):
    """Base class for async resp parsing"""

    __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")

    def __init__(self, socket_read_size: int):
        """Create a parser that is not yet attached to any stream."""
        super().__init__(socket_read_size)
        self.encoder: Optional[Encoder] = None
        self._buffer = b""
        self._chunks = []
        self._pos = 0
        # Start out disconnected so that ``can_read_destructive`` raises
        # RedisError (rather than AttributeError) when called before
        # ``on_connect``.
        self._connected = False

    def _clear(self):
        """Empty the parsing buffer and the list of chunks read so far."""
        self._buffer = b""
        self._chunks.clear()

    def on_connect(self, connection):
        """Called when the stream connects.

        Adopts ``connection._reader`` and ``connection.encoder``, clears any
        buffered data and marks the parser as connected.

        Raises ``RedisError`` if the connection has no reader.
        """
        self._stream = connection._reader
        if self._stream is None:
            raise RedisError("Buffer is closed.")
        self.encoder = connection.encoder
        self._clear()
        self._connected = True

    def on_disconnect(self):
        """Called when the stream disconnects"""
        self._connected = False

    async def can_read_destructive(self) -> bool:
        """Report whether there is something to read.

        Returns ``True`` when the parsing buffer holds data.  Otherwise it
        returns the stream's ``at_eof()`` result, evaluated under a
        zero-length timeout, or ``False`` if that times out.

        Raises ``RedisError`` when the parser is not connected.
        """
        if not self._connected:
            raise RedisError("Buffer is closed.")
        if self._buffer:
            return True
        try:
            async with async_timeout(0):
                return self._stream.at_eof()
        except TimeoutError:
            return False

    async def _read(self, length: int) -> bytes:
        """
        Read `length` bytes of data. These are assumed to be followed
        by a CRLF terminator which is subsequently discarded.

        Raises ``ConnectionError`` if the stream ends before enough bytes
        have arrived.
        """
        want = length + 2  # payload plus the two terminator bytes
        end = self._pos + want
        if len(self._buffer) >= end:
            # Everything needed is already buffered.
            result = self._buffer[self._pos : end - 2]
        else:
            # Use what is buffered and fetch only the missing bytes.
            tail = self._buffer[self._pos :]
            try:
                data = await self._stream.readexactly(want - len(tail))
            except IncompleteReadError as error:
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
            result = (tail + data)[:-2]
            # Keep the raw bytes taken from the stream.
            # NOTE(review): presumably so the buffer can be rebuilt by the
            # caller later -- confirm against the subclasses.
            self._chunks.append(data)
        self._pos += want
        return result

    async def _readline(self) -> bytes:
        """
        Read an unknown number of bytes up to the next CRLF
        line separator, which is discarded.

        Raises ``ConnectionError`` if the stream yields a line that does not
        end with the separator.
        """
        found = self._buffer.find(b"\r\n", self._pos)
        if found >= 0:
            # A complete line is already buffered.
            result = self._buffer[self._pos : found]
        else:
            tail = self._buffer[self._pos :]
            data = await self._stream.readline()
            if not data.endswith(b"\r\n"):
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
            result = (tail + data)[:-2]
            self._chunks.append(data)
        self._pos += len(result) + 2
        return result