Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

272 statements  

1import logging 

2import sys 

3from abc import ABC 

4from asyncio import IncompleteReadError, StreamReader, TimeoutError 

5from typing import Awaitable, Callable, List, Optional, Protocol, Union 

6 

7from redis.maint_notifications import ( 

8 MaintenanceNotification, 

9 NodeFailedOverNotification, 

10 NodeFailingOverNotification, 

11 NodeMigratedNotification, 

12 NodeMigratingNotification, 

13 NodeMovingNotification, 

14 OSSNodeMigratedNotification, 

15 OSSNodeMigratingNotification, 

16) 

17from redis.utils import safe_str 

18 

19if sys.version_info.major >= 3 and sys.version_info.minor >= 11: 

20 from asyncio import timeout as async_timeout 

21else: 

22 from async_timeout import timeout as async_timeout 

23 

24from ..exceptions import ( 

25 AskError, 

26 AuthenticationError, 

27 AuthenticationWrongNumberOfArgsError, 

28 BusyLoadingError, 

29 ClusterCrossSlotError, 

30 ClusterDownError, 

31 ConnectionError, 

32 ExecAbortError, 

33 ExternalAuthProviderError, 

34 MasterDownError, 

35 ModuleError, 

36 MovedError, 

37 NoPermissionError, 

38 NoScriptError, 

39 OutOfMemoryError, 

40 ReadOnlyError, 

41 ResponseError, 

42 TryAgainError, 

43) 

44from ..typing import EncodableT 

45from .encoders import Encoder 

46from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer 

47 

# Exact error texts that BaseParser.EXCEPTION_CLASSES maps to ModuleError
# (looked up under the "ERR" error code).
MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs."
NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible."
MODULE_EXPORTS_DATA_TYPES_ERROR = (
    "Error unloading module: "
    "the module exports one or more module-side data types, "
    "can't unload"
)

# Error texts returned when a client sends an AUTH command to a server that
# has no password configured. Merged into BaseParser.EXCEPTION_CLASSES["ERR"].
NO_AUTH_SET_ERROR = {
    # Redis >= 6.0
    "AUTH <password> called without any password "
    "configured for the default user. Are you sure "
    "your configuration is correct?": AuthenticationError,
    # Redis < 6.0
    "Client sent AUTH, but no password is set": AuthenticationError,
}

# Error text mapped to ExternalAuthProviderError. Also merged into
# BaseParser.EXCEPTION_CLASSES["ERR"].
EXTERNAL_AUTH_PROVIDER_ERROR = {
    "problem with LDAP service": ExternalAuthProviderError,
}

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

71 

72 

class BaseParser(ABC):
    """Common parser base: server-error mapping plus connection hooks."""

    # Maps the leading error code of an error reply to the exception class
    # used for it. A dict value means the class is selected by the exact
    # remaining message text (see parse_error).
    EXCEPTION_CLASSES = {
        "ERR": {
            "max number of clients reached": ConnectionError,
            "invalid password": AuthenticationError,
            # some Redis server versions report invalid command syntax
            # in lowercase
            "wrong number of arguments for 'auth' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            # some Redis server versions report invalid command syntax
            # in uppercase
            "wrong number of arguments for 'AUTH' command": (
                AuthenticationWrongNumberOfArgsError
            ),
            MODULE_LOAD_ERROR: ModuleError,
            MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError,
            NO_SUCH_MODULE_ERROR: ModuleError,
            MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError,
            **NO_AUTH_SET_ERROR,
            **EXTERNAL_AUTH_PROVIDER_ERROR,
        },
        "OOM": OutOfMemoryError,
        "WRONGPASS": AuthenticationError,
        "EXECABORT": ExecAbortError,
        "LOADING": BusyLoadingError,
        "NOSCRIPT": NoScriptError,
        "READONLY": ReadOnlyError,
        "NOAUTH": AuthenticationError,
        "NOPERM": NoPermissionError,
        "ASK": AskError,
        "TRYAGAIN": TryAgainError,
        "MOVED": MovedError,
        "CLUSTERDOWN": ClusterDownError,
        "CROSSSLOT": ClusterCrossSlotError,
        "MASTERDOWN": MasterDownError,
    }

    @classmethod
    def parse_error(cls, response):
        """Build (and return, not raise) the exception for an error reply.

        The text before the first space is treated as the error code. A code
        found in ``EXCEPTION_CLASSES`` yields that exception, created from
        the remaining text with ``status_code`` set to the code. Any other
        reply yields a plain ``ResponseError`` carrying the full text.
        """
        code, _, detail = response.partition(" ")
        if code not in cls.EXCEPTION_CLASSES:
            return ResponseError(response)

        mapped = cls.EXCEPTION_CLASSES[code]
        if isinstance(mapped, dict):
            # Nested table: choose the class by the exact message text,
            # defaulting to ResponseError for unknown messages.
            mapped = mapped.get(detail, ResponseError)
        return mapped(detail, status_code=code)

    def on_disconnect(self):
        """Disconnect hook; must be provided by subclasses."""
        raise NotImplementedError()

    def on_connect(self, connection):
        """Connect hook; must be provided by subclasses."""
        raise NotImplementedError()

