Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

861 statements  

1# sql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Dict 

24from typing import FrozenSet 

25from typing import Generator 

26from typing import Generic 

27from typing import Iterable 

28from typing import Iterator 

29from typing import List 

30from typing import Mapping 

31from typing import MutableMapping 

32from typing import NamedTuple 

33from typing import NoReturn 

34from typing import Optional 

35from typing import overload 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import TypeVar 

42from typing import Union 

43 

44from . import roles 

45from . import visitors 

46from .cache_key import HasCacheKey # noqa 

47from .cache_key import MemoizedHasCacheKey # noqa 

48from .traversals import HasCopyInternals # noqa 

49from .visitors import ClauseVisitor 

50from .visitors import ExtendedInternalTraversal 

51from .visitors import ExternallyTraversible 

52from .visitors import InternalTraversal 

53from .. import event 

54from .. import exc 

55from .. import util 

56from ..util import HasMemoized as HasMemoized 

57from ..util import hybridmethod 

58from ..util import typing as compat_typing 

59from ..util import warn_deprecated 

60from ..util.typing import Final 

61from ..util.typing import Protocol 

62from ..util.typing import Self 

63from ..util.typing import TypeGuard 

64 

65if TYPE_CHECKING: 

66 from . import coercions 

67 from . import elements 

68 from . import type_api 

69 from ._orm_types import DMLStrategyArgument 

70 from ._orm_types import SynchronizeSessionArgument 

71 from ._typing import _CLE 

72 from .cache_key import CacheKey 

73 from .compiler import SQLCompiler 

74 from .elements import BindParameter 

75 from .elements import ClauseList 

76 from .elements import ColumnClause # noqa 

77 from .elements import ColumnElement 

78 from .elements import NamedColumn 

79 from .elements import SQLCoreOperations 

80 from .elements import TextClause 

81 from .schema import Column 

82 from .schema import DefaultGenerator 

83 from .selectable import _JoinTargetElement 

84 from .selectable import _SelectIterable 

85 from .selectable import FromClause 

86 from .visitors import anon_map 

87 from ..engine import Connection 

88 from ..engine import CursorResult 

89 from ..engine.interfaces import _CoreMultiExecuteParams 

90 from ..engine.interfaces import _ExecuteOptions 

91 from ..engine.interfaces import _ImmutableExecuteOptions 

92 from ..engine.interfaces import CacheStats 

93 from ..engine.interfaces import Compiled 

94 from ..engine.interfaces import CompiledCacheType 

95 from ..engine.interfaces import CoreExecuteOptionsParameter 

96 from ..engine.interfaces import Dialect 

97 from ..engine.interfaces import IsolationLevel 

98 from ..engine.interfaces import SchemaTranslateMapType 

99 from ..event import dispatcher 

100 

101if not TYPE_CHECKING: 

102 coercions = None # noqa 

103 elements = None # noqa 

104 type_api = None # noqa 

105 

106 

107class _NoArg(Enum): 

108 NO_ARG = 0 

109 

110 def __repr__(self): 

111 return f"_NoArg.{self.name}" 

112 

113 

114NO_ARG: Final = _NoArg.NO_ARG 

115 

116 

117class _NoneName(Enum): 

118 NONE_NAME = 0 

119 """indicate a 'deferred' name that was ultimately the value None.""" 

120 

121 

122_NONE_NAME: Final = _NoneName.NONE_NAME 

123 

124_T = TypeVar("_T", bound=Any) 

125 

126_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

127 

128_AmbiguousTableNameMap = MutableMapping[str, str] 

129 

130 

131class _DefaultDescriptionTuple(NamedTuple): 

132 arg: Any 

133 is_scalar: Optional[bool] 

134 is_callable: Optional[bool] 

135 is_sentinel: Optional[bool] 

136 

137 @classmethod 

138 def _from_column_default( 

139 cls, default: Optional[DefaultGenerator] 

140 ) -> _DefaultDescriptionTuple: 

141 return ( 

142 _DefaultDescriptionTuple( 

143 default.arg, # type: ignore 

144 default.is_scalar, 

145 default.is_callable, 

146 default.is_sentinel, 

147 ) 

148 if default 

149 and ( 

150 default.has_arg 

151 or (not default.for_update and default.is_sentinel) 

152 ) 

153 else _DefaultDescriptionTuple(None, None, None, None) 

154 ) 

155 

156 

157_never_select_column: operator.attrgetter[Any] = operator.attrgetter( 

158 "_omit_from_statements" 

159) 

160 

161 

162class _EntityNamespace(Protocol): 

163 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

164 

165 

class _HasEntityNamespace(Protocol):
    """Structural type for objects exposing ``entity_namespace``."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The namespace object associated with this element."""
        ...

169 

170 

171def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

172 return hasattr(element, "entity_namespace") 

173 

174 

175# Remove once https://github.com/python/mypy/issues/14640 is fixed 

176_Self = TypeVar("_Self", bound=Any) 

177 

178 

class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" objects refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.   Such as, in DQL, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; all arguments are ignored."""
        return None

209 

210 

class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        """Return the instance stored on ``cls._singleton``.

        No new object is allocated; the instance must have been
        installed beforehand by :meth:`._create_singleton`.

        """
        existing = cls._singleton
        return cast(_T, existing)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Class-level placeholder that raises ``NotImplementedError``;
        :meth:`._create_singleton` assigns the real value on the instance.
        """
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Create the single instance of ``cls`` and store it on the class.

        The instance is allocated with ``object.__new__()`` so that the
        overridden ``__new__()`` above, which needs ``_singleton`` to
        already exist, is not involved.

        """
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset([instance])
        cls._singleton = instance

239 

240 

241def _from_objects( 

242 *elements: Union[ 

243 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

244 ] 

245) -> Iterator[FromClause]: 

246 return itertools.chain.from_iterable( 

247 [element._from_objects for element in elements] 

248 ) 

249 

250 

251def _select_iterables( 

252 elements: Iterable[roles.ColumnsClauseRole], 

253) -> _SelectIterable: 

254 """expand tables into individual columns in the 

255 given list of column expressions. 

256 

257 """ 

258 return itertools.chain.from_iterable( 

259 [c._select_iterable for c in elements] 

260 ) 

261 

262 

263_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

264 

265 

