Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/registry.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1# event/registry.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Provides managed registration services on behalf of :func:`.listen` 

9arguments. 

10 

11By "managed registration", we mean that event listening functions and 

12other objects can be added to various collections in such a way that their 

13membership in all those collections can be revoked at once, based on 

14an equivalent :class:`._EventKey`. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20import collections 

21import types 

22import typing 

23from typing import Any 

24from typing import Callable 

25from typing import cast 

26from typing import Deque 

27from typing import Dict 

28from typing import Generic 

29from typing import Iterable 

30from typing import Optional 

31from typing import Tuple 

32from typing import TypeVar 

33from typing import Union 

34import weakref 

35 

36from .. import exc 

37from .. import util 

38 

39if typing.TYPE_CHECKING: 

40 from .attr import RefCollection 

41 from .base import dispatcher 

42 

# Any callable can be registered as a listener function.
_ListenerFnType = Callable[..., Any]

# Identity of a listener function as stored in an event key: ``id(fn)`` for
# ordinary callables, or ``(id(fn.__func__), id(fn.__self__))`` for bound
# methods.
_ListenerFnKeyType = Union[int, Tuple[int, int]]

# Registry key: ``(id(target), identifier, listener function key)``.
_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]


# Type variable for the kind of EventTarget an _EventKey is parameterized on.
_ET = TypeVar("_ET", bound="EventTarget")

49 

50 

class EventTarget:
    """Base for objects that event listeners can be attached to.

    An event target can be listened on either as a class or as an
    individual instance.

    Examples include: Connection, Mapper, Table, Session,
    InstrumentedAttribute, Engine, Pool, Dialect.

    """

    # this base contributes no per-instance attributes of its own
    __slots__ = ()

    # annotation only; no value is assigned in this class body
    dispatch: dispatcher[Any]

63 

64 

_RefCollectionToListenerType = Dict[
    "weakref.ref[RefCollection[Any]]",
    "weakref.ref[_ListenerFnType]",
]

_key_to_collection: Dict[
    _EventKeyTupleType, _RefCollectionToListenerType
] = collections.defaultdict(dict)
"""
Forward registry: starting from the original listen() arguments, find
every listener collection that holds the listener, together with the
listener fn stored in that collection.

(target, identifier, fn) -> {
                            ref(listenercollection) -> ref(listener_fn)
                            ref(listenercollection) -> ref(listener_fn)
                            ref(listenercollection) -> ref(listener_fn)
                        }
"""

83 

_ListenerToEventKeyType = Dict[
    "weakref.ref[_ListenerFnType]",
    _EventKeyTupleType,
]

_collection_to_key: Dict[
    weakref.ref[RefCollection[Any]],
    _ListenerToEventKeyType,
] = collections.defaultdict(dict)
"""
Reverse registry: starting from a _ListenerCollection or _ClsLevelListener,
find the original listen() arguments for every listener fn it contains.

ref(listenercollection) -> {
                            ref(listener_fn) -> (target, identifier, fn),
                            ref(listener_fn) -> (target, identifier, fn),
                            ref(listener_fn) -> (target, identifier, fn),
                        }
"""

102 

103 

def _collection_gced(ref: weakref.ref[Any]) -> None:
    """Purge all registry entries belonging to the collection ``ref``.

    The entry for ``ref`` is removed from ``_collection_to_key``; then, for
    every event key that was recorded for it, ``ref`` is removed from that
    key's mapping in ``_key_to_collection``.  A per-key mapping that becomes
    empty is removed as well.

    NOTE(review): the name and signature suggest this is installed as a
    weakref callback for listener collections; the registration itself is
    not visible in this module.

    :param ref: weak reference identifying the collection to purge.
    """
    # use a membership test rather than try/except KeyError: the registry
    # is a defaultdict, so indexing it would create an entry, not raise
    if not _collection_to_key or ref not in _collection_to_key:
        return

    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)

    listener_to_key = _collection_to_key.pop(ref)
    for key in listener_to_key.values():
        # membership test for the same reason as above
        if key in _key_to_collection:
            dispatch_reg = _key_to_collection[key]
            # _clear() removes the owner from dispatch_reg without touching
            # _collection_to_key, so ``ref`` may already be absent here.
            # Tolerate that instead of raising KeyError, which would abort
            # the loop and leave the remaining keys uncleaned.
            dispatch_reg.pop(ref, None)
            if not dispatch_reg:
                _key_to_collection.pop(key)

