Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/base.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

836 statements  

1# sql/base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules. 

10 

11""" 

12 

13 

14from __future__ import annotations 

15 

16import collections 

17from enum import Enum 

18import itertools 

19from itertools import zip_longest 

20import operator 

21import re 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import FrozenSet 

27from typing import Generic 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import NamedTuple 

34from typing import NoReturn 

35from typing import Optional 

36from typing import overload 

37from typing import Protocol 

38from typing import Sequence 

39from typing import Set 

40from typing import Tuple 

41from typing import Type 

42from typing import TYPE_CHECKING 

43from typing import TypeVar 

44from typing import Union 

45 

46from . import roles 

47from . import visitors 

48from .cache_key import HasCacheKey # noqa 

49from .cache_key import MemoizedHasCacheKey # noqa 

50from .traversals import HasCopyInternals # noqa 

51from .visitors import ClauseVisitor 

52from .visitors import ExtendedInternalTraversal 

53from .visitors import ExternallyTraversible 

54from .visitors import InternalTraversal 

55from .. import event 

56from .. import exc 

57from .. import util 

58from ..util import HasMemoized as HasMemoized 

59from ..util import hybridmethod 

60from ..util.typing import Self 

61from ..util.typing import TypeGuard 

62 

63if TYPE_CHECKING: 

64 from . import coercions 

65 from . import elements 

66 from . import type_api 

67 from ._orm_types import DMLStrategyArgument 

68 from ._orm_types import SynchronizeSessionArgument 

69 from ._typing import _CLE 

70 from .elements import BindParameter 

71 from .elements import ClauseList 

72 from .elements import ColumnClause # noqa 

73 from .elements import ColumnElement 

74 from .elements import NamedColumn 

75 from .elements import SQLCoreOperations 

76 from .elements import TextClause 

77 from .schema import Column 

78 from .schema import DefaultGenerator 

79 from .selectable import _JoinTargetElement 

80 from .selectable import _SelectIterable 

81 from .selectable import FromClause 

82 from ..engine import Connection 

83 from ..engine import CursorResult 

84 from ..engine.interfaces import _CoreMultiExecuteParams 

85 from ..engine.interfaces import _ExecuteOptions 

86 from ..engine.interfaces import _ImmutableExecuteOptions 

87 from ..engine.interfaces import CacheStats 

88 from ..engine.interfaces import Compiled 

89 from ..engine.interfaces import CompiledCacheType 

90 from ..engine.interfaces import CoreExecuteOptionsParameter 

91 from ..engine.interfaces import Dialect 

92 from ..engine.interfaces import IsolationLevel 

93 from ..engine.interfaces import SchemaTranslateMapType 

94 from ..event import dispatcher 

95 

96if not TYPE_CHECKING: 

97 coercions = None # noqa 

98 elements = None # noqa 

99 type_api = None # noqa 

100 

101 

102class _NoArg(Enum): 

103 NO_ARG = 0 

104 

105 def __repr__(self): 

106 return f"_NoArg.{self.name}" 

107 

108 

109NO_ARG = _NoArg.NO_ARG 

110 

111 

112class _NoneName(Enum): 

113 NONE_NAME = 0 

114 """indicate a 'deferred' name that was ultimately the value None.""" 

115 

116 

117_NONE_NAME = _NoneName.NONE_NAME 

118 

119_T = TypeVar("_T", bound=Any) 

120 

121_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

122 

123_AmbiguousTableNameMap = MutableMapping[str, str] 

124 

125 

126class _DefaultDescriptionTuple(NamedTuple): 

127 arg: Any 

128 is_scalar: Optional[bool] 

129 is_callable: Optional[bool] 

130 is_sentinel: Optional[bool] 

131 

132 @classmethod 

133 def _from_column_default( 

134 cls, default: Optional[DefaultGenerator] 

135 ) -> _DefaultDescriptionTuple: 

136 return ( 

137 _DefaultDescriptionTuple( 

138 default.arg, # type: ignore 

139 default.is_scalar, 

140 default.is_callable, 

141 default.is_sentinel, 

142 ) 

143 if default 

144 and ( 

145 default.has_arg 

146 or (not default.for_update and default.is_sentinel) 

147 ) 

148 else _DefaultDescriptionTuple(None, None, None, None) 

149 ) 

150 

151 

152_never_select_column = operator.attrgetter("_omit_from_statements") 

153 

154 

155class _EntityNamespace(Protocol): 

156 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

157 

158 

159class _HasEntityNamespace(Protocol): 

160 @util.ro_non_memoized_property 

161 def entity_namespace(self) -> _EntityNamespace: ... 

162 

163 

164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

165 return hasattr(element, "entity_namespace") 

166 

167 

168# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

169_Self = TypeVar("_Self", bound=Any) 

170 

171 

172class Immutable: 

173 """mark a ClauseElement as 'immutable' when expressions are cloned. 

174 

175 "immutable" objects refers to the "mutability" of an object in the 

176 context of SQL DQL and DML generation. Such as, in DQL, one can 

177 compose a SELECT or subquery of varied forms, but one cannot modify 

178 the structure of a specific table or column within DQL. 

179 :class:`.Immutable` is mostly intended to follow this concept, and as 

180 such the primary "immutable" objects are :class:`.ColumnClause`, 

181 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

182 

183 """ 

184 

185 __slots__ = () 

186 

187 _is_immutable = True 

188 

189 def unique_params(self, *optionaldict, **kwargs): 

190 raise NotImplementedError("Immutable objects do not support copying") 

191 

192 def params(self, *optionaldict, **kwargs): 

193 raise NotImplementedError("Immutable objects do not support copying") 

194 

195 def _clone(self: _Self, **kw: Any) -> _Self: 

196 return self 

197 

198 def _copy_internals( 

199 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

200 ) -> None: 

201 pass 

202 

203 

204class SingletonConstant(Immutable): 

205 """Represent SQL constants like NULL, TRUE, FALSE""" 

206 

207 _is_singleton_constant = True 

208 

209 _singleton: SingletonConstant 

210 

211 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

212 return cast(_T, cls._singleton) 

213 

214 @util.non_memoized_property 

215 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

216 raise NotImplementedError() 

217 

218 @classmethod 

219 def _create_singleton(cls): 

220 obj = object.__new__(cls) 

221 obj.__init__() # type: ignore 

222 

223 # for a long time this was an empty frozenset, meaning 

224 # a SingletonConstant would never be a "corresponding column" in 

225 # a statement. This referred to #6259. However, in #7154 we see 

226 # that we do in fact need "correspondence" to work when matching cols 

227 # in result sets, so the non-correspondence was moved to a more 

228 # specific level when we are actually adapting expressions for SQL 

229 # render only. 

230 obj.proxy_set = frozenset([obj]) 

231 cls._singleton = obj 

232 

233 

234def _from_objects( 

235 *elements: Union[ 

236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

237 ] 

238) -> Iterator[FromClause]: 

239 return itertools.chain.from_iterable( 

240 [element._from_objects for element in elements] 

241 ) 

242 

243 

244def _select_iterables( 

245 elements: Iterable[roles.ColumnsClauseRole], 

246) -> _SelectIterable: 

247 """expand tables into individual columns in the 

248 given list of column expressions. 

249 

250 """ 

251 return itertools.chain.from_iterable( 

252 [c._select_iterable for c in elements] 

253 ) 

254 

255 

256_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

257 

258 

259class _GenerativeType(Protocol): 

260 def _generate(self) -> Self: ... 

261 

262 

263def _generative(fn: _Fn) -> _Fn: 

264 """non-caching _generative() decorator. 

265 

266 This is basically the legacy decorator that copies the object and 

267 runs a method on the new copy. 

268 

269 """ 

270 

271 @util.decorator 

272 def _generative( 

273 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

274 ) -> _SelfGenerativeType: 

275 """Mark a method as generative.""" 

276 

277 self = self._generate() 

278 x = fn(self, *args, **kw) 

279 assert x is self, "generative methods must return self" 

280 return self 

281 

282 decorated = _generative(fn) 

283 decorated.non_generative = fn # type: ignore 

284 return decorated 

285 

286 

287def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

288 msgs = kw.pop("msgs", {}) 

289 

290 defaults = kw.pop("defaults", {}) 

291 

292 getters = [ 

293 (name, operator.attrgetter(name), defaults.get(name, None)) 

294 for name in names 

295 ] 

296 

297 @util.decorator 

298 def check(fn, *args, **kw): 

299 # make pylance happy by not including "self" in the argument 

300 # list 

301 self = args[0] 

302 args = args[1:] 

303 for name, getter, default_ in getters: 

304 if getter(self) is not default_: 

305 msg = msgs.get( 

306 name, 

307 "Method %s() has already been invoked on this %s construct" 

308 % (fn.__name__, self.__class__), 

309 ) 

310 raise exc.InvalidRequestError(msg) 

311 return fn(self, *args, **kw) 

312 

313 return check 

314 

315 

316def _clone(element, **kw): 

317 return element._clone(**kw) 

318 

319 

320def _expand_cloned( 

321 elements: Iterable[_CLE], 

322) -> Iterable[_CLE]: 

323 """expand the given set of ClauseElements to be the set of all 'cloned' 