class _GenerativeType(compat_typing.Protocol):
    """Structural type for objects that provide ``_generate()``."""

    def _generate(self) -> Self:
        """Return the object that a generative method should operate on."""
        ...

268 

269 

def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The undecorated function remains reachable as the ``non_generative``
    attribute of the returned callable.

    """

    @util.decorator
    def _run_on_generated(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        target = self._generate()
        returned = fn(target, *args, **kw)
        # the wrapped method is expected to hand back the object it
        # was invoked on
        assert returned is target, "generative methods must return self"
        return target

    wrapped = _run_on_generated(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped

292 

293 

def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Return a decorator that refuses to run a method once any of the
    named attributes has been changed from its default.

    :param \\*names: attribute paths, resolved with
     ``operator.attrgetter``, to check on the object the method is
     invoked on.

    :param msgs: optional dictionary mapping an attribute name to the
     error message to use for it.

    :param defaults: optional dictionary mapping an attribute name to its
     default value; names not present default to ``None``.  The comparison
     against the default is by identity.

    :raises InvalidRequestError: from the decorated method, when one of
     the attributes is not its default value.

    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for attr_name in names:
        getters.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        remaining = args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            msg = msgs.get(
                name,
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, self.__class__),
            )
            raise exc.InvalidRequestError(msg)
        return fn(self, *remaining, **kw)

    return check

321 

322 

323def _clone(element, **kw): 

324 return element._clone(**kw) 

325 

326 

327def _expand_cloned( 

328 elements: Iterable[_CLE], 

329) -> Iterable[_CLE]: 

330 """expand the given set of ClauseElements to be the set of all 'cloned' 

331 predecessors. 

332 

333 """ 

334 # TODO: cython candidate 

335 return itertools.chain(*[x._cloned_set for x in elements]) 

336 

337 

338def _de_clone( 

339 elements: Iterable[_CLE], 

340) -> Iterable[_CLE]: 

341 for x in elements: 

342 while x._is_clone_of is not None: 

343 x = x._is_clone_of 

344 yield x 

345 

346 

def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    :param a: elements to select from; may be any iterable, including a
     single-pass iterator.
    :param b: elements to compare against.
    :return: the members of ``a`` whose ``_cloned_set`` shares at least
     one member with the expanded clone sets of both ``a`` and ``b``.

    """
    # ``a`` is traversed twice below (once to expand, once to filter);
    # materialize it so that a one-shot iterator is not exhausted by the
    # first pass, which would silently produce an empty result.
    a_elements = list(a)

    all_overlap: Set[_CLE] = set(_expand_cloned(a_elements)).intersection(
        _expand_cloned(b)
    )
    return {
        elem
        for elem in a_elements
        if all_overlap.intersection(elem._cloned_set)
    }

358 

359 

def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the members of a that have no overlap with b, counting
    any overlap between 'cloned' predecessors.

    :param a: elements to select from; may be any iterable, including a
     single-pass iterator.
    :param b: elements to compare against.
    :return: the members of ``a`` whose ``_cloned_set`` shares no member
     with the overlap of the expanded clone sets of ``a`` and ``b``.

    """
    # ``a`` is traversed twice below (once to expand, once to filter);
    # materialize it so that a one-shot iterator is not exhausted by the
    # first pass, which would silently produce an empty result.
    a_elements = list(a)

    all_overlap: Set[_CLE] = set(_expand_cloned(a_elements)).intersection(
        _expand_cloned(b)
    )
    return {
        elem
        for elem in a_elements
        if not all_overlap.intersection(elem._cloned_set)
    }

367 

368 

369class _DialectArgView(MutableMapping[str, Any]): 

370 """A dictionary view of dialect-level arguments in the form 

371 <dialectname>_<argument_name>. 

372 

373 """ 

374 

375 __slots__ = ("obj",) 

376 

377 def __init__(self, obj: DialectKWArgs) -> None: 

378 self.obj = obj 

379 

380 def _key(self, key: str) -> Tuple[str, str]: 

381 try: 

382 dialect, value_key = key.split("_", 1) 

383 except ValueError as err: 

384 raise KeyError(key) from err 

385 else: 

386 return dialect, value_key 

387 

388 def __getitem__(self, key: str) -> Any: 

389 dialect, value_key = self._key(key) 

390 

391 try: 

392 opt = self.obj.dialect_options[dialect] 

393 except exc.NoSuchModuleError as err: 

394 raise KeyError(key) from err 

395 else: 

396 return opt[value_key] 

397 

398 def __setitem__(self, key: str, value: Any) -> None: 

399 try: 

400 dialect, value_key = self._key(key) 

401 except KeyError as err: 

402 raise exc.ArgumentError( 

403 "Keys must be of the form <dialectname>_<argname>" 

404 ) from err 

405 else: 

406 self.obj.dialect_options[dialect][value_key] = value 

407 

408 def __delitem__(self, key: str) -> None: 

409 dialect, value_key = self._key(key) 

410 del self.obj.dialect_options[dialect][value_key] 

411 

412 def __len__(self) -> int: 

413 return sum( 

414 len(args._non_defaults) 

415 for args in self.obj.dialect_options.values() 

416 ) 

417 

418 def __iter__(self) -> Generator[str, None, None]: 

419 return ( 

420 "%s_%s" % (dialect_name, value_name) 

421 for dialect_name in self.obj.dialect_options 

422 for value_name in self.obj.dialect_options[ 

423 dialect_name 

424 ]._non_defaults 

425 ) 

426 

427 

428class _DialectArgDict(MutableMapping[str, Any]): 

429 """A dictionary view of dialect-level arguments for a specific 

430 dialect. 

431 

432 Maintains a separate collection of user-specified arguments 

433 and dialect-specified default arguments. 

434 

