Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

861 statements  

1# sql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11from __future__ import annotations 

12 

13import collections 

14from enum import Enum 

15import itertools 

16from itertools import zip_longest 

17import operator 

18import re 

19from typing import Any 

20from typing import Callable 

21from typing import cast 

22from typing import Dict 

23from typing import FrozenSet 

24from typing import Generator 

25from typing import Generic 

26from typing import Iterable 

27from typing import Iterator 

28from typing import List 

29from typing import Mapping 

30from typing import MutableMapping 

31from typing import NamedTuple 

32from typing import NoReturn 

33from typing import Optional 

34from typing import overload 

35from typing import Sequence 

36from typing import Set 

37from typing import Tuple 

38from typing import Type 

39from typing import TYPE_CHECKING 

40from typing import TypeVar 

41from typing import Union 

42 

43from . import roles 

44from . import visitors 

45from .cache_key import HasCacheKey # noqa 

46from .cache_key import MemoizedHasCacheKey # noqa 

47from .traversals import HasCopyInternals # noqa 

48from .visitors import ClauseVisitor 

49from .visitors import ExtendedInternalTraversal 

50from .visitors import ExternallyTraversible 

51from .visitors import InternalTraversal 

52from .. import event 

53from .. import exc 

54from .. import util 

55from ..util import HasMemoized as HasMemoized 

56from ..util import hybridmethod 

57from ..util import typing as compat_typing 

58from ..util import warn_deprecated 

59from ..util.typing import Final 

60from ..util.typing import Protocol 

61from ..util.typing import Self 

62from ..util.typing import TypeGuard 

63 

64if TYPE_CHECKING: 

65 from . import coercions 

66 from . import elements 

67 from . import type_api 

68 from ._orm_types import DMLStrategyArgument 

69 from ._orm_types import SynchronizeSessionArgument 

70 from ._typing import _CLE 

71 from .cache_key import CacheKey 

72 from .compiler import SQLCompiler 

73 from .elements import BindParameter 

74 from .elements import ClauseList 

75 from .elements import ColumnClause # noqa 

76 from .elements import ColumnElement 

77 from .elements import NamedColumn 

78 from .elements import SQLCoreOperations 

79 from .elements import TextClause 

80 from .schema import Column 

81 from .schema import DefaultGenerator 

82 from .selectable import _JoinTargetElement 

83 from .selectable import _SelectIterable 

84 from .selectable import FromClause 

85 from .visitors import anon_map 

86 from ..engine import Connection 

87 from ..engine import CursorResult 

88 from ..engine.interfaces import _CoreMultiExecuteParams 

89 from ..engine.interfaces import _ExecuteOptions 

90 from ..engine.interfaces import _ImmutableExecuteOptions 

91 from ..engine.interfaces import CacheStats 

92 from ..engine.interfaces import Compiled 

93 from ..engine.interfaces import CompiledCacheType 

94 from ..engine.interfaces import CoreExecuteOptionsParameter 

95 from ..engine.interfaces import Dialect 

96 from ..engine.interfaces import IsolationLevel 

97 from ..engine.interfaces import SchemaTranslateMapType 

98 from ..event import dispatcher 

99 

100if not TYPE_CHECKING: 

101 coercions = None # noqa 

102 elements = None # noqa 

103 type_api = None # noqa 

104 

105 

106class _NoArg(Enum): 

107 NO_ARG = 0 

108 

109 def __repr__(self): 

110 return f"_NoArg.{self.name}" 

111 

112 

113NO_ARG: Final = _NoArg.NO_ARG 

114 

115 

116class _NoneName(Enum): 

117 NONE_NAME = 0 

118 """indicate a 'deferred' name that was ultimately the value None.""" 

119 

120 

121_NONE_NAME: Final = _NoneName.NONE_NAME 

122 

123_T = TypeVar("_T", bound=Any) 

124 

125_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

126 

127_AmbiguousTableNameMap = MutableMapping[str, str] 

128 

129 

130class _DefaultDescriptionTuple(NamedTuple): 

131 arg: Any 

132 is_scalar: Optional[bool] 

133 is_callable: Optional[bool] 

134 is_sentinel: Optional[bool] 

135 

136 @classmethod 

137 def _from_column_default( 

138 cls, default: Optional[DefaultGenerator] 

139 ) -> _DefaultDescriptionTuple: 

140 return ( 

141 _DefaultDescriptionTuple( 

142 default.arg, # type: ignore 

143 default.is_scalar, 

144 default.is_callable, 

145 default.is_sentinel, 

146 ) 

147 if default 

148 and ( 

149 default.has_arg 

150 or (not default.for_update and default.is_sentinel) 

151 ) 

152 else _DefaultDescriptionTuple(None, None, None, None) 

153 ) 

154 

155 

156_never_select_column: operator.attrgetter[Any] = operator.attrgetter( 

157 "_omit_from_statements" 

158) 

159 

160 

161class _EntityNamespace(Protocol): 

162 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

163 

164 

165class _HasEntityNamespace(Protocol): 

166 @util.ro_non_memoized_property 

167 def entity_namespace(self) -> _EntityNamespace: ... 

168 

169 

170def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

171 return hasattr(element, "entity_namespace") 

172 

173 

174# Remove when https://github.com/python/mypy/issues/14640 will be fixed 

175_Self = TypeVar("_Self", bound=Any) 

176 

177 

178class Immutable: 

179 """mark a ClauseElement as 'immutable' when expressions are cloned. 

180 

181 "immutable" objects refers to the "mutability" of an object in the 

182 context of SQL DQL and DML generation. Such as, in DQL, one can 

183 compose a SELECT or subquery of varied forms, but one cannot modify 

184 the structure of a specific table or column within DQL. 

185 :class:`.Immutable` is mostly intended to follow this concept, and as 

186 such the primary "immutable" objects are :class:`.ColumnClause`, 

187 :class:`.Column`, :class:`.TableClause`, :class:`.Table`. 

188 

189 """ 

190 

191 __slots__ = () 

192 

193 _is_immutable: bool = True 

194 

195 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

196 raise NotImplementedError("Immutable objects do not support copying") 

197 

198 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn: 

199 raise NotImplementedError("Immutable objects do not support copying") 

200 

201 def _clone(self: _Self, **kw: Any) -> _Self: 

202 return self 

203 

204 def _copy_internals( 

205 self, *, omit_attrs: Iterable[str] = (), **kw: Any 

206 ) -> None: 

207 pass 

208 

209 

210class SingletonConstant(Immutable): 

211 """Represent SQL constants like NULL, TRUE, FALSE""" 

212 

213 _is_singleton_constant: bool = True 

214 

215 _singleton: SingletonConstant 

216 

217 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T: 

218 return cast(_T, cls._singleton) 

219 

220 @util.non_memoized_property 

221 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

222 raise NotImplementedError() 

223 

224 @classmethod 

225 def _create_singleton(cls) -> None: 

226 obj = object.__new__(cls) 

227 obj.__init__() # type: ignore 

228 

229 # for a long time this was an empty frozenset, meaning 

230 # a SingletonConstant would never be a "corresponding column" in 

231 # a statement. This referred to #6259. However, in #7154 we see 

232 # that we do in fact need "correspondence" to work when matching cols 

233 # in result sets, so the non-correspondence was moved to a more 

234 # specific level when we are actually adapting expressions for SQL 

235 # render only. 

236 obj.proxy_set = frozenset([obj]) 

237 cls._singleton = obj 

238 

239 

240def _from_objects( 

241 *elements: Union[ 

242 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

243 ] 

244) -> Iterator[FromClause]: 

245 return itertools.chain.from_iterable( 

246 [element._from_objects for element in elements] 

247 ) 

248 

249 

250def _select_iterables( 

251 elements: Iterable[roles.ColumnsClauseRole], 

252) -> _SelectIterable: 

253 """expand tables into individual columns in the 

254 given list of column expressions. 

255 

256 """ 

257 return itertools.chain.from_iterable( 

258 [c._select_iterable for c in elements] 

259 ) 

260 

261 

262_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

263 

264 

265class _GenerativeType(compat_typing.Protocol): 

266 def _generate(self) -> Self: ... 

267 

268 

269def _generative(fn: _Fn) -> _Fn: 

270 """non-caching _generative() decorator. 

271 

272 This is basically the legacy decorator that copies the object and 

273 runs a method on the new copy. 

274 

275 """ 

276 

277 @util.decorator 

278 def _generative( 

279 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any 

280 ) -> _SelfGenerativeType: 

281 """Mark a method as generative.""" 

282 

283 self = self._generate() 

284 x = fn(self, *args, **kw) 

285 assert x is self, "generative methods must return self" 

286 return self 

287 

288 decorated = _generative(fn) 

289 decorated.non_generative = fn # type: ignore 

290 return decorated 

291 

292 

293def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]: 

294 msgs: Dict[str, str] = kw.pop("msgs", {}) 

295 

296 defaults: Dict[str, str] = kw.pop("defaults", {}) 

297 

298 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [ 

299 (name, operator.attrgetter(name), defaults.get(name, None)) 

300 for name in names 

301 ] 

302 

303 @util.decorator 

304 def check(fn: _Fn, *args: Any, **kw: Any) -> Any: 

305 # make pylance happy by not including "self" in the argument 

306 # list 

307 self = args[0] 

308 args = args[1:] 

309 for name, getter, default_ in getters: 

310 if getter(self) is not default_: 

311 msg = msgs.get( 

312 name, 

313 "Method %s() has already been invoked on this %s construct" 

314 % (fn.__name__, self.__class__), 

315 ) 

316 raise exc.InvalidRequestError(msg) 

