Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import sys
3from abc import ABC
4from asyncio import IncompleteReadError, StreamReader, TimeoutError
5from typing import Awaitable, Callable, List, Optional, Protocol, Union
7from redis.maint_notifications import (
8 MaintenanceNotification,
9 NodeFailedOverNotification,
10 NodeFailingOverNotification,
11 NodeMigratedNotification,
12 NodeMigratingNotification,
13 NodeMovingNotification,
14 OSSNodeMigratedNotification,
15 OSSNodeMigratingNotification,
16)
17from redis.utils import safe_str
19if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
20 from asyncio import timeout as async_timeout
21else:
22 from async_timeout import timeout as async_timeout
24from ..exceptions import (
25 AskError,
26 AuthenticationError,
27 AuthenticationWrongNumberOfArgsError,
28 BusyLoadingError,
29 ClusterCrossSlotError,
30 ClusterDownError,
31 ConnectionError,
32 ExecAbortError,
33 ExternalAuthProviderError,
34 MasterDownError,
35 ModuleError,
36 MovedError,
37 NoPermissionError,
38 NoScriptError,
39 OutOfMemoryError,
40 ReadOnlyError,
41 ResponseError,
42 TryAgainError,
43)
44from ..typing import EncodableT
45from .encoders import Encoder
46from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
48MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
49NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
50MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
51MODULE_EXPORTS_DATA_TYPES_ERROR = (
52 "Error unloading module: the module "
53 "exports one or more module-side data "
54 "types, can't unload"
55)
56# user send an AUTH cmd to a server without authorization configured
57NO_AUTH_SET_ERROR = {
58 # Redis >= 6.0
59 "AUTH <password> called without any password "
60 "configured for the default user. Are you sure "
61 "your configuration is correct?": AuthenticationError,
62 # Redis < 6.0
63 "Client sent AUTH, but no password is set": AuthenticationError,
64}
66EXTERNAL_AUTH_PROVIDER_ERROR = {
67 "problem with LDAP service": ExternalAuthProviderError,
68}
70logger = logging.getLogger(__name__)
73class BaseParser(ABC):
74 EXCEPTION_CLASSES = {
75 "ERR": {
76 "max number of clients reached": ConnectionError,
77 "invalid password": AuthenticationError,
78 # some Redis server versions report invalid command syntax
79 # in lowercase
80 "wrong number of arguments "
81 "for 'auth' command": AuthenticationWrongNumberOfArgsError,
82 # some Redis server versions report invalid command syntax
83 # in uppercase
84 "wrong number of arguments "
85 "for 'AUTH' command": AuthenticationWrongNumberOfArgsError,
86 MODULE_LOAD_ERROR: ModuleError,
87 MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError,
88 NO_SUCH_MODULE_ERROR: ModuleError,
89 MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError,
90 **NO_AUTH_SET_ERROR,
91 **EXTERNAL_AUTH_PROVIDER_ERROR,
92 },
93 "OOM": OutOfMemoryError,
94 "WRONGPASS": AuthenticationError,
95 "EXECABORT": ExecAbortError,
96 "LOADING": BusyLoadingError,
97 "NOSCRIPT": NoScriptError,
98 "READONLY": ReadOnlyError,
99 "NOAUTH": AuthenticationError,
100 "NOPERM": NoPermissionError,
101 "ASK": AskError,
102 "TRYAGAIN": TryAgainError,
103 "MOVED": MovedError,
104 "CLUSTERDOWN": ClusterDownError,
105 "CROSSSLOT": ClusterCrossSlotError,
106 "MASTERDOWN": MasterDownError,
107 }
109 @classmethod
110 def parse_error(cls, response):
111 "Parse an error response"
112 error_code = response.split(" ")[0]
113 if error_code in cls.EXCEPTION_CLASSES:
114 response = response[len(error_code) + 1 :]
115 exception_class = cls.EXCEPTION_CLASSES[error_code]
116 if isinstance(exception_class, dict):
117 exception_class = exception_class.get(response, ResponseError)
118 return exception_class(response, status_code=error_code)
119 return ResponseError(response)
121 def on_disconnect(self):
122 raise NotImplementedError()
124 def on_connect(self, connection):
125 raise NotImplementedError()
128class _RESPBase(BaseParser):
129 """Base class for sync-based resp parsing"""
131 def __init__(self, socket_read_size):
132 self.socket_read_size = socket_read_size
133 self.encoder = None
134 self._sock = None
135 self._buffer = None
137 def __del__(self):
138 try:
139 self.on_disconnect()
140 except Exception:
141 pass
143 def on_connect(self, connection):
144 "Called when the socket connects"
145 self._sock = connection._sock
146 self._buffer = SocketBuffer(
147 self._sock, self.socket_read_size, connection.socket_timeout
148 )
149 self.encoder = connection.encoder
151 def on_disconnect(self):
152 "Called when the socket disconnects"
153 self._sock = None
154 if self._buffer is not None:
155 self._buffer.close()
156 self._buffer = None
157 self.encoder = None
159 def can_read(self, timeout):
160 return self._buffer and self._buffer.can_read(timeout)
163class AsyncBaseParser(BaseParser):
164 """Base parsing class for the python-backed async parser"""
166 __slots__ = "_stream", "_read_size"
168 def __init__(self, socket_read_size: int):
169 self._stream: Optional[StreamReader] = None
170 self._read_size = socket_read_size
172 async def can_read_destructive(self) -> bool:
173 raise NotImplementedError()
175 async def read_response(
176 self, disable_decoding: bool = False
177 ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
178 raise NotImplementedError()
181class MaintenanceNotificationsParser:
182 """Protocol defining maintenance push notification parsing functionality"""
184 @staticmethod
185 def parse_oss_maintenance_start_msg(response):
186 # Expected message format is:
187 # SMIGRATING <seq_number> <slot, range1-range2,...>
188 id = response[1]
189 slots = safe_str(response[2])
190 return OSSNodeMigratingNotification(id, slots)
192 @staticmethod
193 def parse_oss_maintenance_completed_msg(response):
194 # Expected message format is:
195 # SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]
196 id = response[1]
197 nodes_to_slots_mapping_data = response[2]
198 # Build the nodes_to_slots_mapping dict structure:
199 # {
200 # "src_host:port": [
201 # {"dest_host:port": "slot_range"},
202 # ...
203 # ],
204 # ...
205 # }
206 nodes_to_slots_mapping = {}
207 for src_node, dest_node, slots in nodes_to_slots_mapping_data:
208 src_node_str = safe_str(src_node)
209 dest_node_str = safe_str(dest_node)
210 slots_str = safe_str(slots)
212 if src_node_str not in nodes_to_slots_mapping:
213 nodes_to_slots_mapping[src_node_str] = []
214 nodes_to_slots_mapping[src_node_str].append({dest_node_str: slots_str})
216 return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)
218 @staticmethod
219 def parse_maintenance_start_msg(response, notification_type):
220 # Expected message format is: <notification_type> <seq_number> <time>
221 # Examples:
222 # MIGRATING 1 10
223 # FAILING_OVER 2 20
224 id = response[1]
225 ttl = response[2]
226 return notification_type(id, ttl)
228 @staticmethod
229 def parse_maintenance_completed_msg(response, notification_type):
230 # Expected message format is: <notification_type> <seq_number>
231 # Examples:
232 # MIGRATED 1
233 # FAILED_OVER 2
234 id = response[1]
235 return notification_type(id)
237 @staticmethod
238 def parse_moving_msg(response):
239 # Expected message format is: MOVING <seq_number> <time> <endpoint>
240 id = response[1]
241 ttl = response[2]
242 if response[3] is None:
243 host, port = None, None
244 else:
245 value = safe_str(response[3])
246 host, port = value.split(":")
247 port = int(port) if port is not None else None
249 return NodeMovingNotification(id, host, port, ttl)
252_INVALIDATION_MESSAGE = "invalidate"
253_MOVING_MESSAGE = "MOVING"
254_MIGRATING_MESSAGE = "MIGRATING"
255_MIGRATED_MESSAGE = "MIGRATED"
256_FAILING_OVER_MESSAGE = "FAILING_OVER"
257_FAILED_OVER_MESSAGE = "FAILED_OVER"
258_SMIGRATING_MESSAGE = "SMIGRATING"
259_SMIGRATED_MESSAGE = "SMIGRATED"
261_MAINTENANCE_MESSAGES = (
262 _MIGRATING_MESSAGE,
263 _MIGRATED_MESSAGE,
264 _FAILING_OVER_MESSAGE,
265 _FAILED_OVER_MESSAGE,
266 _SMIGRATING_MESSAGE,
267)
269MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
270 str, tuple[type[MaintenanceNotification], Callable]
271] = {
272 _MIGRATING_MESSAGE: (
273 NodeMigratingNotification,
274 MaintenanceNotificationsParser.parse_maintenance_start_msg,
275 ),
276 _MIGRATED_MESSAGE: (
277 NodeMigratedNotification,
278 MaintenanceNotificationsParser.parse_maintenance_completed_msg,
279 ),
280 _FAILING_OVER_MESSAGE: (
281 NodeFailingOverNotification,
282 MaintenanceNotificationsParser.parse_maintenance_start_msg,
283 ),
284 _FAILED_OVER_MESSAGE: (
285 NodeFailedOverNotification,
286 MaintenanceNotificationsParser.parse_maintenance_completed_msg,
287 ),
288 _MOVING_MESSAGE: (
289 NodeMovingNotification,
290 MaintenanceNotificationsParser.parse_moving_msg,
291 ),
292 _SMIGRATING_MESSAGE: (
293 OSSNodeMigratingNotification,
294 MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
295 ),
296 _SMIGRATED_MESSAGE: (
297 OSSNodeMigratedNotification,
298 MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
299 ),
300}
303class PushNotificationsParser(Protocol):
304 """Protocol defining RESP3-specific parsing functionality"""
306 pubsub_push_handler_func: Callable
307 invalidation_push_handler_func: Optional[Callable] = None
308 node_moving_push_handler_func: Optional[Callable] = None
309 maintenance_push_handler_func: Optional[Callable] = None
310 oss_cluster_maint_push_handler_func: Optional[Callable] = None
312 def handle_pubsub_push_response(self, response):
313 """Handle pubsub push responses"""
314 raise NotImplementedError()
316 def handle_push_response(self, response, **kwargs):
317 msg_type = response[0]
318 if isinstance(msg_type, bytes):
319 msg_type = msg_type.decode()
321 if msg_type not in (
322 _INVALIDATION_MESSAGE,
323 *_MAINTENANCE_MESSAGES,
324 _MOVING_MESSAGE,
325 _SMIGRATED_MESSAGE,
326 ):
327 return self.pubsub_push_handler_func(response)
329 try:
330 if (
331 msg_type == _INVALIDATION_MESSAGE
332 and self.invalidation_push_handler_func
333 ):
334 return self.invalidation_push_handler_func(response)
336 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
337 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
338 msg_type
339 ][1]
341 notification = parser_function(response)
342 return self.node_moving_push_handler_func(notification)
344 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
345 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
346 msg_type
347 ][1]
348 if msg_type == _SMIGRATING_MESSAGE:
349 notification = parser_function(response)
350 else:
351 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
352 msg_type
353 ][0]
354 notification = parser_function(response, notification_type)
356 if notification is not None:
357 return self.maintenance_push_handler_func(notification)
358 if msg_type == _SMIGRATED_MESSAGE and (
359 self.oss_cluster_maint_push_handler_func
360 or self.maintenance_push_handler_func
361 ):
362 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
363 msg_type
364 ][1]
365 notification = parser_function(response)
367 if notification is not None:
368 if self.maintenance_push_handler_func:
369 self.maintenance_push_handler_func(notification)
370 if self.oss_cluster_maint_push_handler_func:
371 self.oss_cluster_maint_push_handler_func(notification)
372 except Exception as e:
373 logger.error(
374 "Error handling {} message ({}): {}".format(msg_type, response, e)
375 )
377 return None
379 def set_pubsub_push_handler(self, pubsub_push_handler_func):
380 self.pubsub_push_handler_func = pubsub_push_handler_func
382 def set_invalidation_push_handler(self, invalidation_push_handler_func):
383 self.invalidation_push_handler_func = invalidation_push_handler_func
385 def set_node_moving_push_handler(self, node_moving_push_handler_func):
386 self.node_moving_push_handler_func = node_moving_push_handler_func
388 def set_maintenance_push_handler(self, maintenance_push_handler_func):
389 self.maintenance_push_handler_func = maintenance_push_handler_func
391 def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
392 self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
395class AsyncPushNotificationsParser(Protocol):
396 """Protocol defining async RESP3-specific parsing functionality"""
398 pubsub_push_handler_func: Callable
399 invalidation_push_handler_func: Optional[Callable] = None
400 node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
401 maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
402 oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
404 async def handle_pubsub_push_response(self, response):
405 """Handle pubsub push responses asynchronously"""
406 raise NotImplementedError()
408 async def handle_push_response(self, response, **kwargs):
409 """Handle push responses asynchronously"""
411 msg_type = response[0]
412 if isinstance(msg_type, bytes):
413 msg_type = msg_type.decode()
415 if msg_type not in (
416 _INVALIDATION_MESSAGE,
417 *_MAINTENANCE_MESSAGES,
418 _MOVING_MESSAGE,
419 _SMIGRATED_MESSAGE,
420 ):
421 return await self.pubsub_push_handler_func(response)
423 try:
424 if (
425 msg_type == _INVALIDATION_MESSAGE
426 and self.invalidation_push_handler_func
427 ):
428 return await self.invalidation_push_handler_func(response)
430 if isinstance(msg_type, bytes):
431 msg_type = msg_type.decode()
433 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
434 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
435 msg_type
436 ][1]
437 notification = parser_function(response)
438 return await self.node_moving_push_handler_func(notification)
440 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
441 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
442 msg_type
443 ][1]
444 if msg_type == _SMIGRATING_MESSAGE:
445 notification = parser_function(response)
446 else:
447 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
448 msg_type
449 ][0]
450 notification = parser_function(response, notification_type)
452 if notification is not None:
453 return await self.maintenance_push_handler_func(notification)
454 if (
455 msg_type == _SMIGRATED_MESSAGE
456 and self.oss_cluster_maint_push_handler_func
457 ):
458 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
459 msg_type
460 ][1]
461 notification = parser_function(response)
462 if notification is not None:
463 return await self.oss_cluster_maint_push_handler_func(notification)
464 except Exception as e:
465 logger.error(
466 "Error handling {} message ({}): {}".format(msg_type, response, e)
467 )
469 return None
471 def set_pubsub_push_handler(self, pubsub_push_handler_func):
472 """Set the pubsub push handler function"""
473 self.pubsub_push_handler_func = pubsub_push_handler_func
475 def set_invalidation_push_handler(self, invalidation_push_handler_func):
476 """Set the invalidation push handler function"""
477 self.invalidation_push_handler_func = invalidation_push_handler_func
479 def set_node_moving_push_handler(self, node_moving_push_handler_func):
480 self.node_moving_push_handler_func = node_moving_push_handler_func
482 def set_maintenance_push_handler(self, maintenance_push_handler_func):
483 self.maintenance_push_handler_func = maintenance_push_handler_func
485 def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
486 self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
489class _AsyncRESPBase(AsyncBaseParser):
490 """Base class for async resp parsing"""
492 __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")
494 def __init__(self, socket_read_size: int):
495 super().__init__(socket_read_size)
496 self.encoder: Optional[Encoder] = None
497 self._buffer = b""
498 self._chunks = []
499 self._pos = 0
501 def _clear(self):
502 self._buffer = b""
503 self._chunks.clear()
505 def on_connect(self, connection):
506 """Called when the stream connects"""
507 self._stream = connection._reader
508 if self._stream is None:
509 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
510 self.encoder = connection.encoder
511 self._clear()
512 self._connected = True
514 def on_disconnect(self):
515 """Called when the stream disconnects"""
516 self._connected = False
518 async def can_read_destructive(self) -> bool:
519 if not self._connected:
520 raise OSError("Buffer is closed.")
521 if self._buffer:
522 return True
523 try:
524 async with async_timeout(0):
525 return self._stream.at_eof()
526 except TimeoutError:
527 return False
529 async def _read(self, length: int) -> bytes:
530 """
531 Read `length` bytes of data. These are assumed to be followed
532 by a '\r\n' terminator which is subsequently discarded.
533 """
534 want = length + 2
535 end = self._pos + want
536 if len(self._buffer) >= end:
537 result = self._buffer[self._pos : end - 2]
538 else:
539 tail = self._buffer[self._pos :]
540 try:
541 data = await self._stream.readexactly(want - len(tail))
542 except IncompleteReadError as error:
543 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
544 result = (tail + data)[:-2]
545 self._chunks.append(data)
546 self._pos += want
547 return result
549 async def _readline(self) -> bytes:
550 """
551 read an unknown number of bytes up to the next '\r\n'
552 line separator, which is discarded.
553 """
554 found = self._buffer.find(b"\r\n", self._pos)
555 if found >= 0:
556 result = self._buffer[self._pos : found]
557 else:
558 tail = self._buffer[self._pos :]
559 data = await self._stream.readline()
560 if not data.endswith(b"\r\n"):
561 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
562 result = (tail + data)[:-2]
563 self._chunks.append(data)
564 self._pos += len(result) + 2
565 return result