Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/base.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

836 statements  

1# sql/base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules. 

10 

11""" 

12 

13 

14from __future__ import annotations 

15 

16import collections 

17from enum import Enum 

18import itertools 

19from itertools import zip_longest 

20import operator 

21import re 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Dict 

26from typing import FrozenSet 

27from typing import Generic 

28from typing import Iterable 

29from typing import Iterator 

30from typing import List 

31from typing import Mapping 

32from typing import MutableMapping 

33from typing import NamedTuple 

34from typing import NoReturn 

35from typing import Optional 

36from typing import overload 

37from typing import Protocol 

38from typing import Sequence 

39from typing import Set 

40from typing import Tuple 

41from typing import Type 

42from typing import TYPE_CHECKING 

43from typing import TypeVar 

44from typing import Union 

45 

46from . import roles 

47from . import visitors 

48from .cache_key import HasCacheKey # noqa 

49from .cache_key import MemoizedHasCacheKey # noqa 

50from .traversals import HasCopyInternals # noqa 

51from .visitors import ClauseVisitor 

52from .visitors import ExtendedInternalTraversal 

53from .visitors import ExternallyTraversible 

54from .visitors import InternalTraversal 

55from .. import event 

56from .. import exc 

57from .. import util 

58from ..util import HasMemoized as HasMemoized 

59from ..util import hybridmethod 

60from ..util.typing import Self 

61from ..util.typing import TypeGuard 

62 

63if TYPE_CHECKING: 

64 from . import coercions 

65 from . import elements 

66 from . import type_api 

67 from ._orm_types import DMLStrategyArgument 

68 from ._orm_types import SynchronizeSessionArgument 

69 from ._typing import _CLE 

70 from .elements import BindParameter 

71 from .elements import ClauseList 

72 from .elements import ColumnClause # noqa 

73 from .elements import ColumnElement 

74 from .elements import NamedColumn 

75 from .elements import SQLCoreOperations 

76 from .elements import TextClause 

77 from .schema import Column 

78 from .schema import DefaultGenerator 

79 from .selectable import _JoinTargetElement 

80 from .selectable import _SelectIterable 

81 from .selectable import FromClause 

82 from ..engine import Connection 

83 from ..engine import CursorResult 

84 from ..engine.interfaces import _CoreMultiExecuteParams 

85 from ..engine.interfaces import _ExecuteOptions 

86 from ..engine.interfaces import _ImmutableExecuteOptions 

87 from ..engine.interfaces import CacheStats 

88 from ..engine.interfaces import Compiled 

89 from ..engine.interfaces import CompiledCacheType 

90 from ..engine.interfaces import CoreExecuteOptionsParameter 

91 from ..engine.interfaces import Dialect 

92 from ..engine.interfaces import IsolationLevel 

93 from ..engine.interfaces import SchemaTranslateMapType 

94 from ..event import dispatcher 

95 

96if not TYPE_CHECKING: 

97 coercions = None # noqa 

98 elements = None # noqa 

99 type_api = None # noqa 

100 

101 

102class _NoArg(Enum): 

103 NO_ARG = 0 

104 

105 def __repr__(self): 

106 return f"_NoArg.{self.name}" 

107 

108 

109NO_ARG = _NoArg.NO_ARG 

110 

111 

112class _NoneName(Enum): 

113 NONE_NAME = 0 

114 """indicate a 'deferred' name that was ultimately the value None.""" 

115 

116 

117_NONE_NAME = _NoneName.NONE_NAME 

118 

119_T = TypeVar("_T", bound=Any) 

120 

121_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

122 

123_AmbiguousTableNameMap = MutableMapping[str, str] 

124 

125 

126class _DefaultDescriptionTuple(NamedTuple): 

127 arg: Any 

128 is_scalar: Optional[bool] 

129 is_callable: Optional[bool] 

130 is_sentinel: Optional[bool] 

131 

132 @classmethod 

133 def _from_column_default( 

134 cls, default: Optional[DefaultGenerator] 

135 ) -> _DefaultDescriptionTuple: 

136 return ( 

137 _DefaultDescriptionTuple( 

138 default.arg, # type: ignore 

139 default.is_scalar, 

140 default.is_callable, 

141 default.is_sentinel, 

142 ) 

143 if default 

144 and ( 

145 default.has_arg 

146 or (not default.for_update and default.is_sentinel) 

147 ) 

148 else _DefaultDescriptionTuple(None, None, None, None) 

149 ) 

150 

151 

152_never_select_column = operator.attrgetter("_omit_from_statements") 

153 

154 

155class _EntityNamespace(Protocol): 

156 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

157 

158 

159class _HasEntityNamespace(Protocol): 

160 @util.ro_non_memoized_property 

161 def entity_namespace(self) -> _EntityNamespace: ... 

162 

163 

164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

165 return hasattr(element, "entity_namespace") 

166 

167 

168# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

169_Self = TypeVar("_Self", bound=Any) 

170 

171 

172class Immutable: 

173 """mark a ClauseElement as 'immutable' when expressions are cloned. 

174 

175 "immutable" objects refers to the "mutability" of an object in the 

176 context of SQL DQL and DML generation. Such as, in DQL, one can 

177 compose a SELECT or subquery of varied forms, but one cannot modify 

178 the structure of a specific table or column within DQL. 

179 :class:`.Immutable` is mostly intended to follow this concept, and as 

180 such the primary "immutable" objects are :class:`.ColumnClause`, 

181 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

182 

183 """ 

184 

185 __slots__ = () 

186 

187 _is_immutable = True 

188 

189 def unique_params(self, *optionaldict, **kwargs): 

190 raise NotImplementedError("Immutable objects do not support copying") 

191 

192 def params(self, *optionaldict, **kwargs): 

193 raise NotImplementedError("Immutable objects do not support copying") 

194 

195 def _clone(self: _Self, **kw: Any) -> _Self: 

196 return self 

197 

198 def _copy_internals( 

199 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

200 ) -> None: 

201 pass 

202 

203 

204class SingletonConstant(Immutable): 

205 """Represent SQL constants like NULL, TRUE, FALSE""" 

206 

207 _is_singleton_constant = True 

208 

209 _singleton: SingletonConstant 

210 

211 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

212 return cast(_T, cls._singleton) 

213 

214 @util.non_memoized_property 

215 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

216 raise NotImplementedError() 

217 

218 @classmethod 

219 def _create_singleton(cls): 

220 obj = object.__new__(cls) 

221 obj.__init__() # type: ignore 

222 

223 # for a long time this was an empty frozenset, meaning 

224 # a SingletonConstant would never be a "corresponding column" in 

225 # a statement. This referred to #6259. However, in #7154 we see 

226 # that we do in fact need "correspondence" to work when matching cols 

227 # in result sets, so the non-correspondence was moved to a more 

228 # specific level when we are actually adapting expressions for SQL 

229 # render only. 

230 obj.proxy_set = frozenset([obj]) 

231 cls._singleton = obj 

232 

233 

234def _from_objects( 

235 *elements: Union[ 

236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

237 ] 

238) -> Iterator[FromClause]: 

239 return itertools.chain.from_iterable( 

240 [element._from_objects for element in elements] 

241 ) 

242 

243 

244def _select_iterables( 

245 elements: Iterable[roles.ColumnsClauseRole], 

246) -> _SelectIterable: 

247 """expand tables into individual columns in the 

248 given list of column expressions. 

249 

250 """ 

251 return itertools.chain.from_iterable( 

252 [c._select_iterable for c in elements] 

253 ) 

254 

255 

256_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

257 

258 

259class _GenerativeType(Protocol): 

260 def _generate(self) -> Self: ... 

261 

262 

263def _generative(fn: _Fn) -> _Fn: 

264 """non-caching _generative() decorator. 

265 

266 This is basically the legacy decorator that copies the object and 

267 runs a method on the new copy. 

268 

269 """ 

270 

271 @util.decorator 

272 def _generative( 

273 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

274 ) -> _SelfGenerativeType: 

275 """Mark a method as generative.""" 

276 

277 self = self._generate() 

278 x = fn(self, *args, **kw) 

279 assert x is self, "generative methods must return self" 

280 return self 

281 

282 decorated = _generative(fn) 

283 decorated.non_generative = fn # type: ignore 

284 return decorated 

285 

286 

287def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

288 msgs = kw.pop("msgs", {}) 

289 

290 defaults = kw.pop("defaults", {}) 

291 

292 getters = [ 

293 (name, operator.attrgetter(name), defaults.get(name, None)) 

294 for name in names 

295 ] 

296 

297 @util.decorator 

298 def check(fn, *args, **kw): 

299 # make pylance happy by not including "self" in the argument 

300 # list 

301 self = args[0] 

302 args = args[1:] 

303 for name, getter, default_ in getters: 

304 if getter(self) is not default_: 

305 msg = msgs.get( 

306 name, 

307 "Method %s() has already been invoked on this %s construct" 

308 % (fn.__name__, self.__class__), 

309 ) 

310 raise exc.InvalidRequestError(msg) 

311 return fn(self, *args, **kw) 

312 

313 return check 

314 

315 

316def _clone(element, **kw): 

317 return element._clone(**kw) 

318 

319 

320def _expand_cloned( 

321 elements: Iterable[_CLE], 

322) -> Iterable[_CLE]: 

323 """expand the given set of ClauseElements to be the set of all 'cloned' 

