Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/base.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

230 statements  

1import logging 

2import sys 

3from abc import ABC 

4from asyncio import IncompleteReadError, StreamReader, TimeoutError 

5from typing import Awaitable, Callable, List, Optional, Protocol, Union 

6 

7from redis.maint_notifications import ( 

8 MaintenanceNotification, 

9 NodeFailedOverNotification, 

10 NodeFailingOverNotification, 

11 NodeMigratedNotification, 

12 NodeMigratingNotification, 

13 NodeMovingNotification, 

14) 

15 

16if sys.version_info.major >= 3 and sys.version_info.minor >= 11: 

17 from asyncio import timeout as async_timeout 

18else: 

19 from async_timeout import timeout as async_timeout 

20 

21from ..exceptions import ( 

22 AskError, 

23 AuthenticationError, 

24 AuthenticationWrongNumberOfArgsError, 

25 BusyLoadingError, 

26 ClusterCrossSlotError, 

27 ClusterDownError, 

28 ConnectionError, 

29 ExecAbortError, 

30 ExternalAuthProviderError, 

31 MasterDownError, 

32 ModuleError, 

33 MovedError, 

34 NoPermissionError, 

35 NoScriptError, 

36 OutOfMemoryError, 

37 ReadOnlyError, 

38 ResponseError, 

39 TryAgainError, 

40) 

41from ..typing import EncodableT 

42from .encoders import Encoder 

43from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer 

44 

45MODULE_LOAD_ERROR = "Error loading the extension. Please check the server logs." 

46NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name" 

47MODULE_UNLOAD_NOT_POSSIBLE_ERROR = "Error unloading module: operation not possible." 

48MODULE_EXPORTS_DATA_TYPES_ERROR = ( 

49 "Error unloading module: the module " 

50 "exports one or more module-side data " 

51 "types, can't unload" 

52) 

53# user send an AUTH cmd to a server without authorization configured 

54NO_AUTH_SET_ERROR = { 

55 # Redis >= 6.0 

56 "AUTH <password> called without any password " 

57 "configured for the default user. Are you sure " 

58 "your configuration is correct?": AuthenticationError, 

59 # Redis < 6.0 

60 "Client sent AUTH, but no password is set": AuthenticationError, 

61} 

62 

63EXTERNAL_AUTH_PROVIDER_ERROR = { 

64 "problem with LDAP service": ExternalAuthProviderError, 

65} 

66 

67logger = logging.getLogger(__name__) 

68 

69 

70class BaseParser(ABC): 

71 EXCEPTION_CLASSES = { 

72 "ERR": { 

73 "max number of clients reached": ConnectionError, 

74 "invalid password": AuthenticationError, 

75 # some Redis server versions report invalid command syntax 

76 # in lowercase 

77 "wrong number of arguments " 

78 "for 'auth' command": AuthenticationWrongNumberOfArgsError, 

79 # some Redis server versions report invalid command syntax 

80 # in uppercase 

81 "wrong number of arguments " 

82 "for 'AUTH' command": AuthenticationWrongNumberOfArgsError, 

83 MODULE_LOAD_ERROR: ModuleError, 

84 MODULE_EXPORTS_DATA_TYPES_ERROR: ModuleError, 

85 NO_SUCH_MODULE_ERROR: ModuleError, 

86 MODULE_UNLOAD_NOT_POSSIBLE_ERROR: ModuleError, 

87 **NO_AUTH_SET_ERROR, 

88 **EXTERNAL_AUTH_PROVIDER_ERROR, 

89 }, 

90 "OOM": OutOfMemoryError, 

91 "WRONGPASS": AuthenticationError, 

92 "EXECABORT": ExecAbortError, 

93 "LOADING": BusyLoadingError, 

94 "NOSCRIPT": NoScriptError, 

95 "READONLY": ReadOnlyError, 

96 "NOAUTH": AuthenticationError, 

97 "NOPERM": NoPermissionError, 

98 "ASK": AskError, 

99 "TRYAGAIN": TryAgainError, 

100 "MOVED": MovedError, 

101 "CLUSTERDOWN": ClusterDownError, 

102 "CROSSSLOT": ClusterCrossSlotError, 

103 "MASTERDOWN": MasterDownError, 

104 } 