324 predecessors. 

325 

326 """ 

327 # TODO: cython candidate 

328 return itertools.chain(*[x._cloned_set for x in elements]) 

329 

330 

331def _de_clone( 

332 elements: Iterable[_CLE], 

333) -> Iterable[_CLE]: 

334 for x in elements: 

335 while x._is_clone_of is not None: 

336 x = x._is_clone_of 

337 yield x 

338 

339 

340def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

341 """return the intersection of sets a and b, counting 

342 any overlap between 'cloned' predecessors. 

343 

344 The returned set is in terms of the entities present within 'a'. 

345 

346 """ 

347 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

348 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

349 

350 

351def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

352 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

353 return { 

354 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

355 } 

356 

357 

358class _DialectArgView(MutableMapping[str, Any]): 

359 """A dictionary view of dialect-level arguments in the form 

360 <dialectname>_<argument_name>. 

361 

362 """ 

363 

364 def __init__(self, obj): 

365 self.obj = obj 

366 

367 def _key(self, key): 

368 try: 

369 dialect, value_key = key.split("_", 1) 

370 except ValueError as err: 

371 raise KeyError(key) from err 

372 else: 

373 return dialect, value_key 

374 

375 def __getitem__(self, key): 

376 dialect, value_key = self._key(key) 

377 

378 try: 

379 opt = self.obj.dialect_options[dialect] 

380 except exc.NoSuchModuleError as err: 

381 raise KeyError(key) from err 

382 else: 

383 return opt[value_key] 

384 

385 def __setitem__(self, key, value): 

386 try: 

387 dialect, value_key = self._key(key) 

388 except KeyError as err: 

389 raise exc.ArgumentError( 

390 "Keys must be of the form <dialectname>_<argname>" 

391 ) from err 

392 else: 

393 self.obj.dialect_options[dialect][value_key] = value 

394 

395 def __delitem__(self, key): 

396 dialect, value_key = self._key(key) 

397 del self.obj.dialect_options[dialect][value_key] 

398 

399 def __len__(self): 

400 return sum( 

401 len(args._non_defaults) 

402 for args in self.obj.dialect_options.values() 

403 ) 

404 

405 def __iter__(self): 

406 return ( 

407 "%s_%s" % (dialect_name, value_name) 

408 for dialect_name in self.obj.dialect_options 

409 for value_name in self.obj.dialect_options[ 

410 dialect_name 

411 ]._non_defaults 

412 ) 

413 

414 

415class _DialectArgDict(MutableMapping[str, Any]): 

416 """A dictionary view of dialect-level arguments for a specific 

417 dialect. 

418 

419 Maintains a separate collection of user-specified arguments 

420 and dialect-specified default arguments. 

421 

422 """ 

423 

424 def __init__(self): 

425 self._non_defaults = {} 

426 self._defaults = {} 

427 

428 def __len__(self): 

429 return len(set(self._non_defaults).union(self._defaults)) 

430 

431 def __iter__(self): 

432 return iter(set(self._non_defaults).union(self._defaults)) 

433 

434 def __getitem__(self, key): 

435 if key in self._non_defaults: 

436 return self._non_defaults[key] 

437 else: 

438 return self._defaults[key] 

439 

440 def __setitem__(self, key, value): 

441 self._non_defaults[key] = value 

442 

443 def __delitem__(self, key): 

444 del self._non_defaults[key] 

445 

446 

447@util.preload_module("sqlalchemy.dialects") 

448def _kw_reg_for_dialect(dialect_name): 

449 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

450 if dialect_cls.construct_arguments is None: 

451 return None 

452 return dict(dialect_cls.construct_arguments) 

453 

454 

455class DialectKWArgs: 

456 """Establish the ability for a class to have dialect-specific arguments 

457 with defaults and constructor validation. 

458 

459 The :class:`.DialectKWArgs` interacts with the 

460 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

461 

462 .. seealso:: 

463 

464 :attr:`.DefaultDialect.construct_arguments` 

465 

466 """ 

467 

468 __slots__ = () 

469 

470 _dialect_kwargs_traverse_internals = [ 

471 ("dialect_options", InternalTraversal.dp_dialect_options) 

472 ] 

473 

474 @classmethod 

475 def argument_for(cls, dialect_name, argument_name, default): 

476 """Add a new kind of dialect-specific keyword argument for this class. 

477 

478 E.g.:: 

479 

480 Index.argument_for("mydialect", "length", None) 

481 

482 some_index = Index('a', 'b', mydialect_length=5) 

483 

484 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

485 way adding extra arguments to the 

486 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

487 dictionary provides a list of argument names accepted by various 

488 schema-level constructs on behalf of a dialect. 

489 

490 New dialects should typically specify this dictionary all at once as a 

491 data member of the dialect class. The use case for ad-hoc addition of 

492 argument names is typically for end-user code that is also using 

493 a custom compilation scheme which consumes the additional arguments. 

494 

495 :param dialect_name: name of a dialect. The dialect must be 

496 locatable, else a :class:`.NoSuchModuleError` is raised. The 

497 dialect must also include an existing 

498 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

499 that it participates in the keyword-argument validation and default 

500 system, else :class:`.ArgumentError` is raised. If the dialect does 

501 not include this collection, then any keyword argument can be 

502 specified on behalf of this dialect already. All dialects packaged 

503 within SQLAlchemy include this collection, however for third party 

504 dialects, support may vary. 

505 

506 :param argument_name: name of the parameter. 

507 

508 :param default: default value of the parameter. 

509 

510 """ 

511 

512 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

513 if construct_arg_dictionary is None: 

514 raise exc.ArgumentError( 

515 "Dialect '%s' does have keyword-argument " 

516 "validation and defaults enabled configured" % dialect_name 

517 ) 

518 if cls not in construct_arg_dictionary: 

519 construct_arg_dictionary[cls] = {} 

520 construct_arg_dictionary[cls][argument_name] = default 

521 

522 @util.memoized_property 

523 def dialect_kwargs(self): 

524 """A collection of keyword arguments specified as dialect-specific 

525 options to this construct. 

526 

527 The arguments are present here in their original ``<dialect>_<kwarg>`` 

528 format. Only arguments that were actually passed are included; 

529 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which 

530 contains all options known by this dialect including defaults. 

531 

532 The collection is also writable; keys are accepted of the 

533 form ``<dialect>_<kwarg>`` where the value will be assembled 

534 into the list of options. 

535 

536 .. seealso:: 

537 

538 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

539 

540 """ 

541 return _DialectArgView(self) 

542 

543 @property 

544 def kwargs(self): 

545 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

546 return self.dialect_kwargs 

547 

548 _kw_registry = util.PopulateDict(_kw_reg_for_dialect) 

549 

550 def _kw_reg_for_dialect_cls(self, dialect_name): 

551 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

552 d = _DialectArgDict() 

553 

554 if construct_arg_dictionary is None: 

555 d._defaults.update({"*": None}) 

556 else: 

557 for cls in reversed(self.__class__.__mro__): 

558 if cls in construct_arg_dictionary: 

559 d._defaults.update(construct_arg_dictionary[cls]) 

560 return d 

561 

562 @util.memoized_property 

563 def dialect_options(self): 

564 """A collection of keyword arguments specified as dialect-specific 

565 options to this construct. 

566 

567 This is a two-level nested registry, keyed to ``<dialect_name>`` 

568 and ``<argument_name>``. For example, the ``postgresql_where`` 

569 argument would be locatable as:: 

570 

571 arg = my_object.dialect_options['postgresql']['where'] 

572 

573 .. versionadded:: 0.9.2 

574 

575 .. seealso:: 

576 

577 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

578 

