Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Collection
24from typing import Dict
25from typing import Final
26from typing import FrozenSet
27from typing import Generator
28from typing import Generic
29from typing import Iterable
30from typing import Iterator
31from typing import List
32from typing import Mapping
33from typing import MutableMapping
34from typing import NamedTuple
35from typing import NoReturn
36from typing import Optional
37from typing import overload
38from typing import Protocol
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeGuard
45from typing import TypeVar
46from typing import Union
48from . import roles
49from . import visitors
50from .cache_key import HasCacheKey # noqa
51from .cache_key import MemoizedHasCacheKey # noqa
52from .traversals import HasCopyInternals # noqa
53from .visitors import ClauseVisitor
54from .visitors import ExtendedInternalTraversal
55from .visitors import ExternallyTraversible
56from .visitors import InternalTraversal
57from .. import event
58from .. import exc
59from .. import util
60from ..util import EMPTY_DICT
61from ..util import HasMemoized as HasMemoized
62from ..util import hybridmethod
63from ..util import warn_deprecated
64from ..util.typing import Self
65from ..util.typing import TypeVarTuple
66from ..util.typing import Unpack
68if TYPE_CHECKING:
69 from . import coercions
70 from . import elements
71 from . import type_api
72 from ._orm_types import DMLStrategyArgument
73 from ._orm_types import SynchronizeSessionArgument
74 from ._typing import _CLE
75 from .cache_key import CacheKey
76 from .compiler import SQLCompiler
77 from .dml import Delete
78 from .dml import Insert
79 from .dml import Update
80 from .elements import BindParameter
81 from .elements import ClauseElement
82 from .elements import ClauseList
83 from .elements import ColumnClause # noqa
84 from .elements import ColumnElement
85 from .elements import NamedColumn
86 from .elements import SQLCoreOperations
87 from .elements import TextClause
88 from .schema import Column
89 from .schema import DefaultGenerator
90 from .selectable import _JoinTargetElement
91 from .selectable import _SelectIterable
92 from .selectable import FromClause
93 from .selectable import Select
94 from .visitors import anon_map
95 from ..engine import Connection
96 from ..engine import CursorResult
97 from ..engine.interfaces import _CoreMultiExecuteParams
98 from ..engine.interfaces import _CoreSingleExecuteParams
99 from ..engine.interfaces import _ExecuteOptions
100 from ..engine.interfaces import _ImmutableExecuteOptions
101 from ..engine.interfaces import CacheStats
102 from ..engine.interfaces import Compiled
103 from ..engine.interfaces import CompiledCacheType
104 from ..engine.interfaces import CoreExecuteOptionsParameter
105 from ..engine.interfaces import Dialect
106 from ..engine.interfaces import IsolationLevel
107 from ..engine.interfaces import SchemaTranslateMapType
108 from ..event import dispatcher
110if not TYPE_CHECKING:
111 coercions = None # noqa
112 elements = None # noqa
113 type_api = None # noqa
116_Ts = TypeVarTuple("_Ts")
class _NoArg(Enum):
    """Sentinel enumeration representing "no argument was passed"."""

    NO_ARG = 0

    def __repr__(self):
        return f"_NoArg.{self.name}"


# canonical sentinel instance; compared with ``is`` at call sites
NO_ARG: Final = _NoArg.NO_ARG
class _NoneName(Enum):
    # sentinel distinguishing "a name that was deferred and resolved to
    # None" from an ordinary string name
    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


# canonical sentinel instance for the above
_NONE_NAME: Final = _NoneName.NONE_NAME
# general-purpose type variable
_T = TypeVar("_T", bound=Any)

# type variable for callables; used by decorators to preserve signatures
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# mapping of table-name strings to strings; NOTE(review): exact key/value
# semantics are not visible in this module -- verify against usage in
# selectable/compiler
_AmbiguousTableNameMap = MutableMapping[str, str]
143class _DefaultDescriptionTuple(NamedTuple):
144 arg: Any
145 is_scalar: Optional[bool]
146 is_callable: Optional[bool]
147 is_sentinel: Optional[bool]
149 @classmethod
150 def _from_column_default(
151 cls, default: Optional[DefaultGenerator]
152 ) -> _DefaultDescriptionTuple:
153 return (
154 _DefaultDescriptionTuple(
155 default.arg, # type: ignore
156 default.is_scalar,
157 default.is_callable,
158 default.is_sentinel,
159 )
160 if default
161 and (
162 default.has_arg
163 or (not default.for_update and default.is_sentinel)
164 )
165 else _DefaultDescriptionTuple(None, None, None, None)
166 )
# attribute getter reading an element's "_omit_from_statements" flag;
# presumably used to exclude such columns when building statements --
# callers not visible in this chunk
_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)
class _EntityNamespace(Protocol):
    """Protocol: a namespace whose attribute access yields SQL operator
    objects (:class:`.SQLCoreOperations`)."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
class _HasEntityNamespace(Protocol):
    """Protocol: an object exposing a read-only ``entity_namespace``."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
