Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/registry.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1# event/registry.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Provides managed registration services on behalf of :func:`.listen` 

9arguments. 

10 

11By "managed registration", we mean that event listening functions and 

12other objects can be added to various collections in such a way that their 

13membership in all those collections can be revoked at once, based on 

14an equivalent :class:`._EventKey`. 

15 

16""" 

17from __future__ import annotations 

18 

19import collections 

20import types 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Deque 

26from typing import Dict 

27from typing import Generic 

28from typing import Iterable 

29from typing import Optional 

30from typing import Tuple 

31from typing import TypeVar 

32from typing import Union 

33import weakref 

34 

35from .. import exc 

36from .. import util 

37 

38if typing.TYPE_CHECKING: 

39 from .attr import RefCollection 

40 from .base import dispatcher 

41 

# Any callable may be registered as an event listener.
_ListenerFnType = Callable[..., Any]

# Identity of a listener function: ``id(fn)`` for ordinary callables, or the
# pair ``(id(fn.__func__), id(fn.__self__))`` for bound methods.
_ListenerFnKeyType = Union[int, Tuple[int, int]]

# Registry key for one listen() call:
# ``(id(target), identifier, listener identity)``.
_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]


# Type variable for the kind of target an event key refers to.
_ET = TypeVar("_ET", bound="EventTarget")

48 

49 

class EventTarget:
    """Base for objects that events can be listened on.

    An event target may be listened on either as a class or as an
    individual instance.

    Examples include: Connection, Mapper, Table, Session,
    InstrumentedAttribute, Engine, Pool, Dialect.

    """

    # no per-instance state is introduced by this base class
    __slots__ = ()

    # annotation only; no value is assigned to ``dispatch`` here
    dispatch: dispatcher[Any]

62 

63 

_RefCollectionToListenerType = Dict[
    "weakref.ref[RefCollection[Any]]",
    "weakref.ref[_ListenerFnType]",
]

_key_to_collection: Dict[
    _EventKeyTupleType,
    _RefCollectionToListenerType,
] = collections.defaultdict(dict)
"""
Forward registry: given the key of an original listen() call, locates
every listener collection the listener was placed in, together with the
listener fn stored there.

(target, identifier, fn) -> {
    ref(listenercollection) -> ref(listener_fn)
    ref(listenercollection) -> ref(listener_fn)
    ref(listenercollection) -> ref(listener_fn)
}

This is a defaultdict: subscripting with an unknown key creates an empty
entry, so use ``in`` when only testing for membership.
"""

82 

_ListenerToEventKeyType = Dict[
    "weakref.ref[_ListenerFnType]",
    _EventKeyTupleType,
]

_collection_to_key: Dict[
    weakref.ref[RefCollection[Any]],
    _ListenerToEventKeyType,
] = collections.defaultdict(dict)
"""
Reverse registry: given a _ListenerCollection or _ClsLevelListener,
locates the original listen() arguments of every listener fn it contains.

ref(listenercollection) -> {
    ref(listener_fn) -> (target, identifier, fn),
    ref(listener_fn) -> (target, identifier, fn),
    ref(listener_fn) -> (target, identifier, fn),
}

