Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

842 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules. 

10 

11""" 

12 

13 

14from __future__ import annotations 

15 

16import collections 

17from enum import Enum 

18import itertools 

19from itertools import zip_longest 

20import operator 

21import re 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import FrozenSet 

27from typing import Generic 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import NamedTuple 

34from typing import NoReturn 

35from typing import Optional 

36from typing import overload 

37from typing import Sequence 

38from typing import Set 

39from typing import Tuple 

40from typing import Type 

41from typing import TYPE_CHECKING 

42from typing import TypeVar 

43from typing import Union 

44 

45from . import roles 

46from . import visitors 

47from .cache_key import HasCacheKey # noqa 

48from .cache_key import MemoizedHasCacheKey # noqa 

49from .traversals import HasCopyInternals # noqa 

50from .visitors import ClauseVisitor 

51from .visitors import ExtendedInternalTraversal 

52from .visitors import ExternallyTraversible 

53from .visitors import InternalTraversal 

54from .. import event 

55from .. import exc 

56from .. import util 

57from ..util import HasMemoized as HasMemoized 

58from ..util import hybridmethod 

59from ..util import typing as compat_typing 

60from ..util.typing import Protocol 

61from ..util.typing import Self 

62from ..util.typing import TypeGuard 

63 

64if TYPE_CHECKING: 

65 from . import coercions 

66 from . import elements 

67 from . import type_api 

68 from ._orm_types import DMLStrategyArgument 

69 from ._orm_types import SynchronizeSessionArgument 

70 from ._typing import _CLE 

71 from .compiler import SQLCompiler 

72 from .elements import BindParameter 

73 from .elements import ClauseList 

74 from .elements import ColumnClause # noqa 

75 from .elements import ColumnElement 

76 from .elements import NamedColumn 

77 from .elements import SQLCoreOperations 

78 from .elements import TextClause 

79 from .schema import Column 

80 from .schema import DefaultGenerator 

81 from .selectable import _JoinTargetElement 

82 from .selectable import _SelectIterable 

83 from .selectable import FromClause 

84 from ..engine import Connection 

85 from ..engine import CursorResult 

86 from ..engine.interfaces import _CoreMultiExecuteParams 

87 from ..engine.interfaces import _ExecuteOptions 

88 from ..engine.interfaces import _ImmutableExecuteOptions 

89 from ..engine.interfaces import CacheStats 

90 from ..engine.interfaces import Compiled 

91 from ..engine.interfaces import CompiledCacheType 

92 from ..engine.interfaces import CoreExecuteOptionsParameter 

93 from ..engine.interfaces import Dialect 

94 from ..engine.interfaces import IsolationLevel 

95 from ..engine.interfaces import SchemaTranslateMapType 

96 from ..event import dispatcher 

97 

98if not TYPE_CHECKING: 

99 coercions = None # noqa 

100 elements = None # noqa 

101 type_api = None # noqa 

102 

103 

104class _NoArg(Enum): 

105 NO_ARG = 0 

106 

107 def __repr__(self): 

108 return f"_NoArg.{self.name}" 

109 

110 

111NO_ARG = _NoArg.NO_ARG 

112 

113 

114class _NoneName(Enum): 

115 NONE_NAME = 0 

116 """indicate a 'deferred' name that was ultimately the value None.""" 

117 

118 

119_NONE_NAME = _NoneName.NONE_NAME 

120 

121_T = TypeVar("_T", bound=Any) 

122 

123_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

124 

125_AmbiguousTableNameMap = MutableMapping[str, str] 

126 

127 

128class _DefaultDescriptionTuple(NamedTuple): 

129 arg: Any 

130 is_scalar: Optional[bool] 

131 is_callable: Optional[bool] 

132 is_sentinel: Optional[bool] 

133 

134 @classmethod 

135 def _from_column_default( 

136 cls, default: Optional[DefaultGenerator] 

137 ) -> _DefaultDescriptionTuple: 

138 return ( 

139 _DefaultDescriptionTuple( 

140 default.arg, # type: ignore 

141 default.is_scalar, 

142 default.is_callable, 

143 default.is_sentinel, 

144 ) 

145 if default 

146 and ( 

147 default.has_arg 

148 or (not default.for_update and default.is_sentinel) 

149 ) 

150 else _DefaultDescriptionTuple(None, None, None, None) 

151 ) 

152 

153 

154_never_select_column = operator.attrgetter("_omit_from_statements") 

155 

156 

157class _EntityNamespace(Protocol): 

158 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

159 

160 

161class _HasEntityNamespace(Protocol): 

162 @util.ro_non_memoized_property 

163 def entity_namespace(self) -> _EntityNamespace: ... 

164 

165 

166def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

167 return hasattr(element, "entity_namespace") 

168 

169 

170# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

171_Self = TypeVar("_Self", bound=Any) 

172 

173 

174class Immutable: 

175 """mark a ClauseElement as 'immutable' when expressions are cloned. 

176 

177 "immutable" objects refers to the "mutability" of an object in the 

178 context of SQL DQL and DML generation. Such as, in DQL, one can 

179 compose a SELECT or subquery of varied forms, but one cannot modify 

180 the structure of a specific table or column within DQL. 

181 :class:`.Immutable` is mostly intended to follow this concept, and as 

182 such the primary "immutable" objects are :class:`.ColumnClause`, 

183 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

184 

185 """ 

186 

187 __slots__ = () 

188 

189 _is_immutable = True 

190 

191 def unique_params(self, *optionaldict, **kwargs): 

192 raise NotImplementedError("Immutable objects do not support copying") 

193 

194 def params(self, *optionaldict, **kwargs): 

195 raise NotImplementedError("Immutable objects do not support copying") 

196 

197 def _clone(self: _Self, **kw: Any) -> _Self: 

198 return self 

199 

200 def _copy_internals( 

201 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

202 ) -> None: 

203 pass 

204 

205 

206class SingletonConstant(Immutable): 

207 """Represent SQL constants like NULL, TRUE, FALSE""" 

208 

209 _is_singleton_constant = True 

210 

211 _singleton: SingletonConstant 

212 

213 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

214 return cast(_T, cls._singleton) 

215 

216 @util.non_memoized_property 

217 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

218 raise NotImplementedError() 

219 

220 @classmethod 

221 def _create_singleton(cls): 

222 obj = object.__new__(cls) 

223 obj.__init__() # type: ignore 

224 

225 # for a long time this was an empty frozenset, meaning 

226 # a SingletonConstant would never be a "corresponding column" in 

227 # a statement. This referred to #6259. However, in #7154 we see 

228 # that we do in fact need "correspondence" to work when matching cols 

229 # in result sets, so the non-correspondence was moved to a more 

230 # specific level when we are actually adapting expressions for SQL 

231 # render only. 

232 obj.proxy_set = frozenset([obj]) 

233 cls._singleton = obj 

234 

235 

236def _from_objects( 

237 *elements: Union[ 

238 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

239 ] 

240) -> Iterator[FromClause]: 

241 return itertools.chain.from_iterable( 

242 [element._from_objects for element in elements] 

243 ) 

244 

245 

246def _select_iterables( 

247 elements: Iterable[roles.ColumnsClauseRole], 

248) -> _SelectIterable: 

249 """expand tables into individual columns in the 

250 given list of column expressions. 

251 

252 """ 

253 return itertools.chain.from_iterable( 

254 [c._select_iterable for c in elements] 

255 ) 

256 

257 

258_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

259 

260 

261class _GenerativeType(compat_typing.Protocol): 

262 def _generate(self) -> Self: ... 

263 

264 

265def _generative(fn: _Fn) -> _Fn: 

266 """non-caching _generative() decorator. 

267 

268 This is basically the legacy decorator that copies the object and 

269 runs a method on the new copy. 

270 

271 """ 

272 

273 @util.decorator 

274 def _generative( 

275 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

276 ) -> _SelfGenerativeType: 

277 """Mark a method as generative.""" 

278 

279 self = self._generate() 

280 x = fn(self, *args, **kw) 

281 assert x is self, "generative methods must return self" 

282 return self 

283 

284 decorated = _generative(fn) 

285 decorated.non_generative = fn # type: ignore 

286 return decorated 

287 

288 

289def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

290 msgs = kw.pop("msgs", {}) 

291 

292 defaults = kw.pop("defaults", {}) 

293 

294 getters = [ 

295 (name, operator.attrgetter(name), defaults.get(name, None)) 

296 for name in names 

297 ] 

298 

299 @util.decorator 

300 def check(fn, *args, **kw): 

301 # make pylance happy by not including "self" in the argument 

302 # list 

303 self = args[0] 

304 args = args[1:] 

305 for name, getter, default_ in getters: 

306 if getter(self) is not default_: 

307 msg = msgs.get( 

308 name, 

309 "Method %s() has already been invoked on this %s construct" 

310 % (fn.__name__, self.__class__), 

311 ) 

312 raise exc.InvalidRequestError(msg) 

313 return fn(self, *args, **kw) 

314 

315 return check 

316 

317 

318def _clone(element, **kw): 

319 return element._clone(**kw) 

320 

321 

322def _expand_cloned( 

323 elements: Iterable[_CLE], 

324) -> Iterable[_CLE]: 

325 """expand the given set of ClauseElements to be the set of all 'cloned' 

