Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/base.py: 75%

155 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# event/base.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Base implementation classes. 

9 

10The public-facing ``Events`` serves as the base class for an event interface; 

11its public attributes represent different kinds of events. These attributes 

12are mirrored onto a ``_Dispatch`` class, which serves as a container for 

13collections of listener functions. These collections are represented both 

14at the class level of a particular ``_Dispatch`` class as well as within 

15instances of ``_Dispatch``. 

16 

17""" 

18from __future__ import absolute_import 

19 

20import weakref 

21 

22from .attr import _ClsLevelDispatch 

23from .attr import _EmptyListener 

24from .attr import _JoinedListener 

25from .. import util 

26 

27 

# Maps an event name to the list of Events classes registered under
# that name; entries are appended by _create_dispatcher_class() and
# removed again by _remove_dispatcher().
_registrars = util.defaultdict(list)

29 

30 

31def _is_event_name(name): 

32 # _sa_event prefix is special to support internal-only event names. 

33 # most event names are just plain method names that aren't 

34 # underscored. 

35 

36 return ( 

37 not name.startswith("_") and name != "dispatch" 

38 ) or name.startswith("_sa_event") 

39 

40 

41class _UnpickleDispatch(object): 

42 """Serializable callable that re-generates an instance of 

43 :class:`_Dispatch` given a particular :class:`.Events` subclass. 

44 

45 """ 

46 

47 def __call__(self, _instance_cls): 

48 for cls in _instance_cls.__mro__: 

49 if "dispatch" in cls.__dict__: 

50 return cls.__dict__["dispatch"].dispatch._for_class( 

51 _instance_cls 

52 ) 

53 else: 

54 raise AttributeError("No class with a 'dispatch' member present.") 

55 

56 

57class _Dispatch(object): 

58 """Mirror the event listening definitions of an Events class with 

59 listener collections. 

60 

61 Classes which define a "dispatch" member will return a 

62 non-instantiated :class:`._Dispatch` subclass when the member 

63 is accessed at the class level. When the "dispatch" member is 

64 accessed at the instance level of its owner, an instance 

65 of the :class:`._Dispatch` class is returned. 

66 

67 A :class:`._Dispatch` class is generated for each :class:`.Events` 

68 class defined, by the :func:`._create_dispatcher_class` function. 

69 The original :class:`.Events` classes remain untouched. 

70 This decouples the construction of :class:`.Events` subclasses from 

71 the implementation used by the event internals, and allows 

72 inspecting tools like Sphinx to work in an unsurprising 

73 way against the public API. 

74 

75 """ 

76 

77 # In one ORM edge case, an attribute is added to _Dispatch, 

78 # so __dict__ is used in just that case and potentially others. 

79 __slots__ = "_parent", "_instance_cls", "__dict__", "_empty_listeners" 

80 

81 _empty_listener_reg = weakref.WeakKeyDictionary() 

82 

83 def __init__(self, parent, instance_cls=None): 

84 self._parent = parent 

85 self._instance_cls = instance_cls 

86 

87 if instance_cls: 

88 try: 

89 self._empty_listeners = self._empty_listener_reg[instance_cls] 

90 except KeyError: 

91 self._empty_listeners = self._empty_listener_reg[ 

92 instance_cls 

93 ] = { 

94 ls.name: _EmptyListener(ls, instance_cls) 

95 for ls in parent._event_descriptors 

96 } 

97 else: 

98 self._empty_listeners = {} 

99 

100 def __getattr__(self, name): 

101 # Assign EmptyListeners as attributes on demand 

102 # to reduce startup time for new dispatch objects. 

103 try: 

104 ls = self._empty_listeners[name] 

105 except KeyError: 

106 raise AttributeError(name) 

107 else: 

108 setattr(self, ls.name, ls) 

109 return ls 

110 

111 @property 

112 def _event_descriptors(self): 

113 for k in self._event_names: 

114 # Yield _ClsLevelDispatch related 

115 # to relevant event name. 

116 yield getattr(self, k) 

117 

118 @property 

119 def _listen(self): 

120 return self._events._listen 

121 

122 def _for_class(self, instance_cls): 

123 return self.__class__(self, instance_cls) 

124 

125 def _for_instance(self, instance): 

126 instance_cls = instance.__class__ 

127 return self._for_class(instance_cls) 

128 

129 def _join(self, other): 

130 """Create a 'join' of this :class:`._Dispatch` and another. 

131 

132 This new dispatcher will dispatch events to both 

133 :class:`._Dispatch` objects. 

134 