317 return fn(self, *args, **kw) 

318 

319 return check 

320 

321 

322def _clone(element, **kw): 

323 return element._clone(**kw) 

324 

325 

326def _expand_cloned( 

327 elements: Iterable[_CLE], 

328) -> Iterable[_CLE]: 

329 """expand the given set of ClauseElements to be the set of all 'cloned' 

330 predecessors. 

331 

332 """ 

333 # TODO: cython candidate 

334 return itertools.chain(*[x._cloned_set for x in elements]) 

335 

336 

337def _de_clone( 

338 elements: Iterable[_CLE], 

339) -> Iterable[_CLE]: 

340 for x in elements: 

341 while x._is_clone_of is not None: 

342 x = x._is_clone_of 

343 yield x 

344 

345 

346def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

347 """return the intersection of sets a and b, counting 

348 any overlap between 'cloned' predecessors. 

349 

350 The returned set is in terms of the entities present within 'a'. 

351 

352 """ 

353 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

354 _expand_cloned(b) 

355 ) 

356 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

357 

358 

359def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

360 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection( 

361 _expand_cloned(b) 

362 ) 

363 return { 

364 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

365 } 

366 

367 

368class _DialectArgView(MutableMapping[str, Any]): 

369 """A dictionary view of dialect-level arguments in the form 

370 <dialectname>_<argument_name>. 

371 

372 """ 

373 

374 __slots__ = ("obj",) 

375 

376 def __init__(self, obj: DialectKWArgs) -> None: 

377 self.obj = obj 

378 

379 def _key(self, key: str) -> Tuple[str, str]: 

380 try: 

381 dialect, value_key = key.split("_", 1) 

382 except ValueError as err: 

383 raise KeyError(key) from err 

384 else: 

385 return dialect, value_key 

386 

387 def __getitem__(self, key: str) -> Any: 

388 dialect, value_key = self._key(key) 

389 

390 try: 

391 opt = self.obj.dialect_options[dialect] 

392 except exc.NoSuchModuleError as err: 

393 raise KeyError(key) from err 

394 else: 

395 return opt[value_key] 

396 

397 def __setitem__(self, key: str, value: Any) -> None: 

398 try: 

399 dialect, value_key = self._key(key) 

400 except KeyError as err: 

401 raise exc.ArgumentError( 

402 "Keys must be of the form <dialectname>_<argname>" 

403 ) from err 

404 else: 

405 self.obj.dialect_options[dialect][value_key] = value 

406 

407 def __delitem__(self, key: str) -> None: 

408 dialect, value_key = self._key(key) 

409 del self.obj.dialect_options[dialect][value_key] 

410 

411 def __len__(self) -> int: 

412 return sum( 

413 len(args._non_defaults) 

414 for args in self.obj.dialect_options.values() 

415 ) 

416 

417 def __iter__(self) -> Generator[str, None, None]: 

418 return ( 

419 "%s_%s" % (dialect_name, value_name) 

420 for dialect_name in self.obj.dialect_options 

421 for value_name in self.obj.dialect_options[ 

422 dialect_name 

423 ]._non_defaults 

424 ) 

425 

426 

427class _DialectArgDict(MutableMapping[str, Any]): 

428 """A dictionary view of dialect-level arguments for a specific 

429 dialect. 

430 

431 Maintains a separate collection of user-specified arguments 

432 and dialect-specified default arguments. 

433 

434 """ 

435 

436 def __init__(self) -> None: 

437 self._non_defaults: Dict[str, Any] = {} 

438 self._defaults: Dict[str, Any] = {} 

439 

440 def __len__(self) -> int: 

441 return len(set(self._non_defaults).union(self._defaults)) 

442 

443 def __iter__(self) -> Iterator[str]: 

444 return iter(set(self._non_defaults).union(self._defaults)) 

445 

446 def __getitem__(self, key: str) -> Any: 

447 if key in self._non_defaults: 

448 return self._non_defaults[key] 

449 else: 

450 return self._defaults[key] 

451 

452 def __setitem__(self, key: str, value: Any) -> None: 

453 self._non_defaults[key] = value 

454 

455 def __delitem__(self, key: str) -> None: 

456 del self._non_defaults[key] 

457 

458 

459@util.preload_module("sqlalchemy.dialects") 

460def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]: 

461 dialect_cls = util.preloaded.dialects.registry.load(dialect_name) 

462 if dialect_cls.construct_arguments is None: 

463 return None 

464 return dict(dialect_cls.construct_arguments) 

465 

466 

467class DialectKWArgs: 

468 """Establish the ability for a class to have dialect-specific arguments 

469 with defaults and constructor validation. 

470 

471 The :class:`.DialectKWArgs` interacts with the 

472 :attr:`.DefaultDialect.construct_arguments` present on a dialect. 

473 

474 .. seealso:: 

475 

476 :attr:`.DefaultDialect.construct_arguments` 

477 

478 """ 

479 

480 __slots__ = () 

481 

482 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [ 

483 ("dialect_options", InternalTraversal.dp_dialect_options) 

484 ] 

485 

486 def get_dialect_option( 

487 self, 

488 dialect: Dialect, 

489 argument_name: str, 

490 *, 

491 else_: Any = None, 

492 deprecated_fallback: Optional[str] = None, 

493 ) -> Any: 

494 r"""Return the value of a dialect-specific option, or *else_* if 

495 this dialect does not register the given argument. 

496 

497 This is useful for DDL compilers that may be inherited by 

498 third-party dialects whose ``construct_arguments`` do not 

499 include the same set of keys as the parent dialect. 

500 

501 :param dialect: The dialect for which to retrieve the option. 

502 :param argument_name: The name of the argument to retrieve. 

503 :param else\_: The value to return if the argument is not present. 

504 :param deprecated_fallback: Optional dialect name to fall back to 

505 if the argument is not present for the current dialect. If the 

506 argument is present for the fallback dialect but not the current 

507 dialect, a deprecation warning will be emitted. 

508 

509 """ 

510 

511 registry = DialectKWArgs._kw_registry[dialect.name] 

512 if registry is None: 

513 return else_ 

514 

515 if argument_name in registry.get(self.__class__, {}): 

516 if ( 

517 deprecated_fallback is None 

518 or dialect.name == deprecated_fallback 

519 ): 

520 return self.dialect_options[dialect.name][argument_name] 

521 

522 # deprecated_fallback is present; need to look in two places 

523 

524 # Current dialect has this option registered. 

525 # Check if user explicitly set it. 

526 if ( 

527 dialect.name in self.dialect_options 

528 and argument_name 

529 in self.dialect_options[dialect.name]._non_defaults 

530 ): 

531 # User explicitly set this dialect's option - use it 

532 return self.dialect_options[dialect.name][argument_name] 

533 

534 # User didn't set current dialect's option. 

535 # Check for deprecated fallback. 

536 elif ( 

537 deprecated_fallback in self.dialect_options 

538 and argument_name 

539 in self.dialect_options[deprecated_fallback]._non_defaults 

540 ): 

541 # User set fallback option but not current dialect's option 

542 warn_deprecated( 

543 f"Using '{deprecated_fallback}_{argument_name}' " 

544 f"with the '{dialect.name}' dialect is deprecated; " 

545 f"please additionally specify " 

546 f"'{dialect.name}_{argument_name}'.", 

547 version="2.1", 

548 ) 

549 return self.dialect_options[deprecated_fallback][argument_name] 

550 

551 # Return default value 

552 return self.dialect_options[dialect.name][argument_name] 

553 else: 

554 # Current dialect doesn't have the option registered at all. 

555 # Don't warn - if a third-party dialect doesn't support an 

556 # option, that's their choice, not a deprecation case. 

557 return else_ 

558 

559 @classmethod 

560 def argument_for( 

561 cls, dialect_name: str, argument_name: str, default: Any 

562 ) -> None: 

563 """Add a new kind of dialect-specific keyword argument for this class. 

564 

565 E.g.:: 

566 

567 Index.argument_for("mydialect", "length", None) 

568 

569 some_index = Index("a", "b", mydialect_length=5) 

570 

571 The :meth:`.DialectKWArgs.argument_for` method is a per-argument 

572 way adding extra arguments to the 

573 :attr:`.DefaultDialect.construct_arguments` dictionary. This 

574 dictionary provides a list of argument names accepted by various 

575 schema-level constructs on behalf of a dialect. 

576 

577 New dialects should typically specify this dictionary all at once as a 

578 data member of the dialect class. The use case for ad-hoc addition of 

579 argument names is typically for end-user code that is also using 

580 a custom compilation scheme which consumes the additional arguments. 

581 

582 :param dialect_name: name of a dialect. The dialect must be 

583 locatable, else a :class:`.NoSuchModuleError` is raised. The 

584 dialect must also include an existing 

585 :attr:`.DefaultDialect.construct_arguments` collection, indicating 

586 that it participates in the keyword-argument validation and default 

587 system, else :class:`.ArgumentError` is raised. If the dialect does 

588 not include this collection, then any keyword argument can be 

589 specified on behalf of this dialect already. All dialects packaged 

590 within SQLAlchemy include this collection, however for third party 

591 dialects, support may vary. 

592 

593 :param argument_name: name of the parameter. 

594 

595 :param default: default value of the parameter. 

596 

597 """ 

598 

599 construct_arg_dictionary: Optional[Dict[Any, Any]] = ( 

600 DialectKWArgs._kw_registry[dialect_name] 

601 ) 

602 if construct_arg_dictionary is None: 

603 raise exc.ArgumentError( 

604 "Dialect '%s' does have keyword-argument " 

605 "validation and defaults enabled configured" % dialect_name 

606 ) 

607 if cls not in construct_arg_dictionary: 

608 construct_arg_dictionary[cls] = {} 

609 construct_arg_dictionary[cls][argument_name] = default 

610 

611 @property 

612 def dialect_kwargs(self) -> _DialectArgView: 

613 """A collection of keyword arguments specified as dialect-specific 

614 options to this construct. 

615 

616 The arguments are present here in their original ``<dialect>_<kwarg>`` 

617 format. Only arguments that were actually passed are included; 

618 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which 

619 contains all options known by this dialect including defaults. 

620 

621 The collection is also writable; keys are accepted of the 

622 form ``<dialect>_<kwarg>`` where the value will be assembled 

623 into the list of options. 

624 

625 .. seealso:: 

626 

627 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form 

628 

629 """ 

630 return _DialectArgView(self) 

631 

632 @property 

633 def kwargs(self) -> _DialectArgView: 

634 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`.""" 

635 return self.dialect_kwargs 

636 

637 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = ( 

638 util.PopulateDict(_kw_reg_for_dialect) 

639 ) 

640 

641 @classmethod 

642 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict: 

643 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name] 

644 d = _DialectArgDict() 

645 

646 if construct_arg_dictionary is None: 

647 d._defaults.update({"*": None}) 

648 else: 

649 for cls in reversed(cls.__mro__): 

650 if cls in construct_arg_dictionary: 

651 d._defaults.update(construct_arg_dictionary[cls]) 

652 return d 

653 

654 @util.memoized_property 

655 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]: 

656 """A collection of keyword arguments specified as dialect-specific 

657 options to this construct. 

658 

659 This is a two-level nested registry, keyed to ``<dialect_name>`` 

660 and ``<argument_name>``. For example, the ``postgresql_where`` 

661 argument would be locatable as:: 

662 

663 arg = my_object.dialect_options["postgresql"]["where"] 

664 

665 .. versionadded:: 0.9.2 

666 

667 .. seealso:: 

668 

669 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form 

670 

671 """ 

672 

673 return util.PopulateDict(self._kw_reg_for_dialect_cls) 

674 

675 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None: 

676 # validate remaining kwargs that they all specify DB prefixes 

677 

678 if not kwargs: 

679 return 

680 

681 for k in kwargs: 

682 m = re.match("^(.+?)_(.+)$", k) 

683 if not m: 

684 raise TypeError( 

685 "Additional arguments should be " 

686 "named <dialectname>_<argument>, got '%s'" % k 

687 ) 

688 dialect_name, arg_name = m.group(1, 2) 

689 

690 try: 

691 construct_arg_dictionary = self.dialect_options[dialect_name] 

692 except exc.NoSuchModuleError: 

693 util.warn( 

694 "Can't validate argument %r; can't " 

695 "locate any SQLAlchemy dialect named %r" 

696 % (k, dialect_name) 

697 ) 

698 self.dialect_options[dialect_name] = d = _DialectArgDict() 

699 d._defaults.update({"*": None}) 

700 d._non_defaults[arg_name] = kwargs[k] 

701 else: 

702 if ( 

703 "*" not in construct_arg_dictionary 

704 and arg_name not in construct_arg_dictionary 

705 ): 

706 raise exc.ArgumentError( 

707 "Argument %r is not accepted by " 

708 "dialect %r on behalf of %r" 

709 % (k, dialect_name, self.__class__) 

710 ) 

711 else: 

712 construct_arg_dictionary[arg_name] = kwargs[k] 

713 

714 

715class CompileState: 

716 """Produces additional object state necessary for a statement to be 

717 compiled. 

718 

719 the :class:`.CompileState` class is at the base of classes that assemble 

720 state for a particular statement object that is then used by the 

721 compiler. This process is essentially an extension of the process that 

722 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis 

723 on converting raw user intent into more organized structures rather than 

724 producing string output. The top-level :class:`.CompileState` for the 

725 statement being executed is also accessible when the execution context 

726 works with invoking the statement and collecting results. 

727 

728 The production of :class:`.CompileState` is specific to the compiler, such 

729 as within the :meth:`.SQLCompiler.visit_insert`, 

730 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also 

731 responsible for associating the :class:`.CompileState` with the 

732 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement, 

733 i.e. the outermost SQL statement that's actually being executed. 

734 There can be other :class:`.CompileState` objects that are not the 

735 toplevel, such as when a SELECT subquery or CTE-nested 

736 INSERT/UPDATE/DELETE is generated. 

737 

738 .. versionadded:: 1.4 

739 

740 """ 

741 

742 __slots__ = ("statement", "_ambiguous_table_name_map") 

743 

744 plugins: Dict[Tuple[str, str], Type[CompileState]] = {} 

745 

746 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] 

747 

748 @classmethod 

749 def create_for_statement( 

750 cls, statement: Executable, compiler: SQLCompiler, **kw: Any 

751 ) -> CompileState: 

752 # factory construction. 

753 

754 if statement._propagate_attrs: 

755 plugin_name = statement._propagate_attrs.get( 

756 "compile_state_plugin", "default" 

757 ) 

758 klass = cls.plugins.get( 

759 (plugin_name, statement._effective_plugin_target), None 

760 ) 

761 if klass is None: 

762 klass = cls.plugins[ 

763 ("default", statement._effective_plugin_target) 

764 ] 

765 

766 else: 

767 klass = cls.plugins[ 

768 ("default", statement._effective_plugin_target) 

769 ] 

770 

771 if klass is cls: 

772 return cls(statement, compiler, **kw) 

773 else: 

774 return klass.create_for_statement(statement, compiler, **kw) 

775 

776 def __init__(self, statement, compiler, **kw): 

777 self.statement = statement 

778 

779 @classmethod 

780 def get_plugin_class( 

781 cls, statement: Executable 

782 ) -> Optional[Type[CompileState]]: 

783 plugin_name = statement._propagate_attrs.get( 

784 "compile_state_plugin", None 

785 ) 

786 

787 if plugin_name: 

788 key = (plugin_name, statement._effective_plugin_target) 

789 if key in cls.plugins: 

790 return cls.plugins[key] 

791 

792 # there's no case where we call upon get_plugin_class() and want 

793 # to get None back, there should always be a default. return that 

794 # if there was no plugin-specific class (e.g. Insert with "orm" 

795 # plugin) 

796 try: 

797 return cls.plugins[("default", statement._effective_plugin_target)] 

798 except KeyError: 

799 return None 

800 

801 @classmethod 

802 def _get_plugin_class_for_plugin( 

803 cls, statement: Executable, plugin_name: str 

804 ) -> Optional[Type[CompileState]]: 

805 try: 

806 return cls.plugins[ 

807 (plugin_name, statement._effective_plugin_target) 

808 ] 

809 except KeyError: 

810 return None 

811 

812 @classmethod 

813 def plugin_for( 

814 cls, plugin_name: str, visit_name: str 

815 ) -> Callable[[_Fn], _Fn]: 

816 def decorate(cls_to_decorate): 

817 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate 

818 return cls_to_decorate 

819 

820 return decorate 

821 

822 

823class Generative(HasMemoized): 

824 """Provide a method-chaining pattern in conjunction with the 

825 @_generative decorator.""" 

826 

827 def _generate(self) -> Self: 

828 skip = self._memoized_keys 

829 cls = self.__class__ 

830 s = cls.__new__(cls) 

831 if skip: 

832 # ensure this iteration remains atomic 

833 s.__dict__ = { 

834 k: v for k, v in self.__dict__.copy().items() if k not in skip 

835 } 

836 else: 

837 s.__dict__ = self.__dict__.copy() 

838 return s 

839 

840 

841class InPlaceGenerative(HasMemoized): 

842 """Provide a method-chaining pattern in conjunction with the 

843 @_generative decorator that mutates in place.""" 

844 

845 __slots__ = () 

846 

847 def _generate(self) -> Self: 

848 skip = self._memoized_keys 

849 # note __dict__ needs to be in __slots__ if this is used 

850 for k in skip: 

851 self.__dict__.pop(k, None) 

852 return self 

853 

854 

855class HasCompileState(Generative): 

856 """A class that has a :class:`.CompileState` associated with it.""" 

857 

858 _compile_state_plugin: Optional[Type[CompileState]] = None 

859 

860 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT 

861 

862 _compile_state_factory = CompileState.create_for_statement 

863 

864 

865class _MetaOptions(type): 

866 """metaclass for the Options class. 

867 

868 This metaclass is actually necessary despite the availability of the 

869 ``__init_subclass__()`` hook as this type also provides custom class-level 

870 behavior for the ``__add__()`` method. 

871 

872 """ 

873 

874 _cache_attrs: Tuple[str, ...] 

875 

876 def __add__(self, other): 

877 o1 = self() 

878 

879 if set(other).difference(self._cache_attrs): 

880 raise TypeError( 

881 "dictionary contains attributes not covered by " 

882 "Options class %s: %r" 

883 % (self, set(other).difference(self._cache_attrs)) 

884 ) 

885 

886 o1.__dict__.update(other) 

887 return o1 

888 

889 if TYPE_CHECKING: 

890 

891 def __getattr__(self, key: str) -> Any: ... 

892 

893 def __setattr__(self, key: str, value: Any) -> None: ... 

894 

895 def __delattr__(self, key: str) -> None: ... 

896 

897 

898class Options(metaclass=_MetaOptions): 

899 """A cacheable option dictionary with defaults.""" 

900 

901 __slots__ = () 

902 

903 _cache_attrs: Tuple[str, ...] 

904 

905 def __init_subclass__(cls) -> None: 

906 dict_ = cls.__dict__ 

907 cls._cache_attrs = tuple( 

908 sorted( 

909 d 

910 for d in dict_ 

911 if not d.startswith("__") 

912 and d not in ("_cache_key_traversal",) 

913 ) 

914 ) 

915 super().__init_subclass__() 

916 

917 def __init__(self, **kw: Any) -> None: 

918 self.__dict__.update(kw) 

919 

920 def __add__(self, other): 

921 o1 = self.__class__.__new__(self.__class__) 

922 o1.__dict__.update(self.__dict__) 

923 

924 if set(other).difference(self._cache_attrs): 

925 raise TypeError( 

926 "dictionary contains attributes not covered by " 

927 "Options class %s: %r" 

928 % (self, set(other).difference(self._cache_attrs)) 

929 ) 

930 

931 o1.__dict__.update(other) 

932 return o1 

933 

934 def __eq__(self, other): 

935 # TODO: very inefficient. This is used only in test suites 

936 # right now. 

937 for a, b in zip_longest(self._cache_attrs, other._cache_attrs): 

938 if getattr(self, a) != getattr(other, b): 

939 return False 

940 return True 

941 

942 def __repr__(self) -> str: 

943 # TODO: fairly inefficient, used only in debugging right now. 

944 

945 return "%s(%s)" % ( 

946 self.__class__.__name__, 

947 ", ".join( 

948 "%s=%r" % (k, self.__dict__[k]) 

949 for k in self._cache_attrs 

950 if k in self.__dict__ 

951 ), 

952 ) 

953 

954 @classmethod 

955 def isinstance(cls, klass: Type[Any]) -> bool: 

956 return issubclass(cls, klass) 

957 

958 @hybridmethod 

959 def add_to_element(self, name: str, value: str) -> Any: 

960 return self + {name: getattr(self, name) + value} 

961 

962 @hybridmethod 

963 def _state_dict_inst(self) -> Mapping[str, Any]: 

964 return self.__dict__ 

965 

966 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT 

967 

968 @_state_dict_inst.classlevel 

969 def _state_dict(cls) -> Mapping[str, Any]: 

970 return cls._state_dict_const 

971 

972 @classmethod 

973 def safe_merge(cls, other: "Options") -> Any: 

974 d = other._state_dict() 

975 

976 # only support a merge with another object of our class 

977 # and which does not have attrs that we don't. otherwise 

978 # we risk having state that might not be part of our cache 

979 # key strategy 

980 

981 if ( 

982 cls is not other.__class__ 

983 and other._cache_attrs 

984 and set(other._cache_attrs).difference(cls._cache_attrs) 

985 ): 

986 raise TypeError( 

987 "other element %r is not empty, is not of type %s, " 

988 "and contains attributes not covered here %r" 

989 % ( 

990 other, 

991 cls, 

992 set(other._cache_attrs).difference(cls._cache_attrs), 

993 ) 

994 ) 

995 return cls + d 

996 

997 @classmethod 

998 def from_execution_options( 

999 cls, 

1000 key: str, 

1001 attrs: set[str], 

1002 exec_options: Mapping[str, Any], 

1003 statement_exec_options: Mapping[str, Any], 

1004 ) -> Tuple["Options", Mapping[str, Any]]: 

1005 """process Options argument in terms of execution options. 

1006 

1007 

1008 e.g.:: 

1009 

1010 ( 

1011 load_options, 

1012 execution_options, 

1013 ) = QueryContext.default_load_options.from_execution_options( 

1014 "_sa_orm_load_options", 

1015 {"populate_existing", "autoflush", "yield_per"}, 

1016 execution_options, 

1017 statement._execution_options, 

1018 ) 

1019 

1020 get back the Options and refresh "_sa_orm_load_options" in the 

1021 exec options dict w/ the Options as well 

1022 

1023 """ 

1024 

1025 # common case is that no options we are looking for are 

1026 # in either dictionary, so cancel for that first 

1027 check_argnames = attrs.intersection( 

1028 set(exec_options).union(statement_exec_options) 

1029 ) 

1030 

1031 existing_options = exec_options.get(key, cls) 

1032 

1033 if check_argnames: 

1034 result = {} 

1035 for argname in check_argnames: 

1036 local = "_" + argname 

1037 if argname in exec_options: 

1038 result[local] = exec_options[argname] 

1039 elif argname in statement_exec_options: 

1040 result[local] = statement_exec_options[argname] 

1041 

1042 new_options = existing_options + result 

1043 exec_options = util.immutabledict(exec_options).merge_with( 

1044 {key: new_options} 

1045 ) 

1046 return new_options, exec_options 

1047 

1048 else: 

1049 return existing_options, exec_options 

1050 

1051 if TYPE_CHECKING: 

1052 

1053 def __getattr__(self, key: str) -> Any: ... 

1054 

1055 def __setattr__(self, key: str, value: Any) -> None: ... 

1056 

1057 def __delattr__(self, key: str) -> None: ... 

1058 

1059 

1060class CacheableOptions(Options, HasCacheKey): 

1061 __slots__ = () 

1062 

1063 @hybridmethod 

1064 def _gen_cache_key_inst( 

1065 self, anon_map: Any, bindparams: List[BindParameter[Any]] 

1066 ) -> Optional[Tuple[Any]]: 

1067 return HasCacheKey._gen_cache_key(self, anon_map, bindparams) 

1068 

1069 @_gen_cache_key_inst.classlevel 

1070 def _gen_cache_key( 

1071 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]] 

1072 ) -> Tuple[CacheableOptions, Any]: 

1073 return (cls, ()) 

1074 

1075 @hybridmethod 

1076 def _generate_cache_key(self) -> Optional[CacheKey]: 

1077 return HasCacheKey._generate_cache_key_for_object(self) 

1078 

1079 

1080class ExecutableOption(HasCopyInternals): 

1081 __slots__ = () 

1082 

1083 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT 

1084 

1085 __visit_name__: str = "executable_option" 

1086 

1087 _is_has_cache_key: bool = False 

1088 

1089 _is_core: bool = True 

1090 

1091 def _clone(self, **kw): 

1092 """Create a shallow copy of this ExecutableOption.""" 

1093 c = self.__class__.__new__(self.__class__) 

1094 c.__dict__ = dict(self.__dict__) # type: ignore 

1095 return c 

1096 

1097 

1098class Executable(roles.StatementRole): 

1099 """Mark a :class:`_expression.ClauseElement` as supporting execution. 

1100 

1101 :class:`.Executable` is a superclass for all "statement" types 

1102 of objects, including :func:`select`, :func:`delete`, :func:`update`, 

1103 :func:`insert`, :func:`text`. 

1104 

1105 """ 

1106 

1107 supports_execution: bool = True 

1108 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT 

1109 _is_default_generator: bool = False 

1110 _with_options: Tuple[ExecutableOption, ...] = () 

1111 _with_context_options: Tuple[ 

1112 Tuple[Callable[[CompileState], None], Any], ... 

1113 ] = () 

1114 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]] 

1115 

1116 _executable_traverse_internals = [ 

1117 ("_with_options", InternalTraversal.dp_executable_options), 

1118 ( 

1119 "_with_context_options", 

1120 ExtendedInternalTraversal.dp_with_context_options, 

1121 ), 

1122 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs), 

1123 ] 

1124 

1125 is_select: bool = False 

1126 is_from_statement: bool = False 

1127 is_update: bool = False 

1128 is_insert: bool = False 

1129 is_text: bool = False 

1130 is_delete: bool = False 

1131 is_dml: bool = False 

1132 

1133 if TYPE_CHECKING: 

1134 __visit_name__: str 

1135 

1136 def _compile_w_cache( 

1137 self, 

1138 dialect: Dialect, 

1139 *, 

1140 compiled_cache: Optional[CompiledCacheType], 

1141 column_keys: List[str], 

1142 for_executemany: bool = False, 

1143 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

1144 **kw: Any, 

1145 ) -> Tuple[ 

1146 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats 

1147 ]: ... 

1148 

1149 def _execute_on_connection( 

1150 self, 

1151 connection: Connection, 

1152 distilled_params: _CoreMultiExecuteParams, 

1153 execution_options: CoreExecuteOptionsParameter, 

1154 ) -> CursorResult[Any]: ... 

1155 

1156 def _execute_on_scalar( 

1157 self, 

1158 connection: Connection, 

1159 distilled_params: _CoreMultiExecuteParams, 

1160 execution_options: CoreExecuteOptionsParameter, 

1161 ) -> Any: ... 

1162 

1163 @util.ro_non_memoized_property 

1164 def _all_selected_columns(self) -> _SelectIterable: 

1165 raise NotImplementedError() 

1166 

1167 @property 

1168 def _effective_plugin_target(self) -> str: 

1169 return self.__visit_name__ 

1170 

1171 @_generative 

1172 def options(self, *options: ExecutableOption) -> Self: 

1173 """Apply options to this statement. 

1174 

1175 In the general sense, options are any kind of Python object 

1176 that can be interpreted by the SQL compiler for the statement. 

1177 These options can be consumed by specific dialects or specific kinds 

1178 of compilers. 

1179 

1180 The most commonly known kind of option are the ORM level options 

1181 that apply "eager load" and other loading behaviors to an ORM 

1182 query. However, options can theoretically be used for many other 

1183 purposes. 

1184 

1185 For background on specific kinds of options for specific kinds of 

1186 statements, refer to the documentation for those option objects. 

1187 

1188 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to 

1189 Core statement objects towards the goal of allowing unified 

1190 Core / ORM querying capabilities. 

1191 

1192 .. seealso:: 

1193 

1194 :ref:`loading_columns` - refers to options specific to the usage 

1195 of ORM queries 

1196 

1197 :ref:`relationship_loader_options` - refers to options specific 

1198 to the usage of ORM queries 

1199 

1200 """ 

1201 self._with_options += tuple( 

1202 coercions.expect(roles.ExecutableOptionRole, opt) 

1203 for opt in options 

1204 ) 

1205 return self 

1206 

1207 @_generative 

1208 def _set_compile_options(self, compile_options: CacheableOptions) -> Self: 

1209 """Assign the compile options to a new value. 

1210 

1211 :param compile_options: appropriate CacheableOptions structure 

1212 

1213 """ 

1214 

1215 self._compile_options = compile_options 

1216 return self 

1217 

1218 @_generative 

1219 def _update_compile_options(self, options: CacheableOptions) -> Self: 

1220 """update the _compile_options with new keys.""" 

1221 

1222 assert self._compile_options is not None 

1223 self._compile_options += options 

1224 return self 

1225 

1226 @_generative 

1227 def _add_context_option( 

1228 self, 

1229 callable_: Callable[[CompileState], None], 

1230 cache_args: Any, 

1231 ) -> Self: 

1232 """Add a context option to this statement. 

1233 

1234 These are callable functions that will 

1235 be given the CompileState object upon compilation. 

1236 

1237 A second argument cache_args is required, which will be combined with 

1238 the ``__code__`` identity of the function itself in order to produce a 

1239 cache key. 

1240 

1241 """ 

1242 self._with_context_options += ((callable_, cache_args),) 

1243 return self 

1244 

1245 @overload 

1246 def execution_options( 

1247 self, 

1248 *, 

1249 compiled_cache: Optional[CompiledCacheType] = ..., 

1250 logging_token: str = ..., 

1251 isolation_level: IsolationLevel = ..., 

1252 no_parameters: bool = False, 

1253 stream_results: bool = False, 

1254 max_row_buffer: int = ..., 

1255 yield_per: int = ..., 

1256 insertmanyvalues_page_size: int = ..., 

1257 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1258 populate_existing: bool = False, 

1259 autoflush: bool = False, 

1260 synchronize_session: SynchronizeSessionArgument = ..., 

1261 dml_strategy: DMLStrategyArgument = ..., 

1262 render_nulls: bool = ..., 

1263 is_delete_using: bool = ..., 

1264 is_update_from: bool = ..., 

1265 preserve_rowcount: bool = False, 

1266 **opt: Any, 

1267 ) -> Self: ... 

1268 

1269 @overload 

1270 def execution_options(self, **opt: Any) -> Self: ... 

1271 

1272 @_generative 

1273 def execution_options(self, **kw: Any) -> Self: 

1274 """Set non-SQL options for the statement which take effect during 

1275 execution. 

1276 

1277 Execution options can be set at many scopes, including per-statement, 

1278 per-connection, or per execution, using methods such as 

1279 :meth:`_engine.Connection.execution_options` and parameters which 

1280 accept a dictionary of options such as 

1281 :paramref:`_engine.Connection.execute.execution_options` and 

1282 :paramref:`_orm.Session.execute.execution_options`. 

1283 

1284 The primary characteristic of an execution option, as opposed to 

1285 other kinds of options such as ORM loader options, is that 

1286 **execution options never affect the compiled SQL of a query, only 

1287 things that affect how the SQL statement itself is invoked or how 

1288 results are fetched**. That is, execution options are not part of 

1289 what's accommodated by SQL compilation nor are they considered part of 

1290 the cached state of a statement. 

1291 

1292 The :meth:`_sql.Executable.execution_options` method is 

1293 :term:`generative`, as 

1294 is the case for the method as applied to the :class:`_engine.Engine` 

1295 and :class:`_orm.Query` objects, which means when the method is called, 

1296 a copy of the object is returned, which applies the given parameters to 

1297 that new copy, but leaves the original unchanged:: 

1298 

1299 statement = select(table.c.x, table.c.y) 

1300 new_statement = statement.execution_options(my_option=True) 

1301 

1302 An exception to this behavior is the :class:`_engine.Connection` 

1303 object, where the :meth:`_engine.Connection.execution_options` method 

1304 is explicitly **not** generative. 

1305 

1306 The kinds of options that may be passed to 

1307 :meth:`_sql.Executable.execution_options` and other related methods and 

1308 parameter dictionaries include parameters that are explicitly consumed 

1309 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not 

1310 defined by SQLAlchemy, which means the methods and/or parameter 

1311 dictionaries may be used for user-defined parameters that interact with 

1312 custom code, which may access the parameters using methods such as 

1313 :meth:`_sql.Executable.get_execution_options` and 

1314 :meth:`_engine.Connection.get_execution_options`, or within selected 

1315 event hooks using a dedicated ``execution_options`` event parameter 

1316 such as 

1317 :paramref:`_events.ConnectionEvents.before_execute.execution_options` 

1318 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.:: 

1319 

1320 from sqlalchemy import event 

1321 

1322 

1323 @event.listens_for(some_engine, "before_execute") 

1324 def _process_opt(conn, statement, multiparams, params, execution_options): 

1325 "run a SQL function before invoking a statement" 

1326 

1327 if execution_options.get("do_special_thing", False): 

1328 conn.exec_driver_sql("run_special_function()") 

1329 

1330 Within the scope of options that are explicitly recognized by 

1331 SQLAlchemy, most apply to specific classes of objects and not others. 

1332 The most common execution options include: 

1333 

1334 * :paramref:`_engine.Connection.execution_options.isolation_level` - 

1335 sets the isolation level for a connection or a class of connections 

1336 via an :class:`_engine.Engine`. This option is accepted only 

1337 by :class:`_engine.Connection` or :class:`_engine.Engine`. 

1338 

1339 * :paramref:`_engine.Connection.execution_options.stream_results` - 

1340 indicates results should be fetched using a server side cursor; 

1341 this option is accepted by :class:`_engine.Connection`, by the 

1342 :paramref:`_engine.Connection.execute.execution_options` parameter 

1343 on :meth:`_engine.Connection.execute`, and additionally by 

1344 :meth:`_sql.Executable.execution_options` on a SQL statement object, 

1345 as well as by ORM constructs like :meth:`_orm.Session.execute`. 

1346 

1347 * :paramref:`_engine.Connection.execution_options.compiled_cache` - 

1348 indicates a dictionary that will serve as the 

1349 :ref:`SQL compilation cache <sql_caching>` 

1350 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as 

1351 well as for ORM methods like :meth:`_orm.Session.execute`. 

1352 Can be passed as ``None`` to disable caching for statements. 

1353 This option is not accepted by 

1354 :meth:`_sql.Executable.execution_options` as it is inadvisable to 

1355 carry along a compilation cache within a statement object. 

1356 

1357 * :paramref:`_engine.Connection.execution_options.schema_translate_map` 

1358 - a mapping of schema names used by the 

1359 :ref:`Schema Translate Map <schema_translating>` feature, accepted 

1360 by :class:`_engine.Connection`, :class:`_engine.Engine`, 

1361 :class:`_sql.Executable`, as well as by ORM constructs 

1362 like :meth:`_orm.Session.execute`. 

1363 

1364 .. seealso:: 

1365 

1366 :meth:`_engine.Connection.execution_options` 

1367 

1368 :paramref:`_engine.Connection.execute.execution_options` 

1369 

1370 :paramref:`_orm.Session.execute.execution_options` 

1371 

1372 :ref:`orm_queryguide_execution_options` - documentation on all 

1373 ORM-specific execution options 

1374 

1375 """ # noqa: E501 

1376 if "isolation_level" in kw: 

1377 raise exc.ArgumentError( 

1378 "'isolation_level' execution option may only be specified " 

1379 "on Connection.execution_options(), or " 

1380 "per-engine using the isolation_level " 

1381 "argument to create_engine()." 

1382 ) 

1383 if "compiled_cache" in kw: 

1384 raise exc.ArgumentError( 

1385 "'compiled_cache' execution option may only be specified " 

1386 "on Connection.execution_options(), not per statement." 

1387 ) 

1388 self._execution_options = self._execution_options.union(kw) 

1389 return self 

1390 

1391 def get_execution_options(self) -> _ExecuteOptions: 

1392 """Get the non-SQL options which will take effect during execution. 

1393 

1394 .. versionadded:: 1.3 

1395 

1396 .. seealso:: 

1397 

1398 :meth:`.Executable.execution_options` 

1399 """ 

1400 return self._execution_options 

1401 

1402 

1403class SchemaEventTarget(event.EventTarget): 

1404 """Base class for elements that are the targets of :class:`.DDLEvents` 

1405 events. 

1406 

1407 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`. 

