Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
11from __future__ import annotations
13import collections
14from enum import Enum
15import itertools
16from itertools import zip_longest
17import operator
18import re
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Collection
23from typing import Dict
24from typing import Final
25from typing import FrozenSet
26from typing import Generator
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeGuard
44from typing import TypeVar
45from typing import Union
47from . import roles
48from . import visitors
49from .cache_key import HasCacheKey # noqa
50from .cache_key import MemoizedHasCacheKey # noqa
51from .traversals import HasCopyInternals # noqa
52from .visitors import ClauseVisitor
53from .visitors import ExtendedInternalTraversal
54from .visitors import ExternallyTraversible
55from .visitors import InternalTraversal
56from .. import event
57from .. import exc
58from .. import util
59from ..util import EMPTY_DICT
60from ..util import HasMemoized as HasMemoized
61from ..util import hybridmethod
62from ..util import warn_deprecated
63from ..util.typing import Self
64from ..util.typing import TypeVarTuple
65from ..util.typing import Unpack
67if TYPE_CHECKING:
68 from . import coercions
69 from . import elements
70 from . import type_api
71 from ._orm_types import DMLStrategyArgument
72 from ._orm_types import SynchronizeSessionArgument
73 from ._typing import _CLE
74 from .cache_key import CacheKey
75 from .compiler import SQLCompiler
76 from .dml import Delete
77 from .dml import Insert
78 from .dml import Update
79 from .elements import BindParameter
80 from .elements import ClauseElement
81 from .elements import ClauseList
82 from .elements import ColumnClause # noqa
83 from .elements import ColumnElement
84 from .elements import NamedColumn
85 from .elements import SQLCoreOperations
86 from .elements import TextClause
87 from .schema import Column
88 from .schema import DefaultGenerator
89 from .selectable import _JoinTargetElement
90 from .selectable import _SelectIterable
91 from .selectable import FromClause
92 from .selectable import Select
93 from .visitors import anon_map
94 from ..engine import Connection
95 from ..engine import CursorResult
96 from ..engine.interfaces import _CoreMultiExecuteParams
97 from ..engine.interfaces import _CoreSingleExecuteParams
98 from ..engine.interfaces import _ExecuteOptions
99 from ..engine.interfaces import _ImmutableExecuteOptions
100 from ..engine.interfaces import CacheStats
101 from ..engine.interfaces import Compiled
102 from ..engine.interfaces import CompiledCacheType
103 from ..engine.interfaces import CoreExecuteOptionsParameter
104 from ..engine.interfaces import Dialect
105 from ..engine.interfaces import IsolationLevel
106 from ..engine.interfaces import SchemaTranslateMapType
107 from ..event import dispatcher
if not TYPE_CHECKING:
    # At runtime these three names are bound to None in place of the
    # modules imported under TYPE_CHECKING above.
    # NOTE(review): presumably they are rebound to the real modules later
    # (code in this file calls ``coercions.expect``) - confirm where.
    coercions = None  # noqa
    elements = None  # noqa
    type_api = None  # noqa


# Variadic type variable named "_Ts".
_Ts = TypeVarTuple("_Ts")
118class _NoArg(Enum):
119 NO_ARG = 0
121 def __repr__(self):
122 return f"_NoArg.{self.name}"
125NO_ARG: Final = _NoArg.NO_ARG
class _NoneName(Enum):
    """Single-member enum providing the ``NONE_NAME`` marker value."""

    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


# Module-level alias for the only member of ``_NoneName``.
_NONE_NAME: Final = _NoneName.NONE_NAME
# General-purpose type variable, bound to ``Any``.
_T = TypeVar("_T", bound=Any)

# Type variable for "some callable"; lets a decorator be annotated as
# returning the same callable type it was given.
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# Alias for a mutable ``str -> str`` mapping.
_AmbiguousTableNameMap = MutableMapping[str, str]
142class _DefaultDescriptionTuple(NamedTuple):
143 arg: Any
144 is_scalar: Optional[bool]
145 is_callable: Optional[bool]
146 is_sentinel: Optional[bool]
148 @classmethod
149 def _from_column_default(
150 cls, default: Optional[DefaultGenerator]
151 ) -> _DefaultDescriptionTuple:
152 return (
153 _DefaultDescriptionTuple(
154 default.arg, # type: ignore
155 default.is_scalar,
156 default.is_callable,
157 default.is_sentinel,
158 )
159 if default
160 and (
161 default.has_arg
162 or (not default.for_update and default.is_sentinel)
163 )
164 else _DefaultDescriptionTuple(None, None, None, None)
165 )
# Reusable getter that reads the ``_omit_from_statements`` attribute
# from whatever object it is called with.
_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)
class _EntityNamespace(Protocol):
    """Structural type for an object whose attribute access by name
    returns a :class:`.SQLCoreOperations` object.
    """

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
class _HasEntityNamespace(Protocol):
    """Structural type for an object exposing a read-only
    ``entity_namespace`` attribute.
    """

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
182def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
183 return hasattr(element, "entity_namespace")
# Stand-in type variable used to annotate ``self`` in methods that return
# their own instance. Remove once
# https://github.com/python/mypy/issues/14640 is fixed.
_Self = TypeVar("_Self", bound=Any)
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" here is about mutability in the context of SQL DQL and DML
    generation: a SELECT or subquery can be composed in many forms, but
    the structure of a particular table or column cannot be changed from
    within DQL. The primary "immutable" objects are
    :class:`.ColumnClause`, :class:`.Column`, :class:`.TableClause` and
    :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise; immutable objects cannot be copied."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise; immutable objects cannot be copied."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; there are no internals to copy."""
        return None
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # Construction always hands back the instance stored by
        # _create_singleton(); no new object is made here.
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Create the one instance of ``cls`` and store it as
        ``cls._singleton``.
        """
        # bypass our own __new__, which would need _singleton to exist
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset((instance,))
        cls._singleton = instance
