Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

230 statements  

1import logging 

2import sys 

3from abc import ABC 

4from asyncio import IncompleteReadError, StreamReader, TimeoutError 

5from typing import Awaitable, Callable, List, Optional, Protocol, Union 

6 

7from redis.maint_notifications import ( 

8 MaintenanceNotification, 

9 NodeFailedOverNotification, 

10 NodeFailingOverNotification, 

11 NodeMigratedNotification, 

12 NodeMigratingNotification, 

13 NodeMovingNotification, 

14) 

15 

16if sys.version_info.major >= 3 and sys.version_info.minor >= 11: 

17 from asyncio import timeout as async_timeout 

18else: 

19 from async_timeout import timeout as async_timeout 

20 

21from ..exceptions import ( 

22 AskError, 

23 AuthenticationError, 

24 AuthenticationWrongNumberOfArgsError, 

25 BusyLoadingError, 

26 ClusterCrossSlotError, 

27 ClusterDownError, 

28 ConnectionError, 

29 ExecAbortError, 

30 ExternalAuthProviderError, 

31 MasterDownError, 

32 ModuleError, 

33 MovedError, 

34 NoPermissionError, 

35 NoScriptError, 

36 OutOfMemoryError, 

37 ReadOnlyError, 

38 RedisError, 

39 ResponseError, 

40 TryAgainError, 

41) 

42from ..typing import EncodableT 

43from .encoders import Encoder 

44from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer 

45 

46MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs." 

47NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name" 

48MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible." 

49MODULE_EXPORTS_DATA_TYPES_ERROR = ( 

50 "Error unloading module: the module " 

51 "exports one or more module-side data " 

52 "types, can't unload" 

53) 

54# user send an AUTH cmd to a server without authorization configured 

55NO_AUTH_SET_ERROR = { 

56 # Redis >= 6.0 

57 "AUTH <password> called without any password " 

58 "configured for the default user. Are you sure " 

59 "your configuration is correct?": AuthenticationError, 

60 # Redis < 6.0 

61 "Client sent AUTH, but no password is set": AuthenticationError, 

62} 

63 

64EXTERNAL_AUTH_PROVIDER_ERROR = { 

65 "problem with LDAP service": ExternalAuthProviderError, 

66} 

67 

68logger = logging.getLogger(__name__) 

69 

70 

71class BaseParser(ABC): 

72 EXCEPTION_CLASSES = { 

73 "ERR": { 

74 "max number of clients reached": ConnectionError, 

75 "invalid password": AuthenticationError, 

76 # some Redis server versions report invalid command syntax 

77 # in lowercase 

78 "wrong number of arguments " 

79 "for 'auth' command": AuthenticationWrongNumberOfArgsError, 

80 # some Redis server versions report invalid command syntax 

81 # in uppercase 

82 "wrong number of arguments " 

83 "for 'AUTH' command": AuthenticationWrongNumberOfArgsError, 

84 MODULE_LOAD_ERROR: ModuleError, 

85 MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError, 

86 NO_SUCH_MODULE_ERROR: ModuleError, 

87 MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError, 

88 **NO_AUTH_SET_ERROR, 

89 **EXTERNAL_AUTH_PROVIDER_ERROR, 

90 }, 

91 "OOM": OutOfMemoryError, 

92 "WRONGPASS": AuthenticationError, 

93 "EXECABORT": ExecAbortError, 

94 "LOADING": BusyLoadingError, 

95 "NOSCRIPT": NoScriptError, 

96 "READONLY": ReadOnlyError, 

97 "NOAUTH": AuthenticationError, 

98 "NOPERM": NoPermissionError, 

99 "ASK": AskError, 

100 "TRYAGAIN": TryAgainError, 

101 "MOVED": MovedError, 

102 "CLUSTERDOWN": ClusterDownError, 

103 "CROSSSLOT": ClusterCrossSlotError, 

104 "MASTERDOWN": MasterDownError, 

105 } 

106 

107 @classmethod 

108 def parse_error(cls, response): 

109 "Parse an error response" 

110 error_code = response.split(" ")[0] 

111 if error_code in cls.EXCEPTION_CLASSES: 