1408 

1409 """ 

1410 

1411 dispatch: dispatcher[SchemaEventTarget] 

1412 

1413 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None: 

1414 """Associate with this SchemaEvent's parent object.""" 

1415 

1416 def _set_parent_with_dispatch( 

1417 self, parent: SchemaEventTarget, **kw: Any 

1418 ) -> None: 

1419 self.dispatch.before_parent_attach(self, parent) 

1420 self._set_parent(parent, **kw) 

1421 self.dispatch.after_parent_attach(self, parent) 

1422 

1423 

1424class SchemaVisitable(SchemaEventTarget, visitors.Visitable): 

1425 """Base class for elements that are targets of a :class:`.SchemaVisitor`. 

1426 

1427 .. versionadded:: 2.0.41 

1428 

1429 """ 

1430 

1431 

1432class SchemaVisitor(ClauseVisitor): 

1433 """Define the visiting for ``SchemaItem`` and more 

1434 generally ``SchemaVisitable`` objects. 

1435 

1436 """ 

1437 

1438 __traverse_options__: Dict[str, Any] = {"schema_visitor": True} 

1439 

1440 

1441class _SentinelDefaultCharacterization(Enum): 

1442 NONE = "none" 

1443 UNKNOWN = "unknown" 

1444 CLIENTSIDE = "clientside" 

1445 SENTINEL_DEFAULT = "sentinel_default" 

1446 SERVERSIDE = "serverside" 

1447 IDENTITY = "identity" 

1448 SEQUENCE = "sequence" 

1449 

1450 

1451class _SentinelColumnCharacterization(NamedTuple): 

1452 columns: Optional[Sequence[Column[Any]]] = None 

1453 is_explicit: bool = False 

1454 is_autoinc: bool = False 

1455 default_characterization: _SentinelDefaultCharacterization = ( 

1456 _SentinelDefaultCharacterization.NONE 

1457 ) 

1458 

1459 

1460_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1461 

1462_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1463_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1464 

1465 

1466class _ColumnMetrics(Generic[_COL_co]): 

1467 __slots__ = ("column",) 

1468 

1469 column: _COL_co 

1470 

1471 def __init__( 

1472 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1473 ) -> None: 

1474 self.column = col 

1475 

1476 # proxy_index being non-empty means it was initialized. 

1477 # so we need to update it 

1478 pi = collection._proxy_index 

1479 if pi: 

1480 for eps_col in col._expanded_proxy_set: 

1481 pi[eps_col].add(self) 

1482 

1483 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]: 

1484 return self.column._expanded_proxy_set 

1485 

1486 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None: 

1487 pi = collection._proxy_index 

1488 if not pi: 

1489 return 

1490 for col in self.column._expanded_proxy_set: 

1491 colset = pi.get(col, None) 

1492 if colset: 

1493 colset.discard(self) 

1494 if colset is not None and not colset: 

1495 del pi[col] 

1496 

1497 def embedded( 

1498 self, 

1499 target_set: Union[ 

1500 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1501 ], 

1502 ) -> bool: 

1503 expanded_proxy_set = self.column._expanded_proxy_set 

1504 for t in target_set.difference(expanded_proxy_set): 

1505 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1506 return False 

1507 return True 

1508 

1509 

1510class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1511 """Collection of :class:`_expression.ColumnElement` instances, 

1512 typically for 

1513 :class:`_sql.FromClause` objects. 

1514 

1515 The :class:`_sql.ColumnCollection` object is most commonly available 

1516 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1517 on the :class:`_schema.Table` object, introduced at 

1518 :ref:`metadata_tables_and_columns`. 

1519 

1520 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1521 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1522 :class:`_schema.Column` objects, which are then accessible both via mapping 

1523 style access as well as attribute access style. 

1524 

1525 To access :class:`_schema.Column` objects using ordinary attribute-style 

1526 access, specify the name like any other object attribute, such as below 

1527 a column named ``employee_name`` is accessed:: 

1528 

1529 >>> employee_table.c.employee_name 

1530 

1531 To access columns that have names with special characters or spaces, 

1532 index-style access is used, such as below which illustrates a column named 

1533 ``employee ' payment`` is accessed:: 

1534 

1535 >>> employee_table.c["employee ' payment"] 

1536 

1537 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1538 interface, common dictionary method names like 

1539 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1540 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1541 database columns that are keyed under these names also need to use indexed 

1542 access:: 

1543 

1544 >>> employee_table.c["values"] 

1545 

1546 

1547 The name for which a :class:`_schema.Column` would be present is normally 

1548 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1549 such as a :class:`_sql.Select` object that uses a label style set 

1550 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1551 key may instead be represented under a particular label name such 

1552 as ``tablename_columnname``:: 

1553 

1554 >>> from sqlalchemy import select, column, table 

1555 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1556 >>> t = table("t", column("c")) 

1557 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1558 >>> subq = stmt.subquery() 

1559 >>> subq.c.t_c 

1560 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1561 

1562 :class:`.ColumnCollection` also indexes the columns in order and allows 

1563 them to be accessible by their integer position:: 

1564 

1565 >>> cc[0] 

1566 Column('x', Integer(), table=None) 

1567 >>> cc[1] 

1568 Column('y', Integer(), table=None) 

1569 

1570 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1571 allows integer-based 

1572 index access to the collection. 

1573 

1574 Iterating the collection yields the column expressions in order:: 

1575 

1576 >>> list(cc) 

1577 [Column('x', Integer(), table=None), 

1578 Column('y', Integer(), table=None)] 

1579 

1580 The base :class:`_expression.ColumnCollection` object can store 

1581 duplicates, which can 

1582 mean either two columns with the same key, in which case the column 

1583 returned by key access is **arbitrary**:: 

1584 

1585 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1586 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1587 >>> list(cc) 

1588 [Column('x', Integer(), table=None), 

1589 Column('x', Integer(), table=None)] 

1590 >>> cc["x"] is x1 

1591 False 

1592 >>> cc["x"] is x2 

1593 True 

1594 

1595 Or it can also mean the same column multiple times. These cases are 

1596 supported as :class:`_expression.ColumnCollection` 

1597 is used to represent the columns in 

1598 a SELECT statement which may include duplicates. 

1599 

1600 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1601 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1602 collection is used for schema level objects like :class:`_schema.Table` 

1603 and 

1604 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1605 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1606 as the schema constructs have more use cases that require removal and 

1607 replacement of columns. 

1608 

1609 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1610 now stores duplicate 

1611 column keys as well as the same column in multiple positions. The 

1612 :class:`.DedupeColumnCollection` class is added to maintain the 

1613 former behavior in those cases where deduplication as well as 

1614 additional replace/remove operations are needed. 

1615 

1616 

1617 """ 

1618 

1619 __slots__ = ("_collection", "_index", "_colset", "_proxy_index") 

1620 

1621 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1622 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1623 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1624 _colset: Set[_COL_co] 

1625 

1626 def __init__( 

1627 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None 

1628 ): 

1629 object.__setattr__(self, "_colset", set()) 

1630 object.__setattr__(self, "_index", {}) 

1631 object.__setattr__( 

1632 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1633 ) 

1634 object.__setattr__(self, "_collection", []) 

1635 if columns: 

1636 self._initial_populate(columns) 

1637 

1638 @util.preload_module("sqlalchemy.sql.elements") 

1639 def __clause_element__(self) -> ClauseList: 

1640 elements = util.preloaded.sql_elements 

1641 

1642 return elements.ClauseList( 

1643 _literal_as_text_role=roles.ColumnsClauseRole, 

1644 group=False, 

1645 *self._all_columns, 

1646 ) 

1647 

1648 def _initial_populate( 

1649 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1650 ) -> None: 

1651 self._populate_separate_keys(iter_) 

1652 

1653 @property 

1654 def _all_columns(self) -> List[_COL_co]: 

1655 return [col for (_, col, _) in self._collection] 

1656 

1657 def keys(self) -> List[_COLKEY]: 

1658 """Return a sequence of string key names for all columns in this 

1659 collection.""" 

1660 return [k for (k, _, _) in self._collection] 

1661 

1662 def values(self) -> List[_COL_co]: 

1663 """Return a sequence of :class:`_sql.ColumnClause` or 

1664 :class:`_schema.Column` objects for all columns in this 

1665 collection.""" 

1666 return [col for (_, col, _) in self._collection] 

1667 

1668 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1669 """Return a sequence of (key, column) tuples for all columns in this 

1670 collection each consisting of a string key name and a 

1671 :class:`_sql.ColumnClause` or 

1672 :class:`_schema.Column` object. 

1673 """ 

1674 

1675 return [(k, col) for (k, col, _) in self._collection] 

1676 

1677 def __bool__(self) -> bool: 

1678 return bool(self._collection) 

1679 

1680 def __len__(self) -> int: 

1681 return len(self._collection) 

1682 

1683 def __iter__(self) -> Iterator[_COL_co]: 

1684 # turn to a list first to maintain over a course of changes 

1685 return iter([col for _, col, _ in self._collection]) 

1686 

1687 @overload 

1688 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1689 

1690 @overload 

