Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/base.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

217 statements  

1# event/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Base implementation classes. 

9 

10The public-facing ``Events`` serves as the base class for an event interface; 

11its public attributes represent different kinds of events. These attributes 

12are mirrored onto a ``_Dispatch`` class, which serves as a container for 

13collections of listener functions. These collections are represented both 

14at the class level of a particular ``_Dispatch`` class as well as within 

15instances of ``_Dispatch``. 

16 

17""" 

18 

19from __future__ import annotations 

20 

21import typing 

22from typing import Any 

23from typing import cast 

24from typing import Dict 

25from typing import Generic 

26from typing import Iterator 

27from typing import List 

28from typing import Literal 

29from typing import Mapping 

30from typing import MutableMapping 

31from typing import Optional 

32from typing import overload 

33from typing import Tuple 

34from typing import Type 

35from typing import Union 

36import weakref 

37 

38from .attr import _ClsLevelDispatch 

39from .attr import _EmptyListener 

40from .attr import _InstanceLevelDispatch 

41from .attr import _JoinedListener 

42from .registry import _ET 

43from .registry import _EventKey 

44from .. import util 

45 

46_registrars: MutableMapping[str, List[Type[_HasEventsDispatch[Any]]]] = ( 

47 util.defaultdict(list) 

48) 

49 

50 

51def _is_event_name(name: str) -> bool: 

52 # _sa_event prefix is special to support internal-only event names. 

53 # most event names are just plain method names that aren't 

54 # underscored. 

55 

56 return ( 

57 not name.startswith("_") and name != "dispatch" 

58 ) or name.startswith("_sa_event") 

59 

60 

61class _UnpickleDispatch: 

62 """Serializable callable that re-generates an instance of 

63 :class:`_Dispatch` given a particular :class:`.Events` subclass. 

64 

65 """ 

66 

67 def __call__(self, _instance_cls: Type[_ET]) -> _Dispatch[_ET]: 

68 for cls in _instance_cls.__mro__: 

69 if "dispatch" in cls.__dict__: 

70 return cast( 

71 "_Dispatch[_ET]", cls.__dict__["dispatch"].dispatch 

72 )._for_class(_instance_cls) 

73 else: 

74 raise AttributeError("No class with a 'dispatch' member present.") 

75 

76 

77class _DispatchCommon(Generic[_ET]): 

78 __slots__ = () 

79 

80 _instance_cls: Optional[Type[_ET]] 

81 

82 def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]: 

83 raise NotImplementedError() 

84 

85 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: 

86 raise NotImplementedError() 

87 

88 @property 

89 def _events(self) -> Type[_HasEventsDispatch[_ET]]: 

90 raise NotImplementedError() 

91 

92 

93class _Dispatch(_DispatchCommon[_ET]): 

94 """Mirror the event listening definitions of an Events class with 

95 listener collections. 

96 

97 Classes which define a "dispatch" member will return a 

98 non-instantiated :class:`._Dispatch` subclass when the member 

99 is accessed at the class level. When the "dispatch" member is 

100 accessed at the instance level of its owner, an instance 

101 of the :class:`._Dispatch` class is returned. 

102 

103 A :class:`._Dispatch` class is generated for each :class:`.Events` 

104 class defined, by the :meth:`._HasEventsDispatch._create_dispatcher_class` 

105 method. The original :class:`.Events` classes remain untouched. 

106 This decouples the construction of :class:`.Events` subclasses from 

107 the implementation used by the event internals, and allows 

108 inspecting tools like Sphinx to work in an unsurprising 

109 way against the public API. 

110 

111 """ 

112 

113 # "active_history" is an ORM case we add here. ideally a better 

114 # system would be in place for ad-hoc attributes. 

115 __slots__ = "_parent", "_instance_cls", "__dict__", "_empty_listeners" 

116 

117 _active_history: bool 

118 

119 _empty_listener_reg: MutableMapping[ 

120 Type[_ET], Dict[str, _EmptyListener[_ET]] 

121 ] = weakref.WeakKeyDictionary() 

122 

123 _empty_listeners: Dict[str, _EmptyListener[_ET]] 

124 

125 _event_names: List[str] 

126 

127 _instance_cls: Optional[Type[_ET]] 

128 

129 _joined_dispatch_cls: Type[_JoinedDispatcher[_ET]] 

130 

131 _events: Type[_HasEventsDispatch[_ET]] 

132 """reference back to the Events class. 

