Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/event.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

201 statements  

1import asyncio 

2import threading 

3from abc import ABC, abstractmethod 

4from enum import Enum 

5from typing import List, Optional, Union 

6 

7from redis.auth.token import TokenInterface 

8from redis.credentials import CredentialProvider, StreamingCredentialProvider 

9 

10 

11class EventListenerInterface(ABC): 

12 """ 

13 Represents a listener for given event object. 

14 """ 

15 

16 @abstractmethod 

17 def listen(self, event: object): 

18 pass 

19 

20 

21class AsyncEventListenerInterface(ABC): 

22 """ 

23 Represents an async listener for given event object. 

24 """ 

25 

26 @abstractmethod 

27 async def listen(self, event: object): 

28 pass 

29 

30 

31class EventDispatcherInterface(ABC): 

32 """ 

33 Represents a dispatcher that dispatches events to listeners 

34 associated with given event. 

35 """ 

36 

37 @abstractmethod 

38 def dispatch(self, event: object): 

39 pass 

40 

41 @abstractmethod 

42 async def dispatch_async(self, event: object): 

43 pass 

44 

45 

46class EventException(Exception): 

47 """ 

48 Exception wrapper that adds an event object into exception context. 

49 """ 

50 

51 def __init__(self, exception: Exception, event: object): 

52 self.exception = exception 

53 self.event = event 

54 super().__init__(exception) 

55 

56 

57class EventDispatcher(EventDispatcherInterface): 

58 # TODO: Make dispatcher to accept external mappings. 

59 def __init__(self): 

60 """ 

61 Mapping should be extended for any new events or listeners to be added. 

62 """ 

63 self._event_listeners_mapping = { 

64 AfterConnectionReleasedEvent: [ 

65 ReAuthConnectionListener(), 

66 ], 

67 AfterPooledConnectionsInstantiationEvent: [ 

68 RegisterReAuthForPooledConnections() 

69 ], 

70 AfterSingleConnectionInstantiationEvent: [ 

71 RegisterReAuthForSingleConnection() 

72 ], 

73 AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()], 

74 AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()], 

75 AsyncAfterConnectionReleasedEvent: [ 

76 AsyncReAuthConnectionListener(), 

77 ], 

78 } 

79 

80 def dispatch(self, event: object): 

81 listeners = self._event_listeners_mapping.get(type(event)) 

82 

83 for listener in listeners: 

84 listener.listen(event) 

85 

86 async def dispatch_async(self, event: object): 

87 listeners = self._event_listeners_mapping.get(type(event)) 

88 

89 for listener in listeners: 

90 await listener.listen(event) 

91 

92 

93class AfterConnectionReleasedEvent: 

94 """ 

95 Event that will be fired before each command execution. 

96 """ 

97 

98 def __init__(self, connection): 

99 self._connection = connection 

100 

101 @property 

102 def connection(self): 

103 return self._connection 

104 

105 

106class AsyncAfterConnectionReleasedEvent(AfterConnectionReleasedEvent): 

107 pass 

108 

109 

110class ClientType(Enum): 

111 SYNC = ("sync",) 

112 ASYNC = ("async",) 

113 

114 

115class AfterPooledConnectionsInstantiationEvent: 

116 """ 

117 Event that will be fired after pooled connection instances was created. 

118 """ 

119 

120 def __init__( 

121 self, 

122 connection_pools: List, 

123 client_type: ClientType, 

124 credential_provider: Optional[CredentialProvider] = None, 

125 ): 

126 self._connection_pools = connection_pools 

127 self._client_type = client_type 

128 self._credential_provider = credential_provider 

129 

130 @property 

131 def connection_pools(self): 

132 return self._connection_pools 

133 

134 @property 

135 def client_type(self) -> ClientType: 

136 return self._client_type 

137 

138 @property 

139 def credential_provider(self) -> Union[CredentialProvider, None]: 

140 return self._credential_provider 

141 

142 

143class AfterSingleConnectionInstantiationEvent: 

144 """ 

145 Event that will be fired after single connection instances was created. 

146 

147 :param connection_lock: For sync client thread-lock should be provided, 

148 for async asyncio.Lock 

149 """ 

