Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import FrozenSet
25from typing import Generator
26from typing import Generic
27from typing import Iterable
28from typing import Iterator
29from typing import List
30from typing import Mapping
31from typing import MutableMapping
32from typing import NamedTuple
33from typing import NoReturn
34from typing import Optional
35from typing import overload
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import TypeVar
42from typing import Union
44from . import roles
45from . import visitors
46from .cache_key import HasCacheKey # noqa
47from .cache_key import MemoizedHasCacheKey # noqa
48from .traversals import HasCopyInternals # noqa
49from .visitors import ClauseVisitor
50from .visitors import ExtendedInternalTraversal
51from .visitors import ExternallyTraversible
52from .visitors import InternalTraversal
53from .. import event
54from .. import exc
55from .. import util
56from ..util import HasMemoized as HasMemoized
57from ..util import hybridmethod
58from ..util import typing as compat_typing
59from ..util import warn_deprecated
60from ..util.typing import Final
61from ..util.typing import Protocol
62from ..util.typing import Self
63from ..util.typing import TypeGuard
65if TYPE_CHECKING:
66 from . import coercions
67 from . import elements
68 from . import type_api
69 from ._orm_types import DMLStrategyArgument
70 from ._orm_types import SynchronizeSessionArgument
71 from ._typing import _CLE
72 from .cache_key import CacheKey
73 from .compiler import SQLCompiler
74 from .elements import BindParameter
75 from .elements import ClauseList
76 from .elements import ColumnClause # noqa
77 from .elements import ColumnElement
78 from .elements import NamedColumn
79 from .elements import SQLCoreOperations
80 from .elements import TextClause
81 from .schema import Column
82 from .schema import DefaultGenerator
83 from .selectable import _JoinTargetElement
84 from .selectable import _SelectIterable
85 from .selectable import FromClause
86 from .visitors import anon_map
87 from ..engine import Connection
88 from ..engine import CursorResult
89 from ..engine.interfaces import _CoreMultiExecuteParams
90 from ..engine.interfaces import _ExecuteOptions
91 from ..engine.interfaces import _ImmutableExecuteOptions
92 from ..engine.interfaces import CacheStats
93 from ..engine.interfaces import Compiled
94 from ..engine.interfaces import CompiledCacheType
95 from ..engine.interfaces import CoreExecuteOptionsParameter
96 from ..engine.interfaces import Dialect
97 from ..engine.interfaces import IsolationLevel
98 from ..engine.interfaces import SchemaTranslateMapType
99 from ..event import dispatcher
101if not TYPE_CHECKING:
102 coercions = None # noqa
103 elements = None # noqa
104 type_api = None # noqa
107class _NoArg(Enum):
108 NO_ARG = 0
110 def __repr__(self):
111 return f"_NoArg.{self.name}"
114NO_ARG: Final = _NoArg.NO_ARG
117class _NoneName(Enum):
118 NONE_NAME = 0
119 """indicate a 'deferred' name that was ultimately the value None."""
122_NONE_NAME: Final = _NoneName.NONE_NAME
124_T = TypeVar("_T", bound=Any)
126_Fn = TypeVar("_Fn", bound=Callable[..., Any])
128_AmbiguousTableNameMap = MutableMapping[str, str]
131class _DefaultDescriptionTuple(NamedTuple):
132 arg: Any
133 is_scalar: Optional[bool]
134 is_callable: Optional[bool]
135 is_sentinel: Optional[bool]
137 @classmethod
138 def _from_column_default(
139 cls, default: Optional[DefaultGenerator]
140 ) -> _DefaultDescriptionTuple:
141 return (
142 _DefaultDescriptionTuple(
143 default.arg, # type: ignore
144 default.is_scalar,
145 default.is_callable,
146 default.is_sentinel,
147 )
148 if default
149 and (
150 default.has_arg
151 or (not default.for_update and default.is_sentinel)
152 )
153 else _DefaultDescriptionTuple(None, None, None, None)
154 )
157_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
158 "_omit_from_statements"
159)
162class _EntityNamespace(Protocol):
163 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
166class _HasEntityNamespace(Protocol):
167 @util.ro_non_memoized_property
168 def entity_namespace(self) -> _EntityNamespace: ...
171def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
172 return hasattr(element, "entity_namespace")
175# Remove when https://github.com/python/mypy/issues/14640 will be fixed
176_Self = TypeVar("_Self", bound=Any)
179class Immutable:
180 """mark a ClauseElement as 'immutable' when expressions are cloned.
182 "immutable" objects refers to the "mutability" of an object in the
183 context of SQL DQL and DML generation. Such as, in DQL, one can
184 compose a SELECT or subquery of varied forms, but one cannot modify
185 the structure of a specific table or column within DQL.
186 :class:`.Immutable` is mostly intended to follow this concept, and as
187 such the primary "immutable" objects are :class:`.ColumnClause`,
188 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
190 """
192 __slots__ = ()
194 _is_immutable: bool = True
196 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
197 raise NotImplementedError("Immutable objects do not support copying")
199 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
200 raise NotImplementedError("Immutable objects do not support copying")
202 def _clone(self: _Self, **kw: Any) -> _Self:
203 return self
205 def _copy_internals(
206 self, *, omit_attrs: Iterable[str] = (), **kw: Any
207 ) -> None:
208 pass
211class SingletonConstant(Immutable):
212 """Represent SQL constants like NULL, TRUE, FALSE"""
214 _is_singleton_constant: bool = True
216 _singleton: SingletonConstant
218 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
219 return cast(_T, cls._singleton)
221 @util.non_memoized_property
222 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
223 raise NotImplementedError()
225 @classmethod
226 def _create_singleton(cls) -> None:
227 obj = object.__new__(cls)
228 obj.__init__() # type: ignore
230 # for a long time this was an empty frozenset, meaning
231 # a SingletonConstant would never be a "corresponding column" in
232 # a statement. This referred to #6259. However, in #7154 we see
233 # that we do in fact need "correspondence" to work when matching cols
234 # in result sets, so the non-correspondence was moved to a more
235 # specific level when we are actually adapting expressions for SQL
236 # render only.
237 obj.proxy_set = frozenset([obj])
238 cls._singleton = obj
241def _from_objects(
242 *elements: Union[
243 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
244 ]
245) -> Iterator[FromClause]:
246 return itertools.chain.from_iterable(
247 [element._from_objects for element in elements]
248 )
251def _select_iterables(
252 elements: Iterable[roles.ColumnsClauseRole],
253) -> _SelectIterable:
254 """expand tables into individual columns in the
255 given list of column expressions.
257 """
258 return itertools.chain.from_iterable(
259 [c._select_iterable for c in elements]
260 )
263_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
266class _GenerativeType(compat_typing.Protocol):
267 def _generate(self) -> Self: ...
270def _generative(fn: _Fn) -> _Fn:
271 """non-caching _generative() decorator.
273 This is basically the legacy decorator that copies the object and
274 runs a method on the new copy.
276 """
278 @util.decorator
279 def _generative(
280 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
281 ) -> _SelfGenerativeType:
282 """Mark a method as generative."""
284 self = self._generate()
285 x = fn(self, *args, **kw)
286 assert x is self, "generative methods must return self"
287 return self
289 decorated = _generative(fn)
290 decorated.non_generative = fn # type: ignore
291 return decorated
294def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
295 msgs: Dict[str, str] = kw.pop("msgs", {})
297 defaults: Dict[str, str] = kw.pop("defaults", {})
299 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
300 (name, operator.attrgetter(name), defaults.get(name, None))
301 for name in names
302 ]
304 @util.decorator
305 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
306 # make pylance happy by not including "self" in the argument
307 # list
308 self = args[0]
309 args = args[1:]
310 for name, getter, default_ in getters:
311 if getter(self) is not default_:
312 msg = msgs.get(
313 name,
314 "Method %s() has already been invoked on this %s construct"
315 % (fn.__name__, self.__class__),
316 )
317 raise exc.InvalidRequestError(msg)
318 return fn(self, *args, **kw)
320 return check
323def _clone(element, **kw):
324 return element._clone(**kw)
327def _expand_cloned(
328 elements: Iterable[_CLE],
329) -> Iterable[_CLE]:
330 """expand the given set of ClauseElements to be the set of all 'cloned'
331 predecessors.
333 """
334 # TODO: cython candidate
335 return itertools.chain(*[x._cloned_set for x in elements])
338def _de_clone(
339 elements: Iterable[_CLE],
340) -> Iterable[_CLE]:
341 for x in elements:
342 while x._is_clone_of is not None:
343 x = x._is_clone_of
344 yield x
347def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
348 """return the intersection of sets a and b, counting
349 any overlap between 'cloned' predecessors.
351 The returned set is in terms of the entities present within 'a'.
353 """
354 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
355 _expand_cloned(b)
356 )
357 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
360def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
361 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
362 _expand_cloned(b)
363 )
364 return {
365 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
366 }
369class _DialectArgView(MutableMapping[str, Any]):
370 """A dictionary view of dialect-level arguments in the form
371 <dialectname>_<argument_name>.
373 """
375 __slots__ = ("obj",)
377 def __init__(self, obj: DialectKWArgs) -> None:
378 self.obj = obj
380 def _key(self, key: str) -> Tuple[str, str]:
381 try:
382 dialect, value_key = key.split("_", 1)
383 except ValueError as err:
384 raise KeyError(key) from err
385 else:
386 return dialect, value_key
388 def __getitem__(self, key: str) -> Any:
389 dialect, value_key = self._key(key)
391 try:
392 opt = self.obj.dialect_options[dialect]
393 except exc.NoSuchModuleError as err:
394 raise KeyError(key) from err
395 else:
396 return opt[value_key]
398 def __setitem__(self, key: str, value: Any) -> None:
399 try:
400 dialect, value_key = self._key(key)
401 except KeyError as err:
402 raise exc.ArgumentError(
403 "Keys must be of the form <dialectname>_<argname>"
404 ) from err
405 else:
406 self.obj.dialect_options[dialect][value_key] = value
408 def __delitem__(self, key: str) -> None:
409 dialect, value_key = self._key(key)
410 del self.obj.dialect_options[dialect][value_key]
412 def __len__(self) -> int:
413 return sum(
414 len(args._non_defaults)
415 for args in self.obj.dialect_options.values()
416 )
418 def __iter__(self) -> Generator[str, None, None]:
419 return (
420 "%s_%s" % (dialect_name, value_name)
421 for dialect_name in self.obj.dialect_options
422 for value_name in self.obj.dialect_options[
423 dialect_name
424 ]._non_defaults
425 )
428class _DialectArgDict(MutableMapping[str, Any]):
429 """A dictionary view of dialect-level arguments for a specific
430 dialect.
432 Maintains a separate collection of user-specified arguments
433 and dialect-specified default arguments.
435 """
437 def __init__(self) -> None:
438 self._non_defaults: Dict[str, Any] = {}
439 self._defaults: Dict[str, Any] = {}
441 def __len__(self) -> int:
442 return len(set(self._non_defaults).union(self._defaults))
444 def __iter__(self) -> Iterator[str]:
445 return iter(set(self._non_defaults).union(self._defaults))
447 def __getitem__(self, key: str) -> Any:
448 if key in self._non_defaults:
449 return self._non_defaults[key]
450 else:
451 return self._defaults[key]
453 def __setitem__(self, key: str, value: Any) -> None:
454 self._non_defaults[key] = value
456 def __delitem__(self, key: str) -> None:
457 del self._non_defaults[key]
460@util.preload_module("sqlalchemy.dialects")
461def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
462 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
463 if dialect_cls.construct_arguments is None:
464 return None
465 return dict(dialect_cls.construct_arguments)
468class DialectKWArgs:
469 """Establish the ability for a class to have dialect-specific arguments
470 with defaults and constructor validation.
472 The :class:`.DialectKWArgs` interacts with the
473 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
475 .. seealso::
477 :attr:`.DefaultDialect.construct_arguments`
479 """
481 __slots__ = ()
483 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
484 ("dialect_options", InternalTraversal.dp_dialect_options)
485 ]
487 def get_dialect_option(
488 self,
489 dialect: Dialect,
490 argument_name: str,
491 *,
492 else_: Any = None,
493 deprecated_fallback: Optional[str] = None,
494 ) -> Any:
495 r"""Return the value of a dialect-specific option, or *else_* if
496 this dialect does not register the given argument.
498 This is useful for DDL compilers that may be inherited by
499 third-party dialects whose ``construct_arguments`` do not
500 include the same set of keys as the parent dialect.
502 :param dialect: The dialect for which to retrieve the option.
503 :param argument_name: The name of the argument to retrieve.
504 :param else\_: The value to return if the argument is not present.
505 :param deprecated_fallback: Optional dialect name to fall back to
506 if the argument is not present for the current dialect. If the
507 argument is present for the fallback dialect but not the current
508 dialect, a deprecation warning will be emitted.
510 """
512 registry = DialectKWArgs._kw_registry[dialect.name]
513 if registry is None:
514 return else_
516 if argument_name in registry.get(self.__class__, {}):
517 if (
518 deprecated_fallback is None
519 or dialect.name == deprecated_fallback
520 ):
521 return self.dialect_options[dialect.name][argument_name]
523 # deprecated_fallback is present; need to look in two places
525 # Current dialect has this option registered.
526 # Check if user explicitly set it.
527 if (
528 dialect.name in self.dialect_options
529 and argument_name
530 in self.dialect_options[dialect.name]._non_defaults
531 ):
532 # User explicitly set this dialect's option - use it
533 return self.dialect_options[dialect.name][argument_name]
535 # User didn't set current dialect's option.
536 # Check for deprecated fallback.
537 elif (
538 deprecated_fallback in self.dialect_options
539 and argument_name
540 in self.dialect_options[deprecated_fallback]._non_defaults
541 ):
542 # User set fallback option but not current dialect's option
543 warn_deprecated(
544 f"Using '{deprecated_fallback}_{argument_name}' "
545 f"with the '{dialect.name}' dialect is deprecated; "
546 f"please additionally specify "
547 f"'{dialect.name}_{argument_name}'.",
548 version="2.1",
549 )
550 return self.dialect_options[deprecated_fallback][argument_name]
552 # Return default value
553 return self.dialect_options[dialect.name][argument_name]
554 else:
555 # Current dialect doesn't have the option registered at all.
556 # Don't warn - if a third-party dialect doesn't support an
557 # option, that's their choice, not a deprecation case.
558 return else_
560 @classmethod
561 def argument_for(
562 cls, dialect_name: str, argument_name: str, default: Any
563 ) -> None:
564 """Add a new kind of dialect-specific keyword argument for this class.
566 E.g.::
568 Index.argument_for("mydialect", "length", None)
570 some_index = Index("a", "b", mydialect_length=5)
572 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
573 way adding extra arguments to the
574 :attr:`.DefaultDialect.construct_arguments` dictionary. This
575 dictionary provides a list of argument names accepted by various
576 schema-level constructs on behalf of a dialect.
578 New dialects should typically specify this dictionary all at once as a
579 data member of the dialect class. The use case for ad-hoc addition of
580 argument names is typically for end-user code that is also using
581 a custom compilation scheme which consumes the additional arguments.
583 :param dialect_name: name of a dialect. The dialect must be
584 locatable, else a :class:`.NoSuchModuleError` is raised. The
585 dialect must also include an existing
586 :attr:`.DefaultDialect.construct_arguments` collection, indicating
587 that it participates in the keyword-argument validation and default
588 system, else :class:`.ArgumentError` is raised. If the dialect does
589 not include this collection, then any keyword argument can be
590 specified on behalf of this dialect already. All dialects packaged
591 within SQLAlchemy include this collection, however for third party
592 dialects, support may vary.
594 :param argument_name: name of the parameter.
596 :param default: default value of the parameter.
598 """
600 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
601 DialectKWArgs._kw_registry[dialect_name]
602 )
603 if construct_arg_dictionary is None:
604 raise exc.ArgumentError(
605 "Dialect '%s' does have keyword-argument "
606 "validation and defaults enabled configured" % dialect_name
607 )
608 if cls not in construct_arg_dictionary:
609 construct_arg_dictionary[cls] = {}
610 construct_arg_dictionary[cls][argument_name] = default
612 @property
613 def dialect_kwargs(self) -> _DialectArgView:
614 """A collection of keyword arguments specified as dialect-specific
615 options to this construct.
617 The arguments are present here in their original ``<dialect>_<kwarg>``
618 format. Only arguments that were actually passed are included;
619 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
620 contains all options known by this dialect including defaults.
622 The collection is also writable; keys are accepted of the
623 form ``<dialect>_<kwarg>`` where the value will be assembled
624 into the list of options.
626 .. seealso::
628 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
630 """
631 return _DialectArgView(self)
633 @property
634 def kwargs(self) -> _DialectArgView:
635 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
636 return self.dialect_kwargs
638 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
639 util.PopulateDict(_kw_reg_for_dialect)
640 )
642 @classmethod
643 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
644 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
645 d = _DialectArgDict()
647 if construct_arg_dictionary is None:
648 d._defaults.update({"*": None})
649 else:
650 for cls in reversed(cls.__mro__):
651 if cls in construct_arg_dictionary:
652 d._defaults.update(construct_arg_dictionary[cls])
653 return d
655 @util.memoized_property
656 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
657 """A collection of keyword arguments specified as dialect-specific
658 options to this construct.
660 This is a two-level nested registry, keyed to ``<dialect_name>``
661 and ``<argument_name>``. For example, the ``postgresql_where``
662 argument would be locatable as::
664 arg = my_object.dialect_options["postgresql"]["where"]
666 .. versionadded:: 0.9.2
668 .. seealso::
670 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
672 """
674 return util.PopulateDict(self._kw_reg_for_dialect_cls)
676 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
677 # validate remaining kwargs that they all specify DB prefixes
679 if not kwargs:
680 return
682 for k in kwargs:
683 m = re.match("^(.+?)_(.+)$", k)
684 if not m:
685 raise TypeError(
686 "Additional arguments should be "
687 "named <dialectname>_<argument>, got '%s'" % k
688 )
689 dialect_name, arg_name = m.group(1, 2)
691 try:
692 construct_arg_dictionary = self.dialect_options[dialect_name]
693 except exc.NoSuchModuleError:
694 util.warn(
695 "Can't validate argument %r; can't "
696 "locate any SQLAlchemy dialect named %r"
697 % (k, dialect_name)
698 )
699 self.dialect_options[dialect_name] = d = _DialectArgDict()
700 d._defaults.update({"*": None})
701 d._non_defaults[arg_name] = kwargs[k]
702 else:
703 if (
704 "*" not in construct_arg_dictionary
705 and arg_name not in construct_arg_dictionary
706 ):
707 raise exc.ArgumentError(
708 "Argument %r is not accepted by "
709 "dialect %r on behalf of %r"
710 % (k, dialect_name, self.__class__)
711 )
712 else:
713 construct_arg_dictionary[arg_name] = kwargs[k]
716class CompileState:
717 """Produces additional object state necessary for a statement to be
718 compiled.
720 the :class:`.CompileState` class is at the base of classes that assemble
721 state for a particular statement object that is then used by the
722 compiler. This process is essentially an extension of the process that
723 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
724 on converting raw user intent into more organized structures rather than
725 producing string output. The top-level :class:`.CompileState` for the
726 statement being executed is also accessible when the execution context
727 works with invoking the statement and collecting results.
729 The production of :class:`.CompileState` is specific to the compiler, such
730 as within the :meth:`.SQLCompiler.visit_insert`,
731 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
732 responsible for associating the :class:`.CompileState` with the
733 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
734 i.e. the outermost SQL statement that's actually being executed.
735 There can be other :class:`.CompileState` objects that are not the
736 toplevel, such as when a SELECT subquery or CTE-nested
737 INSERT/UPDATE/DELETE is generated.
739 .. versionadded:: 1.4
741 """
743 __slots__ = ("statement", "_ambiguous_table_name_map")
745 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
747 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
749 @classmethod
750 def create_for_statement(
751 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
752 ) -> CompileState:
753 # factory construction.
755 if statement._propagate_attrs:
756 plugin_name = statement._propagate_attrs.get(
757 "compile_state_plugin", "default"
758 )
759 klass = cls.plugins.get(
760 (plugin_name, statement._effective_plugin_target), None
761 )
762 if klass is None:
763 klass = cls.plugins[
764 ("default", statement._effective_plugin_target)
765 ]
767 else:
768 klass = cls.plugins[
769 ("default", statement._effective_plugin_target)
770 ]
772 if klass is cls:
773 return cls(statement, compiler, **kw)
774 else:
775 return klass.create_for_statement(statement, compiler, **kw)
777 def __init__(self, statement, compiler, **kw):
778 self.statement = statement
780 @classmethod
781 def get_plugin_class(
782 cls, statement: Executable
783 ) -> Optional[Type[CompileState]]:
784 plugin_name = statement._propagate_attrs.get(
785 "compile_state_plugin", None
786 )
788 if plugin_name:
789 key = (plugin_name, statement._effective_plugin_target)
790 if key in cls.plugins:
791 return cls.plugins[key]
793 # there's no case where we call upon get_plugin_class() and want
794 # to get None back, there should always be a default. return that
795 # if there was no plugin-specific class (e.g. Insert with "orm"
796 # plugin)
797 try:
798 return cls.plugins[("default", statement._effective_plugin_target)]
799 except KeyError:
800 return None
802 @classmethod
803 def _get_plugin_class_for_plugin(
804 cls, statement: Executable, plugin_name: str
805 ) -> Optional[Type[CompileState]]:
806 try:
807 return cls.plugins[
808 (plugin_name, statement._effective_plugin_target)
809 ]
810 except KeyError:
811 return None
813 @classmethod
814 def plugin_for(
815 cls, plugin_name: str, visit_name: str
816 ) -> Callable[[_Fn], _Fn]:
817 def decorate(cls_to_decorate):
818 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
819 return cls_to_decorate
821 return decorate
824class Generative(HasMemoized):
825 """Provide a method-chaining pattern in conjunction with the
826 @_generative decorator."""
828 def _generate(self) -> Self:
829 skip = self._memoized_keys
830 cls = self.__class__
831 s = cls.__new__(cls)
832 if skip:
833 # ensure this iteration remains atomic
834 s.__dict__ = {
835 k: v for k, v in self.__dict__.copy().items() if k not in skip
836 }
837 else:
838 s.__dict__ = self.__dict__.copy()
839 return s
842class InPlaceGenerative(HasMemoized):
843 """Provide a method-chaining pattern in conjunction with the
844 @_generative decorator that mutates in place."""
846 __slots__ = ()
848 def _generate(self) -> Self:
849 skip = self._memoized_keys
850 # note __dict__ needs to be in __slots__ if this is used
851 for k in skip:
852 self.__dict__.pop(k, None)
853 return self
856class HasCompileState(Generative):
857 """A class that has a :class:`.CompileState` associated with it."""
859 _compile_state_plugin: Optional[Type[CompileState]] = None
861 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
863 _compile_state_factory = CompileState.create_for_statement
866class _MetaOptions(type):
867 """metaclass for the Options class.
869 This metaclass is actually necessary despite the availability of the
870 ``__init_subclass__()`` hook as this type also provides custom class-level
871 behavior for the ``__add__()`` method.
873 """
875 _cache_attrs: Tuple[str, ...]
877 def __add__(self, other):
878 o1 = self()
880 if set(other).difference(self._cache_attrs):
881 raise TypeError(
882 "dictionary contains attributes not covered by "
883 "Options class %s: %r"
884 % (self, set(other).difference(self._cache_attrs))
885 )
887 o1.__dict__.update(other)
888 return o1
890 if TYPE_CHECKING:
892 def __getattr__(self, key: str) -> Any: ...
894 def __setattr__(self, key: str, value: Any) -> None: ...
896 def __delattr__(self, key: str) -> None: ...
899class Options(metaclass=_MetaOptions):
900 """A cacheable option dictionary with defaults."""
902 __slots__ = ()
904 _cache_attrs: Tuple[str, ...]
906 def __init_subclass__(cls) -> None:
907 dict_ = cls.__dict__
908 cls._cache_attrs = tuple(
909 sorted(
910 d
911 for d in dict_
912 if not d.startswith("__")
913 and d not in ("_cache_key_traversal",)
914 )
915 )
916 super().__init_subclass__()
918 def __init__(self, **kw: Any) -> None:
919 self.__dict__.update(kw)
921 def __add__(self, other):
922 o1 = self.__class__.__new__(self.__class__)
923 o1.__dict__.update(self.__dict__)
925 if set(other).difference(self._cache_attrs):
926 raise TypeError(
927 "dictionary contains attributes not covered by "
928 "Options class %s: %r"
929 % (self, set(other).difference(self._cache_attrs))
930 )
932 o1.__dict__.update(other)
933 return o1
935 def __eq__(self, other):
936 # TODO: very inefficient. This is used only in test suites
937 # right now.
938 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
939 if getattr(self, a) != getattr(other, b):
940 return False
941 return True
943 def __repr__(self) -> str:
944 # TODO: fairly inefficient, used only in debugging right now.
946 return "%s(%s)" % (
947 self.__class__.__name__,
948 ", ".join(
949 "%s=%r" % (k, self.__dict__[k])
950 for k in self._cache_attrs
951 if k in self.__dict__
952 ),
953 )
955 @classmethod
956 def isinstance(cls, klass: Type[Any]) -> bool:
957 return issubclass(cls, klass)
959 @hybridmethod
960 def add_to_element(self, name: str, value: str) -> Any:
961 return self + {name: getattr(self, name) + value}
963 @hybridmethod
964 def _state_dict_inst(self) -> Mapping[str, Any]:
965 return self.__dict__
967 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
969 @_state_dict_inst.classlevel
970 def _state_dict(cls) -> Mapping[str, Any]:
971 return cls._state_dict_const
973 @classmethod
974 def safe_merge(cls, other: "Options") -> Any:
975 d = other._state_dict()
977 # only support a merge with another object of our class
978 # and which does not have attrs that we don't. otherwise
979 # we risk having state that might not be part of our cache
980 # key strategy
982 if (
983 cls is not other.__class__
984 and other._cache_attrs
985 and set(other._cache_attrs).difference(cls._cache_attrs)
986 ):
987 raise TypeError(
988 "other element %r is not empty, is not of type %s, "
989 "and contains attributes not covered here %r"
990 % (
991 other,
992 cls,
993 set(other._cache_attrs).difference(cls._cache_attrs),
994 )
995 )
996 return cls + d
998 @classmethod
999 def from_execution_options(
1000 cls,
1001 key: str,
1002 attrs: set[str],
1003 exec_options: Mapping[str, Any],
1004 statement_exec_options: Mapping[str, Any],
1005 ) -> Tuple["Options", Mapping[str, Any]]:
1006 """process Options argument in terms of execution options.
1009 e.g.::
1011 (
1012 load_options,
1013 execution_options,
1014 ) = QueryContext.default_load_options.from_execution_options(
1015 "_sa_orm_load_options",
1016 {"populate_existing", "autoflush", "yield_per"},
1017 execution_options,
1018 statement._execution_options,
1019 )
1021 get back the Options and refresh "_sa_orm_load_options" in the
1022 exec options dict w/ the Options as well
1024 """
1026 # common case is that no options we are looking for are
1027 # in either dictionary, so cancel for that first
1028 check_argnames = attrs.intersection(
1029 set(exec_options).union(statement_exec_options)
1030 )
1032 existing_options = exec_options.get(key, cls)
1034 if check_argnames:
1035 result = {}
1036 for argname in check_argnames:
1037 local = "_" + argname
1038 if argname in exec_options:
1039 result[local] = exec_options[argname]
1040 elif argname in statement_exec_options:
1041 result[local] = statement_exec_options[argname]
1043 new_options = existing_options + result
1044 exec_options = util.immutabledict(exec_options).merge_with(
1045 {key: new_options}
1046 )
1047 return new_options, exec_options
1049 else:
1050 return existing_options, exec_options
1052 if TYPE_CHECKING:
1054 def __getattr__(self, key: str) -> Any: ...
1056 def __setattr__(self, key: str, value: Any) -> None: ...
1058 def __delattr__(self, key: str) -> None: ...
1061class CacheableOptions(Options, HasCacheKey):
1062 __slots__ = ()
1064 @hybridmethod
1065 def _gen_cache_key_inst(
1066 self, anon_map: Any, bindparams: List[BindParameter[Any]]
1067 ) -> Optional[Tuple[Any]]:
1068 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
1070 @_gen_cache_key_inst.classlevel
1071 def _gen_cache_key(
1072 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
1073 ) -> Tuple[CacheableOptions, Any]:
1074 return (cls, ())
1076 @hybridmethod
1077 def _generate_cache_key(self) -> Optional[CacheKey]:
1078 return HasCacheKey._generate_cache_key_for_object(self)
1081class ExecutableOption(HasCopyInternals):
1082 __slots__ = ()
1084 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1086 __visit_name__: str = "executable_option"
1088 _is_has_cache_key: bool = False
1090 _is_core: bool = True
1092 def _clone(self, **kw):
1093 """Create a shallow copy of this ExecutableOption."""
1094 c = self.__class__.__new__(self.__class__)
1095 c.__dict__ = dict(self.__dict__) # type: ignore
1096 return c
1099class Executable(roles.StatementRole):
1100 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1102 :class:`.Executable` is a superclass for all "statement" types
1103 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1104 :func:`insert`, :func:`text`.
1106 """
1108 supports_execution: bool = True
1109 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1110 _is_default_generator: bool = False
1111 _with_options: Tuple[ExecutableOption, ...] = ()
1112 _with_context_options: Tuple[
1113 Tuple[Callable[[CompileState], None], Any], ...
1114 ] = ()
1115 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1117 _executable_traverse_internals = [
1118 ("_with_options", InternalTraversal.dp_executable_options),
1119 (
1120 "_with_context_options",
1121 ExtendedInternalTraversal.dp_with_context_options,
1122 ),
1123 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1124 ]
1126 is_select: bool = False
1127 is_from_statement: bool = False
1128 is_update: bool = False
1129 is_insert: bool = False
1130 is_text: bool = False
1131 is_delete: bool = False
1132 is_dml: bool = False
1134 if TYPE_CHECKING:
1135 __visit_name__: str
1137 def _compile_w_cache(
1138 self,
1139 dialect: Dialect,
1140 *,
1141 compiled_cache: Optional[CompiledCacheType],
1142 column_keys: List[str],
1143 for_executemany: bool = False,
1144 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1145 **kw: Any,
1146 ) -> Tuple[
1147 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1148 ]: ...
1150 def _execute_on_connection(
1151 self,
1152 connection: Connection,
1153 distilled_params: _CoreMultiExecuteParams,
1154 execution_options: CoreExecuteOptionsParameter,
1155 ) -> CursorResult[Any]: ...
1157 def _execute_on_scalar(
1158 self,
1159 connection: Connection,
1160 distilled_params: _CoreMultiExecuteParams,
1161 execution_options: CoreExecuteOptionsParameter,
1162 ) -> Any: ...
1164 @util.ro_non_memoized_property
1165 def _all_selected_columns(self) -> _SelectIterable:
1166 raise NotImplementedError()
1168 @property
1169 def _effective_plugin_target(self) -> str:
1170 return self.__visit_name__
1172 @_generative
1173 def options(self, *options: ExecutableOption) -> Self:
1174 """Apply options to this statement.
1176 In the general sense, options are any kind of Python object
1177 that can be interpreted by the SQL compiler for the statement.
1178 These options can be consumed by specific dialects or specific kinds
1179 of compilers.
1181 The most commonly known kind of option are the ORM level options
1182 that apply "eager load" and other loading behaviors to an ORM
1183 query. However, options can theoretically be used for many other
1184 purposes.
1186 For background on specific kinds of options for specific kinds of
1187 statements, refer to the documentation for those option objects.
1189 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1190 Core statement objects towards the goal of allowing unified
1191 Core / ORM querying capabilities.
1193 .. seealso::
1195 :ref:`loading_columns` - refers to options specific to the usage
1196 of ORM queries
1198 :ref:`relationship_loader_options` - refers to options specific
1199 to the usage of ORM queries
1201 """
1202 self._with_options += tuple(
1203 coercions.expect(roles.ExecutableOptionRole, opt)
1204 for opt in options
1205 )
1206 return self
1208 @_generative
1209 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1210 """Assign the compile options to a new value.
1212 :param compile_options: appropriate CacheableOptions structure
1214 """
1216 self._compile_options = compile_options
1217 return self
1219 @_generative
1220 def _update_compile_options(self, options: CacheableOptions) -> Self:
1221 """update the _compile_options with new keys."""
1223 assert self._compile_options is not None
1224 self._compile_options += options
1225 return self
1227 @_generative
1228 def _add_context_option(
1229 self,
1230 callable_: Callable[[CompileState], None],
1231 cache_args: Any,
1232 ) -> Self:
1233 """Add a context option to this statement.
1235 These are callable functions that will
1236 be given the CompileState object upon compilation.
1238 A second argument cache_args is required, which will be combined with
1239 the ``__code__`` identity of the function itself in order to produce a
1240 cache key.
1242 """
1243 self._with_context_options += ((callable_, cache_args),)
1244 return self
1246 @overload
1247 def execution_options(
1248 self,
1249 *,
1250 compiled_cache: Optional[CompiledCacheType] = ...,
1251 logging_token: str = ...,
1252 isolation_level: IsolationLevel = ...,
1253 no_parameters: bool = False,
1254 stream_results: bool = False,
1255 max_row_buffer: int = ...,
1256 yield_per: int = ...,
1257 insertmanyvalues_page_size: int = ...,
1258 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1259 populate_existing: bool = False,
1260 autoflush: bool = False,
1261 synchronize_session: SynchronizeSessionArgument = ...,
1262 dml_strategy: DMLStrategyArgument = ...,
1263 render_nulls: bool = ...,
1264 is_delete_using: bool = ...,
1265 is_update_from: bool = ...,
1266 preserve_rowcount: bool = False,
1267 **opt: Any,
1268 ) -> Self: ...
1270 @overload
1271 def execution_options(self, **opt: Any) -> Self: ...
1273 @_generative
1274 def execution_options(self, **kw: Any) -> Self:
1275 """Set non-SQL options for the statement which take effect during
1276 execution.
1278 Execution options can be set at many scopes, including per-statement,
1279 per-connection, or per execution, using methods such as
1280 :meth:`_engine.Connection.execution_options` and parameters which
1281 accept a dictionary of options such as
1282 :paramref:`_engine.Connection.execute.execution_options` and
1283 :paramref:`_orm.Session.execute.execution_options`.
1285 The primary characteristic of an execution option, as opposed to
1286 other kinds of options such as ORM loader options, is that
1287 **execution options never affect the compiled SQL of a query, only
1288 things that affect how the SQL statement itself is invoked or how
1289 results are fetched**. That is, execution options are not part of
1290 what's accommodated by SQL compilation nor are they considered part of
1291 the cached state of a statement.
1293 The :meth:`_sql.Executable.execution_options` method is
1294 :term:`generative`, as
1295 is the case for the method as applied to the :class:`_engine.Engine`
1296 and :class:`_orm.Query` objects, which means when the method is called,
1297 a copy of the object is returned, which applies the given parameters to
1298 that new copy, but leaves the original unchanged::
1300 statement = select(table.c.x, table.c.y)
1301 new_statement = statement.execution_options(my_option=True)
1303 An exception to this behavior is the :class:`_engine.Connection`
1304 object, where the :meth:`_engine.Connection.execution_options` method
1305 is explicitly **not** generative.
1307 The kinds of options that may be passed to
1308 :meth:`_sql.Executable.execution_options` and other related methods and
1309 parameter dictionaries include parameters that are explicitly consumed
1310 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1311 defined by SQLAlchemy, which means the methods and/or parameter
1312 dictionaries may be used for user-defined parameters that interact with
1313 custom code, which may access the parameters using methods such as
1314 :meth:`_sql.Executable.get_execution_options` and
1315 :meth:`_engine.Connection.get_execution_options`, or within selected
1316 event hooks using a dedicated ``execution_options`` event parameter
1317 such as
1318 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1319 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1321 from sqlalchemy import event
1324 @event.listens_for(some_engine, "before_execute")
1325 def _process_opt(conn, statement, multiparams, params, execution_options):
1326 "run a SQL function before invoking a statement"
1328 if execution_options.get("do_special_thing", False):
1329 conn.exec_driver_sql("run_special_function()")
1331 Within the scope of options that are explicitly recognized by
1332 SQLAlchemy, most apply to specific classes of objects and not others.
1333 The most common execution options include:
1335 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1336 sets the isolation level for a connection or a class of connections
1337 via an :class:`_engine.Engine`. This option is accepted only
1338 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1340 * :paramref:`_engine.Connection.execution_options.stream_results` -
1341 indicates results should be fetched using a server side cursor;
1342 this option is accepted by :class:`_engine.Connection`, by the
1343 :paramref:`_engine.Connection.execute.execution_options` parameter
1344 on :meth:`_engine.Connection.execute`, and additionally by
1345 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1346 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1348 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1349 indicates a dictionary that will serve as the
1350 :ref:`SQL compilation cache <sql_caching>`
1351 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1352 well as for ORM methods like :meth:`_orm.Session.execute`.
1353 Can be passed as ``None`` to disable caching for statements.
1354 This option is not accepted by
1355 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1356 carry along a compilation cache within a statement object.
1358 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1359 - a mapping of schema names used by the
1360 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1361 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1362 :class:`_sql.Executable`, as well as by ORM constructs
1363 like :meth:`_orm.Session.execute`.
1365 .. seealso::
1367 :meth:`_engine.Connection.execution_options`
1369 :paramref:`_engine.Connection.execute.execution_options`
1371 :paramref:`_orm.Session.execute.execution_options`
1373 :ref:`orm_queryguide_execution_options` - documentation on all
1374 ORM-specific execution options
1376 """ # noqa: E501
1377 if "isolation_level" in kw:
1378 raise exc.ArgumentError(
1379 "'isolation_level' execution option may only be specified "
1380 "on Connection.execution_options(), or "
1381 "per-engine using the isolation_level "
1382 "argument to create_engine()."
1383 )
1384 if "compiled_cache" in kw:
1385 raise exc.ArgumentError(
1386 "'compiled_cache' execution option may only be specified "
1387 "on Connection.execution_options(), not per statement."
1388 )
1389 self._execution_options = self._execution_options.union(kw)
1390 return self
1392 def get_execution_options(self) -> _ExecuteOptions:
1393 """Get the non-SQL options which will take effect during execution.
1395 .. versionadded:: 1.3
1397 .. seealso::
1399 :meth:`.Executable.execution_options`
1400 """
1401 return self._execution_options
1404class SchemaEventTarget(event.EventTarget):
1405 """Base class for elements that are the targets of :class:`.DDLEvents`
1406 events.
1408 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1410 """
1412 dispatch: dispatcher[SchemaEventTarget]
1414 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1415 """Associate with this SchemaEvent's parent object."""
1417 def _set_parent_with_dispatch(
1418 self, parent: SchemaEventTarget, **kw: Any
1419 ) -> None:
1420 self.dispatch.before_parent_attach(self, parent)
1421 self._set_parent(parent, **kw)
1422 self.dispatch.after_parent_attach(self, parent)
1425class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1426 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1428 .. versionadded:: 2.0.41
1430 """
1433class SchemaVisitor(ClauseVisitor):
1434 """Define the visiting for ``SchemaItem`` and more
1435 generally ``SchemaVisitable`` objects.
1437 """
1439 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1442class _SentinelDefaultCharacterization(Enum):
1443 NONE = "none"
1444 UNKNOWN = "unknown"
1445 CLIENTSIDE = "clientside"
1446 SENTINEL_DEFAULT = "sentinel_default"
1447 SERVERSIDE = "serverside"
1448 IDENTITY = "identity"
1449 SEQUENCE = "sequence"
1452class _SentinelColumnCharacterization(NamedTuple):
1453 columns: Optional[Sequence[Column[Any]]] = None
1454 is_explicit: bool = False
1455 is_autoinc: bool = False
1456 default_characterization: _SentinelDefaultCharacterization = (
1457 _SentinelDefaultCharacterization.NONE
1458 )
1461_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1463_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1464_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1467class _ColumnMetrics(Generic[_COL_co]):
1468 __slots__ = ("column",)
1470 column: _COL_co
1472 def __init__(
1473 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1474 ) -> None:
1475 self.column = col
1477 # proxy_index being non-empty means it was initialized.
1478 # so we need to update it
1479 pi = collection._proxy_index
1480 if pi:
1481 for eps_col in col._expanded_proxy_set:
1482 pi[eps_col].add(self)
1484 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1485 return self.column._expanded_proxy_set
1487 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1488 pi = collection._proxy_index
1489 if not pi:
1490 return
1491 for col in self.column._expanded_proxy_set:
1492 colset = pi.get(col, None)
1493 if colset:
1494 colset.discard(self)
1495 if colset is not None and not colset:
1496 del pi[col]
1498 def embedded(
1499 self,
1500 target_set: Union[
1501 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1502 ],
1503 ) -> bool:
1504 expanded_proxy_set = self.column._expanded_proxy_set
1505 for t in target_set.difference(expanded_proxy_set):
1506 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1507 return False
1508 return True
1511class ColumnCollection(Generic[_COLKEY, _COL_co]):
1512 """Collection of :class:`_expression.ColumnElement` instances,
1513 typically for
1514 :class:`_sql.FromClause` objects.
1516 The :class:`_sql.ColumnCollection` object is most commonly available
1517 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1518 on the :class:`_schema.Table` object, introduced at
1519 :ref:`metadata_tables_and_columns`.
1521 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1522 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1523 :class:`_schema.Column` objects, which are then accessible both via mapping
1524 style access as well as attribute access style.
1526 To access :class:`_schema.Column` objects using ordinary attribute-style
1527 access, specify the name like any other object attribute, such as below
1528 a column named ``employee_name`` is accessed::
1530 >>> employee_table.c.employee_name
1532 To access columns that have names with special characters or spaces,
1533 index-style access is used, such as below which illustrates a column named
1534 ``employee ' payment`` is accessed::
1536 >>> employee_table.c["employee ' payment"]
1538 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1539 interface, common dictionary method names like
1540 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1541 and :meth:`_sql.ColumnCollection.items` are available, which means that
1542 database columns that are keyed under these names also need to use indexed
1543 access::
1545 >>> employee_table.c["values"]
1548 The name for which a :class:`_schema.Column` would be present is normally
1549 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1550 such as a :class:`_sql.Select` object that uses a label style set
1551 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1552 key may instead be represented under a particular label name such
1553 as ``tablename_columnname``::
1555 >>> from sqlalchemy import select, column, table
1556 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1557 >>> t = table("t", column("c"))
1558 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1559 >>> subq = stmt.subquery()
1560 >>> subq.c.t_c
1561 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1563 :class:`.ColumnCollection` also indexes the columns in order and allows
1564 them to be accessible by their integer position::
1566 >>> cc[0]
1567 Column('x', Integer(), table=None)
1568 >>> cc[1]
1569 Column('y', Integer(), table=None)
1571 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1572 allows integer-based
1573 index access to the collection.
1575 Iterating the collection yields the column expressions in order::
1577 >>> list(cc)
1578 [Column('x', Integer(), table=None),
1579 Column('y', Integer(), table=None)]
1581 The base :class:`_expression.ColumnCollection` object can store
1582 duplicates, which can
1583 mean either two columns with the same key, in which case the column
1584 returned by key access is **arbitrary**::
1586 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1587 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1588 >>> list(cc)
1589 [Column('x', Integer(), table=None),
1590 Column('x', Integer(), table=None)]
1591 >>> cc["x"] is x1
1592 False
1593 >>> cc["x"] is x2
1594 True
1596 Or it can also mean the same column multiple times. These cases are
1597 supported as :class:`_expression.ColumnCollection`
1598 is used to represent the columns in
1599 a SELECT statement which may include duplicates.
1601 A special subclass :class:`.DedupeColumnCollection` exists which instead
1602 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1603 collection is used for schema level objects like :class:`_schema.Table`
1604 and
1605 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1606 :class:`.DedupeColumnCollection` class also has additional mutation methods
1607 as the schema constructs have more use cases that require removal and
1608 replacement of columns.
1610 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1611 now stores duplicate
1612 column keys as well as the same column in multiple positions. The
1613 :class:`.DedupeColumnCollection` class is added to maintain the
1614 former behavior in those cases where deduplication as well as
1615 additional replace/remove operations are needed.
1618 """
1620 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1622 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1623 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1624 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1625 _colset: Set[_COL_co]
1627 def __init__(
1628 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1629 ):
1630 object.__setattr__(self, "_colset", set())
1631 object.__setattr__(self, "_index", {})
1632 object.__setattr__(
1633 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1634 )
1635 object.__setattr__(self, "_collection", [])
1636 if columns:
1637 self._initial_populate(columns)
1639 @util.preload_module("sqlalchemy.sql.elements")
1640 def __clause_element__(self) -> ClauseList:
1641 elements = util.preloaded.sql_elements
1643 return elements.ClauseList(
1644 _literal_as_text_role=roles.ColumnsClauseRole,
1645 group=False,
1646 *self._all_columns,
1647 )
1649 def _initial_populate(
1650 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1651 ) -> None:
1652 self._populate_separate_keys(iter_)
1654 @property
1655 def _all_columns(self) -> List[_COL_co]:
1656 return [col for (_, col, _) in self._collection]
1658 def keys(self) -> List[_COLKEY]:
1659 """Return a sequence of string key names for all columns in this
1660 collection."""
1661 return [k for (k, _, _) in self._collection]
1663 def values(self) -> List[_COL_co]:
1664 """Return a sequence of :class:`_sql.ColumnClause` or
1665 :class:`_schema.Column` objects for all columns in this
1666 collection."""
1667 return [col for (_, col, _) in self._collection]
1669 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1670 """Return a sequence of (key, column) tuples for all columns in this
1671 collection each consisting of a string key name and a
1672 :class:`_sql.ColumnClause` or
1673 :class:`_schema.Column` object.
1674 """
1676 return [(k, col) for (k, col, _) in self._collection]
1678 def __bool__(self) -> bool:
1679 return bool(self._collection)
1681 def __len__(self) -> int:
1682 return len(self._collection)
1684 def __iter__(self) -> Iterator[_COL_co]:
1685 # turn to a list first to maintain over a course of changes
1686 return iter([col for _, col, _ in self._collection])
1688 @overload
1689 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1691 @overload
1692 def __getitem__(
1693 self, key: Tuple[Union[str, int], ...]
1694 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1696 @overload
1697 def __getitem__(
1698 self, key: slice
1699 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1701 def __getitem__(
1702 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1703 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1704 try:
1705 if isinstance(key, (tuple, slice)):
1706 if isinstance(key, slice):
1707 cols = (
1708 (sub_key, col)
1709 for (sub_key, col, _) in self._collection[key]
1710 )
1711 else:
1712 cols = (self._index[sub_key] for sub_key in key)
1714 return ColumnCollection(cols).as_readonly()
1715 else:
1716 return self._index[key][1]
1717 except KeyError as err:
1718 if isinstance(err.args[0], int):
1719 raise IndexError(err.args[0]) from err
1720 else:
1721 raise
1723 def __getattr__(self, key: str) -> _COL_co:
1724 try:
1725 return self._index[key][1]
1726 except KeyError as err:
1727 raise AttributeError(key) from err
1729 def __contains__(self, key: str) -> bool:
1730 if key not in self._index:
1731 if not isinstance(key, str):
1732 raise exc.ArgumentError(
1733 "__contains__ requires a string argument"
1734 )
1735 return False
1736 else:
1737 return True
1739 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1740 """Compare this :class:`_expression.ColumnCollection` to another
1741 based on the names of the keys"""
1743 for l, r in zip_longest(self, other):
1744 if l is not r:
1745 return False
1746 else:
1747 return True
1749 def __eq__(self, other: Any) -> bool:
1750 return self.compare(other)
1752 @overload
1753 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1755 @overload
1756 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1758 def get(
1759 self, key: str, default: Optional[_COL] = None
1760 ) -> Optional[Union[_COL_co, _COL]]:
1761 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1762 based on a string key name from this
1763 :class:`_expression.ColumnCollection`."""
1765 if key in self._index:
1766 return self._index[key][1]
1767 else:
1768 return default
1770 def __str__(self) -> str:
1771 return "%s(%s)" % (
1772 self.__class__.__name__,
1773 ", ".join(str(c) for c in self),
1774 )
1776 def __setitem__(self, key: str, value: Any) -> NoReturn:
1777 raise NotImplementedError()
1779 def __delitem__(self, key: str) -> NoReturn:
1780 raise NotImplementedError()
1782 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1783 raise NotImplementedError()
1785 def clear(self) -> NoReturn:
1786 """Dictionary clear() is not implemented for
1787 :class:`_sql.ColumnCollection`."""
1788 raise NotImplementedError()
1790 def remove(self, column: Any) -> NoReturn:
1791 raise NotImplementedError()
1793 def update(self, iter_: Any) -> NoReturn:
1794 """Dictionary update() is not implemented for
1795 :class:`_sql.ColumnCollection`."""
1796 raise NotImplementedError()
1798 # https://github.com/python/mypy/issues/4266
1799 __hash__: Optional[int] = None # type: ignore
1801 def _populate_separate_keys(
1802 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1803 ) -> None:
1804 """populate from an iterator of (key, column)"""
1806 self._collection[:] = collection = [
1807 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1808 ]
1809 self._colset.update(c._deannotate() for _, c, _ in collection)
1810 self._index.update(
1811 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1812 )
1813 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1815 def add(
1816 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1817 ) -> None:
1818 """Add a column to this :class:`_sql.ColumnCollection`.
1820 .. note::
1822 This method is **not normally used by user-facing code**, as the
1823 :class:`_sql.ColumnCollection` is usually part of an existing
1824 object such as a :class:`_schema.Table`. To add a
1825 :class:`_schema.Column` to an existing :class:`_schema.Table`
1826 object, use the :meth:`_schema.Table.append_column` method.
1828 """
1829 colkey: _COLKEY
1831 if key is None:
1832 colkey = column.key # type: ignore
1833 else:
1834 colkey = key
1836 l = len(self._collection)
1838 # don't really know how this part is supposed to work w/ the
1839 # covariant thing
1841 _column = cast(_COL_co, column)
1843 self._collection.append(
1844 (colkey, _column, _ColumnMetrics(self, _column))
1845 )
1846 self._colset.add(_column._deannotate())
1847 self._index[l] = (colkey, _column)
1848 if colkey not in self._index:
1849 self._index[colkey] = (colkey, _column)
1851 def __getstate__(self) -> Dict[str, Any]:
1852 return {
1853 "_collection": [(k, c) for k, c, _ in self._collection],
1854 "_index": self._index,
1855 }
1857 def __setstate__(self, state: Dict[str, Any]) -> None:
1858 object.__setattr__(self, "_index", state["_index"])
1859 object.__setattr__(
1860 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1861 )
1862 object.__setattr__(
1863 self,
1864 "_collection",
1865 [
1866 (k, c, _ColumnMetrics(self, c))
1867 for (k, c) in state["_collection"]
1868 ],
1869 )
1870 object.__setattr__(
1871 self, "_colset", {col for k, col, _ in self._collection}
1872 )
1874 def contains_column(self, col: ColumnElement[Any]) -> bool:
1875 """Checks if a column object exists in this collection"""
1876 if col not in self._colset:
1877 if isinstance(col, str):
1878 raise exc.ArgumentError(
1879 "contains_column cannot be used with string arguments. "
1880 "Use ``col_name in table.c`` instead."
1881 )
1882 return False
1883 else:
1884 return True
1886 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1887 """Return a "read only" form of this
1888 :class:`_sql.ColumnCollection`."""
1890 return ReadOnlyColumnCollection(self)
1892 def _init_proxy_index(self) -> None:
1893 """populate the "proxy index", if empty.
1895 proxy index is added in 2.0 to provide more efficient operation
1896 for the corresponding_column() method.
1898 For reasons of both time to construct new .c collections as well as
1899 memory conservation for large numbers of large .c collections, the
1900 proxy_index is only filled if corresponding_column() is called. once
1901 filled it stays that way, and new _ColumnMetrics objects created after
1902 that point will populate it with new data. Note this case would be
1903 unusual, if not nonexistent, as it means a .c collection is being
1904 mutated after corresponding_column() were used, however it is tested in
1905 test/base/test_utils.py.
1907 """
1908 pi = self._proxy_index
1909 if pi:
1910 return
1912 for _, _, metrics in self._collection:
1913 eps = metrics.column._expanded_proxy_set
1915 for eps_col in eps:
1916 pi[eps_col].add(metrics)
1918 def corresponding_column(
1919 self, column: _COL, require_embedded: bool = False
1920 ) -> Optional[Union[_COL, _COL_co]]:
1921 """Given a :class:`_expression.ColumnElement`, return the exported
1922 :class:`_expression.ColumnElement` object from this
1923 :class:`_expression.ColumnCollection`
1924 which corresponds to that original :class:`_expression.ColumnElement`
1925 via a common
1926 ancestor column.
1928 :param column: the target :class:`_expression.ColumnElement`
1929 to be matched.
1931 :param require_embedded: only return corresponding columns for
1932 the given :class:`_expression.ColumnElement`, if the given
1933 :class:`_expression.ColumnElement`
1934 is actually present within a sub-element
1935 of this :class:`_expression.Selectable`.
1936 Normally the column will match if
1937 it merely shares a common ancestor with one of the exported
1938 columns of this :class:`_expression.Selectable`.
1940 .. seealso::
1942 :meth:`_expression.Selectable.corresponding_column`
1943 - invokes this method
1944 against the collection returned by
1945 :attr:`_expression.Selectable.exported_columns`.
1947 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1948 was moved onto the :class:`_expression.ColumnCollection` itself.
1950 """
1951 # TODO: cython candidate
1953 # don't dig around if the column is locally present
1954 if column in self._colset:
1955 return column
1957 selected_intersection, selected_metrics = None, None
1958 target_set = column.proxy_set
1960 pi = self._proxy_index
1961 if not pi:
1962 self._init_proxy_index()
1964 for current_metrics in (
1965 mm for ts in target_set if ts in pi for mm in pi[ts]
1966 ):
1967 if not require_embedded or current_metrics.embedded(target_set):
1968 if selected_metrics is None:
1969 # no corresponding column yet, pick this one.
1970 selected_metrics = current_metrics
1971 continue
1973 current_intersection = target_set.intersection(
1974 current_metrics.column._expanded_proxy_set
1975 )
1976 if selected_intersection is None:
1977 selected_intersection = target_set.intersection(
1978 selected_metrics.column._expanded_proxy_set
1979 )
1981 if len(current_intersection) > len(selected_intersection):
1982 # 'current' has a larger field of correspondence than
1983 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
1984 # matches a1.c.x->table.c.x better than
1985 # selectable.c.x->table.c.x does.
1987 selected_metrics = current_metrics
1988 selected_intersection = current_intersection
1989 elif current_intersection == selected_intersection:
1990 # they have the same field of correspondence. see
1991 # which proxy_set has fewer columns in it, which
1992 # indicates a closer relationship with the root
1993 # column. Also take into account the "weight"
1994 # attribute which CompoundSelect() uses to give
1995 # higher precedence to columns based on vertical
1996 # position in the compound statement, and discard
1997 # columns that have no reference to the target
1998 # column (also occurs with CompoundSelect)
2000 selected_col_distance = sum(
2001 [
2002 sc._annotations.get("weight", 1)
2003 for sc in (
2004 selected_metrics.column._uncached_proxy_list()
2005 )
2006 if sc.shares_lineage(column)
2007 ],
2008 )
2009 current_col_distance = sum(
2010 [
2011 sc._annotations.get("weight", 1)
2012 for sc in (
2013 current_metrics.column._uncached_proxy_list()
2014 )
2015 if sc.shares_lineage(column)
2016 ],
2017 )
2018 if current_col_distance < selected_col_distance:
2019 selected_metrics = current_metrics
2020 selected_intersection = current_intersection
2022 return selected_metrics.column if selected_metrics else None
2025_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2028class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
2029 """A :class:`_expression.ColumnCollection`
2030 that maintains deduplicating behavior.
2032 This is useful by schema level objects such as :class:`_schema.Table` and
2033 :class:`.PrimaryKeyConstraint`. The collection includes more
2034 sophisticated mutator methods as well to suit schema objects which
2035 require mutable column collections.
2037 .. versionadded:: 1.4
2039 """
2041 def add( # type: ignore[override]
2042 self, column: _NAMEDCOL, key: Optional[str] = None
2043 ) -> None:
2044 if key is not None and column.key != key:
2045 raise exc.ArgumentError(
2046 "DedupeColumnCollection requires columns be under "
2047 "the same key as their .key"
2048 )
2049 key = column.key
2051 if key is None:
2052 raise exc.ArgumentError(
2053 "Can't add unnamed column to column collection"
2054 )
2056 if key in self._index:
2057 existing = self._index[key][1]
2059 if existing is column:
2060 return
2062 self.replace(column)
2064 # pop out memoized proxy_set as this
2065 # operation may very well be occurring
2066 # in a _make_proxy operation
2067 util.memoized_property.reset(column, "proxy_set")
2068 else:
2069 self._append_new_column(key, column)
2071 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
2072 l = len(self._collection)
2073 self._collection.append(
2074 (key, named_column, _ColumnMetrics(self, named_column))
2075 )
2076 self._colset.add(named_column._deannotate())
2077 self._index[l] = (key, named_column)
2078 self._index[key] = (key, named_column)
2080 def _populate_separate_keys(
2081 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2082 ) -> None:
2083 """populate from an iterator of (key, column)"""
2084 cols = list(iter_)
2086 replace_col = []
2087 for k, col in cols:
2088 if col.key != k:
2089 raise exc.ArgumentError(
2090 "DedupeColumnCollection requires columns be under "
2091 "the same key as their .key"
2092 )
2093 if col.name in self._index and col.key != col.name:
2094 replace_col.append(col)
2095 elif col.key in self._index:
2096 replace_col.append(col)
2097 else:
2098 self._index[k] = (k, col)
2099 self._collection.append((k, col, _ColumnMetrics(self, col)))
2100 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2102 self._index.update(
2103 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2104 )
2105 for col in replace_col:
2106 self.replace(col)
2108 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2109 self._populate_separate_keys((col.key, col) for col in iter_)
2111 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2112 if column not in self._colset:
2113 raise ValueError(
2114 "Can't remove column %r; column is not in this collection"
2115 % column
2116 )
2117 del self._index[column.key]
2118 self._colset.remove(column)
2119 self._collection[:] = [
2120 (k, c, metrics)
2121 for (k, c, metrics) in self._collection
2122 if c is not column
2123 ]
2124 for metrics in self._proxy_index.get(column, ()):
2125 metrics.dispose(self)
2127 self._index.update(
2128 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2129 )
2130 # delete higher index
2131 del self._index[len(self._collection)]
2133 def replace(
2134 self,
2135 column: _NAMEDCOL,
2136 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2137 ) -> None:
2138 """add the given column to this collection, removing unaliased
2139 versions of this column as well as existing columns with the
2140 same key.
2142 e.g.::
2144 t = Table("sometable", metadata, Column("col1", Integer))
2145 t.columns.replace(Column("col1", Integer, key="columnone"))
2147 will remove the original 'col1' from the collection, and add
2148 the new column under the name 'columnname'.
2150 Used by schema.Column to override columns during table reflection.
2152 """
2154 if extra_remove:
2155 remove_col = set(extra_remove)
2156 else:
2157 remove_col = set()
2158 # remove up to two columns based on matches of name as well as key
2159 if column.name in self._index and column.key != column.name:
2160 other = self._index[column.name][1]
2161 if other.name == other.key:
2162 remove_col.add(other)
2164 if column.key in self._index:
2165 remove_col.add(self._index[column.key][1])
2167 if not remove_col:
2168 self._append_new_column(column.key, column)
2169 return
2170 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2171 replaced = False
2172 for k, col, metrics in self._collection:
2173 if col in remove_col:
2174 if not replaced:
2175 replaced = True
2176 new_cols.append(
2177 (column.key, column, _ColumnMetrics(self, column))
2178 )
2179 else:
2180 new_cols.append((k, col, metrics))
2182 if remove_col:
2183 self._colset.difference_update(remove_col)
2185 for rc in remove_col:
2186 for metrics in self._proxy_index.get(rc, ()):
2187 metrics.dispose(self)
2189 if not replaced:
2190 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2192 self._colset.add(column._deannotate())
2193 self._collection[:] = new_cols
2195 self._index.clear()
2197 self._index.update(
2198 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2199 )
2200 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2203class ReadOnlyColumnCollection(
2204 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2205):
2206 __slots__ = ("_parent",)
2208 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2209 object.__setattr__(self, "_parent", collection)
2210 object.__setattr__(self, "_colset", collection._colset)
2211 object.__setattr__(self, "_index", collection._index)
2212 object.__setattr__(self, "_collection", collection._collection)
2213 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2215 def __getstate__(self) -> Dict[str, _COL_co]:
2216 return {"_parent": self._parent}
2218 def __setstate__(self, state: Dict[str, Any]) -> None:
2219 parent = state["_parent"]
2220 self.__init__(parent) # type: ignore
2222 def add(self, column: Any, key: Any = ...) -> Any:
2223 self._readonly()
2225 def extend(self, elements: Any) -> NoReturn:
2226 self._readonly()
2228 def remove(self, item: Any) -> NoReturn:
2229 self._readonly()
2232class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2233 def contains_column(self, col: ColumnClause[Any]) -> bool:
2234 return col in self
2236 def extend(self, cols: Iterable[Any]) -> None:
2237 for col in cols:
2238 self.add(col)
2240 def __eq__(self, other):
2241 l = []
2242 for c in other:
2243 for local in self:
2244 if c.shares_lineage(local):
2245 l.append(c == local)
2246 return elements.and_(*l)
2248 def __hash__(self) -> int: # type: ignore[override]
2249 return hash(tuple(x for x in self))
2252def _entity_namespace(
2253 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2254) -> _EntityNamespace:
2255 """Return the nearest .entity_namespace for the given entity.
2257 If not immediately available, does an iterate to find a sub-element
2258 that has one, if any.
2260 """
2261 try:
2262 return cast(_HasEntityNamespace, entity).entity_namespace
2263 except AttributeError:
2264 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2265 if _is_has_entity_namespace(elem):
2266 return elem.entity_namespace
2267 else:
2268 raise
2271def _entity_namespace_key(
2272 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2273 key: str,
2274 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2275) -> SQLCoreOperations[Any]:
2276 """Return an entry from an entity_namespace.
2279 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2280 on not found.
2282 """
2284 try:
2285 ns = _entity_namespace(entity)
2286 if default is not NO_ARG:
2287 return getattr(ns, key, default)
2288 else:
2289 return getattr(ns, key) # type: ignore
2290 except AttributeError as err:
2291 raise exc.InvalidRequestError(
2292 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2293 ) from err