133 

134 Bidirectional against _HasEventsDispatch.dispatch 

135 

136 """ 

137 

138 def __init__( 

139 self, 

140 parent: Optional[_Dispatch[_ET]], 

141 instance_cls: Optional[Type[_ET]] = None, 

142 ): 

143 self._parent = parent 

144 self._instance_cls = instance_cls 

145 

146 if instance_cls: 

147 assert parent is not None 

148 try: 

149 self._empty_listeners = self._empty_listener_reg[instance_cls] 

150 except KeyError: 

151 self._empty_listeners = self._empty_listener_reg[ 

152 instance_cls 

153 ] = { 

154 ls.name: _EmptyListener(ls, instance_cls) 

155 for ls in parent._event_descriptors 

156 } 

157 else: 

158 self._empty_listeners = {} 

159 

160 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: 

161 # Assign EmptyListeners as attributes on demand 

162 # to reduce startup time for new dispatch objects. 

163 try: 

164 ls = self._empty_listeners[name] 

165 except KeyError: 

166 raise AttributeError(name) 

167 else: 

168 setattr(self, ls.name, ls) 

169 return ls 

170 

171 @property 

172 def _event_descriptors(self) -> Iterator[_ClsLevelDispatch[_ET]]: 

173 for k in self._event_names: 

174 # Yield _ClsLevelDispatch related 

175 # to relevant event name. 

176 yield getattr(self, k) 

177 

178 def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None: 

179 return self._events._listen(event_key, **kw) 

180 

181 def _for_class(self, instance_cls: Type[_ET]) -> _Dispatch[_ET]: 

182 return self.__class__(self, instance_cls) 

183 

184 def _for_instance(self, instance: _ET) -> _Dispatch[_ET]: 

185 instance_cls = instance.__class__ 

186 return self._for_class(instance_cls) 

187 

188 def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]: 

189 """Create a 'join' of this :class:`._Dispatch` and another. 

190 

191 This new dispatcher will dispatch events to both 

192 :class:`._Dispatch` objects. 

193 

194 """ 

195 assert "_joined_dispatch_cls" in self.__class__.__dict__ 

196 

197 return self._joined_dispatch_cls(self, other) 

198 

199 def __reduce__(self) -> Union[str, Tuple[Any, ...]]: 

200 return _UnpickleDispatch(), (self._instance_cls,) 

201 

202 def _update( 

203 self, other: _Dispatch[_ET], only_propagate: bool = True 

204 ) -> None: 

205 """Populate from the listeners in another :class:`_Dispatch` 

206 object.""" 

207 for ls in other._event_descriptors: 

208 if isinstance(ls, _EmptyListener): 

209 continue 

210 getattr(self, ls.name).for_modify(self)._update( 

211 ls, only_propagate=only_propagate 

212 ) 

213 

214 def _clear(self) -> None: 

215 for ls in self._event_descriptors: 

216 ls.for_modify(self).clear() 

217 

218 

219def _remove_dispatcher(cls: Type[_HasEventsDispatch[_ET]]) -> None: 

220 for k in cls.dispatch._event_names: 

221 _registrars[k].remove(cls) 

222 if not _registrars[k]: 

223 del _registrars[k] 

224 

225 

226class _HasEventsDispatch(Generic[_ET]): 

227 _dispatch_target: Optional[Type[_ET]] 

228 """class which will receive the .dispatch collection""" 

229 

230 dispatch: _Dispatch[_ET] 

231 """reference back to the _Dispatch class. 

232 

233 Bidirectional against _Dispatch._events 

234 

235 """ 

236 

237 if typing.TYPE_CHECKING: 

238 

239 def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: ... 

240 

241 def __init_subclass__(cls) -> None: 

242 """Intercept new Event subclasses and create associated _Dispatch 