150 

151 def __init__( 

152 self, 

153 connection, 

154 client_type: ClientType, 

155 connection_lock: Union[threading.RLock, asyncio.Lock], 

156 ): 

157 self._connection = connection 

158 self._client_type = client_type 

159 self._connection_lock = connection_lock 

160 

161 @property 

162 def connection(self): 

163 return self._connection 

164 

165 @property 

166 def client_type(self) -> ClientType: 

167 return self._client_type 

168 

169 @property 

170 def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]: 

171 return self._connection_lock 

172 

173 

174class AfterPubSubConnectionInstantiationEvent: 

175 def __init__( 

176 self, 

177 pubsub_connection, 

178 connection_pool, 

179 client_type: ClientType, 

180 connection_lock: Union[threading.RLock, asyncio.Lock], 

181 ): 

182 self._pubsub_connection = pubsub_connection 

183 self._connection_pool = connection_pool 

184 self._client_type = client_type 

185 self._connection_lock = connection_lock 

186 

187 @property 

188 def pubsub_connection(self): 

189 return self._pubsub_connection 

190 

191 @property 

192 def connection_pool(self): 

193 return self._connection_pool 

194 

195 @property 

196 def client_type(self) -> ClientType: 

197 return self._client_type 

198 

199 @property 

200 def connection_lock(self) -> Union[threading.RLock, asyncio.Lock]: 

201 return self._connection_lock 

202 

203 

204class AfterAsyncClusterInstantiationEvent: 

205 """ 

206 Event that will be fired after async cluster instance was created. 

207 

208 Async cluster doesn't use connection pools, 

209 instead ClusterNode object manages connections. 

210 """ 

211 

212 def __init__( 

213 self, 

214 nodes: dict, 

215 credential_provider: Optional[CredentialProvider] = None, 

216 ): 

217 self._nodes = nodes 

218 self._credential_provider = credential_provider 

219 

220 @property 

221 def nodes(self) -> dict: 

222 return self._nodes 

223 

224 @property 

225 def credential_provider(self) -> Union[CredentialProvider, None]: 

226 return self._credential_provider 

227 

228 

229class ReAuthConnectionListener(EventListenerInterface): 

230 """ 

231 Listener that performs re-authentication of given connection. 

232 """ 

233 

234 def listen(self, event: AfterConnectionReleasedEvent): 

235 event.connection.re_auth() 

236 

237 

238class AsyncReAuthConnectionListener(AsyncEventListenerInterface): 

239 """ 

240 Async listener that performs re-authentication of given connection. 

241 """ 

242 

243 async def listen(self, event: AsyncAfterConnectionReleasedEvent): 

244 await event.connection.re_auth() 

245 

246 

247class RegisterReAuthForPooledConnections(EventListenerInterface): 

248 """ 

249 Listener that registers a re-authentication callback for pooled connections. 

250 Required by :class:`StreamingCredentialProvider`. 

251 """ 

252 

253 def __init__(self): 

254 self._event = None 

255 

256 def listen(self, event: AfterPooledConnectionsInstantiationEvent): 

257 if isinstance(event.credential_provider, StreamingCredentialProvider): 

258 self._event = event 

259 

260 if event.client_type == ClientType.SYNC: 

261 event.credential_provider.on_next(self._re_auth) 

262 event.credential_provider.on_error(self._raise_on_error) 

263 else: 

264 event.credential_provider.on_next(self._re_auth_async) 

265 event.credential_provider.on_error(self._raise_on_error_async) 

266 

267 def _re_auth(self, token): 

268 for pool in self._event.connection_pools: 

269 pool.re_auth_callback(token) 

270 

271 async def _re_auth_async(self, token): 

272 for pool in self._event.connection_pools: 

273 await pool.re_auth_callback(token) 

274 

275 def _raise_on_error(self, error: Exception): 

276 raise EventException(error, self._event) 

277 

278 async def _raise_on_error_async(self, error: Exception): 

279 raise EventException(error, self._event) 

280 

281 

282class RegisterReAuthForSingleConnection(EventListenerInterface): 

