Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/event/attr.py: 66%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

357 statements  

1# event/attr.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Attribute implementation for _Dispatch classes. 

9 

10The various listener targets for a particular event class are represented 

11as attributes, which refer to collections of listeners to be fired off. 

12These collections can exist at the class level as well as at the instance 

13level. An event is fired off using code like this:: 

14 

15 some_object.dispatch.first_connect(arg1, arg2) 

16 

17Above, ``some_object.dispatch`` would be an instance of ``_Dispatch`` and 

18``first_connect`` is typically an instance of ``_ListenerCollection`` 

19if event listeners are present, or ``_EmptyListener`` if none are present. 

20 

21The attribute mechanics here spend effort trying to ensure listener functions 

22are available with a minimum of function call overhead, that unnecessary 

23objects aren't created (i.e. many empty per-instance listener collections), 

24as well as that everything is garbage collectable when owning references are 

25lost. Other features such as "propagation" of listener functions across 

26many ``_Dispatch`` instances, "joining" of multiple ``_Dispatch`` instances, 

27as well as support for subclass propagation (e.g. events assigned to 

28``Pool`` vs. ``QueuePool``) are all implemented here. 

29 

30""" 

31 

32from __future__ import annotations 

33 

34import collections 

35from itertools import chain 

36import threading 

37from types import TracebackType 

38import typing 

39from typing import Any 

40from typing import cast 

41from typing import Collection 

42from typing import Deque 

43from typing import FrozenSet 

44from typing import Generic 

45from typing import Iterator 

46from typing import MutableMapping 

47from typing import MutableSequence 

48from typing import NoReturn 

49from typing import Optional 

50from typing import Protocol 

51from typing import Sequence 

52from typing import Set 

53from typing import Tuple 

54from typing import Type 

55from typing import TypeVar 

56from typing import Union 

57import weakref 

58 

59from . import legacy 

60from . import registry 

61from .registry import _ET 

62from .registry import _EventKey 

63from .registry import _ListenerFnType 

64from .. import exc 

65from .. import util 

66from ..util.concurrency import AsyncAdaptedLock 

67 

# Generic element type parameter used by the collection helpers in this
# module (``_empty_collection`` and ``_ListenerFnSequenceType``).
_T = TypeVar("_T", bound=Any)

69 

70if typing.TYPE_CHECKING: 

71 from .base import _Dispatch 

72 from .base import _DispatchCommon 

73 from .base import _HasEventsDispatch 

74 

75 

class RefCollection(util.MemoizedSlots, Generic[_ET]):
    """Base for listener collections exposing a weak reference to self.

    The ``ref`` attribute is produced by :meth:`_memoized_attr_ref`; the
    ``_memoized_attr_`` prefix is presumably what ``util.MemoizedSlots``
    uses to compute and cache the value on first access (confirm against
    ``util.MemoizedSlots``).

    """

    __slots__ = ("ref",)

    ref: weakref.ref[RefCollection[_ET]]

    def _memoized_attr_ref(self) -> weakref.ref[RefCollection[_ET]]:
        """Build the weak reference stored as ``ref``.

        ``registry._collection_gced`` is installed as the weakref
        callback, so it is invoked once this collection is finalized.

        """
        on_collected = registry._collection_gced
        return weakref.ref(self, on_collected)

83 

84 

class _empty_collection(Collection[_T]):
    """Always-empty stand-in for a listener sequence.

    Every mutating method accepts its argument and does nothing, and the
    collection always reports itself as empty.

    """

    # -- read side: always empty -----------------------------------------

    def __contains__(self, element: Any) -> bool:
        return False

    def __iter__(self) -> Iterator[_T]:
        return iter(())

    def __len__(self) -> int:
        return 0

    # -- write side: every mutation is a no-op ---------------------------

    def append(self, element: _T) -> None:
        """Ignore ``element``; nothing is stored."""

    def appendleft(self, element: _T) -> None:
        """Ignore ``element``; nothing is stored."""

    def extend(self, other: Sequence[_T]) -> None:
        """Ignore ``other``; nothing is stored."""

    def remove(self, element: _T) -> None:
        """Do nothing; there is never anything to remove."""

    def clear(self) -> None:
        """Do nothing; the collection is already empty."""

109 

110 

# A listener sequence as stored per class: either a real deque of
# listeners or the always-empty ``_empty_collection`` placeholder.
_ListenerFnSequenceType = Union[Deque[_T], _empty_collection[_T]]

112 

113 

class _ClsLevelDispatch(RefCollection[_ET]):
    """Class-level events on :class:`._Dispatch` classes.

    Listener functions are kept per target class in ``_clslevel``, a
    weak-keyed mapping from a class to its listener sequence.

    """

    __slots__ = (
        "clsname",
        "name",
        "arg_names",
        "has_kw",
        "legacy_signatures",
        "_clslevel",
        "__weakref__",
    )

    clsname: str
    name: str
    arg_names: Sequence[str]
    has_kw: bool
    legacy_signatures: MutableSequence[legacy._LegacySignatureType]
    _clslevel: MutableMapping[
        Type[_ET], _ListenerFnSequenceType[_ListenerFnType]
    ]

    def __init__(
        self,
        parent_dispatch_cls: Type[_HasEventsDispatch[_ET]],
        fn: _ListenerFnType,
    ):
        """Describe the event declared by ``fn`` on ``parent_dispatch_cls``.

        The event name, argument names and ``**kw`` support are read from
        ``fn`` itself; ``fn.__doc__`` is replaced with the result of
        ``legacy._augment_fn_docs()``.

        """
        self.name = fn.__name__
        self.clsname = parent_dispatch_cls.__name__

        spec = util.inspect_getfullargspec(fn)
        # the first positional argument name is not part of the event
        # signature and is skipped
        self.arg_names = spec.args[1:]
        self.has_kw = bool(spec.varkw)

        # ascending sort on the first element of each signature, then
        # reversed in place
        signatures = sorted(
            getattr(fn, "_legacy_signatures", []), key=lambda sig: sig[0]
        )
        signatures.reverse()
        self.legacy_signatures = signatures

        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Wrap ``fn`` as needed for keyword-style and legacy calling.

        When ``named`` is true the function is wrapped so it receives all
        arguments as keywords.  When legacy signatures are registered and
        the callable's argspec can be obtained, it is additionally passed
        through ``legacy._wrap_fn_for_legacy()``.

        """
        if named:
            fn = self._wrap_fn_for_kw(fn)

        if not self.legacy_signatures:
            return fn

        try:
            argspec = util.get_callable_argspec(fn, no_self=True)
        except TypeError:
            # argspec not available for this callable; use it unchanged
            return fn

        return legacy._wrap_fn_for_legacy(self, fn, argspec)

    def _wrap_fn_for_kw(self, fn: _ListenerFnType) -> _ListenerFnType:
        """Return a wrapper invoking ``fn`` with keyword arguments only.

        Positional arguments are matched against ``self.arg_names`` in
        order; explicitly passed keywords take precedence on a clash.

        """

        def wrap_kw(*args: Any, **kw: Any) -> Any:
            positional_by_name = dict(zip(self.arg_names, args))
            return fn(**{**positional_by_name, **kw})

        return wrap_kw

    def _do_insert_or_append(
        self, event_key: _EventKey[_ET], is_append: bool
    ) -> None:
        """Add the listener of ``event_key`` to the target class tree.

        :param is_append: append to the end of each listener sequence when
         true, otherwise add at the front.
        :raises InvalidRequestError: if the target class sets
         ``_sa_propagate_class_events`` to a false value.

        """
        target = event_key.dispatch_target
        assert isinstance(
            target, type
        ), "Class-level Event targets must be classes."

        if not getattr(target, "_sa_propagate_class_events", True):
            raise exc.InvalidRequestError(
                f"Can't assign an event directly to the {target} class"
            )

        cls: Type[_ET]

        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                self.update_subclass(cls)
                if cls is not target:
                    # a subclass seen for the first time is only
                    # populated through update_subclass(); the listener
                    # is not added to it directly here
                    continue

            listeners = self._clslevel[cls]
            if is_append:
                listeners.append(event_key._listen_fn)
            else:
                listeners.appendleft(event_key._listen_fn)

        registry._stored_in_collection(event_key, self)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the front; ``propagate`` is not used here."""
        self._do_insert_or_append(event_key, is_append=False)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the end; ``propagate`` is not used here."""
        self._do_insert_or_append(event_key, is_append=True)

    def update_subclass(self, target: Type[_ET]) -> None:
        """Ensure ``target`` has a listener sequence and fill it from its MRO.

        A new sequence is a ``deque`` unless the class sets
        ``_sa_propagate_class_events`` to a false value, in which case the
        no-op ``_empty_collection`` is used.  Listeners registered for any
        base class already present in ``_clslevel`` are then copied over,
        skipping those the target already has.

        """
        if target not in self._clslevel:
            propagates = getattr(target, "_sa_propagate_class_events", True)
            self._clslevel[target] = (
                collections.deque() if propagates else _empty_collection()
            )

        clslevel = self._clslevel[target]
        cls: Type[_ET]
        for cls in target.__mro__[1:]:
            if cls not in self._clslevel:
                continue
            # build the list first: membership is tested against the
            # sequence as it was before this base class is merged in
            inherited = [
                fn for fn in self._clslevel[cls] if fn not in clslevel
            ]
            clslevel.extend(inherited)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener of ``event_key`` from every tracked subclass."""
        target = event_key.dispatch_target
        cls: Type[_ET]
        for cls in util.walk_subclasses(target):
            if cls not in self._clslevel:
                continue
            self._clslevel[cls].remove(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Clear all class level listeners"""

        to_clear: Set[_ListenerFnType] = set()
        for listeners in self._clslevel.values():
            # remember what was registered before emptying the sequence
            to_clear.update(listeners)
            listeners.clear()
        registry._clear(self, to_clear)

    def for_modify(self, obj: _Dispatch[_ET]) -> _ClsLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        For _ClsLevelDispatch at the class level of
        a dispatcher, this returns self.

        """
        return self

251 

252 

class _InstanceLevelDispatch(RefCollection[_ET], Collection[_ListenerFnType]):
    """Interface shared by the instance-level listener collections.

    Apart from :meth:`_adjust_fn_spec` and :meth:`for_modify`, every
    method here raises ``NotImplementedError`` and is expected to be
    supplied by a subclass.

    """

    __slots__ = ()

    parent: _ClsLevelDispatch[_ET]

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate listener wrapping to the class-level ``parent``."""
        return self.parent._adjust_fn_spec(fn, named)

    # -- collection protocol ---------------------------------------------

    def __contains__(self, item: Any) -> bool:
        raise NotImplementedError()

    def __len__(self) -> int:
        raise NotImplementedError()

    def __iter__(self) -> Iterator[_ListenerFnType]:
        raise NotImplementedError()

    def __bool__(self) -> bool:
        raise NotImplementedError()

    # -- event execution -------------------------------------------------

    def exec_once(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    def __call__(self, *args: Any, **kw: Any) -> None:
        raise NotImplementedError()

    # -- listener management ---------------------------------------------

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        raise NotImplementedError()

    def remove(self, event_key: _EventKey[_ET]) -> None:
        raise NotImplementedError()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _InstanceLevelDispatch[_ET]:
        """Return an event collection which can be modified.

        This base implementation returns self.

        """
        return self

306 

307 

class _EmptyListener(_InstanceLevelDispatch[_ET]):
    """Proxy for the events of a _ClsLevelDispatch when no
    instance-level events are present.

    It is swapped for a _ListenerCollection once instance-level
    events are added; see :meth:`for_modify`.

    """

    __slots__ = ("parent", "parent_listeners", "name")

    # class-level constants: there are never instance-level listeners
    propagate: FrozenSet[_ListenerFnType] = frozenset()
    listeners: Tuple[()] = ()
    parent: _ClsLevelDispatch[_ET]
    parent_listeners: _ListenerFnSequenceType[_ListenerFnType]
    name: str

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Bind to the class-level listeners of ``target_cls``.

        ``parent.update_subclass()`` is called first when ``target_cls``
        has no class-level listener sequence yet.

        """
        clslevel = parent._clslevel
        if target_cls not in clslevel:
            parent.update_subclass(target_cls)
        self.parent = parent
        self.parent_listeners = clslevel[target_cls]
        self.name = parent.name

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        A new _ListenerCollection is created when the attribute currently
        on ``obj`` is this _EmptyListener or a _JoinedListener; it is
        assigned onto ``obj`` only in the former case.  If ``obj``
        already carries a _ListenerCollection, that one is returned.

        """
        obj = cast("_Dispatch[_ET]", obj)

        assert obj._instance_cls is not None
        existing = getattr(obj, self.name)

        with util.mini_gil:
            is_self = existing is self
            if not is_self and not isinstance(existing, _JoinedListener):
                # this codepath is an extremely rare race condition
                # that has been observed in test_pool.py->test_timeout_race
                # with freethreaded.
                assert isinstance(existing, _ListenerCollection)
                return existing

            result = _ListenerCollection(self.parent, obj._instance_cls)
            if is_self:
                setattr(obj, self.name, result)
        return result

    def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
        """Always raise; this collection cannot be used directly."""
        raise NotImplementedError("need to call for_modify()")

    # The operations below are unavailable on an _EmptyListener; each
    # one raises through _needs_modify().

    def exec_once(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def insert(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def append(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def remove(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def clear(self, *args: Any, **kw: Any) -> NoReturn:
        self._needs_modify(*args, **kw)

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)

    # Collection behavior mirrors the class-level listener sequence.

    def __contains__(self, item: Any) -> bool:
        return item in self.parent_listeners

    def __len__(self) -> int:
        return len(self.parent_listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return iter(self.parent_listeners)

    def __bool__(self) -> bool:
        return bool(self.parent_listeners)

401 

402 

403class _MutexProtocol(Protocol): 

404 def __enter__(self) -> bool: ... 

405 

406 def __exit__( 

407 self, 

408 exc_type: Optional[Type[BaseException]], 

409 exc_val: Optional[BaseException], 

410 exc_tb: Optional[TracebackType], 

411 ) -> Optional[bool]: ... 

412 

413 

class _CompoundListener(_InstanceLevelDispatch[_ET]):
    """Listener collection made of ``parent_listeners`` plus ``listeners``.

    Subclasses provide both sequences; this class supplies execution,
    the "execute once" variants and the collection protocol over the
    two of them, with ``parent_listeners`` always coming first.

    """

    __slots__ = (
        "_exec_once_mutex",
        "_exec_once",
        "_exec_w_sync_once",
        "_is_asyncio",
    )

    _exec_once_mutex: Optional[_MutexProtocol]
    parent_listeners: Collection[_ListenerFnType]
    listeners: Collection[_ListenerFnType]
    _exec_once: bool
    _exec_w_sync_once: bool

    def __init__(self, *arg: Any, **kw: Any):
        super().__init__(*arg, **kw)
        self._is_asyncio = False

    def _set_asyncio(self) -> None:
        """Flag this collection so its mutex is an ``AsyncAdaptedLock``."""
        self._is_asyncio = True

    def _get_exec_once_mutex(self) -> _MutexProtocol:
        """Return the exec-once mutex, creating it on first use.

        Creation happens while holding ``util.mini_gil``.  The mutex is
        an ``AsyncAdaptedLock`` when ``_is_asyncio`` is set, otherwise a
        ``threading.Lock``.

        """
        with util.mini_gil:
            mutex = self._exec_once_mutex
            if mutex is None:
                if self._is_asyncio:
                    mutex = AsyncAdaptedLock()
                else:
                    mutex = threading.Lock()
                self._exec_once_mutex = mutex
            return mutex

    def _exec_once_impl(
        self, retry_on_exception: bool, *args: Any, **kw: Any
    ) -> None:
        """Run the event under the mutex unless it already ran.

        :param retry_on_exception: when true, a run that raises does not
         count as "executed", so a later call will try again.

        """
        with self._get_exec_once_mutex():
            if self._exec_once:
                return

            try:
                self(*args, **kw)
            except BaseException:
                # a failed run is still final unless retries were asked for
                if not retry_on_exception:
                    self._exec_once = True
                raise
            else:
                self._exec_once = True

    def exec_once(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection."""

        if self._exec_once:
            return
        self._exec_once_impl(False, *args, **kw)

    def exec_once_unless_exception(self, *args: Any, **kw: Any) -> None:
        """Execute this event, but only if it has not been
        executed already for this collection, or was called
        by a previous exec_once_unless_exception call and
        raised an exception.

        If exec_once was already called, then this method will never run
        the callable regardless of whether it raised or not.

        """
        if self._exec_once:
            return
        self._exec_once_impl(True, *args, **kw)

    def _exec_w_sync_on_first_run(self, *args: Any, **kw: Any) -> None:
        """Execute this event, and use a mutex if it has not been
        executed already for this collection, or was called
        by a previous _exec_w_sync_on_first_run call and
        raised an exception.

        If _exec_w_sync_on_first_run was already called and didn't raise an
        exception, then a mutex is not used.  It's not guaranteed
        the mutex won't be used more than once in the case of very rare
        race conditions.

        .. versionadded:: 1.4.11

        """
        if self._exec_w_sync_once:
            self(*args, **kw)
            return

        with self._get_exec_once_mutex():
            self(*args, **kw)
            # only reached when the call above did not raise
            self._exec_w_sync_once = True

    def __call__(self, *args: Any, **kw: Any) -> None:
        """Execute this event."""

        for listener in self.parent_listeners:
            listener(*args, **kw)
        for listener in self.listeners:
            listener(*args, **kw)

    def __contains__(self, item: Any) -> bool:
        if item in self.parent_listeners:
            return True
        return item in self.listeners

    def __len__(self) -> int:
        return len(self.parent_listeners) + len(self.listeners)

    def __iter__(self) -> Iterator[_ListenerFnType]:
        return chain(self.parent_listeners, self.listeners)

    def __bool__(self) -> bool:
        if self.listeners:
            return True
        return bool(self.parent_listeners)

527 

528 

class _ListenerCollection(_CompoundListener[_ET]):
    """Instance-level attributes on instances of :class:`._Dispatch`.

    Represents a collection of listeners.

    As of 0.7.9, _ListenerCollection is only first
    created via the _EmptyListener.for_modify() method.

    """

    __slots__ = (
        "parent_listeners",
        "parent",
        "name",
        "listeners",
        "propagate",
        "__weakref__",
    )

    parent_listeners: Collection[_ListenerFnType]
    parent: _ClsLevelDispatch[_ET]
    name: str
    listeners: Deque[_ListenerFnType]
    propagate: Set[_ListenerFnType]

    def __init__(self, parent: _ClsLevelDispatch[_ET], target_cls: Type[_ET]):
        """Create an empty instance-level collection for ``target_cls``.

        ``parent.update_subclass()`` is called first when ``target_cls``
        has no class-level listener sequence yet.

        """
        super().__init__()
        if target_cls not in parent._clslevel:
            parent.update_subclass(target_cls)

        # exec-once bookkeeping used by _CompoundListener
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None

        self.parent_listeners = parent._clslevel[target_cls]
        self.parent = parent
        self.name = parent.name
        self.listeners = collections.deque()
        self.propagate = set()

    def for_modify(
        self, obj: _DispatchCommon[_ET]
    ) -> _ListenerCollection[_ET]:
        """Return an event collection which can be modified.

        For _ListenerCollection at the instance level of
        a dispatcher, this returns self.

        """
        return self

    def _update(
        self, other: _ListenerCollection[_ET], only_propagate: bool = True
    ) -> None:
        """Populate from the listeners in another :class:`_Dispatch`
        object."""
        own_listeners = self.listeners
        already_present = set(own_listeners)
        self.propagate.update(other.propagate)

        # ``and`` binds tighter than ``or``: a listener is taken when it
        # is new and only_propagate is off, or whenever it is in the
        # (just updated) propagate set.
        # NOTE(review): the second arm does not check already_present --
        # confirm that is intended.
        incoming = [
            fn
            for fn in other.listeners
            if (fn not in already_present and not only_propagate)
            or fn in self.propagate
        ]

        own_listeners.extend(incoming)

        if other._is_asyncio:
            self._set_asyncio()

        to_associate = other.propagate.union(incoming)
        registry._stored_in_collection_multi(self, other, to_associate)

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the front, recording it in ``propagate``
        when requested and when ``prepend_to_list()`` reports success."""
        added = event_key.prepend_to_list(self, self.listeners)
        if added and propagate:
            self.propagate.add(event_key._listen_fn)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Add the listener at the end, recording it in ``propagate``
        when requested and when ``append_to_list()`` reports success."""
        added = event_key.append_to_list(self, self.listeners)
        if added and propagate:
            self.propagate.add(event_key._listen_fn)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove the listener of ``event_key`` and unregister it."""
        self.listeners.remove(event_key._listen_fn)
        self.propagate.discard(event_key._listen_fn)
        registry._removed_from_collection(event_key, self)

    def clear(self) -> None:
        """Unregister and drop all instance-level listeners."""
        # the registry is given the listeners before they are discarded
        registry._clear(self, self.listeners)
        self.propagate.clear()
        self.listeners.clear()

621 

622 

class _JoinedListener(_CompoundListener[_ET]):
    """Listener collection combining a local collection with the
    same-named collection of another dispatch object.

    ``parent_listeners`` is the local collection, while ``listeners`` is
    looked up by name on ``parent_dispatch`` on every access.

    """

    __slots__ = "parent_dispatch", "name", "local", "parent_listeners"

    parent_dispatch: _DispatchCommon[_ET]
    name: str
    local: _InstanceLevelDispatch[_ET]
    parent_listeners: Collection[_ListenerFnType]

    def __init__(
        self,
        parent_dispatch: _DispatchCommon[_ET],
        name: str,
        local: _EmptyListener[_ET],
    ):
        """Join ``local`` with the ``name`` collection of ``parent_dispatch``.

        :param parent_dispatch: dispatch object whose ``name`` attribute
         supplies :attr:`listeners`.
        :param name: event name.
        :param local: the local collection, fired before ``listeners``.

        """
        # Run the _CompoundListener initializer so that ``_is_asyncio``
        # is assigned.  It is a slot with no class-level default and is
        # read by _get_exec_once_mutex(); without this, the exec-once
        # methods would fail with AttributeError on a joined listener.
        super().__init__()
        self._exec_once = False
        self._exec_w_sync_once = False
        self._exec_once_mutex = None
        self.parent_dispatch = parent_dispatch
        self.name = name
        self.local = local
        self.parent_listeners = self.local

    if not typing.TYPE_CHECKING:
        # first error, I don't really understand:
        # Signature of "listeners" incompatible with
        # supertype "_CompoundListener"  [override]
        # the name / return type are exactly the same
        # second error is getattr_isn't typed, the cast() here
        # adds too much method overhead
        @property
        def listeners(self) -> Collection[_ListenerFnType]:
            """The ``name`` collection currently on ``parent_dispatch``."""
            return getattr(self.parent_dispatch, self.name)

    def _adjust_fn_spec(
        self, fn: _ListenerFnType, named: bool
    ) -> _ListenerFnType:
        """Delegate listener wrapping to the local collection."""
        return self.local._adjust_fn_spec(fn, named)

    def for_modify(self, obj: _DispatchCommon[_ET]) -> _JoinedListener[_ET]:
        """Make the local collection modifiable and return self.

        Both ``local`` and ``parent_listeners`` are rebound to whatever
        ``local.for_modify(obj)`` returns.

        """
        self.local = self.parent_listeners = self.local.for_modify(obj)
        return self

    def insert(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Insert into the local collection."""
        self.local.insert(event_key, propagate)

    def append(self, event_key: _EventKey[_ET], propagate: bool) -> None:
        """Append to the local collection."""
        self.local.append(event_key, propagate)

    def remove(self, event_key: _EventKey[_ET]) -> None:
        """Remove from the local collection."""
        self.local.remove(event_key)

    def clear(self) -> None:
        """Not supported for joined listeners."""
        raise NotImplementedError()