119 

120 

def _stored_in_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> bool:
    """Record in both registries that ``owner`` holds ``event_key``.

    :param event_key: key whose ``_key`` tuple and ``_listen_fn`` are
     recorded.
    :param owner: collection receiving the listener; identified by its
     ``ref`` attribute.
    :return: ``False`` if ``owner`` is already registered for this key, in
     which case nothing is changed; ``True`` once both registries have
     been updated.
    """
    registry_key = event_key._key
    owners_for_key = _key_to_collection[registry_key]

    collection_ref = owner.ref
    fn_ref = weakref.ref(event_key._listen_fn)

    already_present = collection_ref in owners_for_key
    if not already_present:
        # forward mapping: key -> {collection -> listener fn}
        owners_for_key[collection_ref] = fn_ref
        # reverse mapping: collection -> {listener fn -> key}
        _collection_to_key[collection_ref][fn_ref] = registry_key

    return not already_present

140 

141 

def _removed_from_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> None:
    """Remove the association between ``event_key`` and ``owner``.

    ``owner`` is dropped from the key's mapping in ``_key_to_collection``
    (and the mapping itself is deleted once empty); the listener is then
    dropped from ``owner``'s mapping in ``_collection_to_key`` if such a
    mapping exists.  Entries that are already missing are ignored.

    :param event_key: key supplying the ``_key`` tuple and ``_listen_fn``.
    :param owner: collection the listener is being removed from.
    """
    registry_key = event_key._key
    owners_for_key = _key_to_collection[registry_key]

    fn_ref = weakref.ref(event_key._listen_fn)
    collection_ref = owner.ref

    owners_for_key.pop(collection_ref, None)
    if not owners_for_key:
        del _key_to_collection[registry_key]

    # membership test so the defaultdict does not create a new entry
    if collection_ref not in _collection_to_key:
        return

    # see #12216 - this guards against a removal that already occurred
    # here. however, I cannot come up with a test that shows any negative
    # side effects occurring from this removal happening, even though an
    # event key may still be referenced from a clsleveldispatch here
    _collection_to_key[collection_ref].pop(fn_ref, None)

163 

164 