324 predecessors. 

325 

326 """ 

327 # TODO: cython candidate 

328 return itertools.chain(*[x._cloned_set for x in elements]) 

329 

330 

331def _de_clone( 

332 elements: Iterable[_CLE], 

333) -> Iterable[_CLE]: 

334 for x in elements: 

335 while x._is_clone_of is not None: 

336 x = x._is_clone_of 

337 yield x 

338 

339 

340def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

341 """return the intersection of sets a and b, counting 

342 any overlap between 'cloned' predecessors. 

343 

344 The returned set is in terms of the entities present within 'a'. 

345 

346 """ 

347 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

348 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

349 

350 

351def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

352 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

353 return { 

354 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

355 } 

356 

357 

358class _DialectArgView(MutableMapping[str, Any]): 

359 """A dictionary view of dialect-level arguments in the form 

360 <dialectname>_<argument_name>. 

361 

362 """ 

363 

364 def __init__(self, obj): 

365 self.obj = obj 

366 

367 def _key(self, key): 

368 try: 

369 dialect, value_key = key.split("_", 1) 

370 except ValueError as err: 

371 raise KeyError(key) from err 

372 else: 

373 return dialect, value_key 

374 

375 def __getitem__(self, key): 

376 dialect, value_key = self._key(key) 

377 

378 try: 

379 opt = self.obj.dialect_options[dialect] 

380 except exc.NoSuchModuleError as err: 

381 raise KeyError(key) from err 

382 else: 

383 return opt[value_key] 

384 

385 def __setitem__(self, key, value): 

386 try: 

387 dialect, value_key = self._key(key) 

388 except KeyError as err: 

389 raise exc.ArgumentError( 

390 "Keys must be of the form <dialectname>_<argname>" 

391 ) from err 

392 else: 

393 self.obj.dialect_options[dialect][value_key] = value 

394 

395 def __delitem__(self, key): 

396 dialect, value_key = self._key(key) 

397 del self.obj.dialect_options[dialect][value_key] 

398 

399 def __len__(self): 

400 return sum( 

401 len(args._non_defaults) 

402 for args in self.obj.dialect_options.values() 

403 ) 

404 

405 def __iter__(self): 

406 return ( 

407 "%s_%s" % (dialect_name, value_name) 

408 for dialect_name in self.obj.dialect_options 

409 for value_name in self.obj.dialect_options[ 

410 dialect_name 

411 ]._non_defaults 

412 ) 

413 

414 

415class _DialectArgDict(MutableMapping[str, Any]): 

416 """A dictionary view of dialect-level arguments for a specific 

417 dialect. 

418 

419 Maintains a separate collection of user-specified arguments 

420 and dialect-specified default arguments. 

421 

422 """ 

423 

424 def __init__(self): 

425 self._non_defaults = {} 

426 self._defaults = {} 

427 

428 def __len__(self): 

429 return len(set(self._non_defaults).union(self._defaults)) 

430 

431 def __iter__(self): 

432 return iter(set(self._non_defaults).union(self._defaults)) 

433 

434 def __getitem__(self, key): 

435 if key in self._non_defaults: 

436 return self._non_defaults[key] 

437 else: 

438 return self._defaults[key] 

439 

440 def __setitem__(self, key, value): 

441 self._non_defaults[key] = value 

442 

443 def __delitem__(self, key): 

444 del self._non_defaults[key] 

445 

446 

447@util.preload_module("sqlalchemy.dialects") 

448def _kw_reg_for_dialect(dialect_name): 

449 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

450 if dialect_cls.construct_arguments is None: 

451 return None 

452 return dict(dialect_cls.construct_arguments) 

453 

454 

455class DialectKWArgs: 

456 """Establish the ability for a class to have dialect-specific arguments 

457 with defaults and constructor validation. 

458 

459 The :class:`.DialectKWArgs` interacts with the 

460 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

461 

462 .. seealso:: 

463 

464 :attr:`.DefaultDialect.construct_arguments` 

465 

466 """ 

467 

468 __slots__ = () 

469 

470 _dialect_kwargs_traverse_internals = [ 

471 ("dialect_options", InternalTraversal.dp_dialect_options) 

472 ] 

473 

474 @classmethod 

475 def argument_for(cls, dialect_name, argument_name, default): 

476 """Add a new kind of dialect-specific keyword argument for this class. 

477 

478 E.g.:: 

479 

480 Index.argument_for("mydialect", "length", None) 

481 

482 some_index = Index('a', 'b', mydialect_length=5) 

483 

484 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

485 way adding extra arguments to the 

486 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

487 dictionary provides a list of argument names accepted by various 

488 schema-level constructs on behalf of a dialect. 

489 

490 New dialects should typically specify this dictionary all at once as a 

491 data member of the dialect class. The use case for ad-hoc addition of 

492 argument names is typically for end-user code that is also using 

493 a custom compilation scheme which consumes the additional arguments. 

494 

495 :param dialect_name: name of a dialect. The dialect must be 

496 locatable, else a :class:`.NoSuchModuleError` is raised. The 

497 dialect must also include an existing 

498 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

499 that it participates in the keyword-argument validation and default 

500 system, else :class:`.ArgumentError` is raised. If the dialect does 

501 not include this collection, then any keyword argument can be 

502 specified on behalf of this dialect already. All dialects packaged 

503 within SQLAlchemy include this collection, however for third party 

504 dialects, support may vary. 

505 

506 :param argument_name: name of the parameter. 

507 

508 :param default: default value of the parameter. 

509 

510 """ 

511 

512 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

513 if construct_arg_dictionary is None: 

514 raise exc.ArgumentError( 

515 "Dialect '%s' does have keyword-argument " 

516 "validation and defaults enabled configured" % dialect_name 

517 ) 

518 if cls not in construct_arg_dictionary: 

519 construct_arg_dictionary[cls] = {} 

520 construct_arg_dictionary[cls][argument_name] = default 

521 

522 @util.memoized_property 

523 def dialect_kwargs(self): 

524 """A collection of keyword arguments specified as dialect-specific 

525 options to this construct. 

526 

527 The arguments are present here in their original ``<dialect>_<kwarg>`` 

528 format. Only arguments that were actually passed are included; 

529 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which 

530 contains all options known by this dialect including defaults. 

531 

532 The collection is also writable; keys are accepted of the 

533 form ``<dialect>_<kwarg>`` where the value will be assembled 

534 into the list of options. 

535 

536 .. seealso:: 

537 

538 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

539 

540 """ 

541 return _DialectArgView(self) 

542 

543 @property 

544 def kwargs(self): 

545 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

546 return self.dialect_kwargs 

547 

548 _kw_registry = util.PopulateDict(_kw_reg_for_dialect) 

549 

550 def _kw_reg_for_dialect_cls(self, dialect_name): 

551 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

552 d = _DialectArgDict() 

553 

554 if construct_arg_dictionary is None: 

555 d._defaults.update({"*": None}) 

556 else: 

557 for cls in reversed(self.__class__.__mro__): 

558 if cls in construct_arg_dictionary: 

559 d._defaults.update(construct_arg_dictionary[cls]) 

560 return d 

561 

562 @util.memoized_property 

563 def dialect_options(self): 

564 """A collection of keyword arguments specified as dialect-specific 

565 options to this construct. 

566 

567 This is a two-level nested registry, keyed to ``<dialect_name>`` 

568 and ``<argument_name>``. For example, the ``postgresql_where`` 

569 argument would be locatable as:: 

570 

571 arg = my_object.dialect_options['postgresql']['where'] 

572 

573 .. versionadded:: 0.9.2 

574 

575 .. seealso:: 

576 

577 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

578 

