Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import sys
3from abc import ABC
4from asyncio import IncompleteReadError, StreamReader, TimeoutError
5from typing import Awaitable, Callable, List, Optional, Protocol, Union
7from redis.maint_notifications import (
8 MaintenanceNotification,
9 NodeFailedOverNotification,
10 NodeFailingOverNotification,
11 NodeMigratedNotification,
12 NodeMigratingNotification,
13 NodeMovingNotification,
14)
16if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
17 from asyncio import timeout as async_timeout
18else:
19 from async_timeout import timeout as async_timeout
21from ..exceptions import (
22 AskError,
23 AuthenticationError,
24 AuthenticationWrongNumberOfArgsError,
25 BusyLoadingError,
26 ClusterCrossSlotError,
27 ClusterDownError,
28 ConnectionError,
29 ExecAbortError,
30 ExternalAuthProviderError,
31 MasterDownError,
32 ModuleError,
33 MovedError,
34 NoPermissionError,
35 NoScriptError,
36 OutOfMemoryError,
37 ReadOnlyError,
38 ResponseError,
39 TryAgainError,
40)
41from ..typing import EncodableT
42from .encoders import Encoder
43from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
45MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
46NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
47MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
48MODULE_EXPORTS_DATA_TYPES_ERROR = (
49 "Error unloading module: the module "
50 "exports one or more module-side data "
51 "types, can't unload"
52)
53# user send an AUTH cmd to a server without authorization configured
54NO_AUTH_SET_ERROR = {
55 # Redis >= 6.0
56 "AUTH <password> called without any password "
57 "configured for the default user. Are you sure "
58 "your configuration is correct?": AuthenticationError,
59 # Redis < 6.0
60 "Client sent AUTH, but no password is set": AuthenticationError,
61}
63EXTERNAL_AUTH_PROVIDER_ERROR = {
64 "problem with LDAP service": ExternalAuthProviderError,
65}
67logger = logging.getLogger(__name__)
70class BaseParser(ABC):
71 EXCEPTION_CLASSES = {
72 "ERR": {
73 "max number of clients reached": ConnectionError,
74 "invalid password": AuthenticationError,
75 # some Redis server versions report invalid command syntax
76 # in lowercase
77 "wrong number of arguments "
78 "for 'auth' command": AuthenticationWrongNumberOfArgsError,
79 # some Redis server versions report invalid command syntax
80 # in uppercase
81 "wrong number of arguments "
82 "for 'AUTH' command": AuthenticationWrongNumberOfArgsError,
83 MODULE_LOAD_ERROR: ModuleError,
84 MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError,
85 NO_SUCH_MODULE_ERROR: ModuleError,
86 MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError,
87 **NO_AUTH_SET_ERROR,
88 **EXTERNAL_AUTH_PROVIDER_ERROR,
89 },
90 "OOM": OutOfMemoryError,
91 "WRONGPASS": AuthenticationError,
92 "EXECABORT": ExecAbortError,
93 "LOADING": BusyLoadingError,
94 "NOSCRIPT": NoScriptError,
95 "READONLY": ReadOnlyError,
96 "NOAUTH": AuthenticationError,
97 "NOPERM": NoPermissionError,
98 "ASK": AskError,
99 "TRYAGAIN": TryAgainError,
100 "MOVED": MovedError,
101 "CLUSTERDOWN": ClusterDownError,
102 "CROSSSLOT": ClusterCrossSlotError,
103 "MASTERDOWN": MasterDownError,
104 }
106 @classmethod
107 def parse_error(cls, response):
108 "Parse an error response"
109 error_code = response.split(" ")[0]
110 if error_code in cls.EXCEPTION_CLASSES:
111 response = response[len(error_code) + 1 :]
112 exception_class = cls.EXCEPTION_CLASSES[error_code]
113 if isinstance(exception_class, dict):
114 exception_class = exception_class.get(response, ResponseError)
115 return exception_class(response)
116 return ResponseError(response)
118 def on_disconnect(self):
119 raise NotImplementedError()
121 def on_connect(self, connection):
122 raise NotImplementedError()
125class _RESPBase(BaseParser):
126 """Base class for sync-based resp parsing"""
128 def __init__(self, socket_read_size):
129 self.socket_read_size = socket_read_size
130 self.encoder = None
131 self._sock = None
132 self._buffer = None
134 def __del__(self):
135 try:
136 self.on_disconnect()
137 except Exception:
138 pass
140 def on_connect(self, connection):
141 "Called when the socket connects"
142 self._sock = connection._sock
143 self._buffer = SocketBuffer(
144 self._sock, self.socket_read_size, connection.socket_timeout
145 )
146 self.encoder = connection.encoder
148 def on_disconnect(self):
149 "Called when the socket disconnects"
150 self._sock = None
151 if self._buffer is not None:
152 self._buffer.close()
153 self._buffer = None
154 self.encoder = None
156 def can_read(self, timeout):
157 return self._buffer and self._buffer.can_read(timeout)
160class AsyncBaseParser(BaseParser):
161 """Base parsing class for the python-backed async parser"""
163 __slots__ = "_stream", "_read_size"
165 def __init__(self, socket_read_size: int):
166 self._stream: Optional[StreamReader] = None
167 self._read_size = socket_read_size
169 async def can_read_destructive(self) -> bool:
170 raise NotImplementedError()
172 async def read_response(
173 self, disable_decoding: bool = False
174 ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
175 raise NotImplementedError()
178class MaintenanceNotificationsParser:
179 """Protocol defining maintenance push notification parsing functionality"""
181 @staticmethod
182 def parse_maintenance_start_msg(response, notification_type):
183 # Expected message format is: <notification_type> <seq_number> <time>
184 id = response[1]
185 ttl = response[2]
186 return notification_type(id, ttl)
188 @staticmethod
189 def parse_maintenance_completed_msg(response, notification_type):
190 # Expected message format is: <notification_type> <seq_number>
191 id = response[1]
192 return notification_type(id)
194 @staticmethod
195 def parse_moving_msg(response):
196 # Expected message format is: MOVING <seq_number> <time> <endpoint>
197 id = response[1]
198 ttl = response[2]
199 if response[3] is None:
200 host, port = None, None
201 else:
202 value = response[3]
203 if isinstance(value, bytes):
204 value = value.decode()
205 host, port = value.split(":")
206 port = int(port) if port is not None else None
208 return NodeMovingNotification(id, host, port, ttl)
211_INVALIDATION_MESSAGE = "invalidate"
212_MOVING_MESSAGE = "MOVING"
213_MIGRATING_MESSAGE = "MIGRATING"
214_MIGRATED_MESSAGE = "MIGRATED"
215_FAILING_OVER_MESSAGE = "FAILING_OVER"
216_FAILED_OVER_MESSAGE = "FAILED_OVER"
218_MAINTENANCE_MESSAGES = (
219 _MIGRATING_MESSAGE,
220 _MIGRATED_MESSAGE,
221 _FAILING_OVER_MESSAGE,
222 _FAILED_OVER_MESSAGE,
223)
225MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
226 str, tuple[type[MaintenanceNotification], Callable]
227] = {
228 _MIGRATING_MESSAGE: (
229 NodeMigratingNotification,
230 MaintenanceNotificationsParser.parse_maintenance_start_msg,
231 ),
232 _MIGRATED_MESSAGE: (
233 NodeMigratedNotification,
234 MaintenanceNotificationsParser.parse_maintenance_completed_msg,
235 ),
236 _FAILING_OVER_MESSAGE: (
237 NodeFailingOverNotification,
238 MaintenanceNotificationsParser.parse_maintenance_start_msg,
239 ),
240 _FAILED_OVER_MESSAGE: (
241 NodeFailedOverNotification,
242 MaintenanceNotificationsParser.parse_maintenance_completed_msg,
243 ),
244 _MOVING_MESSAGE: (
245 NodeMovingNotification,
246 MaintenanceNotificationsParser.parse_moving_msg,
247 ),
248}
251class PushNotificationsParser(Protocol):
252 """Protocol defining RESP3-specific parsing functionality"""
254 pubsub_push_handler_func: Callable
255 invalidation_push_handler_func: Optional[Callable] = None
256 node_moving_push_handler_func: Optional[Callable] = None
257 maintenance_push_handler_func: Optional[Callable] = None
259 def handle_pubsub_push_response(self, response):
260 """Handle pubsub push responses"""
261 raise NotImplementedError()
263 def handle_push_response(self, response, **kwargs):
264 msg_type = response[0]
265 if isinstance(msg_type, bytes):
266 msg_type = msg_type.decode()
268 if msg_type not in (
269 _INVALIDATION_MESSAGE,
270 *_MAINTENANCE_MESSAGES,
271 _MOVING_MESSAGE,
272 ):
273 return self.pubsub_push_handler_func(response)
275 try:
276 if (
277 msg_type == _INVALIDATION_MESSAGE
278 and self.invalidation_push_handler_func
279 ):
280 return self.invalidation_push_handler_func(response)
282 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
283 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
284 msg_type
285 ][1]
287 notification = parser_function(response)
288 return self.node_moving_push_handler_func(notification)
290 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
291 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
292 msg_type
293 ][1]
294 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
295 msg_type
296 ][0]
297 notification = parser_function(response, notification_type)
299 if notification is not None:
300 return self.maintenance_push_handler_func(notification)
301 except Exception as e:
302 logger.error(
303 "Error handling {} message ({}): {}".format(msg_type, response, e)
304 )
306 return None
308 def set_pubsub_push_handler(self, pubsub_push_handler_func):
309 self.pubsub_push_handler_func = pubsub_push_handler_func
311 def set_invalidation_push_handler(self, invalidation_push_handler_func):
312 self.invalidation_push_handler_func = invalidation_push_handler_func
314 def set_node_moving_push_handler(self, node_moving_push_handler_func):
315 self.node_moving_push_handler_func = node_moving_push_handler_func
317 def set_maintenance_push_handler(self, maintenance_push_handler_func):
318 self.maintenance_push_handler_func = maintenance_push_handler_func
321class AsyncPushNotificationsParser(Protocol):
322 """Protocol defining async RESP3-specific parsing functionality"""
324 pubsub_push_handler_func: Callable
325 invalidation_push_handler_func: Optional[Callable] = None
326 node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
327 maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
329 async def handle_pubsub_push_response(self, response):
330 """Handle pubsub push responses asynchronously"""
331 raise NotImplementedError()
333 async def handle_push_response(self, response, **kwargs):
334 """Handle push responses asynchronously"""
336 msg_type = response[0]
337 if isinstance(msg_type, bytes):
338 msg_type = msg_type.decode()
340 if msg_type not in (
341 _INVALIDATION_MESSAGE,
342 *_MAINTENANCE_MESSAGES,
343 _MOVING_MESSAGE,
344 ):
345 return await self.pubsub_push_handler_func(response)
347 try:
348 if (
349 msg_type == _INVALIDATION_MESSAGE
350 and self.invalidation_push_handler_func
351 ):
352 return await self.invalidation_push_handler_func(response)
354 if isinstance(msg_type, bytes):
355 msg_type = msg_type.decode()
357 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
358 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
359 msg_type
360 ][1]
361 notification = parser_function(response)
362 return await self.node_moving_push_handler_func(notification)
364 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
365 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
366 msg_type
367 ][1]
368 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
369 msg_type
370 ][0]
371 notification = parser_function(response, notification_type)
373 if notification is not None:
374 return await self.maintenance_push_handler_func(notification)
375 except Exception as e:
376 logger.error(
377 "Error handling {} message ({}): {}".format(msg_type, response, e)
378 )
380 return None
382 def set_pubsub_push_handler(self, pubsub_push_handler_func):
383 """Set the pubsub push handler function"""
384 self.pubsub_push_handler_func = pubsub_push_handler_func
386 def set_invalidation_push_handler(self, invalidation_push_handler_func):
387 """Set the invalidation push handler function"""
388 self.invalidation_push_handler_func = invalidation_push_handler_func
390 def set_node_moving_push_handler(self, node_moving_push_handler_func):
391 self.node_moving_push_handler_func = node_moving_push_handler_func
393 def set_maintenance_push_handler(self, maintenance_push_handler_func):
394 self.maintenance_push_handler_func = maintenance_push_handler_func
397class _AsyncRESPBase(AsyncBaseParser):
398 """Base class for async resp parsing"""
400 __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")
402 def __init__(self, socket_read_size: int):
403 super().__init__(socket_read_size)
404 self.encoder: Optional[Encoder] = None
405 self._buffer = b""
406 self._chunks = []
407 self._pos = 0
409 def _clear(self):
410 self._buffer = b""
411 self._chunks.clear()
413 def on_connect(self, connection):
414 """Called when the stream connects"""
415 self._stream = connection._reader
416 if self._stream is None:
417 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
418 self.encoder = connection.encoder
419 self._clear()
420 self._connected = True
422 def on_disconnect(self):
423 """Called when the stream disconnects"""
424 self._connected = False
426 async def can_read_destructive(self) -> bool:
427 if not self._connected:
428 raise OSError("Buffer is closed.")
429 if self._buffer:
430 return True
431 try:
432 async with async_timeout(0):
433 return self._stream.at_eof()
434 except TimeoutError:
435 return False
437 async def _read(self, length: int) -> bytes:
438 """
439 Read `length` bytes of data. These are assumed to be followed
440 by a '\r\n' terminator which is subsequently discarded.
441 """
442 want = length + 2
443 end = self._pos + want
444 if len(self._buffer) >= end:
445 result = self._buffer[self._pos : end - 2]
446 else:
447 tail = self._buffer[self._pos :]
448 try:
449 data = await self._stream.readexactly(want - len(tail))
450 except IncompleteReadError as error:
451 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
452 result = (tail + data)[:-2]
453 self._chunks.append(data)
454 self._pos += want
455 return result
457 async def _readline(self) -> bytes:
458 """
459 read an unknown number of bytes up to the next '\r\n'
460 line separator, which is discarded.
461 """
462 found = self._buffer.find(b"\r\n", self._pos)
463 if found >= 0:
464 result = self._buffer[self._pos : found]
465 else:
466 tail = self._buffer[self._pos :]
467 data = await self._stream.readline()
468 if not data.endswith(b"\r\n"):
469 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
470 result = (tail + data)[:-2]
471 self._chunks.append(data)
472 self._pos += len(result) + 2
473 return result