Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

917 statements  

1# sql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Foundational utilities common to many sql modules.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15from enum import Enum 

16import itertools 

17from itertools import zip_longest 

18import operator 

19import re 

20from typing import Any 

21from typing import Callable 

22from typing import cast 

23from typing import Dict 

24from typing import FrozenSet 

25from typing import Generic 

26from typing import Iterable 

27from typing import Iterator 

28from typing import List 

29from typing import Mapping 

30from typing import MutableMapping 

31from typing import NamedTuple 

32from typing import NoReturn 

33from typing import Optional 

34from typing import overload 

35from typing import Protocol 

36from typing import Sequence 

37from typing import Set 

38from typing import Tuple 

39from typing import Type 

40from typing import TYPE_CHECKING 

41from typing import TypeVar 

42from typing import Union 

43 

44from . import roles 

45from . import visitors 

46from .cache_key import HasCacheKey # noqa 

47from .cache_key import MemoizedHasCacheKey # noqa 

48from .traversals import HasCopyInternals # noqa 

49from .visitors import ClauseVisitor 

50from .visitors import ExtendedInternalTraversal 

51from .visitors import ExternallyTraversible 

52from .visitors import InternalTraversal 

53from .. import event 

54from .. import exc 

55from .. import util 

56from ..util import HasMemoized as HasMemoized 

57from ..util import hybridmethod 

58from ..util.typing import Self 

59from ..util.typing import TypeGuard 

60from ..util.typing import TypeVarTuple 

61from ..util.typing import Unpack 

62 

63if TYPE_CHECKING: 

64 from . import coercions 

65 from . import elements 

66 from . import type_api 

67 from ._orm_types import DMLStrategyArgument 

68 from ._orm_types import SynchronizeSessionArgument 

69 from ._typing import _CLE 

70 from .compiler import SQLCompiler 

71 from .dml import Delete 

72 from .dml import Insert 

73 from .dml import Update 

74 from .elements import BindParameter 

75 from .elements import ClauseElement 

76 from .elements import ClauseList 

77 from .elements import ColumnClause # noqa 

78 from .elements import ColumnElement 

79 from .elements import NamedColumn 

80 from .elements import SQLCoreOperations 

81 from .elements import TextClause 

82 from .schema import Column 

83 from .schema import DefaultGenerator 

84 from .selectable import _JoinTargetElement 

85 from .selectable import _SelectIterable 

86 from .selectable import FromClause 

87 from .selectable import Select 

88 from ..engine import Connection 

89 from ..engine import CursorResult 

90 from ..engine.interfaces import _CoreMultiExecuteParams 

91 from ..engine.interfaces import _ExecuteOptions 

92 from ..engine.interfaces import _ImmutableExecuteOptions 

93 from ..engine.interfaces import CacheStats 

94 from ..engine.interfaces import Compiled 

95 from ..engine.interfaces import CompiledCacheType 

96 from ..engine.interfaces import CoreExecuteOptionsParameter 

97 from ..engine.interfaces import Dialect 

98 from ..engine.interfaces import IsolationLevel 

99 from ..engine.interfaces import SchemaTranslateMapType 

100 from ..event import dispatcher 

101 

102if not TYPE_CHECKING: 

103 coercions = None # noqa 

104 elements = None # noqa 

105 type_api = None # noqa 

106 

107 

108_Ts = TypeVarTuple("_Ts") 

109 

110 

111class _NoArg(Enum): 

112 NO_ARG = 0 

113 

114 def __repr__(self): 

115 return f"_NoArg.{self.name}" 

116 

117 

118NO_ARG = _NoArg.NO_ARG 

119 

120 

121class _NoneName(Enum): 

122 NONE_NAME = 0 

123 """indicate a 'deferred' name that was ultimately the value None.""" 

124 

125 

126_NONE_NAME = _NoneName.NONE_NAME 

127 

128_T = TypeVar("_T", bound=Any) 

129 

130_Fn = TypeVar("_Fn", bound=Callable[..., Any]) 

131 

132_AmbiguousTableNameMap = MutableMapping[str, str] 

133 

134 

135class _DefaultDescriptionTuple(NamedTuple): 

136 arg: Any 

137 is_scalar: Optional[bool] 

138 is_callable: Optional[bool] 

139 is_sentinel: Optional[bool] 

140 

141 @classmethod 

142 def _from_column_default( 

143 cls, default: Optional[DefaultGenerator] 

144 ) -> _DefaultDescriptionTuple: 

145 return ( 

146 _DefaultDescriptionTuple( 

147 default.arg, # type: ignore 

148 default.is_scalar, 

149 default.is_callable, 

150 default.is_sentinel, 

151 ) 

152 if default 

153 and ( 

154 default.has_arg 

155 or (not default.for_update and default.is_sentinel) 

156 ) 

157 else _DefaultDescriptionTuple(None, None, None, None) 

158 ) 

159 

160 

161_never_select_column = operator.attrgetter("_omit_from_statements") 

162 

163 

164class _EntityNamespace(Protocol): 

165 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ... 

166 

167 

class _HasEntityNamespace(Protocol):
    """Structural type for objects that expose a read-only
    ``entity_namespace`` attribute."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        ...

171 

172 

173def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]: 

174 return hasattr(element, "entity_namespace") 

175 

176 

177# Remove when https://github.com/python/mypy/issues/14640 is fixed 

178_Self = TypeVar("_Self", bound=Any) 

179 

180 

class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "Immutable" here is about mutability in the context of SQL DQL and
    DML generation: within DQL one may compose a SELECT or subquery of
    varied forms, but one may not alter the structure of a particular
    table or column.  :class:`.Immutable` is mostly intended to follow
    that concept, so the primary "immutable" objects are
    :class:`.ColumnClause`, :class:`.Column`, :class:`.TableClause` and
    :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable = True

    def unique_params(self, *optionaldict, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; there are no internals to copy."""
        return None

211 

212 