579 """ 

580 

581 return util.PopulateDict( 

582 util.portable_instancemethod(self._kw_reg_for_dialect_cls) 

583 ) 

584 

585 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

586 # validate remaining kwargs that they all specify DB prefixes 

587 

588 if not kwargs: 

589 return 

590 

591 for k in kwargs: 

592 m = re.match("^(.+?)_(.+)$", k) 

593 if not m: 

594 raise TypeError( 

595 "Additional arguments should be " 

596 "named <dialectname>_<argument>, got '%s'" % k 

597 ) 

598 dialect_name, arg_name = m.group(1, 2) 

599 

600 try: 

601 construct_arg_dictionary = self.dialect_options[dialect_name] 

602 except exc.NoSuchModuleError: 

603 util.warn( 

604 "Can't validate argument %r; can't " 

605 "locate any SQLAlchemy dialect named %r" 

606 % (k, dialect_name) 

607 ) 

608 self.dialect_options[dialect_name] = d = _DialectArgDict() 

609 d._defaults.update({"*": None}) 

610 d._non_defaults[arg_name] = kwargs[k] 

611 else: 

612 if ( 

613 "*" not in construct_arg_dictionary 

614 and arg_name not in construct_arg_dictionary 

615 ): 

616 raise exc.ArgumentError( 

617 "Argument %r is not accepted by " 

618 "dialect %r on behalf of %r" 

619 % (k, dialect_name, self.__class__) 

620 ) 

621 else: 

622 construct_arg_dictionary[arg_name] = kwargs[k] 

623 

624 

625class CompileState: 

626 """Produces additional object state necessary for a statement to be 

627 compiled. 

628 

629 the :class:`.CompileState` class is at the base of classes that assemble 

630 state for a particular statement object that is then used by the 

631 compiler. This process is essentially an extension of the process that 

632 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis 

633 on converting raw user intent into more organized structures rather than 

634 producing string output. The top-level :class:`.CompileState` for the 

635 statement being executed is also accessible when the execution context 

636 works with invoking the statement and collecting results. 

637 

638 The production of :class:`.CompileState` is specific to the compiler, such 

639 as within the :meth:`.SQLCompiler.visit_insert`, 

640 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

641 responsible for associating the :class:`.CompileState` with the 

642 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

643 i.e. the outermost SQL statement that's actually being executed. 

644 There can be other :class:`.CompileState` objects that are not the 

645 toplevel, such as when a SELECT subquery or CTE-nested 

646 INSERT/UPDATE/DELETE is generated. 

647 

648 .. versionadded:: 1.4 

649 

650 """ 

651 

652 __slots__ = ("statement", "_ambiguous_table_name_map") 

653 

654 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

655 

656 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

657 

658 @classmethod 

659 def create_for_statement(cls, statement, compiler, **kw): 

660 # factory construction. 

661 

662 if statement._propagate_attrs: 

663 plugin_name = statement._propagate_attrs.get( 

664 "compile_state_plugin", "default" 

665 ) 

666 klass = cls.plugins.get( 

667 (plugin_name, statement._effective_plugin_target), None 

668 ) 

669 if klass is None: 

670 klass = cls.plugins[ 

671 ("default", statement._effective_plugin_target) 

672 ] 

673 

674 else: 

675 klass = cls.plugins[ 

676 ("default", statement._effective_plugin_target) 

677 ] 

678 

679 if klass is cls: 

680 return cls(statement, compiler, **kw) 

681 else: 

682 return klass.create_for_statement(statement, compiler, **kw) 

683 

684 def __init__(self, statement, compiler, **kw): 

685 self.statement = statement 

686 

687 @classmethod 

688 def get_plugin_class( 

689 cls, statement: Executable 

690 ) -> Optional[Type[CompileState]]: 

691 plugin_name = statement._propagate_attrs.get( 

692 "compile_state_plugin", None 

693 ) 

694 

695 if plugin_name: 

696 key = (plugin_name, statement._effective_plugin_target) 

697 if key in cls.plugins: 

698 return cls.plugins[key] 

699 

700 # there's no case where we call upon get_plugin_class() and want 

701 # to get None back, there should always be a default. return that 

702 # if there was no plugin-specific class (e.g. Insert with "orm" 

703 # plugin) 

704 try: 

705 return cls.plugins[("default", statement._effective_plugin_target)] 

706 except KeyError: 

707 return None 

708 

709 @classmethod 

710 def _get_plugin_class_for_plugin( 

711 cls, statement: Executable, plugin_name: str 

712 ) -> Optional[Type[CompileState]]: 

713 try: 

714 return cls.plugins[ 

715 (plugin_name, statement._effective_plugin_target) 

716 ] 

717 except KeyError: 

718 return None 

719 

720 @classmethod 

721 def plugin_for( 

722 cls, plugin_name: str, visit_name: str 

723 ) -> Callable[[_Fn], _Fn]: 

724 def decorate(cls_to_decorate): 

725 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

726 return cls_to_decorate 

727 

728 return decorate 

729 

730 

731class Generative(HasMemoized): 

732 """Provide a method-chaining pattern in conjunction with the 

733 @_generative decorator.""" 

734 

735 def _generate(self) -> Self: 

736 skip = self._memoized_keys 

737 cls = self.__class__ 

738 s = cls.__new__(cls) 

739 if skip: 

740 # ensure this iteration remains atomic 

741 s.__dict__ = { 

742 k: v for k, v in self.__dict__.copy().items() if k not in skip 

743 } 

744 else: 

745 s.__dict__ = self.__dict__.copy() 

746 return s 

747 

748 

749class InPlaceGenerative(HasMemoized): 

750 """Provide a method-chaining pattern in conjunction with the 

751 @_generative decorator that mutates in place.""" 

752 

753 __slots__ = () 

754 

755 def _generate(self): 

756 skip = self._memoized_keys 

757 # note __dict__ needs to be in __slots__ if this is used 

758 for k in skip: 

759 self.__dict__.pop(k, None) 

760 return self 

761 

762 

763class HasCompileState(Generative): 

764 """A class that has a :class:`.CompileState` associated with it.""" 

765 

766 _compile_state_plugin: Optional[Type[CompileState]] = None 

767 

768 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

769 

770 _compile_state_factory = CompileState.create_for_statement 

771 

772 

773class _MetaOptions(type): 

774 """metaclass for the Options class. 

775 

776 This metaclass is actually necessary despite the availability of the 

777 ``__init_subclass__()`` hook as this type also provides custom class-level 

778 behavior for the ``__add__()`` method. 

779 

780 """ 

781 

782 _cache_attrs: Tuple[str, ...] 

783 

784 def __add__(self, other): 

785 o1 = self() 

786 

787 if set(other).difference(self._cache_attrs): 

788 raise TypeError( 

789 "dictionary contains attributes not covered by " 

790 "Options class %s: %r" 

791 % (self, set(other).difference(self._cache_attrs)) 

792 ) 

793 

794 o1.__dict__.update(other) 

795 return o1 

796 

797 if TYPE_CHECKING: 

798 

799 def __getattr__(self, key: str) -> Any: ... 

800 

801 def __setattr__(self, key: str, value: Any) -> None: ... 

802 

803 def __delattr__(self, key: str) -> None: ... 

804 

805 

806class Options(metaclass=_MetaOptions): 

807 """A cacheable option dictionary with defaults.""" 

808 

809 __slots__ = () 

810 

811 _cache_attrs: Tuple[str, ...] 

812 

813 def __init_subclass__(cls) -> None: 

814 dict_ = cls.__dict__ 

815 cls._cache_attrs = tuple( 

816 sorted( 

817 d 

818 for d in dict_ 

819 if not d.startswith("__") 

820 and d not in ("_cache_key_traversal",) 

821 ) 

822 ) 

823 super().__init_subclass__() 

824 

825 def __init__(self, **kw): 

826 self.__dict__.update(kw) 

827 

828 def __add__(self, other): 

829 o1 = self.__class__.__new__(self.__class__) 

830 o1.__dict__.update(self.__dict__) 

831 

832 if set(other).difference(self._cache_attrs): 

833 raise TypeError( 

834 "dictionary contains attributes not covered by " 

835 "Options class %s: %r" 

836 % (self, set(other).difference(self._cache_attrs)) 

837 ) 

838 

839 o1.__dict__.update(other) 

840 return o1 

841 

842 def __eq__(self, other): 

843 # TODO: very inefficient. This is used only in test suites 

844 # right now. 

845 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

846 if getattr(self, a) != getattr(other, b): 

847 return False 

848 return True 

849 

850 def __repr__(self): 

851 # TODO: fairly inefficient, used only in debugging right now. 

852 

853 return "%s(%s)" % ( 

854 self.__class__.__name__, 

855 ", ".join( 

856 "%s=%r" % (k, self.__dict__[k]) 

857 for k in self._cache_attrs 

858 if k in self.__dict__ 

859 ), 

860 ) 

861 

862 @classmethod 

863 def isinstance(cls, klass: Type[Any]) -> bool: 

864 return issubclass(cls, klass) 

865 

866 @hybridmethod 

867 def add_to_element(self, name, value): 

868 return self + {name: getattr(self, name) + value} 

869 

870 @hybridmethod 

871 def _state_dict_inst(self) -> Mapping[str, Any]: 

872 return self.__dict__ 

873 

874 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

875 

876 @_state_dict_inst.classlevel 

877 def _state_dict(cls) -> Mapping[str, Any]: 

878 return cls._state_dict_const 

879 

880 @classmethod 

881 def safe_merge(cls, other): 

882 d = other._state_dict() 

883 

884 # only support a merge with another object of our class 

885 # and which does not have attrs that we don't. otherwise 

886 # we risk having state that might not be part of our cache 

887 # key strategy 

888 

889 if ( 

890 cls is not other.__class__ 

891 and other._cache_attrs 

892 and set(other._cache_attrs).difference(cls._cache_attrs) 

893 ): 

894 raise TypeError( 

895 "other element %r is not empty, is not of type %s, " 

896 "and contains attributes not covered here %r" 

897 % ( 

898 other, 

899 cls, 

900 set(other._cache_attrs).difference(cls._cache_attrs), 

901 ) 

902 ) 

903 return cls + d 

904 

905 @classmethod 

906 def from_execution_options( 

907 cls, key, attrs, exec_options, statement_exec_options 

908 ): 

909 """process Options argument in terms of execution options. 

