Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/base.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

210 statements  

# event/base.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

"""Base implementation classes.

The public-facing ``Events`` serves as the base class for an event interface;
its public attributes represent different kinds of events.  These attributes
are mirrored onto a ``_Dispatch`` class, which serves as a container for
collections of listener functions.  These collections are represented both
at the class level of a particular ``_Dispatch`` class as well as within
instances of ``_Dispatch``.

"""

18from __future__ import annotations 

19 

20import typing 

21from typing import Any 

22from typing import cast 

23from typing import Dict 

24from typing import Generic 

25from typing import Iterator 

26from typing import List 

27from typing import Mapping 

28from typing import MutableMapping 

29from typing import Optional 

30from typing import overload 

31from typing import Tuple 

32from typing import Type 

33from typing import Union 

34import weakref 

35 

36from .attr import _ClsLevelDispatch 

37from .attr import _EmptyListener 

38from .attr import _InstanceLevelDispatch 

39from .attr import _JoinedListener 

40from .registry import _ET 

41from .registry import _EventKey 

42from .. import util 

43from ..util.typing import Literal 

44 

# Registry keyed by event name; each value is the list of event-bearing
# classes that declare an event of that name.  Entries are appended in
# _HasEventsDispatch._create_dispatcher_class() and taken out again by
# _remove_dispatcher().
_registrars: MutableMapping[
    str, List[Type[_HasEventsDispatch[Any]]]
] = util.defaultdict(list)

48 

49 

def _is_event_name(name: str) -> bool:
    """Return True if ``name`` should be treated as an event name.

    A name qualifies when it carries the ``_sa_event`` prefix, or when it
    is not underscored and is not the reserved name ``dispatch``.

    """
    # _sa_event prefix is special to support internal-only event names;
    # it wins over the "no leading underscore" rule below.
    if name.startswith("_sa_event"):
        return True

    # most event names are just plain method names that aren't
    # underscored.
    return name != "dispatch" and not name.startswith("_")

58 

59 

class _UnpickleDispatch:
    """Serializable callable that re-generates an instance of
    :class:`_Dispatch` given a particular :class:`.Events` subclass.

    The callable itself holds no state; it is handed the target class
    and rebuilds the dispatcher from the ``dispatch`` member found on
    that class's MRO.

    """

    def __call__(self, _instance_cls: Type[_ET]) -> _Dispatch[_ET]:
        """Build a dispatcher for ``_instance_cls``.

        Walks ``_instance_cls.__mro__`` and, for the first class whose own
        ``__dict__`` defines ``dispatch``, returns
        ``<that member>.dispatch._for_class(_instance_cls)``.

        :param _instance_cls: class for which a dispatcher is wanted.
        :raises AttributeError: if no class in the MRO defines a
         ``dispatch`` member directly.

        """
        for cls in _instance_cls.__mro__:
            # look in the class's own namespace only, so that the raw
            # member is obtained without going through attribute lookup
            if "dispatch" in cls.__dict__:
                return cast(
                    "_Dispatch[_ET]", cls.__dict__["dispatch"].dispatch
                )._for_class(_instance_cls)

        # the loop has no ``break``; reaching this point means that no
        # class in the MRO matched, so a ``for ... else`` is not needed.
        raise AttributeError("No class with a 'dispatch' member present.")

74 

75 

class _DispatchCommon(Generic[_ET]):
    """Interface shared by the dispatcher classes in this module.

    Every member defined here raises ``NotImplementedError``; subclasses
    supply the actual behavior.

    """

    __slots__ = ()

    _instance_cls: Optional[Type[_ET]]

    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
        """Join this dispatcher with ``other``; implemented by subclasses."""
        raise NotImplementedError()

    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
        """Resolve a listener collection by name; implemented by
        subclasses."""
        raise NotImplementedError()

    @property
    def _events(self) -> Type[_HasEventsDispatch[_ET]]:
        """The associated events class; implemented by subclasses."""
        raise NotImplementedError()

90 

91 