252def _from_objects(
253 *elements: Union[
254 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
255 ]
256) -> Iterator[FromClause]:
257 return itertools.chain.from_iterable(
258 [element._from_objects for element in elements]
259 )
262def _select_iterables(
263 elements: Iterable[roles.ColumnsClauseRole],
264) -> _SelectIterable:
265 """expand tables into individual columns in the
266 given list of column expressions.
268 """
269 return itertools.chain.from_iterable(
270 [c._select_iterable for c in elements]
271 )
# Type variable for "self" in generative methods, bound to the protocol
# declared just below.
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Structural type for an object providing ``_generate()``, which
    returns an object of its own type.
    """

    def _generate(self) -> Self: ...
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    The wrapped method runs against the object returned by
    ``self._generate()`` instead of the original, and that object is what
    the caller gets back.  The undecorated function stays reachable as
    the ``non_generative`` attribute of the result.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        generated = self._generate()
        returned = fn(generated, *args, **kw)
        assert returned is generated, "generative methods must return self"
        return generated

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Return a decorator that refuses to run a method if any of the
    named attributes on ``self`` is no longer at its default.

    :param names: attribute names (looked up with ``attrgetter``) to check
     on the instance before the method runs.
    :param msgs: optional keyword; dict of attribute name to the error
     message to use for that attribute.
    :param defaults: optional keyword; dict of attribute name to the value
     that counts as "not yet set".  Names not listed default to ``None``.
     Comparison is by identity.
    :raises InvalidRequestError: from the decorated method, when a named
     attribute is not its default.
    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for attr_name in names:
        getters.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self, remaining = args[0], args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            if name in msgs:
                msg = msgs[name]
            else:
                msg = (
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__)
                )
            raise exc.InvalidRequestError(msg)
        return fn(self, *remaining, **kw)

    return check
334def _clone(element, **kw):
335 return element._clone(**kw)
338def _expand_cloned(
339 elements: Iterable[_CLE],
340) -> Iterable[_CLE]:
341 """expand the given set of ClauseElements to be the set of all 'cloned'
342 predecessors.
344 """
345 # TODO: cython candidate
346 return itertools.chain(*[x._cloned_set for x in elements])
349def _de_clone(
350 elements: Iterable[_CLE],
351) -> Iterable[_CLE]:
352 for x in elements:
353 while x._is_clone_of is not None:
354 x = x._is_clone_of
355 yield x
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    # everything found in the expanded clone sets of both a and b
    all_overlap: Set[_CLE] = set(_expand_cloned(a))
    all_overlap.intersection_update(_expand_cloned(b))

    matched: Set[_CLE] = set()
    for elem in a:
        if not all_overlap.isdisjoint(elem._cloned_set):
            matched.add(elem)
    return matched
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """Return the members of ``a`` whose ``_cloned_set`` shares nothing
    with the clone-expanded overlap of ``a`` and ``b``.
    """
    # everything found in the expanded clone sets of both a and b
    all_overlap: Set[_CLE] = set(_expand_cloned(a))
    all_overlap.intersection_update(_expand_cloned(b))

    remaining: Set[_CLE] = set()
    for elem in a:
        if all_overlap.isdisjoint(elem._cloned_set):
            remaining.add(elem)
    return remaining
380class _DialectArgView(MutableMapping[str, Any]):
381 """A dictionary view of dialect-level arguments in the form
382 <dialectname>_<argument_name>.
384 """
386 __slots__ = ("obj",)
388 def __init__(self, obj: DialectKWArgs) -> None:
389 self.obj = obj
391 def _key(self, key: str) -> Tuple[str, str]:
392 try:
393 dialect, value_key = key.split("_", 1)
394 except ValueError as err:
395 raise KeyError(key) from err
396 else:
397 return dialect, value_key
399 def __getitem__(self, key: str) -> Any:
400 dialect, value_key = self._key(key)
402 try:
403 opt = self.obj.dialect_options[dialect]
404 except exc.NoSuchModuleError as err:
405 raise KeyError(key) from err
406 else:
407 return opt[value_key]
409 def __setitem__(self, key: str, value: Any) -> None:
410 try:
411 dialect, value_key = self._key(key)
412 except KeyError as err:
413 raise exc.ArgumentError(
414 "Keys must be of the form <dialectname>_<argname>"
415 ) from err
416 else:
417 self.obj.dialect_options[dialect][value_key] = value
419 def __delitem__(self, key: str) -> None:
420 dialect, value_key = self._key(key)
421 del self.obj.dialect_options[dialect][value_key]
423 def __len__(self) -> int:
424 return sum(
425 len(args._non_defaults)
426 for args in self.obj.dialect_options.values()
427 )
429 def __iter__(self) -> Generator[str, None, None]:
430 return (
431 "%s_%s" % (dialect_name, value_name)
432 for dialect_name in self.obj.dialect_options
433 for value_name in self.obj.dialect_options[
434 dialect_name
435 ]._non_defaults
436 )
439class _DialectArgDict(MutableMapping[str, Any]):
440 """A dictionary view of dialect-level arguments for a specific
441 dialect.
443 Maintains a separate collection of user-specified arguments
444 and dialect-specified default arguments.
446 """
448 def __init__(self) -> None:
449 self._non_defaults: Dict[str, Any] = {}
450 self._defaults: Dict[str, Any] = {}
452 def __len__(self) -> int:
453 return len(set(self._non_defaults).union(self._defaults))
455 def __iter__(self) -> Iterator[str]:
456 return iter(set(self._non_defaults).union(self._defaults))
458 def __getitem__(self, key: str) -> Any:
459 if key in self._non_defaults:
460 return self._non_defaults[key]
461 else:
462 return self._defaults[key]
464 def __setitem__(self, key: str, value: Any) -> None:
465 self._non_defaults[key] = value
467 def __delitem__(self, key: str) -> None:
468 del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the dialect class registered under ``dialect_name`` and return
    a dict copy of its ``construct_arguments``, or None if that attribute
    is None.
    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    if construct_arguments is not None:
        return dict(construct_arguments)
    return None
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    def get_dialect_option(
        self,
        dialect: Dialect,
        argument_name: str,
        *,
        else_: Any = None,
        deprecated_fallback: Optional[str] = None,
    ) -> Any:
        r"""Return the value of a dialect-specific option, or *else_* if
        this dialect does not register the given argument.

        This is useful for DDL compilers that may be inherited by
        third-party dialects whose ``construct_arguments`` do not
        include the same set of keys as the parent dialect.

        :param dialect: The dialect for which to retrieve the option.
        :param argument_name: The name of the argument to retrieve.
        :param else\_: The value to return if the argument is not present.
        :param deprecated_fallback: Optional dialect name to fall back to
          if the argument is not present for the current dialect.  If the
          argument is present for the fallback dialect but not the current
          dialect, a deprecation warning will be emitted.

        """

        registry = DialectKWArgs._kw_registry[dialect.name]
        if registry is None:
            return else_

        # NOTE(review): only the exact class is consulted here, while
        # _kw_reg_for_dialect_cls() walks the whole MRO - confirm that
        # difference is intended.
        if argument_name in registry.get(self.__class__, {}):
            if (
                deprecated_fallback is None
                or dialect.name == deprecated_fallback
            ):
                return self.dialect_options[dialect.name][argument_name]

            # deprecated_fallback is present; need to look in two places

            # Current dialect has this option registered.
            # Check if user explicitly set it.
            if (
                dialect.name in self.dialect_options
                and argument_name
                in self.dialect_options[dialect.name]._non_defaults
            ):
                # User explicitly set this dialect's option - use it
                return self.dialect_options[dialect.name][argument_name]

            # User didn't set current dialect's option.
            # Check for deprecated fallback.
            elif (
                deprecated_fallback in self.dialect_options
                and argument_name
                in self.dialect_options[deprecated_fallback]._non_defaults
            ):
                # User set fallback option but not current dialect's option
                warn_deprecated(
                    f"Using '{deprecated_fallback}_{argument_name}' "
                    f"with the '{dialect.name}' dialect is deprecated; "
                    f"please additionally specify "
                    f"'{dialect.name}_{argument_name}'.",
                    version="2.1",
                )
                return self.dialect_options[deprecated_fallback][argument_name]

            # Return default value
            return self.dialect_options[dialect.name][argument_name]
        else:
            # Current dialect doesn't have the option registered at all.
            # Don't warn - if a third-party dialect doesn't support an
            # option, that's their choice, not a deprecation case.
            return else_

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # The registry entry is None exactly when the dialect has no
            # construct_arguments collection, so the message must say the
            # dialect does NOT have validation enabled.
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        construct_arg_dictionary.setdefault(cls, {})[argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> copy of that dialect's construct_arguments (or None),
    # filled on first access by _kw_reg_for_dialect()
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class,
        pre-populated with the defaults registered for ``dialect_name``.
        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # no validation collection on the dialect: "*" marks that any
            # argument name is accepted
            d._defaults.update({"*": None})
        else:
            # walk base classes first so that entries for more specific
            # classes override those of their bases
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and
        store their values in :attr:`.dialect_options`.

        :raises TypeError: if a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: if the dialect validates arguments and does
         not accept the given one.
        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k, value in kwargs.items():
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn, then accept the argument as-is
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = value
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = value
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, visit name) -> CompileState class,
    # populated by the plugin_for() decorator
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Look up the class registered for ``statement`` and build the
        compile state with it.

        The plugin named by the statement's ``compile_state_plugin``
        propagated attribute is tried first; the "default" plugin for the
        statement's plugin target is the fallback and must be registered.
        """
        # factory construction.

        target = statement._effective_plugin_target
        klass = None

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get((plugin_name, target), None)

        if klass is None:
            klass = cls.plugins[("default", target)]

        if klass is not cls:
            # a different class is registered; let it do the construction
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin, else
        the one for the "default" plugin, else None.
        """
        target = statement._effective_plugin_target
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.   return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        return cls.plugins.get(("default", target))

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or None if there is none.
        """
        key = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(key)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class
        under ``(plugin_name, visit_name)`` and returns it unchanged.
        """

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new object of the same class carrying a copy of this
        object's ``__dict__``, minus any keys in ``_memoized_keys``.
        """
        memoized = self._memoized_keys
        klass = self.__class__
        duplicate = klass.__new__(klass)

        # copy first so that the filtering below iterates a private
        # snapshot; ensures this iteration remains atomic
        state = self.__dict__.copy()
        if memoized:
            state = {k: v for k, v in state.items() if k not in memoized}
        duplicate.__dict__ = state
        return duplicate
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Drop every key in ``_memoized_keys`` from this object's
        ``__dict__`` and return the same object.
        """
        # note __dict__ needs to be in __slots__ if this is used
        state = self.__dict__
        for memoized_key in self._memoized_keys:
            state.pop(memoized_key, None)
        return self
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # no specific CompileState class by default
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # defaults to the shared empty immutable dict
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # factory used to build the compile state; by default the
    # CompileState.create_for_statement classmethod
    _compile_state_factory = CompileState.create_for_statement
877class _MetaOptions(type):
878 """metaclass for the Options class.
880 This metaclass is actually necessary despite the availability of the
881 ``__init_subclass__()`` hook as this type also provides custom class-level
882 behavior for the ``__add__()`` method.
884 """
886 _cache_attrs: Tuple[str, ...]
888 def __add__(self, other):
889 o1 = self()
891 if set(other).difference(self._cache_attrs):
892 raise TypeError(
893 "dictionary contains attributes not covered by "
894 "Options class %s: %r"
895 % (self, set(other).difference(self._cache_attrs))
896 )
898 o1.__dict__.update(other)
899 return o1
901 if TYPE_CHECKING:
903 def __getattr__(self, key: str) -> Any: ...
905 def __setattr__(self, key: str, value: Any) -> None: ...
907 def __delattr__(self, key: str) -> None: ...
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # every name defined directly in the subclass body, other than
        # dunder names and "_cache_key_traversal", is an option attribute
        option_names = [
            name
            for name in cls.__dict__
            if not name.startswith("__") and name != "_cache_key_traversal"
        ]
        cls._cache_attrs = tuple(sorted(option_names))
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a new object of the same class holding this object's
        state updated with the mapping ``other``.

        :raises TypeError: if ``other`` has keys not in ``_cache_attrs``.
        """
        klass = self.__class__
        combined = klass.__new__(klass)
        combined.__dict__.update(self.__dict__)

        uncovered = set(other).difference(self._cache_attrs)
        if uncovered:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, uncovered)
            )

        combined.__dict__.update(other)
        return combined

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        return not any(
            getattr(self, a) != getattr(other, b)
            for a, b in zip_longest(self._cache_attrs, other._cache_attrs)
        )

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        state = self.__dict__
        rendered = [
            "%s=%r" % (k, state[k]) for k in self._cache_attrs if k in state
        ]
        return "%s(%s)" % (self.__class__.__name__, ", ".join(rendered))

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return a copy with ``value`` added (``+``) to attribute
        ``name``.
        """
        extended = getattr(self, name) + value
        return self + {name: extended}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return ``cls`` combined with the state of ``other``.

        :raises TypeError: if ``other`` is of a different class and has
         cache attributes that ``cls`` does not.
        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if cls is not other.__class__ and other._cache_attrs:
            uncovered = set(other._cache_attrs).difference(cls._cache_attrs)
            if uncovered:
                raise TypeError(
                    "other element %r is not empty, is not of type %s, "
                    "and contains attributes not covered here %r"
                    % (
                        other,
                        cls,
                        uncovered,
                    )
                )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if not check_argnames:
            return existing_options, exec_options

        # each found name is stored under "_<name>"; the value from
        # exec_options wins over the one from statement_exec_options
        result = {
            "_" + argname: (
                exec_options[argname]
                if argname in exec_options
                else statement_exec_options[argname]
            )
            for argname in check_argnames
        }

        new_options = existing_options + result
        exec_options = util.EMPTY_DICT.merge_with(
            exec_options, {key: new_options}
        )
        return new_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class CacheableOptions(Options, HasCacheKey):
    """An :class:`.Options` that also takes part in cache key generation,
    both as an instance and as the class itself.
    """

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: defer to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the key is the class plus an empty tuple
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        # defer to the HasCacheKey implementation
        return HasCacheKey._generate_cache_key(self)
class ExecutableOption(HasCopyInternals):
    """Base for option objects carried by an executable statement."""

    __slots__ = ()

    # class-level defaults
    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
    __visit_name__: str = "executable_option"
    _is_has_cache_key: bool = False
    _is_core: bool = True

    def _clone(self, **kw):
        """Return a shallow copy of this ExecutableOption.

        A new instance is allocated without running ``__init__`` and is
        given its own copy of the instance dictionary; the values inside
        that dictionary are shared with the original.  Keyword arguments
        are accepted and not used.
        """
        option_cls = self.__class__
        duplicate = option_cls.__new__(option_cls)
        duplicate.__dict__ = self.__dict__.copy()  # type: ignore
        return duplicate
_L = TypeVar("_L", bound=str)


class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statements that accept :class:`.SyntaxExtension` objects
    at named positions.

    ``_position_map`` maps each public position name to the name of the
    attribute on the statement that stores the element(s) for that
    position.

    """

    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that take place in specific
        parts of a SQL statement.   Examples include vendor extensions like
        PostgreSQL / SQLite's "ON CONFLICT", PostgreSQL's
        "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
        and DELETE statements.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list or merging already existing
         elements of the same type. Some examples are shown in the examples
         above.  The returned sequence must not be empty.
        :param position: string name of the position to apply to. This
         varies per statement type. IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        :raises ValueError: if ``position`` is not a key of this
         construct's ``_position_map``.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`.ext`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke
        else:
            ElementList = util.preloaded.sql_elements.ElementList
            existing: Optional[ClauseElement] = getattr(self, attrname, None)

            # normalize whatever is stored into a tuple of elements:
            # nothing -> (), an ElementList -> its clauses, otherwise a
            # single element
            if existing is None:
                input_seq: Tuple[ClauseElement, ...] = ()
            elif isinstance(existing, ElementList):
                input_seq = existing.clauses
            else:
                input_seq = (existing,)

            new_seq = apply_fn(input_seq)
            assert new_seq, "cannot return empty sequence"

            # a single element is stored bare; several are wrapped in an
            # ElementList
            new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
            setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Hook invoked by :meth:`.ext` with the coerced extension; this
        base implementation always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return a dictionary of position name to stored value for every
        position whose attribute is not ``None``."""
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Assign each given value to the attribute mapped from its
        position name; an unknown name raises ``KeyError``."""
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index]  # noqa: E501
class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    Each ``apply_to_<statement>()`` hook raises ``NotImplementedError``
    unless overridden.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.Select.apply_syntax_extension_point.apply_fn`
        to drop every element of ``existing`` that is an instance of this
        object's type and then append ``self``.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.Select.apply_syntax_extension_point` and equivalents
            in :class:`_dml.Insert`, :class:`_dml.Delete`, :class:`_dml.Update`

        """  # noqa: E501
        own_type = type(self)
        survivors = [
            elem for elem in existing if not isinstance(elem, own_type)
        ]
        return [*survivors, self]  # type: ignore[list-item]

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to insert"
        )
class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags; all default to False here
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        # declarations for static type checkers only; nothing in this
        # branch is defined at runtime
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> tuple[
            Compiled,
            Sequence[BindParameter[Any]] | None,
            _CoreSingleExecuteParams | None,
            CacheStats,
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        """Not provided by this base; always raises
        ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        """The ``__visit_name__`` of this statement."""
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each option is coerced, then the lot is appended to whatever
        # options are already present
        self._with_options = self._with_options + tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Replace the compile options with the given value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """Combine new keys into the current ``_compile_options``; the
        current value must not be ``None``."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        entry = (callable_, cache_args)
        self._compile_state_funcs = self._compile_state_funcs + (entry,)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # these two options are refused at the statement level
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )

        merged_options = self._execution_options.union(kw)
        self._execution_options = merged_options
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options
class ExecutableStatement(Executable):
    """Executable subclass that implements a lightweight version of ``params``
    that avoids a full cloned traverse.

    .. versionadded:: 2.1

    """

    _params: util.immutabledict[str, Any] = EMPTY_DICT

    # same traversal entries as Executable, plus the stored parameters
    _executable_traverse_internals = (
        Executable._executable_traverse_internals
        + [("_params", InternalTraversal.dp_params)]
    )

    @_generative
    def params(
        self,
        __optionaldict: _CoreSingleExecuteParams | None = None,
        /,
        **kwargs: Any,
    ) -> Self:
        """Return a copy with the provided bindparam values.

        Returns a copy of this Executable with bindparam values set
        to the given dictionary::

          >>> clause = column("x") + bindparam("foo")
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({"foo": 7}).compile().params)
          {'foo': 7}

        Values may be given as a positional dictionary, as keyword
        arguments, or both; where a name appears in both, the entry from
        the positional dictionary is the one kept.  Values set by an
        earlier call are retained unless overridden.

        """
        if __optionaldict:
            kwargs.update(__optionaldict)

        if self._params:
            # layer the new values over what is already stored
            self._params = self._params | kwargs
        else:
            self._params = util.immutabledict(kwargs)
        return self
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The base implementation does nothing.
        """
        return None

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call :meth:`._set_parent`, surrounded by the
        ``before_parent_attach`` and ``after_parent_attach`` events."""
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that a :class:`.SchemaVisitor` can visit.

    Combines :class:`.SchemaEventTarget` with
    :class:`.visitors.Visitable`; adds no members of its own.

    .. versionadded:: 2.0.41

    """
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    Sets the ``schema_visitor`` traversal option to ``True``.

    """

    __traverse_options__: Dict[str, Any] = dict(schema_visitor=True)
class _SentinelDefaultCharacterization(Enum):
    """String-valued categories used for the ``default_characterization``
    field of :class:`._SentinelColumnCharacterization`."""

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
    # note the value is shorter than the member name
    MONOTONIC_FUNCTION = "monotonic"
class _SentinelColumnCharacterization(NamedTuple):
    """Immutable record describing a set of sentinel columns.

    Every field has a default, so the record can be built with no
    arguments: no columns, both flags ``False``, and a default
    characterization of ``_SentinelDefaultCharacterization.NONE``.
    """

    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")


class _ColumnMetrics(Generic[_COL_co]):
    """Per-column record kept by a :class:`.ColumnCollection`, used to
    maintain the collection's ``_proxy_index``."""

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        """Store ``col`` and, when the collection's proxy index is already
        populated, register this object under every member of the
        column's expanded proxy set."""
        self.column = col

        # an empty proxy index has not been initialized, so there is
        # nothing to keep up to date
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxied in col._expanded_proxy_set:
                proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the expanded proxy set of the wrapped column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from the collection's proxy index, dropping
        any index entry that is left empty."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return

        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` is either in the
        wrapped column's expanded proxy set, or has an ``_expand_cloned``
        expansion that intersects it."""
        own_proxies = self.column._expanded_proxy_set
        outside = target_set.difference(own_proxies)
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in outside
        )
1777class ColumnCollection(Generic[_COLKEY, _COL_co]):
1778 """Base class for collection of :class:`_expression.ColumnElement`
1779 instances, typically for :class:`_sql.FromClause` objects.
1781 The :class:`_sql.ColumnCollection` object is most commonly available
1782 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1783 on the :class:`_schema.Table` object, introduced at
1784 :ref:`metadata_tables_and_columns`.
1786 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1787 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1788 :class:`_schema.Column` objects, which are then accessible both via mapping
1789 style access as well as attribute access style.
1791 To access :class:`_schema.Column` objects using ordinary attribute-style
1792 access, specify the name like any other object attribute, such as below
1793 a column named ``employee_name`` is accessed::
1795 >>> employee_table.c.employee_name
1797 To access columns that have names with special characters or spaces,
1798 index-style access is used, such as below which illustrates a column named
1799 ``employee ' payment`` is accessed::
1801 >>> employee_table.c["employee ' payment"]
1803 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1804 interface, common dictionary method names like
1805 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1806 and :meth:`_sql.ColumnCollection.items` are available, which means that
1807 database columns that are keyed under these names also need to use indexed
1808 access::
1810 >>> employee_table.c["values"]
1813 The name for which a :class:`_schema.Column` would be present is normally
1814 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1815 such as a :class:`_sql.Select` object that uses a label style set
1816 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1817 key may instead be represented under a particular label name such
1818 as ``tablename_columnname``::
1820 >>> from sqlalchemy import select, column, table
1821 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1822 >>> t = table("t", column("c"))
1823 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1824 >>> subq = stmt.subquery()
1825 >>> subq.c.t_c
1826 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1828 :class:`.ColumnCollection` also indexes the columns in order and allows
1829 them to be accessible by their integer position::
1831 >>> cc[0]
1832 Column('x', Integer(), table=None)
1833 >>> cc[1]
1834 Column('y', Integer(), table=None)
1836 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1837 allows integer-based
1838 index access to the collection.
1840 Iterating the collection yields the column expressions in order::
1842 >>> list(cc)
1843 [Column('x', Integer(), table=None),
1844 Column('y', Integer(), table=None)]
1846 The :class:`_expression.ColumnCollection` base class is read-only.
1847 For mutation operations, the :class:`.WriteableColumnCollection` subclass
1848 provides methods such as :meth:`.WriteableColumnCollection.add`.
1849 A special subclass :class:`.DedupeColumnCollection` exists which
1850 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1851 collection is used for schema level objects like :class:`_schema.Table`
1852 and :class:`.PrimaryKeyConstraint` where this deduping is helpful.
1853 The :class:`.DedupeColumnCollection` class also has additional mutation
1854 methods as the schema constructs have more use cases that require removal
1855 and replacement of columns.
1857 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1858 now stores duplicate
1859 column keys as well as the same column in multiple positions. The
1860 :class:`.DedupeColumnCollection` class is added to maintain the
1861 former behavior in those cases where deduplication as well as
1862 additional replace/remove operations are needed.
1864 .. versionchanged:: 2.1 :class:`_expression.ColumnCollection` is now
1865 a read-only base class. Mutation operations are available through
1866 :class:`.WriteableColumnCollection` and :class:`.DedupeColumnCollection`
1867 subclasses.
1870 """
1872 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1874 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1875 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1876 _colset: Set[_COL_co]
1877 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1879 def __init__(self) -> None:
1880 raise TypeError(
1881 "ColumnCollection is an abstract base class and cannot be "
1882 "instantiated directly. Use WriteableColumnCollection or "
1883 "DedupeColumnCollection instead."
1884 )
1886 @util.preload_module("sqlalchemy.sql.elements")
1887 def __clause_element__(self) -> ClauseList:
1888 elements = util.preloaded.sql_elements
1890 return elements.ClauseList(
1891 _literal_as_text_role=roles.ColumnsClauseRole,
1892 group=False,
1893 *self._all_columns,
1894 )
1896 @property
1897 def _all_columns(self) -> List[_COL_co]:
1898 return [col for (_, col, _) in self._collection]
1900 def keys(self) -> List[_COLKEY]:
1901 """Return a sequence of string key names for all columns in this
1902 collection."""
1903 return [k for (k, _, _) in self._collection]
1905 def values(self) -> List[_COL_co]:
1906 """Return a sequence of :class:`_sql.ColumnClause` or
1907 :class:`_schema.Column` objects for all columns in this
1908 collection."""
1909 return [col for (_, col, _) in self._collection]
1911 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1912 """Return a sequence of (key, column) tuples for all columns in this
1913 collection each consisting of a string key name and a
1914 :class:`_sql.ColumnClause` or
1915 :class:`_schema.Column` object.
1916 """
1918 return [(k, col) for (k, col, _) in self._collection]
1920 def __bool__(self) -> bool:
1921 return bool(self._collection)
1923 def __len__(self) -> int:
1924 return len(self._collection)
1926 def __iter__(self) -> Iterator[_COL_co]:
1927 # turn to a list first to maintain over a course of changes
1928 return iter([col for _, col, _ in self._collection])
1930 @overload
1931 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1933 @overload
1934 def __getitem__(
1935 self, key: Union[Tuple[Union[str, int], ...], slice]
1936 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1938 def __getitem__(
1939 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1940 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1941 try:
1942 if isinstance(key, (tuple, slice)):
1943 if isinstance(key, slice):
1944 cols = (
1945 (sub_key, col)
1946 for (sub_key, col, _) in self._collection[key]
1947 )
1948 else:
1949 cols = (self._index[sub_key] for sub_key in key)
1951 return WriteableColumnCollection(cols).as_readonly()
1952 else:
1953 return self._index[key][1]
1954 except KeyError as err:
1955 if isinstance(err.args[0], int):
1956 raise IndexError(err.args[0]) from err
1957 else:
1958 raise
1960 def __getattr__(self, key: str) -> _COL_co:
1961 try:
1962 return self._index[key][1]
1963 except KeyError as err:
1964 raise AttributeError(key) from err
1966 def __contains__(self, key: str) -> bool:
1967 if key not in self._index:
1968 if not isinstance(key, str):
1969 raise exc.ArgumentError(
1970 "__contains__ requires a string argument"
1971 )
1972 return False
1973 else:
1974 return True
1976 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1977 """Compare this :class:`_expression.ColumnCollection` to another
1978 based on the names of the keys"""
1980 for l, r in zip_longest(self, other):
1981 if l is not r:
1982 return False
1983 else:
1984 return True
1986 def __eq__(self, other: Any) -> bool:
1987 return self.compare(other)
1989 @overload
1990 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1992 @overload
1993 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1995 def get(
1996 self, key: str, default: Optional[_COL] = None
1997 ) -> Optional[Union[_COL_co, _COL]]:
1998 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1999 based on a string key name from this
2000 :class:`_expression.ColumnCollection`."""
2002 if key in self._index:
2003 return self._index[key][1]
2004 else:
2005 return default
2007 def __str__(self) -> str:
2008 return "%s(%s)" % (
2009 self.__class__.__name__,
2010 ", ".join(str(c) for c in self),
2011 )
2013 # https://github.com/python/mypy/issues/4266
2014 __hash__: Optional[int] = None # type: ignore
def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Return True if the given column object is in this collection.

    :param col: the column object to test for membership.
    :raises ArgumentError: if ``col`` is a string that is not a member;
     string keys should be tested with ``col_name in table.c``.

    """
    if col in self._colset:
        return True
    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )
    return False
def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a read-only form of this collection.

    This implementation always raises ``NotImplementedError``; it is
    overridden by :class:`.WriteableColumnCollection` and
    :class:`.ReadOnlyColumnCollection`.

    """
    raise NotImplementedError()
def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Given a :class:`_expression.ColumnElement`, return the exported
    :class:`_expression.ColumnElement` object from this
    :class:`_expression.ColumnCollection`
    which corresponds to that original :class:`_expression.ColumnElement`
    via a common
    ancestor column.

    This implementation always raises ``NotImplementedError``; the
    working implementations are on
    :class:`.WriteableColumnCollection` and
    :class:`.ReadOnlyColumnCollection`.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement`
     is actually present within a sub-element
     of this :class:`_expression.Selectable`.
     Normally the column will match if
     it merely shares a common ancestor with one of the exported
     columns of this :class:`_expression.Selectable`.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    raise NotImplementedError()