243 classes.""" 

244 

245 cls._create_dispatcher_class(cls.__name__, cls.__bases__, cls.__dict__) 

246 

247 @classmethod 

248 def _accept_with( 

249 cls, target: Union[_ET, Type[_ET]], identifier: str 

250 ) -> Optional[Union[_ET, Type[_ET]]]: 

251 raise NotImplementedError() 

252 

253 @classmethod 

254 def _listen( 

255 cls, 

256 event_key: _EventKey[_ET], 

257 *, 

258 propagate: bool = False, 

259 insert: bool = False, 

260 named: bool = False, 

261 asyncio: bool = False, 

262 ) -> None: 

263 raise NotImplementedError() 

264 

265 @staticmethod 

266 def _set_dispatch( 

267 klass: Type[_HasEventsDispatch[_ET]], 

268 dispatch_cls: Type[_Dispatch[_ET]], 

269 ) -> _Dispatch[_ET]: 

270 # This allows an Events subclass to define additional utility 

271 # methods made available to the target via 

272 # "self.dispatch._events.<utilitymethod>" 

273 # @staticmethod to allow easy "super" calls while in a metaclass 

274 # constructor. 

275 klass.dispatch = dispatch_cls(None) 

276 dispatch_cls._events = klass 

277 return klass.dispatch 

278 

279 @classmethod 

280 def _create_dispatcher_class( 

281 cls, classname: str, bases: Tuple[type, ...], dict_: Mapping[str, Any] 

282 ) -> None: 

283 """Create a :class:`._Dispatch` class corresponding to an 

284 :class:`.Events` class.""" 

285 

286 # there's all kinds of ways to do this, 

287 # i.e. make a Dispatch class that shares the '_listen' method 

288 # of the Event class, this is the straight monkeypatch. 

289 if hasattr(cls, "dispatch"): 

290 dispatch_base = cls.dispatch.__class__ 

291 else: 

292 dispatch_base = _Dispatch 

293 

294 event_names = [k for k in dict_ if _is_event_name(k)] 

295 dispatch_cls = cast( 

296 "Type[_Dispatch[_ET]]", 

297 type( 

298 "%sDispatch" % classname, 

299 (dispatch_base,), 

300 {"__slots__": event_names}, 

301 ), 

302 ) 

303 

304 dispatch_cls._event_names = event_names 

305 dispatch_inst = cls._set_dispatch(cls, dispatch_cls) 

306 for k in dispatch_cls._event_names: 

307 setattr(dispatch_inst, k, _ClsLevelDispatch(cls, dict_[k])) 

308 _registrars[k].append(cls) 

309 

310 for super_ in dispatch_cls.__bases__: 

311 if issubclass(super_, _Dispatch) and super_ is not _Dispatch: 

312 for ls in super_._events.dispatch._event_descriptors: 

313 setattr(dispatch_inst, ls.name, ls) 

314 dispatch_cls._event_names.append(ls.name) 

315 

316 if getattr(cls, "_dispatch_target", None): 

317 dispatch_target_cls = cls._dispatch_target 

318 assert dispatch_target_cls is not None 

319 if ( 

320 hasattr(dispatch_target_cls, "__slots__") 

321 and "_slots_dispatch" in dispatch_target_cls.__slots__ 

322 ): 

323 dispatch_target_cls.dispatch = slots_dispatcher(cls) 

324 else: 

325 dispatch_target_cls.dispatch = dispatcher(cls) 

326 

327 klass = type( 

328 "Joined%s" % dispatch_cls.__name__, 

329 (_JoinedDispatcher,), 

330 {"__slots__": event_names}, 

331 ) 

332 dispatch_cls._joined_dispatch_cls = klass 

333 

334 # establish pickle capability by adding it to this module 

335 globals()[klass.__name__] = klass 

336 

337 

338class _JoinedDispatcher(_DispatchCommon[_ET]): 

339 """Represent a connection between two _Dispatch objects.""" 

340 

341 __slots__ = "local", "parent", "_instance_cls" 

342 

343 local: _DispatchCommon[_ET] 

344 parent: _DispatchCommon[_ET] 

345 _instance_cls: Optional[Type[_ET]] 

346 

347 def __init__( 

348 self, local: _DispatchCommon[_ET], parent: _DispatchCommon[_ET] 

349 ): 

350 self.local = local 

351 self.parent = parent 

352 self._instance_cls = self.local._instance_cls 

353 

354 def __reduce__(self) -> Any: 

355 return (self.__class__, (self.local, self.parent)) 

356 

357 def __getattr__(self, name: str) -> _JoinedListener[_ET]: 

358 # Assign _JoinedListeners as attributes on demand 

359 # to reduce startup time for new dispatch objects. 

360 ls = getattr(self.local, name) 

361 jl = _JoinedListener(self.parent, ls.name, ls) 

362 setattr(self, ls.name, jl) 

363 return jl 

364 

365 def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None: 

366 return self.parent._listen(event_key, **kw) 

367 

368 @property 

369 def _events(self) -> Type[_HasEventsDispatch[_ET]]: 

370 return self.parent._events 

371 

372 

373class Events(_HasEventsDispatch[_ET]): 

374 """Define event listening functions for a particular target type.""" 

375 

376 @classmethod 

377 def _accept_with( 

378 cls, target: Union[_ET, Type[_ET]], identifier: str 

379 ) -> Optional[Union[_ET, Type[_ET]]]: 

380 def dispatch_is(*types: Type[Any]) -> bool: 

381 return all(isinstance(target.dispatch, t) for t in types) 

382 

383 def dispatch_parent_is(t: Type[Any]) -> bool: 

384 parent = cast("_JoinedDispatcher[_ET]", target.dispatch).parent 

385 while isinstance(parent, _JoinedDispatcher): 

386 parent = cast("_JoinedDispatcher[_ET]", parent).parent 

387 

388 return isinstance(parent, t) 

389 

390 # Mapper, ClassManager, Session override this to 

391 # also accept classes, scoped_sessions, sessionmakers, etc. 

392 if hasattr(target, "dispatch"): 

393 if ( 

394 dispatch_is(cls.dispatch.__class__) 

395 or dispatch_is(type, cls.dispatch.__class__) 

396 or ( 

397 dispatch_is(_JoinedDispatcher) 

398 and dispatch_parent_is(cls.dispatch.__class__) 

399 ) 

400 ): 

401 return target 

402 

403 return None 

404 

405 @classmethod 

406 def _listen( 

407 cls, 

408 event_key: _EventKey[_ET], 

409 *, 

410 propagate: bool = False, 

411 insert: bool = False, 

412 named: bool = False, 

413 asyncio: bool = False, 

414 ) -> None: 

415 event_key.base_listen( 

416 propagate=propagate, insert=insert, named=named, asyncio=asyncio 

417 ) 

418 

419 @classmethod 

420 def _remove(cls, event_key: _EventKey[_ET]) -> None: 

421 event_key.remove() 

422 

423 @classmethod 

424 def _clear(cls) -> None: 

425 cls.dispatch._clear() 

426 

427 

428class dispatcher(Generic[_ET]): 

429 """Descriptor used by target classes to 