This is a defaultdict: subscripting with an unknown collection ref creates
an empty entry, so use ``in`` when only testing for membership.
"""

101 

102 

def _collection_gced(ref: weakref.ref[Any]) -> None:
    """Purge all registry entries that belong to a listener collection.

    The collection is identified by its weak reference.  Its whole entry is
    removed from ``_collection_to_key``; for every event key recorded there,
    the collection is also removed from ``_key_to_collection``, and an event
    key left with no collections is dropped entirely.

    :param ref: weak reference identifying the collection.
     NOTE(review): the signature matches a ``weakref.ref`` callback and the
     name suggests it runs when the collection is garbage collected; the
     place where it is installed is not visible here - confirm.
    """
    # membership is tested with ``in`` rather than by subscripting, because
    # _collection_to_key is a defaultdict and subscripting would create an
    # entry for an unknown ref
    if not _collection_to_key or ref not in _collection_to_key:
        return

    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)

    listener_to_key = _collection_to_key.pop(ref)
    for key in listener_to_key.values():
        if key in _key_to_collection:
            dispatch_reg = _key_to_collection[key]
            # the collection may already be absent from this key's registry:
            # _clear() removes it from _key_to_collection while leaving the
            # reverse mapping in _collection_to_key in place.  Tolerate that
            # instead of raising KeyError.
            dispatch_reg.pop(ref, None)
            if not dispatch_reg:
                _key_to_collection.pop(key)

118 

119 

def _stored_in_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> bool:
    """Record in both registries that ``owner`` now holds ``event_key``.

    :param event_key: the event key being added to the collection.
    :param owner: the collection receiving the listener; identified in the
     registries by ``owner.ref``.
    :return: ``False`` if ``owner`` is already registered under this event
     key, in which case nothing is changed; ``True`` once both registries
     have been updated.
    """
    registry_key = event_key._key

    # defaultdict access: creates the per-key registry on first use
    collections_for_key = _key_to_collection[registry_key]

    collection_ref = owner.ref
    fn_ref = weakref.ref(event_key._listen_fn)

    if collection_ref in collections_for_key:
        return False

    # forward direction: key -> {collection -> listener}
    collections_for_key[collection_ref] = fn_ref

    # reverse direction: collection -> {listener -> key}
    _collection_to_key[collection_ref][fn_ref] = registry_key

    return True

139 

140 

def _removed_from_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> None:
    """Record in both registries that ``owner`` no longer holds ``event_key``.

    Missing entries are tolerated in both directions, so calling this for a
    pairing that was already removed does not raise.

    :param event_key: the event key being removed from the collection.
    :param owner: the collection losing the listener; identified in the
     registries by ``owner.ref``.
    """
    registry_key = event_key._key

    # defaultdict access: yields an empty registry if the key is unknown,
    # which is deleted again just below
    collections_for_key = _key_to_collection[registry_key]

    fn_ref = weakref.ref(event_key._listen_fn)
    collection_ref = owner.ref

    collections_for_key.pop(collection_ref, None)
    if not collections_for_key:
        del _key_to_collection[registry_key]

    # .get() does not trigger the defaultdict factory, so an unknown
    # collection is not added to the reverse registry here
    keys_by_listener = _collection_to_key.get(collection_ref)
    if keys_by_listener is not None:
        # see #12216 - this guards against a removal that already occurred
        # here. however, I cannot come up with a test that shows any negative
        # side effects occurring from this removal happening, even though an
        # event key may still be referenced from a clsleveldispatch here
        keys_by_listener.pop(fn_ref, None)

162 

163 

def _stored_in_collection_multi(
    newowner: RefCollection[_ET],
    oldowner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Register listeners already held by ``oldowner`` for ``newowner`` too.

    Each listener's event key is looked up in ``oldowner``'s reverse-registry
    entry and then recorded for ``newowner`` in both registries.  Listeners
    with no recorded key are skipped.

    :param newowner: the collection that now also holds the listeners.
    :param oldowner: the collection whose registrations are consulted.
    :param elements: the listener functions to register for ``newowner``.
    """
    if not elements:
        return

    source_ref = oldowner.ref
    dest_ref = newowner.ref

    # defaultdict access: both entries exist after these two lines
    source_keys = _collection_to_key[source_ref]
    dest_keys = _collection_to_key[dest_ref]

    for fn in elements:
        fn_ref = weakref.ref(fn)
        try:
            registry_key = source_keys[fn_ref]
            collections_for_key = _key_to_collection[registry_key]
        except KeyError:
            # can occur during interpreter shutdown.
            # see #6740
            continue

        if dest_ref not in collections_for_key:
            collections_for_key[dest_ref] = fn_ref
        else:
            assert collections_for_key[dest_ref] == fn_ref

        dest_keys[fn_ref] = registry_key