112 response = response[len(error_code) + 1 :] 

113 exception_class = cls.EXCEPTION_CLASSES[error_code] 

114 if isinstance(exception_class, dict): 

115 exception_class = exception_class.get(response, ResponseError) 

116 return exception_class(response) 

117 return ResponseError(response) 

118 

119 def on_disconnect(self): 

120 raise NotImplementedError() 

121 

122 def on_connect(self, connection): 

123 raise NotImplementedError() 

124 

125 

126class _RESPBase(BaseParser): 

127 """Base class for sync-based resp parsing""" 

128 

129 def __init__(self, socket_read_size): 

130 self.socket_read_size = socket_read_size 

131 self.encoder = None 

132 self._sock = None 

133 self._buffer = None 

134 

135 def __del__(self): 

136 try: 

137 self.on_disconnect() 

138 except Exception: 

139 pass 

140 

141 def on_connect(self, connection): 

142 "Called when the socket connects" 

143 self._sock = connection._sock 

144 self._buffer = SocketBuffer( 

145 self._sock, self.socket_read_size, connection.socket_timeout 

146 ) 

147 self.encoder = connection.encoder 

148 

149 def on_disconnect(self): 

150 "Called when the socket disconnects" 

151 self._sock = None 

152 if self._buffer is not None: 

153 self._buffer.close() 

154 self._buffer = None 

155 self.encoder = None 

156 

157 def can_read(self, timeout): 

158 return self._buffer and self._buffer.can_read(timeout) 

159 

160 

161class AsyncBaseParser(BaseParser): 

162 """Base parsing class for the python-backed async parser""" 

163 

164 __slots__ = "_stream", "_read_size" 

165 

166 def __init__(self, socket_read_size: int): 

167 self._stream: Optional[StreamReader] = None 

168 self._read_size = socket_read_size 

169 

170 async def can_read_destructive(self) -> bool: 

171 raise NotImplementedError() 

172 

173 async def read_response( 

174 self, disable_decoding: bool = False 

175 ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]: 

176 raise NotImplementedError() 

177 

178 

179class MaintenanceNotificationsParser: 

180 """Protocol defining maintenance push notification parsing functionality""" 

181 

182 @staticmethod 

183 def parse_maintenance_start_msg(response, notification_type): 

184 # Expected message format is: <notification_type> <seq_number> <time> 

185 id = response[1] 

186 ttl = response[2] 

187 return notification_type(id, ttl) 

188 

189 @staticmethod 

190 def parse_maintenance_completed_msg(response, notification_type): 

191 # Expected message format is: <notification_type> <seq_number> 

192 id = response[1] 

193 return notification_type(id) 

194 

195 @staticmethod 

196 def parse_moving_msg(response): 

197 # Expected message format is: MOVING <seq_number> <time> <endpoint> 

198 id = response[1] 

199 ttl = response[2] 

200 if response[3] is None: 

201 host, port = None, None 

202 else: 

203 value = response[3] 

204 if isinstance(value, bytes): 

205 value = value.decode() 

206 host, port = value.split(":") 

207 port = int(port) if port is not None else None 

208 

209 return NodeMovingNotification(id, host, port, ttl) 

210 

211 

212_INVALIDATION_MESSAGE = "invalidate" 

213_MOVING_MESSAGE = "MOVING" 

214_MIGRATING_MESSAGE = "MIGRATING" 

215_MIGRATED_MESSAGE = "MIGRATED" 

216_FAILING_OVER_MESSAGE = "FAILING_OVER" 

217_FAILED_OVER_MESSAGE = "FAILED_OVER" 

218 

219_MAINTENANCE_MESSAGES = ( 

220 _MIGRATING_MESSAGE, 

221 _MIGRATED_MESSAGE, 

222 _FAILING_OVER_MESSAGE, 

223 _FAILED_OVER_MESSAGE, 

224) 

225 

226MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[ 

227 str, tuple[type[MaintenanceNotification], Callable] 

228] = { 

229 _MIGRATING_MESSAGE: ( 

230 NodeMigratingNotification, 

231 MaintenanceNotificationsParser.parse_maintenance_start_msg, 

232 ), 

233 _MIGRATED_MESSAGE: ( 

234 NodeMigratedNotification, 

235 MaintenanceNotificationsParser.parse_maintenance_completed_msg, 

236 ), 

237 _FAILING_OVER_MESSAGE: ( 

238 NodeFailingOverNotification, 

239 MaintenanceNotificationsParser.parse_maintenance_start_msg, 

240 ), 

241 _FAILED_OVER_MESSAGE: ( 

242 NodeFailedOverNotification, 

243 MaintenanceNotificationsParser.parse_maintenance_completed_msg, 

244 ), 

245 _MOVING_MESSAGE: ( 

246 NodeMovingNotification, 

247 MaintenanceNotificationsParser.parse_moving_msg, 

248 ), 

249} 

250 

251 

252class PushNotificationsParser(Protocol): 

253 """Protocol defining RESP3-specific parsing functionality""" 

254 

255 pubsub_push_handler_func: Callable 

256 invalidation_push_handler_func: Optional[Callable] = None 

257 node_moving_push_handler_func: Optional[Callable] = None 

258 maintenance_push_handler_func: Optional[Callable] = None 

259 

260 def handle_pubsub_push_response(self, response): 

261 """Handle pubsub push responses""" 

262 raise NotImplementedError() 

263 

264 def handle_push_response(self, response, **kwargs): 

265 msg_type = response[0] 

266 if isinstance(msg_type, bytes): 

267 msg_type = msg_type.decode() 

268 

269 if msg_type not in ( 

270 _INVALIDATION_MESSAGE, 

271 *_MAINTENANCE_MESSAGES, 

272 _MOVING_MESSAGE, 

273 ): 

274 return self.pubsub_push_handler_func(response) 

275 

276 try: 

277 if ( 

278 msg_type == _INVALIDATION_MESSAGE 

279 and self.invalidation_push_handler_func 

280 ): 

281 return self.invalidation_push_handler_func(response) 

282 

283 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: 

284 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

285 msg_type 

286 ][1] 

287 

288 notification = parser_function(response) 

289 return self.node_moving_push_handler_func(notification) 

290 

291 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: 

292 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

293 msg_type 

294 ][1] 

295 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

296 msg_type 

297 ][0] 

298 notification = parser_function(response, notification_type) 

299 

300 if notification is not None: 

301 return self.maintenance_push_handler_func(notification) 

302 except Exception as e: 

303 logger.error( 

304 "Error handling {} message ({}): {}".format(msg_type, response, e) 

305 ) 

306 

307 return None 

308 

309 def set_pubsub_push_handler(self, pubsub_push_handler_func): 

310 self.pubsub_push_handler_func = pubsub_push_handler_func 

311 

312 def set_invalidation_push_handler(self, invalidation_push_handler_func): 

313 self.invalidation_push_handler_func = invalidation_push_handler_func 

314 

315 def set_node_moving_push_handler(self, node_moving_push_handler_func): 

316 self.node_moving_push_handler_func = node_moving_push_handler_func 

317 

318 def set_maintenance_push_handler(self, maintenance_push_handler_func): 

319 self.maintenance_push_handler_func = maintenance_push_handler_func 

320 

321 

322class AsyncPushNotificationsParser(Protocol): 

323 """Protocol defining async RESP3-specific parsing functionality""" 

324 

325 pubsub_push_handler_func: Callable 

326 invalidation_push_handler_func: Optional[Callable] = None 

327 node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None 

328 maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None 

329 

330 async def handle_pubsub_push_response(self, response): 

331 """Handle pubsub push responses asynchronously""" 

332 raise NotImplementedError() 

333 

334 async def handle_push_response(self, response, **kwargs): 

335 """Handle push responses asynchronously""" 

336 

337 msg_type = response[0] 

338 if isinstance(msg_type, bytes): 

339 msg_type = msg_type.decode() 

340 

341 if msg_type not in ( 

342 _INVALIDATION_MESSAGE, 

343 *_MAINTENANCE_MESSAGES, 

344 _MOVING_MESSAGE, 

345 ): 

346 return await self.pubsub_push_handler_func(response) 