579 """ 

580 

581 return util.PopulateDict( 

582 util.portable_instancemethod(self._kw_reg_for_dialect_cls) 

583 ) 

584 

585 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

586 # validate remaining kwargs that they all specify DB prefixes 

587 

588 if not kwargs: 

589 return 

590 

591 for k in kwargs: 

592 m = re.match("^(.+?)_(.+)$", k) 

593 if not m: 

594 raise TypeError( 

595 "Additional arguments should be " 

596 "named <dialectname>_<argument>, got '%s'" % k 

597 ) 

598 dialect_name, arg_name = m.group(1, 2) 

599 

600 try: 

601 construct_arg_dictionary = self.dialect_options[dialect_name] 

602 except exc.NoSuchModuleError: 

603 util.warn( 

604 "Can't validate argument %r; can't " 

605 "locate any SQLAlchemy dialect named %r" 

606 % (k, dialect_name) 

607 ) 

608 self.dialect_options[dialect_name] = d = _DialectArgDict() 

609 d._defaults.update({"*": None}) 

610 d._non_defaults[arg_name] = kwargs[k] 

611 else: 

612 if ( 

613 "*" not in construct_arg_dictionary 

614 and arg_name not in construct_arg_dictionary 

615 ): 

616 raise exc.ArgumentError( 

617 "Argument %r is not accepted by " 

618 "dialect %r on behalf of %r" 

619 % (k, dialect_name, self.__class__) 

620 ) 

621 else: 

622 construct_arg_dictionary[arg_name] = kwargs[k] 

623 

624 

625class CompileState: 

626 """Produces additional object state necessary for a statement to be 

627 compiled. 

628 

629 the :class:`.CompileState` class is at the base of classes that assemble 

630 state for a particular statement object that is then used by the 

631 compiler. This process is essentially an extension of the process that 

632 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis 

633 on converting raw user intent into more organized structures rather than 

634 producing string output. The top-level :class:`.CompileState` for the 

635 statement being executed is also accessible when the execution context 

636 works with invoking the statement and collecting results. 

637 

638 The production of :class:`.CompileState` is specific to the compiler, such 

639 as within the :meth:`.SQLCompiler.visit_insert`, 

640 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

641 responsible for associating the :class:`.CompileState` with the 

642 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

643 i.e. the outermost SQL statement that's actually being executed. 

644 There can be other :class:`.CompileState` objects that are not the 

645 toplevel, such as when a SELECT subquery or CTE-nested 

646 INSERT/UPDATE/DELETE is generated. 

647 

648 .. versionadded:: 1.4 

649 

650 """ 

651 

652 __slots__ = ("statement", "_ambiguous_table_name_map") 

653 

654 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

655 

656 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

657 

658 @classmethod 

659 def create_for_statement(cls, statement, compiler, **kw): 

660 # factory construction. 

661 

662 if statement._propagate_attrs: 

663 plugin_name = statement._propagate_attrs.get( 

664 "compile_state_plugin", "default" 

665 ) 

666 klass = cls.plugins.get( 

667 (plugin_name, statement._effective_plugin_target), None 

668 ) 

669 if klass is None: 

670 klass = cls.plugins[ 

671 ("default", statement._effective_plugin_target) 

672 ] 

673 

674 else: 

675 klass = cls.plugins[ 

676 ("default", statement._effective_plugin_target) 

677 ] 

678 

679 if klass is cls: 

680 return cls(statement, compiler, **kw) 

681 else: 

682 return klass.create_for_statement(statement, compiler, **kw) 

683 

684 def __init__(self, statement, compiler, **kw): 

685 self.statement = statement 

686 

687 @classmethod 

688 def get_plugin_class( 

689 cls, statement: Executable 

690 ) -> Optional[Type[CompileState]]: 

691 plugin_name = statement._propagate_attrs.get( 

692 "compile_state_plugin", None 

693 ) 

694 

695 if plugin_name: 

696 key = (plugin_name, statement._effective_plugin_target) 

697 if key in cls.plugins: 

698 return cls.plugins[key] 

699 

700 # there's no case where we call upon get_plugin_class() and want 

701 # to get None back, there should always be a default. return that 

702 # if there was no plugin-specific class (e.g. Insert with "orm" 

703 # plugin) 

704 try: 

705 return cls.plugins[("default", statement._effective_plugin_target)] 

706 except KeyError: 

707 return None 

708 

709 @classmethod 

710 def _get_plugin_class_for_plugin( 

711 cls, statement: Executable, plugin_name: str 

712 ) -> Optional[Type[CompileState]]: 

713 try: 

714 return cls.plugins[ 

715 (plugin_name, statement._effective_plugin_target) 

716 ] 

717 except KeyError: 

718 return None 

719 

720 @classmethod 

721 def plugin_for( 

722 cls, plugin_name: str, visit_name: str 

723 ) -> Callable[[_Fn], _Fn]: 

724 def decorate(cls_to_decorate): 

725 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

726 return cls_to_decorate 

727 

728 return decorate 

729 

730 

731class Generative(HasMemoized): 

732 """Provide a method-chaining pattern in conjunction with the 

733 @_generative decorator.""" 

734 

735 def _generate(self) -> Self: 

736 skip = self._memoized_keys 

737 cls = self.__class__ 

738 s = cls.__new__(cls) 

739 if skip: 

740 # ensure this iteration remains atomic 

741 s.__dict__ = { 

742 k: v for k, v in self.__dict__.copy().items() if k not in skip 

743 } 

744 else: 

745 s.__dict__ = self.__dict__.copy() 

746 return s 

747 

748 

749class InPlaceGenerative(HasMemoized): 

750 """Provide a method-chaining pattern in conjunction with the 

751 @_generative decorator that mutates in place.""" 

752 

753 __slots__ = () 

754 

755 def _generate(self): 

756 skip = self._memoized_keys 

757 # note __dict__ needs to be in __slots__ if this is used 

758 for k in skip: 

759 self.__dict__.pop(k, None) 

760 return self 

761 

762 

763class HasCompileState(Generative): 

764 """A class that has a :class:`.CompileState` associated with it.""" 

765 

766 _compile_state_plugin: Optional[Type[CompileState]] = None 

767 

768 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

769 

770 _compile_state_factory = CompileState.create_for_statement 

771 

772 

773class _MetaOptions(type): 

774 """metaclass for the Options class. 

775 

776 This metaclass is actually necessary despite the availability of the 

777 ``__init_subclass__()`` hook as this type also provides custom class-level 

778 behavior for the ``__add__()`` method. 

779 

780 """ 

781 

782 _cache_attrs: Tuple[str, ...] 

783 

784 def __add__(self, other): 

785 o1 = self() 

786 

787 if set(other).difference(self._cache_attrs): 

788 raise TypeError( 

789 "dictionary contains attributes not covered by " 

790 "Options class %s: %r" 

791 % (self, set(other).difference(self._cache_attrs)) 

792 ) 

793 

794 o1.__dict__.update(other) 

795 return o1 

796 

797 if TYPE_CHECKING: 

798 

799 def __getattr__(self, key: str) -> Any: ... 

800 

801 def __setattr__(self, key: str, value: Any) -> None: ... 

802 

803 def __delattr__(self, key: str) -> None: ... 

804 

805 

806class Options(metaclass=_MetaOptions): 

807 """A cacheable option dictionary with defaults.""" 

808 

809 __slots__ = () 

810 

811 _cache_attrs: Tuple[str, ...] 

812 

813 def __init_subclass__(cls) -> None: 

814 dict_ = cls.__dict__ 

815 cls._cache_attrs = tuple( 

816 sorted( 

817 d 

818 for d in dict_ 

819 if not d.startswith("__") 

820 and d not in ("_cache_key_traversal",) 

821 ) 

822 ) 

823 super().__init_subclass__() 

824 

825 def __init__(self, **kw): 

826 self.__dict__.update(kw) 

827 

828 def __add__(self, other): 

829 o1 = self.__class__.__new__(self.__class__) 

830 o1.__dict__.update(self.__dict__) 

831 

832 if set(other).difference(self._cache_attrs): 

833 raise TypeError( 

834 "dictionary contains attributes not covered by " 

835 "Options class %s: %r" 

836 % (self, set(other).difference(self._cache_attrs)) 

837 ) 

838 

839 o1.__dict__.update(other) 

840 return o1 

841 

842 def __eq__(self, other): 

843 # TODO: very inefficient. This is used only in test suites 

844 # right now. 

845 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

846 if getattr(self, a) != getattr(other, b): 

847 return False 

848 return True 

849 

850 def __repr__(self): 

851 # TODO: fairly inefficient, used only in debugging right now. 

852 

853 return "%s(%s)" % ( 

854 self.__class__.__name__, 

855 ", ".join( 

856 "%s=%r" % (k, self.__dict__[k]) 

857 for k in self._cache_attrs 

858 if k in self.__dict__ 

859 ), 

860 ) 

861 

862 @classmethod 

863 def isinstance(cls, klass: Type[Any]) -> bool: 

864 return issubclass(cls, klass) 

865 

866 @hybridmethod 

867 def add_to_element(self, name, value): 

868 return self + {name: getattr(self, name) + value} 

869 

870 @hybridmethod 

871 def _state_dict_inst(self) -> Mapping[str, Any]: 

872 return self.__dict__ 

873 

874 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

875 

876 @_state_dict_inst.classlevel 

877 def _state_dict(cls) -> Mapping[str, Any]: 

878 return cls._state_dict_const 

879 

880 @classmethod 

881 def safe_merge(cls, other): 

882 d = other._state_dict() 

883 

884 # only support a merge with another object of our class 

885 # and which does not have attrs that we don't. otherwise 

886 # we risk having state that might not be part of our cache 

887 # key strategy 

888 

889 if ( 

890 cls is not other.__class__ 

891 and other._cache_attrs 

892 and set(other._cache_attrs).difference(cls._cache_attrs) 

893 ): 

894 raise TypeError( 

895 "other element %r is not empty, is not of type %s, " 

896 "and contains attributes not covered here %r" 

897 % ( 

898 other, 

899 cls, 

900 set(other._cache_attrs).difference(cls._cache_attrs), 

901 ) 

902 ) 

903 return cls + d 

904 

905 @classmethod 

906 def from_execution_options( 

907 cls, key, attrs, exec_options, statement_exec_options 

908 ): 

909 """process Options argument in terms of execution options. 

