Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/event.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

231 statements  

1import asyncio 

2import threading 

3from abc import ABC, abstractmethod 

4from enum import Enum 

5from typing import Dict, List, Optional, Type, Union 

6 

7from redis.auth.token import TokenInterface 

8from redis.credentials import CredentialProvider, StreamingCredentialProvider 

9from redis.utils import check_protocol_version 

10 

11 

12class EventListenerInterface(ABC): 

13 """ 

14 Represents a listener for given event object. 

15 """ 

16 

17 @abstractmethod 

18 def listen(self, event: object): 

19 pass 

20 

21 

22class AsyncEventListenerInterface(ABC): 

23 """ 

24 Represents an async listener for given event object. 

25 """ 

26 

27 @abstractmethod 

28 async def listen(self, event: object): 

29 pass 

30 

31 

32class EventDispatcherInterface(ABC): 

33 """ 

34 Represents a dispatcher that dispatches events to listeners 

35 associated with given event. 

36 """ 

37 

38 @abstractmethod 

39 def dispatch(self, event: object): 

40 pass 

41 

42 @abstractmethod 

43 async def dispatch_async(self, event: object): 

44 pass 

45 

46 @abstractmethod 

47 def register_listeners( 

48 self, 

49 mappings: Dict[ 

50 Type[object], 

51 List[Union[EventListenerInterface, AsyncEventListenerInterface]], 

52 ], 

53 ): 

54 """Register additional listeners.""" 

55 pass 

56 

57 

58class EventException(Exception): 

59 """ 

60 Exception wrapper that adds an event object into exception context. 

61 """ 

62 

63 def __init__(self, exception: Exception, event: object): 

64 self.exception = exception 

65 self.event = event 

66 super().__init__(exception) 

67 

68 

69class EventDispatcher(EventDispatcherInterface): 

70 # TODO: Make dispatcher to accept external mappings. 

71 def __init__( 

72 self, 

73 event_listeners: Optional[ 

74 Dict[Type[object], List[EventListenerInterface]] 

75 ] = None, 

76 ): 

77 """ 

78 Dispatcher that dispatches events to listeners associated with given event. 

79 """ 

80 self._event_listeners_mapping: Dict[ 

81 Type[object], List[EventListenerInterface] 

82 ] = { 

83 AfterConnectionReleasedEvent: [ 

84 ReAuthConnectionListener(), 

85 ], 

86 AfterPooledConnectionsInstantiationEvent: [ 

87 RegisterReAuthForPooledConnections() 

88 ], 

89 AfterSingleConnectionInstantiationEvent: [ 

90 RegisterReAuthForSingleConnection() 

91 ], 

92 AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()], 

93 AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()], 

94 AsyncAfterConnectionReleasedEvent: [ 

95 AsyncReAuthConnectionListener(), 

96 ], 

97 } 

98 

99 self._lock = threading.Lock() 

100 self._async_lock = None 

101 

102 if event_listeners: 

103 self.register_listeners(event_listeners) 

104 

105 def dispatch(self, event: object): 

106 with self._lock: 

107 listeners = self._event_listeners_mapping.get(type(event), []) 

108 

109 for listener in listeners: 

110 listener.listen(event) 

111 

112 async def dispatch_async(self, event: object): 

113 if self._async_lock is None: 

114 self._async_lock = asyncio.Lock() 

115 

116 async with self._async_lock: 

117 listeners = self._event_listeners_mapping.get(type(event), []) 

118 

119 for listener in listeners: 

120 await listener.listen(event) 

121 

122 def register_listeners( 

123 self, 

124 mappings: Dict[ 

125 Type[object], 

126 List[Union[EventListenerInterface, AsyncEventListenerInterface]], 

127 ], 

128 ): 

129 with self._lock: 

130 for event_type in mappings: 

131 if event_type in self._event_listeners_mapping: 

132 self._event_listeners_mapping[event_type] = list( 

133 set( 

134 self._event_listeners_mapping[event_type] 

135 + mappings[event_type] 

136 ) 

137 ) 

138 else: 

139 self._event_listeners_mapping[event_type] = mappings[event_type] 

140 

141 

142class AfterConnectionReleasedEvent: 

143 """ 

144 Event that will be fired before each command execution. 

145 """ 

146 

147 def __init__(self, connection): 

148 self._connection = connection 

149 

150 @property 

151 def connection(self): 

152 return self._connection 

153 

154 

155class AsyncAfterConnectionReleasedEvent(AfterConnectionReleasedEvent): 

156 pass 

157 

158 

159class ClientType(Enum): 

160 SYNC = ("sync",) 

161 ASYNC = ("async",) 

162 

163 

164class AfterPooledConnectionsInstantiationEvent: 

165 """ 

166 Event that will be fired after pooled connection instances was created. 

167 """ 

168 