class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant = True

    # per-class singleton instance; assigned by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction always hands back the already-built singleton
        singleton = cls._singleton
        return cast(_T, singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # replaced per-instance inside _create_singleton()
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls):
        """Build the single instance of ``cls`` and store it on the class."""
        # go through object.__new__ directly, since our own __new__
        # returns cls._singleton, which is not assigned yet
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset([instance])
        cls._singleton = instance

241 

242 

243def _from_objects( 

244 *elements: Union[ 

245 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement 

246 ] 

247) -> Iterator[FromClause]: 

248 return itertools.chain.from_iterable( 

249 [element._from_objects for element in elements] 

250 ) 

251 

252 

253def _select_iterables( 

254 elements: Iterable[roles.ColumnsClauseRole], 

255) -> _SelectIterable: 

256 """expand tables into individual columns in the 

257 given list of column expressions. 

258 

259 """ 

260 return itertools.chain.from_iterable( 

261 [c._select_iterable for c in elements] 

262 ) 

263 

264 

265_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType") 

266 

267 

268class _GenerativeType(Protocol): 

269 def _generate(self) -> Self: ... 

270 

271 

def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    The legacy decorator: the decorated method is invoked on the object
    returned by ``self._generate()`` rather than on ``self``, and that
    object is returned.  The undecorated function stays reachable as the
    ``non_generative`` attribute of the result.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        generated = self._generate()
        returned = fn(generated, *args, **kw)
        assert returned is generated, "generative methods must return self"
        return generated

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped

294 

295 

def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Return a decorator that rejects a method call when any of the named
    attributes on the instance is no longer its default.

    :param names: attribute names (resolved with ``operator.attrgetter``)
     to check on the instance before the method runs.
    :param msgs: optional keyword; dict of attribute name to custom error
     message.
    :param defaults: optional keyword; dict of attribute name to its
     expected default (``None`` when not given).  Comparison is by
     identity.

    """
    msgs = kw.pop("msgs", {})
    defaults = kw.pop("defaults", {})

    checks = []
    for name in names:
        checks.append(
            (name, operator.attrgetter(name), defaults.get(name, None))
        )

    @util.decorator
    def check(fn, *args, **kw):
        # make pylance happy by not including "self" in the argument
        # list
        target, remaining = args[0], args[1:]
        for attr_name, read_attr, expected in checks:
            if read_attr(target) is expected:
                continue
            fallback = (
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, target.__class__)
            )
            raise exc.InvalidRequestError(msgs.get(attr_name, fallback))
        return fn(target, *remaining, **kw)

    return check

323 

324 

325def _clone(element, **kw): 

326 return element._clone(**kw) 

327 

328 

329def _expand_cloned( 

330 elements: Iterable[_CLE], 

331) -> Iterable[_CLE]: 

332 """expand the given set of ClauseElements to be the set of all 'cloned' 

333 predecessors. 

334 

335 """ 

336 # TODO: cython candidate 

337 return itertools.chain(*[x._cloned_set for x in elements]) 

338 

339 

340def _de_clone( 

341 elements: Iterable[_CLE], 

342) -> Iterable[_CLE]: 

343 for x in elements: 

344 while x._is_clone_of is not None: 

345 x = x._is_clone_of 

346 yield x 

347 

348 

349def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

350 """return the intersection of sets a and b, counting 

351 any overlap between 'cloned' predecessors. 

352 

353 The returned set is in terms of the entities present within 'a'. 

354 

355 """ 

356 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

357 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)} 

358 

359 

360def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]: 

361 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b)) 

362 return { 

363 elem for elem in a if not all_overlap.intersection(elem._cloned_set) 

364 } 

365 

366 

367class _DialectArgView(MutableMapping[str, Any]): 

368 """A dictionary view of dialect-level arguments in the form 

369 <dialectname>_<argument_name>. 

370 

371 """ 

372 

373 __slots__ = ("obj",) 

374 

375 def __init__(self, obj): 

376 self.obj = obj 

377 

378 def _key(self, key): 

379 try: 

380 dialect, value_key = key.split("_", 1) 

381 except ValueError as err: 

382 raise KeyError(key) from err 

383 else: 

384 return dialect, value_key 

385 

386 def __getitem__(self, key): 

387 dialect, value_key = self._key(key) 

388 

389 try: 

390 opt = self.obj.dialect_options[dialect] 

391 except exc.NoSuchModuleError as err: 

392 raise KeyError(key) from err 

393 else: 

394 return opt[value_key] 

395 

396 def __setitem__(self, key, value): 

397 try: 

398 dialect, value_key = self._key(key) 

399 except KeyError as err: 

400 raise exc.ArgumentError( 

401 "Keys must be of the form <dialectname>_<argname>" 

402 ) from err 

403 else: 

404 self.obj.dialect_options[dialect][value_key] = value 

405 

406 def __delitem__(self, key): 

407 dialect, value_key = self._key(key) 

408 del self.obj.dialect_options[dialect][value_key] 

409 

410 def __len__(self): 

411 return sum( 

412 len(args._non_defaults) 

413 for args in self.obj.dialect_options.values() 

414 ) 

415 

416 def __iter__(self): 

417 return ( 

418 "%s_%s" % (dialect_name, value_name) 

419 for dialect_name in self.obj.dialect_options 

420 for value_name in self.obj.dialect_options[ 

421 dialect_name 

422 ]._non_defaults 

423 ) 

424 

425 

426class _DialectArgDict(MutableMapping[str, Any]): 

427 """A dictionary view of dialect-level arguments for a specific 

428 dialect. 

429 

430 Maintains a separate collection of user-specified arguments 

431 and dialect-specified default arguments. 

432 