class WriteableColumnCollection(ColumnCollection[_COLKEY, _COL_co]):
    """A :class:`_sql.ColumnCollection` that allows mutation operations.

    This is the writable form of :class:`_sql.ColumnCollection` that
    implements methods such as :meth:`.add`, :meth:`.remove`, :meth:`.update`,
    and :meth:`.clear`.

    This class is used internally for building column collections during
    construction of SQL constructs.  For schema-level objects that require
    deduplication behavior, use :class:`.DedupeColumnCollection`.

    .. versionadded:: 2.1

    """

    __slots__ = ()

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        """Create the collection, optionally populated from an iterable
        of ``(key, column)`` pairs."""
        # state is assigned via object.__setattr__ rather than plain
        # attribute assignment -- presumably because attribute setting is
        # customized somewhere in the class hierarchy; TODO confirm

        # set of the de-annotated column objects in the collection
        object.__setattr__(self, "_colset", set())
        # maps both integer position and key to a (key, column) pair
        object.__setattr__(self, "_index", {})
        # maps a column to an OrderedSet of _ColumnMetrics; filled lazily
        # by _init_proxy_index()
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        # ordered list of (key, column, _ColumnMetrics) triples
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """Populate a newly constructed collection; delegates to
        :meth:`._populate_separate_keys`."""
        self._populate_separate_keys(iter_)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        # the contents of _collection are replaced in place
        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        # integer positions first...
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        # ...then keys.  iterating in reverse means that for a duplicated
        # key, the entry occurring earliest in the collection is written
        # last and is the one that remains in the index
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def __getstate__(self) -> Dict[str, Any]:
        """Return picklable state: the ``(key, column)`` pairs and the
        index.  ``_ColumnMetrics``, the column set and the proxy index are
        not included; they are rebuilt in :meth:`.__setstate__`."""
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore from the state produced by :meth:`.__getstate__`,
        creating new ``_ColumnMetrics`` objects and an empty proxy index."""
        object.__setattr__(self, "_index", state["_index"])
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        # NOTE(review): the column set is rebuilt from the columns as
        # stored, without the _deannotate() call that add() and
        # _populate_separate_keys() apply -- confirm this is intended
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def add(
        self,
        column: ColumnElement[Any],
        key: Optional[_COLKEY] = None,
    ) -> None:
        """Add a column to this :class:`_sql.WriteableColumnCollection`.

        The column is appended at the end of the collection.  If ``key``
        is omitted, ``column.key`` is used.  If the key is already present
        in the index, the existing key entry is left in place and the new
        column is reachable by integer position only.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.WriteableColumnCollection` is usually part of an
            existing object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        # integer position the new column will occupy
        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())

        self._index[l] = (colkey, _column)
        # first column added under a given key keeps that key
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a :class:`.ReadOnlyColumnCollection` wrapping this
        collection."""
        return ReadOnlyColumnCollection(self)

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.WriteableColumnCollection`."""

        return self._as_readonly()

    def _init_proxy_index(self) -> None:
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data.   Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            # register the metrics under every column in the expanded
            # proxy set
            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        See :meth:`.ColumnCollection.corresponding_column` for parameter
        information.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        # fill the proxy index on first use
        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        # candidates are all metrics registered under any member of the
        # target column's proxy set
        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                # the intersection for the current selection is computed
                # only once a second candidate appears
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    # the smaller distance wins; ties keep the earlier
                    # selection
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
# type variable for the column type held by DedupeColumnCollection;
# bound to NamedColumn
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
class DedupeColumnCollection(WriteableColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.    The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add a column under its own ``.key``, replacing any existing
        column that is present under that key.

        :param column: the column to add.
        :param key: optional; if given it must equal ``column.key``.
        :param index: optional integer position at which to place the
         column; when omitted the column is appended, or takes the place
         of the column it replaces.
        :raises ArgumentError: if ``key`` differs from ``column.key`` or
         the column has no key.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            # adding the identical column object again is a no-op
            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Add a column whose key is not yet present, at the end of the
        collection or at the given integer position.

        An ``index`` outside the current bounds is clamped to the nearest
        valid insertion point, the same way ``list.insert()`` treats it.

        """
        collection_length = len(self._collection)

        if index is None:
            l = collection_length
        else:
            if index < 0:
                index = max(0, collection_length + index)
            else:
                # list.insert() places an oversized index at the end of
                # the list; clamp here as well so the integer keys written
                # to _index below match the column's real position
                index = min(index, collection_length)
            l = index

        if index is None:
            self._collection.append(
                (key, named_column, _ColumnMetrics(self, named_column))
            )
        else:
            self._collection.insert(
                index, (key, named_column, _ColumnMetrics(self, named_column))
            )

        self._colset.add(named_column._deannotate())

        if index is not None:
            # shift the integer keys at and after the insertion point up
            # by one, working from the end so nothing is overwritten
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns whose key or name collides with an entry already in the
        index are passed to :meth:`.replace` after the non-colliding
        columns have been appended.

        :raises ArgumentError: if a column is given under a key other than
         its own ``.key``.

        """
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        # refresh the integer-position entries for the whole collection
        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each of the given columns under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        """Remove the given column from this collection.

        :raises ValueError: if the column is not in the collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the integer positions for the shortened collection
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column  as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove from
         the collection.
        :param index: optional integer position for the new column; when
         omitted, the new column takes the position of the first column
         it replaces, or is appended if nothing is replaced.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        # rebuild the collection; the new column takes the slot of the
        # first removed column and any further removed columns are dropped
        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if replace_index is None:
            # none of the columns to remove were actually in the
            # collection, so the new column has not been placed yet
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # move the already-placed new column to the requested position:
            # insert a second reference at ``index``, then delete whichever
            # of the two references sits at the old slot
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # the index is rebuilt from scratch: integer positions, then keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A :class:`_expression.ColumnCollection` that refers to the same
    internal structures as a parent
    :class:`.WriteableColumnCollection`."""

    __slots__ = ("_parent",)

    _parent: WriteableColumnCollection[_COLKEY, _COL_co]

    def __init__(
        self, collection: WriteableColumnCollection[_COLKEY, _COL_co]
    ):
        """Bind this collection to ``collection``, referring to its
        index, column list, column set and proxy index directly rather
        than copying them."""
        object.__setattr__(self, "_parent", collection)
        for shared_name in (
            "_index",
            "_collection",
            "_colset",
            "_proxy_index",
        ):
            object.__setattr__(
                self, shared_name, getattr(collection, shared_name)
            )

    def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return this collection itself, as it is already read-only."""
        return self

    def __getstate__(self) -> Dict[str, ColumnCollection[_COLKEY, _COL_co]]:
        """Return picklable state consisting of the parent collection
        only."""
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore by re-running ``__init__`` against the unpickled
        parent collection."""
        self.__init__(state["_parent"])  # type: ignore

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        The call is passed through to the parent collection.

        See :meth:`.ColumnCollection.corresponding_column` for parameter
        information.

        """
        parent = self._parent
        return parent.corresponding_column(column, require_embedded)
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with column-oriented helper methods."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add each of the given columns to this set, in order."""
        for column in cols:
            self.add(column)

    def __eq__(self, other):
        """Return a conjunction of ``==`` expressions, one for every pair
        of a column in ``other`` and a column in this set that share
        lineage."""
        criteria = [
            theirs == ours
            for theirs in other
            for ours in self
            if theirs.shares_lineage(ours)
        ]
        return elements.and_(*criteria)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash on the tuple of members in iteration order."""
        return hash(tuple(self))
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    The entity's own ``.entity_namespace`` is used if present; otherwise
    the elements produced by ``visitors.iterate()`` are searched and the
    namespace of the first one that has it is returned.  If none is
    found, the original ``AttributeError`` is re-raised.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for sub_element in visitors.iterate(traversible):
            if _is_has_entity_namespace(sub_element):
                return sub_element.entity_namespace
        # nothing in the traversal had a namespace; propagate the
        # AttributeError from the direct access
        raise
@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Return the attribute named ``key`` from the entity's
    entity_namespace.

    :param entity: object from which the namespace is located.
    :param key: attribute name to fetch from the namespace.
    :param default: optional value returned when the namespace has no
     such attribute.
    :raises InvalidRequestError: in place of ``AttributeError``, when the
     lookup fails.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err
def _entity_namespace_key_search_all(
    entities: Collection[Any],
    key: str,
) -> SQLCoreOperations[Any]:
    """Look for ``key`` in the namespace of every given entity and return
    the single match.

    This is used by filter_by() to search across all FROM clause entities
    when a single entity doesn't have the requested attribute.

    .. versionadded:: 2.1

    :param entities: the entities whose namespaces are searched.
    :param key: attribute name to look for.
    :raises AmbiguousColumnError: if more than one entity has ``key``.
    :raises InvalidRequestError: if no entity has ``key``.

    """

    def _describe_entities() -> str:
        # up to three entities are named; a total count is added beyond that
        description = ", ".join(str(e) for e in list(entities)[:3])
        if len(entities) > 3:
            description += f", ... ({len(entities)} total)"
        return description

    found: SQLCoreOperations[Any] | None = None

    for entity in entities:
        namespace = _entity_namespace(entity)
        if not hasattr(namespace, key):
            continue
        if found is not None:
            # a second entity also has the attribute
            entity_desc = _describe_entities()
            raise exc.AmbiguousColumnError(
                f'Attribute name "{key}" is ambiguous; it exists in '
                f"multiple FROM clause entities ({entity_desc}). "
                f"Use filter() with explicit column references instead "
                f"of filter_by()."
            )
        found = getattr(namespace, key)

    if found is None:
        # no entity has this attribute
        entity_desc = _describe_entities()
        raise exc.InvalidRequestError(
            f'None of the FROM clause entities have a property "{key}". '
            f"Searched entities: {entity_desc}"
        )

    return found