126 

127 

class _RESPBase(BaseParser):
    """Shared state and connection hooks for the sync RESP parsers."""

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        # Nothing is attached until on_connect() is called.
        self._sock = None
        self._buffer = None
        self.encoder = None

    def __del__(self):
        # Best-effort cleanup; any error raised while disconnecting is ignored.
        try:
            self.on_disconnect()
        except Exception:
            pass

    def on_connect(self, connection):
        """Attach to ``connection``: take its socket and encoder and wrap the
        socket in a fresh ``SocketBuffer``."""
        sock = connection._sock
        self._sock = sock
        self._buffer = SocketBuffer(
            sock, self.socket_read_size, connection.socket_timeout
        )
        self.encoder = connection.encoder

    def on_disconnect(self):
        """Detach from the socket, closing the buffer if one exists."""
        self._sock = None
        buffer = self._buffer
        if buffer is not None:
            buffer.close()
            self._buffer = None
        self.encoder = None

    def can_read(self, timeout):
        """Return the buffer's ``can_read(timeout)`` result, or the falsy
        buffer value itself when no usable buffer is attached."""
        buffer = self._buffer
        if not buffer:
            return buffer
        return buffer.can_read(timeout)

161 

162 

class AsyncBaseParser(BaseParser):
    """Base parsing class for the python-backed async parser"""

    __slots__ = ("_stream", "_read_size")

    def __init__(self, socket_read_size: int):
        self._read_size = socket_read_size
        # No stream is attached at construction time.
        self._stream: Optional[StreamReader] = None

    async def can_read_destructive(self) -> bool:
        """Not implemented here; subclasses must override."""
        raise NotImplementedError()

    async def read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
        """Not implemented here; subclasses must override."""
        raise NotImplementedError()

179 

180 

class MaintenanceNotificationsParser:
    """Parsers that turn maintenance push messages into notification objects.

    Every parser receives the push message as an indexable sequence. Index 0
    holds the message type and is not read here; the remaining items follow
    the format documented on each method.
    """

    @staticmethod
    def parse_oss_maintenance_start_msg(response):
        """Parse ``SMIGRATING <seq_number> <slot, range1-range2,...>``.

        Returns an ``OSSNodeMigratingNotification`` built from the sequence
        number and the slots value converted with ``safe_str``.
        """
        notification_id = response[1]
        slots = safe_str(response[2])
        return OSSNodeMigratingNotification(notification_id, slots)

    @staticmethod
    def parse_oss_maintenance_completed_msg(response):
        """Parse an ``SMIGRATED`` message.

        Expected message format is:
        ``SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]``

        Returns an ``OSSNodeMigratedNotification`` carrying the sequence
        number and a mapping shaped like::

            {
                "src_host:port": [
                    {"dest_host:port": "slot_range"},
                    ...
                ],
                ...
            }
        """
        notification_id = response[1]
        nodes_to_slots_mapping = {}
        for src_node, dest_node, slots in response[2]:
            # Group every (destination, slots) pair under its source node.
            nodes_to_slots_mapping.setdefault(safe_str(src_node), []).append(
                {safe_str(dest_node): safe_str(slots)}
            )
        return OSSNodeMigratedNotification(notification_id, nodes_to_slots_mapping)

    @staticmethod
    def parse_maintenance_start_msg(response, notification_type):
        """Parse ``<notification_type> <seq_number> <time>``.

        Examples: ``MIGRATING 1 10``, ``FAILING_OVER 2 20``.
        Returns ``notification_type(seq_number, time)``.
        """
        notification_id = response[1]
        ttl = response[2]
        return notification_type(notification_id, ttl)

    @staticmethod
    def parse_maintenance_completed_msg(response, notification_type):
        """Parse ``<notification_type> <seq_number>``.

        Examples: ``MIGRATED 1``, ``FAILED_OVER 2``.
        Returns ``notification_type(seq_number)``.
        """
        return notification_type(response[1])

    @staticmethod
    def parse_moving_msg(response):
        """Parse ``MOVING <seq_number> <time> <endpoint>``.

        ``<endpoint>`` is either ``None`` (host and port are then both
        ``None``) or a ``host:port`` value. The value is split on its last
        colon, so a host that itself contains colons (such as an IPv6
        address) no longer makes parsing fail.

        Raises:
            ValueError: if the endpoint has no colon or the port is not an
                integer.
        """
        notification_id = response[1]
        ttl = response[2]
        endpoint = response[3]
        if endpoint is None:
            host, port = None, None
        else:
            host, port = MaintenanceNotificationsParser._split_endpoint(
                safe_str(endpoint)
            )
        return NodeMovingNotification(notification_id, host, port, ttl)

    @staticmethod
    def _split_endpoint(endpoint):
        """Split a ``host:port`` string on its last colon into ``(host, int(port))``."""
        host, port = endpoint.rsplit(":", 1)
        return host, int(port)