433 """ 

434 

435 def __init__(self): 

436 self._non_defaults = {} 

437 self._defaults = {} 

438 

439 def __len__(self): 

440 return len(set(self._non_defaults).union(self._defaults)) 

441 

442 def __iter__(self): 

443 return iter(set(self._non_defaults).union(self._defaults)) 

444 

445 def __getitem__(self, key): 

446 if key in self._non_defaults: 

447 return self._non_defaults[key] 

448 else: 

449 return self._defaults[key] 

450 

451 def __setitem__(self, key, value): 

452 self._non_defaults[key] = value 

453 

454 def __delitem__(self, key): 

455 del self._non_defaults[key] 

456 

457 

@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name):
    """Load the dialect class registered under ``dialect_name`` and return
    a ``dict()`` copy of its ``construct_arguments``, or ``None`` if that
    attribute is ``None``.

    """
    registry = util.preloaded.dialects.registry
    construct_arguments = registry.load(dialect_name).construct_arguments
    if construct_arguments is None:
        return None
    return dict(construct_arguments)

464 

465 

class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(cls, dialect_name, argument_name, default):
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        if construct_arg_dictionary is None:
            # raised when the dialect has *no* construct_arguments, so the
            # message has to say that the feature is absent
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self):
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> result of _kw_reg_for_dialect(), filled on first
    # access and shared by every DialectKWArgs subclass
    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name):
        """Build the :class:`._DialectArgDict` of defaults that apply to
        ``cls`` for the given dialect.

        When the dialect has no argument registry, the single wildcard
        default ``"*"`` is installed; ``_validate_dialect_kwargs`` then
        accepts any argument name for that dialect.

        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            # walk the MRO base-first so that defaults registered for more
            # specific classes override those of their bases
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and store
        each value in :attr:`.dialect_options`.

        :param kwargs: mapping of prefixed argument names to values.
        :raises TypeError: a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: the dialect has an argument registry and
         does not accept the argument for this class.

        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k, value in kwargs.items():
            m = re.match(r"^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn, then accept the argument as-is
                # under a wildcard entry
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = value
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                construct_arg_dictionary[arg_name] = value

633 

634 

class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    :class:`.CompileState` is the base of the classes that assemble state
    for a particular statement object, which the compiler then uses.  The
    process is essentially an extension of what a
    SQLCompiler.visit_XYZ() method does, with the emphasis on turning raw
    user intent into more organized structures rather than on producing
    string output.  The top-level :class:`.CompileState` for the statement
    being executed is also accessible when the execution context works
    with invoking the statement and collecting results.

    Producing a :class:`.CompileState` is specific to the compiler, e.g.
    within :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc.  Those methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself when the statement is the "toplevel"
    statement, i.e. the outermost SQL statement that's actually being
    executed.  Other :class:`.CompileState` objects that are not the
    toplevel may exist, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, visit name) -> CompileState class,
    # populated through plugin_for()
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Factory: look up the registered class for ``statement`` and
        build it, delegating to that class's own factory when it is not
        ``cls``.

        """
        propagate_attrs = statement._propagate_attrs
        default_key = ("default", statement._effective_plugin_target)

        klass = None
        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )
        if klass is None:
            # no propagated attributes, or nothing registered for the
            # requested plugin: the "default" entry is required
            klass = cls.plugins[default_key]

        if klass is cls:
            return cls(statement, compiler, **kw)
        return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin, else
        the "default" one, else ``None``."""
        registry = cls.plugins
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in registry:
                return registry[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        return registry.get(("default", statement._effective_plugin_target))

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered under exactly ``plugin_name`` for
        the statement, or ``None``; there is no "default" fallback."""
        return cls.plugins.get(
            (plugin_name, statement._effective_plugin_target)
        )

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class
        under ``(plugin_name, visit_name)`` and returns it unchanged."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate

741 

742 

class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a copy of this
        object's ``__dict__``, minus any keys in ``_memoized_keys``.

        ``__init__`` is not invoked on the new instance.

        """
        memoized = self._memoized_keys
        cls = self.__class__
        generated = cls.__new__(cls)

        # take the copy first so that the filtering below works on a
        # private snapshot; this keeps the iteration atomic
        state = self.__dict__.copy()
        if memoized:
            for key in memoized:
                state.pop(key, None)
        generated.__dict__ = state
        return generated

759 

760 

class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        """Drop every key in ``_memoized_keys`` from ``__dict__`` and return
        ``self``; no copy is made."""
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self

773 

774 

class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # class-level default: no specific CompileState plugin class
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # class-level default: the shared empty immutable dictionary
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # factory for the CompileState; defaults to the base classmethod
    _compile_state_factory = CompileState.create_for_statement

783 

784 

785class _MetaOptions(type): 

786 """metaclass for the Options class. 

787 

788 This metaclass is actually necessary despite the availability of the 

789 ``__init_subclass__()`` hook as this type also provides custom class-level 

790 behavior for the ``__add__()`` method. 

791 

792 """ 

793 

794 _cache_attrs: Tuple[str, ...] 

795 

796 def __add__(self, other): 

797 o1 = self() 

798 

799 if set(other).difference(self._cache_attrs): 

800 raise TypeError( 

801 "dictionary contains attributes not covered by " 

802 "Options class %s: %r" 

803 % (self, set(other).difference(self._cache_attrs)) 

804 ) 

805 

806 o1.__dict__.update(other) 

807 return o1 

808 

809 if TYPE_CHECKING: 

810 

811 def __getattr__(self, key: str) -> Any: ... 

812 

813 def __setattr__(self, key: str, value: Any) -> None: ... 

814 

815 def __delattr__(self, key: str) -> None: ... 

816 

817 

class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # every name declared directly on the subclass, other than dunder
        # names and "_cache_key_traversal", becomes a cache attribute
        names = [
            name
            for name in cls.__dict__
            if not name.startswith("__") and name != "_cache_key_traversal"
        ]
        cls._cache_attrs = tuple(sorted(names))
        super().__init_subclass__()

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a new instance of the same class with this object's
        state plus the attributes in the ``other`` dictionary.

        :raises TypeError: ``other`` has keys not in ``_cache_attrs``.

        """
        merged = self.__class__.__new__(self.__class__)
        merged.__dict__.update(self.__dict__)

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        merged.__dict__.update(other)
        return merged

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        pairs = zip_longest(self._cache_attrs, other._cache_attrs)
        return not any(
            getattr(self, mine) != getattr(other, theirs)
            for mine, theirs in pairs
        )

    def __repr__(self):
        # TODO: fairly inefficient, used only in debugging right now.

        # only attributes set on the instance itself are rendered
        rendered = [
            "%s=%r" % (k, self.__dict__[k])
            for k in self._cache_attrs
            if k in self.__dict__
        ]
        return "%s(%s)" % (self.__class__.__name__, ", ".join(rendered))

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name, value):
        # current value of ``name`` combined with ``value`` using "+"
        combined = getattr(self, name) + value
        return self + {name: combined}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other):
        """Return ``cls`` combined with the state of ``other``.

        :raises TypeError: ``other`` is of a different class and has
         cache attributes that ``cls`` lacks.

        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if cls is not other.__class__ and other._cache_attrs:
            uncovered = set(other._cache_attrs).difference(cls._cache_attrs)
            if uncovered:
                raise TypeError(
                    "other element %r is not empty, is not of type %s, "
                    "and contains attributes not covered here %r"
                    % (other, cls, uncovered)
                )
        return cls + d

    @classmethod
    def from_execution_options(
        cls, key, attrs, exec_options, statement_exec_options
    ):
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if not check_argnames:
            return existing_options, exec_options

        # every name here is in at least one of the two dictionaries;
        # exec_options wins when both have it
        result = {
            "_" + argname: (
                exec_options[argname]
                if argname in exec_options
                else statement_exec_options[argname]
            )
            for argname in check_argnames
        }

        new_options = existing_options + result
        exec_options = util.immutabledict().merge_with(
            exec_options, {key: new_options}
        )
        return new_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...

974 

975 

class CacheableOptions(Options, HasCacheKey):
    """:class:`.Options` combined with :class:`.HasCacheKey`; cache key
    generation differs between instance-level and class-level calls."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(self, anon_map, bindparams):
        # instance level: defer to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(cls, anon_map, bindparams):
        # class level: the key is the class itself plus an empty tuple
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self):
        return HasCacheKey._generate_cache_key_for_object(self)

990 

991 

class ExecutableOption(HasCopyInternals):
    """Base for option objects; provides class-level flags and a shallow
    ``_clone()``."""

    __slots__ = ()

    _annotations = util.EMPTY_DICT

    __visit_name__ = "executable_option"

    _is_has_cache_key = False

    _is_core = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        cls = self.__class__
        copied = cls.__new__(cls)
        # new dictionary, same values; ``**kw`` is accepted but not used
        copied.__dict__ = dict(self.__dict__)  # type: ignore
        return copied

1008 

1009 

_L = TypeVar("_L", bound=str)


class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statement constructs that accept :class:`.SyntaxExtension`
    objects at named positions.

    ``_position_map`` maps each position name to the name of the attribute
    on the statement that holds the element(s) installed at that position.

    """

    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that take place in specific
        parts of a SQL statement.   Examples include vendor extensions like
        PostgreSQL / SQLite's "ON CONFLICT", MySQL's "ON DUPLICATE KEY
        UPDATE", PostgreSQL's "DISTINCT ON", and MySQL's "LIMIT" that can be
        applied to UPDATE and DELETE statements.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list or merging already existing elements
         of the same type. Some examples are shown in the examples above.
         The returned sequence must not be empty.
        :param position: string name of the position to apply to.  This
         varies per statement type.  IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        :raises ValueError: if ``position`` is not a key of this construct's
         position map.
        :raises AssertionError: if ``apply_fn`` returns an empty sequence.

        .. seealso::

            :ref:`examples_syntax_extensions`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke
        else:
            ElementList = util.preloaded.sql_elements.ElementList
            existing: Optional[ClauseElement] = getattr(self, attrname, None)

            # normalize whatever currently occupies the position into a
            # tuple of elements to hand to apply_fn
            if existing is None:
                input_seq: Tuple[ClauseElement, ...] = ()
            elif isinstance(existing, ElementList):
                input_seq = existing.clauses
            else:
                input_seq = (existing,)

            new_seq = apply_fn(input_seq)

            # raised explicitly rather than via ``assert`` so that the check
            # is still enforced when Python runs with -O; the exception type
            # and message are the same as before
            if not new_seq:
                raise AssertionError("cannot return empty sequence")

            # a single element is stored as-is; several are wrapped in an
            # ElementList
            new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
            setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Hook invoked by :meth:`.ext`; the base version is not
        implemented."""
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return a dictionary of position name to installed value, leaving
        out positions whose attribute is ``None``."""
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Assign each given value directly to the attribute mapped to its
        position name."""
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index]  # noqa: E501

1150 

1151 

class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type in existing and appending
        ``self`` to the list.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        where ``ReplaceOfTypeExt`` stands for the class of ``self``.

        :param existing: the elements currently present at the extension
         point.
        :return: a new list holding every element of ``existing`` that is
         not an instance of ``type(self)``, followed by ``self``.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`

        """  # noqa: E501
        cls = type(self)
        return [*(e for e in existing if not isinstance(e, cls)), self]  # type: ignore[list-item]  # noqa: E501

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an
        :class:`_sql.Insert`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to insert"
        )

