# sql/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls

"""Foundational utilities common to many sql modules."""


from __future__ import annotations

import collections
from enum import Enum
import itertools
from itertools import zip_longest
import operator
import re
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import FrozenSet
from typing import Generator
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import MutableMapping
from typing import NamedTuple
from typing import NoReturn
from typing import Optional
from typing import overload
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union

from . import roles
from . import visitors
from .cache_key import HasCacheKey  # noqa
from .cache_key import MemoizedHasCacheKey  # noqa
from .traversals import HasCopyInternals  # noqa
from .visitors import ClauseVisitor
from .visitors import ExtendedInternalTraversal
from .visitors import ExternallyTraversible
from .visitors import InternalTraversal
from .. import event
from .. import exc
from .. import util
from ..util import HasMemoized as HasMemoized
from ..util import hybridmethod
from ..util import typing as compat_typing
from ..util.typing import Final
from ..util.typing import Protocol
from ..util.typing import Self
from ..util.typing import TypeGuard

if TYPE_CHECKING:
    from . import coercions
    from . import elements
    from . import type_api
    from ._orm_types import DMLStrategyArgument
    from ._orm_types import SynchronizeSessionArgument
    from ._typing import _CLE
    from .cache_key import CacheKey
    from .compiler import SQLCompiler
    from .elements import BindParameter
    from .elements import ClauseList
    from .elements import ColumnClause  # noqa
    from .elements import ColumnElement
    from .elements import NamedColumn
    from .elements import SQLCoreOperations
    from .elements import TextClause
    from .schema import Column
    from .schema import DefaultGenerator
    from .selectable import _JoinTargetElement
    from .selectable import _SelectIterable
    from .selectable import FromClause
    from .visitors import anon_map
    from ..engine import Connection
    from ..engine import CursorResult
    from ..engine.interfaces import _CoreMultiExecuteParams
    from ..engine.interfaces import _ExecuteOptions
    from ..engine.interfaces import _ImmutableExecuteOptions
    from ..engine.interfaces import CacheStats
    from ..engine.interfaces import Compiled
    from ..engine.interfaces import CompiledCacheType
    from ..engine.interfaces import CoreExecuteOptionsParameter
    from ..engine.interfaces import Dialect
    from ..engine.interfaces import IsolationLevel
    from ..engine.interfaces import SchemaTranslateMapType
    from ..event import dispatcher

if not TYPE_CHECKING:
    coercions = None  # noqa
    elements = None  # noqa
    type_api = None  # noqa



class _NoArg(Enum):
    NO_ARG = 0

    def __repr__(self):
        return f"_NoArg.{self.name}"


NO_ARG: Final = _NoArg.NO_ARG


class _NoneName(Enum):
    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


_NONE_NAME: Final = _NoneName.NONE_NAME

_T = TypeVar("_T", bound=Any)

_Fn = TypeVar("_Fn", bound=Callable[..., Any])

_AmbiguousTableNameMap = MutableMapping[str, str]


class _DefaultDescriptionTuple(NamedTuple):
    arg: Any
    is_scalar: Optional[bool]
    is_callable: Optional[bool]
    is_sentinel: Optional[bool]

    @classmethod
    def _from_column_default(
        cls, default: Optional[DefaultGenerator]
    ) -> _DefaultDescriptionTuple:
        return (
            _DefaultDescriptionTuple(
                default.arg,  # type: ignore
                default.is_scalar,
                default.is_callable,
                default.is_sentinel,
            )
            if default
            and (
                default.has_arg
                or (not default.for_update and default.is_sentinel)
            )
            else _DefaultDescriptionTuple(None, None, None, None)
        )


_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)


class _EntityNamespace(Protocol):
    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...


class _HasEntityNamespace(Protocol):
    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...


def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
    return hasattr(element, "entity_namespace")


# Remove when https://github.com/python/mypy/issues/14640 is fixed
_Self = TypeVar("_Self", bound=Any)


class Immutable:
    """Mark a ClauseElement as 'immutable' when expressions are cloned.

    "Immutable" here refers to the mutability of an object in the
    context of SQL DQL and DML generation. For example, in DQL one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

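    As an illustrative sketch (``_clone()`` is internal API), copying an
    immutable construct such as a :func:`.table` simply returns the same
    object::

        from sqlalchemy import table, column

        t = table("t", column("x"))
        assert t._clone() is t
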
    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        pass


class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        obj = object.__new__(cls)
        obj.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement. This referred to #6259. However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        obj.proxy_set = frozenset([obj])
        cls._singleton = obj


def _from_objects(
    *elements: Union[
        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
    ]
) -> Iterator[FromClause]:
    return itertools.chain.from_iterable(
        [element._from_objects for element in elements]
    )


def _select_iterables(
    elements: Iterable[roles.ColumnsClauseRole],
) -> _SelectIterable:
    """expand tables into individual columns in the
    given list of column expressions.

    """
    return itertools.chain.from_iterable(
        [c._select_iterable for c in elements]
    )


_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(compat_typing.Protocol):
    def _generate(self) -> Self: ...


def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

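    As a minimal sketch (the ``where()`` method shown is hypothetical, for
    illustration only), a generative method is typically written as::

        class MyStatement(Generative):
            _criteria = ()

            @_generative
            def where(self, criterion):
                self._criteria += (criterion,)
                return self

    Each call copies the object via ``_generate()`` and applies the change
    to the new copy, leaving the original unchanged.
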
    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        self = self._generate()
        x = fn(self, *args, **kw)
        assert x is self, "generative methods must return self"
        return self

    decorated = _generative(fn)
    decorated.non_generative = fn  # type: ignore
    return decorated


def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
        (name, operator.attrgetter(name), defaults.get(name, None))
        for name in names
    ]

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        args = args[1:]
        for name, getter, default_ in getters:
            if getter(self) is not default_:
                msg = msgs.get(
                    name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
                raise exc.InvalidRequestError(msg)
        return fn(self, *args, **kw)

    return check


def _clone(element, **kw):
    return element._clone(**kw)


def _expand_cloned(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """expand the given set of ClauseElements to be the set of all 'cloned'
    predecessors.

    """
    # TODO: cython candidate
    return itertools.chain(*[x._cloned_set for x in elements])


def _de_clone(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    for x in elements:
        while x._is_clone_of is not None:
            x = x._is_clone_of
        yield x


def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
        _expand_cloned(b)
    )
    return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}


def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
        _expand_cloned(b)
    )
    return {
        elem for elem in a if not all_overlap.intersection(elem._cloned_set)
    }


class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    """

    __slots__ = ("obj",)

    def __init__(self, obj: DialectKWArgs) -> None:
        self.obj = obj

    def _key(self, key: str) -> Tuple[str, str]:
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err
        else:
            return dialect, value_key

    def __getitem__(self, key: str) -> Any:
        dialect, value_key = self._key(key)

        try:
            opt = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            raise KeyError(key) from err
        else:
            return opt[value_key]

    def __setitem__(self, key: str, value: Any) -> None:
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err
        else:
            self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key: str) -> None:
        dialect, value_key = self._key(key)
        del self.obj.dialect_options[dialect][value_key]

    def __len__(self) -> int:
        return sum(
            len(args._non_defaults)
            for args in self.obj.dialect_options.values()
        )

    def __iter__(self) -> Generator[str, None, None]:
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name in self.obj.dialect_options
            for value_name in self.obj.dialect_options[
                dialect_name
            ]._non_defaults
        )


class _DialectArgDict(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments for a specific
    dialect.

    Maintains a separate collection of user-specified arguments
    and dialect-specified default arguments.

    """

    def __init__(self) -> None:
        self._non_defaults: Dict[str, Any] = {}
        self._defaults: Dict[str, Any] = {}

    def __len__(self) -> int:
        return len(set(self._non_defaults).union(self._defaults))

    def __iter__(self) -> Iterator[str]:
        return iter(set(self._non_defaults).union(self._defaults))

    def __getitem__(self, key: str) -> Any:
        if key in self._non_defaults:
            return self._non_defaults[key]
        else:
            return self._defaults[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._non_defaults[key] = value

    def __delitem__(self, key: str) -> None:
        del self._non_defaults[key]


@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    if dialect_cls.construct_arguments is None:
        return None
    return dict(dialect_cls.construct_arguments)


class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class. The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect. The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised. The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised. If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already. All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format. Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

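        For example, assuming a hypothetical ``mydialect_length`` argument
        registered as in the :meth:`.DialectKWArgs.argument_for` example,
        the value is accessible here in its flat form (illustrative sketch)::

            some_index = Index("a", "b", mydialect_length=5)
            assert some_index.dialect_kwargs["mydialect_length"] == 5
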
        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            for cls in reversed(cls.__mro__):
                if cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[cls])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``. For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]


class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler. This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output. The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler, such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

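    As an illustrative sketch using hypothetical names, a plugin-specific
    :class:`.CompileState` subclass is registered for a statement type via
    the :meth:`.CompileState.plugin_for` decorator::

        @CompileState.plugin_for("myplugin", "select")
        class MySelectCompileState(CompileState):
            def __init__(self, statement, compiler, **kw):
                super().__init__(statement, compiler, **kw)

    The appropriate class is then located by ``create_for_statement()``
    based on the statement's ``compile_state_plugin`` propagated attribute
    and its ``__visit_name__``.
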
    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        # factory construction.

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )
            if klass is None:
                klass = cls.plugins[
                    ("default", statement._effective_plugin_target)
                ]

        else:
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is cls:
            return cls(statement, compiler, **kw)
        else:
            return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default. return that
        # if there was no plugin-specific class (e.g. Insert with "orm"
        # plugin)
        try:
            return cls.plugins[("default", statement._effective_plugin_target)]
        except KeyError:
            return None

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        try:
            return cls.plugins[
                (plugin_name, statement._effective_plugin_target)
            ]
        except KeyError:
            return None

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate


class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        skip = self._memoized_keys
        cls = self.__class__
        s = cls.__new__(cls)
        if skip:
            # ensure this iteration remains atomic
            s.__dict__ = {
                k: v for k, v in self.__dict__.copy().items() if k not in skip
            }
        else:
            s.__dict__ = self.__dict__.copy()
        return s


class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        skip = self._memoized_keys
        # note __dict__ needs to be in __slots__ if this is used
        for k in skip:
            self.__dict__.pop(k, None)
        return self


class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    _compile_state_plugin: Optional[Type[CompileState]] = None

    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    _compile_state_factory = CompileState.create_for_statement


class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        o1 = self()

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...


class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        # TODO: very inefficient. This is used only in test suites
        # right now.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't. otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so check for that case first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict(exec_options).merge_with(
                {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...


class CacheableOptions(Options, HasCacheKey):
    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        return HasCacheKey._generate_cache_key_for_object(self)


class ExecutableOption(HasCopyInternals):
    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        c = self.__class__.__new__(self.__class__)
        c.__dict__ = dict(self.__dict__)  # type: ignore
        return c


class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _with_context_options: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_with_context_options",
            ExtendedInternalTraversal.dp_with_context_options,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by the SQL compiler for the statement.
        These options can be consumed by specific dialects or specific kinds
        of compilers.

        The most commonly known kinds of options are the ORM level options
        that apply "eager load" and other loading behaviors to an ORM
        query. However, options can theoretically be used for many other
        purposes.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

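        For example, a typical ORM use (an illustrative sketch that assumes
        an ORM-mapped ``User`` class) applies a loader option to a
        :func:`_sql.select` construct::

            from sqlalchemy import select
            from sqlalchemy.orm import selectinload

            stmt = select(User).options(selectinload(User.addresses))
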
1119 .. seealso::
1120
1121 :ref:`loading_columns` - refers to options specific to the usage
1122 of ORM queries
1123
1124 :ref:`relationship_loader_options` - refers to options specific
1125 to the usage of ORM queries
1126
1127 """
1128 self._with_options += tuple(
1129 coercions.expect(roles.ExecutableOptionRole, opt)
1130 for opt in options
1131 )
1132 return self
1133
1134 @_generative
1135 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1136 """Assign the compile options to a new value.
1137
1138 :param compile_options: appropriate CacheableOptions structure
1139
1140 """
1141
1142 self._compile_options = compile_options
1143 return self
1144
1145 @_generative
1146 def _update_compile_options(self, options: CacheableOptions) -> Self:
1147 """update the _compile_options with new keys."""
1148
1149 assert self._compile_options is not None
1150 self._compile_options += options
1151 return self
1152
1153 @_generative
1154 def _add_context_option(
1155 self,
1156 callable_: Callable[[CompileState], None],
1157 cache_args: Any,
1158 ) -> Self:
1159 """Add a context option to this statement.
1160
1161 These are callable functions that will
1162 be given the CompileState object upon compilation.
1163
1164 A second argument cache_args is required, which will be combined with
1165 the ``__code__`` identity of the function itself in order to produce a
1166 cache key.
1167
1168 """
1169 self._with_context_options += ((callable_, cache_args),)
1170 return self
1171
1172 @overload
1173 def execution_options(
1174 self,
1175 *,
1176 compiled_cache: Optional[CompiledCacheType] = ...,
1177 logging_token: str = ...,
1178 isolation_level: IsolationLevel = ...,
1179 no_parameters: bool = False,
1180 stream_results: bool = False,
1181 max_row_buffer: int = ...,
1182 yield_per: int = ...,
1183 insertmanyvalues_page_size: int = ...,
1184 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1185 populate_existing: bool = False,
1186 autoflush: bool = False,
1187 synchronize_session: SynchronizeSessionArgument = ...,
1188 dml_strategy: DMLStrategyArgument = ...,
1189 render_nulls: bool = ...,
1190 is_delete_using: bool = ...,
1191 is_update_from: bool = ...,
1192 preserve_rowcount: bool = False,
1193 **opt: Any,
1194 ) -> Self: ...
1195
1196 @overload
1197 def execution_options(self, **opt: Any) -> Self: ...
1198
1199 @_generative
1200 def execution_options(self, **kw: Any) -> Self:
1201 """Set non-SQL options for the statement which take effect during
1202 execution.
1203
1204 Execution options can be set at many scopes, including per-statement,
1205 per-connection, or per execution, using methods such as
1206 :meth:`_engine.Connection.execution_options` and parameters which
1207 accept a dictionary of options such as
1208 :paramref:`_engine.Connection.execute.execution_options` and
1209 :paramref:`_orm.Session.execute.execution_options`.
1210
1211 The primary characteristic of an execution option, as opposed to
1212 other kinds of options such as ORM loader options, is that
1213 **execution options never affect the compiled SQL of a query, only
1214 things that affect how the SQL statement itself is invoked or how
1215 results are fetched**. That is, execution options are not part of
1216 what's accommodated by SQL compilation nor are they considered part of
1217 the cached state of a statement.
1218
1219 The :meth:`_sql.Executable.execution_options` method is
1220 :term:`generative`, as
1221 is the case for the method as applied to the :class:`_engine.Engine`
1222 and :class:`_orm.Query` objects, which means when the method is called,
1223 a copy of the object is returned, which applies the given parameters to
1224 that new copy, but leaves the original unchanged::
1225
1226 statement = select(table.c.x, table.c.y)
1227 new_statement = statement.execution_options(my_option=True)
1228
1229 An exception to this behavior is the :class:`_engine.Connection`
1230 object, where the :meth:`_engine.Connection.execution_options` method
1231 is explicitly **not** generative.
1232
1233 The kinds of options that may be passed to
1234 :meth:`_sql.Executable.execution_options` and other related methods and
1235 parameter dictionaries include parameters that are explicitly consumed
1236 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1237 defined by SQLAlchemy, which means the methods and/or parameter
1238 dictionaries may be used for user-defined parameters that interact with
1239 custom code, which may access the parameters using methods such as
1240 :meth:`_sql.Executable.get_execution_options` and
1241 :meth:`_engine.Connection.get_execution_options`, or within selected
1242 event hooks using a dedicated ``execution_options`` event parameter
1243 such as
1244 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1245 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1246
1247 from sqlalchemy import event
1248
1249
1250 @event.listens_for(some_engine, "before_execute")
1251 def _process_opt(conn, statement, multiparams, params, execution_options):
1252 "run a SQL function before invoking a statement"
1253
1254 if execution_options.get("do_special_thing", False):
1255 conn.exec_driver_sql("run_special_function()")
1256
1257 Within the scope of options that are explicitly recognized by
1258 SQLAlchemy, most apply to specific classes of objects and not others.
1259 The most common execution options include:
1260
1261 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1262 sets the isolation level for a connection or a class of connections
1263 via an :class:`_engine.Engine`. This option is accepted only
1264 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1265
1266 * :paramref:`_engine.Connection.execution_options.stream_results` -
1267 indicates results should be fetched using a server side cursor;
1268 this option is accepted by :class:`_engine.Connection`, by the
1269 :paramref:`_engine.Connection.execute.execution_options` parameter
1270 on :meth:`_engine.Connection.execute`, and additionally by
1271 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1272 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1273
1274 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1275 indicates a dictionary that will serve as the
1276 :ref:`SQL compilation cache <sql_caching>`
1277 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1278 well as for ORM methods like :meth:`_orm.Session.execute`.
1279 Can be passed as ``None`` to disable caching for statements.
1280 This option is not accepted by
1281 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1282 carry along a compilation cache within a statement object.
1283
1284 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1285 - a mapping of schema names used by the
1286 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1287 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1288 :class:`_sql.Executable`, as well as by ORM constructs
1289 like :meth:`_orm.Session.execute`.
1290
1291 .. seealso::
1292
1293 :meth:`_engine.Connection.execution_options`
1294
1295 :paramref:`_engine.Connection.execute.execution_options`
1296
1297 :paramref:`_orm.Session.execute.execution_options`
1298
1299 :ref:`orm_queryguide_execution_options` - documentation on all
1300 ORM-specific execution options
1301
1302 """ # noqa: E501
1303 if "isolation_level" in kw:
1304 raise exc.ArgumentError(
1305 "'isolation_level' execution option may only be specified "
1306 "on Connection.execution_options(), or "
1307 "per-engine using the isolation_level "
1308 "argument to create_engine()."
1309 )
1310 if "compiled_cache" in kw:
1311 raise exc.ArgumentError(
1312 "'compiled_cache' execution option may only be specified "
1313 "on Connection.execution_options(), not per statement."
1314 )
1315 self._execution_options = self._execution_options.union(kw)
1316 return self
1317
1318 def get_execution_options(self) -> _ExecuteOptions:
1319 """Get the non-SQL options which will take effect during execution.
1320
1321 .. versionadded:: 1.3
1322
1323 .. seealso::
1324
1325 :meth:`.Executable.execution_options`
1326 """
1327 return self._execution_options
1328
1329
1330class SchemaEventTarget(event.EventTarget):
1331 """Base class for elements that are the targets of :class:`.DDLEvents`
1332 events.
1333
1334 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1335
1336 """
1337
1338 dispatch: dispatcher[SchemaEventTarget]
1339
1340 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1341 """Associate with this SchemaEvent's parent object."""
1342
1343 def _set_parent_with_dispatch(
1344 self, parent: SchemaEventTarget, **kw: Any
1345 ) -> None:
1346 self.dispatch.before_parent_attach(self, parent)
1347 self._set_parent(parent, **kw)
1348 self.dispatch.after_parent_attach(self, parent)
1349
1350
1351class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1352 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1353
1354 .. versionadded:: 2.0.41
1355
1356 """
1357
1358
1359class SchemaVisitor(ClauseVisitor):
1360 """Define the visiting for ``SchemaItem`` and more
1361 generally ``SchemaVisitable`` objects.
1362
1363 """
1364
1365 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1366
1367
1368class _SentinelDefaultCharacterization(Enum):
1369 NONE = "none"
1370 UNKNOWN = "unknown"
1371 CLIENTSIDE = "clientside"
1372 SENTINEL_DEFAULT = "sentinel_default"
1373 SERVERSIDE = "serverside"
1374 IDENTITY = "identity"
1375 SEQUENCE = "sequence"
1376
1377
1378class _SentinelColumnCharacterization(NamedTuple):
1379 columns: Optional[Sequence[Column[Any]]] = None
1380 is_explicit: bool = False
1381 is_autoinc: bool = False
1382 default_characterization: _SentinelDefaultCharacterization = (
1383 _SentinelDefaultCharacterization.NONE
1384 )
1385
1386
1387_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1388
1389_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1390_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1391
1392
1393class _ColumnMetrics(Generic[_COL_co]):
1394 __slots__ = ("column",)
1395
1396 column: _COL_co
1397
1398 def __init__(
1399 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1400 ) -> None:
1401 self.column = col
1402
1403 # proxy_index being non-empty means it was initialized.
1404 # so we need to update it
1405 pi = collection._proxy_index
1406 if pi:
1407 for eps_col in col._expanded_proxy_set:
1408 pi[eps_col].add(self)
1409
1410 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1411 return self.column._expanded_proxy_set
1412
1413 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1414 pi = collection._proxy_index
1415 if not pi:
1416 return
1417 for col in self.column._expanded_proxy_set:
1418 colset = pi.get(col, None)
1419 if colset:
1420 colset.discard(self)
1421 if colset is not None and not colset:
1422 del pi[col]
1423
1424 def embedded(
1425 self,
1426 target_set: Union[
1427 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1428 ],
1429 ) -> bool:
1430 expanded_proxy_set = self.column._expanded_proxy_set
1431 for t in target_set.difference(expanded_proxy_set):
1432 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1433 return False
1434 return True
1435
1436
1437class ColumnCollection(Generic[_COLKEY, _COL_co]):
1438 """Collection of :class:`_expression.ColumnElement` instances,
1439 typically for
1440 :class:`_sql.FromClause` objects.
1441
1442 The :class:`_sql.ColumnCollection` object is most commonly available
1443 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1444 on the :class:`_schema.Table` object, introduced at
1445 :ref:`metadata_tables_and_columns`.
1446
1447 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1448 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1449 :class:`_schema.Column` objects, which are then accessible both via mapping
1450 style access as well as attribute access style.
1451
1452 To access :class:`_schema.Column` objects using ordinary attribute-style
1453 access, specify the name like any other object attribute, such as below
1454 a column named ``employee_name`` is accessed::
1455
1456 >>> employee_table.c.employee_name
1457
1458 To access columns that have names with special characters or spaces,
1459 index-style access is used, such as below which illustrates a column named
1460 ``employee ' payment`` is accessed::
1461
1462 >>> employee_table.c["employee ' payment"]
1463
1464 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1465 interface, common dictionary method names like
1466 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1467 and :meth:`_sql.ColumnCollection.items` are available, which means that
1468 database columns that are keyed under these names also need to use indexed
1469 access::
1470
1471 >>> employee_table.c["values"]
1472
1473
1474 The name for which a :class:`_schema.Column` would be present is normally
1475 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1476 such as a :class:`_sql.Select` object that uses a label style set
1477 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1478 key may instead be represented under a particular label name such
1479 as ``tablename_columnname``::
1480
1481 >>> from sqlalchemy import select, column, table
1482 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1483 >>> t = table("t", column("c"))
1484 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1485 >>> subq = stmt.subquery()
1486 >>> subq.c.t_c
1487 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1488
1489 :class:`.ColumnCollection` also indexes the columns in order and allows
1490 them to be accessible by their integer position::
1491
1492 >>> cc[0]
1493 Column('x', Integer(), table=None)
1494 >>> cc[1]
1495 Column('y', Integer(), table=None)
1496
1497 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1498 allows integer-based
1499 index access to the collection.
1500
1501 Iterating the collection yields the column expressions in order::
1502
1503 >>> list(cc)
1504 [Column('x', Integer(), table=None),
1505 Column('y', Integer(), table=None)]
1506
1507 The base :class:`_expression.ColumnCollection` object can store
1508 duplicates, which can
1509 mean either two columns with the same key, in which case the column
1510 returned by key access is **arbitrary**::
1511
1512 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1513 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1514 >>> list(cc)
1515 [Column('x', Integer(), table=None),
1516 Column('x', Integer(), table=None)]
1517 >>> cc["x"] is x1
1518 False
1519 >>> cc["x"] is x2
1520 True
1521
1522 Or it can also mean the same column multiple times. These cases are
1523 supported as :class:`_expression.ColumnCollection`
1524 is used to represent the columns in
1525 a SELECT statement which may include duplicates.
1526
1527 A special subclass :class:`.DedupeColumnCollection` exists which instead
1528 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1529 collection is used for schema level objects like :class:`_schema.Table`
1530 and
1531 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1532 :class:`.DedupeColumnCollection` class also has additional mutation methods
1533 as the schema constructs have more use cases that require removal and
1534 replacement of columns.
1535
1536 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1537 now stores duplicate
1538 column keys as well as the same column in multiple positions. The
1539 :class:`.DedupeColumnCollection` class is added to maintain the
1540 former behavior in those cases where deduplication as well as
1541 additional replace/remove operations are needed.
1542
1543
1544 """
1545
1546 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1547
1548 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1549 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1550 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1551 _colset: Set[_COL_co]
1552
1553 def __init__(
1554 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1555 ):
1556 object.__setattr__(self, "_colset", set())
1557 object.__setattr__(self, "_index", {})
1558 object.__setattr__(
1559 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1560 )
1561 object.__setattr__(self, "_collection", [])
1562 if columns:
1563 self._initial_populate(columns)
1564
1565 @util.preload_module("sqlalchemy.sql.elements")
1566 def __clause_element__(self) -> ClauseList:
1567 elements = util.preloaded.sql_elements
1568
1569 return elements.ClauseList(
1570 _literal_as_text_role=roles.ColumnsClauseRole,
1571 group=False,
1572 *self._all_columns,
1573 )
1574
1575 def _initial_populate(
1576 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1577 ) -> None:
1578 self._populate_separate_keys(iter_)
1579
1580 @property
1581 def _all_columns(self) -> List[_COL_co]:
1582 return [col for (_, col, _) in self._collection]
1583
1584 def keys(self) -> List[_COLKEY]:
1585 """Return a sequence of string key names for all columns in this
1586 collection."""
1587 return [k for (k, _, _) in self._collection]
1588
1589 def values(self) -> List[_COL_co]:
1590 """Return a sequence of :class:`_sql.ColumnClause` or
1591 :class:`_schema.Column` objects for all columns in this
1592 collection."""
1593 return [col for (_, col, _) in self._collection]
1594
1595 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1596 """Return a sequence of (key, column) tuples for all columns in this
1597 collection each consisting of a string key name and a
1598 :class:`_sql.ColumnClause` or
1599 :class:`_schema.Column` object.
1600 """
1601
1602 return [(k, col) for (k, col, _) in self._collection]
1603
1604 def __bool__(self) -> bool:
1605 return bool(self._collection)
1606
1607 def __len__(self) -> int:
1608 return len(self._collection)
1609
1610 def __iter__(self) -> Iterator[_COL_co]:
1611 # turn to a list first to maintain over a course of changes
1612 return iter([col for _, col, _ in self._collection])
1613
1614 @overload
1615 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1616
1617 @overload
1618 def __getitem__(
1619 self, key: Tuple[Union[str, int], ...]
1620 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1621
1622 @overload
1623 def __getitem__(
1624 self, key: slice
1625 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1626
1627 def __getitem__(
1628 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1629 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1630 try:
1631 if isinstance(key, (tuple, slice)):
1632 if isinstance(key, slice):
1633 cols = (
1634 (sub_key, col)
1635 for (sub_key, col, _) in self._collection[key]
1636 )
1637 else:
1638 cols = (self._index[sub_key] for sub_key in key)
1639
1640 return ColumnCollection(cols).as_readonly()
1641 else:
1642 return self._index[key][1]
1643 except KeyError as err:
1644 if isinstance(err.args[0], int):
1645 raise IndexError(err.args[0]) from err
1646 else:
1647 raise
1648
1649 def __getattr__(self, key: str) -> _COL_co:
1650 try:
1651 return self._index[key][1]
1652 except KeyError as err:
1653 raise AttributeError(key) from err
1654
1655 def __contains__(self, key: str) -> bool:
1656 if key not in self._index:
1657 if not isinstance(key, str):
1658 raise exc.ArgumentError(
1659 "__contains__ requires a string argument"
1660 )
1661 return False
1662 else:
1663 return True
1664
1665 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1666 """Compare this :class:`_expression.ColumnCollection` to another
1667 based on the names of the keys"""
1668
1669 for l, r in zip_longest(self, other):
1670 if l is not r:
1671 return False
1672 else:
1673 return True
1674
1675 def __eq__(self, other: Any) -> bool:
1676 return self.compare(other)
1677
1678 @overload
1679 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1680
1681 @overload
1682 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1683
1684 def get(
1685 self, key: str, default: Optional[_COL] = None
1686 ) -> Optional[Union[_COL_co, _COL]]:
1687 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1688 based on a string key name from this
1689 :class:`_expression.ColumnCollection`."""
1690
1691 if key in self._index:
1692 return self._index[key][1]
1693 else:
1694 return default
1695
1696 def __str__(self) -> str:
1697 return "%s(%s)" % (
1698 self.__class__.__name__,
1699 ", ".join(str(c) for c in self),
1700 )
1701
1702 def __setitem__(self, key: str, value: Any) -> NoReturn:
1703 raise NotImplementedError()
1704
1705 def __delitem__(self, key: str) -> NoReturn:
1706 raise NotImplementedError()
1707
1708 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1709 raise NotImplementedError()
1710
1711 def clear(self) -> NoReturn:
1712 """Dictionary clear() is not implemented for
1713 :class:`_sql.ColumnCollection`."""
1714 raise NotImplementedError()
1715
1716 def remove(self, column: Any) -> NoReturn:
1717 raise NotImplementedError()
1718
1719 def update(self, iter_: Any) -> NoReturn:
1720 """Dictionary update() is not implemented for
1721 :class:`_sql.ColumnCollection`."""
1722 raise NotImplementedError()
1723
1724 # https://github.com/python/mypy/issues/4266
1725 __hash__: Optional[int] = None # type: ignore
1726
1727 def _populate_separate_keys(
1728 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1729 ) -> None:
1730 """populate from an iterator of (key, column)"""
1731
1732 self._collection[:] = collection = [
1733 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1734 ]
1735 self._colset.update(c._deannotate() for _, c, _ in collection)
1736 self._index.update(
1737 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1738 )
1739 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1740
1741 def add(
1742 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1743 ) -> None:
1744 """Add a column to this :class:`_sql.ColumnCollection`.
1745
1746 .. note::
1747
1748 This method is **not normally used by user-facing code**, as the
1749 :class:`_sql.ColumnCollection` is usually part of an existing
1750 object such as a :class:`_schema.Table`. To add a
1751 :class:`_schema.Column` to an existing :class:`_schema.Table`
1752 object, use the :meth:`_schema.Table.append_column` method.
1753
1754 """
1755 colkey: _COLKEY
1756
1757 if key is None:
1758 colkey = column.key # type: ignore
1759 else:
1760 colkey = key
1761
1762 l = len(self._collection)
1763
1764 # don't really know how this part is supposed to work w/ the
1765 # covariant thing
1766
1767 _column = cast(_COL_co, column)
1768
1769 self._collection.append(
1770 (colkey, _column, _ColumnMetrics(self, _column))
1771 )
1772 self._colset.add(_column._deannotate())
1773 self._index[l] = (colkey, _column)
1774 if colkey not in self._index:
1775 self._index[colkey] = (colkey, _column)
1776
1777 def __getstate__(self) -> Dict[str, Any]:
1778 return {
1779 "_collection": [(k, c) for k, c, _ in self._collection],
1780 "_index": self._index,
1781 }
1782
1783 def __setstate__(self, state: Dict[str, Any]) -> None:
1784 object.__setattr__(self, "_index", state["_index"])
1785 object.__setattr__(
1786 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1787 )
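        # note: the proxy index is not pickled; it is reset to an empty
        # defaultdict here and repopulated lazily by _init_proxy_index()
        # the next time corresponding_column() is called.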
1788 object.__setattr__(
1789 self,
1790 "_collection",
1791 [
1792 (k, c, _ColumnMetrics(self, c))
1793 for (k, c) in state["_collection"]
1794 ],
1795 )
1796 object.__setattr__(
1797 self, "_colset", {col for k, col, _ in self._collection}
1798 )
1799
1800 def contains_column(self, col: ColumnElement[Any]) -> bool:
1801 """Checks if a column object exists in this collection"""
1802 if col not in self._colset:
1803 if isinstance(col, str):
1804 raise exc.ArgumentError(
1805 "contains_column cannot be used with string arguments. "
1806 "Use ``col_name in table.c`` instead."
1807 )
1808 return False
1809 else:
1810 return True
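    # e.g. (illustrative sketch): checks are by column identity, not name:
    #
    #     t.c.contains_column(t.c.x)          # -> True
    #     t.c.contains_column(other_t.c.x)    # -> False
    #     t.c.contains_column("x")            # raises ArgumentError;
    #                                         # use ``"x" in t.c`` instead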
1811
1812 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1813 """Return a "read only" form of this
1814 :class:`_sql.ColumnCollection`."""
1815
1816 return ReadOnlyColumnCollection(self)
1817
1818 def _init_proxy_index(self) -> None:
        """populate the "proxy index", if empty.

        The proxy index was added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called.
        Once filled it stays that way, and new _ColumnMetrics objects created
        after that point will populate it with new data.  Note this case
        would be unusual, if not nonexistent, as it means a .c collection is
        being mutated after corresponding_column() was used; however it is
        tested in test/base/test_utils.py.

        """
1834 pi = self._proxy_index
1835 if pi:
1836 return
1837
1838 for _, _, metrics in self._collection:
1839 eps = metrics.column._expanded_proxy_set
1840
1841 for eps_col in eps:
1842 pi[eps_col].add(metrics)
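        # resulting shape (illustrative): every ancestor column appearing in
        # a member column's _expanded_proxy_set maps to the set of
        # _ColumnMetrics records for the members that proxy it, e.g.
        #
        #   _proxy_index == {table.c.x: {<metrics for subquery.c.x>}, ...}
        #
        # which lets corresponding_column() gather candidate members with a
        # dictionary lookup per element of the target's proxy_set.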
1843
1844 def corresponding_column(
1845 self, column: _COL, require_embedded: bool = False
1846 ) -> Optional[Union[_COL, _COL_co]]:
1847 """Given a :class:`_expression.ColumnElement`, return the exported
1848 :class:`_expression.ColumnElement` object from this
1849 :class:`_expression.ColumnCollection`
1850 which corresponds to that original :class:`_expression.ColumnElement`
1851 via a common
1852 ancestor column.
1853
1854 :param column: the target :class:`_expression.ColumnElement`
1855 to be matched.
1856
1857 :param require_embedded: only return corresponding columns for
1858 the given :class:`_expression.ColumnElement`, if the given
1859 :class:`_expression.ColumnElement`
1860 is actually present within a sub-element
1861 of this :class:`_expression.Selectable`.
1862 Normally the column will match if
1863 it merely shares a common ancestor with one of the exported
1864 columns of this :class:`_expression.Selectable`.
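
        E.g., an illustrative sketch (``table1`` is assumed to be an
        existing :class:`_schema.Table` with a column ``col1``)::

            subq = select(table1).subquery()

            # the subquery's exported "col1" is a proxy for table1.c.col1,
            # so it is returned as the corresponding column
            col = subq.exported_columns.corresponding_column(table1.c.col1)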
1865
1866 .. seealso::
1867
1868 :meth:`_expression.Selectable.corresponding_column`
1869 - invokes this method
1870 against the collection returned by
1871 :attr:`_expression.Selectable.exported_columns`.
1872
1873 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1874 was moved onto the :class:`_expression.ColumnCollection` itself.
1875
1876 """
1877 # TODO: cython candidate
1878
1879 # don't dig around if the column is locally present
1880 if column in self._colset:
1881 return column
1882
1883 selected_intersection, selected_metrics = None, None
1884 target_set = column.proxy_set
1885
1886 pi = self._proxy_index
1887 if not pi:
1888 self._init_proxy_index()
1889
1890 for current_metrics in (
1891 mm for ts in target_set if ts in pi for mm in pi[ts]
1892 ):
1893 if not require_embedded or current_metrics.embedded(target_set):
1894 if selected_metrics is None:
1895 # no corresponding column yet, pick this one.
1896 selected_metrics = current_metrics
1897 continue
1898
1899 current_intersection = target_set.intersection(
1900 current_metrics.column._expanded_proxy_set
1901 )
1902 if selected_intersection is None:
1903 selected_intersection = target_set.intersection(
1904 selected_metrics.column._expanded_proxy_set
1905 )
1906
1907 if len(current_intersection) > len(selected_intersection):
1908 # 'current' has a larger field of correspondence than
1909 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
1910 # matches a1.c.x->table.c.x better than
1911 # selectable.c.x->table.c.x does.
1912
1913 selected_metrics = current_metrics
1914 selected_intersection = current_intersection
1915 elif current_intersection == selected_intersection:
1916 # they have the same field of correspondence. see
1917 # which proxy_set has fewer columns in it, which
1918 # indicates a closer relationship with the root
1919 # column. Also take into account the "weight"
1920 # attribute which CompoundSelect() uses to give
1921 # higher precedence to columns based on vertical
1922 # position in the compound statement, and discard
1923 # columns that have no reference to the target
1924 # column (also occurs with CompoundSelect)
1925
1926 selected_col_distance = sum(
1927 [
1928 sc._annotations.get("weight", 1)
1929 for sc in (
1930 selected_metrics.column._uncached_proxy_list()
1931 )
1932 if sc.shares_lineage(column)
1933 ],
1934 )
1935 current_col_distance = sum(
1936 [
1937 sc._annotations.get("weight", 1)
1938 for sc in (
1939 current_metrics.column._uncached_proxy_list()
1940 )
1941 if sc.shares_lineage(column)
1942 ],
1943 )
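                    # e.g. (illustrative, assuming the default weight of 1):
                    # a chain selectable.c.x -> table.c.x sums to 2, while
                    # selectable.c.a1_x -> a1.c.x -> table.c.x sums to 3, so
                    # the shorter chain is kept as the closer match below.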
1944 if current_col_distance < selected_col_distance:
1945 selected_metrics = current_metrics
1946 selected_intersection = current_intersection
1947
1948 return selected_metrics.column if selected_metrics else None
1949
1950
1951_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
1952
1953
1954class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
1955 """A :class:`_expression.ColumnCollection`
1956 that maintains deduplicating behavior.
1957
    This is used by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection also includes more
    sophisticated mutator methods to suit schema objects which require
    mutable column collections.
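
    E.g., an illustrative sketch of the deduplicating behavior::

        from sqlalchemy import Column, Integer
        from sqlalchemy.sql.base import DedupeColumnCollection

        cc = DedupeColumnCollection()
        col = Column("x", Integer)
        cc.add(col)
        cc.add(col)  # re-adding the same column under the same key is a no-op
        len(cc)  # -> 1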
1962
1963 .. versionadded:: 1.4
1964
1965 """
1966
1967 def add( # type: ignore[override]
1968 self, column: _NAMEDCOL, key: Optional[str] = None
1969 ) -> None:
1970 if key is not None and column.key != key:
1971 raise exc.ArgumentError(
1972 "DedupeColumnCollection requires columns be under "
1973 "the same key as their .key"
1974 )
1975 key = column.key
1976
1977 if key is None:
1978 raise exc.ArgumentError(
1979 "Can't add unnamed column to column collection"
1980 )
1981
1982 if key in self._index:
1983 existing = self._index[key][1]
1984
1985 if existing is column:
1986 return
1987
1988 self.replace(column)
1989
1990 # pop out memoized proxy_set as this
1991 # operation may very well be occurring
1992 # in a _make_proxy operation
1993 util.memoized_property.reset(column, "proxy_set")
1994 else:
1995 self._append_new_column(key, column)
1996
1997 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
1998 l = len(self._collection)
1999 self._collection.append(
2000 (key, named_column, _ColumnMetrics(self, named_column))
2001 )
2002 self._colset.add(named_column._deannotate())
2003 self._index[l] = (key, named_column)
2004 self._index[key] = (key, named_column)
2005
2006 def _populate_separate_keys(
2007 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2008 ) -> None:
2009 """populate from an iterator of (key, column)"""
2010 cols = list(iter_)
2011
2012 replace_col = []
2013 for k, col in cols:
2014 if col.key != k:
2015 raise exc.ArgumentError(
2016 "DedupeColumnCollection requires columns be under "
2017 "the same key as their .key"
2018 )
2019 if col.name in self._index and col.key != col.name:
2020 replace_col.append(col)
2021 elif col.key in self._index:
2022 replace_col.append(col)
2023 else:
2024 self._index[k] = (k, col)
2025 self._collection.append((k, col, _ColumnMetrics(self, col)))
2026 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2027
2028 self._index.update(
2029 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2030 )
2031 for col in replace_col:
2032 self.replace(col)
2033
2034 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2035 self._populate_separate_keys((col.key, col) for col in iter_)
2036
2037 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2038 if column not in self._colset:
2039 raise ValueError(
2040 "Can't remove column %r; column is not in this collection"
2041 % column
2042 )
2043 del self._index[column.key]
2044 self._colset.remove(column)
2045 self._collection[:] = [
2046 (k, c, metrics)
2047 for (k, c, metrics) in self._collection
2048 if c is not column
2049 ]
2050 for metrics in self._proxy_index.get(column, ()):
2051 metrics.dispose(self)
2052
2053 self._index.update(
2054 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2055 )
        # the collection is now one element shorter; remove the stale
        # highest integer key left over from the re-enumeration above
2057 del self._index[len(self._collection)]
2058
2059 def replace(
2060 self,
2061 column: _NAMEDCOL,
2062 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2063 ) -> None:
2064 """add the given column to this collection, removing unaliased
2065 versions of this column as well as existing columns with the
2066 same key.
2067
2068 e.g.::
2069
2070 t = Table("sometable", metadata, Column("col1", Integer))
2071 t.columns.replace(Column("col1", Integer, key="columnone"))
2072
        will remove the original 'col1' from the collection, and add
        the new column under the key 'columnone'.
2075
2076 Used by schema.Column to override columns during table reflection.
2077
2078 """
2079
2080 if extra_remove:
2081 remove_col = set(extra_remove)
2082 else:
2083 remove_col = set()
2084 # remove up to two columns based on matches of name as well as key
2085 if column.name in self._index and column.key != column.name:
2086 other = self._index[column.name][1]
2087 if other.name == other.key:
2088 remove_col.add(other)
2089
2090 if column.key in self._index:
2091 remove_col.add(self._index[column.key][1])
2092
2093 if not remove_col:
2094 self._append_new_column(column.key, column)
2095 return
2096 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2097 replaced = False
2098 for k, col, metrics in self._collection:
2099 if col in remove_col:
2100 if not replaced:
2101 replaced = True
2102 new_cols.append(
2103 (column.key, column, _ColumnMetrics(self, column))
2104 )
2105 else:
2106 new_cols.append((k, col, metrics))
2107
2108 if remove_col:
2109 self._colset.difference_update(remove_col)
2110
2111 for rc in remove_col:
2112 for metrics in self._proxy_index.get(rc, ()):
2113 metrics.dispose(self)
2114
2115 if not replaced:
2116 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2117
2118 self._colset.add(column._deannotate())
2119 self._collection[:] = new_cols
2120
2121 self._index.clear()
2122
2123 self._index.update(
2124 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2125 )
2126 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2127
2128
2129class ReadOnlyColumnCollection(
2130 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2131):
2132 __slots__ = ("_parent",)
2133
2134 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2135 object.__setattr__(self, "_parent", collection)
2136 object.__setattr__(self, "_colset", collection._colset)
2137 object.__setattr__(self, "_index", collection._index)
2138 object.__setattr__(self, "_collection", collection._collection)
2139 object.__setattr__(self, "_proxy_index", collection._proxy_index)
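        # note: the view shares the parent's internal structures directly
        # rather than copying them, so columns added to the parent later are
        # immediately visible here; only the mutator methods are blocked.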
2140
    def __getstate__(self) -> Dict[str, Any]:
2142 return {"_parent": self._parent}
2143
2144 def __setstate__(self, state: Dict[str, Any]) -> None:
2145 parent = state["_parent"]
2146 self.__init__(parent) # type: ignore
2147
2148 def add(self, column: Any, key: Any = ...) -> Any:
2149 self._readonly()
2150
2151 def extend(self, elements: Any) -> NoReturn:
2152 self._readonly()
2153
2154 def remove(self, item: Any) -> NoReturn:
2155 self._readonly()
2156
2157
2158class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2159 def contains_column(self, col: ColumnClause[Any]) -> bool:
2160 return col in self
2161
2162 def extend(self, cols: Iterable[Any]) -> None:
2163 for col in cols:
2164 self.add(col)
2165
2166 def __eq__(self, other):
2167 l = []
2168 for c in other:
2169 for local in self:
2170 if c.shares_lineage(local):
2171 l.append(c == local)
2172 return elements.and_(*l)
2173
2174 def __hash__(self) -> int: # type: ignore[override]
2175 return hash(tuple(x for x in self))
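    # note: unlike ColumnCollection.compare(), __eq__ here does not return a
    # Python boolean; it produces a SQL expression AND-ing together
    # ``c == local`` for each pair of columns that share lineage, so the
    # result is intended to be used as SQL criteria rather than tested
    # for truth.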
2176
2177
2178def _entity_namespace(
2179 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2180) -> _EntityNamespace:
2181 """Return the nearest .entity_namespace for the given entity.
2182
    If not immediately available, traverses the given element to locate a
    sub-element that has one, if any.
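
    E.g., an illustrative sketch (``user_table`` is an assumed
    :class:`_schema.Table`, whose entity namespace is its column
    collection)::

        _entity_namespace(user_table)  # -> the table's .c collection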
2185
2186 """
2187 try:
2188 return cast(_HasEntityNamespace, entity).entity_namespace
2189 except AttributeError:
2190 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2191 if _is_has_entity_namespace(elem):
2192 return elem.entity_namespace
2193 else:
2194 raise
2195
2196
2197def _entity_namespace_key(
2198 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2199 key: str,
2200 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2201) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    Raises :class:`_exc.InvalidRequestError` rather than ``AttributeError``
    when the entry is not found.

    """
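    # e.g. (illustrative sketch, assuming a Table ``user_table`` with an
    # "id" column):
    #
    #     _entity_namespace_key(user_table, "id")    # -> user_table.c.id
    #     _entity_namespace_key(user_table, "nope")  # raises
    #                                                # InvalidRequestError
    #     _entity_namespace_key(user_table, "nope", None)  # -> None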
2209
2210 try:
2211 ns = _entity_namespace(entity)
2212 if default is not NO_ARG:
2213 return getattr(ns, key, default)
2214 else:
2215 return getattr(ns, key) # type: ignore
2216 except AttributeError as err:
2217 raise exc.InvalidRequestError(
2218 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2219 ) from err