435 """ 

436 

437 def __init__(self) -> None: 

438 self._non_defaults: Dict[str, Any] = {} 

439 self._defaults: Dict[str, Any] = {} 

440 

441 def __len__(self) -> int: 

442 return len(set(self._non_defaults).union(self._defaults)) 

443 

444 def __iter__(self) -> Iterator[str]: 

445 return iter(set(self._non_defaults).union(self._defaults)) 

446 

447 def __getitem__(self, key: str) -> Any: 

448 if key in self._non_defaults: 

449 return self._non_defaults[key] 

450 else: 

451 return self._defaults[key] 

452 

453 def __setitem__(self, key: str, value: Any) -> None: 

454 self._non_defaults[key] = value 

455 

456 def __delitem__(self, key: str) -> None: 

457 del self._non_defaults[key] 

458 

459 

@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the dialect class named ``dialect_name`` and return a copy of
    its ``construct_arguments`` as a dictionary, or ``None`` when the
    dialect sets ``construct_arguments`` to ``None``.
    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    return None if construct_arguments is None else dict(construct_arguments)

466 

467 

class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    def get_dialect_option(
        self,
        dialect: Dialect,
        argument_name: str,
        *,
        else_: Any = None,
        deprecated_fallback: Optional[str] = None,
    ) -> Any:
        r"""Return the value of a dialect-specific option, or *else_* if
        this dialect does not register the given argument.

        This is useful for DDL compilers that may be inherited by
        third-party dialects whose ``construct_arguments`` do not
        include the same set of keys as the parent dialect.

        An argument counts as registered when it is registered for the
        class of this object or for any class in its ``__mro__``, which is
        the same set of classes that contributes defaults to
        :attr:`.DialectKWArgs.dialect_options`.

        :param dialect: The dialect for which to retrieve the option.
        :param argument_name: The name of the argument to retrieve.
        :param else\_: The value to return if the argument is not present.
        :param deprecated_fallback: Optional dialect name to fall back to
         if the argument is not present for the current dialect.  If the
         argument is present for the fallback dialect but not the current
         dialect, a deprecation warning will be emitted.

        """

        registry = DialectKWArgs._kw_registry[dialect.name]
        if registry is None:
            return else_

        # dialect_options merges defaults across the whole MRO (see
        # _kw_reg_for_dialect_cls), so registration has to be checked the
        # same way; looking at the exact class only would hide options
        # from subclasses of a registered construct.
        is_registered = any(
            argument_name in registry.get(klass, ())
            for klass in type(self).__mro__
        )

        if is_registered:
            if (
                deprecated_fallback is None
                or dialect.name == deprecated_fallback
            ):
                return self.dialect_options[dialect.name][argument_name]

            # deprecated_fallback is present; need to look in two places

            # Current dialect has this option registered.
            # Check if user explicitly set it.
            if (
                dialect.name in self.dialect_options
                and argument_name
                in self.dialect_options[dialect.name]._non_defaults
            ):
                # User explicitly set this dialect's option - use it
                return self.dialect_options[dialect.name][argument_name]

            # User didn't set current dialect's option.
            # Check for deprecated fallback.
            elif (
                deprecated_fallback in self.dialect_options
                and argument_name
                in self.dialect_options[deprecated_fallback]._non_defaults
            ):
                # User set fallback option but not current dialect's option
                warn_deprecated(
                    f"Using '{deprecated_fallback}_{argument_name}' "
                    f"with the '{dialect.name}' dialect is deprecated; "
                    f"please additionally specify "
                    f"'{dialect.name}_{argument_name}'.",
                    version="2.1",
                )
                return self.dialect_options[deprecated_fallback][argument_name]

            # Return default value
            return self.dialect_options[dialect.name][argument_name]
        else:
            # Current dialect doesn't have the option registered at all.
            # Don't warn - if a third-party dialect doesn't support an
            # option, that's their choice, not a deprecation case.
            return else_

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # the registry entry is None when the dialect opts out of
            # argument validation, so there is nothing to register into
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults configured" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> that dialect's construct_arguments (or None),
    # populated on first access by _kw_reg_for_dialect()
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class.

        When the dialect has no ``construct_arguments``, the defaults
        contain only the wildcard key ``"*"``.  Otherwise defaults are
        merged from every class in the MRO, base classes first, so that
        entries for more specific classes override those of their bases.

        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and
        store their values into :attr:`.DialectKWArgs.dialect_options`.

        :raises TypeError: if a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: if the dialect validates its arguments and
         does not accept the given argument for this class.

        A key whose dialect cannot be located emits a warning and is
        stored without validation.

        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                # install a wildcard entry so the value is still retained
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]

714 

715 

class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, visit name) -> CompileState class,
    # filled in by plugin_for()
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Factory: build the compile state registered for ``statement``.

        The class is looked up under the statement's
        ``compile_state_plugin`` (if it has propagated attributes) and
        falls back to the ``"default"`` plugin.  If the class found is
        not ``cls`` itself, construction is delegated to that class's own
        ``create_for_statement()``.

        """
        # factory construction.

        target = statement._effective_plugin_target
        klass = None

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get((plugin_name, target), None)

        if klass is None:
            klass = cls.plugins[("default", target)]

        if klass is not cls:
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin, else
        the one registered for ``"default"``, else ``None``.
        """
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            try:
                return cls.plugins[
                    (plugin_name, statement._effective_plugin_target)
                ]
            except KeyError:
                pass

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        default_key = ("default", statement._effective_plugin_target)
        return cls.plugins.get(default_key)

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no default fallback.
        """
        key = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(key)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class
        under ``(plugin_name, visit_name)`` and returns it unchanged.
        """

        def register(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return register

822 

823 

class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a shallow copy
        of this object's ``__dict__``, minus any memoized keys.

        The new instance is created with ``__new__()`` only.

        """
        memoized = self._memoized_keys
        klass = self.__class__
        duplicate = klass.__new__(klass)

        # ensure this iteration remains atomic: filter a snapshot taken
        # with a single copy() call rather than the live dictionary
        state = self.__dict__.copy()
        if memoized:
            state = {k: v for k, v in state.items() if k not in memoized}

        duplicate.__dict__ = state
        return duplicate

840 

841 

class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Discard memoized values from this object and return it.

        No copy is made; the same object is returned.

        """
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self

854 

855 

class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # no specific plugin class by default
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # empty immutable dictionary by default
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # compile state is produced by the CompileState factory classmethod
    _compile_state_factory = CompileState.create_for_statement

864 

865 

866class _MetaOptions(type): 

867 """metaclass for the Options class. 

868 

869 This metaclass is actually necessary despite the availability of the 

870 ``__init_subclass__()`` hook as this type also provides custom class-level 

871 behavior for the ``__add__()`` method. 

872 

873 """ 

874 

875 _cache_attrs: Tuple[str, ...] 

876 

877 def __add__(self, other): 

878 o1 = self() 

879 

880 if set(other).difference(self._cache_attrs): 

881 raise TypeError( 

882 "dictionary contains attributes not covered by " 

883 "Options class %s: %r" 

884 % (self, set(other).difference(self._cache_attrs)) 

885 ) 

886 

887 o1.__dict__.update(other) 

888 return o1 

889 

890 if TYPE_CHECKING: 

891 

892 def __getattr__(self, key: str) -> Any: ... 

893 

894 def __setattr__(self, key: str, value: Any) -> None: ... 

895 

896 def __delattr__(self, key: str) -> None: ... 

897 

898 

899class Options(metaclass=_MetaOptions): 

900 """A cacheable option dictionary with defaults.""" 

901 

902 __slots__ = () 

903 

904 _cache_attrs: Tuple[str, ...] 

905 

906 def __init_subclass__(cls) -> None: 

907 dict_ = cls.__dict__ 

908 cls._cache_attrs = tuple( 

909 sorted( 

910 d 

911 for d in dict_ 

912 if not d.startswith("__") 

913 and d not in ("_cache_key_traversal",) 

914 ) 

915 ) 

916 super().__init_subclass__() 

917 

918 def __init__(self, **kw: Any) -> None: 

919 self.__dict__.update(kw) 

920 

921 def __add__(self, other): 

922 o1 = self.__class__.__new__(self.__class__) 

923 o1.__dict__.update(self.__dict__) 

924 

925 if set(other).difference(self._cache_attrs): 

926 raise TypeError( 

927 "dictionary contains attributes not covered by " 

928 "Options class %s: %r" 

929 % (self, set(other).difference(self._cache_attrs)) 

930 ) 

931 

932 o1.__dict__.update(other) 

933 return o1 

934 

935 def __eq__(self, other): 

936 # TODO: very inefficient. This is used only in test suites 

937 # right now. 

938 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

939 if getattr(self, a) != getattr(other, b): 

940 return False 

941 return True 

942 

943 def __repr__(self) -> str: 

944 # TODO: fairly inefficient, used only in debugging right now. 

945 

946 return "%s(%s)" % ( 

947 self.__class__.__name__, 

948 ", ".join( 

949 "%s=%r" % (k, self.__dict__[k]) 

950 for k in self._cache_attrs 

951 if k in self.__dict__ 

952 ), 

953 ) 

954 

955 @classmethod 

956 def isinstance(cls, klass: Type[Any]) -> bool: 

957 return issubclass(cls, klass) 

958 

959 @hybridmethod 

960 def add_to_element(self, name: str, value: str) -> Any: 

961 return self + {name: getattr(self, name) + value} 

962 

963 @hybridmethod 

964 def _state_dict_inst(self) -> Mapping[str, Any]: 

965 return self.__dict__ 

966 

967 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

968 

969 @_state_dict_inst.classlevel 

970 def _state_dict(cls) -> Mapping[str, Any]: 

971 return cls._state_dict_const 

972 

@classmethod
def safe_merge(cls, other: "Options") -> Any:
    """Merge the state of ``other`` into this class via ``cls + state``.

    ``TypeError`` is raised when ``other`` is of a different class, has a
    non-empty ``_cache_attrs`` and that collection names attributes which
    are missing from ``cls._cache_attrs``.
    """
    state = other._state_dict()

    # only support a merge with another object of our class
    # and which does not have attrs that we don't.   otherwise
    # we risk having state that might not be part of our cache
    # key strategy
    if cls is not other.__class__ and other._cache_attrs:
        uncovered = set(other._cache_attrs).difference(cls._cache_attrs)
        if uncovered:
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (other, cls, uncovered)
            )
    return cls + state

997 

@classmethod
def from_execution_options(
    cls,
    key: str,
    attrs: set[str],
    exec_options: Mapping[str, Any],
    statement_exec_options: Mapping[str, Any],
) -> Tuple["Options", Mapping[str, Any]]:
    """Build an Options object from two execution-option mappings.

    e.g.::

        (
            load_options,
            execution_options,
        ) = QueryContext.default_load_options.from_execution_options(
            "_sa_orm_load_options",
            {"populate_existing", "autoflush", "yield_per"},
            execution_options,
            statement._execution_options,
        )

    The names in ``attrs`` are looked up in ``exec_options`` first and in
    ``statement_exec_options`` second.  Each value found is stored under
    the name prefixed with ``"_"`` and added to the options already
    present under ``key`` in ``exec_options`` (or to ``cls`` when there
    are none).  The returned pair is the resulting Options and an
    execution options mapping in which ``key`` refers to it.  When none
    of ``attrs`` is present, the existing options and ``exec_options``
    are returned unchanged.

    """

    # common case is that no options we are looking for are
    # in either dictionary, so cancel for that first
    available_names = set(exec_options).union(statement_exec_options)
    check_argnames = attrs.intersection(available_names)

    existing_options = exec_options.get(key, cls)

    if not check_argnames:
        return existing_options, exec_options

    overrides = {}
    for argname in check_argnames:
        # exec_options takes precedence over the statement-level mapping
        for source in (exec_options, statement_exec_options):
            if argname in source:
                overrides["_" + argname] = source[argname]
                break

    new_options = existing_options + overrides
    exec_options = util.immutabledict(exec_options).merge_with(
        {key: new_options}
    )
    return new_options, exec_options

1051 

if TYPE_CHECKING:
    # signatures declared for static type checkers only; this branch is
    # not executed at runtime

    def __getattr__(self, key: str) -> Any: ...

    def __setattr__(self, key: str, value: Any) -> None: ...

    def __delattr__(self, key: str) -> None: ...

1059 

1060 

class CacheableOptions(Options, HasCacheKey):
    """:class:`.Options` combined with :class:`.HasCacheKey`.

    NOTE(review): the ``hybridmethod`` pairs below presumably select the
    instance-level or class-level function depending on how the attribute
    is accessed -- confirm against the decorator.
    """

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        """Delegate to ``HasCacheKey._gen_cache_key`` for this object."""
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        """Return ``(cls, ())``; ``anon_map`` and ``bindparams`` are not
        consulted."""
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        """Delegate to ``HasCacheKey._generate_cache_key_for_object``."""
        return HasCacheKey._generate_cache_key_for_object(self)

1079 

1080 

class ExecutableOption(HasCopyInternals):
    """Base for option objects accepted by :meth:`.Executable.options`."""

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        option_cls = self.__class__
        # __new__ is used so that __init__ does not run on the copy
        duplicate = option_cls.__new__(option_cls)
        duplicate.__dict__ = dict(self.__dict__)  # type: ignore
        return duplicate

1097 

1098 

class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _with_context_options: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_with_context_options",
            ExtendedInternalTraversal.dp_with_context_options,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        """Not provided by the base class; raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        """Return this statement's ``__visit_name__``."""
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by the SQL compiler for the statement.
        These options can be consumed by specific dialects or specific kinds
        of compilers.

        The most commonly known kind of option are the ORM level options
        that apply "eager load" and other loading behaviors to an ORM
        query.   However, options can theoretically be used for many other
        purposes.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        coerced_options = tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        self._with_options += coerced_options
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_context_option(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a context option to this statement.

        These are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        context_option = (callable_, cache_args)
        self._with_context_options += (context_option,)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

            from sqlalchemy import event


            @event.listens_for(some_engine, "before_execute")
            def _process_opt(conn, statement, multiparams, params, execution_options):
                "run a SQL function before invoking a statement"

                if execution_options.get("do_special_thing", False):
                    conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # options that cannot be set per statement, checked in this order,
        # paired with the error raised for each
        rejected_options = (
            (
                "isolation_level",
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine().",
            ),
            (
                "compiled_cache",
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement.",
            ),
        )
        for option_name, message in rejected_options:
            if option_name in kw:
                raise exc.ArgumentError(message)

        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options

1402 

1403 

class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call ``_set_parent()`` between the two parent-attach hooks.

        ``dispatch.before_parent_attach`` is invoked first, then
        ``_set_parent(parent, **kw)``, then ``dispatch.after_parent_attach``.
        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)