326 predecessors. 

327 

328 """ 

329 # TODO: cython candidate 

330 return itertools.chain(*[x._cloned_set for x in elements]) 

331 

332 

333def _de_clone( 

334 elements: Iterable[_CLE], 

335) -> Iterable[_CLE]: 

336 for x in elements: 

337 while x._is_clone_of is not None: 

338 x = x._is_clone_of 

339 yield x 

340 

341 

342def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

343 """return the intersection of sets a and b, counting 

344 any overlap between 'cloned' predecessors. 

345 

346 The returned set is in terms of the entities present within 'a'. 

347 

348 """ 

349 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

350 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

351 

352 

353def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

354 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

355 return { 

356 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

357 } 

358 

359 

360class _DialectArgView(MutableMapping[str, Any]): 

361 """A dictionary view of dialect-level arguments in the form 

362 <dialectname>_<argument_name>. 

363 

364 """ 

365 

366 __slots__ = ("obj",) 

367 

368 def __init__(self, obj): 

369 self.obj = obj 

370 

371 def _key(self, key): 

372 try: 

373 dialect, value_key = key.split("_", 1) 

374 except ValueError as err: 

375 raise KeyError(key) from err 

376 else: 

377 return dialect, value_key 

378 

379 def __getitem__(self, key): 

380 dialect, value_key = self._key(key) 

381 

382 try: 

383 opt = self.obj.dialect_options[dialect] 

384 except exc.NoSuchModuleError as err: 

385 raise KeyError(key) from err 

386 else: 

387 return opt[value_key] 

388 

389 def __setitem__(self, key, value): 

390 try: 

391 dialect, value_key = self._key(key) 

392 except KeyError as err: 

393 raise exc.ArgumentError( 

394 "Keys must be of the form <dialectname>_<argname>" 

395 ) from err 

396 else: 

397 self.obj.dialect_options[dialect][value_key] = value 

398 

399 def __delitem__(self, key): 

400 dialect, value_key = self._key(key) 

401 del self.obj.dialect_options[dialect][value_key] 

402 

403 def __len__(self): 

404 return sum( 

405 len(args._non_defaults) 

406 for args in self.obj.dialect_options.values() 

407 ) 

408 

409 def __iter__(self): 

410 return ( 

411 "%s_%s" % (dialect_name, value_name) 

412 for dialect_name in self.obj.dialect_options 

413 for value_name in self.obj.dialect_options[ 

414 dialect_name 

415 ]._non_defaults 

416 ) 

417 

418 

419class _DialectArgDict(MutableMapping[str, Any]): 

420 """A dictionary view of dialect-level arguments for a specific 

421 dialect. 

422 

423 Maintains a separate collection of user-specified arguments 

424 and dialect-specified default arguments. 

425 

426 """ 

427 

428 def __init__(self): 

429 self._non_defaults = {} 

430 self._defaults = {} 

431 

432 def __len__(self): 

433 return len(set(self._non_defaults).union(self._defaults)) 

434 

435 def __iter__(self): 

436 return iter(set(self._non_defaults).union(self._defaults)) 

437 

438 def __getitem__(self, key): 

439 if key in self._non_defaults: 

440 return self._non_defaults[key] 

441 else: 

442 return self._defaults[key] 

443 

444 def __setitem__(self, key, value): 

445 self._non_defaults[key] = value 

446 

447 def __delitem__(self, key): 

448 del self._non_defaults[key] 

449 

450 

451@util.preload_module("sqlalchemy.dialects") 

452def _kw_reg_for_dialect(dialect_name): 

453 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

454 if dialect_cls.construct_arguments is None: 

455 return None 

456 return dict(dialect_cls.construct_arguments) 

457 

458 

459class DialectKWArgs: 

460 """Establish the ability for a class to have dialect-specific arguments 

461 with defaults and constructor validation. 

462 

463 The :class:`.DialectKWArgs` interacts with the 

464 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

465 

466 .. seealso:: 

467 

468 :attr:`.DefaultDialect.construct_arguments` 

469 

470 """ 

471 

472 __slots__ = () 

473 

474 _dialect_kwargs_traverse_internals = [ 

475 ("dialect_options", InternalTraversal.dp_dialect_options) 

476 ] 

477 

478 @classmethod 

479 def argument_for(cls, dialect_name, argument_name, default): 

480 """Add a new kind of dialect-specific keyword argument for this class. 

481 

482 E.g.:: 

483 

484 Index.argument_for("mydialect", "length", None) 

485 

486 some_index = Index("a", "b", mydialect_length=5) 

487 

488 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

489 way adding extra arguments to the 

490 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

491 dictionary provides a list of argument names accepted by various 

492 schema-level constructs on behalf of a dialect. 

493 

494 New dialects should typically specify this dictionary all at once as a 

495 data member of the dialect class. The use case for ad-hoc addition of 

496 argument names is typically for end-user code that is also using 

497 a custom compilation scheme which consumes the additional arguments. 

498 

499 :param dialect_name: name of a dialect. The dialect must be 

500 locatable, else a :class:`.NoSuchModuleError` is raised. The 

501 dialect must also include an existing 

502 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

503 that it participates in the keyword-argument validation and default 

504 system, else :class:`.ArgumentError` is raised. If the dialect does 

505 not include this collection, then any keyword argument can be 

506 specified on behalf of this dialect already. All dialects packaged 

507 within SQLAlchemy include this collection, however for third party 

508 dialects, support may vary. 

509 

510 :param argument_name: name of the parameter. 

511 

512 :param default: default value of the parameter. 

513 

514 """ 

515 

516 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

517 if construct_arg_dictionary is None: 

518 raise exc.ArgumentError( 

519 "Dialect '%s' does have keyword-argument " 

520 "validation and defaults enabled configured" % dialect_name 

521 ) 

522 if cls not in construct_arg_dictionary: 

523 construct_arg_dictionary[cls] = {} 

524 construct_arg_dictionary[cls][argument_name] = default 

525 

526 @property 

527 def dialect_kwargs(self): 

528 """A collection of keyword arguments specified as dialect-specific 

529 options to this construct. 

530 

531 The arguments are present here in their original ``<dialect>_<kwarg>`` 

532 format. Only arguments that were actually passed are included; 

533 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which 

534 contains all options known by this dialect including defaults. 

535 

536 The collection is also writable; keys are accepted of the 

537 form ``<dialect>_<kwarg>`` where the value will be assembled 

538 into the list of options. 

539 

540 .. seealso:: 

541 

542 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

543 

544 """ 

545 return _DialectArgView(self) 

546 

547 @property 

548 def kwargs(self): 

549 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

550 return self.dialect_kwargs 

551 

552 _kw_registry = util.PopulateDict(_kw_reg_for_dialect) 

553 

554 @classmethod 

555 def _kw_reg_for_dialect_cls(cls, dialect_name): 

556 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

557 d = _DialectArgDict() 

558 

559 if construct_arg_dictionary is None: 

560 d._defaults.update({"*": None}) 

561 else: 

562 for cls in reversed(cls.__mro__): 

563 if cls in construct_arg_dictionary: 

564 d._defaults.update(construct_arg_dictionary[cls]) 

565 return d 

566 

567 @util.memoized_property 

568 def dialect_options(self): 

569 """A collection of keyword arguments specified as dialect-specific 

570 options to this construct. 

571 

572 This is a two-level nested registry, keyed to ``<dialect_name>`` 

573 and ``<argument_name>``. For example, the ``postgresql_where`` 

574 argument would be locatable as:: 

575 

576 arg = my_object.dialect_options["postgresql"]["where"] 

577 

578 .. versionadded:: 0.9.2 

579 

580 .. seealso:: 

581 

582 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

583 