910 

911 

912 e.g.:: 

913 

914 ( 

915 load_options, 

916 execution_options, 

917 ) = QueryContext.default_load_options.from_execution_options( 

918 "_sa_orm_load_options", 

919 { 

920 "populate_existing", 

921 "autoflush", 

922 "yield_per" 

923 }, 

924 execution_options, 

925 statement._execution_options, 

926 ) 

927 

928 get back the Options and refresh "_sa_orm_load_options" in the 

929 exec options dict w/ the Options as well 

930 

931 """ 

932 

933 # common case is that no options we are looking for are 

934 # in either dictionary, so cancel for that first 

935 check_argnames = attrs.intersection( 

936 set(exec_options).union(statement_exec_options) 

937 ) 

938 

939 existing_options = exec_options.get(key, cls) 

940 

941 if check_argnames: 

942 result = {} 

943 for argname in check_argnames: 

944 local = "_" + argname 

945 if argname in exec_options: 

946 result[local] = exec_options[argname] 

947 elif argname in statement_exec_options: 

948 result[local] = statement_exec_options[argname] 

949 

950 new_options = existing_options + result 

951 exec_options = util.immutabledict().merge_with( 

952 exec_options, {key: new_options} 

953 ) 

954 return new_options, exec_options 

955 

956 else: 

957 return existing_options, exec_options 

958 

959 if TYPE_CHECKING: 

960 

961 def __getattr__(self, key: str) -> Any: ... 

962 

963 def __setattr__(self, key: str, value: Any) -> None: ... 

964 

965 def __delattr__(self, key: str) -> None: ... 

966 

967 

968class CacheableOptions(Options, HasCacheKey): 

969 __slots__ = () 

970 

971 @hybridmethod 

972 def _gen_cache_key_inst(self, anon_map, bindparams): 

973 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

974 

975 @_gen_cache_key_inst.classlevel 

976 def _gen_cache_key(cls, anon_map, bindparams): 

977 return (cls, ()) 

978 

979 @hybridmethod 

980 def _generate_cache_key(self): 

981 return HasCacheKey._generate_cache_key_for_object(self) 

982 

983 

984class ExecutableOption(HasCopyInternals): 

985 __slots__ = () 

986 

987 _annotations = util.EMPTY_DICT 

988 

989 __visit_name__ = "executable_option" 

990 

991 _is_has_cache_key = False 

992 

993 _is_core = True 

994 

995 def _clone(self, **kw): 

996 """Create a shallow copy of this ExecutableOption.""" 

997 c = self.__class__.__new__(self.__class__) 

998 c.__dict__ = dict(self.__dict__) # type: ignore 

999 return c 

1000 

1001 

1002class Executable(roles.StatementRole): 

1003 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1004 

1005 :class:`.Executable` is a superclass for all "statement" types 

1006 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1007 :func:`insert`, :func:`text`. 

1008 

1009 """ 

1010 

1011 supports_execution: bool = True 

1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1013 _is_default_generator = False 

1014 _with_options: Tuple[ExecutableOption, ...] = () 

1015 _with_context_options: Tuple[ 

1016 Tuple[Callable[[CompileState], None], Any], ... 

1017 ] = () 

1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1019 

1020 _executable_traverse_internals = [ 

1021 ("_with_options", InternalTraversal.dp_executable_options), 

1022 ( 

1023 "_with_context_options", 

1024 ExtendedInternalTraversal.dp_with_context_options, 

1025 ), 

1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1027 ] 

1028 

1029 is_select = False 

1030 is_from_statement = False 

1031 is_update = False 

1032 is_insert = False 

1033 is_text = False 

1034 is_delete = False 

1035 is_dml = False 

1036 

1037 if TYPE_CHECKING: 

1038 __visit_name__: str 

1039 

1040 def _compile_w_cache( 

1041 self, 

1042 dialect: Dialect, 

1043 *, 

1044 compiled_cache: Optional[CompiledCacheType], 

1045 column_keys: List[str], 

1046 for_executemany: bool = False, 

1047 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1048 **kw: Any, 

1049 ) -> Tuple[ 

1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats 

1051 ]: ... 

1052 

1053 def _execute_on_connection( 

1054 self, 

1055 connection: Connection, 

1056 distilled_params: _CoreMultiExecuteParams, 

1057 execution_options: CoreExecuteOptionsParameter, 

1058 ) -> CursorResult[Any]: ... 

1059 

1060 def _execute_on_scalar( 

1061 self, 

1062 connection: Connection, 

1063 distilled_params: _CoreMultiExecuteParams, 

1064 execution_options: CoreExecuteOptionsParameter, 

1065 ) -> Any: ... 

1066 

1067 @util.ro_non_memoized_property 

1068 def _all_selected_columns(self): 

1069 raise NotImplementedError() 

1070 

1071 @property 

1072 def _effective_plugin_target(self) -> str: 

1073 return self.__visit_name__ 

1074 

1075 @_generative 

1076 def options(self, *options: ExecutableOption) -> Self: 

1077 """Apply options to this statement. 

1078 

1079 In the general sense, options are any kind of Python object 

1080 that can be interpreted by the SQL compiler for the statement. 

1081 These options can be consumed by specific dialects or specific kinds 

1082 of compilers. 

1083 

1084 The most commonly known kind of option are the ORM level options 

1085 that apply "eager load" and other loading behaviors to an ORM 

1086 query. However, options can theoretically be used for many other 

1087 purposes. 

1088 

1089 For background on specific kinds of options for specific kinds of 

1090 statements, refer to the documentation for those option objects. 

1091 

1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1093 Core statement objects towards the goal of allowing unified 

1094 Core / ORM querying capabilities. 

1095 

1096 .. seealso:: 

1097 

1098 :ref:`loading_columns` - refers to options specific to the usage 

1099 of ORM queries 

1100 

1101 :ref:`relationship_loader_options` - refers to options specific 

1102 to the usage of ORM queries 

1103 

1104 """ 

1105 self._with_options += tuple( 

1106 coercions.expect(roles.ExecutableOptionRole, opt) 

1107 for opt in options 

1108 ) 

1109 return self 

1110 

1111 @_generative 

1112 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1113 """Assign the compile options to a new value. 

1114 

1115 :param compile_options: appropriate CacheableOptions structure 

1116 

1117 """ 

1118 

1119 self._compile_options = compile_options 

1120 return self 

1121 

1122 @_generative 

1123 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1124 """update the _compile_options with new keys.""" 

1125 

1126 assert self._compile_options is not None 

1127 self._compile_options += options 

1128 return self 

1129 

1130 @_generative 

1131 def _add_context_option( 

1132 self, 

1133 callable_: Callable[[CompileState], None], 

1134 cache_args: Any, 

1135 ) -> Self: 

1136 """Add a context option to this statement. 

1137 

1138 These are callable functions that will 

1139 be given the CompileState object upon compilation. 

1140 

1141 A second argument cache_args is required, which will be combined with 

1142 the ``__code__`` identity of the function itself in order to produce a 

1143 cache key. 

1144 

1145 """ 

1146 self._with_context_options += ((callable_, cache_args),) 

1147 return self 

1148 

1149 @overload 