def _stored_in_collection_multi(
    newowner: RefCollection[_ET],
    oldowner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Register ``elements``, already known under ``oldowner``, for
    ``newowner`` as well.

    For each listener function the event key is looked up in ``oldowner``'s
    reverse mapping and then recorded for ``newowner`` in both registries.
    Listeners with no recorded key are skipped.

    :param newowner: collection that is gaining the listeners.
    :param oldowner: collection the listeners were originally stored in.
    :param elements: listener functions to carry over; nothing happens if
     this is falsy.
    """
    if not elements:
        return

    source_ref = oldowner.ref
    dest_ref = newowner.ref

    source_fn_to_key = _collection_to_key[source_ref]
    dest_fn_to_key = _collection_to_key[dest_ref]

    for listen_fn in elements:
        fn_ref = weakref.ref(listen_fn)
        try:
            # a missing listener can occur during interpreter shutdown.
            # see #6740
            registry_key = source_fn_to_key[fn_ref]
            owners_for_key = _key_to_collection[registry_key]
        except KeyError:
            continue

        if dest_ref not in owners_for_key:
            owners_for_key[dest_ref] = fn_ref
        else:
            assert owners_for_key[dest_ref] == fn_ref

        dest_fn_to_key[fn_ref] = registry_key

199 

200 

def _clear(
    owner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Detach ``owner`` from the forward registry for each of ``elements``.

    For every listener function, its event key is looked up in ``owner``'s
    reverse mapping and ``owner`` is removed from that key's mapping in
    ``_key_to_collection``; a mapping left empty is deleted.  The reverse
    mapping in ``_collection_to_key`` is only read here, not modified.

    :param owner: collection being cleared.
    :param elements: listener functions held by ``owner``; nothing happens
     if this is falsy.
    :raises KeyError: if a listener function has no key recorded for
     ``owner``.
    """
    if not elements:
        return

    collection_ref = owner.ref
    fn_to_key = _collection_to_key[collection_ref]

    for listen_fn in elements:
        registry_key = fn_to_key[weakref.ref(listen_fn)]
        owners_for_key = _key_to_collection[registry_key]
        owners_for_key.pop(collection_ref, None)

        if owners_for_key:
            continue
        del _key_to_collection[registry_key]

218 

219 

class _EventKey(Generic[_ET]):
    """Represent :func:`.listen` arguments.

    An event key bundles the target, the event identifier and the listener
    function, plus an optional wrapper function and the target whose
    ``dispatch`` is actually used.  Its :attr:`_key` tuple is what the
    module-level registries are keyed on, so keys built from the same
    target, identifier and function refer to the same registration
    regardless of wrapper or dispatch target.
    """

    __slots__ = (
        "target",
        "identifier",
        "fn",
        "fn_key",
        "fn_wrap",
        "dispatch_target",
    )

    target: _ET
    identifier: str
    fn: _ListenerFnType
    fn_key: _ListenerFnKeyType
    dispatch_target: Any
    # annotated under the name of the actual slot; only the constructor
    # *parameter* is spelled ``_fn_wrap``
    fn_wrap: Optional[_ListenerFnType]

    def __init__(
        self,
        target: _ET,
        identifier: str,
        fn: _ListenerFnType,
        dispatch_target: Any,
        _fn_wrap: Optional[_ListenerFnType] = None,
    ) -> None:
        """Construct an event key.

        :param target: the object being listened on.
        :param identifier: name of the event.
        :param fn: the listener function as originally given.
        :param dispatch_target: object whose ``dispatch`` attribute is used
         by :meth:`listen` and :meth:`base_listen`.
        :param _fn_wrap: optional wrapper that is used in place of ``fn``
         when the listener is stored; see :attr:`_listen_fn`.
        """
        self.target = target
        self.identifier = identifier
        self.fn = fn
        if isinstance(fn, types.MethodType):
            # bound method objects are created on each attribute access, so
            # identify them by the underlying function and instance instead
            # of by the id of the method object itself
            self.fn_key = id(fn.__func__), id(fn.__self__)
        else:
            self.fn_key = id(fn)
        self.fn_wrap = _fn_wrap
        self.dispatch_target = dispatch_target

    @property
    def _key(self) -> _EventKeyTupleType:
        """The registry key: ``(id(target), identifier, fn_key)``."""
        return (id(self.target), self.identifier, self.fn_key)

    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
        """Return an event key that uses ``fn_wrap`` as its wrapper.

        Returns ``self`` when ``fn_wrap`` is already the effective listener
        function; otherwise a new key with the same target, identifier,
        function and dispatch target.
        """
        if fn_wrap is self._listen_fn:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                self.dispatch_target,
                _fn_wrap=fn_wrap,
            )

    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
        """Return an event key that uses ``dispatch_target``.

        Returns ``self`` when ``dispatch_target`` is already in use;
        otherwise a new key that keeps the current wrapper.
        """
        if dispatch_target is self.dispatch_target:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                dispatch_target,
                _fn_wrap=self.fn_wrap,
            )

    def listen(self, *args: Any, **kw: Any) -> None:
        """Establish this key's listener on the dispatch target.

        The keyword arguments ``once``, ``_once_unless_exception`` and
        ``named`` are consumed here; all remaining arguments are passed
        along unchanged.

        The listener function is first passed through the dispatch
        collection's ``_adjust_fn_spec()``.  If ``once`` or
        ``_once_unless_exception`` is set, the adjusted function is wrapped
        with ``util.only_once()`` and ``listen()`` is invoked again on the
        resulting key; otherwise the call is handed to the dispatch
        target's ``dispatch._listen()``.
        """
        once = kw.pop("once", False)
        once_unless_exception = kw.pop("_once_unless_exception", False)
        named = kw.pop("named", False)

        target, identifier, fn = (
            self.dispatch_target,
            self.identifier,
            self._listen_fn,
        )

        dispatch_collection = getattr(target.dispatch, identifier)

        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)

        # from here on, work with the key that carries the adjusted function
        event_key = self.with_wrapper(adjusted_fn)

        stub_function = getattr(
            event_key.dispatch_target.dispatch._events, event_key.identifier
        )
        if hasattr(stub_function, "_sa_warn"):
            stub_function._sa_warn()

        if once or once_unless_exception:
            # the "once" flags were popped from kw above, so the nested
            # call takes the plain branch below
            event_key.with_wrapper(
                util.only_once(
                    event_key._listen_fn,
                    retry_on_exception=once_unless_exception,
                )
            ).listen(*args, **kw)
        else:
            event_key.dispatch_target.dispatch._listen(
                event_key, *args, **kw
            )

    def remove(self) -> None:
        """Remove this key's listener from every collection that holds it.

        :raises sqlalchemy.exc.InvalidRequestError: if the key is not
         present in the registry.
        """
        key = self._key

        if key not in _key_to_collection:
            raise exc.InvalidRequestError(
                "No listeners found for event %s / %r / %s "
                % (self.target, self.identifier, self.fn)
            )

        dispatch_reg = _key_to_collection.pop(key)

        for collection_ref, listener_ref in dispatch_reg.items():
            collection = collection_ref()
            listener_fn = listener_ref()
            # skip entries whose collection or listener is no longer alive
            if collection is not None and listener_fn is not None:
                collection.remove(self.with_wrapper(listener_fn))

    def contains(self) -> bool:
        """Return True if this event key is registered to listen."""
        return self._key in _key_to_collection

    def base_listen(
        self,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        retval: Optional[bool] = None,
        asyncio: bool = False,
    ) -> None:
        """Add this key to the dispatch collection for its identifier.

        :param propagate: passed through to the collection's ``insert()``
         or ``append()``.
        :param insert: if True the key is added with ``insert()``,
         otherwise with ``append()``.
        :param named: accepted but not used by this method.
        :param retval: accepted but not used by this method.
        :param asyncio: if True, ``_set_asyncio()`` is called on the
         collection before the key is added.
        """
        target, identifier = self.dispatch_target, self.identifier

        dispatch_collection = getattr(target.dispatch, identifier)

        for_modify = dispatch_collection.for_modify(target.dispatch)
        if asyncio:
            for_modify._set_asyncio()

        if insert:
            for_modify.insert(self, propagate)
        else:
            for_modify.append(self, propagate)

    @property
    def _listen_fn(self) -> _ListenerFnType:
        """The function actually stored: the wrapper if set, else ``fn``."""
        return self.fn_wrap or self.fn

    def append_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and append the listener to ``list_``.

        :return: True if the listener was added, False if ``owner`` was
         already registered for this key (``list_`` is then left as is).
        """
        if _stored_in_collection(self, owner):
            list_.append(self._listen_fn)
            return True
        else:
            return False

    def remove_from_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> None:
        """Unregister from ``owner`` and remove the listener from ``list_``."""
        _removed_from_collection(self, owner)
        list_.remove(self._listen_fn)

    def prepend_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and add the listener at the left end of
        ``list_``.

        :return: True if the listener was added, False if ``owner`` was
         already registered for this key (``list_`` is then left as is).
        """
        if _stored_in_collection(self, owner):
            list_.appendleft(self._listen_fn)
            return True
        else:
            return False