584 """ 

585 

586 return util.PopulateDict(self._kw_reg_for_dialect_cls) 

587 

588 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

589 # validate remaining kwargs that they all specify DB prefixes 

590 

591 if not kwargs: 

592 return 

593 

594 for k in kwargs: 

595 m = re.match("^(.+?)_(.+)$", k) 

596 if not m: 

597 raise TypeError( 

598 "Additional arguments should be " 

599 "named <dialectname>_<argument>, got '%s'" % k 

600 ) 

601 dialect_name, arg_name = m.group(1, 2) 

602 

603 try: 

604 construct_arg_dictionary = self.dialect_options[dialect_name] 

605 except exc.NoSuchModuleError: 

606 util.warn( 

607 "Can't validate argument %r; can't " 

608 "locate any SQLAlchemy dialect named %r" 

609 % (k, dialect_name) 

610 ) 

611 self.dialect_options[dialect_name] = d = _DialectArgDict() 

612 d._defaults.update({"*": None}) 

613 d._non_defaults[arg_name] = kwargs[k] 

614 else: 

615 if ( 

616 "*" not in construct_arg_dictionary 

617 and arg_name not in construct_arg_dictionary 

618 ): 

619 raise exc.ArgumentError( 

620 "Argument %r is not accepted by " 

621 "dialect %r on behalf of %r" 

622 % (k, dialect_name, self.__class__) 

623 ) 

624 else: 

625 construct_arg_dictionary[arg_name] = kwargs[k] 

626 

627 

628class CompileState: 

629 """Produces additional object state necessary for a statement to be 

630 compiled. 

631 

632 the :class:`.CompileState` class is at the base of classes that assemble 

633 state for a particular statement object that is then used by the 

634 compiler. This process is essentially an extension of the process that 

635 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis 

636 on converting raw user intent into more organized structures rather than 

637 producing string output. The top-level :class:`.CompileState` for the 

638 statement being executed is also accessible when the execution context 

639 works with invoking the statement and collecting results. 

640 

641 The production of :class:`.CompileState` is specific to the compiler, such 

642 as within the :meth:`.SQLCompiler.visit_insert`, 

643 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

644 responsible for associating the :class:`.CompileState` with the 

645 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

646 i.e. the outermost SQL statement that's actually being executed. 

647 There can be other :class:`.CompileState` objects that are not the 

648 toplevel, such as when a SELECT subquery or CTE-nested 

649 INSERT/UPDATE/DELETE is generated. 

650 

651 .. versionadded:: 1.4 

652 

653 """ 

654 

655 __slots__ = ("statement", "_ambiguous_table_name_map") 

656 

657 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

658 

659 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

660 

661 @classmethod 

662 def create_for_statement( 

663 cls, statement: Executable, compiler: SQLCompiler, **kw: Any 

664 ) -> CompileState: 

665 # factory construction. 

666 

667 if statement._propagate_attrs: 

668 plugin_name = statement._propagate_attrs.get( 

669 "compile_state_plugin", "default" 

670 ) 

671 klass = cls.plugins.get( 

672 (plugin_name, statement._effective_plugin_target), None 

673 ) 

674 if klass is None: 

675 klass = cls.plugins[ 

676 ("default", statement._effective_plugin_target) 

677 ] 

678 

679 else: 

680 klass = cls.plugins[ 

681 ("default", statement._effective_plugin_target) 

682 ] 

683 

684 if klass is cls: 

685 return cls(statement, compiler, **kw) 

686 else: 

687 return klass.create_for_statement(statement, compiler, **kw) 

688 

689 def __init__(self, statement, compiler, **kw): 

690 self.statement = statement 

691 

692 @classmethod 

693 def get_plugin_class( 

694 cls, statement: Executable 

695 ) -> Optional[Type[CompileState]]: 

696 plugin_name = statement._propagate_attrs.get( 

697 "compile_state_plugin", None 

698 ) 

699 

700 if plugin_name: 

701 key = (plugin_name, statement._effective_plugin_target) 

702 if key in cls.plugins: 

703 return cls.plugins[key] 

704 

705 # there's no case where we call upon get_plugin_class() and want 

706 # to get None back, there should always be a default. return that 

707 # if there was no plugin-specific class (e.g. Insert with "orm" 

708 # plugin) 

709 try: 

710 return cls.plugins[("default", statement._effective_plugin_target)] 

711 except KeyError: 

712 return None 

713 

714 @classmethod 

715 def _get_plugin_class_for_plugin( 

716 cls, statement: Executable, plugin_name: str 

717 ) -> Optional[Type[CompileState]]: 

718 try: 

719 return cls.plugins[ 

720 (plugin_name, statement._effective_plugin_target) 

721 ] 

722 except KeyError: 

723 return None 

724 

725 @classmethod 

726 def plugin_for( 

727 cls, plugin_name: str, visit_name: str 

728 ) -> Callable[[_Fn], _Fn]: 

729 def decorate(cls_to_decorate): 

730 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

731 return cls_to_decorate 

732 

733 return decorate 

734 

735 

736class Generative(HasMemoized): 

737 """Provide a method-chaining pattern in conjunction with the 

738 @_generative decorator.""" 

739 

740 def _generate(self) -> Self: 

741 skip = self._memoized_keys 

742 cls = self.__class__ 

743 s = cls.__new__(cls) 

744 if skip: 

745 # ensure this iteration remains atomic 

746 s.__dict__ = { 

747 k: v for k, v in self.__dict__.copy().items() if k not in skip 

748 } 

749 else: 

750 s.__dict__ = self.__dict__.copy() 

751 return s 

752 

753 

754class InPlaceGenerative(HasMemoized): 

755 """Provide a method-chaining pattern in conjunction with the 

756 @_generative decorator that mutates in place.""" 

757 

758 __slots__ = () 

759 

760 def _generate(self): 

761 skip = self._memoized_keys 

762 # note __dict__ needs to be in __slots__ if this is used 

763 for k in skip: 

764 self.__dict__.pop(k, None) 

765 return self 

766 

767 

768class HasCompileState(Generative): 

769 """A class that has a :class:`.CompileState` associated with it.""" 

770 

771 _compile_state_plugin: Optional[Type[CompileState]] = None 

772 

773 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

774 

775 _compile_state_factory = CompileState.create_for_statement 

776 

777 

778class _MetaOptions(type): 

779 """metaclass for the Options class. 

780 

781 This metaclass is actually necessary despite the availability of the 

782 ``__init_subclass__()`` hook as this type also provides custom class-level 

783 behavior for the ``__add__()`` method. 

784 

785 """ 

786 

787 _cache_attrs: Tuple[str, ...] 

788 

789 def __add__(self, other): 

790 o1 = self() 

791 

792 if set(other).difference(self._cache_attrs): 

793 raise TypeError( 

794 "dictionary contains attributes not covered by " 

795 "Options class %s: %r" 

796 % (self, set(other).difference(self._cache_attrs)) 

797 ) 

798 

799 o1.__dict__.update(other) 

800 return o1 

801 

802 if TYPE_CHECKING: 

803 

804 def __getattr__(self, key: str) -> Any: ... 

805 

806 def __setattr__(self, key: str, value: Any) -> None: ... 

807 

808 def __delattr__(self, key: str) -> None: ... 

809 

810 

811class Options(metaclass=_MetaOptions): 

812 """A cacheable option dictionary with defaults.""" 

813 

814 __slots__ = () 

815 

816 _cache_attrs: Tuple[str, ...] 

817 

818 def __init_subclass__(cls) -> None: 

819 dict_ = cls.__dict__ 

820 cls._cache_attrs = tuple( 

821 sorted( 

822 d 

823 for d in dict_ 

824 if not d.startswith("__") 

825 and d not in ("_cache_key_traversal",) 

826 ) 

827 ) 

828 super().__init_subclass__() 

829 

830 def __init__(self, **kw): 

831 self.__dict__.update(kw) 

832 

833 def __add__(self, other): 

834 o1 = self.__class__.__new__(self.__class__) 

835 o1.__dict__.update(self.__dict__) 

836 

837 if set(other).difference(self._cache_attrs): 

838 raise TypeError( 

839 "dictionary contains attributes not covered by " 

840 "Options class %s: %r" 

841 % (self, set(other).difference(self._cache_attrs)) 

842 ) 

843 

844 o1.__dict__.update(other) 

845 return o1 

846 

847 def __eq__(self, other): 

848 # TODO: very inefficient. This is used only in test suites 

849 # right now. 

850 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

851 if getattr(self, a) != getattr(other, b): 

852 return False 

853 return True 

854 

855 def __repr__(self): 

856 # TODO: fairly inefficient, used only in debugging right now. 

857 

858 return "%s(%s)" % ( 

859 self.__class__.__name__, 

860 ", ".join( 

861 "%s=%r" % (k, self.__dict__[k]) 

862 for k in self._cache_attrs 

863 if k in self.__dict__ 

864 ), 

865 ) 

866 

867 @classmethod 

868 def isinstance(cls, klass: Type[Any]) -> bool: 

869 return issubclass(cls, klass) 

870 

871 @hybridmethod 

872 def add_to_element(self, name, value): 

873 return self + {name: getattr(self, name) + value} 

874 

875 @hybridmethod 

876 def _state_dict_inst(self) -> Mapping[str, Any]: 

877 return self.__dict__ 

878 

879 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

880 

881 @_state_dict_inst.classlevel 

882 def _state_dict(cls) -> Mapping[str, Any]: 

883 return cls._state_dict_const 

884 

885 @classmethod 

886 def safe_merge(cls, other): 

887 d = other._state_dict() 

888 

889 # only support a merge with another object of our class 

890 # and which does not have attrs that we don't. otherwise 

891 # we risk having state that might not be part of our cache 

892 # key strategy 

893 

894 if ( 

895 cls is not other.__class__ 

896 and other._cache_attrs 

897 and set(other._cache_attrs).difference(cls._cache_attrs) 

898 ): 

899 raise TypeError( 

900 "other element %r is not empty, is not of type %s, " 

901 "and contains attributes not covered here %r" 

902 % ( 

903 other, 

904 cls, 

905 set(other._cache_attrs).difference(cls._cache_attrs), 

906 ) 

907 ) 

908 return cls + d 

909 

910 @classmethod 

911 def from_execution_options( 

912 cls, key, attrs, exec_options, statement_exec_options 

913 ): 

914 """process Options argument in terms of execution options. 