1150 def execution_options( 

1151 self, 

1152 *, 

1153 compiled_cache: Optional[CompiledCacheType] = ..., 

1154 logging_token: str = ..., 

1155 isolation_level: IsolationLevel = ..., 

1156 no_parameters: bool = False, 

1157 stream_results: bool = False, 

1158 max_row_buffer: int = ..., 

1159 yield_per: int = ..., 

1160 driver_column_names: bool = ..., 

1161 insertmanyvalues_page_size: int = ..., 

1162 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1163 populate_existing: bool = False, 

1164 autoflush: bool = False, 

1165 synchronize_session: SynchronizeSessionArgument = ..., 

1166 dml_strategy: DMLStrategyArgument = ..., 

1167 render_nulls: bool = ..., 

1168 is_delete_using: bool = ..., 

1169 is_update_from: bool = ..., 

1170 preserve_rowcount: bool = False, 

1171 **opt: Any, 

1172 ) -> Self: ... 

1173 

1174 @overload 

1175 def execution_options(self, **opt: Any) -> Self: ... 

1176 

1177 @_generative 

1178 def execution_options(self, **kw: Any) -> Self: 

1179 """Set non-SQL options for the statement which take effect during 

1180 execution. 

1181 

1182 Execution options can be set at many scopes, including per-statement, 

1183 per-connection, or per execution, using methods such as 

1184 :meth:`_engine.Connection.execution_options` and parameters which 

1185 accept a dictionary of options such as 

1186 :paramref:`_engine.Connection.execute.execution_options` and 

1187 :paramref:`_orm.Session.execute.execution_options`. 

1188 

1189 The primary characteristic of an execution option, as opposed to 

1190 other kinds of options such as ORM loader options, is that 

1191 **execution options never affect the compiled SQL of a query, only 

1192 things that affect how the SQL statement itself is invoked or how 

1193 results are fetched**. That is, execution options are not part of 

1194 what's accommodated by SQL compilation nor are they considered part of 

1195 the cached state of a statement. 

1196 

1197 The :meth:`_sql.Executable.execution_options` method is 

1198 :term:`generative`, as 

1199 is the case for the method as applied to the :class:`_engine.Engine` 

1200 and :class:`_orm.Query` objects, which means when the method is called, 

1201 a copy of the object is returned, which applies the given parameters to 

1202 that new copy, but leaves the original unchanged:: 

1203 

1204 statement = select(table.c.x, table.c.y) 

1205 new_statement = statement.execution_options(my_option=True) 

1206 

1207 An exception to this behavior is the :class:`_engine.Connection` 

1208 object, where the :meth:`_engine.Connection.execution_options` method 

1209 is explicitly **not** generative. 

1210 

1211 The kinds of options that may be passed to 

1212 :meth:`_sql.Executable.execution_options` and other related methods and 

1213 parameter dictionaries include parameters that are explicitly consumed 

1214 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1215 defined by SQLAlchemy, which means the methods and/or parameter 

1216 dictionaries may be used for user-defined parameters that interact with 

1217 custom code, which may access the parameters using methods such as 

1218 :meth:`_sql.Executable.get_execution_options` and 

1219 :meth:`_engine.Connection.get_execution_options`, or within selected 

1220 event hooks using a dedicated ``execution_options`` event parameter 

1221 such as 

1222 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1223 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1224 

1225 from sqlalchemy import event 

1226 

1227 @event.listens_for(some_engine, "before_execute") 

1228 def _process_opt(conn, statement, multiparams, params, execution_options): 

1229 "run a SQL function before invoking a statement" 

1230 

1231 if execution_options.get("do_special_thing", False): 

1232 conn.exec_driver_sql("run_special_function()") 

1233 

1234 Within the scope of options that are explicitly recognized by 

1235 SQLAlchemy, most apply to specific classes of objects and not others. 

1236 The most common execution options include: 

1237 

1238 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1239 sets the isolation level for a connection or a class of connections 

1240 via an :class:`_engine.Engine`. This option is accepted only 

1241 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1242 

1243 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1244 indicates results should be fetched using a server side cursor; 

1245 this option is accepted by :class:`_engine.Connection`, by the 

1246 :paramref:`_engine.Connection.execute.execution_options` parameter 

1247 on :meth:`_engine.Connection.execute`, and additionally by 

1248 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1249 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1250 

1251 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1252 indicates a dictionary that will serve as the 

1253 :ref:`SQL compilation cache <sql_caching>` 

1254 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1255 well as for ORM methods like :meth:`_orm.Session.execute`. 

1256 Can be passed as ``None`` to disable caching for statements. 

1257 This option is not accepted by 

1258 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1259 carry along a compilation cache within a statement object. 

1260 

1261 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1262 - a mapping of schema names used by the 

1263 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1264 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1265 :class:`_sql.Executable`, as well as by ORM constructs 

1266 like :meth:`_orm.Session.execute`. 

1267 

1268 .. seealso:: 

1269 

1270 :meth:`_engine.Connection.execution_options` 

1271 

1272 :paramref:`_engine.Connection.execute.execution_options` 

1273 

1274 :paramref:`_orm.Session.execute.execution_options` 

1275 

1276 :ref:`orm_queryguide_execution_options` - documentation on all 

1277 ORM-specific execution options 

1278 

1279 """ # noqa: E501 

1280 if "isolation_level" in kw: 

1281 raise exc.ArgumentError( 

1282 "'isolation_level' execution option may only be specified " 

1283 "on Connection.execution_options(), or " 

1284 "per-engine using the isolation_level " 

1285 "argument to create_engine()." 

1286 ) 

1287 if "compiled_cache" in kw: 

1288 raise exc.ArgumentError( 

1289 "'compiled_cache' execution option may only be specified " 

1290 "on Connection.execution_options(), not per statement." 

1291 ) 

1292 self._execution_options = self._execution_options.union(kw) 

1293 return self 

1294 

1295 def get_execution_options(self) -> _ExecuteOptions: 

1296 """Get the non-SQL options which will take effect during execution. 

1297 

1298 .. versionadded:: 1.3 

1299 

1300 .. seealso:: 

1301 

1302 :meth:`.Executable.execution_options` 

1303 """ 

1304 return self._execution_options 

1305 

1306 

1307class SchemaEventTarget(event.EventTarget): 

1308 """Base class for elements that are the targets of :class:`.DDLEvents` 

1309 events. 

1310 

1311 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1312 

1313 """ 

1314 

1315 dispatch: dispatcher[SchemaEventTarget] 

1316 

1317 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1318 """Associate with this SchemaEvent's parent object.""" 

1319 

1320 def _set_parent_with_dispatch( 

1321 self, parent: SchemaEventTarget, **kw: Any 

1322 ) -> None: 

1323 self.dispatch.before_parent_attach(self, parent) 

1324 self._set_parent(parent, **kw) 

1325 self.dispatch.after_parent_attach(self, parent) 

1326 

1327 

1328class SchemaVisitor(ClauseVisitor): 

1329 """Define the visiting for ``SchemaItem`` objects.""" 

1330 

1331 __traverse_options__ = {"schema_visitor": True} 

1332 

1333 

1334class _SentinelDefaultCharacterization(Enum): 

1335 NONE = "none" 

1336 UNKNOWN = "unknown" 

1337 CLIENTSIDE = "clientside" 

1338 SENTINEL_DEFAULT = "sentinel_default" 

1339 SERVERSIDE = "serverside" 

1340 IDENTITY = "identity" 

1341 SEQUENCE = "sequence" 

1342 

1343 

1344class _SentinelColumnCharacterization(NamedTuple): 

1345 columns: Optional[Sequence[Column[Any]]] = None 

1346 is_explicit: bool = False 

1347 is_autoinc: bool = False 

1348 default_characterization: _SentinelDefaultCharacterization = ( 

1349 _SentinelDefaultCharacterization.NONE 

1350 ) 

1351 

1352 

1353_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1354 

1355_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1356_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1357 

1358 

1359class _ColumnMetrics(Generic[_COL_co]): 

1360 __slots__ = ("column",) 

1361 

1362 column: _COL_co 

1363 

1364 def __init__( 

1365 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1366 ): 

1367 self.column = col 

1368 

1369 # proxy_index being non-empty means it was initialized. 

1370 # so we need to update it 

1371 pi = collection._proxy_index 

1372 if pi: 

1373 for eps_col in col._expanded_proxy_set: 

1374 pi[eps_col].add(self) 

1375 

1376 def get_expanded_proxy_set(self): 

1377 return self.column._expanded_proxy_set 

1378 

1379 def dispose(self, collection): 

1380 pi = collection._proxy_index 

1381 if not pi: 

1382 return 

1383 for col in self.column._expanded_proxy_set: 

1384 colset = pi.get(col, None) 

1385 if colset: 