1218 

1219 

class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags; all False on this base class
    is_select = False
    is_from_statement = False
    is_update = False
    is_insert = False
    is_text = False
    is_delete = False
    is_dml = False

    if TYPE_CHECKING:
        # typing-only declarations; nothing here exists at runtime
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self):
        # not implemented on the base class
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        # the base implementation simply reports the visit name
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each given option is coerced, then the whole batch is appended to
        # the options already present
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, given)
            for given in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        entry = (callable_, cache_args)
        self._compile_state_funcs += (entry,)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # options that cannot be set per statement, checked in this order,
        # each paired with the error message raised when it is present
        rejected = (
            (
                "isolation_level",
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine().",
            ),
            (
                "compiled_cache",
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement.",
            ),
        )
        for option_name, message in rejected:
            if option_name in kw:
                raise exc.ArgumentError(message)

        merged = self._execution_options.union(kw)
        self._execution_options = merged
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options

1518 

1519 

class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The implementation on this base class has no body beyond this
        docstring, i.e. it does nothing.

        """

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Attach to ``parent`` while emitting the parent-attach events.

        ``before_parent_attach`` is dispatched first, then
        :meth:`._set_parent` is called with ``parent`` and ``**kw``, then
        ``after_parent_attach`` is dispatched.

        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)

1539 

1540 

class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines :class:`.SchemaEventTarget` with ``visitors.Visitable`` and
    adds no members of its own.

    .. versionadded:: 2.0.41

    """

1547 

1548 

class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # NOTE(review): presumably read by the traversal machinery in
    # ``visitors`` to identify schema-level traversal - confirm there.
    __traverse_options__ = {"schema_visitor": True}

1556 

1557 

class _SentinelDefaultCharacterization(Enum):
    # Closed set of string-valued constants; used as the type and default
    # of ``_SentinelColumnCharacterization.default_characterization``.
    # NOTE(review): the meaning of each member is defined by the code that
    # consumes it, which is not visible in this part of the file.
    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"

1566 

1567 

class _SentinelColumnCharacterization(NamedTuple):
    # Immutable record; every field has a default, so it can be built with
    # no arguments.

    # optional sequence of Column objects; None by default
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    # one of the _SentinelDefaultCharacterization members; NONE by default
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )

1575 

1576 

1577_COLKEY = TypeVar("_COLKEY", Union[None, str], str) 

1578 

1579_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True) 

1580_COL = TypeVar("_COL", bound="ColumnElement[Any]") 

1581 

1582 

1583class _ColumnMetrics(Generic[_COL_co]): 

1584 __slots__ = ("column",) 

1585 

1586 column: _COL_co 

1587 

1588 def __init__( 

1589 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co 

1590 ): 

1591 self.column = col 

1592 

1593 # proxy_index being non-empty means it was initialized. 

1594 # so we need to update it 

1595 pi = collection._proxy_index 

1596 if pi: 

1597 for eps_col in col._expanded_proxy_set: 

1598 pi[eps_col].add(self) 

1599 

1600 def get_expanded_proxy_set(self): 

1601 return self.column._expanded_proxy_set 

1602 

1603 def dispose(self, collection): 

1604 pi = collection._proxy_index 

1605 if not pi: 

1606 return 

1607 for col in self.column._expanded_proxy_set: 

1608 colset = pi.get(col, None) 

1609 if colset: 

1610 colset.discard(self) 

1611 if colset is not None and not colset: 

1612 del pi[col] 

1613 

1614 def embedded( 

1615 self, 

1616 target_set: Union[ 

1617 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]] 

1618 ], 

1619 ) -> bool: 

1620 expanded_proxy_set = self.column._expanded_proxy_set 

1621 for t in target_set.difference(expanded_proxy_set): 

1622 if not expanded_proxy_set.intersection(_expand_cloned([t])): 

1623 return False 

1624 return True 

1625 

1626 

1627class ColumnCollection(Generic[_COLKEY, _COL_co]): 

1628 """Collection of :class:`_expression.ColumnElement` instances, 

1629 typically for 

1630 :class:`_sql.FromClause` objects. 

1631 

1632 The :class:`_sql.ColumnCollection` object is most commonly available 

1633 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection 

1634 on the :class:`_schema.Table` object, introduced at 

1635 :ref:`metadata_tables_and_columns`. 

1636 

1637 The :class:`_expression.ColumnCollection` has both mapping- and sequence- 

1638 like behaviors. A :class:`_expression.ColumnCollection` usually stores 

1639 :class:`_schema.Column` objects, which are then accessible both via mapping 

1640 style access as well as attribute access style. 

1641 

1642 To access :class:`_schema.Column` objects using ordinary attribute-style 

1643 access, specify the name like any other object attribute, such as below 

1644 a column named ``employee_name`` is accessed:: 

1645 

1646 >>> employee_table.c.employee_name 

1647 

1648 To access columns that have names with special characters or spaces, 

1649 index-style access is used, such as below which illustrates a column named 

1650 ``employee ' payment`` is accessed:: 

1651 

1652 >>> employee_table.c["employee ' payment"] 

1653 

1654 As the :class:`_sql.ColumnCollection` object provides a Python dictionary 

1655 interface, common dictionary method names like 

1656 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`, 

1657 and :meth:`_sql.ColumnCollection.items` are available, which means that 

1658 database columns that are keyed under these names also need to use indexed 

1659 access:: 

1660 

1661 >>> employee_table.c["values"] 

1662 

1663 

1664 The name for which a :class:`_schema.Column` would be present is normally 

1665 that of the :paramref:`_schema.Column.key` parameter. In some contexts, 

1666 such as a :class:`_sql.Select` object that uses a label style set 

1667 using the :meth:`_sql.Select.set_label_style` method, a column of a certain 

1668 key may instead be represented under a particular label name such 

1669 as ``tablename_columnname``:: 

1670 

1671 >>> from sqlalchemy import select, column, table 

1672 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL 

1673 >>> t = table("t", column("c")) 

1674 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) 

1675 >>> subq = stmt.subquery() 

1676 >>> subq.c.t_c 

1677 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c> 

1678 

1679 :class:`.ColumnCollection` also indexes the columns in order and allows 

1680 them to be accessible by their integer position:: 

1681 

1682 >>> cc[0] 

1683 Column('x', Integer(), table=None) 

1684 >>> cc[1] 

1685 Column('y', Integer(), table=None) 

1686 

1687 .. versionadded:: 1.4 :class:`_expression.ColumnCollection` 

1688 allows integer-based 

1689 index access to the collection. 

1690 

1691 Iterating the collection yields the column expressions in order:: 

1692 

1693 >>> list(cc) 

1694 [Column('x', Integer(), table=None), 

1695 Column('y', Integer(), table=None)] 

1696 

1697 The base :class:`_expression.ColumnCollection` object can store 

1698 duplicates, which can 

1699 mean either two columns with the same key, in which case the column 

1700 returned by key access is **arbitrary**:: 

1701 

1702 >>> x1, x2 = Column("x", Integer), Column("x", Integer) 

1703 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)]) 

1704 >>> list(cc) 

1705 [Column('x', Integer(), table=None), 

1706 Column('x', Integer(), table=None)] 

        >>> cc["x"] is x1
        True
        >>> cc["x"] is x2
        False

1711 

1712 Or it can also mean the same column multiple times. These cases are 

1713 supported as :class:`_expression.ColumnCollection` 

1714 is used to represent the columns in 

1715 a SELECT statement which may include duplicates. 

1716 

1717 A special subclass :class:`.DedupeColumnCollection` exists which instead 

1718 maintains SQLAlchemy's older behavior of not allowing duplicates; this 

1719 collection is used for schema level objects like :class:`_schema.Table` 

1720 and 

1721 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The 

1722 :class:`.DedupeColumnCollection` class also has additional mutation methods 

1723 as the schema constructs have more use cases that require removal and 

1724 replacement of columns. 

1725 

1726 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection` 

1727 now stores duplicate 

1728 column keys as well as the same column in multiple positions. The 

1729 :class:`.DedupeColumnCollection` class is added to maintain the 

1730 former behavior in those cases where deduplication as well as 

1731 additional replace/remove operations are needed. 

1732 

1733 

1734 """ 