910 

911 

912 e.g.:: 

913 

914 ( 

915 load_options, 

916 execution_options, 

917 ) = QueryContext.default_load_options.from_execution_options( 

918 "_sa_orm_load_options", 

919 { 

920 "populate_existing", 

921 "autoflush", 

922 "yield_per" 

923 }, 

924 execution_options, 

925 statement._execution_options, 

926 ) 

927 

928 get back the Options and refresh "_sa_orm_load_options" in the 

929 exec options dict w/ the Options as well 

930 

931 """ 

932 

933 # common case is that no options we are looking for are 

934 # in either dictionary, so cancel for that first 

935 check_argnames = attrs.intersection( 

936 set(exec_options).union(statement_exec_options) 

937 ) 

938 

939 existing_options = exec_options.get(key, cls) 

940 

941 if check_argnames: 

942 result = {} 

943 for argname in check_argnames: 

944 local = "_" + argname 

945 if argname in exec_options: 

946 result[local] = exec_options[argname] 

947 elif argname in statement_exec_options: 

948 result[local] = statement_exec_options[argname] 

949 

950 new_options = existing_options + result 

951 exec_options = util.immutabledict().merge_with( 

952 exec_options, {key: new_options} 

953 ) 

954 return new_options, exec_options 

955 

956 else: 

957 return existing_options, exec_options 

958 

959 if TYPE_CHECKING: 

960 

961 def __getattr__(self, key: str) -> Any: ... 

962 

963 def __setattr__(self, key: str, value: Any) -> None: ... 

964 

965 def __delattr__(self, key: str) -> None: ... 

966 

967 

968class CacheableOptions(Options, HasCacheKey): 

969 __slots__ = () 

970 

971 @hybridmethod 

972 def _gen_cache_key_inst(self, anon_map, bindparams): 

973 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

974 

975 @_gen_cache_key_inst.classlevel 

976 def _gen_cache_key(cls, anon_map, bindparams): 

977 return (cls, ()) 

978 

979 @hybridmethod 

980 def _generate_cache_key(self): 

981 return HasCacheKey._generate_cache_key_for_object(self) 

982 

983 

984class ExecutableOption(HasCopyInternals): 

985 __slots__ = () 

986 

987 _annotations = util.EMPTY_DICT 

988 

989 __visit_name__ = "executable_option" 

990 

991 _is_has_cache_key = False 

992 

993 _is_core = True 

994 

995 def _clone(self, **kw): 

996 """Create a shallow copy of this ExecutableOption.""" 

997 c = self.__class__.__new__(self.__class__) 

998 c.__dict__ = dict(self.__dict__) # type: ignore 

999 return c 

1000 

1001 

1002class Executable(roles.StatementRole): 

1003 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1004 

1005 :class:`.Executable` is a superclass for all "statement" types 

1006 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1007 :func:`insert`, :func:`text`. 

1008 

1009 """ 

1010 

1011 supports_execution: bool = True 

1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1013 _is_default_generator = False 

1014 _with_options: Tuple[ExecutableOption, ...] = () 

1015 _with_context_options: Tuple[ 

1016 Tuple[Callable[[CompileState], None], Any], ... 

1017 ] = () 

1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1019 

1020 _executable_traverse_internals = [ 

1021 ("_with_options", InternalTraversal.dp_executable_options), 

1022 ( 

1023 "_with_context_options", 

1024 ExtendedInternalTraversal.dp_with_context_options, 

1025 ), 

1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1027 ] 

1028 

1029 is_select = False 

1030 is_from_statement = False 

1031 is_update = False 

1032 is_insert = False 

1033 is_text = False 

1034 is_delete = False 

1035 is_dml = False 

1036 

1037 if TYPE_CHECKING: 

1038 __visit_name__: str 

1039 

1040 def _compile_w_cache( 

1041 self, 

1042 dialect: Dialect, 

1043 *, 

1044 compiled_cache: Optional[CompiledCacheType], 

1045 column_keys: List[str], 

1046 for_executemany: bool = False, 

1047 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1048 **kw: Any, 

1049 ) -> Tuple[ 

1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats 

1051 ]: ... 

1052 

1053 def _execute_on_connection( 

1054 self, 

1055 connection: Connection, 

1056 distilled_params: _CoreMultiExecuteParams, 

1057 execution_options: CoreExecuteOptionsParameter, 

1058 ) -> CursorResult[Any]: ... 

1059 

1060 def _execute_on_scalar( 

1061 self, 

1062 connection: Connection, 

1063 distilled_params: _CoreMultiExecuteParams, 

1064 execution_options: CoreExecuteOptionsParameter, 

1065 ) -> Any: ... 

1066 

1067 @util.ro_non_memoized_property 

1068 def _all_selected_columns(self): 

1069 raise NotImplementedError() 

1070 

1071 @property 

1072 def _effective_plugin_target(self) -> str: 

1073 return self.__visit_name__ 

1074 

1075 @_generative 

1076 def options(self, *options: ExecutableOption) -> Self: 

1077 """Apply options to this statement. 

1078 

1079 In the general sense, options are any kind of Python object 

1080 that can be interpreted by the SQL compiler for the statement. 

1081 These options can be consumed by specific dialects or specific kinds 

1082 of compilers. 

1083 

1084 The most commonly known kind of option are the ORM level options 

1085 that apply "eager load" and other loading behaviors to an ORM 

1086 query. However, options can theoretically be used for many other 

1087 purposes. 

1088 

1089 For background on specific kinds of options for specific kinds of 

1090 statements, refer to the documentation for those option objects. 

1091 

1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1093 Core statement objects towards the goal of allowing unified 

1094 Core / ORM querying capabilities. 

1095 

1096 .. seealso:: 

1097 

1098 :ref:`loading_columns` - refers to options specific to the usage 

1099 of ORM queries 

1100 

1101 :ref:`relationship_loader_options` - refers to options specific 

1102 to the usage of ORM queries 

1103 

1104 """ 

1105 self._with_options += tuple( 

1106 coercions.expect(roles.ExecutableOptionRole, opt) 

1107 for opt in options 

1108 ) 

1109 return self 

1110 

1111 @_generative 

1112 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1113 """Assign the compile options to a new value. 

1114 

1115 :param compile_options: appropriate CacheableOptions structure 

1116 

1117 """ 

1118 

1119 self._compile_options = compile_options 

1120 return self 

1121 

1122 @_generative 

1123 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1124 """update the _compile_options with new keys.""" 

1125 

1126 assert self._compile_options is not None 

1127 self._compile_options += options 

1128 return self 

1129 

1130 @_generative 

1131 def _add_context_option( 

1132 self, 

1133 callable_: Callable[[CompileState], None], 

1134 cache_args: Any, 

1135 ) -> Self: 

1136 """Add a context option to this statement. 

1137 

1138 These are callable functions that will 

1139 be given the CompileState object upon compilation. 

1140 

1141 A second argument cache_args is required, which will be combined with 

1142 the ``__code__`` identity of the function itself in order to produce a 

1143 cache key. 

1144 

1145 """ 

1146 self._with_context_options += ((callable_, cache_args),) 

1147 return self 

1148 

1149 @overload 

1150 def execution_options( 

1151 self, 

1152 *, 

1153 compiled_cache: Optional[CompiledCacheType] = ..., 

1154 logging_token: str = ..., 

1155 isolation_level: IsolationLevel = ..., 

1156 no_parameters: bool = False, 

1157 stream_results: bool = False, 

1158 max_row_buffer: int = ..., 

1159 yield_per: int = ..., 

1160 insertmanyvalues_page_size: int = ..., 

1161 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1162 populate_existing: bool = False, 

1163 autoflush: bool = False, 

1164 synchronize_session: SynchronizeSessionArgument = ..., 

1165 dml_strategy: DMLStrategyArgument = ..., 

1166 render_nulls: bool = ..., 

1167 is_delete_using: bool = ..., 

1168 is_update_from: bool = ..., 

1169 preserve_rowcount: bool = False, 

1170 **opt: Any, 

1171 ) -> Self: ... 

1172 

1173 @overload 

1174 def execution_options(self, **opt: Any) -> Self: ... 

1175 

1176 @_generative 

1177 def execution_options(self, **kw: Any) -> Self: 

1178 """Set non-SQL options for the statement which take effect during 

1179 execution. 

1180 

1181 Execution options can be set at many scopes, including per-statement, 

1182 per-connection, or per execution, using methods such as 

1183 :meth:`_engine.Connection.execution_options` and parameters which 

1184 accept a dictionary of options such as 

1185 :paramref:`_engine.Connection.execute.execution_options` and 

1186 :paramref:`_orm.Session.execute.execution_options`. 

1187 

1188 The primary characteristic of an execution option, as opposed to 

1189 other kinds of options such as ORM loader options, is that 

1190 **execution options never affect the compiled SQL of a query, only 

1191 things that affect how the SQL statement itself is invoked or how 

1192 results are fetched**. That is, execution options are not part of 

1193 what's accommodated by SQL compilation nor are they considered part of 

1194 the cached state of a statement. 

1195 

1196 The :meth:`_sql.Executable.execution_options` method is 

1197 :term:`generative`, as 

1198 is the case for the method as applied to the :class:`_engine.Engine` 

1199 and :class:`_orm.Query` objects, which means when the method is called, 

1200 a copy of the object is returned, which applies the given parameters to 

1201 that new copy, but leaves the original unchanged:: 

1202 

1203 statement = select(table.c.x, table.c.y) 

1204 new_statement = statement.execution_options(my_option=True) 

1205 

1206 An exception to this behavior is the :class:`_engine.Connection` 

1207 object, where the :meth:`_engine.Connection.execution_options` method 

1208 is explicitly **not** generative. 

1209 

1210 The kinds of options that may be passed to 

1211 :meth:`_sql.Executable.execution_options` and other related methods and 

1212 parameter dictionaries include parameters that are explicitly consumed 

1213 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1214 defined by SQLAlchemy, which means the methods and/or parameter 

1215 dictionaries may be used for user-defined parameters that interact with 

1216 custom code, which may access the parameters using methods such as 

1217 :meth:`_sql.Executable.get_execution_options` and 

1218 :meth:`_engine.Connection.get_execution_options`, or within selected 

1219 event hooks using a dedicated ``execution_options`` event parameter 

1220 such as 

1221 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1222 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1223 

1224 from sqlalchemy import event 

1225 

1226 @event.listens_for(some_engine, "before_execute") 

1227 def _process_opt(conn, statement, multiparams, params, execution_options): 

1228 "run a SQL function before invoking a statement" 

1229 

1230 if execution_options.get("do_special_thing", False): 

1231 conn.exec_driver_sql("run_special_function()") 

1232 

1233 Within the scope of options that are explicitly recognized by 

1234 SQLAlchemy, most apply to specific classes of objects and not others. 

1235 The most common execution options include: 

1236 

1237 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1238 sets the isolation level for a connection or a class of connections 

1239 via an :class:`_engine.Engine`. This option is accepted only 

1240 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1241 

1242 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1243 indicates results should be fetched using a server side cursor; 

1244 this option is accepted by :class:`_engine.Connection`, by the 

1245 :paramref:`_engine.Connection.execute.execution_options` parameter 

1246 on :meth:`_engine.Connection.execute`, and additionally by 

1247 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1248 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1249 

1250 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1251 indicates a dictionary that will serve as the 

1252 :ref:`SQL compilation cache <sql_caching>` 

1253 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1254 well as for ORM methods like :meth:`_orm.Session.execute`. 

1255 Can be passed as ``None`` to disable caching for statements. 

1256 This option is not accepted by 

1257 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1258 carry along a compilation cache within a statement object. 

1259 

1260 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1261 - a mapping of schema names used by the 

1262 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1263 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1264 :class:`_sql.Executable`, as well as by ORM constructs 

1265 like :meth:`_orm.Session.execute`. 

1266 

1267 .. seealso:: 

1268 

1269 :meth:`_engine.Connection.execution_options` 

1270 

1271 :paramref:`_engine.Connection.execute.execution_options` 

1272 

1273 :paramref:`_orm.Session.execute.execution_options` 

1274 

1275 :ref:`orm_queryguide_execution_options` - documentation on all 

1276 ORM-specific execution options 

1277 

1278 """ # noqa: E501 

1279 if "isolation_level" in kw: 

1280 raise exc.ArgumentError( 

1281 "'isolation_level' execution option may only be specified " 

1282 "on Connection.execution_options(), or " 

1283 "per-engine using the isolation_level " 

1284 "argument to create_engine()." 

1285 ) 

1286 if "compiled_cache" in kw: 

1287 raise exc.ArgumentError( 

1288 "'compiled_cache' execution option may only be specified " 

1289 "on Connection.execution_options(), not per statement." 

1290 ) 

1291 self._execution_options = self._execution_options.union(kw) 

1292 return self 

1293 

1294 def get_execution_options(self) -> _ExecuteOptions: 

1295 """Get the non-SQL options which will take effect during execution. 

1296 

1297 .. versionadded:: 1.3 

1298 

1299 .. seealso:: 

1300 

1301 :meth:`.Executable.execution_options` 

1302 """ 

1303 return self._execution_options 

1304 

1305 

1306class SchemaEventTarget(event.EventTarget): 

1307 """Base class for elements that are the targets of :class:`.DDLEvents` 

1308 events. 

1309 

1310 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1311 

1312 """ 

1313 

1314 dispatch: dispatcher[SchemaEventTarget] 

1315 

1316 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1317 """Associate with this SchemaEvent's parent object.""" 

1318 

1319 def _set_parent_with_dispatch( 

1320 self, parent: SchemaEventTarget, **kw: Any 

1321 ) -> None: 

1322 self.dispatch.before_parent_attach(self, parent) 

1323 self._set_parent(parent, **kw) 

1324 self.dispatch.after_parent_attach(self, parent) 

1325 

1326 

1327class SchemaVisitor(ClauseVisitor): 

1328 """Define the visiting for ``SchemaItem`` objects.""" 

1329 

1330 __traverse_options__ = {"schema_visitor": True} 

1331 

1332 

1333class _SentinelDefaultCharacterization(Enum): 

1334 NONE = "none" 

1335 UNKNOWN = "unknown" 

1336 CLIENTSIDE = "clientside" 

1337 SENTINEL_DEFAULT = "sentinel_default" 

1338 SERVERSIDE = "serverside" 

1339 IDENTITY = "identity" 

1340 SEQUENCE = "sequence" 

1341 

1342 

1343class _SentinelColumnCharacterization(NamedTuple): 

1344 columns: Optional[Sequence[Column[Any]]] = None 

1345 is_explicit: bool = False 

1346 is_autoinc: bool = False 

1347 default_characterization: _SentinelDefaultCharacterization = ( 

1348 _SentinelDefaultCharacterization.NONE 

1349 ) 

1350 

1351 

1352_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1353 

1354_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1355_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1356 

1357 

1358class _ColumnMetrics(Generic[_COL_co]): 

1359 __slots__ = ("column",) 

1360 

1361 column: _COL_co 

1362 

1363 def __init__( 

1364 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1365 ): 

1366 self.column = col 

1367 

1368 # proxy_index being non-empty means it was initialized. 

1369 # so we need to update it 

1370 pi = collection._proxy_index 

1371 if pi: 

1372 for eps_col in col._expanded_proxy_set: 

1373 pi[eps_col].add(self) 

1374 

1375 def get_expanded_proxy_set(self): 

1376 return self.column._expanded_proxy_set 

1377 

1378 def dispose(self, collection): 

1379 pi = collection._proxy_index 

1380 if not pi: 

1381 return 

1382 for col in self.column._expanded_proxy_set: 

1383 colset = pi.get(col, None) 

1384 if colset: 

1385 colset.discard(self) 

1386 if colset is not None and not colset: 

1387 del pi[col] 

1388 

1389 def embedded( 

1390 self, 

1391 target_set: Union[ 

1392 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1393 ], 

1394 ) -> bool: 

1395 expanded_proxy_set = self.column._expanded_proxy_set 

1396 for t in target_set.difference(expanded_proxy_set): 

1397 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1398 return False 

1399 return True 

1400 

1401 

1402class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1403 """Collection of :class:`_expression.ColumnElement` instances, 

1404 typically for 

1405 :class:`_sql.FromClause` objects. 

1406 

1407 The :class:`_sql.ColumnCollection` object is most commonly available 

1408 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1409 on the :class:`_schema.Table` object, introduced at 

1410 :ref:`metadata_tables_and_columns`. 

1411 

1412 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1413 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1414 :class:`_schema.Column` objects, which are then accessible both via mapping 

1415 style access as well as attribute access style. 

1416 

1417 To access :class:`_schema.Column` objects using ordinary attribute-style 

1418 access, specify the name like any other object attribute, such as below 

1419 a column named ``employee_name`` is accessed:: 

1420 

1421 >>> employee_table.c.employee_name 

1422 

1423 To access columns that have names with special characters or spaces, 

1424 index-style access is used, such as below which illustrates a column named 

1425 ``employee ' payment`` is accessed:: 

1426 

1427 >>> employee_table.c["employee ' payment"] 

1428 

1429 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1430 interface, common dictionary method names like 

1431 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1432 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1433 database columns that are keyed under these names also need to use indexed 

1434 access:: 

1435 

1436 >>> employee_table.c["values"] 

1437 

1438 

1439 The name for which a :class:`_schema.Column` would be present is normally 

1440 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1441 such as a :class:`_sql.Select` object that uses a label style set 

1442 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1443 key may instead be represented under a particular label name such 

1444 as ``tablename_columnname``:: 

1445 

1446 >>> from sqlalchemy import select, column, table 

1447 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1448 >>> t = table("t", column("c")) 

1449 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1450 >>> subq = stmt.subquery() 

1451 >>> subq.c.t_c 

1452 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1453 

1454 :class:`.ColumnCollection` also indexes the columns in order and allows 

1455 them to be accessible by their integer position:: 

1456 

1457 >>> cc[0] 

1458 Column('x', Integer(), table=None) 

1459 >>> cc[1] 

1460 Column('y', Integer(), table=None) 

1461 

1462 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1463 allows integer-based 

1464 index access to the collection. 

1465 

1466 Iterating the collection yields the column expressions in order:: 

1467 

1468 >>> list(cc) 

1469 [Column('x', Integer(), table=None), 

1470 Column('y', Integer(), table=None)] 

1471 

1472 The base :class:`_expression.ColumnCollection` object can store 

1473 duplicates, which can 

1474 mean either two columns with the same key, in which case the column 

1475 returned by key access is **arbitrary**:: 

1476 

1477 >>> x1, x2 = Column('x', Integer), Column('x', Integer) 

1478 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1479 >>> list(cc) 

1480 [Column('x', Integer(), table=None), 

1481 Column('x', Integer(), table=None)] 

1482 >>> cc['x'] is x1 

1483 False 

1484 >>> cc['x'] is x2 

1485 True 

1486 

1487 Or it can also mean the same column multiple times. These cases are 

1488 supported as :class:`_expression.ColumnCollection` 

1489 is used to represent the columns in 

1490 a SELECT statement which may include duplicates. 

1491 

1492 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1493 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1494 collection is used for schema level objects like :class:`_schema.Table` 

1495 and 

1496 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1497 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1498 as the schema constructs have more use cases that require removal and 

1499 replacement of columns. 

1500 

1501 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1502 now stores duplicate 

1503 column keys as well as the same column in multiple positions. The 

1504 :class:`.DedupeColumnCollection` class is added to maintain the 

1505 former behavior in those cases where deduplication as well as 

1506 additional replace/remove operations are needed. 

1507 

1508 

1509 """ 

1510 

1511 __slots__ = "_collection", "_index", "_colset", "_proxy_index" 

1512 

1513 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1514 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1515 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1516 _colset: Set[_COL_co] 

1517 

1518 def __init__( 

1519 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1520 ): 

1521 object.__setattr__(self, "_colset", set()) 

1522 object.__setattr__(self, "_index", {}) 

1523 object.__setattr__( 

1524 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1525 ) 

1526 object.__setattr__(self, "_collection", []) 

1527 if columns: 

1528 self._initial_populate(columns) 

1529 

1530 @util.preload_module("sqlalchemy.sql.elements") 

1531 def __clause_element__(self) -> ClauseList: 

1532 elements = util.preloaded.sql_elements 

1533 

1534 return elements.ClauseList( 

1535 _literal_as_text_role=roles.ColumnsClauseRole, 

1536 group=False, 

1537 *self._all_columns, 

1538 ) 

1539 

1540 def _initial_populate( 

1541 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1542 ) -> None: 

1543 self._populate_separate_keys(iter_) 

1544 

1545 @property 

1546 def _all_columns(self) -> List[_COL_co]: 

1547 return [col for (_, col, _) in self._collection] 

1548 

1549 def keys(self) -> List[_COLKEY]: 

1550 """Return a sequence of string key names for all columns in this 

1551 collection.""" 

1552 return [k for (k, _, _) in self._collection] 

1553 

1554 def values(self) -> List[_COL_co]: 

1555 """Return a sequence of :class:`_sql.ColumnClause` or 

1556 :class:`_schema.Column` objects for all columns in this 

1557 collection.""" 

1558 return [col for (_, col, _) in self._collection] 

1559 

1560 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1561 """Return a sequence of (key, column) tuples for all columns in this 

1562 collection each consisting of a string key name and a 

1563 :class:`_sql.ColumnClause` or 

1564 :class:`_schema.Column` object. 

1565 """ 

1566 

1567 return [(k, col) for (k, col, _) in self._collection] 

1568 

1569 def __bool__(self) -> bool: 

1570 return bool(self._collection) 

1571 

1572 def __len__(self) -> int: 

1573 return len(self._collection) 

1574 

1575 def __iter__(self) -> Iterator[_COL_co]: 

1576 # turn to a list first to maintain over a course of changes 

1577 return iter([col for _, col, _ in self._collection]) 

1578 

1579 @overload 

1580 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1581 

1582 @overload 