198 

199 

def _clear(
    owner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Remove ``owner`` from the forward registry for each given listener.

    For every listener, ``owner`` is removed from the set of collections
    recorded under that listener's event key; an event key left with no
    collections is deleted.  The reverse-registry entry for ``owner`` is
    only read, not modified.

    :param owner: the collection being cleared; identified by ``owner.ref``.
    :param elements: the listener functions held by ``owner``.
    :raises KeyError: if a listener in ``elements`` has no event key
     recorded for ``owner``.
    """
    if not elements:
        return

    collection_ref = owner.ref
    keys_by_listener = _collection_to_key[collection_ref]
    for fn in elements:
        registry_key = keys_by_listener[weakref.ref(fn)]
        collections_for_key = _key_to_collection[registry_key]
        collections_for_key.pop(collection_ref, None)

        if not collections_for_key:
            del _key_to_collection[registry_key]

217 

218 

class _EventKey(Generic[_ET]):
    """Represent :func:`.listen` arguments.

    An event key bundles the target that was listened on, the event
    identifier and the listener function.  From these it derives the tuple
    (see ``_key``) under which the listener is tracked in the module-level
    registries, so that the listener can later be located and removed from
    every collection it was placed in.

    """

    __slots__ = (
        "target",
        "identifier",
        "fn",
        "fn_key",
        "fn_wrap",
        "dispatch_target",
    )

    target: _ET
    identifier: str
    fn: _ListenerFnType
    fn_key: _ListenerFnKeyType
    dispatch_target: Any
    # annotated under the slot's actual name; ``_fn_wrap`` is only the name
    # of the constructor keyword argument
    fn_wrap: Optional[_ListenerFnType]

    def __init__(
        self,
        target: _ET,
        identifier: str,
        fn: _ListenerFnType,
        dispatch_target: Any,
        _fn_wrap: Optional[_ListenerFnType] = None,
    ) -> None:
        """Construct an event key.

        :param target: the object that was passed to listen().
        :param identifier: name of the event.
        :param fn: the listener function as originally given.
        :param dispatch_target: the object whose ``dispatch`` attribute is
         used to register the listener.
        :param _fn_wrap: optional wrapper to invoke in place of ``fn``;
         it does not take part in the key's identity.
        """
        self.target = target
        self.identifier = identifier
        self.fn = fn
        if isinstance(fn, types.MethodType):
            # key a bound method on its underlying function and instance
            # rather than on the method object itself
            self.fn_key = id(fn.__func__), id(fn.__self__)
        else:
            self.fn_key = id(fn)
        self.fn_wrap = _fn_wrap
        self.dispatch_target = dispatch_target

    @property
    def _key(self) -> _EventKeyTupleType:
        """The registry key: ``(id(target), identifier, fn_key)``."""
        return (id(self.target), self.identifier, self.fn_key)

    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
        """Return an event key that invokes ``fn_wrap``.

        ``self`` is returned when ``fn_wrap`` is already the function this
        key would invoke; otherwise a new key with the same target,
        identifier, fn and dispatch target is returned.
        """
        if fn_wrap is self._listen_fn:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                self.dispatch_target,
                _fn_wrap=fn_wrap,
            )

    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
        """Return an event key bound to ``dispatch_target``.

        ``self`` is returned when the dispatch target is unchanged;
        otherwise a new key carrying the same wrapper is returned.
        """
        if dispatch_target is self.dispatch_target:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                dispatch_target,
                _fn_wrap=self.fn_wrap,
            )

    def listen(self, *args: Any, **kw: Any) -> None:
        """Register this event key's listener with its dispatch target.

        The keyword arguments ``once``, ``_once_unless_exception`` and
        ``named`` are consumed here; all remaining arguments are passed
        through to the dispatch target's ``_listen()``.
        """
        once = kw.pop("once", False)
        once_unless_exception = kw.pop("_once_unless_exception", False)
        named = kw.pop("named", False)

        target, identifier, fn = (
            self.dispatch_target,
            self.identifier,
            self._listen_fn,
        )

        dispatch_collection = getattr(target.dispatch, identifier)

        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)

        # from here on work with the key that carries the adjusted function;
        # with_wrapper() preserves target, identifier and dispatch target
        event_key = self.with_wrapper(adjusted_fn)

        stub_function = getattr(
            event_key.dispatch_target.dispatch._events, event_key.identifier
        )
        if hasattr(stub_function, "_sa_warn"):
            stub_function._sa_warn()

        if once or once_unless_exception:
            # re-enter listen() with the once-only wrapper; the once flags
            # were popped from kw above, so the nested call takes the
            # else-branch below
            event_key.with_wrapper(
                util.only_once(
                    event_key._listen_fn,
                    retry_on_exception=once_unless_exception,
                )
            ).listen(*args, **kw)
        else:
            event_key.dispatch_target.dispatch._listen(
                event_key, *args, **kw
            )

    def remove(self) -> None:
        """Remove this listener from every collection it is registered in.

        :raises sqlalchemy.exc.InvalidRequestError: if the key is not
         present in the registry.
        """
        key = self._key

        if key not in _key_to_collection:
            raise exc.InvalidRequestError(
                "No listeners found for event %s / %r / %s "
                % (self.target, self.identifier, self.fn)
            )

        dispatch_reg = _key_to_collection.pop(key)

        for collection_ref, listener_ref in dispatch_reg.items():
            collection = collection_ref()
            listener_fn = listener_ref()
            # skip entries whose collection or listener is no longer alive
            if collection is not None and listener_fn is not None:
                collection.remove(self.with_wrapper(listener_fn))

    def contains(self) -> bool:
        """Return True if this event key is registered to listen."""
        return self._key in _key_to_collection

    def base_listen(
        self,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        retval: Optional[bool] = None,
        asyncio: bool = False,
    ) -> None:
        """Add this event key to its dispatch target's listener collection.

        :param propagate: passed through to the collection's ``insert()`` /
         ``append()``.
        :param insert: if True the listener is inserted, otherwise it is
         appended.
        :param named: accepted but not used by this method.
        :param retval: accepted but not used by this method.
        :param asyncio: if True, ``_set_asyncio()`` is called on the
         collection before the listener is added.
        """
        target, identifier = self.dispatch_target, self.identifier

        dispatch_collection = getattr(target.dispatch, identifier)

        for_modify = dispatch_collection.for_modify(target.dispatch)
        if asyncio:
            for_modify._set_asyncio()

        if insert:
            for_modify.insert(self, propagate)
        else:
            for_modify.append(self, propagate)

    @property
    def _listen_fn(self) -> _ListenerFnType:
        """The function to invoke: the wrapper if one is set, else ``fn``."""
        return self.fn_wrap or self.fn

    def append_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Append the listener to ``list_`` unless ``owner`` already has it.

        :return: True if the listener was newly registered for ``owner``
         and appended, False if it was already registered.
        """
        if _stored_in_collection(self, owner):
            list_.append(self._listen_fn)
            return True
        else:
            return False

    def remove_from_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> None:
        """Unregister the listener for ``owner`` and remove it from ``list_``.

        :raises ValueError: if the listener function is not in ``list_``.
        """
        _removed_from_collection(self, owner)
        list_.remove(self._listen_fn)

    def prepend_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Prepend the listener to ``list_`` unless ``owner`` already has it.

        :return: True if the listener was newly registered for ``owner``
         and prepended, False if it was already registered.
        """
        if _stored_in_collection(self, owner):
            list_.appendleft(self._listen_fn)
            return True
        else:
            return False