Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/attr.py: 63%

234 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# event/attr.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Attribute implementation for _Dispatch classes. 

9 

10The various listener targets for a particular event class are represented 

11as attributes, which refer to collections of listeners to be fired off. 

12These collections can exist at the class level as well as at the instance 

13level. An event is fired off using code like this:: 

14 

15 some_object.dispatch.first_connect(arg1, arg2) 

16 

17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and 

18``first_connect`` is typically an instance of ``_ListenerCollection`` 

19if event listeners are present, or ``_EmptyListener`` if none are present. 

20 

21The attribute mechanics here spend effort trying to ensure listener functions 

22are available with a minimum of function call overhead, that unnecessary 

23objects aren't created (i.e. many empty per-instance listener collections), 

24as well as that everything is garbage collectable when owning references are 

25lost. Other features such as "propagation" of listener functions across 

26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances, 

27as well as support for subclass propagation (e.g. events assigned to 

28``Pool`` vs. ``QueuePool``) are all implemented here. 

29 

30""" 

31 

32from __future__ import absolute_import 

33from __future__ import with_statement 

34 

35import collections 

36from itertools import chain 

37import weakref 

38 

39from . import legacy 

40from . import registry 

41from .. import exc 

42from .. import util 

43from ..util import threading 

44from ..util.concurrency import AsyncAdaptedLock 

45 

46 

class RefCollection(util.MemoizedSlots):
    """Base for listener collections that expose a weak reference to
    themselves through the memoized ``ref`` attribute.
    """

    __slots__ = ("ref",)

    def _memoized_attr_ref(self):
        """Build the weak reference stored as ``ref``.

        ``registry._collection_gced`` is passed as the weakref callback.
        """
        on_collected = registry._collection_gced
        return weakref.ref(self, on_collected)

52 

53 

54class _empty_collection(object): 

55 def append(self, element): 

56 pass 

57 

58 def extend(self, other): 

59 pass 

60 

61 def remove(self, element): 

62 pass 

63 

64 def __iter__(self): 

65 return iter([]) 

66 

67 def clear(self): 

68 pass 

69 

70 

class _ClsLevelDispatch(RefCollection):
    """Class-level events on :class:`._Dispatch` classes.

    Listener functions are kept per target class in ``_clslevel``, a
    ``WeakKeyDictionary`` keyed by the class.
    """

    __slots__ = (
        "clsname",
        "name",
        "arg_names",
        "has_kw",
        "legacy_signatures",
        "_clslevel",
        "__weakref__",
    )

    def __init__(self, parent_dispatch_cls, fn):
        """Derive name and argument metadata from the event function ``fn``
        declared on ``parent_dispatch_cls``.
        """
        self.name = fn.__name__
        self.clsname = parent_dispatch_cls.__name__

        spec = util.inspect_getfullargspec(fn)
        # the first positional argument is skipped
        self.arg_names = spec.args[1:]
        self.has_kw = bool(spec.varkw)

        # ascending sort on the first element of each signature, then
        # reversed as a whole
        ordered = sorted(
            getattr(fn, "_legacy_signatures", []), key=lambda sig: sig[0]
        )
        ordered.reverse()
        self.legacy_signatures = ordered

        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()

    def _adjust_fn_spec(self, fn, named):
        """Return ``fn``, wrapped as needed for keyword-style invocation
        (``named``) and for legacy signatures.
        """
        if named:
            fn = self._wrap_fn_for_kw(fn)

        if not self.legacy_signatures:
            return fn

        try:
            argspec = util.get_callable_argspec(fn, no_self=True)
        except TypeError:
            # no argspec could be obtained; use the function as it is
            return fn
        return legacy._wrap_fn_for_legacy(self, fn, argspec)

    def _wrap_fn_for_kw(self, fn):
        """Return a wrapper that calls ``fn`` with keyword arguments only,
        naming positional arguments after ``self.arg_names``.
        """

        def wrap_kw(*args, **kw):
            by_name = dict(zip(self.arg_names, args))
            by_name.update(kw)
            return fn(**by_name)

        return wrap_kw

    def _do_insert_or_append(self, event_key, is_append):
        """Add the listener of ``event_key`` to the target class and its
        subclasses, at the end (``is_append``) or at the front.
        """
        target = event_key.dispatch_target
        assert isinstance(
            target, type
        ), "Class-level Event targets must be classes."
        if not getattr(target, "_sa_propagate_class_events", True):
            raise exc.InvalidRequestError(
                "Can't assign an event directly to the %s class" % (target,)
            )

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                self.update_subclass(cls)
                if cls is not target:
                    # a subclass seen for the first time is only
                    # initialized via update_subclass(); nothing is added
                    # to it directly here
                    continue

            listeners = self._clslevel[cls]
            if is_append:
                listeners.append(event_key._listen_fn)
            else:
                listeners.appendleft(event_key._listen_fn)
        registry._stored_in_collection(event_key, self)

    def insert(self, event_key, propagate):
        """Add the listener at the front; ``propagate`` is not used."""
        self._do_insert_or_append(event_key, is_append=False)

    def append(self, event_key, propagate):
        """Add the listener at the end; ``propagate`` is not used."""
        self._do_insert_or_append(event_key, is_append=True)

    def update_subclass(self, target):
        """Ensure ``target`` has a listener collection and copy into it any
        listeners registered for the other classes in its ``__mro__``.
        """
        per_class = self._clslevel

        if target not in per_class:
            if getattr(target, "_sa_propagate_class_events", True):
                per_class[target] = collections.deque()
            else:
                per_class[target] = _empty_collection()

        own = per_class[target]

        for base in target.__mro__[1:]:
            if base not in per_class:
                continue
            inherited = [fn for fn in per_class[base] if fn not in own]
            own.extend(inherited)

    def remove(self, event_key):
        """Remove the listener of ``event_key`` from the target class and
        from every subclass that has a collection.
        """
        target = event_key.dispatch_target

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                continue
            self._clslevel[cls].remove(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self):
        """Clear all class level listeners"""

        removed = set()
        for listeners in self._clslevel.values():
            removed.update(listeners)
            listeners.clear()
        registry._clear(self, removed)

    def for_modify(self, obj):
        """Return an event collection which can be modified.

        For _ClsLevelDispatch at the class level of
        a dispatcher, this returns self.

        """
        return self

189 

190 

class _InstanceLevelDispatch(RefCollection):
    """Common base for the instance-level listener collections."""

    __slots__ = ()

    def _adjust_fn_spec(self, fn, named):
        """Delegate to ``self.parent._adjust_fn_spec``."""
        owner = self.parent
        return owner._adjust_fn_spec(fn, named)

196 

197 

class _EmptyListener(_InstanceLevelDispatch):
    """Proxy for the events held by a _ClsLevelDispatch while no
    instance-level events are present.

    It is swapped out for a _ListenerCollection once instance-level
    events are added.

    """

    propagate = frozenset()
    listeners = ()

    __slots__ = ("parent", "parent_listeners", "name")

    def __init__(self, parent, target_cls):
        """Bind to the class-level collection ``parent`` keeps for
        ``target_cls``, creating that collection first if needed.
        """
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        class_listeners = parent._clslevel[target_cls]
        self.parent = parent  # _ClsLevelDispatch
        self.parent_listeners = class_listeners
        self.name = parent.name

    def for_modify(self, obj):
        """Return an event collection which can be modified.

        A new _ListenerCollection is created.  If ``obj`` currently holds
        this _EmptyListener under ``self.name``, the new collection is
        assigned in its place; otherwise the attribute is expected to be a
        _JoinedListener.  The new collection is returned either way.

        """
        replacement = _ListenerCollection(self.parent, obj._instance_cls)
        if getattr(obj, self.name) is not self:
            assert isinstance(getattr(obj, self.name), _JoinedListener)
            return replacement
        setattr(obj, self.name, replacement)
        return replacement

    def _needs_modify(self, *args, **kw):
        raise NotImplementedError("need to call for_modify()")

    # none of these operations are available on the empty proxy;
    # for_modify() has to be called first
    exec_once = _needs_modify
    exec_once_unless_exception = _needs_modify
    insert = _needs_modify
    append = _needs_modify
    remove = _needs_modify
    clear = _needs_modify

    def __call__(self, *args, **kw):
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)

    def __len__(self):
        return len(self.parent_listeners)

    def __iter__(self):
        return iter(self.parent_listeners)

    def __bool__(self):
        return bool(self.parent_listeners)

    __nonzero__ = __bool__

259 

260 

class _CompoundListener(_InstanceLevelDispatch):
    """Listener collection that runs ``parent_listeners`` followed by
    ``listeners``, with helpers for run-once execution under a mutex.
    """

    __slots__ = ("_exec_once_mutex", "_exec_once", "_exec_w_sync_once")

    def _set_asyncio(self):
        """Replace the run-once mutex with an ``AsyncAdaptedLock``."""
        self._exec_once_mutex = AsyncAdaptedLock()

    def _memoized_attr__exec_once_mutex(self):
        """Default run-once mutex: a ``threading.Lock``."""
        return threading.Lock()

    def _exec_once_impl(self, retry_on_exception, *args, **kw):
        """Run the event under the mutex unless ``_exec_once`` is set.

        ``_exec_once`` is set afterwards unless the call raised and
        ``retry_on_exception`` is true.  Any exception propagates.
        """
        with self._exec_once_mutex:
            if self._exec_once:
                return

            failed = True
            try:
                self(*args, **kw)
                failed = False
            finally:
                # runs for any exception type, as a bare except would
                if not (failed and retry_on_exception):
                    self._exec_once = True

    def exec_once(self, *args, **kw):
        """Execute this event, but only if it has not been
        executed already for this collection."""

        if self._exec_once:
            return
        self._exec_once_impl(False, *args, **kw)

    def exec_once_unless_exception(self, *args, **kw):
        """Execute this event, but only if it has not been
        executed already for this collection, or was called
        by a previous exec_once_unless_exception call and
        raised an exception.

        If exec_once was already called, then this method will never run
        the callable regardless of whether it raised or not.

        .. versionadded:: 1.3.8

        """
        if self._exec_once:
            return
        self._exec_once_impl(True, *args, **kw)

    def _exec_w_sync_on_first_run(self, *args, **kw):
        """Execute this event, and use a mutex if it has not been
        executed already for this collection, or was called
        by a previous _exec_w_sync_on_first_run call and
        raised an exception.

        If _exec_w_sync_on_first_run was already called and didn't raise an
        exception, then a mutex is not used.

        .. versionadded:: 1.4.11

        """
        if self._exec_w_sync_once:
            self(*args, **kw)
            return

        with self._exec_once_mutex:
            self(*args, **kw)
            # only reached when the call above did not raise
            self._exec_w_sync_once = True

    def __call__(self, *args, **kw):
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)
        for listener in self.listeners:
            listener(*args, **kw)

    def __len__(self):
        inherited_count = len(self.parent_listeners)
        return inherited_count + len(self.listeners)

    def __iter__(self):
        return chain(self.parent_listeners, self.listeners)

    def __bool__(self):
        if self.listeners:
            return True
        return bool(self.parent_listeners)

    __nonzero__ = __bool__

346 

347 

class _ListenerCollection(_CompoundListener):
    """Instance-level attributes on instances of :class:`._Dispatch`.

    Holds this instance's own listeners in ``listeners`` and shares the
    class-level collection through ``parent_listeners``.

    As of 0.7.9, _ListenerCollection is only first
    created via the _EmptyListener.for_modify() method.

    """

    __slots__ = (
        "parent_listeners",
        "parent",
        "name",
        "listeners",
        "propagate",
        "__weakref__",
    )

    def __init__(self, parent, target_cls):
        """Bind to the class-level collection ``parent`` keeps for
        ``target_cls`` and start with no instance-level listeners.
        """
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self._exec_once = False
        self._exec_w_sync_once = False
        self.parent_listeners = parent._clslevel[target_cls]
        self.parent = parent
        self.name = parent.name
        self.listeners = collections.deque()
        self.propagate = set()

    def for_modify(self, obj):
        """Return an event collection which can be modified.

        For _ListenerCollection at the instance level of
        a dispatcher, this returns self.

        """
        return self

    def _update(self, other, only_propagate=True):
        """Populate from the listeners in another :class:`_Dispatch`
        object."""

        own_listeners = self.listeners
        already_present = set(own_listeners)
        self.propagate.update(other.propagate)

        # NOTE(review): the test is "(new and not only_propagate) or in
        # self.propagate", so a listener found in self.propagate is taken
        # even when it is already present -- confirm this is intended.
        incoming = []
        for listener in other.listeners:
            wanted = listener not in already_present and not only_propagate
            if wanted or listener in self.propagate:
                incoming.append(listener)

        own_listeners.extend(incoming)

        to_associate = other.propagate.union(incoming)
        registry._stored_in_collection_multi(self, other, to_associate)

    def insert(self, event_key, propagate):
        """Prepend the listener of ``event_key``; record it in
        ``propagate`` when it was added and ``propagate`` is true.
        """
        if event_key.prepend_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def append(self, event_key, propagate):
        """Append the listener of ``event_key``; record it in
        ``propagate`` when it was added and ``propagate`` is true.
        """
        if event_key.append_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def remove(self, event_key):
        """Remove the listener of ``event_key`` from this collection."""
        listen_fn = event_key._listen_fn
        self.listeners.remove(listen_fn)
        self.propagate.discard(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self):
        """Remove every instance-level listener."""
        # the registry is given the listeners before they are emptied
        registry._clear(self, self.listeners)
        self.propagate.clear()
        self.listeners.clear()

426 

427 

class _JoinedListener(_CompoundListener):
    """Listener collection that joins a local collection with the
    same-named collection of a parent object.

    ``parent_listeners`` is the local collection, while ``listeners`` is
    looked up on ``parent`` under ``name`` each time it is read.
    Modifications are forwarded to the local collection.
    """

    __slots__ = "parent", "name", "local", "parent_listeners"

    def __init__(self, parent, name, local):
        """Join ``local`` with the attribute ``name`` of ``parent``."""
        self._exec_once = False
        # Initialized like _ListenerCollection.__init__ does; the inherited
        # _exec_w_sync_on_first_run() reads this slot, and reading it while
        # unset would presumably raise AttributeError.
        self._exec_w_sync_once = False
        self.parent = parent
        self.name = name
        self.local = local
        self.parent_listeners = self.local

    @property
    def listeners(self):
        """The collection currently stored on ``parent`` under ``name``."""
        return getattr(self.parent, self.name)

    def _adjust_fn_spec(self, fn, named):
        """Delegate to the local collection."""
        return self.local._adjust_fn_spec(fn, named)

    def for_modify(self, obj):
        """Make the local collection modifiable and return ``self``.

        ``local`` and ``parent_listeners`` are both rebound to whatever the
        local collection's own ``for_modify(obj)`` returns.
        """
        self.local = self.parent_listeners = self.local.for_modify(obj)
        return self

    def insert(self, event_key, propagate):
        """Forward to the local collection."""
        self.local.insert(event_key, propagate)

    def append(self, event_key, propagate):
        """Forward to the local collection."""
        self.local.append(event_key, propagate)

    def remove(self, event_key):
        """Forward to the local collection."""
        self.local.remove(event_key)

    def clear(self):
        """Not supported on a joined listener.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError()