Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

846 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Dict 

24from typing import FrozenSet 

25from typing import Generator 

26from typing import Generic 

27from typing import Iterable 

28from typing import Iterator 

29from typing import List 

30from typing import Mapping 

31from typing import MutableMapping 

32from typing import NamedTuple 

33from typing import NoReturn 

34from typing import Optional 

35from typing import overload 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import TypeVar 

42from typing import Union 

43 

44from . import roles 

45from . import visitors 

46from .cache_key import HasCacheKey # noqa 

47from .cache_key import MemoizedHasCacheKey # noqa 

48from .traversals import HasCopyInternals # noqa 

49from .visitors import ClauseVisitor 

50from .visitors import ExtendedInternalTraversal 

51from .visitors import ExternallyTraversible 

52from .visitors import InternalTraversal 

53from .. import event 

54from .. import exc 

55from .. import util 

56from ..util import HasMemoized as HasMemoized 

57from ..util import hybridmethod 

58from ..util import typing as compat_typing 

59from ..util.typing import Final 

60from ..util.typing import Protocol 

61from ..util.typing import Self 

62from ..util.typing import TypeGuard 

63 

64if TYPE_CHECKING: 

65 from . import coercions 

66 from . import elements 

67 from . import type_api 

68 from ._orm_types import DMLStrategyArgument 

69 from ._orm_types import SynchronizeSessionArgument 

70 from ._typing import _CLE 

71 from .cache_key import CacheKey 

72 from .compiler import SQLCompiler 

73 from .elements import BindParameter 

74 from .elements import ClauseList 

75 from .elements import ColumnClause # noqa 

76 from .elements import ColumnElement 

77 from .elements import NamedColumn 

78 from .elements import SQLCoreOperations 

79 from .elements import TextClause 

80 from .schema import Column 

81 from .schema import DefaultGenerator 

82 from .selectable import _JoinTargetElement 

83 from .selectable import _SelectIterable 

84 from .selectable import FromClause 

85 from .visitors import anon_map 

86 from ..engine import Connection 

87 from ..engine import CursorResult 

88 from ..engine.interfaces import _CoreMultiExecuteParams 

89 from ..engine.interfaces import _ExecuteOptions 

90 from ..engine.interfaces import _ImmutableExecuteOptions 

91 from ..engine.interfaces import CacheStats 

92 from ..engine.interfaces import Compiled 

93 from ..engine.interfaces import CompiledCacheType 

94 from ..engine.interfaces import CoreExecuteOptionsParameter 

95 from ..engine.interfaces import Dialect 

96 from ..engine.interfaces import IsolationLevel 

97 from ..engine.interfaces import SchemaTranslateMapType 

98 from ..event import dispatcher 

99 

100if not TYPE_CHECKING: 

101 coercions = None # noqa 

102 elements = None # noqa 

103 type_api = None # noqa 

104 

105 

class _NoArg(Enum):
    """Single-member enum that provides the ``NO_ARG`` sentinel value."""

    NO_ARG = 0

    def __repr__(self):
        # render as "_NoArg.<member name>" instead of the default
        # "<_NoArg.NO_ARG: 0>" form produced by Enum
        return "_NoArg." + self.name


# module-level alias for the only member of _NoArg
NO_ARG: Final = _NoArg.NO_ARG

114 

115 

class _NoneName(Enum):
    """Single-member enum that provides the ``NONE_NAME`` marker."""

    # indicate a 'deferred' name that was ultimately the value None.
    NONE_NAME = 0


# module-level alias for the only member of _NoneName
_NONE_NAME: Final = _NoneName.NONE_NAME

122 

# general purpose type variable, bound to Any
_T = TypeVar("_T", bound=Any)

# type variable for callables; used by decorators in this module that
# are annotated as returning the same callable type they receive
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# mutable str -> str mapping; the type of
# CompileState._ambiguous_table_name_map
_AmbiguousTableNameMap = MutableMapping[str, str]

128 

129 

class _DefaultDescriptionTuple(NamedTuple):
    """Four-field description of a column default generator.

    All four fields are ``None`` when the tuple was built from a default
    that does not qualify; see :meth:`._from_column_default`.

    """

    arg: Any
    is_scalar: Optional[bool]
    is_callable: Optional[bool]
    is_sentinel: Optional[bool]

    @classmethod
    def _from_column_default(
        cls, default: Optional[DefaultGenerator]
    ) -> _DefaultDescriptionTuple:
        """Build a description tuple from ``default``.

        The fields are copied from ``default`` when it is truthy and
        either has an argument, or is a sentinel that is not "for update".
        In every other case, including ``default`` being ``None``, a tuple
        of four ``None`` values is returned.

        """
        if default and (
            default.has_arg
            or (not default.for_update and default.is_sentinel)
        ):
            return _DefaultDescriptionTuple(
                default.arg,  # type: ignore
                default.is_scalar,
                default.is_callable,
                default.is_sentinel,
            )

        return _DefaultDescriptionTuple(None, None, None, None)

154 

155 

# getter returning the "_omit_from_statements" attribute of whatever
# object it is called with
_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)

159 

160 

class _EntityNamespace(Protocol):
    """Typing protocol for an object whose attribute access, for any
    string name, is typed as returning a SQL expression object.

    """

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...

163 

164 

class _HasEntityNamespace(Protocol):
    """Typing protocol for an object that exposes a read-only
    ``entity_namespace`` attribute.

    """

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...

168 

169 

def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
    """Return True if ``element`` has an ``entity_namespace`` attribute.

    Acts as a type guard so that callers may treat ``element`` as a
    ``_HasEntityNamespace`` once the check has passed.

    """
    has_namespace = hasattr(element, "entity_namespace")
    return has_namespace