1735 

1736 __slots__ = "_collection", "_index", "_colset", "_proxy_index" 

1737 

1738 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]] 

1739 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]] 

1740 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]] 

1741 _colset: Set[_COL_co] 

1742 

def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Create an empty collection, then populate it from ``columns`` (an
    iterable of ``(key, column)`` pairs) when it is given and non-empty."""
    # this class's own __setattr__ raises NotImplementedError, so the
    # internal state is installed through object.__setattr__
    initial_state = (
        ("_colset", set()),
        ("_index", {}),
        ("_proxy_index", collections.defaultdict(util.OrderedSet)),
        ("_collection", []),
    )
    for attr_name, value in initial_state:
        object.__setattr__(self, attr_name, value)

    if columns:
        self._initial_populate(columns)

1754 

@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return an ungrouped ``ClauseList`` containing all columns of this
    collection."""
    clause_list_cls = util.preloaded.sql_elements.ClauseList
    members = self._all_columns

    return clause_list_cls(
        *members,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )

1764 

1765 def _initial_populate( 

1766 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]] 

1767 ) -> None: 

1768 self._populate_separate_keys(iter_) 

1769 

1770 @property 

1771 def _all_columns(self) -> List[_COL_co]: 

1772 return [col for (_, col, _) in self._collection] 

1773 

1774 def keys(self) -> List[_COLKEY]: 

1775 """Return a sequence of string key names for all columns in this 

1776 collection.""" 

1777 return [k for (k, _, _) in self._collection] 

1778 

1779 def values(self) -> List[_COL_co]: 

1780 """Return a sequence of :class:`_sql.ColumnClause` or 

1781 :class:`_schema.Column` objects for all columns in this 

1782 collection.""" 

1783 return [col for (_, col, _) in self._collection] 

1784 

1785 def items(self) -> List[Tuple[_COLKEY, _COL_co]]: 

1786 """Return a sequence of (key, column) tuples for all columns in this 

1787 collection each consisting of a string key name and a 

1788 :class:`_sql.ColumnClause` or 

1789 :class:`_schema.Column` object. 

1790 """ 

1791 

1792 return [(k, col) for (k, col, _) in self._collection] 

1793 

1794 def __bool__(self) -> bool: 

1795 return bool(self._collection) 

1796 

1797 def __len__(self) -> int: 

1798 return len(self._collection) 

1799 

1800 def __iter__(self) -> Iterator[_COL_co]: 

1801 # turn to a list first to maintain over a course of changes 

1802 return iter([col for _, col, _ in self._collection]) 

1803 

1804 @overload 

1805 def __getitem__(self, key: Union[str, int]) -> _COL_co: ... 

1806 

1807 @overload 

1808 def __getitem__( 

1809 self, key: Tuple[Union[str, int], ...] 

1810 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1811 

1812 @overload 

1813 def __getitem__( 

1814 self, key: slice 

1815 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ... 

1816 

1817 def __getitem__( 

1818 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]] 

1819 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]: 

1820 try: 

1821 if isinstance(key, (tuple, slice)): 

1822 if isinstance(key, slice): 

1823 cols = ( 

1824 (sub_key, col) 

1825 for (sub_key, col, _) in self._collection[key] 

1826 ) 

1827 else: 

1828 cols = (self._index[sub_key] for sub_key in key) 

1829 

1830 return ColumnCollection(cols).as_readonly() 

1831 else: 

1832 return self._index[key][1] 

1833 except KeyError as err: 

1834 if isinstance(err.args[0], int): 

1835 raise IndexError(err.args[0]) from err 

1836 else: 

1837 raise 

1838 

1839 def __getattr__(self, key: str) -> _COL_co: 

1840 try: 

1841 return self._index[key][1] 

1842 except KeyError as err: 

1843 raise AttributeError(key) from err 

1844 

1845 def __contains__(self, key: str) -> bool: 

1846 if key not in self._index: 

1847 if not isinstance(key, str): 

1848 raise exc.ArgumentError( 

1849 "__contains__ requires a string argument" 

1850 ) 

1851 return False 

1852 else: 

1853 return True 

1854 

1855 def compare(self, other: ColumnCollection[Any, Any]) -> bool: 

1856 """Compare this :class:`_expression.ColumnCollection` to another 

1857 based on the names of the keys""" 

1858 

1859 for l, r in zip_longest(self, other): 

1860 if l is not r: 

1861 return False 

1862 else: 

1863 return True 

1864 

1865 def __eq__(self, other: Any) -> bool: 

1866 return self.compare(other) 

1867 

@overload
def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...


@overload
def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...


def get(
    self, key: str, default: Optional[_COL] = None
) -> Optional[Union[_COL_co, _COL]]:
    """Return the column stored under ``key`` in this
    :class:`_expression.ColumnCollection`, or ``default`` when the key
    is not present.

    :param key: key to look up in ``self._index``.
    :param default: value handed back when ``key`` is missing;
     ``None`` if omitted.

    """
    index = self._index
    return index[key][1] if key in index else default

1885 

def __str__(self) -> str:
    """Render as ``ClassName(item, item, ...)`` using ``str()`` of each
    item produced by iterating this collection."""
    rendered_items = ", ".join(map(str, self))
    class_name = self.__class__.__name__
    return "%s(%s)" % (class_name, rendered_items)

1891 

def __setitem__(self, key: str, value: Any) -> NoReturn:
    """Item assignment is not supported.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1894 

def __delitem__(self, key: str) -> NoReturn:
    """Item deletion is not supported.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1897 

def __setattr__(self, key: str, obj: Any) -> NoReturn:
    """Attribute assignment is not supported.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1900 

def clear(self) -> NoReturn:
    """Dictionary clear() is not implemented for
    :class:`_sql.ColumnCollection`.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1905 

def remove(self, column: Any) -> None:
    """Removal is not implemented at this level.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1908 

def update(self, iter_: Any) -> NoReturn:
    """Dictionary update() is not implemented for
    :class:`_sql.ColumnCollection`.

    :raises NotImplementedError: always.

    """
    raise NotImplementedError()

1913 

# Assigning None to ``__hash__`` makes instances unhashable.  The
# ``type: ignore`` is needed because of
# https://github.com/python/mypy/issues/4266
__hash__ = None  # type: ignore

1916 

def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """populate from an iterator of (key, column)

    The contents of ``self._collection`` are replaced wholesale, while
    ``self._colset`` and ``self._index`` are only added to.  When the
    same key occurs more than once, the string-keyed index entry ends
    up pointing at the first occurrence.

    """
    entries = [
        (key, column, _ColumnMetrics(self, column)) for key, column in iter_
    ]
    self._collection[:] = entries

    self._colset.update(column._deannotate() for _, column, _ in entries)

    index = self._index

    # integer positions first
    for position, (key, column, _) in enumerate(entries):
        index[position] = (key, column)

    # walk backwards so the earliest entry for a duplicated key is the
    # one written last, and therefore the one that remains
    for key, column, _ in reversed(entries):
        index[key] = (key, column)

1930 

def add(
    self,
    column: ColumnElement[Any],
    key: Optional[_COLKEY] = None,
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended to the end of the collection.  It is always
    registered under its integer position; it is registered under its
    key only when that key is not already present in the index.

    :param column: the column to add.
    :param key: key to store the column under; ``column.key`` is used
     when omitted.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing

    _column = cast(_COL_co, column)
    index_entry = (colkey, _column)

    self._collection.append(
        (colkey, _column, _ColumnMetrics(self, _column))
    )
    self._colset.add(_column._deannotate())

    self._index[position] = index_entry
    # an existing entry under the same key is left in place
    self._index.setdefault(colkey, index_entry)

1969 

def __getstate__(self) -> Dict[str, Any]:
    """Return picklable state: the ``(key, column)`` pairs of the
    collection, without their metrics objects, plus the index itself."""
    pairs = [(key, column) for key, column, _ in self._collection]
    return {"_collection": pairs, "_index": self._index}

1975 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from the dictionary produced by ``__getstate__``.

    ``object.__setattr__`` is used throughout because this class's own
    ``__setattr__`` raises ``NotImplementedError``.

    """
    assign = object.__setattr__

    assign(self, "_index", state["_index"])
    # a fresh, empty proxy index is put in place before any
    # _ColumnMetrics object is constructed against this collection
    assign(self, "_proxy_index", collections.defaultdict(util.OrderedSet))

    restored = [
        (key, column, _ColumnMetrics(self, column))
        for (key, column) in state["_collection"]
    ]
    assign(self, "_collection", restored)
    assign(self, "_colset", {column for _, column, _ in restored})

1992 

def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Checks if a column object exists in this collection

    :raises ArgumentError: when ``col`` is not found and is a string;
     string keys are tested with ``in`` instead.

    """
    if col in self._colset:
        return True

    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False

2004 

def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`, i.e. a
    :class:`ReadOnlyColumnCollection` constructed around it."""
    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view

2010 

def _init_proxy_index(self):
    """populate the "proxy index", if empty.

    The proxy index was added in 2.0 so that corresponding_column()
    can operate more efficiently.

    To save both construction time for new .c collections and memory
    when there are many large .c collections, the index is filled
    lazily: only once corresponding_column() is called.  After that it
    stays filled, and _ColumnMetrics objects created later add their
    own data to it.  That situation (a .c collection mutated after
    corresponding_column() has been used) would be unusual, if not
    nonexistent, but it is tested in test/base/test_utils.py.

    """
    proxy_index = self._proxy_index
    if proxy_index:
        # already populated; nothing to do
        return

    for _key, _col, metrics in self._collection:
        for ancestor in metrics.column._expanded_proxy_set:
            proxy_index[ancestor].add(metrics)

2036 