class _Dispatch(_DispatchCommon[_ET]):
    """Mirror the event listening definitions of an Events class with
    listener collections.

    Classes which define a "dispatch" member will return a
    non-instantiated :class:`._Dispatch` subclass when the member
    is accessed at the class level.  When the "dispatch" member is
    accessed at the instance level of its owner, an instance
    of the :class:`._Dispatch` class is returned.

    A :class:`._Dispatch` class is generated for each :class:`.Events`
    class defined, by the :meth:`._HasEventsDispatch._create_dispatcher_class`
    method.  The original :class:`.Events` classes remain untouched.
    This decouples the construction of :class:`.Events` subclasses from
    the implementation used by the event internals, and allows
    inspecting tools like Sphinx to work in an unsurprising
    way against the public API.

    """

    # "active_history" is an ORM case we add here.   ideally a better
    # system would be in place for ad-hoc attributes.
    __slots__ = "_parent", "_instance_cls", "__dict__", "_empty_listeners"

    _active_history: bool

    # class-level cache of per-target-class empty listener mappings,
    # held in a WeakKeyDictionary keyed on the target class
    _empty_listener_reg: MutableMapping[
        Type[_ET], Dict[str, _EmptyListener[_ET]]
    ] = weakref.WeakKeyDictionary()

    _empty_listeners: Dict[str, _EmptyListener[_ET]]

    _event_names: List[str]

    _instance_cls: Optional[Type[_ET]]

    _joined_dispatch_cls: Type[_JoinedDispatcher[_ET]]

    _events: Type[_HasEventsDispatch[_ET]]
    """reference back to the Events class.

    Bidirectional against _HasEventsDispatch.dispatch

    """

    def __init__(
        self,
        parent: Optional[_Dispatch[_ET]],
        instance_cls: Optional[Type[_ET]] = None,
    ):
        """Construct a dispatcher.

        :param parent: dispatcher whose event descriptors seed the empty
         listeners; must not be None when ``instance_cls`` is given.
        :param instance_cls: target class this dispatcher is for.  When
         given, the name -> :class:`._EmptyListener` mapping is fetched
         from ``_empty_listener_reg``, being built and stored there on
         the first request for that class.  When omitted, the mapping is
         an empty dict.

        """
        self._parent = parent
        self._instance_cls = instance_cls

        if not instance_cls:
            self._empty_listeners = {}
            return

        assert parent is not None
        registry = self._empty_listener_reg
        try:
            self._empty_listeners = registry[instance_cls]
        except KeyError:
            # first dispatcher for this class; build one empty listener
            # per event descriptor of the parent and cache the mapping
            self._empty_listeners = registry[instance_cls] = {
                descriptor.name: _EmptyListener(descriptor, instance_cls)
                for descriptor in parent._event_descriptors
            }

    def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]:
        """Return the empty listener called ``name``, storing it as an
        attribute of this object on the way out.

        :raises AttributeError: if no empty listener has that name.

        """
        # Assign EmptyListeners as attributes on demand
        # to reduce startup time for new dispatch objects.
        try:
            listener = self._empty_listeners[name]
        except KeyError:
            raise AttributeError(name)

        setattr(self, listener.name, listener)
        return listener

    @property
    def _event_descriptors(self) -> Iterator[_ClsLevelDispatch[_ET]]:
        """Iterate the attribute of this object for each name in
        ``_event_names``, in order."""
        for event_name in self._event_names:
            # Yield _ClsLevelDispatch related
            # to relevant event name.
            yield getattr(self, event_name)

    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
        """Delegate to ``_listen()`` of the associated events class."""
        events_cls = self._events
        return events_cls._listen(event_key, **kw)

    def _for_class(self, instance_cls: Type[_ET]) -> _Dispatch[_ET]:
        """Return a new dispatcher of this same class for ``instance_cls``,
        with this object as its parent."""
        factory = self.__class__
        return factory(self, instance_cls)

    def _for_instance(self, instance: _ET) -> _Dispatch[_ET]:
        """Return a new dispatcher for the class of ``instance``."""
        return self._for_class(instance.__class__)

    def _join(self, other: _DispatchCommon[_ET]) -> _JoinedDispatcher[_ET]:
        """Create a 'join' of this :class:`._Dispatch` and another.

        This new dispatcher will dispatch events to both
        :class:`._Dispatch` objects.

        """
        # the joined class has to be set on this exact class, not
        # picked up from a base class
        assert "_joined_dispatch_cls" in self.__class__.__dict__

        joined_cls = self._joined_dispatch_cls
        return joined_cls(self, other)

    def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
        """Pickle as a :class:`._UnpickleDispatch` call that receives
        only the target class."""
        return _UnpickleDispatch(), (self._instance_cls,)

    def _update(
        self, other: _Dispatch[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object.

        Entries of ``other`` that are :class:`._EmptyListener` objects are
        passed over.

        """
        for source in other._event_descriptors:
            if not isinstance(source, _EmptyListener):
                destination = getattr(self, source.name).for_modify(self)
                destination._update(source, only_propagate=only_propagate)

    def _clear(self) -> None:
        """Call ``for_modify(self).clear()`` on each event descriptor."""
        for descriptor in self._event_descriptors:
            descriptor.for_modify(self).clear()

216 

217 

def _remove_dispatcher(cls: Type[_HasEventsDispatch[_ET]]) -> None:
    """Take ``cls`` out of ``_registrars`` for each of its event names.

    A registry entry is deleted once its list becomes empty.

    """
    for event_name in cls.dispatch._event_names:
        registered = _registrars[event_name]
        registered.remove(cls)
        if not registered:
            del _registrars[event_name]

223 

224 

class _HasEventsDispatch(Generic[_ET]):
    """Base for classes that declare events and get a generated
    :class:`._Dispatch` class built for them when subclassed."""

    _dispatch_target: Optional[Type[_ET]]
    """class which will receive the .dispatch collection"""

    dispatch: _Dispatch[_ET]
    """reference back to the _Dispatch class.

    Bidirectional against _Dispatch._events

    """

    if typing.TYPE_CHECKING:

        def __getattr__(self, name: str) -> _InstanceLevelDispatch[_ET]: ...

    def __init_subclass__(cls) -> None:
        """Intercept new Event subclasses and create associated _Dispatch
        classes."""

        cls._create_dispatcher_class(cls.__name__, cls.__bases__, cls.__dict__)

    @classmethod
    def _accept_with(
        cls, target: Union[_ET, Type[_ET]], identifier: str
    ) -> Optional[Union[_ET, Type[_ET]]]:
        """Not implemented on this base class."""
        raise NotImplementedError()

    @classmethod
    def _listen(
        cls,
        event_key: _EventKey[_ET],
        *,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        asyncio: bool = False,
    ) -> None:
        """Not implemented on this base class."""
        raise NotImplementedError()

    @staticmethod
    def _set_dispatch(
        klass: Type[_HasEventsDispatch[_ET]],
        dispatch_cls: Type[_Dispatch[_ET]],
    ) -> _Dispatch[_ET]:
        """Link ``klass`` and ``dispatch_cls`` to each other.

        A parent-less instance of ``dispatch_cls`` becomes
        ``klass.dispatch`` and ``klass`` becomes ``dispatch_cls._events``.

        :return: the value of ``klass.dispatch`` after assignment.

        """
        # This allows an Events subclass to define additional utility
        # methods made available to the target via
        # "self.dispatch._events.<utilitymethod>"
        # @staticmethod to allow easy "super" calls while in a metaclass
        # constructor.
        root_dispatch = dispatch_cls(None)
        klass.dispatch = root_dispatch
        dispatch_cls._events = klass
        return klass.dispatch

    @classmethod
    def _create_dispatcher_class(
        cls, classname: str, bases: Tuple[type, ...], dict_: Mapping[str, Any]
    ) -> None:
        """Create a :class:`._Dispatch` class corresponding to an
        :class:`.Events` class.

        :param classname: name of the events class; the generated classes
         are named ``<classname>Dispatch`` and ``Joined<classname>Dispatch``.
        :param bases: bases of the events class.
        :param dict_: namespace of the events class; every key accepted
         by ``_is_event_name()`` is set up as an event.

        """

        # there's all kinds of ways to do this,
        # i.e. make a Dispatch class that shares the '_listen' method
        # of the Event class, this is the straight monkeypatch.
        dispatch_base = (
            cls.dispatch.__class__ if hasattr(cls, "dispatch") else _Dispatch
        )

        # this one list object is used for the __slots__ of both generated
        # classes and for _event_names; the appends further down are
        # therefore seen by the joined class created at the end
        event_names = [key for key in dict_ if _is_event_name(key)]
        generated = type(
            "%sDispatch" % classname,
            (dispatch_base,),
            {"__slots__": event_names},
        )
        dispatch_cls = cast("Type[_Dispatch[_ET]]", generated)

        dispatch_cls._event_names = event_names
        dispatch_inst = cls._set_dispatch(cls, dispatch_cls)
        for event_name in dispatch_cls._event_names:
            class_level = _ClsLevelDispatch(cls, dict_[event_name])
            setattr(dispatch_inst, event_name, class_level)
            _registrars[event_name].append(cls)

        # carry over the event descriptors of generated dispatch bases
        for super_ in dispatch_cls.__bases__:
            if super_ is _Dispatch or not issubclass(super_, _Dispatch):
                continue
            for inherited in super_._events.dispatch._event_descriptors:
                setattr(dispatch_inst, inherited.name, inherited)
                dispatch_cls._event_names.append(inherited.name)

        if getattr(cls, "_dispatch_target", None):
            dispatch_target_cls = cls._dispatch_target
            assert dispatch_target_cls is not None
            # targets that list "_slots_dispatch" in __slots__ get the
            # slots-based descriptor
            uses_slots = (
                hasattr(dispatch_target_cls, "__slots__")
                and "_slots_dispatch" in dispatch_target_cls.__slots__
            )
            descriptor_cls = slots_dispatcher if uses_slots else dispatcher
            dispatch_target_cls.dispatch = descriptor_cls(cls)

        klass = type(
            "Joined%s" % dispatch_cls.__name__,
            (_JoinedDispatcher,),
            {"__slots__": event_names},
        )
        dispatch_cls._joined_dispatch_cls = klass

        # establish pickle capability by adding it to this module
        globals()[klass.__name__] = klass

335 

336 

class _JoinedDispatcher(_DispatchCommon[_ET]):
    """Represent a connection between two _Dispatch objects."""

    __slots__ = "local", "parent", "_instance_cls"

    local: _DispatchCommon[_ET]
    parent: _DispatchCommon[_ET]
    _instance_cls: Optional[Type[_ET]]

    def __init__(
        self, local: _DispatchCommon[_ET], parent: _DispatchCommon[_ET]
    ):
        """Store both dispatchers; ``_instance_cls`` is copied from
        ``local``."""
        self.local = local
        self.parent = parent
        self._instance_cls = local._instance_cls

    def __reduce__(self) -> Any:
        """Pickle as a call to this class with ``(local, parent)``."""
        constructor_args = (self.local, self.parent)
        return (self.__class__, constructor_args)

    def __getattr__(self, name: str) -> _JoinedListener[_ET]:
        """Wrap the ``local`` attribute called ``name`` in a
        :class:`._JoinedListener`, store it on this object and return it."""
        # Assign _JoinedListeners as attributes on demand
        # to reduce startup time for new dispatch objects.
        local_listener = getattr(self.local, name)
        joined = _JoinedListener(
            self.parent, local_listener.name, local_listener
        )
        setattr(self, local_listener.name, joined)
        return joined

    def _listen(self, event_key: _EventKey[_ET], **kw: Any) -> None:
        """Delegate to ``parent._listen()``."""
        parent = self.parent
        return parent._listen(event_key, **kw)

    @property
    def _events(self) -> Type[_HasEventsDispatch[_ET]]:
        """The ``_events`` of the parent dispatcher."""
        parent = self.parent
        return parent._events

370 

371 

class Events(_HasEventsDispatch[_ET]):
    """Define event listening functions for a particular target type."""

    @classmethod
    def _accept_with(
        cls, target: Union[_ET, Type[_ET]], identifier: str
    ) -> Optional[Union[_ET, Type[_ET]]]:
        """Return ``target`` if its ``dispatch`` attribute belongs to this
        events class, else None.

        The target is accepted when ``target.dispatch`` is an instance of
        this class's dispatch class, or when it is a
        :class:`._JoinedDispatcher` whose ``parent`` is such an instance.

        """

        def dispatch_is(*types: Type[Any]) -> bool:
            return all(isinstance(target.dispatch, t) for t in types)

        def dispatch_parent_is(t: Type[Any]) -> bool:
            return isinstance(
                cast("_JoinedDispatcher[_ET]", target.dispatch).parent, t
            )

        # Mapper, ClassManager, Session override this to
        # also accept classes, scoped_sessions, sessionmakers, etc.
        if not hasattr(target, "dispatch"):
            return None

        own_dispatch_cls = cls.dispatch.__class__

        # NOTE(review): the second test can only pass when the first one
        # already has; kept so that the evaluation matches what was here.
        accepted = (
            dispatch_is(own_dispatch_cls)
            or dispatch_is(type, own_dispatch_cls)
            or (
                dispatch_is(_JoinedDispatcher)
                and dispatch_parent_is(own_dispatch_cls)
            )
        )
        return target if accepted else None

    @classmethod
    def _listen(
        cls,
        event_key: _EventKey[_ET],
        *,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        asyncio: bool = False,
    ) -> None:
        """Pass all four flags through to ``event_key.base_listen()``."""
        options = dict(
            propagate=propagate, insert=insert, named=named, asyncio=asyncio
        )
        event_key.base_listen(**options)

    @classmethod
    def _remove(cls, event_key: _EventKey[_ET]) -> None:
        """Call ``event_key.remove()``."""
        event_key.remove()

    @classmethod
    def _clear(cls) -> None:
        """Call ``_clear()`` on this class's ``dispatch``."""
        class_dispatch = cls.dispatch
        class_dispatch._clear()

423 

424 

class dispatcher(Generic[_ET]):
    """Descriptor used by target classes to
    deliver the _Dispatch class at the class level
    and produce new _Dispatch instances for target
    instances.

    """

    def __init__(self, events: Type[_HasEventsDispatch[_ET]]):
        """Remember ``events`` along with its ``dispatch`` attribute."""
        self.events = events
        self.dispatch = events.dispatch

    @overload
    def __get__(
        self, obj: Literal[None], cls: Type[Any]
    ) -> Type[_Dispatch[_ET]]: ...

    @overload
    def __get__(self, obj: Any, cls: Type[Any]) -> _DispatchCommon[_ET]: ...

    def __get__(self, obj: Any, cls: Type[Any]) -> Any:
        """Return the stored dispatch for class-level access; for an
        instance, build a dispatcher and store it in ``obj.__dict__``.

        :raises TypeError: if reaching ``obj.__dict__`` raises
         ``AttributeError``.

        """
        if obj is None:
            return self.dispatch

        instance_dispatch = self.dispatch._for_instance(obj)
        try:
            # stored under the key "dispatch" in the instance namespace
            obj.__dict__["dispatch"] = instance_dispatch
        except AttributeError as ae:
            raise TypeError(
                "target %r doesn't have __dict__, should it be "
                "defining _slots_dispatch?" % (obj,)
            ) from ae
        return instance_dispatch

458 

459 

class slots_dispatcher(dispatcher[_ET]):
    """Variant of :class:`.dispatcher` that keeps the per-instance
    dispatcher in the ``_slots_dispatch`` attribute of the target."""

    def __get__(self, obj: Any, cls: Type[Any]) -> Any:
        """Return the stored dispatch for class-level access; for an
        instance, return ``obj._slots_dispatch``, creating and assigning
        it first when the attribute is not yet present."""
        if obj is None:
            return self.dispatch

        try:
            return obj._slots_dispatch
        except AttributeError:
            # nothing stored on this instance yet; build it below
            pass

        instance_dispatch = self.dispatch._for_instance(obj)
        obj._slots_dispatch = instance_dispatch
        return instance_dispatch