1423 

1424 

class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines :class:`.SchemaEventTarget` with ``visitors.Visitable`` and
    adds no members of its own.

    .. versionadded:: 2.0.41

    """

1431 

1432 

class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # NOTE(review): presumably consulted by the traversal machinery to
    # identify schema-level visiting -- confirm where this key is read
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}

1440 

1441 

class _SentinelDefaultCharacterization(Enum):
    """String-valued characterizations; used as the type of
    ``_SentinelColumnCharacterization.default_characterization``.

    NOTE(review): the meaning of the individual members is not visible
    from this definition -- confirm at the points of use.
    """

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"

1450 

1451 

class _SentinelColumnCharacterization(NamedTuple):
    """Immutable record of four fields, all of which have defaults.

    Constructed without arguments it holds no columns, both flags
    ``False`` and ``_SentinelDefaultCharacterization.NONE``.
    """

    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )

1459 

1460 

# key type of a ColumnCollection: constrained to either ``Optional[str]``
# or plain ``str``
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# column element type, in a covariant and an invariant flavor; both are
# bound to ColumnElement through a forward reference
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")

1465 

1466 

class _ColumnMetrics(Generic[_COL_co]):
    """Per-column record stored next to each entry of a ColumnCollection.

    Holds the column and keeps the owning collection's ``_proxy_index`` in
    step when that index has already been populated.
    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        """Store ``col`` and register this object in an initialized proxy
        index of ``collection``."""
        self.column = col

        # proxy_index being non-empty means it was initialized.
        # so we need to update it
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for eps_col in col._expanded_proxy_set:
            proxy_index[eps_col].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the ``_expanded_proxy_set`` of the stored column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from the proxy index of ``collection``.

        Index entries that are empty after the removal are deleted.  Nothing
        happens when the proxy index is empty.
        """
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for col in self.column._expanded_proxy_set:
            members = proxy_index.get(col, None)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[col]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return ``True`` when every member of ``target_set`` that is not in
        this column's expanded proxy set has an ``_expand_cloned()`` result
        intersecting that set."""
        own_proxy_set = self.column._expanded_proxy_set
        return all(
            own_proxy_set.intersection(_expand_cloned([target]))
            for target in target_set.difference(own_proxy_set)
        )

1509 

1510 

1511class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1512 """Collection of :class:`_expression.ColumnElement` instances, 

1513 typically for 

1514 :class:`_sql.FromClause` objects. 

1515 

1516 The :class:`_sql.ColumnCollection` object is most commonly available 

1517 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1518 on the :class:`_schema.Table` object, introduced at 

1519 :ref:`metadata_tables_and_columns`. 

1520 

1521 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1522 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1523 :class:`_schema.Column` objects, which are then accessible both via mapping 

1524 style access as well as attribute access style. 

1525 

1526 To access :class:`_schema.Column` objects using ordinary attribute-style 

1527 access, specify the name like any other object attribute, such as below 

1528 a column named ``employee_name`` is accessed:: 

1529 

1530 >>> employee_table.c.employee_name 

1531 

1532 To access columns that have names with special characters or spaces, 

1533 index-style access is used, such as below which illustrates a column named 

1534 ``employee ' payment`` is accessed:: 

1535 

1536 >>> employee_table.c["employee ' payment"] 

1537 

1538 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1539 interface, common dictionary method names like 

1540 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1541 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1542 database columns that are keyed under these names also need to use indexed 

1543 access:: 

1544 

1545 >>> employee_table.c["values"] 

1546 

1547 

1548 The name for which a :class:`_schema.Column` would be present is normally 

1549 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1550 such as a :class:`_sql.Select` object that uses a label style set 

1551 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1552 key may instead be represented under a particular label name such 

1553 as ``tablename_columnname``:: 

1554 

1555 >>> from sqlalchemy import select, column, table 

1556 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1557 >>> t = table("t", column("c")) 

1558 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1559 >>> subq = stmt.subquery() 

1560 >>> subq.c.t_c 

1561 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1562 

1563 :class:`.ColumnCollection` also indexes the columns in order and allows 

1564 them to be accessible by their integer position:: 

1565 

1566 >>> cc[0] 

1567 Column('x', Integer(), table=None) 

1568 >>> cc[1] 

1569 Column('y', Integer(), table=None) 

1570 

1571 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1572 allows integer-based 

1573 index access to the collection. 

1574 

1575 Iterating the collection yields the column expressions in order:: 

1576 

1577 >>> list(cc) 

1578 [Column('x', Integer(), table=None), 

1579 Column('y', Integer(), table=None)] 

1580 

1581 The base :class:`_expression.ColumnCollection` object can store 

1582 duplicates, which can 

1583 mean either two columns with the same key, in which case the column 

1584 returned by key access is **arbitrary**:: 

1585 

1586 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1587 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1588 >>> list(cc) 

1589 [Column('x', Integer(), table=None), 

1590 Column('x', Integer(), table=None)] 

1591 >>> cc["x"] is x1 

1592 False 

1593 >>> cc["x"] is x2 

1594 True 

1595 

1596 Or it can also mean the same column multiple times. These cases are 

1597 supported as :class:`_expression.ColumnCollection` 

1598 is used to represent the columns in 

1599 a SELECT statement which may include duplicates. 

1600 

1601 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1602 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1603 collection is used for schema level objects like :class:`_schema.Table` 

1604 and 

1605 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1606 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1607 as the schema constructs have more use cases that require removal and 

1608 replacement of columns. 

1609 

1610 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1611 now stores duplicate 

1612 column keys as well as the same column in multiple positions. The 

1613 :class:`.DedupeColumnCollection` class is added to maintain the 

1614 former behavior in those cases where deduplication as well as 

1615 additional replace/remove operations are needed. 

1616 

1617 

1618 """ 