283 """ 

284 Listener that registers a re-authentication callback for single connection. 

285 Required by :class:`StreamingCredentialProvider`. 

286 """ 

287 

288 def __init__(self): 

289 self._event = None 

290 

291 def listen(self, event: AfterSingleConnectionInstantiationEvent): 

292 if isinstance( 

293 event.connection.credential_provider, StreamingCredentialProvider 

294 ): 

295 self._event = event 

296 

297 if event.client_type == ClientType.SYNC: 

298 event.connection.credential_provider.on_next(self._re_auth) 

299 event.connection.credential_provider.on_error(self._raise_on_error) 

300 else: 

301 event.connection.credential_provider.on_next(self._re_auth_async) 

302 event.connection.credential_provider.on_error( 

303 self._raise_on_error_async 

304 ) 

305 

306 def _re_auth(self, token): 

307 with self._event.connection_lock: 

308 self._event.connection.send_command( 

309 "AUTH", token.try_get("oid"), token.get_value() 

310 ) 

311 self._event.connection.read_response() 

312 

313 async def _re_auth_async(self, token): 

314 async with self._event.connection_lock: 

315 await self._event.connection.send_command( 

316 "AUTH", token.try_get("oid"), token.get_value() 

317 ) 

318 await self._event.connection.read_response() 

319 

320 def _raise_on_error(self, error: Exception): 

321 raise EventException(error, self._event) 

322 

323 async def _raise_on_error_async(self, error: Exception): 

324 raise EventException(error, self._event) 

325 

326 

327class RegisterReAuthForAsyncClusterNodes(EventListenerInterface): 

328 def __init__(self): 

329 self._event = None 

330 

331 def listen(self, event: AfterAsyncClusterInstantiationEvent): 

332 if isinstance(event.credential_provider, StreamingCredentialProvider): 

333 self._event = event 

334 event.credential_provider.on_next(self._re_auth) 

335 event.credential_provider.on_error(self._raise_on_error) 

336 

337 async def _re_auth(self, token: TokenInterface): 

338 for key in self._event.nodes: 

339 await self._event.nodes[key].re_auth_callback(token) 

340 

341 async def _raise_on_error(self, error: Exception): 

342 raise EventException(error, self._event) 

343 

344 

345class RegisterReAuthForPubSub(EventListenerInterface): 

346 def __init__(self): 

347 self._connection = None 

348 self._connection_pool = None 

349 self._client_type = None 

350 self._connection_lock = None 

351 self._event = None 

352 

353 def listen(self, event: AfterPubSubConnectionInstantiationEvent): 

354 if isinstance( 

355 event.pubsub_connection.credential_provider, StreamingCredentialProvider 

356 ) and event.pubsub_connection.get_protocol() in [3, "3"]: 

357 self._event = event 

358 self._connection = event.pubsub_connection 

359 self._connection_pool = event.connection_pool 

360 self._client_type = event.client_type 

361 self._connection_lock = event.connection_lock 

362 

363 if self._client_type == ClientType.SYNC: 

364 self._connection.credential_provider.on_next(self._re_auth) 

365 self._connection.credential_provider.on_error(self._raise_on_error) 

366 else: 

367 self._connection.credential_provider.on_next(self._re_auth_async) 

368 self._connection.credential_provider.on_error( 

369 self._raise_on_error_async 

370 ) 

371 

372 def _re_auth(self, token: TokenInterface): 

373 with self._connection_lock: 

374 self._connection.send_command( 

375 "AUTH", token.try_get("oid"), token.get_value() 

376 ) 

377 self._connection.read_response() 

378 

379 self._connection_pool.re_auth_callback(token) 

380 

381 async def _re_auth_async(self, token: TokenInterface): 

382 async with self._connection_lock: 

383 await self._connection.send_command( 

384 "AUTH", token.try_get("oid"), token.get_value() 

385 ) 

386 await self._connection.read_response() 

387 

388 await self._connection_pool.re_auth_callback(token) 

389 

390 def _raise_on_error(self, error: Exception): 

391 raise EventException(error, self._event) 

392 

393 async def _raise_on_error_async(self, error: Exception): 

394 raise EventException(error, self._event)