915 

916 

917 e.g.:: 

918 

919 ( 

920 load_options, 

921 execution_options, 

922 ) = QueryContext.default_load_options.from_execution_options( 

923 "_sa_orm_load_options", 

924 {"populate_existing", "autoflush", "yield_per"}, 

925 execution_options, 

926 statement._execution_options, 

927 ) 

928 

929 get back the Options and refresh "_sa_orm_load_options" in the 

930 exec options dict w/ the Options as well 

931 

932 """ 

933 

934 # common case is that no options we are looking for are 

935 # in either dictionary, so cancel for that first 

936 check_argnames = attrs.intersection( 

937 set(exec_options).union(statement_exec_options) 

938 ) 

939 

940 existing_options = exec_options.get(key, cls) 

941 

942 if check_argnames: 

943 result = {} 

944 for argname in check_argnames: 

945 local = "_" + argname 

946 if argname in exec_options: 

947 result[local] = exec_options[argname] 

948 elif argname in statement_exec_options: 

949 result[local] = statement_exec_options[argname] 

950 

951 new_options = existing_options + result 

952 exec_options = util.immutabledict().merge_with( 

953 exec_options, {key: new_options} 

954 ) 

955 return new_options, exec_options 

956 

957 else: 

958 return existing_options, exec_options 

959 

960 if TYPE_CHECKING: 

961 

962 def __getattr__(self, key: str) -> Any: ... 

963 

964 def __setattr__(self, key: str, value: Any) -> None: ... 

965 

966 def __delattr__(self, key: str) -> None: ... 

967 

968 

969class CacheableOptions(Options, HasCacheKey): 

970 __slots__ = () 

971 

972 @hybridmethod 

973 def _gen_cache_key_inst(self, anon_map, bindparams): 

974 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

975 

976 @_gen_cache_key_inst.classlevel 

977 def _gen_cache_key(cls, anon_map, bindparams): 

978 return (cls, ()) 

979 

980 @hybridmethod 

981 def _generate_cache_key(self): 

982 return HasCacheKey._generate_cache_key_for_object(self) 

983 

984 

985class ExecutableOption(HasCopyInternals): 

986 __slots__ = () 

987 

988 _annotations = util.EMPTY_DICT 

989 

990 __visit_name__ = "executable_option" 

991 

992 _is_has_cache_key = False 

993 

994 _is_core = True 

995 

996 def _clone(self, **kw): 

997 """Create a shallow copy of this ExecutableOption.""" 

998 c = self.__class__.__new__(self.__class__) 

999 c.__dict__ = dict(self.__dict__) # type: ignore 

1000 return c 

1001 

1002 

1003class Executable(roles.StatementRole): 

1004 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1005 

1006 :class:`.Executable` is a superclass for all "statement" types 

1007 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1008 :func:`insert`, :func:`text`. 

1009 

1010 """ 

1011 

1012 supports_execution: bool = True 

1013 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1014 _is_default_generator = False 

1015 _with_options: Tuple[ExecutableOption, ...] = () 

1016 _with_context_options: Tuple[ 

1017 Tuple[Callable[[CompileState], None], Any], ... 

1018 ] = () 

1019 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1020 

1021 _executable_traverse_internals = [ 

1022 ("_with_options", InternalTraversal.dp_executable_options), 

1023 ( 

1024 "_with_context_options", 

1025 ExtendedInternalTraversal.dp_with_context_options, 

1026 ), 

1027 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1028 ] 

1029 

1030 is_select = False 

1031 is_from_statement = False 

1032 is_update = False 

1033 is_insert = False 

1034 is_text = False 

1035 is_delete = False 

1036 is_dml = False 

1037 

1038 if TYPE_CHECKING: 

1039 __visit_name__: str 

1040 

1041 def _compile_w_cache( 

1042 self, 

1043 dialect: Dialect, 

1044 *, 

1045 compiled_cache: Optional[CompiledCacheType], 

1046 column_keys: List[str], 

1047 for_executemany: bool = False, 

1048 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1049 **kw: Any, 

1050 ) -> Tuple[ 

1051 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats 

1052 ]: ... 

1053 

1054 def _execute_on_connection( 

1055 self, 

1056 connection: Connection, 

1057 distilled_params: _CoreMultiExecuteParams, 

1058 execution_options: CoreExecuteOptionsParameter, 

1059 ) -> CursorResult[Any]: ... 

1060 

1061 def _execute_on_scalar( 

1062 self, 

1063 connection: Connection, 

1064 distilled_params: _CoreMultiExecuteParams, 

1065 execution_options: CoreExecuteOptionsParameter, 

1066 ) -> Any: ... 

1067 

1068 @util.ro_non_memoized_property 

1069 def _all_selected_columns(self): 

1070 raise NotImplementedError() 

1071 

1072 @property 

1073 def _effective_plugin_target(self) -> str: 

1074 return self.__visit_name__ 

1075 

1076 @_generative 

1077 def options(self, *options: ExecutableOption) -> Self: 

1078 """Apply options to this statement. 

1079 

1080 In the general sense, options are any kind of Python object 

1081 that can be interpreted by the SQL compiler for the statement. 

1082 These options can be consumed by specific dialects or specific kinds 

1083 of compilers. 

1084 

1085 The most commonly known kind of option are the ORM level options 

1086 that apply "eager load" and other loading behaviors to an ORM 

1087 query. However, options can theoretically be used for many other 

1088 purposes. 

1089 

1090 For background on specific kinds of options for specific kinds of 

1091 statements, refer to the documentation for those option objects. 

1092 

1093 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1094 Core statement objects towards the goal of allowing unified 

1095 Core / ORM querying capabilities. 

1096 

1097 .. seealso:: 

1098 

1099 :ref:`loading_columns` - refers to options specific to the usage 

1100 of ORM queries 

1101 

1102 :ref:`relationship_loader_options` - refers to options specific 

1103 to the usage of ORM queries 

1104 

1105 """ 

1106 self._with_options += tuple( 

1107 coercions.expect(roles.ExecutableOptionRole, opt) 

1108 for opt in options 

1109 ) 

1110 return self 

1111 

1112 @_generative 

1113 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1114 """Assign the compile options to a new value. 

1115 

1116 :param compile_options: appropriate CacheableOptions structure 

1117 

1118 """ 

1119 

1120 self._compile_options = compile_options 

1121 return self 

1122 

1123 @_generative 

1124 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1125 """update the _compile_options with new keys.""" 

1126 

1127 assert self._compile_options is not None 

1128 self._compile_options += options 

1129 return self 

1130 

1131 @_generative 

1132 def _add_context_option( 

1133 self, 

1134 callable_: Callable[[CompileState], None], 

1135 cache_args: Any, 

1136 ) -> Self: 

1137 """Add a context option to this statement. 

1138 

1139 These are callable functions that will 

1140 be given the CompileState object upon compilation. 

1141 

1142 A second argument cache_args is required, which will be combined with 

1143 the ``__code__`` identity of the function itself in order to produce a 

1144 cache key. 

1145 

1146 """ 

1147 self._with_context_options += ((callable_, cache_args),) 

1148 return self 

1149 

1150 @overload 

1151 def execution_options( 

1152 self, 

1153 *, 

1154 compiled_cache: Optional[CompiledCacheType] = ..., 

1155 logging_token: str = ..., 

1156 isolation_level: IsolationLevel = ..., 

1157 no_parameters: bool = False, 

1158 stream_results: bool = False, 

1159 max_row_buffer: int = ..., 

1160 yield_per: int = ..., 

1161 insertmanyvalues_page_size: int = ..., 

1162 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1163 populate_existing: bool = False, 

1164 autoflush: bool = False, 

1165 synchronize_session: SynchronizeSessionArgument = ..., 

1166 dml_strategy: DMLStrategyArgument = ..., 

1167 render_nulls: bool = ..., 

1168 is_delete_using: bool = ..., 

1169 is_update_from: bool = ..., 

1170 preserve_rowcount: bool = False, 

1171 **opt: Any, 

1172 ) -> Self: ... 

1173 

1174 @overload 

1175 def execution_options(self, **opt: Any) -> Self: ... 

1176 

1177 @_generative 

1178 def execution_options(self, **kw: Any) -> Self: 

1179 """Set non-SQL options for the statement which take effect during 

1180 execution. 

1181 

1182 Execution options can be set at many scopes, including per-statement, 

1183 per-connection, or per execution, using methods such as 

1184 :meth:`_engine.Connection.execution_options` and parameters which 

1185 accept a dictionary of options such as 

1186 :paramref:`_engine.Connection.execute.execution_options` and 

1187 :paramref:`_orm.Session.execute.execution_options`. 

1188 

1189 The primary characteristic of an execution option, as opposed to 

1190 other kinds of options such as ORM loader options, is that 

1191 **execution options never affect the compiled SQL of a query, only 

1192 things that affect how the SQL statement itself is invoked or how 

1193 results are fetched**. That is, execution options are not part of 

1194 what's accommodated by SQL compilation nor are they considered part of 

1195 the cached state of a statement. 

1196 

1197 The :meth:`_sql.Executable.execution_options` method is 

1198 :term:`generative`, as 

1199 is the case for the method as applied to the :class:`_engine.Engine` 

1200 and :class:`_orm.Query` objects, which means when the method is called, 

1201 a copy of the object is returned, which applies the given parameters to 

1202 that new copy, but leaves the original unchanged:: 

1203 

1204 statement = select(table.c.x, table.c.y) 

1205 new_statement = statement.execution_options(my_option=True) 

1206 

1207 An exception to this behavior is the :class:`_engine.Connection` 

1208 object, where the :meth:`_engine.Connection.execution_options` method 

1209 is explicitly **not** generative. 

1210 

1211 The kinds of options that may be passed to 

1212 :meth:`_sql.Executable.execution_options` and other related methods and 

1213 parameter dictionaries include parameters that are explicitly consumed 

1214 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1215 defined by SQLAlchemy, which means the methods and/or parameter 

1216 dictionaries may be used for user-defined parameters that interact with 

1217 custom code, which may access the parameters using methods such as 

1218 :meth:`_sql.Executable.get_execution_options` and 

1219 :meth:`_engine.Connection.get_execution_options`, or within selected 

1220 event hooks using a dedicated ``execution_options`` event parameter 

1221 such as 

1222 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1223 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1224 

1225 from sqlalchemy import event 

1226 

1227 

1228 @event.listens_for(some_engine, "before_execute") 

1229 def _process_opt(conn, statement, multiparams, params, execution_options): 

1230 "run a SQL function before invoking a statement" 

1231 

1232 if execution_options.get("do_special_thing", False): 

1233 conn.exec_driver_sql("run_special_function()") 

1234 

1235 Within the scope of options that are explicitly recognized by 

1236 SQLAlchemy, most apply to specific classes of objects and not others. 

1237 The most common execution options include: 

1238 

1239 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1240 sets the isolation level for a connection or a class of connections 

1241 via an :class:`_engine.Engine`. This option is accepted only 

1242 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1243 

1244 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1245 indicates results should be fetched using a server side cursor; 

1246 this option is accepted by :class:`_engine.Connection`, by the 

1247 :paramref:`_engine.Connection.execute.execution_options` parameter 

1248 on :meth:`_engine.Connection.execute`, and additionally by 

1249 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1250 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1251 

1252 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1253 indicates a dictionary that will serve as the 

1254 :ref:`SQL compilation cache <sql_caching>` 

1255 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1256 well as for ORM methods like :meth:`_orm.Session.execute`. 

1257 Can be passed as ``None`` to disable caching for statements. 

1258 This option is not accepted by 

1259 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1260 carry along a compilation cache within a statement object. 

1261 

1262 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1263 - a mapping of schema names used by the 

1264 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1265 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1266 :class:`_sql.Executable`, as well as by ORM constructs 

1267 like :meth:`_orm.Session.execute`. 

1268 

1269 .. seealso:: 

1270 

1271 :meth:`_engine.Connection.execution_options` 

1272 

1273 :paramref:`_engine.Connection.execute.execution_options` 

1274 

1275 :paramref:`_orm.Session.execute.execution_options` 

1276 

1277 :ref:`orm_queryguide_execution_options` - documentation on all 

1278 ORM-specific execution options 

1279 

1280 """ # noqa: E501 

1281 if "isolation_level" in kw: 

1282 raise exc.ArgumentError( 

1283 "'isolation_level' execution option may only be specified " 

1284 "on Connection.execution_options(), or " 

1285 "per-engine using the isolation_level " 

1286 "argument to create_engine()." 

1287 ) 

1288 if "compiled_cache" in kw: 

1289 raise exc.ArgumentError( 

1290 "'compiled_cache' execution option may only be specified " 

1291 "on Connection.execution_options(), not per statement." 

1292 ) 

1293 self._execution_options = self._execution_options.union(kw) 

1294 return self 

1295 

1296 def get_execution_options(self) -> _ExecuteOptions: 

1297 """Get the non-SQL options which will take effect during execution. 

1298 

1299 .. versionadded:: 1.3 

1300 

1301 .. seealso:: 

1302 

1303 :meth:`.Executable.execution_options` 

1304 """ 

1305 return self._execution_options 

1306 

1307 

1308class SchemaEventTarget(event.EventTarget): 

1309 """Base class for elements that are the targets of :class:`.DDLEvents` 

1310 events. 

1311 

1312 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1313 

1314 """ 

1315 

1316 dispatch: dispatcher[SchemaEventTarget] 

1317 

1318 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1319 """Associate with this SchemaEvent's parent object.""" 

1320 

1321 def _set_parent_with_dispatch( 

1322 self, parent: SchemaEventTarget, **kw: Any 

1323 ) -> None: 

1324 self.dispatch.before_parent_attach(self, parent) 

1325 self._set_parent(parent, **kw) 

1326 self.dispatch.after_parent_attach(self, parent) 

1327 

1328 

1329class SchemaVisitable(SchemaEventTarget, visitors.Visitable): 

1330 """Base class for elements that are targets of a :class:`.SchemaVisitor`. 

1331 

1332 .. versionadded:: 2.0.41 

1333 

1334 """ 

1335 

1336 

1337class SchemaVisitor(ClauseVisitor): 

1338 """Define the visiting for ``SchemaItem`` and more 

1339 generally ``SchemaVisitable`` objects. 

1340 

1341 """ 

1342 

1343 __traverse_options__ = {"schema_visitor": True} 

1344 

1345 

1346class _SentinelDefaultCharacterization(Enum): 

1347 NONE = "none" 

1348 UNKNOWN = "unknown" 

1349 CLIENTSIDE = "clientside" 

1350 SENTINEL_DEFAULT = "sentinel_default" 

1351 SERVERSIDE = "serverside" 

1352 IDENTITY = "identity" 

1353 SEQUENCE = "sequence" 

1354 

1355 

1356class _SentinelColumnCharacterization(NamedTuple): 

1357 columns: Optional[Sequence[Column[Any]]] = None 

1358 is_explicit: bool = False 

1359 is_autoinc: bool = False 

1360 default_characterization: _SentinelDefaultCharacterization = ( 

1361 _SentinelDefaultCharacterization.NONE 

1362 ) 

1363 

1364 

1365_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1366 

1367_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1368_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1369 

1370 

1371class _ColumnMetrics(Generic[_COL_co]): 

1372 __slots__ = ("column",) 

1373 

1374 column: _COL_co 

1375 

1376 def __init__( 

1377 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1378 ): 

1379 self.column = col 

1380 

1381 # proxy_index being non-empty means it was initialized. 

1382 # so we need to update it 

1383 pi = collection._proxy_index 

1384 if pi: 

1385 for eps_col in col._expanded_proxy_set: 

1386 pi[eps_col].add(self) 

1387 

1388 def get_expanded_proxy_set(self): 

1389 return self.column._expanded_proxy_set 

1390 

1391 def dispose(self, collection): 

1392 pi = collection._proxy_index 