430 deliver the _Dispatch class at the class level 

431 and produce new _Dispatch instances for target 

432 instances. 

433 

434 """ 

435 

436 def __init__(self, events: Type[_HasEventsDispatch[_ET]]): 

437 self.dispatch = events.dispatch 

438 self.events = events 

439 

440 @overload 

441 def __get__( 

442 self, obj: Literal[None], cls: Type[Any] 

443 ) -> Type[_Dispatch[_ET]]: ... 

444 

445 @overload 

446 def __get__(self, obj: Any, cls: Type[Any]) -> _DispatchCommon[_ET]: ... 

447 

448 def __get__(self, obj: Any, cls: Type[Any]) -> Any: 

449 if obj is None: 

450 return self.dispatch 

451 

452 disp = self.dispatch._for_instance(obj) 

453 try: 

454 obj.__dict__["dispatch"] = disp 

455 except AttributeError as ae: 

456 raise TypeError( 

457 "target %r doesn't have __dict__, should it be " 

458 "defining _slots_dispatch?" % (obj,) 

459 ) from ae 

460 return disp 

461 

462 

463class slots_dispatcher(dispatcher[_ET]): 

464 def __get__(self, obj: Any, cls: Type[Any]) -> Any: 

465 if obj is None: 

466 return self.dispatch 

467 

468 if hasattr(obj, "_slots_dispatch"): 

469 return obj._slots_dispatch 

470 

471 disp = self.dispatch._for_instance(obj) 

472 obj._slots_dispatch = disp 

473 return disp