135 """ 

136 if "_joined_dispatch_cls" not in self.__class__.__dict__: 

137 cls = type( 

138 "Joined%s" % self.__class__.__name__, 

139 (_JoinedDispatcher,), 

140 {"__slots__": self._event_names}, 

141 ) 

142 

143 self.__class__._joined_dispatch_cls = cls 

144 return self._joined_dispatch_cls(self, other) 

145 

146 def __reduce__(self): 

147 return _UnpickleDispatch(), (self._instance_cls,) 

148 

149 def _update(self, other, only_propagate=True): 

150 """Populate from the listeners in another :class:`_Dispatch` 

151 object.""" 

152 for ls in other._event_descriptors: 

153 if isinstance(ls, _EmptyListener): 

154 continue 

155 getattr(self, ls.name).for_modify(self)._update( 

156 ls, only_propagate=only_propagate 

157 ) 

158 

159 def _clear(self): 

160 for ls in self._event_descriptors: 

161 ls.for_modify(self).clear() 

162 

163 

class _EventMeta(type):
    """Metaclass that builds the associated :class:`._Dispatch` class
    whenever a new Events subclass is defined."""

    def __init__(cls, classname, bases, dict_):
        """Set up the dispatcher for the new class, then finish normal
        ``type`` initialization.

        :param classname: name of the class being created.
        :param bases: tuple of base classes.
        :param dict_: namespace of the class body; event names are
         looked up in it by :func:`._create_dispatcher_class`.

        """
        # the dispatcher is created first, then regular type setup runs
        _create_dispatcher_class(cls, classname, bases, dict_)
        type.__init__(cls, classname, bases, dict_)

171 

172 

def _create_dispatcher_class(cls, classname, bases, dict_):
    """Create a :class:`._Dispatch` class corresponding to an
    :class:`.Events` class.

    :param cls: the Events class being set up.
    :param classname: name of ``cls``; used to name the dispatch class.
    :param bases: base classes of ``cls``; not consulted here.
    :param dict_: class namespace of ``cls``; every key accepted by
     :func:`._is_event_name` becomes an event of the dispatch class.

    """

    # there's all kinds of ways to do this,
    # i.e. make a Dispatch class that shares the '_listen' method
    # of the Event class, this is the straight monkeypatch.
    dispatch_base = (
        cls.dispatch.__class__ if hasattr(cls, "dispatch") else _Dispatch
    )

    event_names = [key for key in dict_ if _is_event_name(key)]
    dispatch_cls = type(
        "%sDispatch" % classname,
        (dispatch_base,),
        {"__slots__": event_names},
    )

    # the same list object is kept on the class; inherited names are
    # appended to it further below
    dispatch_cls._event_names = event_names

    dispatch_inst = cls._set_dispatch(cls, dispatch_cls)
    for name in dispatch_cls._event_names:
        setattr(dispatch_inst, name, _ClsLevelDispatch(cls, dict_[name]))
        _registrars[name].append(cls)

    # carry over the event collections of parent dispatch classes
    for super_ in dispatch_cls.__bases__:
        if not issubclass(super_, _Dispatch) or super_ is _Dispatch:
            continue
        for inherited in super_._events.dispatch._event_descriptors:
            setattr(dispatch_inst, inherited.name, inherited)
            dispatch_cls._event_names.append(inherited.name)

    # install the descriptor on the target class, if one is declared
    target_cls = getattr(cls, "_dispatch_target", None)
    if target_cls:
        uses_slots = (
            hasattr(target_cls, "__slots__")
            and "_slots_dispatch" in target_cls.__slots__
        )
        descriptor_factory = slots_dispatcher if uses_slots else dispatcher
        target_cls.dispatch = descriptor_factory(cls)

212 

213 

214def _remove_dispatcher(cls): 

215 for k in cls.dispatch._event_names: 

216 _registrars[k].remove(cls) 

217 if not _registrars[k]: 

218 del _registrars[k] 

219 

220 

class Events(util.with_metaclass(_EventMeta, object)):
    """Define event listening functions for a particular target type.

    Class creation goes through :class:`._EventMeta`, which calls
    :func:`._create_dispatcher_class` for this class and each subclass.

    """

    @staticmethod
    def _set_dispatch(cls, dispatch_cls):
        """Instantiate ``dispatch_cls``, attach it to ``cls`` as
        ``dispatch``, and link ``dispatch_cls._events`` back to ``cls``.

        :return: the new ``cls.dispatch`` object.

        """
        # This allows an Events subclass to define additional utility
        # methods made available to the target via
        # "self.dispatch._events.<utilitymethod>"
        # @staticmethod to allow easy "super" calls while in a metaclass
        # constructor.
        cls.dispatch = dispatch_cls(None)
        dispatch_cls._events = cls
        return cls.dispatch

    @classmethod
    def _accept_with(cls, target):
        """Return ``target`` if its ``dispatch`` attribute is compatible
        with this Events class, otherwise ``None``.

        :param target: candidate object to listen on.

        """

        def dispatch_is(*types):
            return all(isinstance(target.dispatch, t) for t in types)

        def dispatch_parent_is(t):
            return isinstance(target.dispatch.parent, t)

        # Mapper, ClassManager, Session override this to
        # also accept classes, scoped_sessions, sessionmakers, etc.
        if not hasattr(target, "dispatch"):
            return None

        own_dispatch_cls = cls.dispatch.__class__

        if dispatch_is(own_dispatch_cls):
            return target

        # NOTE(review): this check requires everything the previous one
        # does plus more, so it looks unable to succeed once the previous
        # one has failed; kept as-is to preserve behavior.
        if dispatch_is(type, own_dispatch_cls):
            return target

        # a joined dispatcher qualifies through its parent dispatch
        if dispatch_is(_JoinedDispatcher) and dispatch_parent_is(
            own_dispatch_cls
        ):
            return target

        return None

    @classmethod
    def _listen(
        cls,
        event_key,
        propagate=False,
        insert=False,
        named=False,
        asyncio=False,
    ):
        """Establish the listener described by ``event_key`` by passing
        all options through to ``event_key.base_listen()``."""
        event_key.base_listen(
            propagate=propagate,
            insert=insert,
            named=named,
            asyncio=asyncio,
        )

    @classmethod
    def _remove(cls, event_key):
        """Remove the listener described by ``event_key``."""
        event_key.remove()

    @classmethod
    def _clear(cls):
        """Clear all event collections of this class's dispatch object."""
        cls.dispatch._clear()