169 def __init__( 

170 self, 

171 connection_pools: List, 

172 client_type: ClientType, 

173 credential_provider: Optional[CredentialProvider] = None, 

174 ): 

175 self._connection_pools = connection_pools 

176 self._client_type = client_type 

177 self._credential_provider = credential_provider 

178 

179 @property 

180 def connection_pools(self): 

181 return self._connection_pools 

182 

183 @property 

184 def client_type(self) -> ClientType: 

185 return self._client_type 

186 

187 @property 

188 def credential_provider(self) -> Union[CredentialProvider, None]: 

189 return self._credential_provider 

190 

191 

192class AfterSingleConnectionInstantiationEvent: 

193 """ 

194 Event that will be fired after single connection instances was created. 

195 

196 :param connection_lock: For sync client thread-lock should be provided, 

197 for async asyncio.Lock 

198 """ 

199 

200 def __init__( 

201 self, 

202 connection, 

203 client_type: ClientType, 

204 connection_lock: Union[threading.RLock, asyncio.Lock], 

205 ): 

206 self._connection = connection 

207 self._client_type = client_type 

208 self._connection_lock = connection_lock 

209 

210 @property 

211 def connection(self): 

212 return self._connection 

213 

214 @property 

215 def client_type(self) -> ClientType: 

216 return self._client_type 

217 

218 @property 

219 def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]: 

220 return self._connection_lock 

221 

222 

223class AfterPubSubConnectionInstantiationEvent: 

224 def __init__( 

225 self, 

226 pubsub_connection, 

227 connection_pool, 

228 client_type: ClientType, 

229 connection_lock: Union[threading.RLock, asyncio.Lock], 

230 ): 

231 self._pubsub_connection = pubsub_connection 

232 self._connection_pool = connection_pool 

233 self._client_type = client_type 

234 self._connection_lock = connection_lock 

235 

236 @property 

237 def pubsub_connection(self): 

238 return self._pubsub_connection 

239 

240 @property 

241 def connection_pool(self): 

242 return self._connection_pool 

243 

244 @property 

245 def client_type(self) -> ClientType: 

246 return self._client_type 

247 

248 @property 

249 def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]: 

250 return self._connection_lock 

251 

252 

253class AfterAsyncClusterInstantiationEvent: 

254 """ 

255 Event that will be fired after async cluster instance was created. 

256 

257 Async cluster doesn't use connection pools, 

258 instead ClusterNode object manages connections. 

259 """ 

260 

261 def __init__( 

262 self, 

263 nodes: dict, 

264 credential_provider: Optional[CredentialProvider] = None, 

265 ): 

266 self._nodes = nodes 

267 self._credential_provider = credential_provider 

268 

269 @property 

270 def nodes(self) -> dict: 

271 return self._nodes 

272 

273 @property 

274 def credential_provider(self) -> Union[CredentialProvider, None]: 

275 return self._credential_provider 

276 

277 

278class OnCommandsFailEvent: 

279 """ 

280 Event fired whenever a command fails during the execution. 

281 """ 

282 

283 def __init__( 

284 self, 

285 commands: tuple, 

286 exception: Exception, 

287 ): 

288 self._commands = commands 

289 self._exception = exception 

290 

291 @property 

292 def commands(self) -> tuple: 

293 return self._commands 

294 

295 @property 

296 def exception(self) -> Exception: 

297 return self._exception 

298 

299 

300class AsyncOnCommandsFailEvent(OnCommandsFailEvent): 

301 pass 

302 

303 

304class ReAuthConnectionListener(EventListenerInterface): 

305 """ 

306 Listener that performs re-authentication of given connection. 

307 """ 

308 

309 def listen(self, event: AfterConnectionReleasedEvent): 

310 event.connection.re_auth() 

311 

312 

313class AsyncReAuthConnectionListener(AsyncEventListenerInterface): 

314 """ 

315 Async listener that performs re-authentication of given connection. 

316 """ 

317 

318 async def listen(self, event: AsyncAfterConnectionReleasedEvent): 

319 await event.connection.re_auth() 

320 

321 

322class RegisterReAuthForPooledConnections(EventListenerInterface): 

323 """ 

324 Listener that registers a re-authentication callback for pooled connections. 

325 Required by :class:`StreamingCredentialProvider`. 

326 """ 

327 

328 def __init__(self): 

329 self._event = None 

330 

331 def listen(self, event: AfterPooledConnectionsInstantiationEvent): 

332 if isinstance(event.credential_provider, StreamingCredentialProvider): 

333 self._event = event 

334 

335 if event.client_type == ClientType.SYNC: 

336 event.credential_provider.on_next(self._re_auth) 

337 event.credential_provider.on_error(self._raise_on_error) 

338 else: 

339 event.credential_provider.on_next(self._re_auth_async) 

