Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
11from __future__ import annotations
13import collections
14from enum import Enum
15import itertools
16from itertools import zip_longest
17import operator
18import re
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Dict
23from typing import FrozenSet
24from typing import Generator
25from typing import Generic
26from typing import Iterable
27from typing import Iterator
28from typing import List
29from typing import Mapping
30from typing import MutableMapping
31from typing import NamedTuple
32from typing import NoReturn
33from typing import Optional
34from typing import overload
35from typing import Sequence
36from typing import Set
37from typing import Tuple
38from typing import Type
39from typing import TYPE_CHECKING
40from typing import TypeVar
41from typing import Union
43from . import roles
44from . import visitors
45from .cache_key import HasCacheKey # noqa
46from .cache_key import MemoizedHasCacheKey # noqa
47from .traversals import HasCopyInternals # noqa
48from .visitors import ClauseVisitor
49from .visitors import ExtendedInternalTraversal
50from .visitors import ExternallyTraversible
51from .visitors import InternalTraversal
52from .. import event
53from .. import exc
54from .. import util
55from ..util import HasMemoized as HasMemoized
56from ..util import hybridmethod
57from ..util import typing as compat_typing
58from ..util import warn_deprecated
59from ..util.typing import Final
60from ..util.typing import Protocol
61from ..util.typing import Self
62from ..util.typing import TypeGuard
64if TYPE_CHECKING:
65 from . import coercions
66 from . import elements
67 from . import type_api
68 from ._orm_types import DMLStrategyArgument
69 from ._orm_types import SynchronizeSessionArgument
70 from ._typing import _CLE
71 from .cache_key import CacheKey
72 from .compiler import SQLCompiler
73 from .elements import BindParameter
74 from .elements import ClauseList
75 from .elements import ColumnClause # noqa
76 from .elements import ColumnElement
77 from .elements import NamedColumn
78 from .elements import SQLCoreOperations
79 from .elements import TextClause
80 from .schema import Column
81 from .schema import DefaultGenerator
82 from .selectable import _JoinTargetElement
83 from .selectable import _SelectIterable
84 from .selectable import FromClause
85 from .visitors import anon_map
86 from ..engine import Connection
87 from ..engine import CursorResult
88 from ..engine.interfaces import _CoreMultiExecuteParams
89 from ..engine.interfaces import _ExecuteOptions
90 from ..engine.interfaces import _ImmutableExecuteOptions
91 from ..engine.interfaces import CacheStats
92 from ..engine.interfaces import Compiled
93 from ..engine.interfaces import CompiledCacheType
94 from ..engine.interfaces import CoreExecuteOptionsParameter
95 from ..engine.interfaces import Dialect
96 from ..engine.interfaces import IsolationLevel
97 from ..engine.interfaces import SchemaTranslateMapType
98 from ..event import dispatcher
100if not TYPE_CHECKING:
101 coercions = None # noqa
102 elements = None # noqa
103 type_api = None # noqa
106class _NoArg(Enum):
107 NO_ARG = 0
109 def __repr__(self):
110 return f"_NoArg.{self.name}"
113NO_ARG: Final = _NoArg.NO_ARG
116class _NoneName(Enum):
117 NONE_NAME = 0
118 """indicate a 'deferred' name that was ultimately the value None."""
121_NONE_NAME: Final = _NoneName.NONE_NAME
123_T = TypeVar("_T", bound=Any)
125_Fn = TypeVar("_Fn", bound=Callable[..., Any])
127_AmbiguousTableNameMap = MutableMapping[str, str]
130class _DefaultDescriptionTuple(NamedTuple):
131 arg: Any
132 is_scalar: Optional[bool]
133 is_callable: Optional[bool]
134 is_sentinel: Optional[bool]
136 @classmethod
137 def _from_column_default(
138 cls, default: Optional[DefaultGenerator]
139 ) -> _DefaultDescriptionTuple:
140 return (
141 _DefaultDescriptionTuple(
142 default.arg, # type: ignore
143 default.is_scalar,
144 default.is_callable,
145 default.is_sentinel,
146 )
147 if default
148 and (
149 default.has_arg
150 or (not default.for_update and default.is_sentinel)
151 )
152 else _DefaultDescriptionTuple(None, None, None, None)
153 )
156_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
157 "_omit_from_statements"
158)
161class _EntityNamespace(Protocol):
162 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
165class _HasEntityNamespace(Protocol):
166 @util.ro_non_memoized_property
167 def entity_namespace(self) -> _EntityNamespace: ...
170def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
171 return hasattr(element, "entity_namespace")
174# Remove when https://github.com/python/mypy/issues/14640 will be fixed
175_Self = TypeVar("_Self", bound=Any)
178class Immutable:
179 """mark a ClauseElement as 'immutable' when expressions are cloned.
181 "immutable" objects refers to the "mutability" of an object in the
182 context of SQL DQL and DML generation. Such as, in DQL, one can
183 compose a SELECT or subquery of varied forms, but one cannot modify
184 the structure of a specific table or column within DQL.
185 :class:`.Immutable` is mostly intended to follow this concept, and as
186 such the primary "immutable" objects are :class:`.ColumnClause`,
187 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
189 """
191 __slots__ = ()
193 _is_immutable: bool = True
195 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
196 raise NotImplementedError("Immutable objects do not support copying")
198 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
199 raise NotImplementedError("Immutable objects do not support copying")
201 def _clone(self: _Self, **kw: Any) -> _Self:
202 return self
204 def _copy_internals(
205 self, *, omit_attrs: Iterable[str] = (), **kw: Any
206 ) -> None:
207 pass
210class SingletonConstant(Immutable):
211 """Represent SQL constants like NULL, TRUE, FALSE"""
213 _is_singleton_constant: bool = True
215 _singleton: SingletonConstant
217 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
218 return cast(_T, cls._singleton)
220 @util.non_memoized_property
221 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
222 raise NotImplementedError()
224 @classmethod
225 def _create_singleton(cls) -> None:
226 obj = object.__new__(cls)
227 obj.__init__() # type: ignore
229 # for a long time this was an empty frozenset, meaning
230 # a SingletonConstant would never be a "corresponding column" in
231 # a statement. This referred to #6259. However, in #7154 we see
232 # that we do in fact need "correspondence" to work when matching cols
233 # in result sets, so the non-correspondence was moved to a more
234 # specific level when we are actually adapting expressions for SQL
235 # render only.
236 obj.proxy_set = frozenset([obj])
237 cls._singleton = obj
240def _from_objects(
241 *elements: Union[
242 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
243 ]
244) -> Iterator[FromClause]:
245 return itertools.chain.from_iterable(
246 [element._from_objects for element in elements]
247 )
250def _select_iterables(
251 elements: Iterable[roles.ColumnsClauseRole],
252) -> _SelectIterable:
253 """expand tables into individual columns in the
254 given list of column expressions.
256 """
257 return itertools.chain.from_iterable(
258 [c._select_iterable for c in elements]
259 )
262_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
265class _GenerativeType(compat_typing.Protocol):
266 def _generate(self) -> Self: ...
269def _generative(fn: _Fn) -> _Fn:
270 """non-caching _generative() decorator.
272 This is basically the legacy decorator that copies the object and
273 runs a method on the new copy.
275 """
277 @util.decorator
278 def _generative(
279 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
280 ) -> _SelfGenerativeType:
281 """Mark a method as generative."""
283 self = self._generate()
284 x = fn(self, *args, **kw)
285 assert x is self, "generative methods must return self"
286 return self
288 decorated = _generative(fn)
289 decorated.non_generative = fn # type: ignore
290 return decorated
293def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
294 msgs: Dict[str, str] = kw.pop("msgs", {})
296 defaults: Dict[str, str] = kw.pop("defaults", {})
298 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
299 (name, operator.attrgetter(name), defaults.get(name, None))
300 for name in names
301 ]
303 @util.decorator
304 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
305 # make pylance happy by not including "self" in the argument
306 # list
307 self = args[0]
308 args = args[1:]
309 for name, getter, default_ in getters:
310 if getter(self) is not default_:
311 msg = msgs.get(
312 name,
313 "Method %s() has already been invoked on this %s construct"
314 % (fn.__name__, self.__class__),
315 )
316 raise exc.InvalidRequestError(msg)
317 return fn(self, *args, **kw)
319 return check
322def _clone(element, **kw):
323 return element._clone(**kw)
326def _expand_cloned(
327 elements: Iterable[_CLE],
328) -> Iterable[_CLE]:
329 """expand the given set of ClauseElements to be the set of all 'cloned'
330 predecessors.
332 """
333 # TODO: cython candidate
334 return itertools.chain(*[x._cloned_set for x in elements])
337def _de_clone(
338 elements: Iterable[_CLE],
339) -> Iterable[_CLE]:
340 for x in elements:
341 while x._is_clone_of is not None:
342 x = x._is_clone_of
343 yield x
346def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
347 """return the intersection of sets a and b, counting
348 any overlap between 'cloned' predecessors.
350 The returned set is in terms of the entities present within 'a'.
352 """
353 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
354 _expand_cloned(b)
355 )
356 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
359def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
360 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
361 _expand_cloned(b)
362 )
363 return {
364 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
365 }
368class _DialectArgView(MutableMapping[str, Any]):
369 """A dictionary view of dialect-level arguments in the form
370 <dialectname>_<argument_name>.
372 """
374 __slots__ = ("obj",)
376 def __init__(self, obj: DialectKWArgs) -> None:
377 self.obj = obj
379 def _key(self, key: str) -> Tuple[str, str]:
380 try:
381 dialect, value_key = key.split("_", 1)
382 except ValueError as err:
383 raise KeyError(key) from err
384 else:
385 return dialect, value_key
387 def __getitem__(self, key: str) -> Any:
388 dialect, value_key = self._key(key)
390 try:
391 opt = self.obj.dialect_options[dialect]
392 except exc.NoSuchModuleError as err:
393 raise KeyError(key) from err
394 else:
395 return opt[value_key]
397 def __setitem__(self, key: str, value: Any) -> None:
398 try:
399 dialect, value_key = self._key(key)
400 except KeyError as err:
401 raise exc.ArgumentError(
402 "Keys must be of the form <dialectname>_<argname>"
403 ) from err
404 else:
405 self.obj.dialect_options[dialect][value_key] = value
407 def __delitem__(self, key: str) -> None:
408 dialect, value_key = self._key(key)
409 del self.obj.dialect_options[dialect][value_key]
411 def __len__(self) -> int:
412 return sum(
413 len(args._non_defaults)
414 for args in self.obj.dialect_options.values()
415 )
417 def __iter__(self) -> Generator[str, None, None]:
418 return (
419 "%s_%s" % (dialect_name, value_name)
420 for dialect_name in self.obj.dialect_options
421 for value_name in self.obj.dialect_options[
422 dialect_name
423 ]._non_defaults
424 )
427class _DialectArgDict(MutableMapping[str, Any]):
428 """A dictionary view of dialect-level arguments for a specific
429 dialect.
431 Maintains a separate collection of user-specified arguments
432 and dialect-specified default arguments.
434 """
436 def __init__(self) -> None:
437 self._non_defaults: Dict[str, Any] = {}
438 self._defaults: Dict[str, Any] = {}
440 def __len__(self) -> int:
441 return len(set(self._non_defaults).union(self._defaults))
443 def __iter__(self) -> Iterator[str]:
444 return iter(set(self._non_defaults).union(self._defaults))
446 def __getitem__(self, key: str) -> Any:
447 if key in self._non_defaults:
448 return self._non_defaults[key]
449 else:
450 return self._defaults[key]
452 def __setitem__(self, key: str, value: Any) -> None:
453 self._non_defaults[key] = value
455 def __delitem__(self, key: str) -> None:
456 del self._non_defaults[key]
459@util.preload_module("sqlalchemy.dialects")
460def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
461 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
462 if dialect_cls.construct_arguments is None:
463 return None
464 return dict(dialect_cls.construct_arguments)
467class DialectKWArgs:
468 """Establish the ability for a class to have dialect-specific arguments
469 with defaults and constructor validation.
471 The :class:`.DialectKWArgs` interacts with the
472 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
474 .. seealso::
476 :attr:`.DefaultDialect.construct_arguments`
478 """
480 __slots__ = ()
482 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
483 ("dialect_options", InternalTraversal.dp_dialect_options)
484 ]
486 def get_dialect_option(
487 self,
488 dialect: Dialect,
489 argument_name: str,
490 *,
491 else_: Any = None,
492 deprecated_fallback: Optional[str] = None,
493 ) -> Any:
494 r"""Return the value of a dialect-specific option, or *else_* if
495 this dialect does not register the given argument.
497 This is useful for DDL compilers that may be inherited by
498 third-party dialects whose ``construct_arguments`` do not
499 include the same set of keys as the parent dialect.
501 :param dialect: The dialect for which to retrieve the option.
502 :param argument_name: The name of the argument to retrieve.
503 :param else\_: The value to return if the argument is not present.
504 :param deprecated_fallback: Optional dialect name to fall back to
505 if the argument is not present for the current dialect. If the
506 argument is present for the fallback dialect but not the current
507 dialect, a deprecation warning will be emitted.
509 """
511 registry = DialectKWArgs._kw_registry[dialect.name]
512 if registry is None:
513 return else_
515 if argument_name in registry.get(self.__class__, {}):
516 if (
517 deprecated_fallback is None
518 or dialect.name == deprecated_fallback
519 ):
520 return self.dialect_options[dialect.name][argument_name]
522 # deprecated_fallback is present; need to look in two places
524 # Current dialect has this option registered.
525 # Check if user explicitly set it.
526 if (
527 dialect.name in self.dialect_options
528 and argument_name
529 in self.dialect_options[dialect.name]._non_defaults
530 ):
531 # User explicitly set this dialect's option - use it
532 return self.dialect_options[dialect.name][argument_name]
534 # User didn't set current dialect's option.
535 # Check for deprecated fallback.
536 elif (
537 deprecated_fallback in self.dialect_options
538 and argument_name
539 in self.dialect_options[deprecated_fallback]._non_defaults
540 ):
541 # User set fallback option but not current dialect's option
542 warn_deprecated(
543 f"Using '{deprecated_fallback}_{argument_name}' "
544 f"with the '{dialect.name}' dialect is deprecated; "
545 f"please additionally specify "
546 f"'{dialect.name}_{argument_name}'.",
547 version="2.1",
548 )
549 return self.dialect_options[deprecated_fallback][argument_name]
551 # Return default value
552 return self.dialect_options[dialect.name][argument_name]
553 else:
554 # Current dialect doesn't have the option registered at all.
555 # Don't warn - if a third-party dialect doesn't support an
556 # option, that's their choice, not a deprecation case.
557 return else_
559 @classmethod
560 def argument_for(
561 cls, dialect_name: str, argument_name: str, default: Any
562 ) -> None:
563 """Add a new kind of dialect-specific keyword argument for this class.
565 E.g.::
567 Index.argument_for("mydialect", "length", None)
569 some_index = Index("a", "b", mydialect_length=5)
571 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
572 way adding extra arguments to the
573 :attr:`.DefaultDialect.construct_arguments` dictionary. This
574 dictionary provides a list of argument names accepted by various
575 schema-level constructs on behalf of a dialect.
577 New dialects should typically specify this dictionary all at once as a
578 data member of the dialect class. The use case for ad-hoc addition of
579 argument names is typically for end-user code that is also using
580 a custom compilation scheme which consumes the additional arguments.
582 :param dialect_name: name of a dialect. The dialect must be
583 locatable, else a :class:`.NoSuchModuleError` is raised. The
584 dialect must also include an existing
585 :attr:`.DefaultDialect.construct_arguments` collection, indicating
586 that it participates in the keyword-argument validation and default
587 system, else :class:`.ArgumentError` is raised. If the dialect does
588 not include this collection, then any keyword argument can be
589 specified on behalf of this dialect already. All dialects packaged
590 within SQLAlchemy include this collection, however for third party
591 dialects, support may vary.
593 :param argument_name: name of the parameter.
595 :param default: default value of the parameter.
597 """
599 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
600 DialectKWArgs._kw_registry[dialect_name]
601 )
602 if construct_arg_dictionary is None:
603 raise exc.ArgumentError(
604 "Dialect '%s' does have keyword-argument "
605 "validation and defaults enabled configured" % dialect_name
606 )
607 if cls not in construct_arg_dictionary:
608 construct_arg_dictionary[cls] = {}
609 construct_arg_dictionary[cls][argument_name] = default
611 @property
612 def dialect_kwargs(self) -> _DialectArgView:
613 """A collection of keyword arguments specified as dialect-specific
614 options to this construct.
616 The arguments are present here in their original ``<dialect>_<kwarg>``
617 format. Only arguments that were actually passed are included;
618 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
619 contains all options known by this dialect including defaults.
621 The collection is also writable; keys are accepted of the
622 form ``<dialect>_<kwarg>`` where the value will be assembled
623 into the list of options.
625 .. seealso::
627 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
629 """
630 return _DialectArgView(self)
632 @property
633 def kwargs(self) -> _DialectArgView:
634 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
635 return self.dialect_kwargs
637 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
638 util.PopulateDict(_kw_reg_for_dialect)
639 )
641 @classmethod
642 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
643 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
644 d = _DialectArgDict()
646 if construct_arg_dictionary is None:
647 d._defaults.update({"*": None})
648 else:
649 for cls in reversed(cls.__mro__):
650 if cls in construct_arg_dictionary:
651 d._defaults.update(construct_arg_dictionary[cls])
652 return d
654 @util.memoized_property
655 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
656 """A collection of keyword arguments specified as dialect-specific
657 options to this construct.
659 This is a two-level nested registry, keyed to ``<dialect_name>``
660 and ``<argument_name>``. For example, the ``postgresql_where``
661 argument would be locatable as::
663 arg = my_object.dialect_options["postgresql"]["where"]
665 .. versionadded:: 0.9.2
667 .. seealso::
669 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
671 """
673 return util.PopulateDict(self._kw_reg_for_dialect_cls)
675 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
676 # validate remaining kwargs that they all specify DB prefixes
678 if not kwargs:
679 return
681 for k in kwargs:
682 m = re.match("^(.+?)_(.+)$", k)
683 if not m:
684 raise TypeError(
685 "Additional arguments should be "
686 "named <dialectname>_<argument>, got '%s'" % k
687 )
688 dialect_name, arg_name = m.group(1, 2)
690 try:
691 construct_arg_dictionary = self.dialect_options[dialect_name]
692 except exc.NoSuchModuleError:
693 util.warn(
694 "Can't validate argument %r; can't "
695 "locate any SQLAlchemy dialect named %r"
696 % (k, dialect_name)
697 )
698 self.dialect_options[dialect_name] = d = _DialectArgDict()
699 d._defaults.update({"*": None})
700 d._non_defaults[arg_name] = kwargs[k]
701 else:
702 if (
703 "*" not in construct_arg_dictionary
704 and arg_name not in construct_arg_dictionary
705 ):
706 raise exc.ArgumentError(
707 "Argument %r is not accepted by "
708 "dialect %r on behalf of %r"
709 % (k, dialect_name, self.__class__)
710 )
711 else:
712 construct_arg_dictionary[arg_name] = kwargs[k]
715class CompileState:
716 """Produces additional object state necessary for a statement to be
717 compiled.
719 the :class:`.CompileState` class is at the base of classes that assemble
720 state for a particular statement object that is then used by the
721 compiler. This process is essentially an extension of the process that
722 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
723 on converting raw user intent into more organized structures rather than
724 producing string output. The top-level :class:`.CompileState` for the
725 statement being executed is also accessible when the execution context
726 works with invoking the statement and collecting results.
728 The production of :class:`.CompileState` is specific to the compiler, such
729 as within the :meth:`.SQLCompiler.visit_insert`,
730 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
731 responsible for associating the :class:`.CompileState` with the
732 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
733 i.e. the outermost SQL statement that's actually being executed.
734 There can be other :class:`.CompileState` objects that are not the
735 toplevel, such as when a SELECT subquery or CTE-nested
736 INSERT/UPDATE/DELETE is generated.
738 .. versionadded:: 1.4
740 """
742 __slots__ = ("statement", "_ambiguous_table_name_map")
744 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
746 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
748 @classmethod
749 def create_for_statement(
750 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
751 ) -> CompileState:
752 # factory construction.
754 if statement._propagate_attrs:
755 plugin_name = statement._propagate_attrs.get(
756 "compile_state_plugin", "default"
757 )
758 klass = cls.plugins.get(
759 (plugin_name, statement._effective_plugin_target), None
760 )
761 if klass is None:
762 klass = cls.plugins[
763 ("default", statement._effective_plugin_target)
764 ]
766 else:
767 klass = cls.plugins[
768 ("default", statement._effective_plugin_target)
769 ]
771 if klass is cls:
772 return cls(statement, compiler, **kw)
773 else:
774 return klass.create_for_statement(statement, compiler, **kw)
776 def __init__(self, statement, compiler, **kw):
777 self.statement = statement
779 @classmethod
780 def get_plugin_class(
781 cls, statement: Executable
782 ) -> Optional[Type[CompileState]]:
783 plugin_name = statement._propagate_attrs.get(
784 "compile_state_plugin", None
785 )
787 if plugin_name:
788 key = (plugin_name, statement._effective_plugin_target)
789 if key in cls.plugins:
790 return cls.plugins[key]
792 # there's no case where we call upon get_plugin_class() and want
793 # to get None back, there should always be a default. return that
794 # if there was no plugin-specific class (e.g. Insert with "orm"
795 # plugin)
796 try:
797 return cls.plugins[("default", statement._effective_plugin_target)]
798 except KeyError:
799 return None
801 @classmethod
802 def _get_plugin_class_for_plugin(
803 cls, statement: Executable, plugin_name: str
804 ) -> Optional[Type[CompileState]]:
805 try:
806 return cls.plugins[
807 (plugin_name, statement._effective_plugin_target)
808 ]
809 except KeyError:
810 return None
812 @classmethod
813 def plugin_for(
814 cls, plugin_name: str, visit_name: str
815 ) -> Callable[[_Fn], _Fn]:
816 def decorate(cls_to_decorate):
817 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
818 return cls_to_decorate
820 return decorate
823class Generative(HasMemoized):
824 """Provide a method-chaining pattern in conjunction with the
825 @_generative decorator."""
827 def _generate(self) -> Self:
828 skip = self._memoized_keys
829 cls = self.__class__
830 s = cls.__new__(cls)
831 if skip:
832 # ensure this iteration remains atomic
833 s.__dict__ = {
834 k: v for k, v in self.__dict__.copy().items() if k not in skip
835 }
836 else:
837 s.__dict__ = self.__dict__.copy()
838 return s
841class InPlaceGenerative(HasMemoized):
842 """Provide a method-chaining pattern in conjunction with the
843 @_generative decorator that mutates in place."""
845 __slots__ = ()
847 def _generate(self) -> Self:
848 skip = self._memoized_keys
849 # note __dict__ needs to be in __slots__ if this is used
850 for k in skip:
851 self.__dict__.pop(k, None)
852 return self
855class HasCompileState(Generative):
856 """A class that has a :class:`.CompileState` associated with it."""
858 _compile_state_plugin: Optional[Type[CompileState]] = None
860 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
862 _compile_state_factory = CompileState.create_for_statement
865class _MetaOptions(type):
866 """metaclass for the Options class.
868 This metaclass is actually necessary despite the availability of the
869 ``__init_subclass__()`` hook as this type also provides custom class-level
870 behavior for the ``__add__()`` method.
872 """
874 _cache_attrs: Tuple[str, ...]
876 def __add__(self, other):
877 o1 = self()
879 if set(other).difference(self._cache_attrs):
880 raise TypeError(
881 "dictionary contains attributes not covered by "
882 "Options class %s: %r"
883 % (self, set(other).difference(self._cache_attrs))
884 )
886 o1.__dict__.update(other)
887 return o1
889 if TYPE_CHECKING:
891 def __getattr__(self, key: str) -> Any: ...
893 def __setattr__(self, key: str, value: Any) -> None: ...
895 def __delattr__(self, key: str) -> None: ...
898class Options(metaclass=_MetaOptions):
899 """A cacheable option dictionary with defaults."""
901 __slots__ = ()
903 _cache_attrs: Tuple[str, ...]
905 def __init_subclass__(cls) -> None:
906 dict_ = cls.__dict__
907 cls._cache_attrs = tuple(
908 sorted(
909 d
910 for d in dict_
911 if not d.startswith("__")
912 and d not in ("_cache_key_traversal",)
913 )
914 )
915 super().__init_subclass__()
917 def __init__(self, **kw: Any) -> None:
918 self.__dict__.update(kw)
920 def __add__(self, other):
921 o1 = self.__class__.__new__(self.__class__)
922 o1.__dict__.update(self.__dict__)
924 if set(other).difference(self._cache_attrs):
925 raise TypeError(
926 "dictionary contains attributes not covered by "
927 "Options class %s: %r"
928 % (self, set(other).difference(self._cache_attrs))
929 )
931 o1.__dict__.update(other)
932 return o1
934 def __eq__(self, other):
935 # TODO: very inefficient. This is used only in test suites
936 # right now.
937 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
938 if getattr(self, a) != getattr(other, b):
939 return False
940 return True
942 def __repr__(self) -> str:
943 # TODO: fairly inefficient, used only in debugging right now.
945 return "%s(%s)" % (
946 self.__class__.__name__,
947 ", ".join(
948 "%s=%r" % (k, self.__dict__[k])
949 for k in self._cache_attrs
950 if k in self.__dict__
951 ),
952 )
954 @classmethod
955 def isinstance(cls, klass: Type[Any]) -> bool:
956 return issubclass(cls, klass)
958 @hybridmethod
959 def add_to_element(self, name: str, value: str) -> Any:
960 return self + {name: getattr(self, name) + value}
962 @hybridmethod
963 def _state_dict_inst(self) -> Mapping[str, Any]:
964 return self.__dict__
966 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
968 @_state_dict_inst.classlevel
969 def _state_dict(cls) -> Mapping[str, Any]:
970 return cls._state_dict_const
972 @classmethod
973 def safe_merge(cls, other: "Options") -> Any:
974 d = other._state_dict()
976 # only support a merge with another object of our class
977 # and which does not have attrs that we don't. otherwise
978 # we risk having state that might not be part of our cache
979 # key strategy
981 if (
982 cls is not other.__class__
983 and other._cache_attrs
984 and set(other._cache_attrs).difference(cls._cache_attrs)
985 ):
986 raise TypeError(
987 "other element %r is not empty, is not of type %s, "
988 "and contains attributes not covered here %r"
989 % (
990 other,
991 cls,
992 set(other._cache_attrs).difference(cls._cache_attrs),
993 )
994 )
995 return cls + d
997 @classmethod
998 def from_execution_options(
999 cls,
1000 key: str,
1001 attrs: set[str],
1002 exec_options: Mapping[str, Any],
1003 statement_exec_options: Mapping[str, Any],
1004 ) -> Tuple["Options", Mapping[str, Any]]:
1005 """process Options argument in terms of execution options.
1008 e.g.::
1010 (
1011 load_options,
1012 execution_options,
1013 ) = QueryContext.default_load_options.from_execution_options(
1014 "_sa_orm_load_options",
1015 {"populate_existing", "autoflush", "yield_per"},
1016 execution_options,
1017 statement._execution_options,
1018 )
1020 get back the Options and refresh "_sa_orm_load_options" in the
1021 exec options dict w/ the Options as well
1023 """
1025 # common case is that no options we are looking for are
1026 # in either dictionary, so cancel for that first
1027 check_argnames = attrs.intersection(
1028 set(exec_options).union(statement_exec_options)
1029 )
1031 existing_options = exec_options.get(key, cls)
1033 if check_argnames:
1034 result = {}
1035 for argname in check_argnames:
1036 local = "_" + argname
1037 if argname in exec_options:
1038 result[local] = exec_options[argname]
1039 elif argname in statement_exec_options:
1040 result[local] = statement_exec_options[argname]
1042 new_options = existing_options + result
1043 exec_options = util.immutabledict(exec_options).merge_with(
1044 {key: new_options}
1045 )
1046 return new_options, exec_options
1048 else:
1049 return existing_options, exec_options
1051 if TYPE_CHECKING:
1053 def __getattr__(self, key: str) -> Any: ...
1055 def __setattr__(self, key: str, value: Any) -> None: ...
1057 def __delattr__(self, key: str) -> None: ...
1060class CacheableOptions(Options, HasCacheKey):
1061 __slots__ = ()
1063 @hybridmethod
1064 def _gen_cache_key_inst(
1065 self, anon_map: Any, bindparams: List[BindParameter[Any]]
1066 ) -> Optional[Tuple[Any]]:
1067 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
1069 @_gen_cache_key_inst.classlevel
1070 def _gen_cache_key(
1071 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
1072 ) -> Tuple[CacheableOptions, Any]:
1073 return (cls, ())
1075 @hybridmethod
1076 def _generate_cache_key(self) -> Optional[CacheKey]:
1077 return HasCacheKey._generate_cache_key_for_object(self)
1080class ExecutableOption(HasCopyInternals):
1081 __slots__ = ()
1083 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1085 __visit_name__: str = "executable_option"
1087 _is_has_cache_key: bool = False
1089 _is_core: bool = True
1091 def _clone(self, **kw):
1092 """Create a shallow copy of this ExecutableOption."""
1093 c = self.__class__.__new__(self.__class__)
1094 c.__dict__ = dict(self.__dict__) # type: ignore
1095 return c
1098class Executable(roles.StatementRole):
1099 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1101 :class:`.Executable` is a superclass for all "statement" types
1102 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1103 :func:`insert`, :func:`text`.
1105 """
1107 supports_execution: bool = True
1108 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1109 _is_default_generator: bool = False
1110 _with_options: Tuple[ExecutableOption, ...] = ()
1111 _with_context_options: Tuple[
1112 Tuple[Callable[[CompileState], None], Any], ...
1113 ] = ()
1114 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1116 _executable_traverse_internals = [
1117 ("_with_options", InternalTraversal.dp_executable_options),
1118 (
1119 "_with_context_options",
1120 ExtendedInternalTraversal.dp_with_context_options,
1121 ),
1122 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1123 ]
1125 is_select: bool = False
1126 is_from_statement: bool = False
1127 is_update: bool = False
1128 is_insert: bool = False
1129 is_text: bool = False
1130 is_delete: bool = False
1131 is_dml: bool = False
1133 if TYPE_CHECKING:
1134 __visit_name__: str
1136 def _compile_w_cache(
1137 self,
1138 dialect: Dialect,
1139 *,
1140 compiled_cache: Optional[CompiledCacheType],
1141 column_keys: List[str],
1142 for_executemany: bool = False,
1143 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1144 **kw: Any,
1145 ) -> Tuple[
1146 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1147 ]: ...
1149 def _execute_on_connection(
1150 self,
1151 connection: Connection,
1152 distilled_params: _CoreMultiExecuteParams,
1153 execution_options: CoreExecuteOptionsParameter,
1154 ) -> CursorResult[Any]: ...
1156 def _execute_on_scalar(
1157 self,
1158 connection: Connection,
1159 distilled_params: _CoreMultiExecuteParams,
1160 execution_options: CoreExecuteOptionsParameter,
1161 ) -> Any: ...
1163 @util.ro_non_memoized_property
1164 def _all_selected_columns(self) -> _SelectIterable:
1165 raise NotImplementedError()
1167 @property
1168 def _effective_plugin_target(self) -> str:
1169 return self.__visit_name__
1171 @_generative
1172 def options(self, *options: ExecutableOption) -> Self:
1173 """Apply options to this statement.
1175 In the general sense, options are any kind of Python object
1176 that can be interpreted by the SQL compiler for the statement.
1177 These options can be consumed by specific dialects or specific kinds
1178 of compilers.
1180 The most commonly known kind of option are the ORM level options
1181 that apply "eager load" and other loading behaviors to an ORM
1182 query. However, options can theoretically be used for many other
1183 purposes.
1185 For background on specific kinds of options for specific kinds of
1186 statements, refer to the documentation for those option objects.
1188 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1189 Core statement objects towards the goal of allowing unified
1190 Core / ORM querying capabilities.
1192 .. seealso::
1194 :ref:`loading_columns` - refers to options specific to the usage
1195 of ORM queries
1197 :ref:`relationship_loader_options` - refers to options specific
1198 to the usage of ORM queries
1200 """
1201 self._with_options += tuple(
1202 coercions.expect(roles.ExecutableOptionRole, opt)
1203 for opt in options
1204 )
1205 return self
1207 @_generative
1208 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1209 """Assign the compile options to a new value.
1211 :param compile_options: appropriate CacheableOptions structure
1213 """
1215 self._compile_options = compile_options
1216 return self
1218 @_generative
1219 def _update_compile_options(self, options: CacheableOptions) -> Self:
1220 """update the _compile_options with new keys."""
1222 assert self._compile_options is not None
1223 self._compile_options += options
1224 return self
1226 @_generative
1227 def _add_context_option(
1228 self,
1229 callable_: Callable[[CompileState], None],
1230 cache_args: Any,
1231 ) -> Self:
1232 """Add a context option to this statement.
1234 These are callable functions that will
1235 be given the CompileState object upon compilation.
1237 A second argument cache_args is required, which will be combined with
1238 the ``__code__`` identity of the function itself in order to produce a
1239 cache key.
1241 """
1242 self._with_context_options += ((callable_, cache_args),)
1243 return self
1245 @overload
1246 def execution_options(
1247 self,
1248 *,
1249 compiled_cache: Optional[CompiledCacheType] = ...,
1250 logging_token: str = ...,
1251 isolation_level: IsolationLevel = ...,
1252 no_parameters: bool = False,
1253 stream_results: bool = False,
1254 max_row_buffer: int = ...,
1255 yield_per: int = ...,
1256 insertmanyvalues_page_size: int = ...,
1257 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1258 populate_existing: bool = False,
1259 autoflush: bool = False,
1260 synchronize_session: SynchronizeSessionArgument = ...,
1261 dml_strategy: DMLStrategyArgument = ...,
1262 render_nulls: bool = ...,
1263 is_delete_using: bool = ...,
1264 is_update_from: bool = ...,
1265 preserve_rowcount: bool = False,
1266 **opt: Any,
1267 ) -> Self: ...
1269 @overload
1270 def execution_options(self, **opt: Any) -> Self: ...
1272 @_generative
1273 def execution_options(self, **kw: Any) -> Self:
1274 """Set non-SQL options for the statement which take effect during
1275 execution.
1277 Execution options can be set at many scopes, including per-statement,
1278 per-connection, or per execution, using methods such as
1279 :meth:`_engine.Connection.execution_options` and parameters which
1280 accept a dictionary of options such as
1281 :paramref:`_engine.Connection.execute.execution_options` and
1282 :paramref:`_orm.Session.execute.execution_options`.
1284 The primary characteristic of an execution option, as opposed to
1285 other kinds of options such as ORM loader options, is that
1286 **execution options never affect the compiled SQL of a query, only
1287 things that affect how the SQL statement itself is invoked or how
1288 results are fetched**. That is, execution options are not part of
1289 what's accommodated by SQL compilation nor are they considered part of
1290 the cached state of a statement.
1292 The :meth:`_sql.Executable.execution_options` method is
1293 :term:`generative`, as
1294 is the case for the method as applied to the :class:`_engine.Engine`
1295 and :class:`_orm.Query` objects, which means when the method is called,
1296 a copy of the object is returned, which applies the given parameters to
1297 that new copy, but leaves the original unchanged::
1299 statement = select(table.c.x, table.c.y)
1300 new_statement = statement.execution_options(my_option=True)
1302 An exception to this behavior is the :class:`_engine.Connection`
1303 object, where the :meth:`_engine.Connection.execution_options` method
1304 is explicitly **not** generative.
1306 The kinds of options that may be passed to
1307 :meth:`_sql.Executable.execution_options` and other related methods and
1308 parameter dictionaries include parameters that are explicitly consumed
1309 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1310 defined by SQLAlchemy, which means the methods and/or parameter
1311 dictionaries may be used for user-defined parameters that interact with
1312 custom code, which may access the parameters using methods such as
1313 :meth:`_sql.Executable.get_execution_options` and
1314 :meth:`_engine.Connection.get_execution_options`, or within selected
1315 event hooks using a dedicated ``execution_options`` event parameter
1316 such as
1317 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1318 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1320 from sqlalchemy import event
1323 @event.listens_for(some_engine, "before_execute")
1324 def _process_opt(conn, statement, multiparams, params, execution_options):
1325 "run a SQL function before invoking a statement"
1327 if execution_options.get("do_special_thing", False):
1328 conn.exec_driver_sql("run_special_function()")
1330 Within the scope of options that are explicitly recognized by
1331 SQLAlchemy, most apply to specific classes of objects and not others.
1332 The most common execution options include:
1334 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1335 sets the isolation level for a connection or a class of connections
1336 via an :class:`_engine.Engine`. This option is accepted only
1337 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1339 * :paramref:`_engine.Connection.execution_options.stream_results` -
1340 indicates results should be fetched using a server side cursor;
1341 this option is accepted by :class:`_engine.Connection`, by the
1342 :paramref:`_engine.Connection.execute.execution_options` parameter
1343 on :meth:`_engine.Connection.execute`, and additionally by
1344 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1345 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1347 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1348 indicates a dictionary that will serve as the
1349 :ref:`SQL compilation cache <sql_caching>`
1350 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1351 well as for ORM methods like :meth:`_orm.Session.execute`.
1352 Can be passed as ``None`` to disable caching for statements.
1353 This option is not accepted by
1354 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1355 carry along a compilation cache within a statement object.
1357 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1358 - a mapping of schema names used by the
1359 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1360 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1361 :class:`_sql.Executable`, as well as by ORM constructs
1362 like :meth:`_orm.Session.execute`.
1364 .. seealso::
1366 :meth:`_engine.Connection.execution_options`
1368 :paramref:`_engine.Connection.execute.execution_options`
1370 :paramref:`_orm.Session.execute.execution_options`
1372 :ref:`orm_queryguide_execution_options` - documentation on all
1373 ORM-specific execution options
1375 """ # noqa: E501
1376 if "isolation_level" in kw:
1377 raise exc.ArgumentError(
1378 "'isolation_level' execution option may only be specified "
1379 "on Connection.execution_options(), or "
1380 "per-engine using the isolation_level "
1381 "argument to create_engine()."
1382 )
1383 if "compiled_cache" in kw:
1384 raise exc.ArgumentError(
1385 "'compiled_cache' execution option may only be specified "
1386 "on Connection.execution_options(), not per statement."
1387 )
1388 self._execution_options = self._execution_options.union(kw)
1389 return self
1391 def get_execution_options(self) -> _ExecuteOptions:
1392 """Get the non-SQL options which will take effect during execution.
1394 .. versionadded:: 1.3
1396 .. seealso::
1398 :meth:`.Executable.execution_options`
1399 """
1400 return self._execution_options
1403class SchemaEventTarget(event.EventTarget):
1404 """Base class for elements that are the targets of :class:`.DDLEvents`
1405 events.
1407 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1409 """
1411 dispatch: dispatcher[SchemaEventTarget]
1413 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1414 """Associate with this SchemaEvent's parent object."""
1416 def _set_parent_with_dispatch(
1417 self, parent: SchemaEventTarget, **kw: Any
1418 ) -> None:
1419 self.dispatch.before_parent_attach(self, parent)
1420 self._set_parent(parent, **kw)
1421 self.dispatch.after_parent_attach(self, parent)
1424class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1425 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1427 .. versionadded:: 2.0.41
1429 """
1432class SchemaVisitor(ClauseVisitor):
1433 """Define the visiting for ``SchemaItem`` and more
1434 generally ``SchemaVisitable`` objects.
1436 """
1438 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1441class _SentinelDefaultCharacterization(Enum):
1442 NONE = "none"
1443 UNKNOWN = "unknown"
1444 CLIENTSIDE = "clientside"
1445 SENTINEL_DEFAULT = "sentinel_default"
1446 SERVERSIDE = "serverside"
1447 IDENTITY = "identity"
1448 SEQUENCE = "sequence"
1451class _SentinelColumnCharacterization(NamedTuple):
1452 columns: Optional[Sequence[Column[Any]]] = None
1453 is_explicit: bool = False
1454 is_autoinc: bool = False
1455 default_characterization: _SentinelDefaultCharacterization = (
1456 _SentinelDefaultCharacterization.NONE
1457 )
1460_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1462_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1463_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1466class _ColumnMetrics(Generic[_COL_co]):
1467 __slots__ = ("column",)
1469 column: _COL_co
1471 def __init__(
1472 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1473 ) -> None:
1474 self.column = col
1476 # proxy_index being non-empty means it was initialized.
1477 # so we need to update it
1478 pi = collection._proxy_index
1479 if pi:
1480 for eps_col in col._expanded_proxy_set:
1481 pi[eps_col].add(self)
1483 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1484 return self.column._expanded_proxy_set
1486 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1487 pi = collection._proxy_index
1488 if not pi:
1489 return
1490 for col in self.column._expanded_proxy_set:
1491 colset = pi.get(col, None)
1492 if colset:
1493 colset.discard(self)
1494 if colset is not None and not colset:
1495 del pi[col]
1497 def embedded(
1498 self,
1499 target_set: Union[
1500 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1501 ],
1502 ) -> bool:
1503 expanded_proxy_set = self.column._expanded_proxy_set
1504 for t in target_set.difference(expanded_proxy_set):
1505 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1506 return False
1507 return True
1510class ColumnCollection(Generic[_COLKEY, _COL_co]):
1511 """Collection of :class:`_expression.ColumnElement` instances,
1512 typically for
1513 :class:`_sql.FromClause` objects.
1515 The :class:`_sql.ColumnCollection` object is most commonly available
1516 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1517 on the :class:`_schema.Table` object, introduced at
1518 :ref:`metadata_tables_and_columns`.
1520 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1521 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1522 :class:`_schema.Column` objects, which are then accessible both via mapping
1523 style access as well as attribute access style.
1525 To access :class:`_schema.Column` objects using ordinary attribute-style
1526 access, specify the name like any other object attribute, such as below
1527 a column named ``employee_name`` is accessed::
1529 >>> employee_table.c.employee_name
1531 To access columns that have names with special characters or spaces,
1532 index-style access is used, such as below which illustrates a column named
1533 ``employee ' payment`` is accessed::
1535 >>> employee_table.c["employee ' payment"]
1537 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1538 interface, common dictionary method names like
1539 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1540 and :meth:`_sql.ColumnCollection.items` are available, which means that
1541 database columns that are keyed under these names also need to use indexed
1542 access::
1544 >>> employee_table.c["values"]
1547 The name for which a :class:`_schema.Column` would be present is normally
1548 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1549 such as a :class:`_sql.Select` object that uses a label style set
1550 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1551 key may instead be represented under a particular label name such
1552 as ``tablename_columnname``::
1554 >>> from sqlalchemy import select, column, table
1555 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1556 >>> t = table("t", column("c"))
1557 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1558 >>> subq = stmt.subquery()
1559 >>> subq.c.t_c
1560 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1562 :class:`.ColumnCollection` also indexes the columns in order and allows
1563 them to be accessible by their integer position::
1565 >>> cc[0]
1566 Column('x', Integer(), table=None)
1567 >>> cc[1]
1568 Column('y', Integer(), table=None)
1570 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1571 allows integer-based
1572 index access to the collection.
1574 Iterating the collection yields the column expressions in order::
1576 >>> list(cc)
1577 [Column('x', Integer(), table=None),
1578 Column('y', Integer(), table=None)]
1580 The base :class:`_expression.ColumnCollection` object can store
1581 duplicates, which can
1582 mean either two columns with the same key, in which case the column
1583 returned by key access is **arbitrary**::
1585 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1586 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1587 >>> list(cc)
1588 [Column('x', Integer(), table=None),
1589 Column('x', Integer(), table=None)]
1590 >>> cc["x"] is x1
1591 False
1592 >>> cc["x"] is x2
1593 True
1595 Or it can also mean the same column multiple times. These cases are
1596 supported as :class:`_expression.ColumnCollection`
1597 is used to represent the columns in
1598 a SELECT statement which may include duplicates.
1600 A special subclass :class:`.DedupeColumnCollection` exists which instead
1601 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1602 collection is used for schema level objects like :class:`_schema.Table`
1603 and
1604 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1605 :class:`.DedupeColumnCollection` class also has additional mutation methods
1606 as the schema constructs have more use cases that require removal and
1607 replacement of columns.
1609 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1610 now stores duplicate
1611 column keys as well as the same column in multiple positions. The
1612 :class:`.DedupeColumnCollection` class is added to maintain the
1613 former behavior in those cases where deduplication as well as
1614 additional replace/remove operations are needed.
1617 """
1619 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1621 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1622 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1623 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1624 _colset: Set[_COL_co]
1626 def __init__(
1627 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1628 ):
1629 object.__setattr__(self, "_colset", set())
1630 object.__setattr__(self, "_index", {})
1631 object.__setattr__(
1632 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1633 )
1634 object.__setattr__(self, "_collection", [])
1635 if columns:
1636 self._initial_populate(columns)
1638 @util.preload_module("sqlalchemy.sql.elements")
1639 def __clause_element__(self) -> ClauseList:
1640 elements = util.preloaded.sql_elements
1642 return elements.ClauseList(
1643 _literal_as_text_role=roles.ColumnsClauseRole,
1644 group=False,
1645 *self._all_columns,
1646 )
1648 def _initial_populate(
1649 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1650 ) -> None:
1651 self._populate_separate_keys(iter_)
1653 @property
1654 def _all_columns(self) -> List[_COL_co]:
1655 return [col for (_, col, _) in self._collection]
1657 def keys(self) -> List[_COLKEY]:
1658 """Return a sequence of string key names for all columns in this
1659 collection."""
1660 return [k for (k, _, _) in self._collection]
1662 def values(self) -> List[_COL_co]:
1663 """Return a sequence of :class:`_sql.ColumnClause` or
1664 :class:`_schema.Column` objects for all columns in this
1665 collection."""
1666 return [col for (_, col, _) in self._collection]
1668 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1669 """Return a sequence of (key, column) tuples for all columns in this
1670 collection each consisting of a string key name and a
1671 :class:`_sql.ColumnClause` or
1672 :class:`_schema.Column` object.
1673 """
1675 return [(k, col) for (k, col, _) in self._collection]
1677 def __bool__(self) -> bool:
1678 return bool(self._collection)
1680 def __len__(self) -> int:
1681 return len(self._collection)
1683 def __iter__(self) -> Iterator[_COL_co]:
1684 # turn to a list first to maintain over a course of changes
1685 return iter([col for _, col, _ in self._collection])
1687 @overload
1688 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1690 @overload
1691 def __getitem__(
1692 self, key: Tuple[Union[str, int], ...]
1693 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1695 @overload
1696 def __getitem__(
1697 self, key: slice
1698 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1700 def __getitem__(
1701 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1702 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1703 try:
1704 if isinstance(key, (tuple, slice)):
1705 if isinstance(key, slice):
1706 cols = (
1707 (sub_key, col)
1708 for (sub_key, col, _) in self._collection[key]
1709 )
1710 else:
1711 cols = (self._index[sub_key] for sub_key in key)
1713 return ColumnCollection(cols).as_readonly()
1714 else:
1715 return self._index[key][1]
1716 except KeyError as err:
1717 if isinstance(err.args[0], int):
1718 raise IndexError(err.args[0]) from err
1719 else:
1720 raise
1722 def __getattr__(self, key: str) -> _COL_co:
1723 try:
1724 return self._index[key][1]
1725 except KeyError as err:
1726 raise AttributeError(key) from err
1728 def __contains__(self, key: str) -> bool:
1729 if key not in self._index:
1730 if not isinstance(key, str):
1731 raise exc.ArgumentError(
1732 "__contains__ requires a string argument"
1733 )
1734 return False
1735 else:
1736 return True
1738 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1739 """Compare this :class:`_expression.ColumnCollection` to another
1740 based on the names of the keys"""
1742 for l, r in zip_longest(self, other):
1743 if l is not r:
1744 return False
1745 else:
1746 return True
1748 def __eq__(self, other: Any) -> bool:
1749 return self.compare(other)
1751 @overload
1752 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1754 @overload
1755 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1757 def get(
1758 self, key: str, default: Optional[_COL] = None
1759 ) -> Optional[Union[_COL_co, _COL]]:
1760 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1761 based on a string key name from this
1762 :class:`_expression.ColumnCollection`."""
1764 if key in self._index:
1765 return self._index[key][1]
1766 else:
1767 return default
1769 def __str__(self) -> str:
1770 return "%s(%s)" % (
1771 self.__class__.__name__,
1772 ", ".join(str(c) for c in self),
1773 )
1775 def __setitem__(self, key: str, value: Any) -> NoReturn:
1776 raise NotImplementedError()
1778 def __delitem__(self, key: str) -> NoReturn:
1779 raise NotImplementedError()
1781 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1782 raise NotImplementedError()
1784 def clear(self) -> NoReturn:
1785 """Dictionary clear() is not implemented for
1786 :class:`_sql.ColumnCollection`."""
1787 raise NotImplementedError()
1789 def remove(self, column: Any) -> NoReturn:
1790 raise NotImplementedError()
1792 def update(self, iter_: Any) -> NoReturn:
1793 """Dictionary update() is not implemented for
1794 :class:`_sql.ColumnCollection`."""
1795 raise NotImplementedError()
1797 # https://github.com/python/mypy/issues/4266
1798 __hash__: Optional[int] = None # type: ignore
1800 def _populate_separate_keys(
1801 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1802 ) -> None:
1803 """populate from an iterator of (key, column)"""
1805 self._collection[:] = collection = [
1806 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1807 ]
1808 self._colset.update(c._deannotate() for _, c, _ in collection)
1809 self._index.update(
1810 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1811 )
1812 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1814 def add(
1815 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1816 ) -> None:
1817 """Add a column to this :class:`_sql.ColumnCollection`.
1819 .. note::
1821 This method is **not normally used by user-facing code**, as the
1822 :class:`_sql.ColumnCollection` is usually part of an existing
1823 object such as a :class:`_schema.Table`. To add a
1824 :class:`_schema.Column` to an existing :class:`_schema.Table`
1825 object, use the :meth:`_schema.Table.append_column` method.
1827 """
1828 colkey: _COLKEY
1830 if key is None:
1831 colkey = column.key # type: ignore
1832 else:
1833 colkey = key
1835 l = len(self._collection)
1837 # don't really know how this part is supposed to work w/ the
1838 # covariant thing
1840 _column = cast(_COL_co, column)
1842 self._collection.append(
1843 (colkey, _column, _ColumnMetrics(self, _column))
1844 )
1845 self._colset.add(_column._deannotate())
1846 self._index[l] = (colkey, _column)
1847 if colkey not in self._index:
1848 self._index[colkey] = (colkey, _column)
1850 def __getstate__(self) -> Dict[str, Any]:
1851 return {
1852 "_collection": [(k, c) for k, c, _ in self._collection],
1853 "_index": self._index,
1854 }
1856 def __setstate__(self, state: Dict[str, Any]) -> None:
1857 object.__setattr__(self, "_index", state["_index"])
1858 object.__setattr__(
1859 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1860 )
1861 object.__setattr__(
1862 self,
1863 "_collection",
1864 [
1865 (k, c, _ColumnMetrics(self, c))
1866 for (k, c) in state["_collection"]
1867 ],
1868 )
1869 object.__setattr__(
1870 self, "_colset", {col for k, col, _ in self._collection}
1871 )
1873 def contains_column(self, col: ColumnElement[Any]) -> bool:
1874 """Checks if a column object exists in this collection"""
1875 if col not in self._colset:
1876 if isinstance(col, str):
1877 raise exc.ArgumentError(
1878 "contains_column cannot be used with string arguments. "
1879 "Use ``col_name in table.c`` instead."
1880 )
1881 return False
1882 else:
1883 return True
1885 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1886 """Return a "read only" form of this
1887 :class:`_sql.ColumnCollection`."""
1889 return ReadOnlyColumnCollection(self)
1891 def _init_proxy_index(self) -> None:
1892 """populate the "proxy index", if empty.
1894 proxy index is added in 2.0 to provide more efficient operation
1895 for the corresponding_column() method.
1897 For reasons of both time to construct new .c collections as well as
1898 memory conservation for large numbers of large .c collections, the
1899 proxy_index is only filled if corresponding_column() is called. once
1900 filled it stays that way, and new _ColumnMetrics objects created after
1901 that point will populate it with new data. Note this case would be
1902 unusual, if not nonexistent, as it means a .c collection is being
1903 mutated after corresponding_column() were used, however it is tested in
1904 test/base/test_utils.py.
1906 """
1907 pi = self._proxy_index
1908 if pi:
1909 return
1911 for _, _, metrics in self._collection:
1912 eps = metrics.column._expanded_proxy_set
1914 for eps_col in eps:
1915 pi[eps_col].add(metrics)
1917 def corresponding_column(
1918 self, column: _COL, require_embedded: bool = False
1919 ) -> Optional[Union[_COL, _COL_co]]:
1920 """Given a :class:`_expression.ColumnElement`, return the exported
1921 :class:`_expression.ColumnElement` object from this
1922 :class:`_expression.ColumnCollection`
1923 which corresponds to that original :class:`_expression.ColumnElement`
1924 via a common
1925 ancestor column.
1927 :param column: the target :class:`_expression.ColumnElement`
1928 to be matched.
1930 :param require_embedded: only return corresponding columns for
1931 the given :class:`_expression.ColumnElement`, if the given
1932 :class:`_expression.ColumnElement`
1933 is actually present within a sub-element
1934 of this :class:`_expression.Selectable`.
1935 Normally the column will match if
1936 it merely shares a common ancestor with one of the exported
1937 columns of this :class:`_expression.Selectable`.
1939 .. seealso::
1941 :meth:`_expression.Selectable.corresponding_column`
1942 - invokes this method
1943 against the collection returned by
1944 :attr:`_expression.Selectable.exported_columns`.
1946 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1947 was moved onto the :class:`_expression.ColumnCollection` itself.
1949 """
1950 # TODO: cython candidate
1952 # don't dig around if the column is locally present
1953 if column in self._colset:
1954 return column
1956 selected_intersection, selected_metrics = None, None
1957 target_set = column.proxy_set
1959 pi = self._proxy_index
1960 if not pi:
1961 self._init_proxy_index()
1963 for current_metrics in (
1964 mm for ts in target_set if ts in pi for mm in pi[ts]
1965 ):
1966 if not require_embedded or current_metrics.embedded(target_set):
1967 if selected_metrics is None:
1968 # no corresponding column yet, pick this one.
1969 selected_metrics = current_metrics
1970 continue
1972 current_intersection = target_set.intersection(
1973 current_metrics.column._expanded_proxy_set
1974 )
1975 if selected_intersection is None:
1976 selected_intersection = target_set.intersection(
1977 selected_metrics.column._expanded_proxy_set
1978 )
1980 if len(current_intersection) > len(selected_intersection):
1981 # 'current' has a larger field of correspondence than
1982 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
1983 # matches a1.c.x->table.c.x better than
1984 # selectable.c.x->table.c.x does.
1986 selected_metrics = current_metrics
1987 selected_intersection = current_intersection
1988 elif current_intersection == selected_intersection:
1989 # they have the same field of correspondence. see
1990 # which proxy_set has fewer columns in it, which
1991 # indicates a closer relationship with the root
1992 # column. Also take into account the "weight"
1993 # attribute which CompoundSelect() uses to give
1994 # higher precedence to columns based on vertical
1995 # position in the compound statement, and discard
1996 # columns that have no reference to the target
1997 # column (also occurs with CompoundSelect)
1999 selected_col_distance = sum(
2000 [
2001 sc._annotations.get("weight", 1)
2002 for sc in (
2003 selected_metrics.column._uncached_proxy_list()
2004 )
2005 if sc.shares_lineage(column)
2006 ],
2007 )
2008 current_col_distance = sum(
2009 [
2010 sc._annotations.get("weight", 1)
2011 for sc in (
2012 current_metrics.column._uncached_proxy_list()
2013 )
2014 if sc.shares_lineage(column)
2015 ],
2016 )
2017 if current_col_distance < selected_col_distance:
2018 selected_metrics = current_metrics
2019 selected_intersection = current_intersection
2021 return selected_metrics.column if selected_metrics else None
2024_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2027class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
2028 """A :class:`_expression.ColumnCollection`
2029 that maintains deduplicating behavior.
2031 This is useful by schema level objects such as :class:`_schema.Table` and
2032 :class:`.PrimaryKeyConstraint`. The collection includes more
2033 sophisticated mutator methods as well to suit schema objects which
2034 require mutable column collections.
2036 .. versionadded:: 1.4
2038 """
2040 def add( # type: ignore[override]
2041 self, column: _NAMEDCOL, key: Optional[str] = None
2042 ) -> None:
2043 if key is not None and column.key != key:
2044 raise exc.ArgumentError(
2045 "DedupeColumnCollection requires columns be under "
2046 "the same key as their .key"
2047 )
2048 key = column.key
2050 if key is None:
2051 raise exc.ArgumentError(
2052 "Can't add unnamed column to column collection"
2053 )
2055 if key in self._index:
2056 existing = self._index[key][1]
2058 if existing is column:
2059 return
2061 self.replace(column)
2063 # pop out memoized proxy_set as this
2064 # operation may very well be occurring
2065 # in a _make_proxy operation
2066 util.memoized_property.reset(column, "proxy_set")
2067 else:
2068 self._append_new_column(key, column)
2070 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
2071 l = len(self._collection)
2072 self._collection.append(
2073 (key, named_column, _ColumnMetrics(self, named_column))
2074 )
2075 self._colset.add(named_column._deannotate())
2076 self._index[l] = (key, named_column)
2077 self._index[key] = (key, named_column)
2079 def _populate_separate_keys(
2080 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2081 ) -> None:
2082 """populate from an iterator of (key, column)"""
2083 cols = list(iter_)
2085 replace_col = []
2086 for k, col in cols:
2087 if col.key != k:
2088 raise exc.ArgumentError(
2089 "DedupeColumnCollection requires columns be under "
2090 "the same key as their .key"
2091 )
2092 if col.name in self._index and col.key != col.name:
2093 replace_col.append(col)
2094 elif col.key in self._index:
2095 replace_col.append(col)
2096 else:
2097 self._index[k] = (k, col)
2098 self._collection.append((k, col, _ColumnMetrics(self, col)))
2099 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2101 self._index.update(
2102 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2103 )
2104 for col in replace_col:
2105 self.replace(col)
2107 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2108 self._populate_separate_keys((col.key, col) for col in iter_)
2110 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2111 if column not in self._colset:
2112 raise ValueError(
2113 "Can't remove column %r; column is not in this collection"
2114 % column
2115 )
2116 del self._index[column.key]
2117 self._colset.remove(column)
2118 self._collection[:] = [
2119 (k, c, metrics)
2120 for (k, c, metrics) in self._collection
2121 if c is not column
2122 ]
2123 for metrics in self._proxy_index.get(column, ()):
2124 metrics.dispose(self)
2126 self._index.update(
2127 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2128 )
2129 # delete higher index
2130 del self._index[len(self._collection)]
2132 def replace(
2133 self,
2134 column: _NAMEDCOL,
2135 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2136 ) -> None:
2137 """add the given column to this collection, removing unaliased
2138 versions of this column as well as existing columns with the
2139 same key.
2141 e.g.::
2143 t = Table("sometable", metadata, Column("col1", Integer))
2144 t.columns.replace(Column("col1", Integer, key="columnone"))
2146 will remove the original 'col1' from the collection, and add
2147 the new column under the name 'columnname'.
2149 Used by schema.Column to override columns during table reflection.
2151 """
2153 if extra_remove:
2154 remove_col = set(extra_remove)
2155 else:
2156 remove_col = set()
2157 # remove up to two columns based on matches of name as well as key
2158 if column.name in self._index and column.key != column.name:
2159 other = self._index[column.name][1]
2160 if other.name == other.key:
2161 remove_col.add(other)
2163 if column.key in self._index:
2164 remove_col.add(self._index[column.key][1])
2166 if not remove_col:
2167 self._append_new_column(column.key, column)
2168 return
2169 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2170 replaced = False
2171 for k, col, metrics in self._collection:
2172 if col in remove_col:
2173 if not replaced:
2174 replaced = True
2175 new_cols.append(
2176 (column.key, column, _ColumnMetrics(self, column))
2177 )
2178 else:
2179 new_cols.append((k, col, metrics))
2181 if remove_col:
2182 self._colset.difference_update(remove_col)
2184 for rc in remove_col:
2185 for metrics in self._proxy_index.get(rc, ()):
2186 metrics.dispose(self)
2188 if not replaced:
2189 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2191 self._colset.add(column._deannotate())
2192 self._collection[:] = new_cols
2194 self._index.clear()
2196 self._index.update(
2197 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2198 )
2199 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2202class ReadOnlyColumnCollection(
2203 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2204):
2205 __slots__ = ("_parent",)
2207 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2208 object.__setattr__(self, "_parent", collection)
2209 object.__setattr__(self, "_colset", collection._colset)
2210 object.__setattr__(self, "_index", collection._index)
2211 object.__setattr__(self, "_collection", collection._collection)
2212 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2214 def __getstate__(self) -> Dict[str, _COL_co]:
2215 return {"_parent": self._parent}
2217 def __setstate__(self, state: Dict[str, Any]) -> None:
2218 parent = state["_parent"]
2219 self.__init__(parent) # type: ignore
2221 def add(self, column: Any, key: Any = ...) -> Any:
2222 self._readonly()
2224 def extend(self, elements: Any) -> NoReturn:
2225 self._readonly()
2227 def remove(self, item: Any) -> NoReturn:
2228 self._readonly()
2231class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2232 def contains_column(self, col: ColumnClause[Any]) -> bool:
2233 return col in self
2235 def extend(self, cols: Iterable[Any]) -> None:
2236 for col in cols:
2237 self.add(col)
2239 def __eq__(self, other):
2240 l = []
2241 for c in other:
2242 for local in self:
2243 if c.shares_lineage(local):
2244 l.append(c == local)
2245 return elements.and_(*l)
2247 def __hash__(self) -> int: # type: ignore[override]
2248 return hash(tuple(x for x in self))
2251def _entity_namespace(
2252 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2253) -> _EntityNamespace:
2254 """Return the nearest .entity_namespace for the given entity.
2256 If not immediately available, does an iterate to find a sub-element
2257 that has one, if any.
2259 """
2260 try:
2261 return cast(_HasEntityNamespace, entity).entity_namespace
2262 except AttributeError:
2263 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2264 if _is_has_entity_namespace(elem):
2265 return elem.entity_namespace
2266 else:
2267 raise
2270def _entity_namespace_key(
2271 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2272 key: str,
2273 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2274) -> SQLCoreOperations[Any]:
2275 """Return an entry from an entity_namespace.
2278 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2279 on not found.
2281 """
2283 try:
2284 ns = _entity_namespace(entity)
2285 if default is not NO_ARG:
2286 return getattr(ns, key, default)
2287 else:
2288 return getattr(ns, key) # type: ignore
2289 except AttributeError as err:
2290 raise exc.InvalidRequestError(
2291 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2292 ) from err