1393 if not pi: 

1394 return 

1395 for col in self.column._expanded_proxy_set: 

1396 colset = pi.get(col, None) 

1397 if colset: 

1398 colset.discard(self) 

1399 if colset is not None and not colset: 

1400 del pi[col] 

1401 

1402 def embedded( 

1403 self, 

1404 target_set: Union[ 

1405 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1406 ], 

1407 ) -> bool: 

1408 expanded_proxy_set = self.column._expanded_proxy_set 

1409 for t in target_set.difference(expanded_proxy_set): 

1410 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1411 return False 

1412 return True 

1413 

1414 

1415class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1416 """Collection of :class:`_expression.ColumnElement` instances, 

1417 typically for 

1418 :class:`_sql.FromClause` objects. 

1419 

1420 The :class:`_sql.ColumnCollection` object is most commonly available 

1421 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1422 on the :class:`_schema.Table` object, introduced at 

1423 :ref:`metadata_tables_and_columns`. 

1424 

1425 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1426 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1427 :class:`_schema.Column` objects, which are then accessible both via mapping 

1428 style access as well as attribute access style. 

1429 

1430 To access :class:`_schema.Column` objects using ordinary attribute-style 

1431 access, specify the name like any other object attribute, such as below 

1432 a column named ``employee_name`` is accessed:: 

1433 

1434 >>> employee_table.c.employee_name 

1435 

1436 To access columns that have names with special characters or spaces, 

1437 index-style access is used, such as below which illustrates a column named 

1438 ``employee ' payment`` is accessed:: 

1439 

1440 >>> employee_table.c["employee ' payment"] 

1441 

1442 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1443 interface, common dictionary method names like 

1444 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1445 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1446 database columns that are keyed under these names also need to use indexed 

1447 access:: 

1448 

1449 >>> employee_table.c["values"] 

1450 

1451 

1452 The name for which a :class:`_schema.Column` would be present is normally 

1453 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1454 such as a :class:`_sql.Select` object that uses a label style set 

1455 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1456 key may instead be represented under a particular label name such 

1457 as ``tablename_columnname``:: 

1458 

1459 >>> from sqlalchemy import select, column, table 

1460 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1461 >>> t = table("t", column("c")) 

1462 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1463 >>> subq = stmt.subquery() 

1464 >>> subq.c.t_c 

1465 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1466 

1467 :class:`.ColumnCollection` also indexes the columns in order and allows 

1468 them to be accessible by their integer position:: 

1469 

1470 >>> cc[0] 

1471 Column('x', Integer(), table=None) 

1472 >>> cc[1] 

1473 Column('y', Integer(), table=None) 

1474 

1475 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1476 allows integer-based 

1477 index access to the collection. 

1478 

1479 Iterating the collection yields the column expressions in order:: 

1480 

1481 >>> list(cc) 

1482 [Column('x', Integer(), table=None), 

1483 Column('y', Integer(), table=None)] 

1484 

1485 The base :class:`_expression.ColumnCollection` object can store 

1486 duplicates, which can 

1487 mean either two columns with the same key, in which case the column 

1488 returned by key access is **arbitrary**:: 

1489 

1490 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1491 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1492 >>> list(cc) 

1493 [Column('x', Integer(), table=None), 

1494 Column('x', Integer(), table=None)] 

1495 >>> cc["x"] is x1 

1496 False 

1497 >>> cc["x"] is x2 

1498 True 

1499 

1500 Or it can also mean the same column multiple times. These cases are 

1501 supported as :class:`_expression.ColumnCollection` 

1502 is used to represent the columns in 

1503 a SELECT statement which may include duplicates. 

1504 

1505 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1506 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1507 collection is used for schema level objects like :class:`_schema.Table` 

1508 and 

1509 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1510 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1511 as the schema constructs have more use cases that require removal and 

1512 replacement of columns. 

1513 

1514 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1515 now stores duplicate 

1516 column keys as well as the same column in multiple positions. The 

1517 :class:`.DedupeColumnCollection` class is added to maintain the 

1518 former behavior in those cases where deduplication as well as 

1519 additional replace/remove operations are needed. 

1520 

1521 

1522 """ 

1523 

1524 __slots__ = "_collection", "_index", "_colset", "_proxy_index" 

1525 

1526 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1527 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1528 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1529 _colset: Set[_COL_co] 

1530 

1531 def __init__( 

1532 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1533 ): 

1534 object.__setattr__(self, "_colset", set()) 

1535 object.__setattr__(self, "_index", {}) 

1536 object.__setattr__( 

1537 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1538 ) 

1539 object.__setattr__(self, "_collection", []) 

1540 if columns: 

1541 self._initial_populate(columns) 

1542 

1543 @util.preload_module("sqlalchemy.sql.elements") 

1544 def __clause_element__(self) -> ClauseList: 

1545 elements = util.preloaded.sql_elements 

1546 

1547 return elements.ClauseList( 

1548 _literal_as_text_role=roles.ColumnsClauseRole, 

1549 group=False, 

1550 *self._all_columns, 

1551 ) 

1552 

1553 def _initial_populate( 

1554 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1555 ) -> None: 

1556 self._populate_separate_keys(iter_) 

1557 

1558 @property 

1559 def _all_columns(self) -> List[_COL_co]: 

1560 return [col for (_, col, _) in self._collection] 

1561 

1562 def keys(self) -> List[_COLKEY]: 

1563 """Return a sequence of string key names for all columns in this 

1564 collection.""" 

1565 return [k for (k, _, _) in self._collection] 

1566 

1567 def values(self) -> List[_COL_co]: 

1568 """Return a sequence of :class:`_sql.ColumnClause` or 

1569 :class:`_schema.Column` objects for all columns in this 

1570 collection.""" 

1571 return [col for (_, col, _) in self._collection] 

1572 

1573 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1574 """Return a sequence of (key, column) tuples for all columns in this 

1575 collection each consisting of a string key name and a 

1576 :class:`_sql.ColumnClause` or 

1577 :class:`_schema.Column` object. 

1578 """ 

1579 

1580 return [(k, col) for (k, col, _) in self._collection] 

1581 

1582 def __bool__(self) -> bool: 

1583 return bool(self._collection) 

1584 

1585 def __len__(self) -> int: 

1586 return len(self._collection) 

1587 

1588 def __iter__(self) -> Iterator[_COL_co]: 

1589 # turn to a list first to maintain over a course of changes 

1590 return iter([col for _, col, _ in self._collection]) 

1591 

1592 @overload 

1593 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1594 

1595 @overload 