def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Given a :class:`_expression.ColumnElement`, return the exported
    :class:`_expression.ColumnElement` object from this
    :class:`_expression.ColumnCollection`
    which corresponds to that original :class:`_expression.ColumnElement`
    via a common
    ancestor column.

    :param column: the target :class:`_expression.ColumnElement`
                  to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement`
     is actually present within a sub-element
     of this :class:`_expression.Selectable`.
     Normally the column will match if
     it merely shares a common ancestor with one of the exported
     columns of this :class:`_expression.Selectable`.

    :return: the matching column, or ``None`` when no candidate is found.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is itself a member needs no searching at all
    if column in self._colset:
        return column

    target_set = column.proxy_set

    pi = self._proxy_index
    if not pi:
        # fills ``pi`` in place
        self._init_proxy_index()

    def lineage_distance(metrics):
        # total "weight" (default 1) of the entries in the candidate's
        # proxy list that share lineage with the target column
        return sum(
            sc._annotations.get("weight", 1)
            for sc in metrics.column._uncached_proxy_list()
            if sc.shares_lineage(column)
        )

    best_metrics = None
    best_overlap = None

    candidates = (mm for ts in target_set if ts in pi for mm in pi[ts])

    for candidate in candidates:
        if require_embedded and not candidate.embedded(target_set):
            continue

        if best_metrics is None:
            # no corresponding column yet, pick this one.
            best_metrics = candidate
            continue

        overlap = target_set.intersection(
            candidate.column._expanded_proxy_set
        )
        if best_overlap is None:
            best_overlap = target_set.intersection(
                best_metrics.column._expanded_proxy_set
            )

        if len(overlap) > len(best_overlap):
            # the candidate has a larger field of correspondence than
            # the current best.  i.e. selectable.c.a1_x->a1.c.x->table.c.x
            # matches a1.c.x->table.c.x better than
            # selectable.c.x->table.c.x does.
            best_metrics = candidate
            best_overlap = overlap
        elif overlap == best_overlap:
            # same field of correspondence.  prefer the one whose
            # proxy list is "shorter", which indicates a closer
            # relationship with the root column.  The "weight"
            # annotation, which CompoundSelect() uses to give higher
            # precedence to columns based on vertical position in the
            # compound statement, is taken into account, and columns
            # with no reference to the target column (also occurs
            # with CompoundSelect) are left out of the sum.
            best_distance = lineage_distance(best_metrics)
            candidate_distance = lineage_distance(candidate)
            if candidate_distance < best_distance:
                best_metrics = candidate
                best_overlap = overlap

    return best_metrics.column if best_metrics else None

2142 

2143 

# Type variable bound to ``NamedColumn``; it parametrizes the column
# type held by DedupeColumnCollection.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")

2145 

2146 

class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful for schema level objects such as :class:`_schema.Table`
    and :class:`.PrimaryKeyConstraint`.  The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add ``column``, replacing any existing column with the same key.

        :param column: the column to add; it is stored under
         ``column.key``.
        :param key: if given, must equal ``column.key``.
        :param index: optional position at which to place the column.

        :raises ArgumentError: if ``key`` disagrees with ``column.key``,
         or if the column has no key.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            if existing is column:
                # the very same object is already present
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Insert a column whose key is not yet present.

        :param key: key to register the column under.
        :param named_column: the column to store.
        :param index: position to insert at; ``None`` appends.  Negative
         values count from the end, and out-of-range values are clamped
         to the ends of the collection, matching ``list.insert()``.

        """
        collection_length = len(self._collection)
        entry = (key, named_column, _ColumnMetrics(self, named_column))

        if index is None:
            position = collection_length
            self._collection.append(entry)
        else:
            if index < 0:
                index = max(0, collection_length + index)
            else:
                # list.insert() places an over-large index at the end;
                # clamp here so the integer entry written into
                # self._index below names the slot actually used
                index = min(index, collection_length)
            position = index
            self._collection.insert(index, entry)

        self._colset.add(named_column._deannotate())

        if index is not None:
            # move the positional entries at and after the insertion
            # point up by one, highest first so nothing is overwritten
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[position] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns that collide with an existing entry, by key or by name,
        are held back and passed to :meth:`replace` after the others
        have been appended.

        :raises ArgumentError: if a given key differs from its column's
         ``.key``.

        """
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each column in ``iter_`` under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        """Remove ``column`` from the collection.

        :raises ValueError: if the column is not in this collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries for the shortened collection
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column  as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: additional columns to remove as part of the
         same operation.
        :param index: optional position for the new column; when omitted
         the column takes the position of the first column it replaces,
         or is appended if nothing in the collection was replaced.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                # only the first removed column is substituted; any
                # further removed columns are simply dropped
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        # remove_col is known to be non-empty past the early return above
        self._colset.difference_update(remove_col)

        for rc in remove_col:
            for metrics in self._proxy_index.get(rc, ()):
                metrics.dispose(self)

        if replace_index is None:
            # nothing in the collection was actually replaced
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # the new column was placed at replace_index above; move it
            # to the requested position instead
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                # the insertion pushed the original entry up by one
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild the whole index from the new contents
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})

2366 

2367 

class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A view over another column collection.

    The view holds references to the parent's ``_colset``, ``_index``,
    ``_collection`` and ``_proxy_index`` objects rather than copies.
    Its mutator methods only call ``self._readonly()``, which is
    inherited and presumably raises.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection):
        # this class hierarchy blocks ordinary attribute assignment, so
        # go through object.__setattr__ directly
        assign = object.__setattr__
        assign(self, "_parent", collection)
        for shared in ("_colset", "_index", "_collection", "_proxy_index"):
            assign(self, shared, getattr(collection, shared))

    def __getstate__(self):
        """Only the parent collection is pickled."""
        parent = self._parent
        return {"_parent": parent}

    def __setstate__(self, state):
        """Re-run ``__init__`` against the unpickled parent."""
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()

2395 

2396 

class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with a few collection-style helpers."""

    def contains_column(self, col):
        """Return True if ``col`` is a member of this set."""
        is_member = col in self
        return is_member

    def extend(self, cols):
        """Add every column in ``cols`` to this set, in order."""
        for column in cols:
            self.add(column)

    def __eq__(self, other):
        """Build a conjunction of ``c == local`` for every pairing of a
        column ``c`` from ``other`` with a column ``local`` from this
        set where ``c.shares_lineage(local)`` is true.

        The result is whatever ``elements.and_()`` returns, not a bool.

        """
        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*comparisons)

    def __hash__(self):  # type: ignore[override]
        return hash(tuple(self))

2415 

2416 

def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    The entity's own ``entity_namespace`` attribute is tried first.  If
    that raises ``AttributeError``, the elements produced by
    ``visitors.iterate()`` are scanned and the namespace of the first
    one that has one is returned.

    :raises AttributeError: the original error, re-raised when no
     element provides a namespace.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for elem in visitors.iterate(traversible):
            if _is_has_entity_namespace(elem):
                return elem.entity_namespace

        # nothing found: re-raise the AttributeError being handled
        raise

2434 

2435 

def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    :param entity: object whose namespace is located via
     ``_entity_namespace()``.
    :param key: attribute name to fetch from that namespace.
    :param default: returned when the attribute is missing; with the
     ``NO_ARG`` sentinel a missing attribute is an error.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        ns = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(ns, key)  # type: ignore
        return getattr(ns, key, default)
    except AttributeError as err:
        message = 'Entity namespace for "%s" has no property "%s"' % (
            entity,
            key,
        )
        raise exc.InvalidRequestError(message) from err