347 

348 try: 

349 if ( 

350 msg_type == _INVALIDATION_MESSAGE 

351 and self.invalidation_push_handler_func 

352 ): 

353 return await self.invalidation_push_handler_func(response) 

354 

355 if isinstance(msg_type, bytes): 

356 msg_type = msg_type.decode() 

357 

358 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: 

359 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

360 msg_type 

361 ][1] 

362 notification = parser_function(response) 

363 return await self.node_moving_push_handler_func(notification) 

364 

365 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: 

366 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

367 msg_type 

368 ][1] 

369 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

370 msg_type 

371 ][0] 

372 notification = parser_function(response, notification_type) 

373 

374 if notification is not None: 

375 return await self.maintenance_push_handler_func(notification) 

376 except Exception as e: 

377 logger.error( 

378 "Error handling {} message ({}): {}".format(msg_type, response, e) 

379 ) 

380 

381 return None 

382 

383 def set_pubsub_push_handler(self, pubsub_push_handler_func): 

384 """Set the pubsub push handler function""" 

385 self.pubsub_push_handler_func = pubsub_push_handler_func 

386 

387 def set_invalidation_push_handler(self, invalidation_push_handler_func): 

388 """Set the invalidation push handler function""" 

389 self.invalidation_push_handler_func = invalidation_push_handler_func 

390 

391 def set_node_moving_push_handler(self, node_moving_push_handler_func): 

392 self.node_moving_push_handler_func = node_moving_push_handler_func 

393 

394 def set_maintenance_push_handler(self, maintenance_push_handler_func): 

395 self.maintenance_push_handler_func = maintenance_push_handler_func 

396 

397 

398class _AsyncRESPBase(AsyncBaseParser): 

399 """Base class for async resp parsing""" 

400 

401 __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") 

402 

403 def __init__(self, socket_read_size: int): 

404 super().__init__(socket_read_size) 

405 self.encoder: Optional[Encoder] = None 

406 self._buffer = b"" 

407 self._chunks = [] 

408 self._pos = 0 

409 

410 def _clear(self): 

411 self._buffer = b"" 

412 self._chunks.clear() 

413 

414 def on_connect(self, connection): 

415 """Called when the stream connects""" 

416 self._stream = connection._reader 

417 if self._stream is None: 

418 raise RedisError("Buffer is closed.") 

419 self.encoder = connection.encoder 

420 self._clear() 

421 self._connected = True 

422 

423 def on_disconnect(self): 

424 """Called when the stream disconnects""" 

425 self._connected = False 

426 

427 async def can_read_destructive(self) -> bool: 

428 if not self._connected: 

429 raise RedisError("Buffer is closed.") 

430 if self._buffer: 

431 return True 

432 try: 

433 async with async_timeout(0): 

434 return self._stream.at_eof() 

435 except TimeoutError: 

436 return False 

437 

438 async def _read(self, length: int) -> bytes: 

439 """ 

440 Read `length` bytes of data. These are assumed to be followed 

441 by a '\r\n' terminator which is subsequently discarded. 

442 """ 

443 want = length + 2 

444 end = self._pos + want 

445 if len(self._buffer) >= end: 

446 result = self._buffer[self._pos : end - 2] 

447 else: 

448 tail = self._buffer[self._pos :] 

449 try: 

450 data = await self._stream.readexactly(want - len(tail)) 

451 except IncompleteReadError as error: 

452 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error 

453 result = (tail + data)[:-2] 

454 self._chunks.append(data) 

455 self._pos += want 

456 return result 

457 

458 async def _readline(self) -> bytes: 

459 """ 

460 read an unknown number of bytes up to the next '\r\n' 

461 line separator, which is discarded. 

462 """ 

463 found = self._buffer.find(b"\r\n", self._pos) 

464 if found >= 0: 

465 result = self._buffer[self._pos : found] 

466 else: 

467 tail = self._buffer[self._pos :] 

468 data = await self._stream.readline() 

469 if not data.endswith(b"\r\n"): 

470 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

471 result = (tail + data)[:-2] 

472 self._chunks.append(data) 

473 self._pos += len(result) + 2 

474 return result