1596 def __getitem__( 

1597 self, key: Tuple[Union[str, int], ...] 

1598 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1599 

1600 @overload 

1601 def __getitem__( 

1602 self, key: slice 

1603 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1604 

1605 def __getitem__( 

1606 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1607 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1608 try: 

1609 if isinstance(key, (tuple, slice)): 

1610 if isinstance(key, slice): 

1611 cols = ( 

1612 (sub_key, col) 

1613 for (sub_key, col, _) in self._collection[key] 

1614 ) 

1615 else: 

1616 cols = (self._index[sub_key] for sub_key in key) 

1617 

1618 return ColumnCollection(cols).as_readonly() 

1619 else: 

1620 return self._index[key][1] 

1621 except KeyError as err: 

1622 if isinstance(err.args[0], int): 

1623 raise IndexError(err.args[0]) from err 

1624 else: 

1625 raise 

1626 

1627 def __getattr__(self, key: str) -> _COL_co: 

1628 try: 

1629 return self._index[key][1] 

1630 except KeyError as err: 

1631 raise AttributeError(key) from err 

1632 

1633 def __contains__(self, key: str) -> bool: 

1634 if key not in self._index: 

1635 if not isinstance(key, str): 

1636 raise exc.ArgumentError( 

1637 "__contains__ requires a string argument" 

1638 ) 

1639 return False 

1640 else: 

1641 return True 

1642 

1643 def compare(self, other: ColumnCollection[Any, Any]) -> bool: 

1644 """Compare this :class:`_expression.ColumnCollection` to another 

1645 based on the names of the keys""" 

1646 

1647 for l, r in zip_longest(self, other): 

1648 if l is not r: 

1649 return False 

1650 else: 

1651 return True 

1652 

1653 def __eq__(self, other: Any) -> bool: 

1654 return self.compare(other) 

1655 

1656 @overload 

1657 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1658 

1659 @overload 

1660 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1661 

1662 def get( 

1663 self, key: str, default: Optional[_COL] = None 

1664 ) -> Optional[Union[_COL_co, _COL]]: 

1665 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1666 based on a string key name from this 

1667 :class:`_expression.ColumnCollection`.""" 

1668 

1669 if key in self._index: 

1670 return self._index[key][1] 

1671 else: 

1672 return default 

1673 

1674 def __str__(self) -> str: 

1675 return "%s(%s)" % ( 

1676 self.__class__.__name__, 

1677 ", ".join(str(c) for c in self), 

1678 ) 

1679 

1680 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1681 raise NotImplementedError() 

1682 

1683 def __delitem__(self, key: str) -> NoReturn: 

1684 raise NotImplementedError() 

1685 

1686 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1687 raise NotImplementedError() 

1688 

1689 def clear(self) -> NoReturn: 

1690 """Dictionary clear() is not implemented for 

1691 :class:`_sql.ColumnCollection`.""" 

1692 raise NotImplementedError() 

1693 

1694 def remove(self, column: Any) -> None: 

1695 raise NotImplementedError() 

1696 

1697 def update(self, iter_: Any) -> NoReturn: 

1698 """Dictionary update() is not implemented for 

1699 :class:`_sql.ColumnCollection`.""" 

1700 raise NotImplementedError() 

1701 

1702 # https://github.com/python/mypy/issues/4266 

1703 __hash__ = None # type: ignore 

1704 

1705 def _populate_separate_keys( 

1706 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1707 ) -> None: 

1708 """populate from an iterator of (key, column)""" 

1709 

1710 self._collection[:] = collection = [ 

1711 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

1712 ] 

1713 self._colset.update(c._deannotate() for _, c, _ in collection) 

1714 self._index.update( 

1715 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

1716 ) 

1717 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

1718 

1719 def add( 

1720 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None 

1721 ) -> None: 

1722 """Add a column to this :class:`_sql.ColumnCollection`. 

1723 

1724 .. note:: 

1725 

1726 This method is **not normally used by user-facing code**, as the 

1727 :class:`_sql.ColumnCollection` is usually part of an existing 

1728 object such as a :class:`_schema.Table`. To add a 

1729 :class:`_schema.Column` to an existing :class:`_schema.Table` 

1730 object, use the :meth:`_schema.Table.append_column` method. 

1731 

1732 """ 

1733 colkey: _COLKEY 

1734 

1735 if key is None: 

1736 colkey = column.key # type: ignore 

1737 else: 

1738 colkey = key 

1739 

1740 l = len(self._collection) 

1741 

1742 # don't really know how this part is supposed to work w/ the 

1743 # covariant thing 

1744 

1745 _column = cast(_COL_co, column) 

1746 

1747 self._collection.append( 

1748 (colkey, _column, _ColumnMetrics(self, _column)) 

1749 ) 

1750 self._colset.add(_column._deannotate()) 

1751 self._index[l] = (colkey, _column) 

1752 if colkey not in self._index: 

1753 self._index[colkey] = (colkey, _column) 

1754 

1755 def __getstate__(self) -> Dict[str, Any]: 

1756 return { 

1757 "_collection": [(k, c) for k, c, _ in self._collection], 

1758 "_index": self._index, 

1759 } 

1760 

1761 def __setstate__(self, state: Dict[str, Any]) -> None: 

1762 object.__setattr__(self, "_index", state["_index"]) 

1763 object.__setattr__( 

1764 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1765 ) 

1766 object.__setattr__( 

1767 self, 

1768 "_collection", 

1769 [ 

1770 (k, c, _ColumnMetrics(self, c)) 

1771 for (k, c) in state["_collection"] 

1772 ], 

1773 ) 

1774 object.__setattr__( 

1775 self, "_colset", {col for k, col, _ in self._collection} 

1776 ) 

1777 

1778 def contains_column(self, col: ColumnElement[Any]) -> bool: 

1779 """Checks if a column object exists in this collection""" 

1780 if col not in self._colset: 

1781 if isinstance(col, str): 

1782 raise exc.ArgumentError( 

1783 "contains_column cannot be used with string arguments. " 

1784 "Use ``col_name in table.c`` instead." 

1785 ) 

1786 return False 

1787 else: 

1788 return True 

1789 

1790 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

1791 """Return a "read only" form of this 

1792 :class:`_sql.ColumnCollection`.""" 

1793 

1794 return ReadOnlyColumnCollection(self) 

1795 

1796 def _init_proxy_index(self): 

1797 """populate the "proxy index", if empty. 

1798 

1799 proxy index is added in 2.0 to provide more efficient operation 

1800 for the corresponding_column() method. 

1801 

1802 For reasons of both time to construct new .c collections as well as 

1803 memory conservation for large numbers of large .c collections, the 

1804 proxy_index is only filled if corresponding_column() is called. once 

1805 filled it stays that way, and new _ColumnMetrics objects created after 

1806 that point will populate it with new data. Note this case would be 

1807 unusual, if not nonexistent, as it means a .c collection is being 

1808 mutated after corresponding_column() were used, however it is tested in 

1809 test/base/test_utils.py. 

1810 

1811 """ 

1812 pi = self._proxy_index 

1813 if pi: 

1814 return 

1815 

1816 for _, _, metrics in self._collection: 

1817 eps = metrics.column._expanded_proxy_set 

1818 

1819 for eps_col in eps: 

1820 pi[eps_col].add(metrics) 

1821 

1822 def corresponding_column( 

1823 self, column: _COL, require_embedded: bool = False 

1824 ) -> Optional[Union[_COL, _COL_co]]: 

1825 """Given a :class:`_expression.ColumnElement`, return the exported 

1826 :class:`_expression.ColumnElement` object from this 

1827 :class:`_expression.ColumnCollection` 

1828 which corresponds to that original :class:`_expression.ColumnElement` 

1829 via a common 

1830 ancestor column. 

1831 

1832 :param column: the target :class:`_expression.ColumnElement` 

1833 to be matched. 

1834 

1835 :param require_embedded: only return corresponding columns for 

1836 the given :class:`_expression.ColumnElement`, if the given 

1837 :class:`_expression.ColumnElement` 

1838 is actually present within a sub-element 

1839 of this :class:`_expression.Selectable`. 

1840 Normally the column will match if 

1841 it merely shares a common ancestor with one of the exported 

1842 columns of this :class:`_expression.Selectable`. 

1843 

1844 .. seealso:: 

1845 

1846 :meth:`_expression.Selectable.corresponding_column` 

1847 - invokes this method 

1848 against the collection returned by 

1849 :attr:`_expression.Selectable.exported_columns`. 

1850 

1851 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

1852 was moved onto the :class:`_expression.ColumnCollection` itself. 

1853 

1854 """ 

1855 # TODO: cython candidate 

1856 

1857 # don't dig around if the column is locally present 

1858 if column in self._colset: 

1859 return column 

1860 

1861 selected_intersection, selected_metrics = None, None 

1862 target_set = column.proxy_set 

1863 

1864 pi = self._proxy_index 

1865 if not pi: 

1866 self._init_proxy_index() 

1867 

1868 for current_metrics in ( 

1869 mm for ts in target_set if ts in pi for mm in pi[ts] 

1870 ): 

1871 if not require_embedded or current_metrics.embedded(target_set): 

1872 if selected_metrics is None: 

1873 # no corresponding column yet, pick this one. 

1874 selected_metrics = current_metrics 

1875 continue 

1876 

1877 current_intersection = target_set.intersection( 

1878 current_metrics.column._expanded_proxy_set 

1879 ) 

1880 if selected_intersection is None: 

1881 selected_intersection = target_set.intersection( 

1882 selected_metrics.column._expanded_proxy_set 

1883 ) 

1884 

1885 if len(current_intersection) > len(selected_intersection): 

1886 # 'current' has a larger field of correspondence than 

1887 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

1888 # matches a1.c.x->table.c.x better than 

1889 # selectable.c.x->table.c.x does. 

1890 

1891 selected_metrics = current_metrics 

1892 selected_intersection = current_intersection 

1893 elif current_intersection == selected_intersection: 

1894 # they have the same field of correspondence. see 

1895 # which proxy_set has fewer columns in it, which 

1896 # indicates a closer relationship with the root 

1897 # column. Also take into account the "weight" 

1898 # attribute which CompoundSelect() uses to give 

1899 # higher precedence to columns based on vertical 

1900 # position in the compound statement, and discard 

1901 # columns that have no reference to the target 

1902 # column (also occurs with CompoundSelect) 

1903 

1904 selected_col_distance = sum( 

1905 [ 

1906 sc._annotations.get("weight", 1) 

1907 for sc in ( 

1908 selected_metrics.column._uncached_proxy_list() 

1909 ) 

1910 if sc.shares_lineage(column) 

1911 ], 

1912 ) 

1913 current_col_distance = sum( 

1914 [ 

1915 sc._annotations.get("weight", 1) 

1916 for sc in ( 

1917 current_metrics.column._uncached_proxy_list() 

1918 ) 

1919 if sc.shares_lineage(column) 

1920 ], 

1921 ) 

1922 if current_col_distance < selected_col_distance: 

1923 selected_metrics = current_metrics 

1924 selected_intersection = current_intersection 

1925 

1926 return selected_metrics.column if selected_metrics else None 

1927 

1928 

1929_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

1930 

1931 

1932class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): 

1933 """A :class:`_expression.ColumnCollection` 

1934 that maintains deduplicating behavior. 

1935 

1936 This is useful by schema level objects such as :class:`_schema.Table` and 

1937 :class:`.PrimaryKeyConstraint`. The collection includes more 

1938 sophisticated mutator methods as well to suit schema objects which 

1939 require mutable column collections. 

1940 

1941 .. versionadded:: 1.4 

1942 

1943 """ 

1944 

1945 def add( # type: ignore[override] 

1946 self, column: _NAMEDCOL, key: Optional[str] = None 

1947 ) -> None: 

1948 if key is not None and column.key != key: 

1949 raise exc.ArgumentError( 

1950 "DedupeColumnCollection requires columns be under " 

1951 "the same key as their .key" 

1952 ) 

1953 key = column.key 

1954 

1955 if key is None: 

1956 raise exc.ArgumentError( 

1957 "Can't add unnamed column to column collection" 

1958 ) 

1959 

1960 if key in self._index: 

1961 existing = self._index[key][1] 

1962 

1963 if existing is column: 

1964 return 

1965 

1966 self.replace(column) 

1967 

1968 # pop out memoized proxy_set as this 

1969 # operation may very well be occurring 

1970 # in a _make_proxy operation 

1971 util.memoized_property.reset(column, "proxy_set") 

1972 else: 

1973 self._append_new_column(key, column) 

1974 

1975 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None: 

1976 l = len(self._collection) 

1977 self._collection.append( 

1978 (key, named_column, _ColumnMetrics(self, named_column)) 

1979 ) 

1980 self._colset.add(named_column._deannotate()) 

1981 self._index[l] = (key, named_column) 

1982 self._index[key] = (key, named_column) 

1983 

1984 def _populate_separate_keys( 

1985 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

1986 ) -> None: 

1987 """populate from an iterator of (key, column)""" 

1988 cols = list(iter_) 

1989 

1990 replace_col = [] 

1991 for k, col in cols: 

1992 if col.key != k: 

1993 raise exc.ArgumentError( 

1994 "DedupeColumnCollection requires columns be under " 

1995 "the same key as their .key" 

1996 ) 

1997 if col.name in self._index and col.key != col.name: 

1998 replace_col.append(col) 

1999 elif col.key in self._index: 

2000 replace_col.append(col) 

2001 else: 

2002 self._index[k] = (k, col) 

2003 self._collection.append((k, col, _ColumnMetrics(self, col))) 

2004 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

2005 

2006 self._index.update( 

2007 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

2008 ) 

2009 for col in replace_col: 

2010 self.replace(col) 

2011 

2012 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2013 self._populate_separate_keys((col.key, col) for col in iter_) 

2014 

2015 def remove(self, column: _NAMEDCOL) -> None: 

2016 if column not in self._colset: 

2017 raise ValueError( 

2018 "Can't remove column %r; column is not in this collection" 

2019 % column 

2020 ) 

2021 del self._index[column.key] 

2022 self._colset.remove(column) 

2023 self._collection[:] = [ 

2024 (k, c, metrics) 

2025 for (k, c, metrics) in self._collection 

2026 if c is not column 

2027 ] 

2028 for metrics in self._proxy_index.get(column, ()): 

2029 metrics.dispose(self) 

2030 

2031 self._index.update( 

2032 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2033 ) 

2034 # delete higher index 

2035 del self._index[len(self._collection)] 

2036 

2037 def replace( 

2038 self, 

2039 column: _NAMEDCOL, 

2040 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2041 ) -> None: 

2042 """add the given column to this collection, removing unaliased 

2043 versions of this column as well as existing columns with the 

2044 same key. 

2045 

2046 e.g.:: 

2047 

2048 t = Table("sometable", metadata, Column("col1", Integer)) 

2049 t.columns.replace(Column("col1", Integer, key="columnone")) 

2050 

2051 will remove the original 'col1' from the collection, and add 

2052 the new column under the name 'columnname'. 

2053 

2054 Used by schema.Column to override columns during table reflection. 

2055 

2056 """ 

2057 

2058 if extra_remove: 

2059 remove_col = set(extra_remove) 

2060 else: 

2061 remove_col = set() 

2062 # remove up to two columns based on matches of name as well as key 

2063 if column.name in self._index and column.key != column.name: 

2064 other = self._index[column.name][1] 

2065 if other.name == other.key: 

2066 remove_col.add(other) 

2067 

2068 if column.key in self._index: 

2069 remove_col.add(self._index[column.key][1]) 

2070 

2071 if not remove_col: 

2072 self._append_new_column(column.key, column) 

2073 return 

2074 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2075 replaced = False 

2076 for k, col, metrics in self._collection: 

2077 if col in remove_col: 

2078 if not replaced: 

2079 replaced = True 

2080 new_cols.append( 

2081 (column.key, column, _ColumnMetrics(self, column)) 

2082 ) 

2083 else: 

2084 new_cols.append((k, col, metrics)) 

2085 

2086 if remove_col: 

2087 self._colset.difference_update(remove_col) 

2088 

2089 for rc in remove_col: 

2090 for metrics in self._proxy_index.get(rc, ()): 

2091 metrics.dispose(self) 

2092 

2093 if not replaced: 

2094 new_cols.append((column.key, column, _ColumnMetrics(self, column))) 

2095 

2096 self._colset.add(column._deannotate()) 

2097 self._collection[:] = new_cols 

2098 

2099 self._index.clear() 

2100 

2101 self._index.update( 

2102 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2103 ) 

2104 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2105 

2106 

2107class ReadOnlyColumnCollection( 

2108 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2109): 

2110 __slots__ = ("_parent",) 

2111 

2112 def __init__(self, collection): 

2113 object.__setattr__(self, "_parent", collection) 

2114 object.__setattr__(self, "_colset", collection._colset) 

2115 object.__setattr__(self, "_index", collection._index) 

2116 object.__setattr__(self, "_collection", collection._collection) 

2117 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2118 

2119 def __getstate__(self): 

2120 return {"_parent": self._parent} 

2121 

2122 def __setstate__(self, state): 

2123 parent = state["_parent"] 

2124 self.__init__(parent) # type: ignore 

2125 

2126 def add(self, column: Any, key: Any = ...) -> Any: 

2127 self._readonly() 

2128 

2129 def extend(self, elements: Any) -> NoReturn: 

2130 self._readonly() 

2131 

2132 def remove(self, item: Any) -> NoReturn: 

2133 self._readonly() 

2134 

2135 

2136class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2137 def contains_column(self, col): 

2138 return col in self 

2139 

2140 def extend(self, cols): 

2141 for col in cols: 

2142 self.add(col) 

2143 

2144 def __eq__(self, other): 

2145 l = [] 

2146 for c in other: 

2147 for local in self: 

2148 if c.shares_lineage(local): 

2149 l.append(c == local) 

2150 return elements.and_(*l) 

2151 

2152 def __hash__(self): # type: ignore[override] 

2153 return hash(tuple(x for x in self)) 

2154 

2155 

2156def _entity_namespace( 

2157 entity: Union[_HasEntityNamespace, ExternallyTraversible] 

2158) -> _EntityNamespace: 

2159 """Return the nearest .entity_namespace for the given entity. 

2160 

2161 If not immediately available, does an iterate to find a sub-element 

2162 that has one, if any. 

2163 

2164 """ 

2165 try: 

2166 return cast(_HasEntityNamespace, entity).entity_namespace 

2167 except AttributeError: 

2168 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2169 if _is_has_entity_namespace(elem): 

2170 return elem.entity_namespace 

2171 else: 

2172 raise 

2173 

2174 

2175def _entity_namespace_key( 

2176 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2177 key: str, 

2178 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG, 

2179) -> SQLCoreOperations[Any]: 

2180 """Return an entry from an entity_namespace. 

2181 

2182 

2183 Raises :class:`_exc.InvalidRequestError` rather than attribute error 

2184 on not found. 

2185 

2186 """ 

2187 

2188 try: 

2189 ns = _entity_namespace(entity) 

2190 if default is not NO_ARG: 

2191 return getattr(ns, key, default) 

2192 else: 

2193 return getattr(ns, key) # type: ignore 

2194 except AttributeError as err: 

2195 raise exc.InvalidRequestError( 

2196 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2197 ) from err