1619 

1620 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1621 

1622 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1623 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1624 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1625 _colset: Set[_COL_co] 

1626 

def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Create the empty internal structures and, when ``columns`` is
    truthy, populate them through ``_initial_populate()``."""
    # this class's own __setattr__ raises NotImplementedError, so state is
    # installed through object.__setattr__
    install = object.__setattr__
    install(self, "_colset", set())
    install(self, "_index", {})
    install(self, "_proxy_index", collections.defaultdict(util.OrderedSet))
    install(self, "_collection", [])
    if not columns:
        return
    self._initial_populate(columns)

1638 

@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return a ``ClauseList`` built from every column of this collection,
    with ``group=False`` and the ``ColumnsClauseRole`` text role."""
    sql_elements = util.preloaded.sql_elements
    columns = self._all_columns

    return sql_elements.ClauseList(
        *columns,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )

1648 

def _initial_populate(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate the collection at construction time.

    Hands the ``(key, column)`` pairs to ``_populate_separate_keys()``.
    """
    key_column_pairs = iter_
    self._populate_separate_keys(key_column_pairs)

1653 

@property
def _all_columns(self) -> List[_COL_co]:
    """New list of every stored column, in collection order."""
    columns = []
    for _key, column, _metrics in self._collection:
        columns.append(column)
    return columns

1657 

def keys(self) -> List[_COLKEY]:
    """Return a sequence of string key names for all columns in this
    collection."""
    key_names = []
    for key, _column, _metrics in self._collection:
        key_names.append(key)
    return key_names

1662 

def values(self) -> List[_COL_co]:
    """Return a sequence of :class:`_sql.ColumnClause` or
    :class:`_schema.Column` objects for all columns in this
    collection."""
    column_objects = []
    for _key, column, _metrics in self._collection:
        column_objects.append(column)
    return column_objects

1668 

def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
    """Return a sequence of (key, column) tuples for all columns in this
    collection each consisting of a string key name and a
    :class:`_sql.ColumnClause` or
    :class:`_schema.Column` object.
    """
    pairs = []
    for key, column, _metrics in self._collection:
        pairs.append((key, column))
    return pairs

1677 

def __bool__(self) -> bool:
    """A collection is truthy when it holds at least one entry."""
    if self._collection:
        return True
    return False

1680 

def __len__(self) -> int:
    """Number of entries stored; duplicates are counted individually."""
    entries = self._collection
    return len(entries)

1683 

def __iter__(self) -> Iterator[_COL_co]:
    """Iterate over the columns in collection order."""
    # turn to a list first to maintain over a course of changes
    snapshot = []
    for _key, column, _metrics in self._collection:
        snapshot.append(column)
    return iter(snapshot)

1687 

@overload
def __getitem__(self, key: Union[str, int]) -> _COL_co: ...

@overload
def __getitem__(
    self, key: Tuple[Union[str, int], ...]
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

@overload
def __getitem__(
    self, key: slice
) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

def __getitem__(
    self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
    """Look up a single column, or a read-only sub-collection.

    A string or integer key returns the matching column.  A slice or a
    tuple of keys returns a new read-only collection of the selected
    entries.  A missing integer key is reported as ``IndexError``; any
    other missing key propagates as ``KeyError``.
    """
    try:
        if isinstance(key, slice):
            selected = (
                (sub_key, col)
                for (sub_key, col, _) in self._collection[key]
            )
        elif isinstance(key, tuple):
            selected = (self._index[sub_key] for sub_key in key)
        else:
            return self._index[key][1]

        # the generators above are consumed here, inside the try block, so
        # that lookup failures go through the same translation below
        return ColumnCollection(selected).as_readonly()
    except KeyError as err:
        missing = err.args[0]
        if isinstance(missing, int):
            raise IndexError(missing) from err
        raise

1722 

1723 def __getattr__(self, key: str) -> _COL_co: 

1724 try: 

1725 return self._index[key][1] 

1726 except KeyError as err: 

1727 raise AttributeError(key) from err 

1728 

def __contains__(self, key: str) -> bool:
    """Report whether ``key`` is present in the index.

    A key that is absent and is not a string raises ``ArgumentError``.  The
    type check is only reached for absent keys.
    """
    if key in self._index:
        return True
    if not isinstance(key, str):
        raise exc.ArgumentError(
            "__contains__ requires a string argument"
        )
    return False

1738 

def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
    """Compare this :class:`_expression.ColumnCollection` to another.

    Both collections are iterated in parallel and must yield the very
    same objects (identity, not equality) position by position; a length
    mismatch therefore compares unequal.
    """
    return all(
        left is right for left, right in zip_longest(self, other)
    )

1748 

def __eq__(self, other: Any) -> bool:
    """Equality is defined by ``compare()``."""
    same = self.compare(other)
    return same

1751 

@overload
def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...

@overload
def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...

def get(
    self, key: str, default: Optional[_COL] = None
) -> Optional[Union[_COL_co, _COL]]:
    """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
    based on a string key name from this
    :class:`_expression.ColumnCollection`.

    ``default`` is returned when the key is not present.
    """
    try:
        entry = self._index[key]
    except KeyError:
        return default
    return entry[1]

1769 

def __str__(self) -> str:
    """Render ``ClassName(col, col, ...)`` using ``str()`` of each column."""
    rendered_columns = [str(column) for column in self]
    return "%s(%s)" % (
        self.__class__.__name__,
        ", ".join(rendered_columns),
    )

1775 

def __setitem__(self, key: str, value: Any) -> NoReturn:
    """Item assignment is not supported; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

def __delitem__(self, key: str) -> NoReturn:
    """Item deletion is not supported; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

def __setattr__(self, key: str, obj: Any) -> NoReturn:
    """Attribute assignment is not supported; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

def clear(self) -> NoReturn:
    """Dictionary clear() is not implemented for
    :class:`_sql.ColumnCollection`."""
    raise NotImplementedError()

def remove(self, column: Any) -> NoReturn:
    """Column removal is not supported here; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()

def update(self, iter_: Any) -> NoReturn:
    """Dictionary update() is not implemented for
    :class:`_sql.ColumnCollection`."""
    raise NotImplementedError()

# instances are unhashable; the annotation works around
# https://github.com/python/mypy/issues/4266
__hash__: Optional[int] = None  # type: ignore

1800 

def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """populate from an iterator of (key, column)

    The stored entries are replaced by the given pairs.  Every column is
    indexed by its integer position; for a key that occurs more than once
    the earliest column is the one stored under that key.
    """

    entries = [
        (key, column, _ColumnMetrics(self, column)) for key, column in iter_
    ]
    self._collection[:] = entries

    self._colset.update(column._deannotate() for _, column, _ in entries)

    index = self._index
    for position, (key, column, _) in enumerate(entries):
        index[position] = (key, column)

    # walking backwards means the first occurrence of a duplicated key is
    # written last and therefore is the one that remains
    for key, column, _ in reversed(entries):
        index[key] = (key, column)

1814 

def add(
    self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is stored under ``key``, or under ``column.key`` when no
    key is given.  An existing index entry for the same key is kept.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    # the new entry's integer position is the size before appending
    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)

    entry = (colkey, _column, _ColumnMetrics(self, _column))
    self._collection.append(entry)
    self._colset.add(_column._deannotate())

    index = self._index
    index[position] = (colkey, _column)
    if colkey not in index:
        index[colkey] = (colkey, _column)

1850 

def __getstate__(self) -> Dict[str, Any]:
    """Return the picklable state.

    The state holds the ``(key, column)`` pairs of the collection, without
    the per-column metrics objects, together with the index itself.
    """
    key_column_pairs = []
    for key, column, _metrics in self._collection:
        key_column_pairs.append((key, column))

    state = {}
    state["_collection"] = key_column_pairs
    state["_index"] = self._index
    return state

1856 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore the collection from the state made by ``__getstate__()``.

    The index is taken over as-is, the proxy index starts out empty, and
    the metrics objects and the column set are rebuilt from the stored
    ``(key, column)`` pairs.
    """
    # this class's own __setattr__ raises NotImplementedError, so state is
    # installed through object.__setattr__
    install = object.__setattr__

    install(self, "_index", state["_index"])
    install(self, "_proxy_index", collections.defaultdict(util.OrderedSet))

    # _ColumnMetrics reads self._proxy_index, which is in place by now
    restored_entries = [
        (key, column, _ColumnMetrics(self, column))
        for (key, column) in state["_collection"]
    ]
    install(self, "_collection", restored_entries)

    # NOTE(review): add() and _populate_separate_keys() store
    # ``column._deannotate()`` in _colset, while the columns are stored
    # here as they are -- confirm that the difference is intended
    install(self, "_colset", {column for _, column, _ in self._collection})

1873 

def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Return True if the given column object is in this collection.

    :param col: the column object to look for.
    :raises ArgumentError: if ``col`` is a string that is not a member
     of the column set; string keys are tested with
     ``col_name in table.c`` instead.

    """
    if col in self._colset:
        return True

    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False

1885 

def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`.

    The result is a :class:`.ReadOnlyColumnCollection` constructed
    around this collection.

    """
    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view

1891 

def _init_proxy_index(self) -> None:
    """Fill the "proxy index" if it is currently empty.

    The proxy index was added in 2.0 to make the
    corresponding_column() method more efficient.

    To save both construction time for new .c collections and memory
    when there are many large .c collections, the index is only filled
    once corresponding_column() is called.  After that it stays filled,
    and _ColumnMetrics objects created later add their own data to it.
    That situation would be unusual, if not nonexistent, since it means
    a .c collection is mutated after corresponding_column() was used;
    it is nonetheless tested in test/base/test_utils.py.

    """
    proxy_index = self._proxy_index
    if proxy_index:
        # already populated; nothing to do
        return

    for _key, _col, metrics in self._collection:
        # register the metrics under every column in the expanded
        # proxy set of its column
        for proxied in metrics.column._expanded_proxy_set:
            proxy_index[proxied].add(metrics)

1917 

def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Return the exported :class:`_expression.ColumnElement` of this
    :class:`_expression.ColumnCollection` that corresponds to the given
    :class:`_expression.ColumnElement` via a common ancestor column.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement`
     is actually present within a sub-element
     of this :class:`_expression.Selectable`.
     Normally the column will match if
     it merely shares a common ancestor with one of the exported
     columns of this :class:`_expression.Selectable`.

    :return: the matching column, or ``None`` if no column of this
     collection corresponds to ``column``.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is itself a member needs no proxy lookup
    if column in self._colset:
        return column

    target_set = column.proxy_set

    proxy_index = self._proxy_index
    if not proxy_index:
        # the index is filled in place, so ``proxy_index`` sees the
        # new entries
        self._init_proxy_index()

    def lineage_distance(metrics):
        # sum of the "weight" annotation (default 1) over the proxies
        # of the metrics' column that share lineage with the target
        return sum(
            proxied._annotations.get("weight", 1)
            for proxied in metrics.column._uncached_proxy_list()
            if proxied.shares_lineage(column)
        )

    best = None
    best_overlap = None

    for proxied_col in target_set:
        if proxied_col not in proxy_index:
            continue

        for candidate in proxy_index[proxied_col]:
            if require_embedded and not candidate.embedded(target_set):
                continue

            if best is None:
                # first match found; take it as the starting point
                best = candidate
                continue

            overlap = target_set.intersection(
                candidate.column._expanded_proxy_set
            )
            if best_overlap is None:
                # computed lazily, only once a second candidate appears
                best_overlap = target_set.intersection(
                    best.column._expanded_proxy_set
                )

            if len(overlap) > len(best_overlap):
                # the candidate has a larger field of correspondence,
                # i.e. selectable.c.a1_x->a1.c.x->table.c.x matches
                # a1.c.x->table.c.x better than
                # selectable.c.x->table.c.x does.
                best, best_overlap = candidate, overlap
            elif overlap == best_overlap:
                # same field of correspondence: prefer the column
                # whose proxy chain is shorter, meaning it is closer
                # to the root column.  The "weight" annotation, which
                # CompoundSelect() uses to give precedence by vertical
                # position in the compound statement, is counted too,
                # and columns with no reference to the target column
                # (also a CompoundSelect case) are left out.
                best_distance = lineage_distance(best)
                candidate_distance = lineage_distance(candidate)
                if candidate_distance < best_distance:
                    best, best_overlap = candidate, overlap

    return best.column if best else None

2023 

2024 

# column type held by DedupeColumnCollection; bound to NamedColumn
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")


class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.   The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self, column: _NAMEDCOL, key: Optional[str] = None
    ) -> None:
        """Add a column under its own ``.key``.

        If a different column is already present under that key, it is
        replaced via :meth:`.replace`; adding the very same column
        object again is a no-op.

        :param column: the column to add.
        :param key: optional; when given it must equal ``column.key``.
        :raises ArgumentError: if ``key`` differs from ``column.key``,
         or if the column has no key.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key not in self._index:
            self._append_new_column(key, column)
            return

        if self._index[key][1] is column:
            # this exact column object is already present
            return

        self.replace(column)

        # pop out memoized proxy_set as this
        # operation may very well be occurring
        # in a _make_proxy operation
        util.memoized_property.reset(column, "proxy_set")

    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
        """Append a column at the end and index it by position and key."""
        position = len(self._collection)
        self._collection.append(
            (key, named_column, _ColumnMetrics(self, named_column))
        )
        self._colset.add(named_column._deannotate())
        self._index[position] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns whose name or key is already indexed are set aside and
        passed to :meth:`.replace` after all other columns have been
        appended.

        :raises ArgumentError: if a column is given under a key other
         than its own ``.key``.

        """
        pairs = list(iter_)

        deferred = []
        for k, col in pairs:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            name_taken = col.name in self._index and col.key != col.name
            if name_taken or col.key in self._index:
                deferred.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))

        self._colset.update(
            col._deannotate() for _, col, _ in self._collection
        )

        # refresh the positional entries of the index
        for position, (k, col, _) in enumerate(self._collection):
            self._index[position] = (k, col)

        for col in deferred:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each column of ``iter_`` under its own ``.key``."""
        keyed = ((col.key, col) for col in iter_)
        self._populate_separate_keys(keyed)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove the given column from this collection.

        :raises ValueError: if the column is not part of the collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )

        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            entry for entry in self._collection if entry[1] is not column
        ]

        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries
        for position, (k, col, _) in enumerate(self._collection):
            self._index[position] = (k, col)

        # the collection shrank by one, so the highest position is stale
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the key 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove.

        """
        remove_col = set(extra_remove) if extra_remove else set()

        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            # nothing to displace; plain append
            self._append_new_column(column.key, column)
            return

        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replaced = False
        for k, col, metrics in self._collection:
            if col not in remove_col:
                new_cols.append((k, col, metrics))
            elif not replaced:
                # the new column takes the slot of the first removed one
                replaced = True
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )

        # remove_col is non-empty here because of the early return above
        self._colset.difference_update(remove_col)

        for removed in remove_col:
            for metrics in self._proxy_index.get(removed, ()):
                metrics.dispose(self)

        if not replaced:
            new_cols.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild the index in place: positions first, then keys
        self._index.clear()
        for position, (k, col, _) in enumerate(self._collection):
            self._index[position] = (k, col)
        for k, col, _ in self._collection:
            self._index[k] = (k, col)

2201 

2202 

class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A :class:`_sql.ColumnCollection` view over a parent collection.

    The view refers to the parent's own internal structures rather than
    copying them, and its mutator methods delegate to ``_readonly()``.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        """Bind this view to ``collection`` and share its internals."""
        assign = object.__setattr__
        assign(self, "_parent", collection)
        for attr in ("_colset", "_index", "_collection", "_proxy_index"):
            assign(self, attr, getattr(collection, attr))

    def __getstate__(self) -> Dict[str, _COL_co]:
        """Return the picklable state: only the parent collection."""
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore the view by re-running ``__init__`` with the parent."""
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        """Not supported on a read-only collection."""
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        """Not supported on a read-only collection."""
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        """Not supported on a read-only collection."""
        self._readonly()

2230 

2231 

class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with column-oriented helpers."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add every column of ``cols`` to this set, in order."""
        for column in cols:
            self.add(column)

    def __eq__(self, other):
        """Combine ``c == local`` comparisons with ``elements.and_()``.

        One comparison is produced for every pair of a column ``c``
        from ``other`` and a column ``local`` from this set for which
        ``c.shares_lineage(local)`` is true.

        """
        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash the members of this set as a tuple, in iteration order."""
        return hash(tuple(self))

2250 

2251 

def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    When the entity itself has no such attribute, its elements are
    iterated with ``visitors.iterate()`` and the namespace of the first
    element that has one is returned.

    :raises AttributeError: the original error is re-raised if no
     element provides an entity namespace.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for elem in visitors.iterate(traversible):
            if _is_has_entity_namespace(elem):
                return elem.entity_namespace

        # nothing in the traversal qualified
        raise

2269 

2270 

def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    :param entity: object whose entity namespace is consulted.
    :param key: attribute name to look up in the namespace.
    :param default: value returned when the namespace has no such
     attribute; when omitted, a missing attribute is an error.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """
    try:
        ns = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(ns, key)  # type: ignore
        return getattr(ns, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err