172 

173 

# Remove when https://github.com/python/mypy/issues/14640 will be fixed
_Self = TypeVar("_Self", bound=Any)


class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "Immutable" here refers to the mutability of an object in the context
    of SQL DQL and DML generation.  For example, in DQL one can compose a
    SELECT or subquery of varied forms, but one cannot modify the
    structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; all arguments are accepted and ignored."""
        return None

208 

209 

class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # "constructing" the class always hands back the instance stored
        # by _create_singleton(); arguments are ignored here
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # the usable value is assigned per-instance in _create_singleton()
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Build the single instance of ``cls`` and store it on the class.

        The instance is created with ``object.__new__`` because
        ``cls.__new__`` itself returns ``cls._singleton``, which does not
        exist yet at this point.

        """
        singleton = object.__new__(cls)
        singleton.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        singleton.proxy_set = frozenset((singleton,))
        cls._singleton = singleton

238 

239 

def _from_objects(
    *elements: Union[
        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
    ]
) -> Iterator[FromClause]:
    """Return one iterator over the ``_from_objects`` collections of all
    the given elements, in the order the elements were passed.

    The ``_from_objects`` attribute of every element is read up front;
    only the flattening of those collections is lazy.

    """
    per_element = [element._from_objects for element in elements]
    return itertools.chain.from_iterable(per_element)

248 

249 

def _select_iterables(
    elements: Iterable[roles.ColumnsClauseRole],
) -> _SelectIterable:
    """expand tables into individual columns in the
    given list of column expressions.

    Each element contributes whatever its ``_select_iterable`` attribute
    holds; the attributes are read up front and then chained lazily.

    """
    per_element = [c._select_iterable for c in elements]
    return itertools.chain.from_iterable(per_element)

260 

261 

# type variable bound to any object implementing _GenerativeType
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(compat_typing.Protocol):
    """Typing protocol for objects that provide a ``_generate()`` method
    returning an object of their own type.

    """

    def _generate(self) -> Self: ...

267 

268 

def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The undecorated function remains available as the
    ``non_generative`` attribute of the returned callable.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # run the method against a copy, never against the original
        generated = self._generate()
        returned = fn(generated, *args, **kw)
        assert returned is generated, "generative methods must return self"
        return generated

    decorated = _generative(fn)
    decorated.non_generative = fn  # type: ignore
    return decorated

291 

292 

def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Return a decorator that refuses to call a method when any of the
    named attributes has already been changed from its default.

    :param names: attribute names (dotted paths are accepted, as they are
     passed to ``operator.attrgetter``) to inspect on the instance.

    :param msgs: optional keyword; dictionary of attribute name to the
     error message to use for that attribute.

    :param defaults: optional keyword; dictionary of attribute name to the
     value regarded as "not set".  Names not present default to ``None``.
     Comparison is by identity.

    The decorated method raises :class:`.InvalidRequestError` for the
    first listed attribute whose current value is not its default.

    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for name in names:
        getters.append(
            (name, operator.attrgetter(name), defaults.get(name, None))
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        remaining = args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            fallback = (
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, self.__class__)
            )
            raise exc.InvalidRequestError(msgs.get(name, fallback))
        return fn(self, *remaining, **kw)

    return check

320 

321 

def _clone(element, **kw):
    """Call ``element._clone()`` with the given keyword arguments and
    return its result.

    """
    cloned = element._clone(**kw)
    return cloned

324 

325 