1691 def __getitem__( 

1692 self, key: Tuple[Union[str, int], ...] 

1693 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1694 

1695 @overload 

1696 def __getitem__( 

1697 self, key: slice 

1698 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1699 

1700 def __getitem__( 

1701 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1702 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1703 try: 

1704 if isinstance(key, (tuple, slice)): 

1705 if isinstance(key, slice): 

1706 cols = ( 

1707 (sub_key, col) 

1708 for (sub_key, col, _) in self._collection[key] 

1709 ) 

1710 else: 

1711 cols = (self._index[sub_key] for sub_key in key) 

1712 

1713 return ColumnCollection(cols).as_readonly() 

1714 else: 

1715 return self._index[key][1] 

1716 except KeyError as err: 

1717 if isinstance(err.args[0], int): 

1718 raise IndexError(err.args[0]) from err 

1719 else: 

1720 raise 

1721 

1722 def __getattr__(self, key: str) -> _COL_co: 

1723 try: 

1724 return self._index[key][1] 

1725 except KeyError as err: 

1726 raise AttributeError(key) from err 

1727 

1728 def __contains__(self, key: str) -> bool: 

1729 if key not in self._index: 

1730 if not isinstance(key, str): 

1731 raise exc.ArgumentError( 

1732 "__contains__ requires a string argument" 

1733 ) 

1734 return False 

1735 else: 

1736 return True 

1737 

1738 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool: 

1739 """Compare this :class:`_expression.ColumnCollection` to another 

1740 based on the names of the keys""" 

1741 

1742 for l, r in zip_longest(self, other): 

1743 if l is not r: 

1744 return False 

1745 else: 

1746 return True 

1747 

1748 def __eq__(self, other: Any) -> bool: 

1749 return self.compare(other) 

1750 

1751 @overload 

1752 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ... 

1753 

1754 @overload 

1755 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ... 

1756 

1757 def get( 

1758 self, key: str, default: Optional[_COL] = None 

1759 ) -> Optional[Union[_COL_co, _COL]]: 

1760 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object 

1761 based on a string key name from this 

1762 :class:`_expression.ColumnCollection`.""" 

1763 

1764 if key in self._index: 

1765 return self._index[key][1] 

1766 else: 

1767 return default 

1768 

1769 def __str__(self) -> str: 

1770 return "%s(%s)" % ( 

1771 self.__class__.__name__, 

1772 ", ".join(str(c) for c in self), 

1773 ) 

1774 

1775 def __setitem__(self, key: str, value: Any) -> NoReturn: 

1776 raise NotImplementedError() 

1777 

1778 def __delitem__(self, key: str) -> NoReturn: 

1779 raise NotImplementedError() 

1780 

1781 def __setattr__(self, key: str, obj: Any) -> NoReturn: 

1782 raise NotImplementedError() 

1783 

1784 def clear(self) -> NoReturn: 

1785 """Dictionary clear() is not implemented for 

1786 :class:`_sql.ColumnCollection`.""" 

1787 raise NotImplementedError() 

1788 

1789 def remove(self, column: Any) -> NoReturn: 

1790 raise NotImplementedError() 

1791 

1792 def update(self, iter_: Any) -> NoReturn: 

1793 """Dictionary update() is not implemented for 

1794 :class:`_sql.ColumnCollection`.""" 

1795 raise NotImplementedError() 

1796 

1797 # https://github.com/python/mypy/issues/4266 

1798 __hash__: Optional[int] = None # type: ignore 

1799 

1800 def _populate_separate_keys( 

1801 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1802 ) -> None: 

1803 """populate from an iterator of (key, column)""" 

1804 

1805 self._collection[:] = collection = [ 

1806 (k, c, _ColumnMetrics(self, c)) for k, c in iter_ 

1807 ] 

1808 self._colset.update(c._deannotate() for _, c, _ in collection) 

1809 self._index.update( 

1810 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)} 

1811 ) 

1812 self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) 

1813 

1814 def add( 

1815 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None 

1816 ) -> None: 

1817 """Add a column to this :class:`_sql.ColumnCollection`. 

1818 

1819 .. note:: 

1820 

1821 This method is **not normally used by user-facing code**, as the 

1822 :class:`_sql.ColumnCollection` is usually part of an existing 

1823 object such as a :class:`_schema.Table`. To add a 

1824 :class:`_schema.Column` to an existing :class:`_schema.Table` 

1825 object, use the :meth:`_schema.Table.append_column` method. 

1826 

1827 """ 

1828 colkey: _COLKEY 

1829 

1830 if key is None: 

1831 colkey = column.key # type: ignore 

1832 else: 

1833 colkey = key 

1834 

1835 l = len(self._collection) 

1836 

1837 # don't really know how this part is supposed to work w/ the 

1838 # covariant thing 

1839 

1840 _column = cast(_COL_co, column) 

1841 

1842 self._collection.append( 

1843 (colkey, _column, _ColumnMetrics(self, _column)) 

1844 ) 

1845 self._colset.add(_column._deannotate()) 

1846 self._index[l] = (colkey, _column) 

1847 if colkey not in self._index: 

1848 self._index[colkey] = (colkey, _column) 

1849 

1850 def __getstate__(self) -> Dict[str, Any]: 

1851 return { 

1852 "_collection": [(k, c) for k, c, _ in self._collection], 

1853 "_index": self._index, 

1854 } 

1855 

1856 def __setstate__(self, state: Dict[str, Any]) -> None: 

1857 object.__setattr__(self, "_index", state["_index"]) 

1858 object.__setattr__( 

1859 self, "_proxy_index", collections.defaultdict(util.OrderedSet) 

1860 ) 

1861 object.__setattr__( 

1862 self, 

1863 "_collection", 

1864 [ 

1865 (k, c, _ColumnMetrics(self, c)) 

1866 for (k, c) in state["_collection"] 

1867 ], 

1868 ) 

1869 object.__setattr__( 

1870 self, "_colset", {col for k, col, _ in self._collection} 

1871 ) 

1872 

1873 def contains_column(self, col: ColumnElement[Any]) -> bool: 

1874 """Checks if a column object exists in this collection""" 

1875 if col not in self._colset: 

1876 if isinstance(col, str): 

1877 raise exc.ArgumentError( 

1878 "contains_column cannot be used with string arguments. " 

1879 "Use ``col_name in table.c`` instead." 

1880 ) 

1881 return False 

1882 else: 

1883 return True 

1884 

1885 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: 

1886 """Return a "read only" form of this 

1887 :class:`_sql.ColumnCollection`.""" 

1888 

1889 return ReadOnlyColumnCollection(self) 

1890 

1891 def _init_proxy_index(self) -> None: 

1892 """populate the "proxy index", if empty. 

1893 

1894 proxy index is added in 2.0 to provide more efficient operation 

1895 for the corresponding_column() method. 

1896 

1897 For reasons of both time to construct new .c collections as well as 

1898 memory conservation for large numbers of large .c collections, the 

1899 proxy_index is only filled if corresponding_column() is called. once 

1900 filled it stays that way, and new _ColumnMetrics objects created after 

1901 that point will populate it with new data. Note this case would be 

1902 unusual, if not nonexistent, as it means a .c collection is being 

1903 mutated after corresponding_column() were used, however it is tested in 

1904 test/base/test_utils.py. 

1905 

1906 """ 

1907 pi = self._proxy_index 

1908 if pi: 

1909 return 

1910 

1911 for _, _, metrics in self._collection: 

1912 eps = metrics.column._expanded_proxy_set 

1913 

1914 for eps_col in eps: 

1915 pi[eps_col].add(metrics) 

1916 

1917 def corresponding_column( 

1918 self, column: _COL, require_embedded: bool = False 

1919 ) -> Optional[Union[_COL, _COL_co]]: 

1920 """Given a :class:`_expression.ColumnElement`, return the exported 

1921 :class:`_expression.ColumnElement` object from this 

1922 :class:`_expression.ColumnCollection` 

1923 which corresponds to that original :class:`_expression.ColumnElement` 

1924 via a common 

1925 ancestor column. 

1926 

1927 :param column: the target :class:`_expression.ColumnElement` 

1928 to be matched. 

1929 

1930 :param require_embedded: only return corresponding columns for 

1931 the given :class:`_expression.ColumnElement`, if the given 

1932 :class:`_expression.ColumnElement` 

1933 is actually present within a sub-element 

1934 of this :class:`_expression.Selectable`. 

1935 Normally the column will match if 

1936 it merely shares a common ancestor with one of the exported 

1937 columns of this :class:`_expression.Selectable`. 

1938 

1939 .. seealso:: 

1940 

1941 :meth:`_expression.Selectable.corresponding_column` 

1942 - invokes this method 

1943 against the collection returned by 

1944 :attr:`_expression.Selectable.exported_columns`. 

1945 

1946 .. versionchanged:: 1.4 the implementation for ``corresponding_column`` 

1947 was moved onto the :class:`_expression.ColumnCollection` itself. 

1948 

1949 """ 

1950 # TODO: cython candidate 

1951 

1952 # don't dig around if the column is locally present 

1953 if column in self._colset: 

1954 return column 

1955 

1956 selected_intersection, selected_metrics = None, None 

1957 target_set = column.proxy_set 

1958 

1959 pi = self._proxy_index 

1960 if not pi: 

1961 self._init_proxy_index() 

1962 

1963 for current_metrics in ( 

1964 mm for ts in target_set if ts in pi for mm in pi[ts] 

1965 ): 

1966 if not require_embedded or current_metrics.embedded(target_set): 

1967 if selected_metrics is None: 

1968 # no corresponding column yet, pick this one. 

1969 selected_metrics = current_metrics 

1970 continue 

1971 

1972 current_intersection = target_set.intersection( 

1973 current_metrics.column._expanded_proxy_set 

1974 ) 

1975 if selected_intersection is None: 

1976 selected_intersection = target_set.intersection( 

1977 selected_metrics.column._expanded_proxy_set 

1978 ) 

1979 

1980 if len(current_intersection) > len(selected_intersection): 

1981 # 'current' has a larger field of correspondence than 

1982 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x 

1983 # matches a1.c.x->table.c.x better than 

1984 # selectable.c.x->table.c.x does. 

1985 

1986 selected_metrics = current_metrics 

1987 selected_intersection = current_intersection 

1988 elif current_intersection == selected_intersection: 

1989 # they have the same field of correspondence. see 

1990 # which proxy_set has fewer columns in it, which 

1991 # indicates a closer relationship with the root 

1992 # column. Also take into account the "weight" 

1993 # attribute which CompoundSelect() uses to give 

1994 # higher precedence to columns based on vertical 

1995 # position in the compound statement, and discard 

1996 # columns that have no reference to the target 

1997 # column (also occurs with CompoundSelect) 

1998 

1999 selected_col_distance = sum( 

2000 [ 

2001 sc._annotations.get("weight", 1) 

2002 for sc in ( 

2003 selected_metrics.column._uncached_proxy_list() 

2004 ) 

2005 if sc.shares_lineage(column) 

2006 ], 

2007 ) 

2008 current_col_distance = sum( 

2009 [ 

2010 sc._annotations.get("weight", 1) 

2011 for sc in ( 

2012 current_metrics.column._uncached_proxy_list() 

2013 ) 

2014 if sc.shares_lineage(column) 

2015 ], 

2016 ) 

2017 if current_col_distance < selected_col_distance: 

2018 selected_metrics = current_metrics 

2019 selected_intersection = current_intersection 

2020 

2021 return selected_metrics.column if selected_metrics else None 

2022 

2023 

2024_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]") 

2025 

2026 

2027class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): 

2028 """A :class:`_expression.ColumnCollection` 

2029 that maintains deduplicating behavior. 

2030 

2031 This is useful by schema level objects such as :class:`_schema.Table` and 

2032 :class:`.PrimaryKeyConstraint`. The collection includes more 

2033 sophisticated mutator methods as well to suit schema objects which 

2034 require mutable column collections. 

2035 

2036 .. versionadded:: 1.4 

2037 

2038 """ 

2039 

2040 def add( # type: ignore[override] 

2041 self, column: _NAMEDCOL, key: Optional[str] = None 

2042 ) -> None: 

2043 if key is not None and column.key != key: 

2044 raise exc.ArgumentError( 

2045 "DedupeColumnCollection requires columns be under " 

2046 "the same key as their .key" 

2047 ) 

2048 key = column.key 

2049 

2050 if key is None: 

2051 raise exc.ArgumentError( 

2052 "Can't add unnamed column to column collection" 

2053 ) 

2054 

2055 if key in self._index: 

2056 existing = self._index[key][1] 

2057 

2058 if existing is column: 

2059 return 

2060 

2061 self.replace(column) 

2062 

2063 # pop out memoized proxy_set as this 

2064 # operation may very well be occurring 

2065 # in a _make_proxy operation 

2066 util.memoized_property.reset(column, "proxy_set") 

2067 else: 

2068 self._append_new_column(key, column) 

2069 

2070 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None: 

2071 l = len(self._collection) 

2072 self._collection.append( 

2073 (key, named_column, _ColumnMetrics(self, named_column)) 

2074 ) 

2075 self._colset.add(named_column._deannotate()) 

2076 self._index[l] = (key, named_column) 

2077 self._index[key] = (key, named_column) 

2078 

2079 def _populate_separate_keys( 

2080 self, iter_: Iterable[Tuple[str, _NAMEDCOL]] 

2081 ) -> None: 

2082 """populate from an iterator of (key, column)""" 

2083 cols = list(iter_) 

2084 

2085 replace_col = [] 

2086 for k, col in cols: 

2087 if col.key != k: 

2088 raise exc.ArgumentError( 

2089 "DedupeColumnCollection requires columns be under " 

2090 "the same key as their .key" 

2091 ) 

2092 if col.name in self._index and col.key != col.name: 

2093 replace_col.append(col) 

2094 elif col.key in self._index: 

2095 replace_col.append(col) 

2096 else: 

2097 self._index[k] = (k, col) 

2098 self._collection.append((k, col, _ColumnMetrics(self, col))) 

2099 self._colset.update(c._deannotate() for (k, c, _) in self._collection) 

2100 

2101 self._index.update( 

2102 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection) 

2103 ) 

2104 for col in replace_col: 

2105 self.replace(col) 

2106 

2107 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None: 

2108 self._populate_separate_keys((col.key, col) for col in iter_) 

2109 

2110 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override] 

2111 if column not in self._colset: 

2112 raise ValueError( 

2113 "Can't remove column %r; column is not in this collection" 

2114 % column 

2115 ) 

2116 del self._index[column.key] 

2117 self._colset.remove(column) 

2118 self._collection[:] = [ 

2119 (k, c, metrics) 

2120 for (k, c, metrics) in self._collection 

2121 if c is not column 

2122 ] 

2123 for metrics in self._proxy_index.get(column, ()): 

2124 metrics.dispose(self) 

2125 

2126 self._index.update( 

2127 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2128 ) 

2129 # delete higher index 

2130 del self._index[len(self._collection)] 

2131 

2132 def replace( 

2133 self, 

2134 column: _NAMEDCOL, 

2135 extra_remove: Optional[Iterable[_NAMEDCOL]] = None, 

2136 ) -> None: 

2137 """add the given column to this collection, removing unaliased 

2138 versions of this column as well as existing columns with the 

2139 same key. 

2140 

2141 e.g.:: 

2142 

2143 t = Table("sometable", metadata, Column("col1", Integer)) 

2144 t.columns.replace(Column("col1", Integer, key="columnone")) 

2145 

2146 will remove the original 'col1' from the collection, and add 

2147 the new column under the name 'columnname'. 

2148 

2149 Used by schema.Column to override columns during table reflection. 

2150 

2151 """ 

2152 

2153 if extra_remove: 

2154 remove_col = set(extra_remove) 

2155 else: 

2156 remove_col = set() 

2157 # remove up to two columns based on matches of name as well as key 

2158 if column.name in self._index and column.key != column.name: 

2159 other = self._index[column.name][1] 

2160 if other.name == other.key: 

2161 remove_col.add(other) 

2162 

2163 if column.key in self._index: 

2164 remove_col.add(self._index[column.key][1]) 

2165 

2166 if not remove_col: 

2167 self._append_new_column(column.key, column) 

2168 return 

2169 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] 

2170 replaced = False 

2171 for k, col, metrics in self._collection: 

2172 if col in remove_col: 

2173 if not replaced: 

2174 replaced = True 

2175 new_cols.append( 

2176 (column.key, column, _ColumnMetrics(self, column)) 

2177 ) 

2178 else: 

2179 new_cols.append((k, col, metrics)) 

2180 

2181 if remove_col: 

2182 self._colset.difference_update(remove_col) 

2183 

2184 for rc in remove_col: 

2185 for metrics in self._proxy_index.get(rc, ()): 

2186 metrics.dispose(self) 

2187 

2188 if not replaced: 

2189 new_cols.append((column.key, column, _ColumnMetrics(self, column))) 

2190 

2191 self._colset.add(column._deannotate()) 

2192 self._collection[:] = new_cols 

2193 

2194 self._index.clear() 

2195 

2196 self._index.update( 

2197 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)} 

2198 ) 

2199 self._index.update({k: (k, col) for (k, col, _) in self._collection}) 

2200 

2201 

2202class ReadOnlyColumnCollection( 

2203 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co] 

2204): 

2205 __slots__ = ("_parent",) 

2206 

2207 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]): 

2208 object.__setattr__(self, "_parent", collection) 

2209 object.__setattr__(self, "_colset", collection._colset) 

2210 object.__setattr__(self, "_index", collection._index) 

2211 object.__setattr__(self, "_collection", collection._collection) 

2212 object.__setattr__(self, "_proxy_index", collection._proxy_index) 

2213 

2214 def __getstate__(self) -> Dict[str, _COL_co]: 

2215 return {"_parent": self._parent} 

2216 

2217 def __setstate__(self, state: Dict[str, Any]) -> None: 

2218 parent = state["_parent"] 

2219 self.__init__(parent) # type: ignore 

2220 

2221 def add(self, column: Any, key: Any = ...) -> Any: 

2222 self._readonly() 

2223 

2224 def extend(self, elements: Any) -> NoReturn: 

2225 self._readonly() 

2226 

2227 def remove(self, item: Any) -> NoReturn: 

2228 self._readonly() 

2229 

2230 

2231class ColumnSet(util.OrderedSet["ColumnClause[Any]"]): 

2232 def contains_column(self, col: ColumnClause[Any]) -> bool: 

2233 return col in self 

2234 

2235 def extend(self, cols: Iterable[Any]) -> None: 

2236 for col in cols: 

2237 self.add(col) 

2238 

2239 def __eq__(self, other): 

2240 l = [] 

2241 for c in other: 

2242 for local in self: 

2243 if c.shares_lineage(local): 

2244 l.append(c == local) 

2245 return elements.and_(*l) 

2246 

2247 def __hash__(self) -> int: # type: ignore[override] 

2248 return hash(tuple(x for x in self)) 

2249 

2250 

2251def _entity_namespace( 

2252 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2253) -> _EntityNamespace: 

2254 """Return the nearest .entity_namespace for the given entity. 

2255 

2256 If not immediately available, does an iterate to find a sub-element 

2257 that has one, if any. 

2258 

2259 """ 

2260 try: 

2261 return cast(_HasEntityNamespace, entity).entity_namespace 

2262 except AttributeError: 

2263 for elem in visitors.iterate(cast(ExternallyTraversible, entity)): 

2264 if _is_has_entity_namespace(elem): 

2265 return elem.entity_namespace 

2266 else: 

2267 raise 

2268 

2269 

2270def _entity_namespace_key( 

2271 entity: Union[_HasEntityNamespace, ExternallyTraversible], 

2272 key: str, 

2273 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG, 

2274) -> SQLCoreOperations[Any]: 

2275 """Return an entry from an entity_namespace. 

2276 

2277 

2278 Raises :class:`_exc.InvalidRequestError` rather than attribute error 

2279 on not found. 

2280 

2281 """ 

2282 

2283 try: 

2284 ns = _entity_namespace(entity) 

2285 if default is not NO_ARG: 

2286 return getattr(ns, key, default) 

2287 else: 

2288 return getattr(ns, key) # type: ignore 

2289 except AttributeError as err: 

2290 raise exc.InvalidRequestError( 

2291 'Entity namespace for "%s" has no property "%s"' % (entity, key) 

2292 ) from err