340 event.credential_provider.on_error(self._raise_on_error_async) 

341 

342 def _re_auth(self, token): 

343 for pool in self._event.connection_pools: 

344 pool.re_auth_callback(token) 

345 

346 async def _re_auth_async(self, token): 

347 for pool in self._event.connection_pools: 

348 await pool.re_auth_callback(token) 

349 

350 def _raise_on_error(self, error: Exception): 

351 raise EventException(error, self._event) 

352 

353 async def _raise_on_error_async(self, error: Exception): 

354 raise EventException(error, self._event) 

355 

356 

357class RegisterReAuthForSingleConnection(EventListenerInterface): 

358 """ 

359 Listener that registers a re-authentication callback for single connection. 

360 Required by :class:`StreamingCredentialProvider`. 

361 """ 

362 

363 def __init__(self): 

364 self._event = None 

365 

366 def listen(self, event: AfterSingleConnectionInstantiationEvent): 

367 if isinstance( 

368 event.connection.credential_provider, StreamingCredentialProvider 

369 ): 

370 self._event = event 

371 

372 if event.client_type == ClientType.SYNC: 

373 event.connection.credential_provider.on_next(self._re_auth) 

374 event.connection.credential_provider.on_error(self._raise_on_error) 

375 else: 

376 event.connection.credential_provider.on_next(self._re_auth_async) 

377 event.connection.credential_provider.on_error( 

378 self._raise_on_error_async 

379 ) 

380 

381 def _re_auth(self, token): 

382 with self._event.connection_lock: 

383 self._event.connection.send_command( 

384 "AUTH", token.try_get("oid"), token.get_value() 

385 ) 

386 self._event.connection.read_response() 

387 

388 async def _re_auth_async(self, token): 

389 async with self._event.connection_lock: 

390 await self._event.connection.send_command( 

391 "AUTH", token.try_get("oid"), token.get_value() 

392 ) 

393 await self._event.connection.read_response() 

394 

395 def _raise_on_error(self, error: Exception): 

396 raise EventException(error, self._event) 

397 

398 async def _raise_on_error_async(self, error: Exception): 

399 raise EventException(error, self._event) 

400 

401 

402class RegisterReAuthForAsyncClusterNodes(EventListenerInterface): 

403 def __init__(self): 

404 self._event = None 

405 

406 def listen(self, event: AfterAsyncClusterInstantiationEvent): 

407 if isinstance(event.credential_provider, StreamingCredentialProvider): 

408 self._event = event 

409 event.credential_provider.on_next(self._re_auth) 

410 event.credential_provider.on_error(self._raise_on_error) 

411 

412 async def _re_auth(self, token: TokenInterface): 

413 for key in self._event.nodes: 

414 await self._event.nodes[key].re_auth_callback(token) 

415 

416 async def _raise_on_error(self, error: Exception): 

417 raise EventException(error, self._event) 

418 

419 

420class RegisterReAuthForPubSub(EventListenerInterface): 

421 def __init__(self): 

422 self._connection = None 

423 self._connection_pool = None 

424 self._client_type = None 

425 self._connection_lock = None 

426 self._event = None 

427 

428 def listen(self, event: AfterPubSubConnectionInstantiationEvent): 

429 if isinstance( 

430 event.pubsub_connection.credential_provider, StreamingCredentialProvider 

431 ) and check_protocol_version(event.pubsub_connection.get_protocol(), 3): 

432 self._event = event 

433 self._connection = event.pubsub_connection 

434 self._connection_pool = event.connection_pool 

435 self._client_type = event.client_type 

436 self._connection_lock = event.connection_lock 

437 

438 if self._client_type == ClientType.SYNC: 

439 self._connection.credential_provider.on_next(self._re_auth) 

440 self._connection.credential_provider.on_error(self._raise_on_error) 

441 else: 

442 self._connection.credential_provider.on_next(self._re_auth_async) 

443 self._connection.credential_provider.on_error( 

444 self._raise_on_error_async 

445 ) 

446 

447 def _re_auth(self, token: TokenInterface): 

448 with self._connection_lock: 

449 self._connection.send_command( 

450 "AUTH", token.try_get("oid"), token.get_value() 

451 ) 

452 self._connection.read_response() 

453 

454 self._connection_pool.re_auth_callback(token) 

455 

456 async def _re_auth_async(self, token: TokenInterface): 

457 async with self._connection_lock: 

458 await self._connection.send_command( 

459 "AUTH", token.try_get("oid"), token.get_value() 

460 ) 

461 await self._connection.read_response() 

462 

463 await self._connection_pool.re_auth_callback(token) 

464 

465 def _raise_on_error(self, error: Exception): 

466 raise EventException(error, self._event) 

467 

468 async def _raise_on_error_async(self, error: Exception): 

469 raise EventException(error, self._event)