def _expand_cloned(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """expand the given set of ClauseElements to be the set of all 'cloned'
    predecessors.

    The result is an iterator over the members of every element's
    ``_cloned_set``; duplicates are not removed.

    """
    # TODO: cython candidate
    cloned_sets = [x._cloned_set for x in elements]
    return itertools.chain.from_iterable(cloned_sets)

335 

336 

def _de_clone(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """Yield, for each given element, the object at the root of its
    ``_is_clone_of`` chain.

    An element whose ``_is_clone_of`` is ``None`` is yielded unchanged.

    """
    for element in elements:
        origin = element
        while origin._is_clone_of is not None:
            origin = origin._is_clone_of
        yield origin

344 

345 

def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    Note that ``a`` is iterated twice, so it must be a re-iterable
    collection rather than a one-shot iterator.

    """
    all_overlap: Set[_CLE] = set(_expand_cloned(a))
    all_overlap.intersection_update(_expand_cloned(b))

    # keep the members of "a" that share a clone ancestor with "b"
    return {
        elem for elem in a if not all_overlap.isdisjoint(elem._cloned_set)
    }

357 

358 

def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the members of ``a`` that have no overlap with ``b``,
    counting any overlap between 'cloned' predecessors.

    Note that ``a`` is iterated twice, so it must be a re-iterable
    collection rather than a one-shot iterator.

    """
    all_overlap: Set[_CLE] = set(_expand_cloned(a))
    all_overlap.intersection_update(_expand_cloned(b))

    # keep the members of "a" sharing no clone ancestor with "b"
    return {elem for elem in a if all_overlap.isdisjoint(elem._cloned_set)}

366 

367 

class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    Every operation is forwarded to the ``dialect_options`` collection of
    the wrapped object; length and iteration consider only the
    ``_non_defaults`` of each dialect entry.

    """

    __slots__ = ("obj",)

    def __init__(self, obj: DialectKWArgs) -> None:
        self.obj = obj

    def _key(self, key: str) -> Tuple[str, str]:
        """Split ``key`` at its first underscore into
        ``(dialect name, argument name)``.

        Raises ``KeyError`` if ``key`` contains no underscore.

        """
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err

        return dialect, value_key

    def __getitem__(self, key: str) -> Any:
        dialect, value_key = self._key(key)

        try:
            opt = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            # an unknown dialect is reported as a missing key
            raise KeyError(key) from err

        return opt[value_key]

    def __setitem__(self, key: str, value: Any) -> None:
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err

        self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key: str) -> None:
        dialect, value_key = self._key(key)
        del self.obj.dialect_options[dialect][value_key]

    def __len__(self) -> int:
        total = 0
        for args in self.obj.dialect_options.values():
            total += len(args._non_defaults)
        return total

    def __iter__(self) -> Generator[str, None, None]:
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name, args in self.obj.dialect_options.items()
            for value_name in args._non_defaults
        )

425 

426 

class _DialectArgDict(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments for a specific
    dialect.

    Maintains a separate collection of user-specified arguments
    and dialect-specified default arguments.

    Reads prefer the user-specified value and fall back to the default;
    writes and deletes only ever touch the user-specified collection.

    """

    def __init__(self) -> None:
        self._non_defaults: Dict[str, Any] = {}
        self._defaults: Dict[str, Any] = {}

    def __len__(self) -> int:
        # number of distinct keys across both collections
        return len(self._non_defaults.keys() | self._defaults.keys())

    def __iter__(self) -> Iterator[str]:
        all_keys = set(self._non_defaults).union(self._defaults)
        return iter(all_keys)

    def __getitem__(self, key: str) -> Any:
        if key in self._non_defaults:
            return self._non_defaults[key]
        return self._defaults[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._non_defaults[key] = value

    def __delitem__(self, key: str) -> None:
        del self._non_defaults[key]

457 

458 

@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the dialect class registered under ``dialect_name`` and return
    a new dictionary copied from its ``construct_arguments``.

    Returns ``None`` when the dialect class has ``construct_arguments``
    set to ``None``.

    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    if construct_arguments is None:
        return None
    return dict(construct_arguments)

465 

466 

class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # the dialect opted out of argument validation; the message
            # previously read "does have", stating the opposite of the
            # condition being reported
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        construct_arg_dictionary.setdefault(cls, {})[argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> copy of that dialect's construct_arguments (or None),
    # populated on first access by _kw_reg_for_dialect(); shared by all
    # subclasses
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class.

        When the dialect has no argument registry, the defaults consist of
        the single wildcard entry ``"*"``, which
        :meth:`._validate_dialect_kwargs` treats as "accept any argument
        name".  Otherwise the defaults are gathered from every class in
        the MRO that the registry knows about, base classes first so that
        more specific classes override them.

        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and
        store their values in :attr:`.dialect_options`.

        :param kwargs: dictionary of argument name to value.

        Raises ``TypeError`` for a key not of the form
        ``<dialectname>_<argument>`` and :class:`.ArgumentError` for an
        argument the dialect does not accept for this class.  A dialect
        that cannot be located only produces a warning; its argument is
        stored without validation.

        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k, value in kwargs.items():
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                # register a wildcard entry so that this dialect name
                # accepts any argument from here on
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = value
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = value

640 

641 

class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, plugin target) -> CompileState class,
    # filled in by the plugin_for() decorator
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Create the :class:`.CompileState` appropriate for ``statement``.

        The class registered for the statement's plugin is used if there
        is one, otherwise the class registered for the ``"default"``
        plugin.  If the class found is not ``cls`` itself, construction is
        delegated to that class's own ``create_for_statement()``.

        """
        # factory construction.

        propagate_attrs = statement._propagate_attrs
        klass = None
        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )

        if klass is None:
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is not cls:
            return klass.create_for_statement(statement, compiler, **kw)

        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the registered class for the statement's plugin, falling
        back to the class for the ``"default"`` plugin, or ``None`` if
        neither is registered.

        """
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            try:
                return cls.plugins[
                    (plugin_name, statement._effective_plugin_target)
                ]
            except KeyError:
                pass

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        return cls.plugins.get(
            ("default", statement._effective_plugin_target)
        )

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no fallback is applied.

        """
        return cls.plugins.get(
            (plugin_name, statement._effective_plugin_target)
        )

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class
        under ``(plugin_name, visit_name)`` and returns it unchanged.

        """

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate

748 

749 

class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class holding a shallow copy
        of this object's ``__dict__``, leaving out any keys listed in
        ``_memoized_keys``.

        The new instance is created with ``__new__`` only; ``__init__``
        is not invoked.

        """
        skip = self._memoized_keys
        cls = self.__class__
        s = cls.__new__(cls)

        # copy first so that the iteration below remains atomic
        state = self.__dict__.copy()
        if skip:
            state = {k: v for k, v in state.items() if k not in skip}

        s.__dict__ = state
        return s

766 

767 

class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        """Discard the entries named in ``_memoized_keys`` from this
        object's ``__dict__`` and return the object itself; no copy is
        made.

        """
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self

780 

781 

class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional CompileState class; None unless overridden
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # immutable dictionary of attributes; empty unless overridden
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # callable that produces the CompileState for a statement
    _compile_state_factory = CompileState.create_for_statement

790 

791 

class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        """Support ``OptionsClass + {...}``: return a new instance of the
        class with the dictionary's entries set in its ``__dict__``.

        Raises ``TypeError`` if the dictionary contains a name that is not
        listed in the class's ``_cache_attrs``.

        """
        o1 = self()

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        o1.__dict__.update(other)
        return o1

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

823 

824 

class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    Defaults are the class-level attributes of a subclass; an instance
    only stores the values that differ in its ``__dict__``.  The names of
    the class-level attributes are collected into ``_cache_attrs`` when
    the subclass is created.

    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # collect, in sorted order, every name defined directly in the
        # subclass body except dunder names and "_cache_key_traversal"
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a copy of this object with the entries of the dictionary
        ``other`` applied on top.

        Raises ``TypeError`` if ``other`` contains a name that is not
        listed in ``_cache_attrs``.

        """
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        """Compare the values of the cache attributes of both objects,
        pairing the attribute names by position.

        Objects with a different number of cache attributes are unequal.

        """
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        own_attrs = self._cache_attrs
        other_attrs = other._cache_attrs
        if len(own_attrs) != len(other_attrs):
            # padding the shorter tuple with None, as zip_longest() does,
            # would lead to getattr(obj, None) and a TypeError
            return False

        for a, b in zip(own_attrs, other_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True if this options class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return a copy in which attribute ``name`` is its current value
        with ``value`` added to it using ``+``.

        """
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        # instance level: the values set on this particular object
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        # class level: the constant empty dictionary
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return an instance of ``cls`` carrying the state of ``other``.

        Raises ``TypeError`` if ``other`` is of a different class and has
        cache attributes that ``cls`` does not have.

        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        A value present in ``exec_options`` takes precedence over the one
        in ``statement_exec_options``; each is stored on the Options under
        the option name prefixed with an underscore.

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict(exec_options).merge_with(
                {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

985 

986 

class CacheableOptions(Options, HasCacheKey):
    """An :class:`.Options` that also provides cache key generation, with
    separate behavior at the class level and the instance level.

    """

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: delegate to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the key is the class itself plus an empty tuple
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        # delegate to the generic per-object implementation of HasCacheKey
        return HasCacheKey._generate_cache_key_for_object(self)

1005 

1006 

class ExecutableOption(HasCopyInternals):
    """Base for option objects carried in ``Executable._with_options``."""

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption.

        The copy is made without calling ``__init__``; it receives a new
        ``__dict__`` holding the same attribute values.  ``**kw`` is
        accepted and ignored.

        """
        klass = self.__class__
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = self.__dict__.copy()  # type: ignore
        return duplicate

1023 

1024 

class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _with_context_options: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    # (attribute name, traversal symbol) pairs for the executable-specific
    # state declared above
    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_with_context_options",
            ExtendedInternalTraversal.dp_with_context_options,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags; all default to False on this base class
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        # typing-only declarations; none of these exist at runtime
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        """Not provided by this base class; always raises."""
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        """The ``__visit_name__`` of this statement."""
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by the SQL compiler for the statement.
        These options can be consumed by specific dialects or specific kinds
        of compilers.

        The most commonly known kinds of options are the ORM level options
        that apply "eager load" and other loading behaviors to an ORM
        query.  However, options can theoretically be used for many other
        purposes.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each given object is coerced to the executable-option role, then
        # appended after the options already present
        coerced = tuple(
            coercions.expect(roles.ExecutableOptionRole, option)
            for option in options
        )
        self._with_options += coerced
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """
        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """Update the _compile_options with new keys.

        The existing ``_compile_options`` must not be ``None``; the given
        options are combined into it with ``+=``.

        """
        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_context_option(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a context option to this statement.

        These are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        entry = (callable_, cache_args)
        self._with_context_options += (entry,)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

            from sqlalchemy import event


            @event.listens_for(some_engine, "before_execute")
            def _process_opt(conn, statement, multiparams, params, execution_options):
                "run a SQL function before invoking a statement"

                if execution_options.get("do_special_thing", False):
                    conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # options that cannot be carried by a statement, checked in this
        # order, each with the message raised when it is present
        rejected = {
            "isolation_level": (
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            ),
            "compiled_cache": (
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            ),
        }
        for option_name, message in rejected.items():
            if option_name in kw:
                raise exc.ArgumentError(message)

        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options

1328 

1329 

class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The base implementation does nothing.

        """

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call :meth:`._set_parent` between the two attach events.

        ``before_parent_attach`` is dispatched first, then ``_set_parent``
        runs with the given keyword arguments, then ``after_parent_attach``
        is dispatched.

        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)

1349 

1350 

class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Adds nothing of its own; it only combines :class:`.SchemaEventTarget`
    with ``visitors.Visitable``.

    .. versionadded:: 2.0.41

    """

1357 

1358 

class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # traversal option marking this visitor as a schema visitor
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}

1366 

1367 

class _SentinelDefaultCharacterization(Enum):
    """Closed set of labels for the default of a sentinel column.

    Each member's value is the lower-case form of its name.

    """

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"

1376 

1377 

class _SentinelColumnCharacterization(NamedTuple):
    """Immutable record describing the sentinel column(s) of a table.

    Every field has a default, so calling the class with no arguments
    gives a record with no columns, both flags false and a
    characterization of ``NONE``.

    """

    # the sentinel columns, or None
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )

1385 

1386 

# key type of a column collection: constrained to either "optional str"
# or plain str
_COLKEY = TypeVar("_COLKEY", Optional[str], str)

# column type variables, both bound to ColumnElement; the "_co" variant is
# covariant
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")

1391 

1392 

class _ColumnMetrics(Generic[_COL_co]):
    """Per-entry bookkeeping object stored next to each column of a
    :class:`.ColumnCollection`.

    It holds the column and keeps the owning collection's ``_proxy_index``
    in step with it, once that index has been populated.

    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        """Record ``col`` and register it in an already-built proxy index.

        :param collection: the collection that will hold this entry
        :param col: the column being tracked

        """
        self.column = col

        proxy_index = collection._proxy_index
        if not proxy_index:
            # an empty proxy index has not been initialized yet, so there
            # is nothing to keep up to date
            return

        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the ``_expanded_proxy_set`` of the tracked column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from ``collection``'s proxy index.

        Index entries that are empty afterwards are deleted.  Nothing is
        done when the proxy index is empty.

        """
        proxy_index = collection._proxy_index
        if not proxy_index:
            return

        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` is accounted for.

        A member counts when it is in the tracked column's expanded proxy
        set, or when ``_expand_cloned([member])`` shares an element with
        that set.

        """
        own_proxies = self.column._expanded_proxy_set
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in target_set.difference(own_proxies)
        )

1435 

1436 

1437class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1438 """Collection of :class:`_expression.ColumnElement` instances, 

1439 typically for 

1440 :class:`_sql.FromClause` objects. 

1441 

1442 The :class:`_sql.ColumnCollection` object is most commonly available 

1443 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1444 on the :class:`_schema.Table` object, introduced at 

1445 :ref:`metadata_tables_and_columns`. 

1446 

1447 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1448 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1449 :class:`_schema.Column` objects, which are then accessible both via mapping 

1450 style access as well as attribute access style. 

1451 

1452 To access :class:`_schema.Column` objects using ordinary attribute-style 

1453 access, specify the name like any other object attribute, such as below 

1454 a column named ``employee_name`` is accessed:: 

1455 

1456 >>> employee_table.c.employee_name 

1457 

1458 To access columns that have names with special characters or spaces, 

1459 index-style access is used, such as below which illustrates a column named 

1460 ``employee ' payment`` is accessed:: 

1461 

1462 >>> employee_table.c["employee ' payment"] 

1463 

1464 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1465 interface, common dictionary method names like 

1466 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1467 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1468 database columns that are keyed under these names also need to use indexed 

1469 access:: 

1470 

1471 >>> employee_table.c["values"] 

1472 

1473 

1474 The name for which a :class:`_schema.Column` would be present is normally 

1475 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1476 such as a :class:`_sql.Select` object that uses a label style set 

1477 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1478 key may instead be represented under a particular label name such 

1479 as ``tablename_columnname``:: 

1480 

1481 >>> from sqlalchemy import select, column, table 

1482 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1483 >>> t = table("t", column("c")) 

1484 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1485 >>> subq = stmt.subquery() 

1486 >>> subq.c.t_c 

1487 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1488 

1489 :class:`.ColumnCollection` also indexes the columns in order and allows 

1490 them to be accessible by their integer position:: 

1491 

1492 >>> cc[0] 

1493 Column('x', Integer(), table=None) 

1494 >>> cc[1] 

1495 Column('y', Integer(), table=None) 

1496 

1497 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1498 allows integer-based 

1499 index access to the collection. 

1500 

1501 Iterating the collection yields the column expressions in order:: 

1502 

1503 >>> list(cc) 

1504 [Column('x', Integer(), table=None), 

1505 Column('y', Integer(), table=None)] 

1506 

1507 The base :class:`_expression.ColumnCollection` object can store 

1508 duplicates, which can 

1509 mean either two columns with the same key, in which case the column 

1510 returned by key access is **arbitrary**:: 

1511 

1512 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1513 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1514 >>> list(cc) 

1515 [Column('x', Integer(), table=None), 

1516 Column('x', Integer(), table=None)] 

1517 >>> cc["x"] is x1 

1518 True 

1519 >>> cc["x"] is x2 

1520 False 

1521 

1522 Or it can also mean the same column multiple times. These cases are 

1523 supported as :class:`_expression.ColumnCollection` 

1524 is used to represent the columns in 

1525 a SELECT statement which may include duplicates. 

1526 

1527 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1528 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1529 collection is used for schema level objects like :class:`_schema.Table` 

1530 and 

1531 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1532 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1533 as the schema constructs have more use cases that require removal and 

1534 replacement of columns. 

1535 

1536 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1537 now stores duplicate 

1538 column keys as well as the same column in multiple positions. The 

1539 :class:`.DedupeColumnCollection` class is added to maintain the 

1540 former behavior in those cases where deduplication as well as 

1541 additional replace/remove operations are needed. 

1542 

1543 

1544 """ 

1545 

1546 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1547 

1548 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1549 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1550 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1551 _colset: Set[_COL_co] 

1552 

def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Create the collection, optionally from ``(key, column)`` pairs.

    :param columns: iterable of ``(key, column)`` tuples; when it is
     falsy the collection starts out empty.

    """
    # ``__setattr__`` on this class raises, so state is installed through
    # ``object.__setattr__`` directly
    install = object.__setattr__
    install(self, "_colset", set())
    install(self, "_index", {})
    install(self, "_proxy_index", collections.defaultdict(util.OrderedSet))
    install(self, "_collection", [])

    if columns:
        self._initial_populate(columns)

1564 

@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return the columns of this collection as an ungrouped
    ``ClauseList``, coerced under ``roles.ColumnsClauseRole``."""
    sql_elements = util.preloaded.sql_elements
    columns = self._all_columns

    return sql_elements.ClauseList(
        *columns,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )

1574 

def _initial_populate(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate a new collection from ``(key, column)`` pairs.

    This implementation hands the pairs to
    ``_populate_separate_keys()`` unchanged.

    """
    self._populate_separate_keys(iter_)

1579 

@property
def _all_columns(self) -> List[_COL_co]:
    """A new list of every column in the collection, in stored order."""
    return [column for _key, column, _metrics in self._collection]

1583 

def keys(self) -> List[_COLKEY]:
    """Return the key of every entry in this collection, in stored
    order, as a new list."""
    return [key for key, _column, _metrics in self._collection]

1588 

def values(self) -> List[_COL_co]:
    """Return the :class:`_sql.ColumnClause` or :class:`_schema.Column`
    object of every entry in this collection, in stored order, as a new
    list."""
    return [column for _key, column, _metrics in self._collection]

1594 

def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
    """Return a new list of ``(key, column)`` tuples, one per entry in
    this collection, where the key is the entry's key name and the column
    is a :class:`_sql.ColumnClause` or :class:`_schema.Column` object.
    """
    return [(key, column) for key, column, _metrics in self._collection]

1603 

def __bool__(self) -> bool:
    """True when the collection holds at least one entry."""
    return len(self._collection) != 0

1606 

def __len__(self) -> int:
    """Number of entries, counting duplicates."""
    entries = self._collection
    return len(entries)

1609 

def __iter__(self) -> Iterator[_COL_co]:
    """Iterate over the columns in stored order.

    The columns are copied into a list first, so the iterator is not
    affected by changes made to the collection while iterating.

    """
    snapshot = [column for _key, column, _metrics in self._collection]
    return iter(snapshot)

1613 

@overload
def __getitem__(self, key: Union[str, int]) -> _COL_co: ...


@overload
def __getitem__(
    self, key: Tuple[Union[str, int], ...]
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...


@overload
def __getitem__(
    self, key: slice
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...


def __getitem__(
    self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
    """Look up columns by key, position, slice, or tuple of keys.

    A single string or integer returns the matching column.  A slice or
    a tuple of keys returns a new read-only collection of the selected
    entries.

    :raises IndexError: when a missing key is an integer
    :raises KeyError: when any other key is missing

    """
    try:
        if isinstance(key, slice):
            pairs = (
                (sub_key, column)
                for sub_key, column, _metrics in self._collection[key]
            )
        elif isinstance(key, tuple):
            pairs = (self._index[sub_key] for sub_key in key)
        else:
            return self._index[key][1]

        # the generators above are consumed here, inside the ``try``, so
        # that a missing tuple member is translated below as well
        return ColumnCollection(pairs).as_readonly()
    except KeyError as err:
        missing = err.args[0]
        if not isinstance(missing, int):
            raise
        raise IndexError(missing) from err

1648 

def __getattr__(self, key: str) -> _COL_co:
    """Attribute-style access to a column by its key.

    :raises AttributeError: when no entry is indexed under ``key``

    """
    index = self._index
    try:
        column = index[key][1]
    except KeyError as err:
        raise AttributeError(key) from err
    return column

1654 

def __contains__(self, key: str) -> bool:
    """Return True if ``key`` is present in the index.

    :raises ArgumentError: when ``key`` is absent and is not a string

    """
    if key in self._index:
        return True

    if not isinstance(key, str):
        raise exc.ArgumentError("__contains__ requires a string argument")
    return False

1664 

def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
    """Compare this :class:`_expression.ColumnCollection` to another,
    position by position.

    Returns True only when both yield the same column objects (compared
    with ``is``) in the same order.  The shorter side is padded with
    ``None``, so collections of different length compare as False.

    """
    return all(
        left is right for left, right in zip_longest(self, other)
    )

1674 

def __eq__(self, other: Any) -> bool:
    """Equality is whatever :meth:`.compare` reports for ``other``."""
    return self.compare(other)

1677 

@overload
def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...


@overload
def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...


def get(
    self, key: str, default: Optional[_COL] = None
) -> Optional[Union[_COL_co, _COL]]:
    """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
    based on a string key name from this
    :class:`_expression.ColumnCollection`, or ``default`` when no entry
    is indexed under that key."""
    try:
        entry = self._index[key]
    except KeyError:
        return default
    return entry[1]

1695 

def __str__(self) -> str:
    """Render as ``ClassName(col, col, ...)`` using ``str()`` of each
    column."""
    class_name = self.__class__.__name__
    rendered_columns = ", ".join(str(column) for column in self)
    return "%s(%s)" % (class_name, rendered_columns)

1701 

def __setitem__(self, key: str, value: Any) -> NoReturn:
    """Item assignment is not implemented; always raises."""
    raise NotImplementedError()

1704 

def __delitem__(self, key: str) -> NoReturn:
    """Item deletion is not implemented; always raises."""
    raise NotImplementedError()

1707 

def __setattr__(self, key: str, obj: Any) -> NoReturn:
    """Attribute assignment is not implemented; always raises."""
    raise NotImplementedError()

1710 

def clear(self) -> NoReturn:
    """Dictionary clear() is not implemented for
    :class:`_sql.ColumnCollection`; calling it always raises."""
    raise NotImplementedError()

1715 

def remove(self, column: Any) -> NoReturn:
    """Removing a column is not implemented here; always raises."""
    raise NotImplementedError()

1718 

def update(self, iter_: Any) -> NoReturn:
    """Dictionary update() is not implemented for
    :class:`_sql.ColumnCollection`; calling it always raises."""
    raise NotImplementedError()

1723 

# ``__hash__`` is set to None, which makes instances unhashable.  The
# ``type: ignore`` relates to https://github.com/python/mypy/issues/4266
__hash__: Optional[int] = None  # type: ignore

1726 

def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate from an iterator of (key, column).

    The stored entries are replaced in place with the new ones; the
    column set and the index are updated (not cleared) from them.

    """
    entries = [
        (key, column, _ColumnMetrics(self, column)) for key, column in iter_
    ]
    self._collection[:] = entries

    self._colset.update(
        column._deannotate() for _key, column, _metrics in entries
    )

    # integer positions first ...
    by_position = {
        position: (key, column)
        for position, (key, column, _metrics) in enumerate(entries)
    }
    self._index.update(by_position)

    # ... then keys; walking the entries backwards means that for a
    # duplicated key the earliest entry is the one left in the index
    by_key = {
        key: (key, column) for key, column, _metrics in reversed(entries)
    }
    self._index.update(by_key)

1740 

def add(
    self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended at the end.  It is indexed under ``key``, or
    under ``column.key`` when ``key`` is None, unless that key is already
    present in the index, in which case the existing key entry is kept.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    # integer position the new entry will occupy; taken before appending
    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)

    metrics = _ColumnMetrics(self, _column)
    self._collection.append((colkey, _column, metrics))
    self._colset.add(_column._deannotate())

    self._index[position] = (colkey, _column)
    self._index.setdefault(colkey, (colkey, _column))

1776 

def __getstate__(self) -> Dict[str, Any]:
    """Return the picklable state: the ``(key, column)`` pairs and the
    index.  The per-entry metrics objects are left out."""
    pairs = [(key, column) for key, column, _metrics in self._collection]
    return {"_collection": pairs, "_index": self._index}

1782 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from the dictionary produced by ``__getstate__``.

    The index is taken from ``state`` as is; the proxy index starts out
    empty, and the metrics objects and the column set are rebuilt from
    the stored ``(key, column)`` pairs.

    """
    # ``__setattr__`` on this class raises, hence object.__setattr__
    install = object.__setattr__

    install(self, "_index", state["_index"])
    # must exist before the metrics objects below are created, as their
    # constructor reads it
    install(self, "_proxy_index", collections.defaultdict(util.OrderedSet))

    entries = [
        (key, column, _ColumnMetrics(self, column))
        for key, column in state["_collection"]
    ]
    install(self, "_collection", entries)
    install(
        self,
        "_colset",
        {column for _key, column, _metrics in self._collection},
    )

1799 

def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Checks if a column object exists in this collection.

    :raises ArgumentError: when ``col`` is not in the collection and is
     a string

    """
    if col in self._colset:
        return True

    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False

1811 

def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`, built by passing this collection to
    ``ReadOnlyColumnCollection``."""
    readonly = ReadOnlyColumnCollection(self)
    return readonly

1817 

def _init_proxy_index(self) -> None:
    """Populate the "proxy index", if empty.

    proxy index is added in 2.0 to provide more efficient operation
    for the corresponding_column() method.

    For reasons of both time to construct new .c collections as well as
    memory conservation for large numbers of large .c collections, the
    proxy_index is only filled if corresponding_column() is called. once
    filled it stays that way, and new _ColumnMetrics objects created after
    that point will populate it with new data.   Note this case would be
    unusual, if not nonexistent, as it means a .c collection is being
    mutated after corresponding_column() were used, however it is tested in
    test/base/test_utils.py.

    """
    proxy_index = self._proxy_index
    if not proxy_index:
        # register each entry's metrics object under every member of its
        # column's expanded proxy set
        for _key, _column, metrics in self._collection:
            for proxied in metrics.column._expanded_proxy_set:
                proxy_index[proxied].add(metrics)

1843 

def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Return the exported :class:`_expression.ColumnElement` of this
    :class:`_expression.ColumnCollection` that corresponds to the given
    :class:`_expression.ColumnElement` by way of a common ancestor
    column, or ``None`` if there is no such column.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement`
     is actually present within a sub-element
     of this :class:`_expression.Selectable`.
     Normally the column will match if
     it merely shares a common ancestor with one of the exported
     columns of this :class:`_expression.Selectable`.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is locally present is its own match; no need to dig
    if column in self._colset:
        return column

    target_set = column.proxy_set

    proxy_index = self._proxy_index
    if not proxy_index:
        self._init_proxy_index()

    def _lineage_distance(metrics):
        # Sum the "weight" annotation (1 when absent) over those entries
        # of the candidate's proxy list that share lineage with the
        # target column.  CompoundSelect() uses "weight" to give higher
        # precedence to columns based on vertical position in the
        # compound statement; entries with no reference to the target
        # column are left out of the sum.
        return sum(
            proxied._annotations.get("weight", 1)
            for proxied in metrics.column._uncached_proxy_list()
            if proxied.shares_lineage(column)
        )

    best_metrics = None
    best_overlap = None

    for target_col in target_set:
        # only proxies that have an entry in the index contribute
        # candidates
        if target_col not in proxy_index:
            continue

        for candidate in proxy_index[target_col]:
            if require_embedded and not candidate.embedded(target_set):
                continue

            if best_metrics is None:
                # no corresponding column yet, pick this one.
                best_metrics = candidate
                continue

            candidate_overlap = target_set.intersection(
                candidate.column._expanded_proxy_set
            )
            if best_overlap is None:
                # computed lazily, the first time a second candidate
                # shows up
                best_overlap = target_set.intersection(
                    best_metrics.column._expanded_proxy_set
                )

            if len(candidate_overlap) > len(best_overlap):
                # the candidate has a larger field of correspondence
                # than the current pick, i.e.
                # selectable.c.a1_x->a1.c.x->table.c.x matches
                # a1.c.x->table.c.x better than
                # selectable.c.x->table.c.x does.
                best_metrics = candidate
                best_overlap = candidate_overlap
            elif candidate_overlap == best_overlap:
                # same field of correspondence: prefer the one whose
                # proxy list is "shorter" (by weight), which indicates
                # a closer relationship with the root column.
                best_distance = _lineage_distance(best_metrics)
                candidate_distance = _lineage_distance(candidate)
                if candidate_distance < best_distance:
                    best_metrics = candidate
                    best_overlap = candidate_overlap

    return best_metrics.column if best_metrics else None

1949 

1950 

# type variable for the column type held by DedupeColumnCollection;
# bound to NamedColumn.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")

1952 

1953 

class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection` with deduplicating
    behavior.

    Schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint` make use of it.  As those objects
    require mutable column collections, the collection also includes more
    sophisticated mutator methods.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self, column: _NAMEDCOL, key: Optional[str] = None
    ) -> None:
        """Add ``column`` to the collection under its own ``.key``.

        A column already stored under that key is replaced, unless it is
        the very same object, in which case nothing happens.

        :param column: the column to add.
        :param key: optional; when given it must equal ``column.key``.
        :raises ArgumentError: if ``key`` differs from ``column.key``, or
         if ``column.key`` is ``None``.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key not in self._index:
            self._append_new_column(key, column)
            return

        if self._index[key][1] is column:
            # this exact object is already present under the key
            return

        self.replace(column)

        # discard the memoized proxy_set, since this may well be taking
        # place inside of a _make_proxy operation
        util.memoized_property.reset(column, "proxy_set")

    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
        """Append ``named_column`` at the end of the collection and index
        it both by position and by ``key``."""
        position = len(self._collection)
        entry = (key, named_column, _ColumnMetrics(self, named_column))
        self._collection.append(entry)
        self._colset.add(named_column._deannotate())
        self._index[position] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        pairs = list(iter_)

        # columns that collide with an existing entry are set aside and
        # passed to replace() once everything else has been appended
        pending_replace = []
        for col_key, col in pairs:
            if col.key != col_key:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )

            name_collision = (
                col.name in self._index and col.key != col.name
            )
            if name_collision or col.key in self._index:
                pending_replace.append(col)
            else:
                self._index[col_key] = (col_key, col)
                self._collection.append(
                    (col_key, col, _ColumnMetrics(self, col))
                )

        self._colset.update(
            member._deannotate() for _k, member, _m in self._collection
        )

        # refresh the positional part of the index
        self._index.update(
            (position, (k, member))
            for position, (k, member, _m) in enumerate(self._collection)
        )

        for col in pending_replace:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add every column of ``iter_``, each under its own ``.key``."""
        keyed = ((col.key, col) for col in iter_)
        self._populate_separate_keys(keyed)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove ``column`` from the collection.

        :raises ValueError: if the column is not in this collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )

        del self._index[column.key]
        self._colset.remove(column)

        survivors = [
            (k, member, metrics)
            for k, member, metrics in self._collection
            if member is not column
        ]
        self._collection[:] = survivors

        for stale_metrics in self._proxy_index.get(column, ()):
            stale_metrics.dispose(self)

        # renumber the positional entries ...
        self._index.update(
            {
                position: (k, member)
                for position, (k, member, _m) in enumerate(self._collection)
            }
        )
        # ... and delete the higher index that is now one past the end
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional iterable of additional columns to
         remove from the collection.

        """

        doomed = set(extra_remove) if extra_remove else set()

        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            same_name = self._index[column.name][1]
            if same_name.name == same_name.key:
                doomed.add(same_name)

        if column.key in self._index:
            doomed.add(self._index[column.key][1])

        if not doomed:
            # nothing to displace; this is a plain append
            self._append_new_column(column.key, column)
            return

        rebuilt: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        placed = False
        for existing_key, existing_col, existing_metrics in self._collection:
            if existing_col not in doomed:
                rebuilt.append((existing_key, existing_col, existing_metrics))
            elif not placed:
                # the new column takes the position of the first removed
                # column; any further removed columns are just dropped
                placed = True
                rebuilt.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )

        # ``doomed`` is known to be non-empty at this point
        self._colset.difference_update(doomed)
        for gone in doomed:
            for stale_metrics in self._proxy_index.get(gone, ()):
                stale_metrics.dispose(self)

        if not placed:
            # none of the removed columns was present in the collection
            # itself, so the new column goes at the end
            rebuilt.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = rebuilt

        # rebuild the index from scratch: positions first, then keys
        self._index.clear()
        self._index.update(
            {
                position: (k, member)
                for position, (k, member, _m) in enumerate(self._collection)
            }
        )
        self._index.update(
            {k: (k, member) for k, member, _m in self._collection}
        )

2127 

2128 

class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """Column collection that shares the internal structures of another
    collection while routing every mutator to ``_readonly()``.

    The wrapped collection is kept as ``_parent``; pickling stores only
    that parent, and the shared references are re-established from it
    when unpickling.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        # attributes are assigned through object.__setattr__ rather than
        # plain assignment (presumably because the read-only container
        # restricts ordinary attribute setting; see util.ReadOnlyContainer)
        assign = object.__setattr__
        assign(self, "_parent", collection)

        # share, rather than copy, the parent's internal structures
        for shared_name in ("_colset", "_index", "_collection", "_proxy_index"):
            assign(self, shared_name, getattr(collection, shared_name))

    def __getstate__(self) -> Dict[str, _COL_co]:
        # only the parent is serialized
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # re-run __init__ so the shared references point at the restored
        # parent's structures
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()

2156 

2157 

class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """Ordered set of columns whose ``==`` builds a SQL conjunction
    rather than returning a boolean."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add each element of ``cols`` to the set, in order."""
        for element in cols:
            self.add(element)

    def __eq__(self, other):
        # pair every column of ``other`` with every local column it shares
        # lineage with, and AND together the resulting comparisons
        comparisons = [
            theirs == ours
            for theirs in other
            for ours in self
            if theirs.shares_lineage(ours)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        # hash of the members in iteration order
        return hash(tuple(self))

2176 

2177 

def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    When the entity itself does not provide one, its sub-elements are
    iterated and the namespace of the first one that has it is returned.
    If none does, the original ``AttributeError`` propagates.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for sub_element in visitors.iterate(traversible):
            if _is_has_entity_namespace(sub_element):
                return sub_element.entity_namespace

        # no sub-element qualified; re-raise the AttributeError being
        # handled
        raise

2195 

2196 

def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    When ``default`` is given, it is returned if the namespace has no
    attribute named ``key``.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            # no fallback supplied: a missing attribute is an error
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err