1583 def __getitem__( 

1584 self, key: Tuple[Union[str, int], ...] 

1585 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1586 

1587 @overload 

1588 def __getitem__( 

1589 self, key: slice 

1590 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1591 

1592 def __getitem__( 

1593 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1594 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1595 try: 

1596 if isinstance(key, (tuple, slice)): 

1597 if isinstance(key, slice): 

1598 cols = ( 

1599 (sub_key, col) 

1600 for (sub_key, col, _) in self._collection[key] 

1601 ) 

1602 else: 

1603 cols = (self._index[sub_key] for sub_key in key) 

1604 

1605 return ColumnCollection(cols).as_readonly() 

1606 else: 

1607 return self._index[key][1] 

1608 except KeyError as err: 

1609 if isinstance(err.args[0], int): 

1610 raise IndexError(err.args[0]) from err 

1611 else: 

1612 raise 

1613 

1614 def __getattr__(self, key: str) -> _COL_co: 

1615 try: 

1616 return self._index[key][1] 

1617 except KeyError as err: 

1618 raise AttributeError(key) from err 

1619 

1620 def __contains__(self, key: str) -> bool: 

1621 if key not in self._index: 

1622 if not isinstance(key, str): 

1623 raise exc.ArgumentError( 

1624 "__contains__ requires a string argument" 

1625 ) 

1626 return False 

1627 else: 

1628 return True 

1629 

1630 def compare(self, other: ColumnCollection[Any, Any]) -> bool: 

1631 """Compare this :class:`_expression.ColumnCollection` to another 

1632 based on the names of the keys""" 

1633 

1634 for l, r in zip_longest(self, other): 

1635 if l is not r: 

1636 return False 

1637 else: 

1638 return True 

1639 

1640 def __eq__(self, other: Any) -> bool: 

1641 return self.compare(other) 

1642 

1643 @overload 

1644 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1645 

1646 @overload 

1647 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1648 

1649 def get( 

1650 self, key: str, default: Optional[_COL] = None 

1651 ) -> Optional[Union[_COL_co, _COL]]: 

1652 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1653 based on a string key name from this 

1654 :class:`_expression.ColumnCollection`.""" 

1655 

1656 if key in self._index: 

1657 return self._index[key][1] 

1658 else: 

1659 return default 

1660 

1661 def __str__(self) -> str: 

1662 return "%s(%s)" % ( 

1663 self.__class__.__name__, 

1664 ", ".join(str(c) for c in self), 

1665 ) 

1666 

1667 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1668 raise NotImplementedError() 

1669 

1670 def __delitem__(self, key: str) -> NoReturn: 

1671 raise NotImplementedError() 

1672 

1673 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1674 raise NotImplementedError() 

1675 

1676 def clear(self) -> NoReturn: 

1677 """Dictionary clear() is not implemented for 

1678 :class:`_sql.ColumnCollection`.""" 

1679 raise NotImplementedError() 

1680 

1681 def remove(self, column: Any) -> None: 

1682 raise NotImplementedError() 

1683 

1684 def update(self, iter_: Any) -> NoReturn: 

1685 """Dictionary update() is not implemented for 

1686 :class:`_sql.ColumnCollection`.""" 

1687 raise NotImplementedError() 

1688 

1689 # https://github.com/python/mypy/issues/4266 

1690 __hash__ = None # type: ignore 

1691 

1692 def _populate_separate_keys( 

1693 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1694 ) -> None: 

1695 """populate from an iterator of (key, column)""" 

1696 

1697 self._collection[:] = collection = [ 

1698 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

1699 ] 

1700 self._colset.update(c._deannotate() for _, c, _ in collection) 

1701 self._index.update( 

1702 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

1703 ) 

1704 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

1705 

1706 def add( 

1707 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None 

1708 ) -> None: 

1709 """Add a column to this :class:`_sql.ColumnCollection`. 

1710 

1711 .. note:: 

1712 

1713 This method is **not normally used by user-facing code**, as the 

1714 :class:`_sql.ColumnCollection` is usually part of an existing 

1715 object such as a :class:`_schema.Table`. To add a 

1716 :class:`_schema.Column` to an existing :class:`_schema.Table` 

1717 object, use the :meth:`_schema.Table.append_column` method. 

1718 

1719 """ 

1720 colkey: _COLKEY 

1721 

1722 if key is None: 

1723 colkey = column.key # type: ignore 

1724 else: 

1725 colkey = key 

1726 

1727 l = len(self._collection) 

1728 

1729 # don't really know how this part is supposed to work w/ the 

1730 # covariant thing 

1731 

1732 _column = cast(_COL_co, column) 

1733 

1734 self._collection.append( 

1735 (colkey, _column, _ColumnMetrics(self, _column)) 

1736 ) 

1737 self._colset.add(_column._deannotate()) 

1738 self._index[l] = (colkey, _column) 

1739 if colkey not in self._index: 

1740 self._index[colkey] = (colkey, _column) 

1741 

1742 def __getstate__(self) -> Dict[str, Any]: 

1743 return { 

1744 "_collection": [(k, c) for k, c, _ in self._collection], 

1745 "_index": self._index, 

1746 } 

1747 

1748 def __setstate__(self, state: Dict[str, Any]) -> None: 

1749 object.__setattr__(self, "_index", state["_index"]) 

1750 object.__setattr__( 

1751 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1752 ) 

1753 object.__setattr__( 

1754 self, 

1755 "_collection", 

1756 [ 

1757 (k, c, _ColumnMetrics(self, c)) 

1758 for (k, c) in state["_collection"] 

1759 ], 

1760 ) 

1761 object.__setattr__( 

1762 self, "_colset", {col for k, col, _ in self._collection} 

1763 ) 

1764 

1765 def contains_column(self, col: ColumnElement[Any]) -> bool: 

1766 """Checks if a column object exists in this collection""" 

1767 if col not in self._colset: 

1768 if isinstance(col, str): 

1769 raise exc.ArgumentError( 

1770 "contains_column cannot be used with string arguments. " 

1771 "Use ``col_name in table.c`` instead." 

1772 ) 

1773 return False 

1774 else: 

1775 return True 

1776 

1777 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

1778 """Return a "read only" form of this 

1779 :class:`_sql.ColumnCollection`.""" 

1780 

1781 return ReadOnlyColumnCollection(self) 

1782 

1783 def _init_proxy_index(self): 

1784 """populate the "proxy index", if empty. 

1785 

1786 proxy index is added in 2.0 to provide more efficient operation 

1787 for the corresponding_column() method. 

1788 

1789 For reasons of both time to construct new .c collections as well as 

1790 memory conservation for large numbers of large .c collections, the 

1791 proxy_index is only filled if corresponding_column() is called. once 

1792 filled it stays that way, and new _ColumnMetrics objects created after 

1793 that point will populate it with new data. Note this case would be 

1794 unusual, if not nonexistent, as it means a .c collection is being 

1795 mutated after corresponding_column() were used, however it is tested in 

1796 test/base/test_utils.py. 

1797 

1798 """ 

1799 pi = self._proxy_index 

1800 if pi: 

1801 return 

1802 

1803 for _, _, metrics in self._collection: 

1804 eps = metrics.column._expanded_proxy_set 

1805 

1806 for eps_col in eps: 

1807 pi[eps_col].add(metrics) 

1808 

1809 def corresponding_column( 

1810 self, column: _COL, require_embedded: bool = False 

1811 ) -> Optional[Union[_COL, _COL_co]]: 

1812 """Given a :class:`_expression.ColumnElement`, return the exported 

1813 :class:`_expression.ColumnElement` object from this 

1814 :class:`_expression.ColumnCollection` 

1815 which corresponds to that original :class:`_expression.ColumnElement` 

1816 via a common 

1817 ancestor column. 

1818 

1819 :param column: the target :class:`_expression.ColumnElement` 

1820 to be matched. 

1821 

1822 :param require_embedded: only return corresponding columns for 

1823 the given :class:`_expression.ColumnElement`, if the given 

1824 :class:`_expression.ColumnElement` 

1825 is actually present within a sub-element 

1826 of this :class:`_expression.Selectable`. 

1827 Normally the column will match if 

1828 it merely shares a common ancestor with one of the exported 

1829 columns of this :class:`_expression.Selectable`. 

1830 

1831 .. seealso:: 

1832 

1833 :meth:`_expression.Selectable.corresponding_column` 

1834 - invokes this method 

1835 against the collection returned by 

1836 :attr:`_expression.Selectable.exported_columns`. 

1837 

1838 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

1839 was moved onto the :class:`_expression.ColumnCollection` itself. 

1840 

1841 """ 

1842 # TODO: cython candidate 

1843 

1844 # don't dig around if the column is locally present 

1845 if column in self._colset: 

1846 return column 

1847 

1848 selected_intersection, selected_metrics = None, None 

1849 target_set = column.proxy_set 

1850 

1851 pi = self._proxy_index 

1852 if not pi: 

1853 self._init_proxy_index() 

1854 

1855 for current_metrics in ( 

1856 mm for ts in target_set if ts in pi for mm in pi[ts] 

1857 ): 

1858 if not require_embedded or current_metrics.embedded(target_set): 

1859 if selected_metrics is None: 

1860 # no corresponding column yet, pick this one. 

1861 selected_metrics = current_metrics 

1862 continue 

1863 

1864 current_intersection = target_set.intersection( 

1865 current_metrics.column._expanded_proxy_set 

1866 ) 

1867 if selected_intersection is None: 

1868 selected_intersection = target_set.intersection( 

1869 selected_metrics.column._expanded_proxy_set 

1870 ) 

1871 

1872 if len(current_intersection) > len(selected_intersection): 

1873 # 'current' has a larger field of correspondence than 

1874 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

1875 # matches a1.c.x->table.c.x better than 

1876 # selectable.c.x->table.c.x does. 

1877 

1878 selected_metrics = current_metrics 

1879 selected_intersection = current_intersection 

1880 elif current_intersection == selected_intersection: 

1881 # they have the same field of correspondence. see 

1882 # which proxy_set has fewer columns in it, which 

1883 # indicates a closer relationship with the root 

1884 # column. Also take into account the "weight" 

1885 # attribute which CompoundSelect() uses to give 

1886 # higher precedence to columns based on vertical 

1887 # position in the compound statement, and discard 

1888 # columns that have no reference to the target 

1889 # column (also occurs with CompoundSelect) 

1890 

1891 selected_col_distance = sum( 

1892 [ 

1893 sc._annotations.get("weight", 1) 

1894 for sc in ( 

1895 selected_metrics.column._uncached_proxy_list() 

1896 ) 

1897 if sc.shares_lineage(column) 

1898 ], 

1899 ) 

1900 current_col_distance = sum( 

1901 [ 

1902 sc._annotations.get("weight", 1) 

1903 for sc in ( 

1904 current_metrics.column._uncached_proxy_list() 

1905 ) 

1906 if sc.shares_lineage(column) 

1907 ], 

1908 ) 

1909 if current_col_distance < selected_col_distance: 

1910 selected_metrics = current_metrics 

1911 selected_intersection = current_intersection 

1912 

1913 return selected_metrics.column if selected_metrics else None 

1914 

1915 

1916_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

1917 

1918 

1919class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): 

1920 """A :class:`_expression.ColumnCollection` 

1921 that maintains deduplicating behavior. 

1922 

1923 This is useful by schema level objects such as :class:`_schema.Table` and 

1924 :class:`.PrimaryKeyConstraint`. The collection includes more 

1925 sophisticated mutator methods as well to suit schema objects which 

1926 require mutable column collections. 

1927 

1928 .. versionadded:: 1.4 

1929 

1930 """ 

1931 

1932 def add( # type: ignore[override] 

1933 self, column: _NAMEDCOL, key: Optional[str] = None 

1934 ) -> None: 

1935 if key is not None and column.key != key: 

1936 raise exc.ArgumentError( 

1937 "DedupeColumnCollection requires columns be under " 

1938 "the same key as their .key" 

1939 ) 

1940 key = column.key 

1941 

1942 if key is None: 

1943 raise exc.ArgumentError( 

1944 "Can't add unnamed column to column collection" 

1945 ) 

1946 

1947 if key in self._index: 

1948 existing = self._index[key][1] 

1949 

1950 if existing is column: 

1951 return 

1952 

1953 self.replace(column) 

1954 

1955 # pop out memoized proxy_set as this 

1956 # operation may very well be occurring 

1957 # in a _make_proxy operation 

1958 util.memoized_property.reset(column, "proxy_set") 

1959 else: 

1960 self._append_new_column(key, column) 

1961 

1962 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None: 

1963 l = len(self._collection) 

1964 self._collection.append( 

1965 (key, named_column, _ColumnMetrics(self, named_column)) 

1966 ) 

1967 self._colset.add(named_column._deannotate()) 

1968 self._index[l] = (key, named_column) 

1969 self._index[key] = (key, named_column) 

1970 

1971 def _populate_separate_keys( 

1972 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

1973 ) -> None: 

1974 """populate from an iterator of (key, column)""" 

1975 cols = list(iter_) 

1976 

1977 replace_col = [] 

1978 for k, col in cols: 

1979 if col.key != k: 

1980 raise exc.ArgumentError( 

1981 "DedupeColumnCollection requires columns be under " 

1982 "the same key as their .key" 

1983 ) 

1984 if col.name in self._index and col.key != col.name: 

1985 replace_col.append(col) 

1986 elif col.key in self._index: 

1987 replace_col.append(col) 

1988 else: 

1989 self._index[k] = (k, col) 

1990 self._collection.append((k, col, _ColumnMetrics(self, col))) 

1991 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

1992 

1993 self._index.update( 

1994 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

1995 ) 

1996 for col in replace_col: 

1997 self.replace(col) 

1998 

1999 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2000 self._populate_separate_keys((col.key, col) for col in iter_) 

2001 

2002 def remove(self, column: _NAMEDCOL) -> None: 

2003 if column not in self._colset: 

2004 raise ValueError( 

2005 "Can't remove column %r; column is not in this collection" 

2006 % column 

2007 ) 

2008 del self._index[column.key] 

2009 self._colset.remove(column) 

2010 self._collection[:] = [ 

2011 (k, c, metrics) 

2012 for (k, c, metrics) in self._collection 

2013 if c is not column 

2014 ] 

2015 for metrics in self._proxy_index.get(column, ()): 

2016 metrics.dispose(self) 

2017 

2018 self._index.update( 

2019 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2020 ) 

2021 # delete higher index 

2022 del self._index[len(self._collection)] 

2023 

2024 def replace( 

2025 self, 

2026 column: _NAMEDCOL, 

2027 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2028 ) -> None: 

2029 """add the given column to this collection, removing unaliased 

2030 versions of this column as well as existing columns with the 

2031 same key. 

2032 

2033 e.g.:: 

2034 

2035 t = Table('sometable', metadata, Column('col1', Integer)) 

2036 t.columns.replace(Column('col1', Integer, key='columnone')) 

2037 

2038 will remove the original 'col1' from the collection, and add 

2039 the new column under the name 'columnname'. 

2040 

2041 Used by schema.Column to override columns during table reflection. 

2042 

2043 """ 

2044 

2045 if extra_remove: 

2046 remove_col = set(extra_remove) 

2047 else: 

2048 remove_col = set() 

2049 # remove up to two columns based on matches of name as well as key 

2050 if column.name in self._index and column.key != column.name: 

2051 other = self._index[column.name][1] 

2052 if other.name == other.key: 

2053 remove_col.add(other) 

2054 

2055 if column.key in self._index: 

2056 remove_col.add(self._index[column.key][1]) 

2057 

2058 if not remove_col: 

2059 self._append_new_column(column.key, column) 

2060 return 

2061 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2062 replaced = False 

2063 for k, col, metrics in self._collection: 

2064 if col in remove_col: 

2065 if not replaced: 

2066 replaced = True 

2067 new_cols.append( 

2068 (column.key, column, _ColumnMetrics(self, column)) 

2069 ) 

2070 else: 

2071 new_cols.append((k, col, metrics)) 

2072 

2073 if remove_col: 

2074 self._colset.difference_update(remove_col) 

2075 

2076 for rc in remove_col: 

2077 for metrics in self._proxy_index.get(rc, ()): 

2078 metrics.dispose(self) 

2079 

2080 if not replaced: 

2081 new_cols.append((column.key, column, _ColumnMetrics(self, column))) 

2082 

2083 self._colset.add(column._deannotate()) 

2084 self._collection[:] = new_cols 

2085 

2086 self._index.clear() 

2087 

2088 self._index.update( 

2089 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2090 ) 

2091 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2092 

2093 

2094class ReadOnlyColumnCollection( 

2095 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2096): 

2097 __slots__ = ("_parent",) 

2098 

2099 def __init__(self, collection): 

2100 object.__setattr__(self, "_parent", collection) 

2101 object.__setattr__(self, "_colset", collection._colset) 

2102 object.__setattr__(self, "_index", collection._index) 

2103 object.__setattr__(self, "_collection", collection._collection) 

2104 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2105 

2106 def __getstate__(self): 

2107 return {"_parent": self._parent} 

2108 

2109 def __setstate__(self, state): 

2110 parent = state["_parent"] 

2111 self.__init__(parent) # type: ignore 

2112 

2113 def add(self, column: Any, key: Any = ...) -> Any: 

2114 self._readonly() 

2115 

2116 def extend(self, elements: Any) -> NoReturn: 

2117 self._readonly() 

2118 

2119 def remove(self, item: Any) -> NoReturn: 

2120 self._readonly() 

2121 

2122 

2123class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2124 def contains_column(self, col): 

2125 return col in self 

2126 

2127 def extend(self, cols): 

2128 for col in cols: 

2129 self.add(col) 

2130 

2131 def __eq__(self, other): 

2132 l = [] 

2133 for c in other: 

2134 for local in self: 

2135 if c.shares_lineage(local): 

2136 l.append(c == local) 

2137 return elements.and_(*l) 

2138 

2139 def __hash__(self): 

2140 return hash(tuple(x for x in self)) 

2141 

2142 

2143def _entity_namespace( 

2144 entity: Union[_HasEntityNamespace, ExternallyTraversible] 

2145) -> _EntityNamespace: 

2146 """Return the nearest .entity_namespace for the given entity. 

2147 

2148 If not immediately available, does an iterate to find a sub-element 

2149 that has one, if any. 

2150 

2151 """ 

2152 try: 

2153 return cast(_HasEntityNamespace, entity).entity_namespace 

2154 except AttributeError: 

2155 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2156 if _is_has_entity_namespace(elem): 

2157 return elem.entity_namespace 

2158 else: 

2159 raise 

2160 

2161 

2162def _entity_namespace_key( 

2163 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2164 key: str, 

2165 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG, 

2166) -> SQLCoreOperations[Any]: 

2167 """Return an entry from an entity_namespace. 

2168 

2169 

2170 Raises :class:`_exc.InvalidRequestError` rather than attribute error 

2171 on not found. 

2172 

2173 """ 

2174 

2175 try: 

2176 ns = _entity_namespace(entity) 

2177 if default is not NO_ARG: 

2178 return getattr(ns, key, default) 

2179 else: 

2180 return getattr(ns, key) # type: ignore 

2181 except AttributeError as err: 

2182 raise exc.InvalidRequestError( 

2183 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2184 ) from err