1386 colset.discard(self) 

1387 if colset is not None and not colset: 

1388 del pi[col] 

1389 

1390 def embedded( 

1391 self, 

1392 target_set: Union[ 

1393 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1394 ], 

1395 ) -> bool: 

1396 expanded_proxy_set = self.column._expanded_proxy_set 

1397 for t in target_set.difference(expanded_proxy_set): 

1398 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1399 return False 

1400 return True 

1401 

1402 

1403class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1404 """Collection of :class:`_expression.ColumnElement` instances, 

1405 typically for 

1406 :class:`_sql.FromClause` objects. 

1407 

1408 The :class:`_sql.ColumnCollection` object is most commonly available 

1409 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1410 on the :class:`_schema.Table` object, introduced at 

1411 :ref:`metadata_tables_and_columns`. 

1412 

1413 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1414 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1415 :class:`_schema.Column` objects, which are then accessible both via mapping 

1416 style access as well as attribute access style. 

1417 

1418 To access :class:`_schema.Column` objects using ordinary attribute-style 

1419 access, specify the name like any other object attribute, such as below 

1420 a column named ``employee_name`` is accessed:: 

1421 

1422 >>> employee_table.c.employee_name 

1423 

1424 To access columns that have names with special characters or spaces, 

1425 index-style access is used, such as below which illustrates a column named 

1426 ``employee ' payment`` is accessed:: 

1427 

1428 >>> employee_table.c["employee ' payment"] 

1429 

1430 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1431 interface, common dictionary method names like 

1432 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1433 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1434 database columns that are keyed under these names also need to use indexed 

1435 access:: 

1436 

1437 >>> employee_table.c["values"] 

1438 

1439 

1440 The name for which a :class:`_schema.Column` would be present is normally 

1441 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1442 such as a :class:`_sql.Select` object that uses a label style set 

1443 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1444 key may instead be represented under a particular label name such 

1445 as ``tablename_columnname``:: 

1446 

1447 >>> from sqlalchemy import select, column, table 

1448 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1449 >>> t = table("t", column("c")) 

1450 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1451 >>> subq = stmt.subquery() 

1452 >>> subq.c.t_c 

1453 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1454 

1455 :class:`.ColumnCollection` also indexes the columns in order and allows 

1456 them to be accessible by their integer position:: 

1457 

1458 >>> cc[0] 

1459 Column('x', Integer(), table=None) 

1460 >>> cc[1] 

1461 Column('y', Integer(), table=None) 

1462 

1463 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1464 allows integer-based 

1465 index access to the collection. 

1466 

1467 Iterating the collection yields the column expressions in order:: 

1468 

1469 >>> list(cc) 

1470 [Column('x', Integer(), table=None), 

1471 Column('y', Integer(), table=None)] 

1472 

1473 The base :class:`_expression.ColumnCollection` object can store 

1474 duplicates, which can 

1475 mean either two columns with the same key, in which case the column 

1476 returned by key access is **arbitrary**:: 

1477 

1478 >>> x1, x2 = Column('x', Integer), Column('x', Integer) 

1479 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1480 >>> list(cc) 

1481 [Column('x', Integer(), table=None), 

1482 Column('x', Integer(), table=None)] 

1483 >>> cc['x'] is x1 

1484 False 

1485 >>> cc['x'] is x2 

1486 True 

1487 

1488 Or it can also mean the same column multiple times. These cases are 

1489 supported as :class:`_expression.ColumnCollection` 

1490 is used to represent the columns in 

1491 a SELECT statement which may include duplicates. 

1492 

1493 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1494 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1495 collection is used for schema level objects like :class:`_schema.Table` 

1496 and 

1497 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1498 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1499 as the schema constructs have more use cases that require removal and 

1500 replacement of columns. 

1501 

1502 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1503 now stores duplicate 

1504 column keys as well as the same column in multiple positions. The 

1505 :class:`.DedupeColumnCollection` class is added to maintain the 

1506 former behavior in those cases where deduplication as well as 

1507 additional replace/remove operations are needed. 

1508 

1509 

1510 """ 

1511 

1512 __slots__ = "_collection", "_index", "_colset", "_proxy_index" 

1513 

1514 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1515 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1516 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1517 _colset: Set[_COL_co] 

1518 

1519 def __init__( 

1520 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1521 ): 

1522 object.__setattr__(self, "_colset", set()) 

1523 object.__setattr__(self, "_index", {}) 

1524 object.__setattr__( 

1525 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1526 ) 

1527 object.__setattr__(self, "_collection", []) 

1528 if columns: 

1529 self._initial_populate(columns) 

1530 

1531 @util.preload_module("sqlalchemy.sql.elements") 

1532 def __clause_element__(self) -> ClauseList: 

1533 elements = util.preloaded.sql_elements 

1534 

1535 return elements.ClauseList( 

1536 _literal_as_text_role=roles.ColumnsClauseRole, 

1537 group=False, 

1538 *self._all_columns, 

1539 ) 

1540 

1541 def _initial_populate( 

1542 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1543 ) -> None: 

1544 self._populate_separate_keys(iter_) 

1545 

1546 @property 

1547 def _all_columns(self) -> List[_COL_co]: 

1548 return [col for (_, col, _) in self._collection] 

1549 

1550 def keys(self) -> List[_COLKEY]: 

1551 """Return a sequence of string key names for all columns in this 

1552 collection.""" 

1553 return [k for (k, _, _) in self._collection] 

1554 

1555 def values(self) -> List[_COL_co]: 

1556 """Return a sequence of :class:`_sql.ColumnClause` or 

1557 :class:`_schema.Column` objects for all columns in this 

1558 collection.""" 

1559 return [col for (_, col, _) in self._collection] 

1560 

1561 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1562 """Return a sequence of (key, column) tuples for all columns in this 

1563 collection each consisting of a string key name and a 

1564 :class:`_sql.ColumnClause` or 

1565 :class:`_schema.Column` object. 

1566 """ 

1567 

1568 return [(k, col) for (k, col, _) in self._collection] 

1569 

1570 def __bool__(self) -> bool: 

1571 return bool(self._collection) 

1572 

1573 def __len__(self) -> int: 

1574 return len(self._collection) 

1575 

1576 def __iter__(self) -> Iterator[_COL_co]: 

1577 # turn to a list first to maintain over a course of changes 

1578 return iter([col for _, col, _ in self._collection]) 

1579 

1580 @overload 

1581 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1582 

1583 @overload 