183def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
184 return hasattr(element, "entity_namespace")
# Remove when https://github.com/python/mypy/issues/14640 will be fixed
# stand-in for typing.Self where mypy support is still problematic
_Self = TypeVar("_Self", bound=Any)
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" objects refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.   Such as, in DQL, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    # flag consulted by traversal / cloning machinery
    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        # copying-style mutation is disallowed on immutable elements
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        # copying-style mutation is disallowed on immutable elements
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        # cloning an immutable element returns the element itself
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        # nothing to copy; immutable elements share internal state
        pass
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    # flag consulted by traversal / introspection code
    _is_singleton_constant: bool = True

    # the single shared instance, installed by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction always yields the one shared instance
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # shadowed by the per-instance assignment made in
        # _create_singleton(); reaching this implementation is an error
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Create and install the per-class singleton instance."""
        obj = object.__new__(cls)
        obj.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        obj.proxy_set = frozenset([obj])
        cls._singleton = obj
253def _from_objects(
254 *elements: Union[
255 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
256 ]
257) -> Iterator[FromClause]:
258 return itertools.chain.from_iterable(
259 [element._from_objects for element in elements]
260 )
263def _select_iterables(
264 elements: Iterable[roles.ColumnsClauseRole],
265) -> _SelectIterable:
266 """expand tables into individual columns in the
267 given list of column expressions.
269 """
270 return itertools.chain.from_iterable(
271 [c._select_iterable for c in elements]
272 )
# type variable constrained to objects supporting _generate()
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Protocol: objects providing the ``_generate()`` copy step used by
    the ``@_generative`` decorator."""

    def _generate(self) -> Self: ...
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # copy the object first, then run the decorated method against
        # the copy; the method is required to return that same copy
        self = self._generate()
        x = fn(self, *args, **kw)
        assert x is self, "generative methods must return self"
        return self

    decorated = _generative(fn)
    # retain the original, non-copying implementation for internal use
    decorated.non_generative = fn  # type: ignore
    return decorated
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    r"""Decorator factory: assert that each named attribute on ``self``
    still holds its default value when the decorated method is called,
    raising :class:`.InvalidRequestError` otherwise.

    :param \*names: attribute names to check on ``self``.
    :param msgs: optional mapping of attribute name to a custom message.
    :param defaults: optional mapping of attribute name to its expected
     default value; ``None`` is assumed when not given.

    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    # precompute (name, getter, expected default) triples once, at
    # decoration time
    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
        (name, operator.attrgetter(name), defaults.get(name, None))
        for name in names
    ]

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        args = args[1:]
        for name, getter, default_ in getters:
            # identity comparison against the expected default; anything
            # else means the method was already invoked
            if getter(self) is not default_:
                msg = msgs.get(
                    name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
                raise exc.InvalidRequestError(msg)
        return fn(self, *args, **kw)

    return check
def _clone(element, **kw):
    """Shorthand invoking the ``_clone()`` method of *element*."""
    return element._clone(**kw)
339def _expand_cloned(
340 elements: Iterable[_CLE],
341) -> Iterable[_CLE]:
342 """expand the given set of ClauseElements to be the set of all 'cloned'
343 predecessors.
345 """
346 # TODO: cython candidate
347 return itertools.chain(*[x._cloned_set for x in elements])
350def _de_clone(
351 elements: Iterable[_CLE],
352) -> Iterable[_CLE]:
353 for x in elements:
354 while x._is_clone_of is not None:
355 x = x._is_clone_of
356 yield x
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    overlap: Set[_CLE] = set(_expand_cloned(a))
    overlap.intersection_update(_expand_cloned(b))
    return {elem for elem in a if overlap.intersection(elem._cloned_set)}
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """Return the members of 'a' whose cloned predecessors do not overlap
    those of 'b'."""
    overlap: Set[_CLE] = set(_expand_cloned(a))
    overlap.intersection_update(_expand_cloned(b))
    return {
        elem for elem in a if not overlap.intersection(elem._cloned_set)
    }
381class _DialectArgView(MutableMapping[str, Any]):
382 """A dictionary view of dialect-level arguments in the form
383 <dialectname>_<argument_name>.
385 """
387 __slots__ = ("obj",)
389 def __init__(self, obj: DialectKWArgs) -> None:
390 self.obj = obj
392 def _key(self, key: str) -> Tuple[str, str]:
393 try:
394 dialect, value_key = key.split("_", 1)
395 except ValueError as err:
396 raise KeyError(key) from err
397 else:
398 return dialect, value_key
400 def __getitem__(self, key: str) -> Any:
401 dialect, value_key = self._key(key)
403 try:
404 opt = self.obj.dialect_options[dialect]
405 except exc.NoSuchModuleError as err:
406 raise KeyError(key) from err
407 else:
408 return opt[value_key]
410 def __setitem__(self, key: str, value: Any) -> None:
411 try:
412 dialect, value_key = self._key(key)
413 except KeyError as err:
414 raise exc.ArgumentError(
415 "Keys must be of the form <dialectname>_<argname>"
416 ) from err
417 else:
418 self.obj.dialect_options[dialect][value_key] = value
420 def __delitem__(self, key: str) -> None:
421 dialect, value_key = self._key(key)
422 del self.obj.dialect_options[dialect][value_key]
424 def __len__(self) -> int:
425 return sum(
426 len(args._non_defaults)
427 for args in self.obj.dialect_options.values()
428 )
430 def __iter__(self) -> Generator[str, None, None]:
431 return (
432 "%s_%s" % (dialect_name, value_name)
433 for dialect_name in self.obj.dialect_options
434 for value_name in self.obj.dialect_options[
435 dialect_name
436 ]._non_defaults
437 )
440class _DialectArgDict(MutableMapping[str, Any]):
441 """A dictionary view of dialect-level arguments for a specific
442 dialect.
444 Maintains a separate collection of user-specified arguments
445 and dialect-specified default arguments.
447 """
449 def __init__(self) -> None:
450 self._non_defaults: Dict[str, Any] = {}
451 self._defaults: Dict[str, Any] = {}
453 def __len__(self) -> int:
454 return len(set(self._non_defaults).union(self._defaults))
456 def __iter__(self) -> Iterator[str]:
457 return iter(set(self._non_defaults).union(self._defaults))
459 def __getitem__(self, key: str) -> Any:
460 if key in self._non_defaults:
461 return self._non_defaults[key]
462 else:
463 return self._defaults[key]
465 def __setitem__(self, key: str, value: Any) -> None:
466 self._non_defaults[key] = value
468 def __delitem__(self, key: str) -> None:
469 del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the named dialect and return a copy of its
    ``construct_arguments`` collection, or None if the dialect does not
    declare one (meaning it performs no keyword validation)."""
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    if dialect_cls.construct_arguments is None:
        return None
    return dict(dialect_cls.construct_arguments)
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    def get_dialect_option(
        self,
        dialect: Dialect,
        argument_name: str,
        *,
        else_: Any = None,
        deprecated_fallback: Optional[str] = None,
    ) -> Any:
        r"""Return the value of a dialect-specific option, or *else_* if
        this dialect does not register the given argument.

        This is useful for DDL compilers that may be inherited by
        third-party dialects whose ``construct_arguments`` do not
        include the same set of keys as the parent dialect.

        :param dialect: The dialect for which to retrieve the option.
        :param argument_name: The name of the argument to retrieve.
        :param else\_: The value to return if the argument is not present.
        :param deprecated_fallback: Optional dialect name to fall back to
         if the argument is not present for the current dialect. If the
         argument is present for the fallback dialect but not the current
         dialect, a deprecation warning will be emitted.

        """
        registry = DialectKWArgs._kw_registry[dialect.name]
        if registry is None:
            # dialect does not participate in kwarg validation at all
            return else_

        if argument_name in registry.get(self.__class__, {}):
            if (
                deprecated_fallback is None
                or dialect.name == deprecated_fallback
            ):
                return self.dialect_options[dialect.name][argument_name]

            # deprecated_fallback is present; need to look in two places

            # Current dialect has this option registered.
            # Check if user explicitly set it.
            if (
                dialect.name in self.dialect_options
                and argument_name
                in self.dialect_options[dialect.name]._non_defaults
            ):
                # User explicitly set this dialect's option - use it
                return self.dialect_options[dialect.name][argument_name]

            # User didn't set current dialect's option.
            # Check for deprecated fallback.
            elif (
                deprecated_fallback in self.dialect_options
                and argument_name
                in self.dialect_options[deprecated_fallback]._non_defaults
            ):
                # User set fallback option but not current dialect's option
                warn_deprecated(
                    f"Using '{deprecated_fallback}_{argument_name}' "
                    f"with the '{dialect.name}' dialect is deprecated; "
                    f"please additionally specify "
                    f"'{dialect.name}_{argument_name}'.",
                    version="2.1",
                )
                return self.dialect_options[deprecated_fallback][argument_name]

            # Return default value
            return self.dialect_options[dialect.name][argument_name]
        else:
            # Current dialect doesn't have the option registered at all.
            # Don't warn - if a third-party dialect doesn't support an
            # option, that's their choice, not a deprecation case.
            return else_

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """
        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # message fix: the previous wording ("does have ... enabled
            # configured") was garbled and stated the opposite of the
            # condition being reported
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # lazily-populated registry of dialect name ->
    # ``construct_arguments`` dict (or None for non-participating dialects)
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Assemble the per-(dialect, class) argument dictionary, layering
        defaults from each class in the MRO, base-most first."""
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # non-participating dialect: accept any argument
            d._defaults.update({"*": None})
        else:
            # renamed loop variable; previously shadowed the ``cls`` argument
            for mro_cls in reversed(cls.__mro__):
                if mro_cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[mro_cls])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """
        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn, then accept the argument
                # unconditionally under a permissive entry
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin_name, visit_name) -> CompileState subclass,
    # populated via the plugin_for() decorator
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        # factory construction.

        if statement._propagate_attrs:
            # honor a plugin name (e.g. "orm") propagated onto the
            # statement, falling back to the "default" registration when
            # no plugin-specific class exists for this statement type
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )
            if klass is None:
                klass = cls.plugins[
                    ("default", statement._effective_plugin_target)
                ]

        else:
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is cls:
            return cls(statement, compiler, **kw)
        else:
            # delegate to the more specific class's own factory
            return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the CompileState subclass registered for the statement's
        plugin, falling back to the "default" registration."""
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        try:
            return cls.plugins[("default", statement._effective_plugin_target)]
        except KeyError:
            return None

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        # like get_plugin_class() but for an explicit plugin name, with
        # no fallback to "default"
        try:
            return cls.plugins[
                (plugin_name, statement._effective_plugin_target)
            ]
        except KeyError:
            return None

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Class decorator registering the decorated class in the
        ``plugins`` registry under ``(plugin_name, visit_name)``."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a shallow copy of self with memoized values discarded."""
        memoized = self._memoized_keys
        copy = self.__class__.__new__(self.__class__)
        # snapshot the dict first so the copy is built atomically even if
        # self is mutated concurrently
        snapshot = self.__dict__.copy()
        if memoized:
            for key in memoized:
                snapshot.pop(key, None)
        copy.__dict__ = snapshot
        return copy
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Return self, with memoized values discarded so they recompute
        against the mutated state."""
        # note __dict__ needs to be in __slots__ if this is used
        for key in self._memoized_keys:
            self.__dict__.pop(key, None)
        return self
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional plugin class overriding the default CompileState lookup
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # immutable mapping of additional attributes carried on the statement
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # hook producing the CompileState for this statement
    _compile_state_factory = CompileState.create_for_statement
class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        # class-level "+": instantiate with defaults, then overlay the
        # attributes from ``other`` (a mapping); unknown keys are rejected
        o1 = self()

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    # tuple of attribute names participating in merges and equality;
    # assembled per-subclass by __init_subclass__()
    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # collect the subclass's declared attributes (skipping dunders and
        # the cache-key traversal spec) as the canonical attribute set
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        # instance-level "+": shallow-copy self, then overlay the
        # attributes from ``other`` (a mapping); unknown keys are rejected
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        # note: answers issubclass() for the class itself
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        # produce a new Options with ``value`` appended to attribute ``name``
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    # immutable empty mapping returned by the class-level _state_dict()
    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Merge the state of ``other`` into a new instance of ``cls``,
        rejecting attributes that ``cls`` does not know about."""
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                local = "_" + argname
                # the local execution-option value takes precedence over
                # the statement-level value
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.EMPTY_DICT.merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options
        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class CacheableOptions(Options, HasCacheKey):
    """Options subclass that may participate in SQL statement cache keys."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance-level form: delegate to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class-level form: the class alone identifies an all-defaults
        # options object
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        return HasCacheKey._generate_cache_key(self)
class ExecutableOption(HasCopyInternals):
    """Base of options that may be applied to executable statements.

    Subclasses are accepted by methods such as ``Executable.options()``.

    """

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        cls = self.__class__
        clone = cls.__new__(cls)
        # shallow copy: instance state is duplicated one level deep only
        clone.__dict__ = self.__dict__.copy()  # type: ignore
        return clone
# type variable for the extension-point position names accepted by a
# particular statement type; bound to str (typically a typing.Literal)
_L = TypeVar("_L", bound=str)
class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statements that provide named SQL syntax extension points.

    The generic parameter ``_L`` is the set of position names (a
    ``typing.Literal`` per statement type) that this statement accepts.
    """

    # maps position name -> attribute name on the statement where the
    # extension element(s) for that position are stored; populated by
    # each statement subclass
    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that take place in specific
        parts of a SQL statement. Examples include vendor extensions like
        PostgreSQL / SQLite's "ON DUPLICATE KEY UPDATE", PostgreSQL's
        "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
        and DELETE statements.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        # coerce to a SyntaxExtension-role element; propagates plugin
        # attributes (e.g. ORM) from this statement onto the extension
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list of merging already existing elements
         of the same type. Some examples are shown in the examples above
        :param position: string name of the position to apply to.  This
         varies per statement type.  IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`.ext`

        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke
        else:
            ElementList = util.preloaded.sql_elements.ElementList
            # the extension point attribute currently stores nothing,
            # a single element, or an ElementList of several elements;
            # normalize to a tuple for apply_fn
            existing: Optional[ClauseElement] = getattr(self, attrname, None)
            if existing is None:
                input_seq: Tuple[ClauseElement, ...] = ()
            elif isinstance(existing, ElementList):
                input_seq = existing.clauses
            else:
                input_seq = (existing,)

            new_seq = apply_fn(input_seq)
            assert new_seq, "cannot return empty sequence"
            # store a lone element directly; wrap multiple elements in
            # an ElementList
            new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
            setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        # overridden per statement type to dispatch the extension's
        # appropriate apply_to_* hook
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        # collect the currently populated extension points, keyed by
        # position name; unpopulated (None) positions are omitted
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        # inverse of _get_syntax_extensions_as_dict; assigns extensions
        # onto their mapped attributes by position name
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index] # noqa: E501
class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.Select.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type in existing and appending
        ``self`` to the list.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.Select.apply_syntax_extension_point` and equivalents
            in :class:`_dml.Insert`, :class:`_dml.Delete`, :class:`_dml.Update`

        """  # noqa: E501
        own_type = type(self)
        # keep every element that is not of this extension's type, then
        # place self at the end
        retained: List[ClauseElement] = [
            element
            for element in existing
            if not isinstance(element, own_type)
        ]
        retained.append(self)  # type: ignore[arg-type]
        return retained

    def _unsupported(self, statement_kind: str) -> NoReturn:
        # shared failure path for the apply_to_* hooks; subclasses
        # override only the hooks for the statements they support
        raise NotImplementedError(
            f"Extension {type(self).__name__} "
            f"cannot be applied to {statement_kind}"
        )

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        self._unsupported("select")

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        self._unsupported("update")

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        self._unsupported("delete")

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        self._unsupported("insert")
class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    # whether this element may be passed to Connection/Session execute
    supports_execution: bool = True
    # immutable dict of execution options; extended generatively via
    # execution_options()
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    # ExecutableOption objects applied via options()
    _with_options: Tuple[ExecutableOption, ...] = ()
    # (callable, cache_args) pairs applied at compile-state construction
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    # traversal instructions for cache-key / visitor machinery covering
    # the executable-level state above
    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags, overridden by specific statement subclasses
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    if TYPE_CHECKING:
        __visit_name__: str

        # typing-only declarations; runtime implementations are provided
        # elsewhere (per dialect / statement implementation)
        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> tuple[
            Compiled,
            Sequence[BindParameter[Any]] | None,
            _CoreSingleExecuteParams | None,
            CacheStats,
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        # by default, plugin dispatch keys off the visit name
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # coerce each argument to the ExecutableOption role and append
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        self._compile_state_funcs += ((callable_, cache_args),)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

            from sqlalchemy import event


            @event.listens_for(some_engine, "before_execute")
            def _process_opt(conn, statement, multiparams, params, execution_options):
                "run a SQL function before invoking a statement"

                if execution_options.get("do_special_thing", False):
                    conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # these two options are rejected at the statement level; they are
        # only meaningful on Connection / Engine
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
        # union() produces a new immutable dict; existing options persist
        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options
class ExecutableStatement(Executable):
    """Executable subclass that implements a lightweight version of ``params``
    that avoids a full cloned traverse.

    .. versionadded:: 2.1

    """

    # bindparam values applied via params(); EMPTY_DICT until populated
    _params: util.immutabledict[str, Any] = EMPTY_DICT

    _executable_traverse_internals = [
        *Executable._executable_traverse_internals,
        ("_params", InternalTraversal.dp_params),
    ]

    @_generative
    def params(
        self,
        __optionaldict: _CoreSingleExecuteParams | None = None,
        /,
        **kwargs: Any,
    ) -> Self:
        """Return a copy with the provided bindparam values.

        Returns a copy of this Executable with bindparam values set
        to the given dictionary::

            >>> clause = column("x") + bindparam("foo")
            >>> print(clause.compile().params)
            {'foo': None}
            >>> print(clause.params({"foo": 7}).compile().params)
            {'foo': 7}

        """
        # merge the optional positional dictionary into the keyword
        # arguments; keyword values already present are overwritten
        if __optionaldict:
            kwargs.update(__optionaldict)
        if self._params:
            # immutabledict union produces a new immutabledict
            self._params = self._params | kwargs
        else:
            self._params = util.immutabledict(kwargs)
        return self
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        # wrap the parent-association step with before/after attach
        # events; the ordering here is part of the event contract
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines event-target behavior with the visitor protocol; defines
    no additional state of its own.

    .. versionadded:: 2.0.41

    """
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # flag consumed by traversal machinery to select schema-level
    # traversal behavior
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
class _SentinelDefaultCharacterization(Enum):
    """Enumerates how the default of a candidate sentinel column is
    generated; consumed via :class:`._SentinelColumnCharacterization`.

    Member semantics follow the value names; see usage sites for the
    exact rules applied to each characterization.
    """

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
    MONOTONIC_FUNCTION = "monotonic"
class _SentinelColumnCharacterization(NamedTuple):
    """Immutable record describing the sentinel column(s) chosen for a
    table, if any, along with how their values are generated."""

    # the sentinel columns, or None if no sentinel is available
    columns: Optional[Sequence[Column[Any]]] = None
    # True if the sentinel was explicitly designated (vs. inferred)
    is_explicit: bool = False
    # True if the sentinel is an autoincrement column
    is_autoinc: bool = False
    # how the column's default value is produced
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
# key type for ColumnCollection entries: constrained to either
# "string or None" keys, or strictly string keys
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# column element type variables; the covariant form is used on
# read-only collection interfaces
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")
class _ColumnMetrics(Generic[_COL_co]):
    """Per-column bookkeeping entry for a :class:`.ColumnCollection`,
    keeping the collection's proxy index in sync with the column's
    expanded proxy set."""

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        self.column = col

        # a non-empty proxy_index means it has been initialized, so it
        # must be updated with this column's proxy set
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxied in col._expanded_proxy_set:
                proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        # remove this entry from the collection's proxy index, dropping
        # any index keys that become empty
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied)
            if members is None:
                continue
            members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        # True only if every target outside our proxy set still
        # intersects it once clones are expanded
        proxies = self.column._expanded_proxy_set
        return all(
            proxies.intersection(_expand_cloned([target]))
            for target in target_set.difference(proxies)
        )
1778class ColumnCollection(Generic[_COLKEY, _COL_co]):
1779 """Base class for collection of :class:`_expression.ColumnElement`
1780 instances, typically for :class:`_sql.FromClause` objects.
1782 The :class:`_sql.ColumnCollection` object is most commonly available
1783 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1784 on the :class:`_schema.Table` object, introduced at
1785 :ref:`metadata_tables_and_columns`.
1787 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1788 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1789 :class:`_schema.Column` objects, which are then accessible both via mapping
1790 style access as well as attribute access style.
1792 To access :class:`_schema.Column` objects using ordinary attribute-style
1793 access, specify the name like any other object attribute, such as below
1794 a column named ``employee_name`` is accessed::
1796 >>> employee_table.c.employee_name
1798 To access columns that have names with special characters or spaces,
1799 index-style access is used, such as below which illustrates a column named
1800 ``employee ' payment`` is accessed::
1802 >>> employee_table.c["employee ' payment"]
1804 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1805 interface, common dictionary method names like
1806 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1807 and :meth:`_sql.ColumnCollection.items` are available, which means that
1808 database columns that are keyed under these names also need to use indexed
1809 access::
1811 >>> employee_table.c["values"]
1814 The name for which a :class:`_schema.Column` would be present is normally
1815 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1816 such as a :class:`_sql.Select` object that uses a label style set
1817 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1818 key may instead be represented under a particular label name such
1819 as ``tablename_columnname``::
1821 >>> from sqlalchemy import select, column, table
1822 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1823 >>> t = table("t", column("c"))
1824 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1825 >>> subq = stmt.subquery()
1826 >>> subq.c.t_c
1827 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1829 :class:`.ColumnCollection` also indexes the columns in order and allows
1830 them to be accessible by their integer position::
1832 >>> cc[0]
1833 Column('x', Integer(), table=None)
1834 >>> cc[1]
1835 Column('y', Integer(), table=None)
1837 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1838 allows integer-based
1839 index access to the collection.
1841 Iterating the collection yields the column expressions in order::
1843 >>> list(cc)
1844 [Column('x', Integer(), table=None),
1845 Column('y', Integer(), table=None)]
1847 The :class:`_expression.ColumnCollection` base class is read-only.
1848 For mutation operations, the :class:`.WriteableColumnCollection` subclass
1849 provides methods such as :meth:`.WriteableColumnCollection.add`.
1850 A special subclass :class:`.DedupeColumnCollection` exists which
1851 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1852 collection is used for schema level objects like :class:`_schema.Table`
1853 and :class:`.PrimaryKeyConstraint` where this deduping is helpful.
1854 The :class:`.DedupeColumnCollection` class also has additional mutation
1855 methods as the schema constructs have more use cases that require removal
1856 and replacement of columns.
1858 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1859 now stores duplicate
1860 column keys as well as the same column in multiple positions. The
1861 :class:`.DedupeColumnCollection` class is added to maintain the
1862 former behavior in those cases where deduplication as well as
1863 additional replace/remove operations are needed.
1865 .. versionchanged:: 2.1 :class:`_expression.ColumnCollection` is now
1866 a read-only base class. Mutation operations are available through
1867 :class:`.WriteableColumnCollection` and :class:`.DedupeColumnCollection`
1868 subclasses.
1871 """
1873 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1875 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1876 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1877 _colset: Set[_COL_co]
1878 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1880 def __init__(self) -> None:
1881 raise TypeError(
1882 "ColumnCollection is an abstract base class and cannot be "
1883 "instantiated directly. Use WriteableColumnCollection or "
1884 "DedupeColumnCollection instead."
1885 )
1887 @util.preload_module("sqlalchemy.sql.elements")
1888 def __clause_element__(self) -> ClauseList:
1889 elements = util.preloaded.sql_elements
1891 return elements.ClauseList(
1892 _literal_as_text_role=roles.ColumnsClauseRole,
1893 group=False,
1894 *self._all_columns,
1895 )
1897 @property
1898 def _all_columns(self) -> List[_COL_co]:
1899 return [col for (_, col, _) in self._collection]
1901 def keys(self) -> List[_COLKEY]:
1902 """Return a sequence of string key names for all columns in this
1903 collection."""
1904 return [k for (k, _, _) in self._collection]
1906 def values(self) -> List[_COL_co]:
1907 """Return a sequence of :class:`_sql.ColumnClause` or
1908 :class:`_schema.Column` objects for all columns in this
1909 collection."""
1910 return [col for (_, col, _) in self._collection]
1912 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1913 """Return a sequence of (key, column) tuples for all columns in this
1914 collection each consisting of a string key name and a
1915 :class:`_sql.ColumnClause` or
1916 :class:`_schema.Column` object.
1917 """
1919 return [(k, col) for (k, col, _) in self._collection]
1921 def __bool__(self) -> bool:
1922 return bool(self._collection)
1924 def __len__(self) -> int:
1925 return len(self._collection)
1927 def __iter__(self) -> Iterator[_COL_co]:
1928 # turn to a list first to maintain over a course of changes
1929 return iter([col for _, col, _ in self._collection])
1931 @overload
1932 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1934 @overload
1935 def __getitem__(
1936 self, key: Union[Tuple[Union[str, int], ...], slice]
1937 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1939 def __getitem__(
1940 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1941 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1942 try:
1943 if isinstance(key, (tuple, slice)):
1944 if isinstance(key, slice):
1945 cols = (
1946 (sub_key, col)
1947 for (sub_key, col, _) in self._collection[key]
1948 )
1949 else:
1950 cols = (self._index[sub_key] for sub_key in key)
1952 return WriteableColumnCollection(cols).as_readonly()
1953 else:
1954 return self._index[key][1]
1955 except KeyError as err:
1956 if isinstance(err.args[0], int):
1957 raise IndexError(err.args[0]) from err
1958 else:
1959 raise
1961 def __getattr__(self, key: str) -> _COL_co:
1962 try:
1963 return self._index[key][1]
1964 except KeyError as err:
1965 raise AttributeError(key) from err
1967 def __contains__(self, key: str) -> bool:
1968 if key not in self._index:
1969 if not isinstance(key, str):
1970 raise exc.ArgumentError(
1971 "__contains__ requires a string argument"
1972 )
1973 return False
1974 else:
1975 return True
1977 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1978 """Compare this :class:`_expression.ColumnCollection` to another
1979 based on the names of the keys"""
1981 for l, r in zip_longest(self, other):
1982 if l is not r:
1983 return False
1984 else:
1985 return True
1987 def __eq__(self, other: Any) -> bool:
1988 return self.compare(other)
1990 @overload
1991 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1993 @overload
1994 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1996 def get(
1997 self, key: str, default: Optional[_COL] = None
1998 ) -> Optional[Union[_COL_co, _COL]]:
1999 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
2000 based on a string key name from this
2001 :class:`_expression.ColumnCollection`."""
2003 if key in self._index:
2004 return self._index[key][1]
2005 else:
2006 return default
2008 def __str__(self) -> str:
2009 return "%s(%s)" % (
2010 self.__class__.__name__,
2011 ", ".join(str(c) for c in self),
2012 )
2014 # https://github.com/python/mypy/issues/4266
2015 __hash__: Optional[int] = None # type: ignore
2017 def contains_column(self, col: ColumnElement[Any]) -> bool:
2018 """Checks if a column object exists in this collection"""
2019 if col not in self._colset:
2020 if isinstance(col, str):
2021 raise exc.ArgumentError(
2022 "contains_column cannot be used with string arguments. "
2023 "Use ``col_name in table.c`` instead."
2024 )
2025 return False
2026 else:
2027 return True
    def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a read-only facade over this collection.

        Abstract here; the writable subclass supplies the implementation.
        """
        raise NotImplementedError()
    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
         to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # abstract here; WriteableColumnCollection provides the real
        # proxy-index based implementation
        raise NotImplementedError()
class WriteableColumnCollection(ColumnCollection[_COLKEY, _COL_co]):
    """A :class:`_sql.ColumnCollection` that allows mutation operations.

    This is the writable form of :class:`_sql.ColumnCollection` that
    implements methods such as :meth:`.add`, :meth:`.remove`, :meth:`.update`,
    and :meth:`.clear`.

    This class is used internally for building column collections during
    construction of SQL constructs. For schema-level objects that require
    deduplication behavior, use :class:`.DedupeColumnCollection`.

    .. versionadded:: 2.1

    """

    __slots__ = ()

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        # state is assigned via object.__setattr__; presumably the base
        # collection blocks ordinary attribute mutation — TODO confirm
        # against ColumnCollection.__setattr__
        object.__setattr__(self, "_colset", set())
        object.__setattr__(self, "_index", {})
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        # hook point for initial contents; subclasses may override how
        # the (key, column) pairs are loaded
        self._populate_separate_keys(iter_)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        # integer-position entries
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        # string-key entries; iterated in reverse so that the FIRST
        # occurrence of a duplicated key wins
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def __getstate__(self) -> Dict[str, Any]:
        # _ColumnMetrics objects and the proxy index are rebuilt on
        # unpickle, so only (key, column) pairs plus _index are pickled
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        object.__setattr__(self, "_index", state["_index"])
        # proxy index starts empty; it is lazily refilled by
        # _init_proxy_index() when corresponding_column() is used
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def add(
        self,
        column: ColumnElement[Any],
        key: Optional[_COLKEY] = None,
    ) -> None:
        """Add a column to this :class:`_sql.WriteableColumnCollection`.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.WriteableColumnCollection` is usually part of an
            existing object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())

        # the integer position always points at the new column; the
        # string key is only claimed if not already present, so the
        # first column added under a key keeps it
        self._index[l] = (colkey, _column)
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        # wrap this collection in a read-only facade that shares the
        # same underlying state
        return ReadOnlyColumnCollection(self)

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.WriteableColumnCollection`."""

        return self._as_readonly()

    def _init_proxy_index(self) -> None:
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data. Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        # map every column in each member's expanded proxy set back to
        # the member's metrics object
        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        See :meth:`.ColumnCollection.corresponding_column` for parameter
        information.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        # candidates: every member whose expanded proxy set intersects
        # the target's proxy set
        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                # selected_intersection is computed lazily, only once a
                # second candidate forces an actual comparison
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
2301_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
class DedupeColumnCollection(WriteableColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        # an explicit key, if given, must agree with the column's own .key
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            # identical column already present; no-op
            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        # insert a column known not to be present, maintaining both the
        # integer-position and string-key entries of _index
        collection_length = len(self._collection)

        if index is None:
            l = collection_length
        else:
            # normalize a negative index to a non-negative position
            if index < 0:
                index = max(0, collection_length + index)
            l = index

        if index is None:
            self._collection.append(
                (key, named_column, _ColumnMetrics(self, named_column))
            )
        else:
            self._collection.insert(
                index, (key, named_column, _ColumnMetrics(self, named_column))
            )

        self._colset.add(named_column._deannotate())

        if index is not None:
            # shift integer entries at/after the insertion point up by
            # one, working from the top down so none are clobbered
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        cols = list(iter_)

        # columns whose name or key collides with an existing entry are
        # deferred to replace(); the rest are appended directly
        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        # columns are keyed by their own .key attribute
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        # detach proxy-index entries that referenced the removed column
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the integer-position entries
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))

            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnname'.

        Used by schema.Column to override columns during table reflection.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            # nothing collides; plain append/insert
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        # rebuild the collection, substituting the new column at the
        # position of the first removed match only
        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            # dispose metrics referring to the removed columns
            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if replace_index is None:
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # a replacement happened in place; relocate it to the
            # requested position, deleting the original slot
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild _index wholesale: integer positions first, then
        # string keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """Immutable facade over a :class:`.WriteableColumnCollection`.

    Shares the parent collection's internal state directly, so changes
    made to the parent remain visible here while mutation through this
    object is blocked by the read-only container base.
    """

    __slots__ = ("_parent",)

    _parent: WriteableColumnCollection[_COLKEY, _COL_co]

    def __init__(
        self, collection: WriteableColumnCollection[_COLKEY, _COL_co]
    ):
        # mirror the parent's state attributes; object.__setattr__
        # bypasses the read-only guard
        object.__setattr__(self, "_parent", collection)
        for attr in ("_index", "_collection", "_colset", "_proxy_index"):
            object.__setattr__(self, attr, getattr(collection, attr))

    def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        # already read-only
        return self

    def __getstate__(self) -> Dict[str, ColumnCollection[_COLKEY, _COL_co]]:
        # pickle only the parent; shared state is re-mirrored on load
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__init__(state["_parent"])  # type: ignore

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        See :meth:`.ColumnCollection.corresponding_column` for parameter
        information.

        """
        return self._parent.corresponding_column(column, require_embedded)
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of column clauses with lineage-aware equality."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        # plain membership; kept as a named method for API symmetry with
        # ColumnCollection.contains_column
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        for member in cols:
            self.add(member)

    def __eq__(self, other):
        # produce a SQL conjunction of equality comparisons for every
        # pairing of columns that share lineage
        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        return hash(tuple(self))
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    If not immediately available, does an iterate to find a sub-element
    that has one, if any.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        # fall back to traversing sub-elements for the first one that
        # provides an entity namespace
        for sub in visitors.iterate(cast(ExternallyTraversible, entity)):
            if _is_has_entity_namespace(sub):
                return sub.entity_namespace
        # nothing found; propagate the original AttributeError
        raise
@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Return an entry from an entity_namespace.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """
    try:
        ns = _entity_namespace(entity)
        # with no default supplied, a missing attribute falls through to
        # the AttributeError handler below
        if default is NO_ARG:
            return getattr(ns, key)  # type: ignore
        return getattr(ns, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err
def _describe_entities(entities: Collection[Any]) -> str:
    """Render up to three entities as a comma-separated string for error
    messages, appending the total count when the list is truncated."""
    desc = ", ".join(str(e) for e in itertools.islice(entities, 3))
    if len(entities) > 3:
        desc += f", ... ({len(entities)} total)"
    return desc


def _entity_namespace_key_search_all(
    entities: Collection[Any],
    key: str,
) -> SQLCoreOperations[Any]:
    """Search multiple entities for a key, raise if ambiguous or not found.

    This is used by filter_by() to search across all FROM clause entities
    when a single entity doesn't have the requested attribute.

    .. versionadded:: 2.1

    :param entities: candidate FROM clause entities to search.
    :param key: attribute name to look up on each entity namespace.

    Raises:
        AmbiguousColumnError: If key exists in multiple entities
        InvalidRequestError: If key doesn't exist in any entity
    """

    match_: SQLCoreOperations[Any] | None = None

    for entity in entities:
        ns = _entity_namespace(entity)
        # Check if the attribute exists on this entity's namespace
        if hasattr(ns, key):
            if match_ is not None:
                # a previous entity already supplied this attribute
                raise exc.AmbiguousColumnError(
                    f'Attribute name "{key}" is ambiguous; it exists in '
                    f"multiple FROM clause entities "
                    f"({_describe_entities(entities)}). "
                    f"Use filter() with explicit column references instead "
                    f"of filter_by()."
                )
            match_ = getattr(ns, key)

    if match_ is None:
        # No entity has this attribute
        raise exc.InvalidRequestError(
            f'None of the FROM clause entities have a property "{key}". '
            f"Searched entities: {_describe_entities(entities)}"
        )

    return match_