Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/attr.py: 69%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

357 statements  

1# event/attr.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Attribute implementation for _Dispatch classes. 

9 

10The various listener targets for a particular event class are represented 

11as attributes, which refer to collections of listeners to be fired off. 

12These collections can exist at the class level as well as at the instance 

13level. An event is fired off using code like this:: 

14 

15 some_object.dispatch.first_connect(arg1, arg2) 

16 

17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and 

18``first_connect`` is typically an instance of ``_ListenerCollection`` 

19if event listeners are present, or ``_EmptyListener`` if none are present. 

20 

21The attribute mechanics here spend effort trying to ensure listener functions 

22are available with a minimum of function call overhead, that unnecessary 

23objects aren't created (i.e. many empty per-instance listener collections), 

24as well as that everything is garbage collectable when owning references are 

25lost. Other features such as "propagation" of listener functions across 

26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances, 

27as well as support for subclass propagation (e.g. events assigned to 

28``Pool`` vs. ``QueuePool``) are all implemented here. 

29 

30""" 

31 

32from __future__ import annotations 

33 

34import collections 

35from itertools import chain 

36import threading 

37from types import TracebackType 

38import typing 

39from typing import Any 

40from typing import cast 

41from typing import Collection 

42from typing import Deque 

43from typing import FrozenSet 

44from typing import Generic 

45from typing import Iterator 

46from typing import MutableMapping 

47from typing import MutableSequence 

48from typing import NoReturn 

49from typing import Optional 

50from typing import Sequence 

51from typing import Set 

52from typing import Tuple 

53from typing import Type 

54from typing import TypeVar 

55from typing import Union 

56import weakref 

57 

58from . import legacy 

59from . import registry 

60from .registry import _ET 

61from .registry import _EventKey 

62from .registry import _ListenerFnType 

63from .. import exc 

64from .. import util 

65from ..util.concurrency import AsyncAdaptedLock 

66from ..util.typing import Protocol 

67 

68_T = TypeVar("_T", bound=Any) 

69 

70if typing.TYPE_CHECKING: 

71 from .base import _Dispatch 

72 from .base import _DispatchCommon 

73 from .base import _HasEventsDispatch 

74 

75 

class RefCollection(util.MemoizedSlots, Generic[_ET]):
    """Base for collections that expose a weak reference to themselves.

    The only slot is ``ref``; its value is produced by
    :meth:`_memoized_attr_ref`.  (The lazy population is presumably
    driven by ``util.MemoizedSlots`` -- that class is not defined in
    this module.)

    """

    __slots__ = ("ref",)

    ref: weakref.ref[RefCollection[_ET]]

    def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
        """Return a weakref to ``self``.

        ``registry._collection_gced`` is installed as the weakref
        callback.

        """
        gc_callback = registry._collection_gced
        return weakref.ref(self, gc_callback)

83 

84 

class _empty_collection(Collection[_T]):
    """A collection that is always empty.

    Every mutating method accepts its argument and discards it, so the
    object can stand in where a deque of listeners is expected without
    ever holding anything.

    """

    # -- read side: always reports "nothing here" -------------------------

    def __len__(self) -> int:
        return 0

    def __iter__(self) -> Iterator[_T]:
        return iter([])

    def __contains__(self, element: Any) -> bool:
        return False

    # -- write side: every operation is a no-op ---------------------------

    def append(self, element: _T) -> None:
        """Discard ``element``."""
        return None

    def appendleft(self, element: _T) -> None:
        """Discard ``element``."""
        return None

    def extend(self, other: Sequence[_T]) -> None:
        """Discard every element of ``other``."""
        return None

    def remove(self, element: _T) -> None:
        """Do nothing; no error is raised for a missing element."""
        return None

    def clear(self) -> None:
        """Do nothing; the collection is already empty."""
        return None

109 

110 

111_ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]] 

112 

113 

class _ClsLevelDispatch(RefCollection[_ET]):
    """Class-level events on :class:`._Dispatch` classes.

    ``_clslevel`` is a weak-keyed mapping from a target class to the
    listener functions stored for that class.

    """

    __slots__ = (
        "clsname",
        "name",
        "arg_names",
        "has_kw",
        "legacy_signatures",
        "_clslevel",
        "__weakref__",
    )

    clsname: str
    name: str
    arg_names: Sequence[str]
    has_kw: bool
    legacy_signatures: MutableSequence[legacy._LegacySignatureType]
    _clslevel: MutableMapping[
        Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
    ]

    def __init__(
        self,
        parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
        fn: _ListenerFnType,
    ):
        """Record the name and signature details of the event ``fn``.

        :param parent_dispatch_cls: class whose ``__name__`` is stored
         as ``clsname``.
        :param fn: function describing the event.  Its ``__doc__`` is
         overwritten with the value returned by
         ``legacy._augment_fn_docs``.

        """
        self.name = fn.__name__
        self.clsname = parent_dispatch_cls.__name__

        spec = util.inspect_getfullargspec(fn)
        # the first positional argument is left out of the stored names
        self.arg_names = spec.args[1:]
        self.has_kw = bool(spec.varkw)

        # ascending sort on the first item of each entry, then flipped
        # in place, which yields the fully reversed sorted order
        signatures = sorted(
            getattr(fn, "_legacy_signatures", []), key=lambda sig: sig[0]
        )
        signatures.reverse()
        self.legacy_signatures = signatures

        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Return ``fn``, wrapped as needed for this event's signature.

        :param fn: listener function.
        :param named: when true, ``fn`` is first wrapped so that it
         receives the event arguments as keywords.
        :return: the (possibly wrapped) function.  The legacy wrapper is
         only applied when legacy signatures exist and the argspec of
         ``fn`` can be obtained without ``TypeError``.

        """
        if named:
            fn = self._wrap_fn_for_kw(fn)

        if not self.legacy_signatures:
            return fn

        try:
            argspec = util.get_callable_argspec(fn, no_self=True)
        except TypeError:
            # argspec not available; hand back fn without legacy wrapping
            return fn
        return legacy._wrap_fn_for_legacy(self, fn, argspec)

    def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
        """Return a wrapper that calls ``fn`` with keyword arguments only.

        Positional arguments are paired with ``self.arg_names``; explicit
        keyword arguments override a positional of the same name.

        """

        def wrap_kw(*args: Any, **kw: Any) -> Any:
            by_name = {**dict(zip(self.arg_names, args)), **kw}
            return fn(**by_name)

        return wrap_kw

    def _do_insert_or_append(
        self, event_key: _EventKey[_ET], is_append: bool
    ) -> None:
        """Store the listener of ``event_key`` for its target class tree.

        :param event_key: supplies ``dispatch_target`` (must be a class)
         and ``_listen_fn``.
        :param is_append: ``True`` appends, ``False`` places the listener
         at the left end.
        :raises exc.InvalidRequestError: if the target class sets
         ``_sa_propagate_class_events`` to a false value.

        """
        target = event_key.dispatch_target
        assert isinstance(
            target, type
        ), "Class-level Event targets must be classes."

        propagates = getattr(target, "_sa_propagate_class_events", True)
        if not propagates:
            raise exc.InvalidRequestError(
                f"Can't assign an event directly to the {target} class"
            )

        cls: Type[_ET]

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                self.update_subclass(cls)
                if cls is not target:
                    # a subclass seen for the first time is filled only
                    # through update_subclass(); nothing is added directly
                    continue

            bucket = self._clslevel[cls]
            if is_append:
                bucket.append(event_key._listen_fn)
            else:
                bucket.appendleft(event_key._listen_fn)

        registry._stored_in_collection(event_key, self)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the left end; ``propagate`` is not used."""
        self._do_insert_or_append(event_key, is_append=False)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the right end; ``propagate`` is not used."""
        self._do_insert_or_append(event_key, is_append=True)

    def update_subclass(self, target: Type[_ET]) -> None:
        """Ensure ``target`` has a collection and pull in base listeners.

        A missing entry becomes a ``deque``, or an ``_empty_collection``
        when ``target._sa_propagate_class_events`` is false.  Listeners
        of every class in ``target.__mro__[1:]`` that has an entry are
        then added, skipping functions already present.

        """
        if target not in self._clslevel:
            propagates = getattr(target, "_sa_propagate_class_events", True)
            self._clslevel[target] = (
                collections.deque() if propagates else _empty_collection()
            )

        own = self._clslevel[target]
        base: Type[_ET]
        for base in target.__mro__[1:]:
            if base not in self._clslevel:
                continue
            inherited = [fn for fn in self._clslevel[base] if fn not in own]
            own.extend(inherited)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener from the target class and its subclasses.

        Only classes that already have an entry in ``_clslevel`` are
        touched.

        """
        target = event_key.dispatch_target
        cls: Type[_ET]
        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                continue
            self._clslevel[cls].remove(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Clear all class level listeners"""

        removed: Set[_ListenerFnType] = set()
        for listeners in self._clslevel.values():
            # remember what was stored before emptying the collection
            removed.update(listeners)
            listeners.clear()
        registry._clear(self, removed)

    def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        For _ClsLevelDispatch at the class level of
        a dispatcher, this returns self.

        """
        return self

251 

252 

class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
    """Interface shared by the instance-level listener collections.

    Apart from :meth:`_adjust_fn_spec` and :meth:`for_modify`, every
    method here raises ``NotImplementedError``.

    """

    __slots__ = ()

    parent: _ClsLevelDispatch[_ET]

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to ``self.parent._adjust_fn_spec``."""
        owner = self.parent
        return owner._adjust_fn_spec(fn, named)

    # -- collection protocol ----------------------------------------------

    def __contains__(self, item: Any) -> bool:
        raise NotImplementedError()

    def __len__(self) -> int:
        raise NotImplementedError()

    def __iter__(self) -> Iterator[_ListenerFnType]:
        raise NotImplementedError()

    def __bool__(self) -> bool:
        raise NotImplementedError()

    # -- event execution --------------------------------------------------

    def exec_once(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def __call__(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    # -- listener management ----------------------------------------------

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def remove(self, event_key: _EventKey[_ET]) -> None:
        raise NotImplementedError()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _InstanceLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        This base implementation ignores ``obj`` and returns self.

        """
        return self

306 

307 

class _EmptyListener(_InstanceLevelDispatch[_ET]):
    """Serves as a proxy interface to the events
    served by a _ClsLevelDispatch, when there are no
    instance-level events present.

    Is replaced by _ListenerCollection when instance-level
    events are added.

    """

    __slots__ = ("parent", "parent_listeners", "name")

    propagate: FrozenSet[_ListenerFnType] = frozenset()
    listeners: Tuple[()] = ()
    parent: _ClsLevelDispatch[_ET]
    parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
    name: str

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Bind to the class-level listeners of ``target_cls``.

        ``parent.update_subclass`` is invoked first when ``target_cls``
        has no entry in ``parent._clslevel`` yet.

        """
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self.name = parent.name
        self.parent = parent
        self.parent_listeners = parent._clslevel[target_cls]

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _EmptyListener at the instance level of
        a dispatcher, this generates a new
        _ListenerCollection, applies it to the instance,
        and returns it.

        The new collection is assigned onto ``obj`` only when the
        attribute currently stored there is this object.

        """
        obj = cast("_Dispatch[_ET]", obj)

        assert obj._instance_cls is not None
        current = getattr(obj, self.name)

        with util.mini_gil:
            if current is not self and not isinstance(
                current, _JoinedListener
            ):
                # this codepath is an extremely rare race condition
                # that has been observed in test_pool.py->test_timeout_race
                # with freethreaded.
                assert isinstance(current, _ListenerCollection)
                return current

            result = _ListenerCollection(self.parent, obj._instance_cls)
            if current is self:
                setattr(obj, self.name, result)
        return result

    def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
        """Always raise; callers must obtain a modifiable collection."""
        raise NotImplementedError("need to call for_modify()")

    # the operations below are not available on the empty proxy and all
    # funnel into _needs_modify()

    def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def insert(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def append(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def remove(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def clear(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)

    # collection protocol: everything is answered by the class-level
    # listeners

    def __contains__(self, item: Any) -> bool:
        inherited = self.parent_listeners
        return item in inherited

    def __len__(self) -> int:
        inherited = self.parent_listeners
        return len(inherited)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        inherited = self.parent_listeners
        return iter(inherited)

    def __bool__(self) -> bool:
        inherited = self.parent_listeners
        return bool(inherited)

401 

402 

403class _MutexProtocol(Protocol): 

404 def __enter__(self) -> bool: ... 

405 

406 def __exit__( 

407 self, 

408 exc_type: Optional[Type[BaseException]], 

409 exc_val: Optional[BaseException], 

410 exc_tb: Optional[TracebackType], 

411 ) -> Optional[bool]: ... 

412 

413 

class _CompoundListener(_InstanceLevelDispatch[_ET]):
    """Listener collection made of ``parent_listeners`` plus ``listeners``.

    Also provides the "run once" execution helpers, which serialize on a
    mutex that is created on first use.

    """

    __slots__ = (
        "_exec_once_mutex",
        "_exec_once",
        "_exec_w_sync_once",
        "_is_asyncio",
    )

    _exec_once_mutex: Optional[_MutexProtocol]
    parent_listeners: Collection[_ListenerFnType]
    listeners: Collection[_ListenerFnType]
    _exec_once: bool
    _exec_w_sync_once: bool

    def __init__(self, *arg: Any, **kw: Any):
        """Forward all arguments upward, then mark as non-asyncio."""
        super().__init__(*arg, **kw)
        self._is_asyncio = False

    def _set_asyncio(self) -> None:
        """Flag this collection so its mutex is an ``AsyncAdaptedLock``."""
        self._is_asyncio = True

    def _get_exec_once_mutex(self) -> _MutexProtocol:
        """Return the mutex for this collection, creating it if needed.

        Creation happens under ``util.mini_gil``.  The lock type is
        ``AsyncAdaptedLock`` when ``_is_asyncio`` is set, otherwise
        ``threading.Lock``.

        """
        with util.mini_gil:
            current = self._exec_once_mutex
            if current is not None:
                return current

            mutex: Any = (
                AsyncAdaptedLock() if self._is_asyncio else threading.Lock()
            )
            self._exec_once_mutex = mutex

            return mutex

    def _exec_once_impl(
        self, retry_on_exception: bool, *args: Any, **kw: Any
    ) -> None:
        """Run the event under the mutex unless it already ran.

        :param retry_on_exception: when true, a run that raises does not
         count as "executed", so a later call runs the event again.

        """
        with self._get_exec_once_mutex():
            if self._exec_once:
                return

            # stays True unless the call below completes normally
            failed = True
            try:
                self(*args, **kw)
                failed = False
            finally:
                if not (failed and retry_on_exception):
                    self._exec_once = True

    def exec_once(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection."""

        if self._exec_once:
            return
        self._exec_once_impl(False, *args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection, or was called
        by a previous exec_once_unless_exception call and
        raised an exception.

        If exec_once was already called, then this method will never run
        the callable regardless of whether it raised or not.

        .. versionadded:: 1.3.8

        """
        if self._exec_once:
            return
        self._exec_once_impl(True, *args, **kw)

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        """Execute this event, and use a mutex if it has not been
        executed already for this collection, or was called
        by a previous _exec_w_sync_on_first_run call and
        raised an exception.

        If _exec_w_sync_on_first_run was already called and didn't raise an
        exception, then a mutex is not used.  It's not guaranteed
        the mutex won't be used more than once in the case of very rare
        race conditions.

        .. versionadded:: 1.4.11

        """
        if self._exec_w_sync_once:
            self(*args, **kw)
            return

        with self._get_exec_once_mutex():
            # the flag is only reached when the call does not raise
            self(*args, **kw)
            self._exec_w_sync_once = True

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for inherited_fn in self.parent_listeners:
            inherited_fn(*args, **kw)
        for local_fn in self.listeners:
            local_fn(*args, **kw)

    def __contains__(self, item: Any) -> bool:
        if item in self.parent_listeners:
            return True
        return item in self.listeners

    def __len__(self) -> int:
        inherited_count = len(self.parent_listeners)
        return inherited_count + len(self.listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return chain(self.parent_listeners, self.listeners)

    def __bool__(self) -> bool:
        if self.listeners:
            return True
        return bool(self.parent_listeners)

529 

530 

class _ListenerCollection(_CompoundListener[_ET]):
    """Instance-level attributes on instances of :class:`._Dispatch`.

    Represents a collection of listeners.

    As of 0.7.9, _ListenerCollection is only first
    created via the _EmptyListener.for_modify() method.

    """

    __slots__ = (
        "parent_listeners",
        "parent",
        "name",
        "listeners",
        "propagate",
        "__weakref__",
    )

    parent_listeners: Collection[_ListenerFnType]
    parent: _ClsLevelDispatch[_ET]
    name: str
    listeners: Deque[_ListenerFnType]
    propagate: Set[_ListenerFnType]

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Create an empty instance-level collection for ``target_cls``.

        ``parent.update_subclass`` is invoked first when ``target_cls``
        has no entry in ``parent._clslevel`` yet; that entry becomes
        ``parent_listeners``.

        """
        super().__init__()
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None
        self.parent_listeners = parent._clslevel[target_cls]
        self.parent = parent
        self.name = parent.name
        self.listeners = collections.deque()
        self.propagate = set()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _ListenerCollection at the instance level of
        a dispatcher, this returns self.

        """
        return self

    def _update(
        self, other: _ListenerCollection[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object.

        :param other: collection to copy listeners from.
        :param only_propagate: when true (the default), only listeners
         in the propagate set are copied; when false, all listeners of
         ``other`` are copied.

        A listener already present in ``self.listeners`` is never added
        a second time.

        """
        existing_listeners = self.listeners
        existing_listener_set = set(existing_listeners)
        self.propagate.update(other.propagate)

        # ``and`` binds tighter than ``or``: without the parentheses the
        # "already present" test was bypassed for every propagated
        # listener, so repeated updates stored the same function twice.
        other_listeners = [
            l
            for l in other.listeners
            if l not in existing_listener_set
            and (not only_propagate or l in self.propagate)
        ]

        existing_listeners.extend(other_listeners)

        if other._is_asyncio:
            self._set_asyncio()

        to_associate = other.propagate.union(other_listeners)
        registry._stored_in_collection_multi(self, other, to_associate)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Prepend the listener of ``event_key``.

        The function is added to the propagate set only when
        ``prepend_to_list`` returns a true value and ``propagate`` is
        true.

        """
        if event_key.prepend_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append the listener of ``event_key``.

        The function is added to the propagate set only when
        ``append_to_list`` returns a true value and ``propagate`` is
        true.

        """
        if event_key.append_to_list(self, self.listeners) and propagate:
            self.propagate.add(event_key._listen_fn)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener from ``listeners`` and the propagate set."""
        self.listeners.remove(event_key._listen_fn)
        self.propagate.discard(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Remove every instance-level listener from this collection."""
        # the registry is given the listeners before they are emptied
        registry._clear(self, self.listeners)
        self.propagate.clear()
        self.listeners.clear()

623 

624 

class _JoinedListener(_CompoundListener[_ET]):
    """Compound listener joining a local collection with another dispatch.

    ``parent_listeners`` is the local collection, while ``listeners`` is
    looked up by name on ``parent_dispatch`` each time it is accessed.

    """

    __slots__ = "parent_dispatch", "name", "local", "parent_listeners"

    parent_dispatch: _DispatchCommon[_ET]
    name: str
    local: _InstanceLevelDispatch[_ET]
    parent_listeners: Collection[_ListenerFnType]

    def __init__(
        self,
        parent_dispatch: _DispatchCommon[_ET],
        name: str,
        local: _EmptyListener[_ET],
    ):
        """Join ``local`` with the ``name`` attribute of ``parent_dispatch``.

        :param parent_dispatch: object on which ``name`` is looked up to
         obtain ``listeners``.
        :param name: attribute name of the event.
        :param local: local collection; also used as ``parent_listeners``.

        """
        # Run _CompoundListener.__init__ so that ``_is_asyncio`` is
        # assigned; _get_exec_once_mutex() reads that slot, and nothing
        # else in this class sets it.
        super().__init__()
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None
        self.parent_dispatch = parent_dispatch
        self.name = name
        self.local = local
        self.parent_listeners = self.local

    if not typing.TYPE_CHECKING:
        # first error, I don't really understand:
        # Signature of "listeners" incompatible with
        # supertype "_CompoundListener"  [override]
        # the name / return type are exactly the same
        # second error is getattr_isn't typed, the cast() here
        # adds too much method overhead
        @property
        def listeners(self) -> Collection[_ListenerFnType]:
            return getattr(self.parent_dispatch, self.name)

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate to ``self.local._adjust_fn_spec``."""
        return self.local._adjust_fn_spec(fn, named)

    def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
        """Make the local collection modifiable and return self.

        ``local`` and ``parent_listeners`` are both rebound to whatever
        ``self.local.for_modify(obj)`` returns.

        """
        self.local = self.parent_listeners = self.local.for_modify(obj)
        return self

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Delegate to ``self.local.insert``."""
        self.local.insert(event_key, propagate)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Delegate to ``self.local.append``."""
        self.local.append(event_key, propagate)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Delegate to ``self.local.remove``."""
        self.local.remove(event_key)

    def clear(self) -> None:
        """Not supported on a joined listener."""
        raise NotImplementedError()