105 

106 @classmethod 

107 def parse_error(cls, response): 

108 "Parse an error response" 

109 error_code = response.split(" ")[0] 

110 if error_code in cls.EXCEPTION_CLASSES: 

111 response = response[len(error_code) + 1 :] 

112 exception_class = cls.EXCEPTION_CLASSES[error_code] 

113 if isinstance(exception_class, dict): 

114 exception_class = exception_class.get(response, ResponseError) 

115 return exception_class(response) 

116 return ResponseError(response) 

117 

118 def on_disconnect(self): 

119 raise NotImplementedError() 

120 

121 def on_connect(self, connection): 

122 raise NotImplementedError() 

123 

124 

125class _RESPBase(BaseParser): 

126 """Base class for sync-based resp parsing""" 

127 

128 def __init__(self, socket_read_size): 

129 self.socket_read_size = socket_read_size 

130 self.encoder = None 

131 self._sock = None 

132 self._buffer = None 

133 

134 def __del__(self): 

135 try: 

136 self.on_disconnect() 

137 except Exception: 

138 pass 

139 

140 def on_connect(self, connection): 

141 "Called when the socket connects" 

142 self._sock = connection._sock 

143 self._buffer = SocketBuffer( 

144 self._sock, self.socket_read_size, connection.socket_timeout 

145 ) 

146 self.encoder = connection.encoder 

147 

148 def on_disconnect(self): 

149 "Called when the socket disconnects" 

150 self._sock = None 

151 if self._buffer is not None: 

152 self._buffer.close() 

153 self._buffer = None 

154 self.encoder = None 

155 

156 def can_read(self, timeout): 

157 return self._buffer and self._buffer.can_read(timeout) 

158 

159 

160class AsyncBaseParser(BaseParser): 

161 """Base parsing class for the python-backed async parser""" 

162 

163 __slots__ = "_stream", "_read_size" 

164 

165 def __init__(self, socket_read_size: int): 

166 self._stream: Optional[StreamReader] = None 

167 self._read_size = socket_read_size 

168 

169 async def can_read_destructive(self) -> bool: 

170 raise NotImplementedError() 

171 

172 async def read_response( 

173 self, disable_decoding: bool = False 

174 ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]: 

175 raise NotImplementedError() 

176 

177 

178class MaintenanceNotificationsParser: 

179 """Protocol defining maintenance push notification parsing functionality""" 

180 

181 @staticmethod 

182 def parse_maintenance_start_msg(response, notification_type): 

183 # Expected message format is: <notification_type> <seq_number> <time> 

184 id = response[1] 

185 ttl = response[2] 

186 return notification_type(id, ttl) 

187 

188 @staticmethod 

189 def parse_maintenance_completed_msg(response, notification_type): 

190 # Expected message format is: <notification_type> <seq_number> 

191 id = response[1] 

192 return notification_type(id) 

193 

194 @staticmethod 

195 def parse_moving_msg(response): 

196 # Expected message format is: MOVING <seq_number> <time> <endpoint> 

197 id = response[1] 

198 ttl = response[2] 

199 if response[3] is None: 

200 host, port = None, None 

201 else: 

202 value = response[3] 

203 if isinstance(value, bytes): 

204 value = value.decode() 

205 host, port = value.split(":") 

206 port = int(port) if port is not None else None 

207 

208 return NodeMovingNotification(id, host, port, ttl) 

209 

210 

211_INVALIDATION_MESSAGE = "invalidate" 

212_MOVING_MESSAGE = "MOVING" 

213_MIGRATING_MESSAGE = "MIGRATING" 

214_MIGRATED_MESSAGE = "MIGRATED" 

215_FAILING_OVER_MESSAGE = "FAILING_OVER" 

216_FAILED_OVER_MESSAGE = "FAILED_OVER" 

217 

218_MAINTENANCE_MESSAGES = ( 

219 _MIGRATING_MESSAGE, 

220 _MIGRATED_MESSAGE, 

221 _FAILING_OVER_MESSAGE, 

222 _FAILED_OVER_MESSAGE, 

223) 

224 

225MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[ 

226 str, tuple[type[MaintenanceNotification], Callable] 

227] = { 

228 _MIGRATING_MESSAGE: ( 

229 NodeMigratingNotification, 

230 MaintenanceNotificationsParser.parse_maintenance_start_msg, 

231 ), 

232 _MIGRATED_MESSAGE: ( 

233 NodeMigratedNotification, 

234 MaintenanceNotificationsParser.parse_maintenance_completed_msg, 

235 ), 

236 _FAILING_OVER_MESSAGE: ( 

237 NodeFailingOverNotification, 

238 MaintenanceNotificationsParser.parse_maintenance_start_msg, 

239 ), 

240 _FAILED_OVER_MESSAGE: ( 

241 NodeFailedOverNotification, 

242 MaintenanceNotificationsParser.parse_maintenance_completed_msg, 

243 ), 

244 _MOVING_MESSAGE: ( 

245 NodeMovingNotification, 

246 MaintenanceNotificationsParser.parse_moving_msg, 

247 ), 

248} 

249 

250 

251class PushNotificationsParser(Protocol): 

252 """Protocol defining RESP3-specific parsing functionality""" 

253 

254 pubsub_push_handler_func: Callable 

255 invalidation_push_handler_func: Optional[Callable] = None 

256 node_moving_push_handler_func: Optional[Callable] = None 

257 maintenance_push_handler_func: Optional[Callable] = None 

258 

259 def handle_pubsub_push_response(self, response): 

260 """Handle pubsub push responses""" 

261 raise NotImplementedError() 

262 

263 def handle_push_response(self, response, **kwargs): 

264 msg_type = response[0] 

265 if isinstance(msg_type, bytes): 

266 msg_type = msg_type.decode() 

267 

268 if msg_type not in ( 

269 _INVALIDATION_MESSAGE, 

270 *_MAINTENANCE_MESSAGES, 

271 _MOVING_MESSAGE, 

272 ): 

273 return self.pubsub_push_handler_func(response) 

274 

275 try: 

276 if ( 

277 msg_type == _INVALIDATION_MESSAGE 

278 and self.invalidation_push_handler_func 

279 ): 

280 return self.invalidation_push_handler_func(response) 

281 

282 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: 

283 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

284 msg_type 

285 ][1] 

286 

287 notification = parser_function(response) 

288 return self.node_moving_push_handler_func(notification) 

289 

290 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: 

291 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

292 msg_type 

293 ][1] 

294 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

295 msg_type 

296 ][0] 

297 notification = parser_function(response, notification_type) 

298 

299 if notification is not None: 

300 return self.maintenance_push_handler_func(notification) 

301 except Exception as e: 

302 logger.error( 

303 "Error handling {} message ({}): {}".format(msg_type, response, e) 

304 ) 

305 

306 return None 

307 

308 def set_pubsub_push_handler(self, pubsub_push_handler_func): 

309 self.pubsub_push_handler_func = pubsub_push_handler_func 

310 

311 def set_invalidation_push_handler(self, invalidation_push_handler_func): 

312 self.invalidation_push_handler_func = invalidation_push_handler_func 

313 

314 def set_node_moving_push_handler(self, node_moving_push_handler_func): 

315 self.node_moving_push_handler_func = node_moving_push_handler_func 

316 

317 def set_maintenance_push_handler(self, maintenance_push_handler_func): 

318 self.maintenance_push_handler_func = maintenance_push_handler_func 

319 

320 

321class AsyncPushNotificationsParser(Protocol): 

322 """Protocol defining async RESP3-specific parsing functionality""" 

323 

324 pubsub_push_handler_func: Callable 

325 invalidation_push_handler_func: Optional[Callable] = None 

326 node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None 

327 maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None 

328 

329 async def handle_pubsub_push_response(self, response): 

330 """Handle pubsub push responses asynchronously""" 

331 raise NotImplementedError() 

332 

333 async def handle_push_response(self, response, **kwargs): 

334 """Handle push responses asynchronously""" 

335 

336 msg_type = response[0] 

337 if isinstance(msg_type, bytes): 

338 msg_type = msg_type.decode() 

339 

340 if msg_type not in ( 

341 _INVALIDATION_MESSAGE, 

342 *_MAINTENANCE_MESSAGES, 

343 _MOVING_MESSAGE, 

344 ): 

345 return await self.pubsub_push_handler_func(response) 

346 

347 try: 

348 if ( 

349 msg_type == _INVALIDATION_MESSAGE 

350 and self.invalidation_push_handler_func 

351 ): 

352 return await self.invalidation_push_handler_func(response) 

353 

354 if isinstance(msg_type, bytes): 

355 msg_type = msg_type.decode() 

356 

357 if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: 

358 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

359 msg_type 

360 ][1] 

361 notification = parser_function(response) 

362 return await self.node_moving_push_handler_func(notification) 

363 

364 if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: 

365 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

366 msg_type 

367 ][1] 

368 notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[ 

369 msg_type 

370 ][0] 

371 notification = parser_function(response, notification_type) 

372 

373 if notification is not None: 

374 return await self.maintenance_push_handler_func(notification) 

375 except Exception as e: 

376 logger.error( 

377 "Error handling {} message ({}): {}".format(msg_type, response, e) 

378 ) 

379 

380 return None 

381 

382 def set_pubsub_push_handler(self, pubsub_push_handler_func): 

383 """Set the pubsub push handler function""" 

384 self.pubsub_push_handler_func = pubsub_push_handler_func 

385 

386 def set_invalidation_push_handler(self, invalidation_push_handler_func): 

387 """Set the invalidation push handler function""" 

388 self.invalidation_push_handler_func = invalidation_push_handler_func 

389 

390 def set_node_moving_push_handler(self, node_moving_push_handler_func): 

391 self.node_moving_push_handler_func = node_moving_push_handler_func 

392 

393 def set_maintenance_push_handler(self, maintenance_push_handler_func): 

394 self.maintenance_push_handler_func = maintenance_push_handler_func 

395 

396 

397class _AsyncRESPBase(AsyncBaseParser): 

398 """Base class for async resp parsing""" 

399 

400 __slots__ = AsyncBaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") 

401 

402 def __init__(self, socket_read_size: int): 

403 super().__init__(socket_read_size) 

404 self.encoder: Optional[Encoder] = None 

405 self._buffer = b"" 

406 self._chunks = [] 

407 self._pos = 0 

408 

409 def _clear(self): 

410 self._buffer = b"" 

411 self._chunks.clear() 

412 

413 def on_connect(self, connection): 

414 """Called when the stream connects""" 

415 self._stream = connection._reader 

416 if self._stream is None: 

417 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

418 self.encoder = connection.encoder 

419 self._clear() 

420 self._connected = True 

421 

422 def on_disconnect(self): 

423 """Called when the stream disconnects""" 

424 self._connected = False 

425 

426 async def can_read_destructive(self) -> bool: 

427 if not self._connected: 

428 raise OSError("Buffer is closed.") 

429 if self._buffer: 

430 return True 

431 try: 

432 async with async_timeout(0): 

433 return self._stream.at_eof() 

434 except TimeoutError: 

435 return False 

436 

437 async def _read(self, length: int) -> bytes: 

438 """ 

439 Read `length` bytes of data. These are assumed to be followed 

440 by a '\r\n' terminator which is subsequently discarded. 

441 """ 

442 want = length + 2 

443 end = self._pos + want 

444 if len(self._buffer) >= end: 

445 result = self._buffer[self._pos : end - 2] 

446 else: 

447 tail = self._buffer[self._pos :] 

448 try: 

449 data = await self._stream.readexactly(want - len(tail)) 

450 except IncompleteReadError as error: 

451 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error 

452 result = (tail + data)[:-2] 

453 self._chunks.append(data) 

454 self._pos += want 

455 return result 

456 

457 async def _readline(self) -> bytes: 

458 """ 

459 read an unknown number of bytes up to the next '\r\n' 

460 line separator, which is discarded. 

461 """ 

462 found = self._buffer.find(b"\r\n", self._pos) 

463 if found >= 0: 

464 result = self._buffer[self._pos : found] 

465 else: 

466 tail = self._buffer[self._pos :] 

467 data = await self._stream.readline() 

468 if not data.endswith(b"\r\n"): 

469 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

470 result = (tail + data)[:-2] 

471 self._chunks.append(data) 

472 self._pos += len(result) + 2 

473 return result