250 

251 

# Push message type names, compared against the first element of a push
# response by the handle_push_response() methods in this module.
_INVALIDATION_MESSAGE = "invalidate"
_MOVING_MESSAGE = "MOVING"
_MIGRATING_MESSAGE = "MIGRATING"
_MIGRATED_MESSAGE = "MIGRATED"
_FAILING_OVER_MESSAGE = "FAILING_OVER"
_FAILED_OVER_MESSAGE = "FAILED_OVER"
_SMIGRATING_MESSAGE = "SMIGRATING"
_SMIGRATED_MESSAGE = "SMIGRATED"

# Message types dispatched to the maintenance push handler.
# _MOVING_MESSAGE and _SMIGRATED_MESSAGE are not listed here because
# handle_push_response() checks them in separate branches.
_MAINTENANCE_MESSAGES = (
    _MIGRATING_MESSAGE,
    _MIGRATED_MESSAGE,
    _FAILING_OVER_MESSAGE,
    _FAILED_OVER_MESSAGE,
    _SMIGRATING_MESSAGE,
)

# For each message type: (notification class, parser function). The
# handle_push_response() methods use index 1 to parse the response and, for
# the generic start/completed parsers, pass index 0 as notification_type.
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
    str, tuple[type[MaintenanceNotification], Callable]
] = {
    _MIGRATING_MESSAGE: (
        NodeMigratingNotification,
        MaintenanceNotificationsParser.parse_maintenance_start_msg,
    ),
    _MIGRATED_MESSAGE: (
        NodeMigratedNotification,
        MaintenanceNotificationsParser.parse_maintenance_completed_msg,
    ),
    _FAILING_OVER_MESSAGE: (
        NodeFailingOverNotification,
        MaintenanceNotificationsParser.parse_maintenance_start_msg,
    ),
    _FAILED_OVER_MESSAGE: (
        NodeFailedOverNotification,
        MaintenanceNotificationsParser.parse_maintenance_completed_msg,
    ),
    _MOVING_MESSAGE: (
        NodeMovingNotification,
        MaintenanceNotificationsParser.parse_moving_msg,
    ),
    _SMIGRATING_MESSAGE: (
        OSSNodeMigratingNotification,
        MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
    ),
    _SMIGRATED_MESSAGE: (
        OSSNodeMigratedNotification,
        MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
    ),
}

301 

302 