1584 def __getitem__( 

1585 self, key: Tuple[Union[str, int], ...] 

1586 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1587 

1588 @overload 

1589 def __getitem__( 

1590 self, key: slice 

1591 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1592 

1593 def __getitem__( 

1594 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1595 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1596 try: 

1597 if isinstance(key, (tuple, slice)): 

1598 if isinstance(key, slice): 

1599 cols = ( 

1600 (sub_key, col) 

1601 for (sub_key, col, _) in self._collection[key] 

1602 ) 

1603 else: 

1604 cols = (self._index[sub_key] for sub_key in key) 

1605 

1606 return ColumnCollection(cols).as_readonly() 

1607 else: 

1608 return self._index[key][1] 

1609 except KeyError as err: 

1610 if isinstance(err.args[0], int): 

1611 raise IndexError(err.args[0]) from err 

1612 else: 

1613 raise 

1614 

1615 def __getattr__(self, key: str) -> _COL_co: 

1616 try: 

1617 return self._index[key][1] 

1618 except KeyError as err: 

1619 raise AttributeError(key) from err 

1620 

1621 def __contains__(self, key: str) -> bool: 

1622 if key not in self._index: 

1623 if not isinstance(key, str): 

1624 raise exc.ArgumentError( 

1625 "__contains__ requires a string argument" 

1626 ) 

1627 return False 

1628 else: 

1629 return True 

1630 

1631 def compare(self, other: ColumnCollection[Any, Any]) -> bool: 

1632 """Compare this :class:`_expression.ColumnCollection` to another 

1633 based on the names of the keys""" 

1634 

1635 for l, r in zip_longest(self, other): 

1636 if l is not r: 

1637 return False 

1638 else: 

1639 return True 

1640 

1641 def __eq__(self, other: Any) -> bool: 

1642 return self.compare(other) 

1643 

1644 @overload 

1645 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1646 

1647 @overload 

1648 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1649 

1650 def get( 

1651 self, key: str, default: Optional[_COL] = None 

1652 ) -> Optional[Union[_COL_co, _COL]]: 

1653 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1654 based on a string key name from this 

1655 :class:`_expression.ColumnCollection`.""" 

1656 

1657 if key in self._index: 

1658 return self._index[key][1] 

1659 else: 

1660 return default 

1661 

1662 def __str__(self) -> str: 

1663 return "%s(%s)" % ( 

1664 self.__class__.__name__, 

1665 ", ".join(str(c) for c in self), 

1666 ) 

1667 

1668 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1669 raise NotImplementedError() 

1670 

1671 def __delitem__(self, key: str) -> NoReturn: 

1672 raise NotImplementedError() 

1673 

1674 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1675 raise NotImplementedError() 

1676 

1677 def clear(self) -> NoReturn: 

1678 """Dictionary clear() is not implemented for 

1679 :class:`_sql.ColumnCollection`.""" 

1680 raise NotImplementedError() 

1681 

1682 def remove(self, column: Any) -> None: 

1683 raise NotImplementedError() 

1684 

1685 def update(self, iter_: Any) -> NoReturn: 

1686 """Dictionary update() is not implemented for 

1687 :class:`_sql.ColumnCollection`.""" 

1688 raise NotImplementedError() 

1689 

1690 # https://github.com/python/mypy/issues/4266 

1691 __hash__ = None # type: ignore 

1692 

1693 def _populate_separate_keys( 

1694 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1695 ) -> None: 

1696 """populate from an iterator of (key, column)""" 

1697 

1698 self._collection[:] = collection = [ 

1699 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

1700 ] 

1701 self._colset.update(c._deannotate() for _, c, _ in collection) 

1702 self._index.update( 

1703 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

1704 ) 

1705 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

1706 

1707 def add( 

1708 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None 

1709 ) -> None: 

1710 """Add a column to this :class:`_sql.ColumnCollection`. 

1711 

1712 .. note:: 

1713 

1714 This method is **not normally used by user-facing code**, as the 

1715 :class:`_sql.ColumnCollection` is usually part of an existing 

1716 object such as a :class:`_schema.Table`. To add a 

1717 :class:`_schema.Column` to an existing :class:`_schema.Table` 

1718 object, use the :meth:`_schema.Table.append_column` method. 

1719 

1720 """ 

1721 colkey: _COLKEY 

1722 

1723 if key is None: 

1724 colkey = column.key # type: ignore 

1725 else: 

1726 colkey = key 

1727 

1728 l = len(self._collection) 

1729 

1730 # don't really know how this part is supposed to work w/ the 

1731 # covariant thing 

1732 

1733 _column = cast(_COL_co, column) 

1734 

1735 self._collection.append( 

1736 (colkey, _column, _ColumnMetrics(self, _column)) 

1737 ) 

1738 self._colset.add(_column._deannotate()) 

1739 self._index[l] = (colkey, _column) 

1740 if colkey not in self._index: 

1741 self._index[colkey] = (colkey, _column) 

1742 

1743 def __getstate__(self) -> Dict[str, Any]: 

1744 return { 

1745 "_collection": [(k, c) for k, c, _ in self._collection], 

1746 "_index": self._index, 

1747 } 

1748 

1749 def __setstate__(self, state: Dict[str, Any]) -> None: 

1750 object.__setattr__(self, "_index", state["_index"]) 

1751 object.__setattr__( 

1752 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1753 ) 

1754 object.__setattr__( 

1755 self, 

1756 "_collection", 

1757 [ 

1758 (k, c, _ColumnMetrics(self, c)) 

1759 for (k, c) in state["_collection"] 

1760 ], 

1761 ) 

1762 object.__setattr__( 

1763 self, "_colset", {col for k, col, _ in self._collection} 

1764 ) 

1765 

1766 def contains_column(self, col: ColumnElement[Any]) -> bool: 

1767 """Checks if a column object exists in this collection""" 

1768 if col not in self._colset: 

1769 if isinstance(col, str): 

1770 raise exc.ArgumentError( 

1771 "contains_column cannot be used with string arguments. " 

1772 "Use ``col_name in table.c`` instead." 

1773 ) 

1774 return False 

1775 else: 

1776 return True 

1777 

1778 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

1779 """Return a "read only" form of this 

1780 :class:`_sql.ColumnCollection`.""" 

1781 

1782 return ReadOnlyColumnCollection(self) 

1783 

1784 def _init_proxy_index(self): 

1785 """populate the "proxy index", if empty. 

1786 

1787 proxy index is added in 2.0 to provide more efficient operation 

1788 for the corresponding_column() method. 

1789 

1790 For reasons of both time to construct new .c collections as well as 

1791 memory conservation for large numbers of large .c collections, the 

1792 proxy_index is only filled if corresponding_column() is called. once 

1793 filled it stays that way, and new _ColumnMetrics objects created after 

1794 that point will populate it with new data. Note this case would be 

1795 unusual, if not nonexistent, as it means a .c collection is being 

1796 mutated after corresponding_column() were used, however it is tested in 

1797 test/base/test_utils.py. 

1798 

1799 """ 

1800 pi = self._proxy_index 

1801 if pi: 

1802 return 

1803 

1804 for _, _, metrics in self._collection: 

1805 eps = metrics.column._expanded_proxy_set 

1806 

1807 for eps_col in eps: 

1808 pi[eps_col].add(metrics) 

1809 

1810 def corresponding_column( 

1811 self, column: _COL, require_embedded: bool = False 

1812 ) -> Optional[Union[_COL, _COL_co]]: 

1813 """Given a :class:`_expression.ColumnElement`, return the exported 

1814 :class:`_expression.ColumnElement` object from this 

1815 :class:`_expression.ColumnCollection` 

1816 which corresponds to that original :class:`_expression.ColumnElement` 

1817 via a common 

1818 ancestor column. 

1819 

1820 :param column: the target :class:`_expression.ColumnElement` 

1821 to be matched. 

1822 

1823 :param require_embedded: only return corresponding columns for 

1824 the given :class:`_expression.ColumnElement`, if the given 

1825 :class:`_expression.ColumnElement` 

1826 is actually present within a sub-element 

1827 of this :class:`_expression.Selectable`. 

1828 Normally the column will match if 

1829 it merely shares a common ancestor with one of the exported 

1830 columns of this :class:`_expression.Selectable`. 

1831 

1832 .. seealso:: 

1833 

1834 :meth:`_expression.Selectable.corresponding_column` 

1835 - invokes this method 

1836 against the collection returned by 

1837 :attr:`_expression.Selectable.exported_columns`. 

1838 

1839 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

1840 was moved onto the :class:`_expression.ColumnCollection` itself. 

1841 

1842 """ 

1843 # TODO: cython candidate 

1844 

1845 # don't dig around if the column is locally present 

1846 if column in self._colset: 

1847 return column 

1848 

1849 selected_intersection, selected_metrics = None, None 

1850 target_set = column.proxy_set 

1851 

1852 pi = self._proxy_index 

1853 if not pi: 

1854 self._init_proxy_index() 

1855 

1856 for current_metrics in ( 

1857 mm for ts in target_set if ts in pi for mm in pi[ts] 

1858 ): 

1859 if not require_embedded or current_metrics.embedded(target_set): 

1860 if selected_metrics is None: 

1861 # no corresponding column yet, pick this one. 

1862 selected_metrics = current_metrics 

1863 continue 

1864 

1865 current_intersection = target_set.intersection( 

1866 current_metrics.column._expanded_proxy_set 

1867 ) 

1868 if selected_intersection is None: 

1869 selected_intersection = target_set.intersection( 

1870 selected_metrics.column._expanded_proxy_set 

1871 ) 

1872 

1873 if len(current_intersection) > len(selected_intersection): 

1874 # 'current' has a larger field of correspondence than 

1875 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

1876 # matches a1.c.x->table.c.x better than 

1877 # selectable.c.x->table.c.x does. 

1878 

1879 selected_metrics = current_metrics 

1880 selected_intersection = current_intersection 

1881 elif current_intersection == selected_intersection: 

1882 # they have the same field of correspondence. see 

1883 # which proxy_set has fewer columns in it, which 

1884 # indicates a closer relationship with the root 

1885 # column. Also take into account the "weight" 

1886 # attribute which CompoundSelect() uses to give 

1887 # higher precedence to columns based on vertical 

1888 # position in the compound statement, and discard 

1889 # columns that have no reference to the target 

1890 # column (also occurs with CompoundSelect) 

1891 

1892 selected_col_distance = sum( 

1893 [ 

1894 sc._annotations.get("weight", 1) 

1895 for sc in ( 

1896 selected_metrics.column._uncached_proxy_list() 

1897 ) 

1898 if sc.shares_lineage(column) 

1899 ], 

1900 ) 

1901 current_col_distance = sum( 

1902 [ 

1903 sc._annotations.get("weight", 1) 

1904 for sc in ( 

1905 current_metrics.column._uncached_proxy_list() 

1906 ) 

1907 if sc.shares_lineage(column) 

1908 ], 

1909 ) 

1910 if current_col_distance < selected_col_distance: 

1911 selected_metrics = current_metrics 

1912 selected_intersection = current_intersection 

1913 

1914 return selected_metrics.column if selected_metrics else None 

1915 

1916 

1917_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

1918 

1919 

1920class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): 

1921 """A :class:`_expression.ColumnCollection` 

1922 that maintains deduplicating behavior. 

1923 

1924 This is useful by schema level objects such as :class:`_schema.Table` and 

1925 :class:`.PrimaryKeyConstraint`. The collection includes more 

1926 sophisticated mutator methods as well to suit schema objects which 

1927 require mutable column collections. 

1928 

1929 .. versionadded:: 1.4 

1930 

1931 """ 

1932 

1933 def add( # type: ignore[override] 

1934 self, column: _NAMEDCOL, key: Optional[str] = None 

1935 ) -> None: 

1936 if key is not None and column.key != key: 

1937 raise exc.ArgumentError( 

1938 "DedupeColumnCollection requires columns be under " 

1939 "the same key as their .key" 

1940 ) 

1941 key = column.key 

1942 

1943 if key is None: 

1944 raise exc.ArgumentError( 

1945 "Can't add unnamed column to column collection" 

1946 ) 

1947 

1948 if key in self._index: 

1949 existing = self._index[key][1] 

1950 

1951 if existing is column: 

1952 return 

1953 

1954 self.replace(column) 

1955 

1956 # pop out memoized proxy_set as this 

1957 # operation may very well be occurring 

1958 # in a _make_proxy operation 

1959 util.memoized_property.reset(column, "proxy_set") 

1960 else: 

1961 self._append_new_column(key, column) 

1962 

1963 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None: 

1964 l = len(self._collection) 

1965 self._collection.append( 

1966 (key, named_column, _ColumnMetrics(self, named_column)) 

1967 ) 

1968 self._colset.add(named_column._deannotate()) 

1969 self._index[l] = (key, named_column) 

1970 self._index[key] = (key, named_column) 

1971 

1972 def _populate_separate_keys( 

1973 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

1974 ) -> None: 

1975 """populate from an iterator of (key, column)""" 

1976 cols = list(iter_) 

1977 

1978 replace_col = [] 

1979 for k, col in cols: 

1980 if col.key != k: 

1981 raise exc.ArgumentError( 

1982 "DedupeColumnCollection requires columns be under " 

1983 "the same key as their .key" 

1984 ) 

1985 if col.name in self._index and col.key != col.name: 

1986 replace_col.append(col) 

1987 elif col.key in self._index: 

1988 replace_col.append(col) 

1989 else: 

1990 self._index[k] = (k, col) 

1991 self._collection.append((k, col, _ColumnMetrics(self, col))) 

1992 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

1993 

1994 self._index.update( 

1995 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

1996 ) 

1997 for col in replace_col: 

1998 self.replace(col) 

1999 

2000 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2001 self._populate_separate_keys((col.key, col) for col in iter_) 

2002 

2003 def remove(self, column: _NAMEDCOL) -> None: 

2004 if column not in self._colset: 

2005 raise ValueError( 

2006 "Can't remove column %r; column is not in this collection" 

2007 % column 

2008 ) 

2009 del self._index[column.key] 

2010 self._colset.remove(column) 

2011 self._collection[:] = [ 

2012 (k, c, metrics) 

2013 for (k, c, metrics) in self._collection 

2014 if c is not column 

2015 ] 

2016 for metrics in self._proxy_index.get(column, ()): 

2017 metrics.dispose(self) 

2018 

2019 self._index.update( 

2020 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2021 ) 

2022 # delete higher index 

2023 del self._index[len(self._collection)] 

2024 

2025 def replace( 

2026 self, 

2027 column: _NAMEDCOL, 

2028 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2029 ) -> None: 

2030 """add the given column to this collection, removing unaliased 

2031 versions of this column as well as existing columns with the 

2032 same key. 

2033 

2034 e.g.:: 

2035 

2036 t = Table('sometable', metadata, Column('col1', Integer)) 

2037 t.columns.replace(Column('col1', Integer, key='columnone')) 

2038 

2039 will remove the original 'col1' from the collection, and add 

2040 the new column under the name 'columnname'. 

2041 

2042 Used by schema.Column to override columns during table reflection. 

2043 

2044 """ 

2045 

2046 if extra_remove: 

2047 remove_col = set(extra_remove) 

2048 else: 

2049 remove_col = set() 

2050 # remove up to two columns based on matches of name as well as key 

2051 if column.name in self._index and column.key != column.name: 

2052 other = self._index[column.name][1] 

2053 if other.name == other.key: 

2054 remove_col.add(other) 

2055 

2056 if column.key in self._index: 

2057 remove_col.add(self._index[column.key][1]) 

2058 

2059 if not remove_col: 

2060 self._append_new_column(column.key, column) 

2061 return 

2062 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2063 replaced = False 

2064 for k, col, metrics in self._collection: 

2065 if col in remove_col: 

2066 if not replaced: 

2067 replaced = True 

2068 new_cols.append( 

2069 (column.key, column, _ColumnMetrics(self, column)) 

2070 ) 

2071 else: 

2072 new_cols.append((k, col, metrics)) 

2073 

2074 if remove_col: 

2075 self._colset.difference_update(remove_col) 

2076 

2077 for rc in remove_col: 

2078 for metrics in self._proxy_index.get(rc, ()): 

2079 metrics.dispose(self) 

2080 

2081 if not replaced: 

2082 new_cols.append((column.key, column, _ColumnMetrics(self, column))) 

2083 

2084 self._colset.add(column._deannotate()) 

2085 self._collection[:] = new_cols 

2086 

2087 self._index.clear() 

2088 

2089 self._index.update( 

2090 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2091 ) 

2092 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2093 

2094 

2095class ReadOnlyColumnCollection( 

2096 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2097): 

2098 __slots__ = ("_parent",) 

2099 

2100 def __init__(self, collection): 

2101 object.__setattr__(self, "_parent", collection) 

2102 object.__setattr__(self, "_colset", collection._colset) 

2103 object.__setattr__(self, "_index", collection._index) 

2104 object.__setattr__(self, "_collection", collection._collection) 

2105 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2106 

2107 def __getstate__(self): 

2108 return {"_parent": self._parent} 

2109 

2110 def __setstate__(self, state): 

2111 parent = state["_parent"] 

2112 self.__init__(parent) # type: ignore 

2113 

2114 def add(self, column: Any, key: Any = ...) -> Any: 

2115 self._readonly() 

2116 

2117 def extend(self, elements: Any) -> NoReturn: 

2118 self._readonly() 

2119 

2120 def remove(self, item: Any) -> NoReturn: 

2121 self._readonly() 

2122 

2123 

2124class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2125 def contains_column(self, col): 

2126 return col in self 

2127 

2128 def extend(self, cols): 

2129 for col in cols: 

2130 self.add(col) 

2131 

2132 def __eq__(self, other): 

2133 l = [] 

2134 for c in other: 

2135 for local in self: 

2136 if c.shares_lineage(local): 

2137 l.append(c == local) 

2138 return elements.and_(*l) 

2139 

2140 def __hash__(self): # type: ignore[override] 

2141 return hash(tuple(x for x in self)) 

2142 

2143 

2144def _entity_namespace( 

2145 entity: Union[_HasEntityNamespace, ExternallyTraversible] 

2146) -> _EntityNamespace: 

2147 """Return the nearest .entity_namespace for the given entity. 

2148 

2149 If not immediately available, does an iterate to find a sub-element 

2150 that has one, if any. 

2151 

2152 """ 

2153 try: 

2154 return cast(_HasEntityNamespace, entity).entity_namespace 

2155 except AttributeError: 

2156 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2157 if _is_has_entity_namespace(elem): 

2158 return elem.entity_namespace 

2159 else: 

2160 raise 

2161 

2162 

2163def _entity_namespace_key( 

2164 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2165 key: str, 

2166 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG, 

2167) -> SQLCoreOperations[Any]: 

2168 """Return an entry from an entity_namespace. 

2169 

2170 

2171 Raises :class:`_exc.InvalidRequestError` rather than attribute error 

2172 on not found. 

2173 

2174 """ 

2175 

2176 try: 

2177 ns = _entity_namespace(entity) 

2178 if default is not NO_ARG: 

2179 return getattr(ns, key, default) 

2180 else: 

2181 return getattr(ns, key) # type: ignore 

2182 except AttributeError as err: 

2183 raise exc.InvalidRequestError( 

2184 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2185 ) from err