276 

277 

278class _JoinedDispatcher(object): 

279 """Represent a connection between two _Dispatch objects.""" 

280 

281 __slots__ = "local", "parent", "_instance_cls" 

282 

283 def __init__(self, local, parent): 

284 self.local = local 

285 self.parent = parent 

286 self._instance_cls = self.local._instance_cls 

287 

288 def __getattr__(self, name): 

289 # Assign _JoinedListeners as attributes on demand 

290 # to reduce startup time for new dispatch objects. 

291 ls = getattr(self.local, name) 

292 jl = _JoinedListener(self.parent, ls.name, ls) 

293 setattr(self, ls.name, jl) 

294 return jl 

295 

296 @property 

297 def _listen(self): 

298 return self.parent._listen 

299 

300 @property 

301 def _events(self): 

302 return self.parent._events 

303 

304 

class dispatcher(object):
    """Descriptor used by target classes to hand out dispatch objects.

    Accessed on the class, it returns the ``dispatch`` object of the
    Events class it was built from; accessed on an instance, it produces
    a dispatch object for that instance and stores it in the instance's
    ``__dict__``.

    """

    def __init__(self, events):
        """:param events: the Events class supplying ``dispatch``."""
        self.events = events
        self.dispatch = events.dispatch

    def __get__(self, obj, cls):
        """Return the dispatch object for ``obj``, or the class-level one
        when ``obj`` is None.

        :raises TypeError: if ``obj`` has no ``__dict__`` to store the
         new dispatch object in.

        """
        if obj is None:
            return self.dispatch

        disp = self.dispatch._for_instance(obj)
        try:
            # placing the result in the instance __dict__ means later
            # attribute access finds it there without this descriptor
            obj.__dict__["dispatch"] = disp
        except AttributeError as ae:
            message = (
                "target %r doesn't have __dict__, should it be "
                "defining _slots_dispatch?" % (obj,)
            )
            util.raise_(TypeError(message), replace_context=ae)
        return disp

333 

334 

class slots_dispatcher(dispatcher):
    """Variant of :class:`.dispatcher` that keeps the per-instance
    dispatch object in the ``_slots_dispatch`` attribute of the target
    rather than in its ``__dict__``."""

    def __get__(self, obj, cls):
        """Return the dispatch object for ``obj``, creating and storing
        it on first access; return the class-level one when ``obj`` is
        None."""
        if obj is None:
            return self.dispatch

        if not hasattr(obj, "_slots_dispatch"):
            # first access for this instance: create and remember it
            created = self.dispatch._for_instance(obj)
            obj._slots_dispatch = created
            return created

        return obj._slots_dispatch

345 return disp