class PushNotificationsParser(Protocol):
    """Protocol defining RESP3-specific parsing functionality"""

    # Receives every push response that is not handled in this class.
    pubsub_push_handler_func: Callable
    # Optional handlers; a message whose handler is unset is dropped.
    invalidation_push_handler_func: Optional[Callable] = None
    node_moving_push_handler_func: Optional[Callable] = None
    maintenance_push_handler_func: Optional[Callable] = None
    oss_cluster_maint_push_handler_func: Optional[Callable] = None

    def handle_pubsub_push_response(self, response):
        """Handle pubsub push responses"""
        raise NotImplementedError()

    def handle_push_response(self, response, **kwargs):
        """Route a push response to the handler registered for its type.

        ``response[0]`` is the message type (decoded when given as bytes).
        Types other than invalidation/maintenance/MOVING/SMIGRATED go to
        ``pubsub_push_handler_func`` and its result is returned. For the
        other types the matching handler is called if one is set. Any
        exception raised while parsing or handling is logged and swallowed.
        Returns the handler's result, or ``None`` when nothing was called,
        an error was logged, or the message was SMIGRATED.
        """
        kind = response[0]
        if isinstance(kind, bytes):
            kind = kind.decode()

        handled_here = (
            _INVALIDATION_MESSAGE,
            *_MAINTENANCE_MESSAGES,
            _MOVING_MESSAGE,
            _SMIGRATED_MESSAGE,
        )
        if kind not in handled_here:
            return self.pubsub_push_handler_func(response)

        try:
            if kind == _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
                return self.invalidation_push_handler_func(response)

            if kind == _MOVING_MESSAGE and self.node_moving_push_handler_func:
                _, parse = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[kind]
                notification = parse(response)
                return self.node_moving_push_handler_func(notification)

            if kind in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
                notification_cls, parse = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
                    kind
                ]
                # SMIGRATING has a dedicated parser that takes no class
                # argument; the generic parsers need the notification class.
                if kind == _SMIGRATING_MESSAGE:
                    notification = parse(response)
                else:
                    notification = parse(response, notification_cls)

                if notification is not None:
                    return self.maintenance_push_handler_func(notification)

            if kind == _SMIGRATED_MESSAGE and (
                self.oss_cluster_maint_push_handler_func
                or self.maintenance_push_handler_func
            ):
                _, parse = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[kind]
                notification = parse(response)

                if notification is not None:
                    # Both handlers are notified (maintenance first); their
                    # return values are discarded.
                    if self.maintenance_push_handler_func:
                        self.maintenance_push_handler_func(notification)
                    if self.oss_cluster_maint_push_handler_func:
                        self.oss_cluster_maint_push_handler_func(notification)
        except Exception as e:
            logger.error(
                "Error handling {} message ({}): {}".format(kind, response, e)
            )

        return None

    def set_pubsub_push_handler(self, pubsub_push_handler_func):
        """Set the pubsub push handler function"""
        self.pubsub_push_handler_func = pubsub_push_handler_func

    def set_invalidation_push_handler(self, invalidation_push_handler_func):
        """Set the invalidation push handler function"""
        self.invalidation_push_handler_func = invalidation_push_handler_func

    def set_node_moving_push_handler(self, node_moving_push_handler_func):
        """Set the MOVING push handler function"""
        self.node_moving_push_handler_func = node_moving_push_handler_func

    def set_maintenance_push_handler(self, maintenance_push_handler_func):
        """Set the maintenance push handler function"""
        self.maintenance_push_handler_func = maintenance_push_handler_func

    def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
        """Set the OSS cluster maintenance push handler function"""
        self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func

393 

394 

class AsyncPushNotificationsParser(Protocol):
    """Protocol defining async RESP3-specific parsing functionality"""

    # Receives every push response that is not handled in this class.
    pubsub_push_handler_func: Callable
    # Optional handlers; a message whose handler is unset is dropped.
    invalidation_push_handler_func: Optional[Callable] = None
    node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
    maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
    oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None

    async def handle_pubsub_push_response(self, response):
        """Handle pubsub push responses asynchronously"""
        raise NotImplementedError()

    async def handle_push_response(self, response, **kwargs):
        """Handle push responses asynchronously

        ``response[0]`` is the message type (decoded when given as bytes).
        Types other than invalidation/maintenance/MOVING/SMIGRATED are
        awaited through ``pubsub_push_handler_func``. For the other types the
        matching handler is awaited if one is set. Any exception raised while
        parsing or handling is logged and swallowed. Returns the awaited
        handler result, or ``None`` when nothing was called or an error was
        logged.
        """
        msg_type = response[0]
        if isinstance(msg_type, bytes):
            # Decoded once here; every comparison below works on the str.
            msg_type = msg_type.decode()

        if msg_type not in (
            _INVALIDATION_MESSAGE,
            *_MAINTENANCE_MESSAGES,
            _MOVING_MESSAGE,
            _SMIGRATED_MESSAGE,
        ):
            return await self.pubsub_push_handler_func(response)

        try:
            if (
                msg_type == _INVALIDATION_MESSAGE
                and self.invalidation_push_handler_func
            ):
                return await self.invalidation_push_handler_func(response)

            if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
                parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
                    msg_type
                ][1]
                notification = parser_function(response)
                return await self.node_moving_push_handler_func(notification)

            if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
                notification_type, parser_function = (
                    MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[msg_type]
                )
                # SMIGRATING has a dedicated parser that takes no class
                # argument; the generic parsers need the notification class.
                if msg_type == _SMIGRATING_MESSAGE:
                    notification = parser_function(response)
                else:
                    notification = parser_function(response, notification_type)

                if notification is not None:
                    return await self.maintenance_push_handler_func(notification)

            # NOTE(review): unlike the synchronous PushNotificationsParser,
            # SMIGRATED is forwarded only to the OSS cluster handler here and
            # not to maintenance_push_handler_func - confirm this is intended.
            if (
                msg_type == _SMIGRATED_MESSAGE
                and self.oss_cluster_maint_push_handler_func
            ):
                parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
                    msg_type
                ][1]
                notification = parser_function(response)
                if notification is not None:
                    return await self.oss_cluster_maint_push_handler_func(notification)
        except Exception as e:
            logger.error(
                "Error handling {} message ({}): {}".format(msg_type, response, e)
            )

        return None

    def set_pubsub_push_handler(self, pubsub_push_handler_func):
        """Set the pubsub push handler function"""
        self.pubsub_push_handler_func = pubsub_push_handler_func

    def set_invalidation_push_handler(self, invalidation_push_handler_func):
        """Set the invalidation push handler function"""
        self.invalidation_push_handler_func = invalidation_push_handler_func

    def set_node_moving_push_handler(self, node_moving_push_handler_func):
        """Set the MOVING push handler function"""
        self.node_moving_push_handler_func = node_moving_push_handler_func

    def set_maintenance_push_handler(self, maintenance_push_handler_func):
        """Set the maintenance push handler function"""
        self.maintenance_push_handler_func = maintenance_push_handler_func

    def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
        """Set the OSS cluster maintenance push handler function"""
        self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func

487 

488 

class _AsyncRESPBase(AsyncBaseParser):
    """Base class for async resp parsing"""

    __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")

    def __init__(self, socket_read_size: int):
        super().__init__(socket_read_size)
        self.encoder: Optional[Encoder] = None
        self._buffer = b""
        self._chunks = []
        self._pos = 0
        # Start out disconnected so that can_read_destructive() raises its
        # intended OSError, rather than AttributeError, when it is called
        # before on_connect() has run.
        self._connected = False

    def _clear(self):
        """Drop the buffered bytes and the pending chunks."""
        self._buffer = b""
        self._chunks.clear()

    def on_connect(self, connection):
        """Called when the stream connects

        Raises:
            ConnectionError: if ``connection`` has no reader.
        """
        self._stream = connection._reader
        if self._stream is None:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        self.encoder = connection.encoder
        self._clear()
        self._connected = True

    def on_disconnect(self):
        """Called when the stream disconnects"""
        self._connected = False

    async def can_read_destructive(self) -> bool:
        """Report whether there is something to read.

        Returns ``True`` when bytes are already buffered; otherwise returns
        the stream's ``at_eof()`` value, evaluated under a zero timeout, or
        ``False`` if that times out.

        Raises:
            OSError: if the parser is not connected.
        """
        if not self._connected:
            raise OSError("Buffer is closed.")
        if self._buffer:
            return True
        try:
            async with async_timeout(0):
                return self._stream.at_eof()
        except TimeoutError:
            return False

    async def _read(self, length: int) -> bytes:
        """
        Read `length` bytes of data.  These are assumed to be followed
        by a '\r\n' terminator which is subsequently discarded.

        Raises ConnectionError if the stream ends before enough bytes arrive.
        """
        want = length + 2  # payload plus the 2-byte terminator
        end = self._pos + want
        if len(self._buffer) >= end:
            # Everything needed is already buffered.
            result = self._buffer[self._pos : end - 2]
        else:
            # Use what is buffered and read only the missing part.
            tail = self._buffer[self._pos :]
            try:
                data = await self._stream.readexactly(want - len(tail))
            except IncompleteReadError as error:
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
            result = (tail + data)[:-2]
            # Keep the freshly read bytes; _buffer itself is not modified here.
            self._chunks.append(data)
        self._pos += want
        return result

    async def _readline(self) -> bytes:
        """
        read an unknown number of bytes up to the next '\r\n'
        line separator, which is discarded.

        Raises ConnectionError if the stream yields a line without '\r\n'.
        """
        found = self._buffer.find(b"\r\n", self._pos)
        if found >= 0:
            # A complete line is already buffered.
            result = self._buffer[self._pos : found]
        else:
            tail = self._buffer[self._pos :]
            data = await self._stream.readline()
            if not data.endswith(b"\r\n"):
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
            result = (tail + data)[:-2]
            # Keep the freshly read bytes; _buffer itself is not modified here.
            self._chunks.append(data)
        self._pos += len(result) + 2
        return result