Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import FrozenSet
25from typing import Generator
26from typing import Generic
27from typing import Iterable
28from typing import Iterator
29from typing import List
30from typing import Mapping
31from typing import MutableMapping
32from typing import NamedTuple
33from typing import NoReturn
34from typing import Optional
35from typing import overload
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import TypeVar
42from typing import Union
44from . import roles
45from . import visitors
46from .cache_key import HasCacheKey # noqa
47from .cache_key import MemoizedHasCacheKey # noqa
48from .traversals import HasCopyInternals # noqa
49from .visitors import ClauseVisitor
50from .visitors import ExtendedInternalTraversal
51from .visitors import ExternallyTraversible
52from .visitors import InternalTraversal
53from .. import event
54from .. import exc
55from .. import util
56from ..util import HasMemoized as HasMemoized
57from ..util import hybridmethod
58from ..util import typing as compat_typing
59from ..util.typing import Final
60from ..util.typing import Protocol
61from ..util.typing import Self
62from ..util.typing import TypeGuard
64if TYPE_CHECKING:
65 from . import coercions
66 from . import elements
67 from . import type_api
68 from ._orm_types import DMLStrategyArgument
69 from ._orm_types import SynchronizeSessionArgument
70 from ._typing import _CLE
71 from .cache_key import CacheKey
72 from .compiler import SQLCompiler
73 from .elements import BindParameter
74 from .elements import ClauseList
75 from .elements import ColumnClause # noqa
76 from .elements import ColumnElement
77 from .elements import NamedColumn
78 from .elements import SQLCoreOperations
79 from .elements import TextClause
80 from .schema import Column
81 from .schema import DefaultGenerator
82 from .selectable import _JoinTargetElement
83 from .selectable import _SelectIterable
84 from .selectable import FromClause
85 from .visitors import anon_map
86 from ..engine import Connection
87 from ..engine import CursorResult
88 from ..engine.interfaces import _CoreMultiExecuteParams
89 from ..engine.interfaces import _ExecuteOptions
90 from ..engine.interfaces import _ImmutableExecuteOptions
91 from ..engine.interfaces import CacheStats
92 from ..engine.interfaces import Compiled
93 from ..engine.interfaces import CompiledCacheType
94 from ..engine.interfaces import CoreExecuteOptionsParameter
95 from ..engine.interfaces import Dialect
96 from ..engine.interfaces import IsolationLevel
97 from ..engine.interfaces import SchemaTranslateMapType
98 from ..event import dispatcher
100if not TYPE_CHECKING:
101 coercions = None # noqa
102 elements = None # noqa
103 type_api = None # noqa
106class _NoArg(Enum):
107 NO_ARG = 0
109 def __repr__(self):
110 return f"_NoArg.{self.name}"
113NO_ARG: Final = _NoArg.NO_ARG
116class _NoneName(Enum):
117 NONE_NAME = 0
118 """indicate a 'deferred' name that was ultimately the value None."""
121_NONE_NAME: Final = _NoneName.NONE_NAME
123_T = TypeVar("_T", bound=Any)
125_Fn = TypeVar("_Fn", bound=Callable[..., Any])
127_AmbiguousTableNameMap = MutableMapping[str, str]
130class _DefaultDescriptionTuple(NamedTuple):
131 arg: Any
132 is_scalar: Optional[bool]
133 is_callable: Optional[bool]
134 is_sentinel: Optional[bool]
136 @classmethod
137 def _from_column_default(
138 cls, default: Optional[DefaultGenerator]
139 ) -> _DefaultDescriptionTuple:
140 return (
141 _DefaultDescriptionTuple(
142 default.arg, # type: ignore
143 default.is_scalar,
144 default.is_callable,
145 default.is_sentinel,
146 )
147 if default
148 and (
149 default.has_arg
150 or (not default.for_update and default.is_sentinel)
151 )
152 else _DefaultDescriptionTuple(None, None, None, None)
153 )
156_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
157 "_omit_from_statements"
158)
161class _EntityNamespace(Protocol):
162 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
165class _HasEntityNamespace(Protocol):
166 @util.ro_non_memoized_property
167 def entity_namespace(self) -> _EntityNamespace: ...
170def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
171 return hasattr(element, "entity_namespace")
174# Remove when https://github.com/python/mypy/issues/14640 will be fixed
175_Self = TypeVar("_Self", bound=Any)
178class Immutable:
179 """mark a ClauseElement as 'immutable' when expressions are cloned.
181 "immutable" objects refers to the "mutability" of an object in the
182 context of SQL DQL and DML generation. Such as, in DQL, one can
183 compose a SELECT or subquery of varied forms, but one cannot modify
184 the structure of a specific table or column within DQL.
185 :class:`.Immutable` is mostly intended to follow this concept, and as
186 such the primary "immutable" objects are :class:`.ColumnClause`,
187 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
189 """
191 __slots__ = ()
193 _is_immutable: bool = True
195 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
196 raise NotImplementedError("Immutable objects do not support copying")
198 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
199 raise NotImplementedError("Immutable objects do not support copying")
201 def _clone(self: _Self, **kw: Any) -> _Self:
202 return self
204 def _copy_internals(
205 self, *, omit_attrs: Iterable[str] = (), **kw: Any
206 ) -> None:
207 pass
210class SingletonConstant(Immutable):
211 """Represent SQL constants like NULL, TRUE, FALSE"""
213 _is_singleton_constant: bool = True
215 _singleton: SingletonConstant
217 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
218 return cast(_T, cls._singleton)
220 @util.non_memoized_property
221 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
222 raise NotImplementedError()
224 @classmethod
225 def _create_singleton(cls) -> None:
226 obj = object.__new__(cls)
227 obj.__init__() # type: ignore
229 # for a long time this was an empty frozenset, meaning
230 # a SingletonConstant would never be a "corresponding column" in
231 # a statement. This referred to #6259. However, in #7154 we see
232 # that we do in fact need "correspondence" to work when matching cols
233 # in result sets, so the non-correspondence was moved to a more
234 # specific level when we are actually adapting expressions for SQL
235 # render only.
236 obj.proxy_set = frozenset([obj])
237 cls._singleton = obj
240def _from_objects(
241 *elements: Union[
242 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
243 ]
244) -> Iterator[FromClause]:
245 return itertools.chain.from_iterable(
246 [element._from_objects for element in elements]
247 )
250def _select_iterables(
251 elements: Iterable[roles.ColumnsClauseRole],
252) -> _SelectIterable:
253 """expand tables into individual columns in the
254 given list of column expressions.
256 """
257 return itertools.chain.from_iterable(
258 [c._select_iterable for c in elements]
259 )
262_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
265class _GenerativeType(compat_typing.Protocol):
266 def _generate(self) -> Self: ...
269def _generative(fn: _Fn) -> _Fn:
270 """non-caching _generative() decorator.
272 This is basically the legacy decorator that copies the object and
273 runs a method on the new copy.
275 """
277 @util.decorator
278 def _generative(
279 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
280 ) -> _SelfGenerativeType:
281 """Mark a method as generative."""
283 self = self._generate()
284 x = fn(self, *args, **kw)
285 assert x is self, "generative methods must return self"
286 return self
288 decorated = _generative(fn)
289 decorated.non_generative = fn # type: ignore
290 return decorated
293def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
294 msgs: Dict[str, str] = kw.pop("msgs", {})
296 defaults: Dict[str, str] = kw.pop("defaults", {})
298 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
299 (name, operator.attrgetter(name), defaults.get(name, None))
300 for name in names
301 ]
303 @util.decorator
304 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
305 # make pylance happy by not including "self" in the argument
306 # list
307 self = args[0]
308 args = args[1:]
309 for name, getter, default_ in getters:
310 if getter(self) is not default_:
311 msg = msgs.get(
312 name,
313 "Method %s() has already been invoked on this %s construct"
314 % (fn.__name__, self.__class__),
315 )
316 raise exc.InvalidRequestError(msg)
317 return fn(self, *args, **kw)
319 return check
322def _clone(element, **kw):
323 return element._clone(**kw)
326def _expand_cloned(
327 elements: Iterable[_CLE],
328) -> Iterable[_CLE]:
329 """expand the given set of ClauseElements to be the set of all 'cloned'
330 predecessors.
332 """
333 # TODO: cython candidate
334 return itertools.chain(*[x._cloned_set for x in elements])
337def _de_clone(
338 elements: Iterable[_CLE],
339) -> Iterable[_CLE]:
340 for x in elements:
341 while x._is_clone_of is not None:
342 x = x._is_clone_of
343 yield x
346def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
347 """return the intersection of sets a and b, counting
348 any overlap between 'cloned' predecessors.
350 The returned set is in terms of the entities present within 'a'.
352 """
353 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
354 _expand_cloned(b)
355 )
356 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
359def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
360 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
361 _expand_cloned(b)
362 )
363 return {
364 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
365 }
368class _DialectArgView(MutableMapping[str, Any]):
369 """A dictionary view of dialect-level arguments in the form
370 <dialectname>_<argument_name>.
372 """
374 __slots__ = ("obj",)
376 def __init__(self, obj: DialectKWArgs) -> None:
377 self.obj = obj
379 def _key(self, key: str) -> Tuple[str, str]:
380 try:
381 dialect, value_key = key.split("_", 1)
382 except ValueError as err:
383 raise KeyError(key) from err
384 else:
385 return dialect, value_key
387 def __getitem__(self, key: str) -> Any:
388 dialect, value_key = self._key(key)
390 try:
391 opt = self.obj.dialect_options[dialect]
392 except exc.NoSuchModuleError as err:
393 raise KeyError(key) from err
394 else:
395 return opt[value_key]
397 def __setitem__(self, key: str, value: Any) -> None:
398 try:
399 dialect, value_key = self._key(key)
400 except KeyError as err:
401 raise exc.ArgumentError(
402 "Keys must be of the form <dialectname>_<argname>"
403 ) from err
404 else:
405 self.obj.dialect_options[dialect][value_key] = value
407 def __delitem__(self, key: str) -> None:
408 dialect, value_key = self._key(key)
409 del self.obj.dialect_options[dialect][value_key]
411 def __len__(self) -> int:
412 return sum(
413 len(args._non_defaults)
414 for args in self.obj.dialect_options.values()
415 )
417 def __iter__(self) -> Generator[str, None, None]:
418 return (
419 "%s_%s" % (dialect_name, value_name)
420 for dialect_name in self.obj.dialect_options
421 for value_name in self.obj.dialect_options[
422 dialect_name
423 ]._non_defaults
424 )
427class _DialectArgDict(MutableMapping[str, Any]):
428 """A dictionary view of dialect-level arguments for a specific
429 dialect.
431 Maintains a separate collection of user-specified arguments
432 and dialect-specified default arguments.
434 """
436 def __init__(self) -> None:
437 self._non_defaults: Dict[str, Any] = {}
438 self._defaults: Dict[str, Any] = {}
440 def __len__(self) -> int:
441 return len(set(self._non_defaults).union(self._defaults))
443 def __iter__(self) -> Iterator[str]:
444 return iter(set(self._non_defaults).union(self._defaults))
446 def __getitem__(self, key: str) -> Any:
447 if key in self._non_defaults:
448 return self._non_defaults[key]
449 else:
450 return self._defaults[key]
452 def __setitem__(self, key: str, value: Any) -> None:
453 self._non_defaults[key] = value
455 def __delitem__(self, key: str) -> None:
456 del self._non_defaults[key]
459@util.preload_module("sqlalchemy.dialects")
460def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
461 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
462 if dialect_cls.construct_arguments is None:
463 return None
464 return dict(dialect_cls.construct_arguments)
467class DialectKWArgs:
468 """Establish the ability for a class to have dialect-specific arguments
469 with defaults and constructor validation.
471 The :class:`.DialectKWArgs` interacts with the
472 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
474 .. seealso::
476 :attr:`.DefaultDialect.construct_arguments`
478 """
480 __slots__ = ()
482 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
483 ("dialect_options", InternalTraversal.dp_dialect_options)
484 ]
486 @classmethod
487 def argument_for(
488 cls, dialect_name: str, argument_name: str, default: Any
489 ) -> None:
490 """Add a new kind of dialect-specific keyword argument for this class.
492 E.g.::
494 Index.argument_for("mydialect", "length", None)
496 some_index = Index("a", "b", mydialect_length=5)
498 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
499 way adding extra arguments to the
500 :attr:`.DefaultDialect.construct_arguments` dictionary. This
501 dictionary provides a list of argument names accepted by various
502 schema-level constructs on behalf of a dialect.
504 New dialects should typically specify this dictionary all at once as a
505 data member of the dialect class. The use case for ad-hoc addition of
506 argument names is typically for end-user code that is also using
507 a custom compilation scheme which consumes the additional arguments.
509 :param dialect_name: name of a dialect. The dialect must be
510 locatable, else a :class:`.NoSuchModuleError` is raised. The
511 dialect must also include an existing
512 :attr:`.DefaultDialect.construct_arguments` collection, indicating
513 that it participates in the keyword-argument validation and default
514 system, else :class:`.ArgumentError` is raised. If the dialect does
515 not include this collection, then any keyword argument can be
516 specified on behalf of this dialect already. All dialects packaged
517 within SQLAlchemy include this collection, however for third party
518 dialects, support may vary.
520 :param argument_name: name of the parameter.
522 :param default: default value of the parameter.
524 """
526 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
527 DialectKWArgs._kw_registry[dialect_name]
528 )
529 if construct_arg_dictionary is None:
530 raise exc.ArgumentError(
531 "Dialect '%s' does have keyword-argument "
532 "validation and defaults enabled configured" % dialect_name
533 )
534 if cls not in construct_arg_dictionary:
535 construct_arg_dictionary[cls] = {}
536 construct_arg_dictionary[cls][argument_name] = default
538 @property
539 def dialect_kwargs(self) -> _DialectArgView:
540 """A collection of keyword arguments specified as dialect-specific
541 options to this construct.
543 The arguments are present here in their original ``<dialect>_<kwarg>``
544 format. Only arguments that were actually passed are included;
545 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
546 contains all options known by this dialect including defaults.
548 The collection is also writable; keys are accepted of the
549 form ``<dialect>_<kwarg>`` where the value will be assembled
550 into the list of options.
552 .. seealso::
554 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
556 """
557 return _DialectArgView(self)
559 @property
560 def kwargs(self) -> _DialectArgView:
561 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
562 return self.dialect_kwargs
564 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
565 util.PopulateDict(_kw_reg_for_dialect)
566 )
568 @classmethod
569 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
570 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
571 d = _DialectArgDict()
573 if construct_arg_dictionary is None:
574 d._defaults.update({"*": None})
575 else:
576 for cls in reversed(cls.__mro__):
577 if cls in construct_arg_dictionary:
578 d._defaults.update(construct_arg_dictionary[cls])
579 return d
581 @util.memoized_property
582 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
583 """A collection of keyword arguments specified as dialect-specific
584 options to this construct.
586 This is a two-level nested registry, keyed to ``<dialect_name>``
587 and ``<argument_name>``. For example, the ``postgresql_where``
588 argument would be locatable as::
590 arg = my_object.dialect_options["postgresql"]["where"]
592 .. versionadded:: 0.9.2
594 .. seealso::
596 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
598 """
600 return util.PopulateDict(self._kw_reg_for_dialect_cls)
602 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
603 # validate remaining kwargs that they all specify DB prefixes
605 if not kwargs:
606 return
608 for k in kwargs:
609 m = re.match("^(.+?)_(.+)$", k)
610 if not m:
611 raise TypeError(
612 "Additional arguments should be "
613 "named <dialectname>_<argument>, got '%s'" % k
614 )
615 dialect_name, arg_name = m.group(1, 2)
617 try:
618 construct_arg_dictionary = self.dialect_options[dialect_name]
619 except exc.NoSuchModuleError:
620 util.warn(
621 "Can't validate argument %r; can't "
622 "locate any SQLAlchemy dialect named %r"
623 % (k, dialect_name)
624 )
625 self.dialect_options[dialect_name] = d = _DialectArgDict()
626 d._defaults.update({"*": None})
627 d._non_defaults[arg_name] = kwargs[k]
628 else:
629 if (
630 "*" not in construct_arg_dictionary
631 and arg_name not in construct_arg_dictionary
632 ):
633 raise exc.ArgumentError(
634 "Argument %r is not accepted by "
635 "dialect %r on behalf of %r"
636 % (k, dialect_name, self.__class__)
637 )
638 else:
639 construct_arg_dictionary[arg_name] = kwargs[k]
642class CompileState:
643 """Produces additional object state necessary for a statement to be
644 compiled.
646 the :class:`.CompileState` class is at the base of classes that assemble
647 state for a particular statement object that is then used by the
648 compiler. This process is essentially an extension of the process that
649 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
650 on converting raw user intent into more organized structures rather than
651 producing string output. The top-level :class:`.CompileState` for the
652 statement being executed is also accessible when the execution context
653 works with invoking the statement and collecting results.
655 The production of :class:`.CompileState` is specific to the compiler, such
656 as within the :meth:`.SQLCompiler.visit_insert`,
657 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
658 responsible for associating the :class:`.CompileState` with the
659 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
660 i.e. the outermost SQL statement that's actually being executed.
661 There can be other :class:`.CompileState` objects that are not the
662 toplevel, such as when a SELECT subquery or CTE-nested
663 INSERT/UPDATE/DELETE is generated.
665 .. versionadded:: 1.4
667 """
669 __slots__ = ("statement", "_ambiguous_table_name_map")
671 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
673 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
675 @classmethod
676 def create_for_statement(
677 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
678 ) -> CompileState:
679 # factory construction.
681 if statement._propagate_attrs:
682 plugin_name = statement._propagate_attrs.get(
683 "compile_state_plugin", "default"
684 )
685 klass = cls.plugins.get(
686 (plugin_name, statement._effective_plugin_target), None
687 )
688 if klass is None:
689 klass = cls.plugins[
690 ("default", statement._effective_plugin_target)
691 ]
693 else:
694 klass = cls.plugins[
695 ("default", statement._effective_plugin_target)
696 ]
698 if klass is cls:
699 return cls(statement, compiler, **kw)
700 else:
701 return klass.create_for_statement(statement, compiler, **kw)
703 def __init__(self, statement, compiler, **kw):
704 self.statement = statement
706 @classmethod
707 def get_plugin_class(
708 cls, statement: Executable
709 ) -> Optional[Type[CompileState]]:
710 plugin_name = statement._propagate_attrs.get(
711 "compile_state_plugin", None
712 )
714 if plugin_name:
715 key = (plugin_name, statement._effective_plugin_target)
716 if key in cls.plugins:
717 return cls.plugins[key]
719 # there's no case where we call upon get_plugin_class() and want
720 # to get None back, there should always be a default. return that
721 # if there was no plugin-specific class (e.g. Insert with "orm"
722 # plugin)
723 try:
724 return cls.plugins[("default", statement._effective_plugin_target)]
725 except KeyError:
726 return None
728 @classmethod
729 def _get_plugin_class_for_plugin(
730 cls, statement: Executable, plugin_name: str
731 ) -> Optional[Type[CompileState]]:
732 try:
733 return cls.plugins[
734 (plugin_name, statement._effective_plugin_target)
735 ]
736 except KeyError:
737 return None
739 @classmethod
740 def plugin_for(
741 cls, plugin_name: str, visit_name: str
742 ) -> Callable[[_Fn], _Fn]:
743 def decorate(cls_to_decorate):
744 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
745 return cls_to_decorate
747 return decorate
750class Generative(HasMemoized):
751 """Provide a method-chaining pattern in conjunction with the
752 @_generative decorator."""
754 def _generate(self) -> Self:
755 skip = self._memoized_keys
756 cls = self.__class__
757 s = cls.__new__(cls)
758 if skip:
759 # ensure this iteration remains atomic
760 s.__dict__ = {
761 k: v for k, v in self.__dict__.copy().items() if k not in skip
762 }
763 else:
764 s.__dict__ = self.__dict__.copy()
765 return s
768class InPlaceGenerative(HasMemoized):
769 """Provide a method-chaining pattern in conjunction with the
770 @_generative decorator that mutates in place."""
772 __slots__ = ()
774 def _generate(self):
775 skip = self._memoized_keys
776 # note __dict__ needs to be in __slots__ if this is used
777 for k in skip:
778 self.__dict__.pop(k, None)
779 return self
782class HasCompileState(Generative):
783 """A class that has a :class:`.CompileState` associated with it."""
785 _compile_state_plugin: Optional[Type[CompileState]] = None
787 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
789 _compile_state_factory = CompileState.create_for_statement
792class _MetaOptions(type):
793 """metaclass for the Options class.
795 This metaclass is actually necessary despite the availability of the
796 ``__init_subclass__()`` hook as this type also provides custom class-level
797 behavior for the ``__add__()`` method.
799 """
801 _cache_attrs: Tuple[str, ...]
803 def __add__(self, other):
804 o1 = self()
806 if set(other).difference(self._cache_attrs):
807 raise TypeError(
808 "dictionary contains attributes not covered by "
809 "Options class %s: %r"
810 % (self, set(other).difference(self._cache_attrs))
811 )
813 o1.__dict__.update(other)
814 return o1
816 if TYPE_CHECKING:
818 def __getattr__(self, key: str) -> Any: ...
820 def __setattr__(self, key: str, value: Any) -> None: ...
822 def __delattr__(self, key: str) -> None: ...
825class Options(metaclass=_MetaOptions):
826 """A cacheable option dictionary with defaults."""
828 __slots__ = ()
830 _cache_attrs: Tuple[str, ...]
832 def __init_subclass__(cls) -> None:
833 dict_ = cls.__dict__
834 cls._cache_attrs = tuple(
835 sorted(
836 d
837 for d in dict_
838 if not d.startswith("__")
839 and d not in ("_cache_key_traversal",)
840 )
841 )
842 super().__init_subclass__()
844 def __init__(self, **kw: Any) -> None:
845 self.__dict__.update(kw)
847 def __add__(self, other):
848 o1 = self.__class__.__new__(self.__class__)
849 o1.__dict__.update(self.__dict__)
851 if set(other).difference(self._cache_attrs):
852 raise TypeError(
853 "dictionary contains attributes not covered by "
854 "Options class %s: %r"
855 % (self, set(other).difference(self._cache_attrs))
856 )
858 o1.__dict__.update(other)
859 return o1
861 def __eq__(self, other):
862 # TODO: very inefficient. This is used only in test suites
863 # right now.
864 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
865 if getattr(self, a) != getattr(other, b):
866 return False
867 return True
869 def __repr__(self) -> str:
870 # TODO: fairly inefficient, used only in debugging right now.
872 return "%s(%s)" % (
873 self.__class__.__name__,
874 ", ".join(
875 "%s=%r" % (k, self.__dict__[k])
876 for k in self._cache_attrs
877 if k in self.__dict__
878 ),
879 )
881 @classmethod
882 def isinstance(cls, klass: Type[Any]) -> bool:
883 return issubclass(cls, klass)
885 @hybridmethod
886 def add_to_element(self, name: str, value: str) -> Any:
887 return self + {name: getattr(self, name) + value}
889 @hybridmethod
890 def _state_dict_inst(self) -> Mapping[str, Any]:
891 return self.__dict__
893 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
895 @_state_dict_inst.classlevel
896 def _state_dict(cls) -> Mapping[str, Any]:
897 return cls._state_dict_const
899 @classmethod
900 def safe_merge(cls, other: "Options") -> Any:
901 d = other._state_dict()
903 # only support a merge with another object of our class
904 # and which does not have attrs that we don't. otherwise
905 # we risk having state that might not be part of our cache
906 # key strategy
908 if (
909 cls is not other.__class__
910 and other._cache_attrs
911 and set(other._cache_attrs).difference(cls._cache_attrs)
912 ):
913 raise TypeError(
914 "other element %r is not empty, is not of type %s, "
915 "and contains attributes not covered here %r"
916 % (
917 other,
918 cls,
919 set(other._cache_attrs).difference(cls._cache_attrs),
920 )
921 )
922 return cls + d
924 @classmethod
925 def from_execution_options(
926 cls,
927 key: str,
928 attrs: set[str],
929 exec_options: Mapping[str, Any],
930 statement_exec_options: Mapping[str, Any],
931 ) -> Tuple["Options", Mapping[str, Any]]:
932 """process Options argument in terms of execution options.
935 e.g.::
937 (
938 load_options,
939 execution_options,
940 ) = QueryContext.default_load_options.from_execution_options(
941 "_sa_orm_load_options",
942 {"populate_existing", "autoflush", "yield_per"},
943 execution_options,
944 statement._execution_options,
945 )
947 get back the Options and refresh "_sa_orm_load_options" in the
948 exec options dict w/ the Options as well
950 """
952 # common case is that no options we are looking for are
953 # in either dictionary, so cancel for that first
954 check_argnames = attrs.intersection(
955 set(exec_options).union(statement_exec_options)
956 )
958 existing_options = exec_options.get(key, cls)
960 if check_argnames:
961 result = {}
962 for argname in check_argnames:
963 local = "_" + argname
964 if argname in exec_options:
965 result[local] = exec_options[argname]
966 elif argname in statement_exec_options:
967 result[local] = statement_exec_options[argname]
969 new_options = existing_options + result
970 exec_options = util.immutabledict(exec_options).merge_with(
971 {key: new_options}
972 )
973 return new_options, exec_options
975 else:
976 return existing_options, exec_options
978 if TYPE_CHECKING:
980 def __getattr__(self, key: str) -> Any: ...
982 def __setattr__(self, key: str, value: Any) -> None: ...
984 def __delattr__(self, key: str) -> None: ...
987class CacheableOptions(Options, HasCacheKey):
988 __slots__ = ()
990 @hybridmethod
991 def _gen_cache_key_inst(
992 self, anon_map: Any, bindparams: List[BindParameter[Any]]
993 ) -> Optional[Tuple[Any]]:
994 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
996 @_gen_cache_key_inst.classlevel
997 def _gen_cache_key(
998 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
999 ) -> Tuple[CacheableOptions, Any]:
1000 return (cls, ())
1002 @hybridmethod
1003 def _generate_cache_key(self) -> Optional[CacheKey]:
1004 return HasCacheKey._generate_cache_key_for_object(self)
1007class ExecutableOption(HasCopyInternals):
1008 __slots__ = ()
1010 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1012 __visit_name__: str = "executable_option"
1014 _is_has_cache_key: bool = False
1016 _is_core: bool = True
1018 def _clone(self, **kw):
1019 """Create a shallow copy of this ExecutableOption."""
1020 c = self.__class__.__new__(self.__class__)
1021 c.__dict__ = dict(self.__dict__) # type: ignore
1022 return c
1025class Executable(roles.StatementRole):
1026 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1028 :class:`.Executable` is a superclass for all "statement" types
1029 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1030 :func:`insert`, :func:`text`.
1032 """
1034 supports_execution: bool = True
1035 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1036 _is_default_generator: bool = False
1037 _with_options: Tuple[ExecutableOption, ...] = ()
1038 _with_context_options: Tuple[
1039 Tuple[Callable[[CompileState], None], Any], ...
1040 ] = ()
1041 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1043 _executable_traverse_internals = [
1044 ("_with_options", InternalTraversal.dp_executable_options),
1045 (
1046 "_with_context_options",
1047 ExtendedInternalTraversal.dp_with_context_options,
1048 ),
1049 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1050 ]
1052 is_select: bool = False
1053 is_from_statement: bool = False
1054 is_update: bool = False
1055 is_insert: bool = False
1056 is_text: bool = False
1057 is_delete: bool = False
1058 is_dml: bool = False
1060 if TYPE_CHECKING:
1061 __visit_name__: str
1063 def _compile_w_cache(
1064 self,
1065 dialect: Dialect,
1066 *,
1067 compiled_cache: Optional[CompiledCacheType],
1068 column_keys: List[str],
1069 for_executemany: bool = False,
1070 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1071 **kw: Any,
1072 ) -> Tuple[
1073 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1074 ]: ...
1076 def _execute_on_connection(
1077 self,
1078 connection: Connection,
1079 distilled_params: _CoreMultiExecuteParams,
1080 execution_options: CoreExecuteOptionsParameter,
1081 ) -> CursorResult[Any]: ...
1083 def _execute_on_scalar(
1084 self,
1085 connection: Connection,
1086 distilled_params: _CoreMultiExecuteParams,
1087 execution_options: CoreExecuteOptionsParameter,
1088 ) -> Any: ...
1090 @util.ro_non_memoized_property
1091 def _all_selected_columns(self) -> _SelectIterable:
1092 raise NotImplementedError()
1094 @property
1095 def _effective_plugin_target(self) -> str:
1096 return self.__visit_name__
1098 @_generative
1099 def options(self, *options: ExecutableOption) -> Self:
1100 """Apply options to this statement.
1102 In the general sense, options are any kind of Python object
1103 that can be interpreted by the SQL compiler for the statement.
1104 These options can be consumed by specific dialects or specific kinds
1105 of compilers.
1107 The most commonly known kind of option are the ORM level options
1108 that apply "eager load" and other loading behaviors to an ORM
1109 query. However, options can theoretically be used for many other
1110 purposes.
1112 For background on specific kinds of options for specific kinds of
1113 statements, refer to the documentation for those option objects.
1115 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1116 Core statement objects towards the goal of allowing unified
1117 Core / ORM querying capabilities.
1119 .. seealso::
1121 :ref:`loading_columns` - refers to options specific to the usage
1122 of ORM queries
1124 :ref:`relationship_loader_options` - refers to options specific
1125 to the usage of ORM queries
1127 """
1128 self._with_options += tuple(
1129 coercions.expect(roles.ExecutableOptionRole, opt)
1130 for opt in options
1131 )
1132 return self
1134 @_generative
1135 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1136 """Assign the compile options to a new value.
1138 :param compile_options: appropriate CacheableOptions structure
1140 """
1142 self._compile_options = compile_options
1143 return self
1145 @_generative
1146 def _update_compile_options(self, options: CacheableOptions) -> Self:
1147 """update the _compile_options with new keys."""
1149 assert self._compile_options is not None
1150 self._compile_options += options
1151 return self
1153 @_generative
1154 def _add_context_option(
1155 self,
1156 callable_: Callable[[CompileState], None],
1157 cache_args: Any,
1158 ) -> Self:
1159 """Add a context option to this statement.
1161 These are callable functions that will
1162 be given the CompileState object upon compilation.
1164 A second argument cache_args is required, which will be combined with
1165 the ``__code__`` identity of the function itself in order to produce a
1166 cache key.
1168 """
1169 self._with_context_options += ((callable_, cache_args),)
1170 return self
1172 @overload
1173 def execution_options(
1174 self,
1175 *,
1176 compiled_cache: Optional[CompiledCacheType] = ...,
1177 logging_token: str = ...,
1178 isolation_level: IsolationLevel = ...,
1179 no_parameters: bool = False,
1180 stream_results: bool = False,
1181 max_row_buffer: int = ...,
1182 yield_per: int = ...,
1183 insertmanyvalues_page_size: int = ...,
1184 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1185 populate_existing: bool = False,
1186 autoflush: bool = False,
1187 synchronize_session: SynchronizeSessionArgument = ...,
1188 dml_strategy: DMLStrategyArgument = ...,
1189 render_nulls: bool = ...,
1190 is_delete_using: bool = ...,
1191 is_update_from: bool = ...,
1192 preserve_rowcount: bool = False,
1193 **opt: Any,
1194 ) -> Self: ...
1196 @overload
1197 def execution_options(self, **opt: Any) -> Self: ...
1199 @_generative
1200 def execution_options(self, **kw: Any) -> Self:
1201 """Set non-SQL options for the statement which take effect during
1202 execution.
1204 Execution options can be set at many scopes, including per-statement,
1205 per-connection, or per execution, using methods such as
1206 :meth:`_engine.Connection.execution_options` and parameters which
1207 accept a dictionary of options such as
1208 :paramref:`_engine.Connection.execute.execution_options` and
1209 :paramref:`_orm.Session.execute.execution_options`.
1211 The primary characteristic of an execution option, as opposed to
1212 other kinds of options such as ORM loader options, is that
1213 **execution options never affect the compiled SQL of a query, only
1214 things that affect how the SQL statement itself is invoked or how
1215 results are fetched**. That is, execution options are not part of
1216 what's accommodated by SQL compilation nor are they considered part of
1217 the cached state of a statement.
1219 The :meth:`_sql.Executable.execution_options` method is
1220 :term:`generative`, as
1221 is the case for the method as applied to the :class:`_engine.Engine`
1222 and :class:`_orm.Query` objects, which means when the method is called,
1223 a copy of the object is returned, which applies the given parameters to
1224 that new copy, but leaves the original unchanged::
1226 statement = select(table.c.x, table.c.y)
1227 new_statement = statement.execution_options(my_option=True)
1229 An exception to this behavior is the :class:`_engine.Connection`
1230 object, where the :meth:`_engine.Connection.execution_options` method
1231 is explicitly **not** generative.
1233 The kinds of options that may be passed to
1234 :meth:`_sql.Executable.execution_options` and other related methods and
1235 parameter dictionaries include parameters that are explicitly consumed
1236 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1237 defined by SQLAlchemy, which means the methods and/or parameter
1238 dictionaries may be used for user-defined parameters that interact with
1239 custom code, which may access the parameters using methods such as
1240 :meth:`_sql.Executable.get_execution_options` and
1241 :meth:`_engine.Connection.get_execution_options`, or within selected
1242 event hooks using a dedicated ``execution_options`` event parameter
1243 such as
1244 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1245 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1247 from sqlalchemy import event
1250 @event.listens_for(some_engine, "before_execute")
1251 def _process_opt(conn, statement, multiparams, params, execution_options):
1252 "run a SQL function before invoking a statement"
1254 if execution_options.get("do_special_thing", False):
1255 conn.exec_driver_sql("run_special_function()")
1257 Within the scope of options that are explicitly recognized by
1258 SQLAlchemy, most apply to specific classes of objects and not others.
1259 The most common execution options include:
1261 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1262 sets the isolation level for a connection or a class of connections
1263 via an :class:`_engine.Engine`. This option is accepted only
1264 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1266 * :paramref:`_engine.Connection.execution_options.stream_results` -
1267 indicates results should be fetched using a server side cursor;
1268 this option is accepted by :class:`_engine.Connection`, by the
1269 :paramref:`_engine.Connection.execute.execution_options` parameter
1270 on :meth:`_engine.Connection.execute`, and additionally by
1271 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1272 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1274 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1275 indicates a dictionary that will serve as the
1276 :ref:`SQL compilation cache <sql_caching>`
1277 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1278 well as for ORM methods like :meth:`_orm.Session.execute`.
1279 Can be passed as ``None`` to disable caching for statements.
1280 This option is not accepted by
1281 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1282 carry along a compilation cache within a statement object.
1284 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1285 - a mapping of schema names used by the
1286 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1287 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1288 :class:`_sql.Executable`, as well as by ORM constructs
1289 like :meth:`_orm.Session.execute`.
1291 .. seealso::
1293 :meth:`_engine.Connection.execution_options`
1295 :paramref:`_engine.Connection.execute.execution_options`
1297 :paramref:`_orm.Session.execute.execution_options`
1299 :ref:`orm_queryguide_execution_options` - documentation on all
1300 ORM-specific execution options
1302 """ # noqa: E501
1303 if "isolation_level" in kw:
1304 raise exc.ArgumentError(
1305 "'isolation_level' execution option may only be specified "
1306 "on Connection.execution_options(), or "
1307 "per-engine using the isolation_level "
1308 "argument to create_engine()."
1309 )
1310 if "compiled_cache" in kw:
1311 raise exc.ArgumentError(
1312 "'compiled_cache' execution option may only be specified "
1313 "on Connection.execution_options(), not per statement."
1314 )
1315 self._execution_options = self._execution_options.union(kw)
1316 return self
1318 def get_execution_options(self) -> _ExecuteOptions:
1319 """Get the non-SQL options which will take effect during execution.
1321 .. versionadded:: 1.3
1323 .. seealso::
1325 :meth:`.Executable.execution_options`
1326 """
1327 return self._execution_options
1330class SchemaEventTarget(event.EventTarget):
1331 """Base class for elements that are the targets of :class:`.DDLEvents`
1332 events.
1334 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1336 """
1338 dispatch: dispatcher[SchemaEventTarget]
1340 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1341 """Associate with this SchemaEvent's parent object."""
1343 def _set_parent_with_dispatch(
1344 self, parent: SchemaEventTarget, **kw: Any
1345 ) -> None:
1346 self.dispatch.before_parent_attach(self, parent)
1347 self._set_parent(parent, **kw)
1348 self.dispatch.after_parent_attach(self, parent)
1351class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1352 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1354 .. versionadded:: 2.0.41
1356 """
1359class SchemaVisitor(ClauseVisitor):
1360 """Define the visiting for ``SchemaItem`` and more
1361 generally ``SchemaVisitable`` objects.
1363 """
1365 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1368class _SentinelDefaultCharacterization(Enum):
1369 NONE = "none"
1370 UNKNOWN = "unknown"
1371 CLIENTSIDE = "clientside"
1372 SENTINEL_DEFAULT = "sentinel_default"
1373 SERVERSIDE = "serverside"
1374 IDENTITY = "identity"
1375 SEQUENCE = "sequence"
1378class _SentinelColumnCharacterization(NamedTuple):
1379 columns: Optional[Sequence[Column[Any]]] = None
1380 is_explicit: bool = False
1381 is_autoinc: bool = False
1382 default_characterization: _SentinelDefaultCharacterization = (
1383 _SentinelDefaultCharacterization.NONE
1384 )
1387_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1389_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1390_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1393class _ColumnMetrics(Generic[_COL_co]):
1394 __slots__ = ("column",)
1396 column: _COL_co
1398 def __init__(
1399 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1400 ) -> None:
1401 self.column = col
1403 # proxy_index being non-empty means it was initialized.
1404 # so we need to update it
1405 pi = collection._proxy_index
1406 if pi:
1407 for eps_col in col._expanded_proxy_set:
1408 pi[eps_col].add(self)
1410 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1411 return self.column._expanded_proxy_set
1413 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1414 pi = collection._proxy_index
1415 if not pi:
1416 return
1417 for col in self.column._expanded_proxy_set:
1418 colset = pi.get(col, None)
1419 if colset:
1420 colset.discard(self)
1421 if colset is not None and not colset:
1422 del pi[col]
1424 def embedded(
1425 self,
1426 target_set: Union[
1427 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1428 ],
1429 ) -> bool:
1430 expanded_proxy_set = self.column._expanded_proxy_set
1431 for t in target_set.difference(expanded_proxy_set):
1432 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1433 return False
1434 return True
1437class ColumnCollection(Generic[_COLKEY, _COL_co]):
1438 """Collection of :class:`_expression.ColumnElement` instances,
1439 typically for
1440 :class:`_sql.FromClause` objects.
1442 The :class:`_sql.ColumnCollection` object is most commonly available
1443 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1444 on the :class:`_schema.Table` object, introduced at
1445 :ref:`metadata_tables_and_columns`.
1447 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1448 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1449 :class:`_schema.Column` objects, which are then accessible both via mapping
1450 style access as well as attribute access style.
1452 To access :class:`_schema.Column` objects using ordinary attribute-style
1453 access, specify the name like any other object attribute, such as below
1454 a column named ``employee_name`` is accessed::
1456 >>> employee_table.c.employee_name
1458 To access columns that have names with special characters or spaces,
1459 index-style access is used, such as below which illustrates a column named
1460 ``employee ' payment`` is accessed::
1462 >>> employee_table.c["employee ' payment"]
1464 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1465 interface, common dictionary method names like
1466 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1467 and :meth:`_sql.ColumnCollection.items` are available, which means that
1468 database columns that are keyed under these names also need to use indexed
1469 access::
1471 >>> employee_table.c["values"]
1474 The name for which a :class:`_schema.Column` would be present is normally
1475 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1476 such as a :class:`_sql.Select` object that uses a label style set
1477 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1478 key may instead be represented under a particular label name such
1479 as ``tablename_columnname``::
1481 >>> from sqlalchemy import select, column, table
1482 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1483 >>> t = table("t", column("c"))
1484 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1485 >>> subq = stmt.subquery()
1486 >>> subq.c.t_c
1487 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1489 :class:`.ColumnCollection` also indexes the columns in order and allows
1490 them to be accessible by their integer position::
1492 >>> cc[0]
1493 Column('x', Integer(), table=None)
1494 >>> cc[1]
1495 Column('y', Integer(), table=None)
1497 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1498 allows integer-based
1499 index access to the collection.
1501 Iterating the collection yields the column expressions in order::
1503 >>> list(cc)
1504 [Column('x', Integer(), table=None),
1505 Column('y', Integer(), table=None)]
1507 The base :class:`_expression.ColumnCollection` object can store
1508 duplicates, which can
1509 mean either two columns with the same key, in which case the column
1510 returned by key access is **arbitrary**::
1512 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1513 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1514 >>> list(cc)
1515 [Column('x', Integer(), table=None),
1516 Column('x', Integer(), table=None)]
1517 >>> cc["x"] is x1
1518 False
1519 >>> cc["x"] is x2
1520 True
1522 Or it can also mean the same column multiple times. These cases are
1523 supported as :class:`_expression.ColumnCollection`
1524 is used to represent the columns in
1525 a SELECT statement which may include duplicates.
1527 A special subclass :class:`.DedupeColumnCollection` exists which instead
1528 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1529 collection is used for schema level objects like :class:`_schema.Table`
1530 and
1531 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1532 :class:`.DedupeColumnCollection` class also has additional mutation methods
1533 as the schema constructs have more use cases that require removal and
1534 replacement of columns.
1536 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1537 now stores duplicate
1538 column keys as well as the same column in multiple positions. The
1539 :class:`.DedupeColumnCollection` class is added to maintain the
1540 former behavior in those cases where deduplication as well as
1541 additional replace/remove operations are needed.
1544 """
1546 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1548 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1549 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1550 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1551 _colset: Set[_COL_co]
1553 def __init__(
1554 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1555 ):
1556 object.__setattr__(self, "_colset", set())
1557 object.__setattr__(self, "_index", {})
1558 object.__setattr__(
1559 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1560 )
1561 object.__setattr__(self, "_collection", [])
1562 if columns:
1563 self._initial_populate(columns)
1565 @util.preload_module("sqlalchemy.sql.elements")
1566 def __clause_element__(self) -> ClauseList:
1567 elements = util.preloaded.sql_elements
1569 return elements.ClauseList(
1570 _literal_as_text_role=roles.ColumnsClauseRole,
1571 group=False,
1572 *self._all_columns,
1573 )
1575 def _initial_populate(
1576 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1577 ) -> None:
1578 self._populate_separate_keys(iter_)
1580 @property
1581 def _all_columns(self) -> List[_COL_co]:
1582 return [col for (_, col, _) in self._collection]
1584 def keys(self) -> List[_COLKEY]:
1585 """Return a sequence of string key names for all columns in this
1586 collection."""
1587 return [k for (k, _, _) in self._collection]
1589 def values(self) -> List[_COL_co]:
1590 """Return a sequence of :class:`_sql.ColumnClause` or
1591 :class:`_schema.Column` objects for all columns in this
1592 collection."""
1593 return [col for (_, col, _) in self._collection]
1595 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1596 """Return a sequence of (key, column) tuples for all columns in this
1597 collection each consisting of a string key name and a
1598 :class:`_sql.ColumnClause` or
1599 :class:`_schema.Column` object.
1600 """
1602 return [(k, col) for (k, col, _) in self._collection]
1604 def __bool__(self) -> bool:
1605 return bool(self._collection)
1607 def __len__(self) -> int:
1608 return len(self._collection)
1610 def __iter__(self) -> Iterator[_COL_co]:
1611 # turn to a list first to maintain over a course of changes
1612 return iter([col for _, col, _ in self._collection])
1614 @overload
1615 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1617 @overload
1618 def __getitem__(
1619 self, key: Tuple[Union[str, int], ...]
1620 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1622 @overload
1623 def __getitem__(
1624 self, key: slice
1625 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1627 def __getitem__(
1628 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1629 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1630 try:
1631 if isinstance(key, (tuple, slice)):
1632 if isinstance(key, slice):
1633 cols = (
1634 (sub_key, col)
1635 for (sub_key, col, _) in self._collection[key]
1636 )
1637 else:
1638 cols = (self._index[sub_key] for sub_key in key)
1640 return ColumnCollection(cols).as_readonly()
1641 else:
1642 return self._index[key][1]
1643 except KeyError as err:
1644 if isinstance(err.args[0], int):
1645 raise IndexError(err.args[0]) from err
1646 else:
1647 raise
1649 def __getattr__(self, key: str) -> _COL_co:
1650 try:
1651 return self._index[key][1]
1652 except KeyError as err:
1653 raise AttributeError(key) from err
1655 def __contains__(self, key: str) -> bool:
1656 if key not in self._index:
1657 if not isinstance(key, str):
1658 raise exc.ArgumentError(
1659 "__contains__ requires a string argument"
1660 )
1661 return False
1662 else:
1663 return True
1665 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1666 """Compare this :class:`_expression.ColumnCollection` to another
1667 based on the names of the keys"""
1669 for l, r in zip_longest(self, other):
1670 if l is not r:
1671 return False
1672 else:
1673 return True
1675 def __eq__(self, other: Any) -> bool:
1676 return self.compare(other)
1678 @overload
1679 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1681 @overload
1682 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1684 def get(
1685 self, key: str, default: Optional[_COL] = None
1686 ) -> Optional[Union[_COL_co, _COL]]:
1687 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1688 based on a string key name from this
1689 :class:`_expression.ColumnCollection`."""
1691 if key in self._index:
1692 return self._index[key][1]
1693 else:
1694 return default
1696 def __str__(self) -> str:
1697 return "%s(%s)" % (
1698 self.__class__.__name__,
1699 ", ".join(str(c) for c in self),
1700 )
1702 def __setitem__(self, key: str, value: Any) -> NoReturn:
1703 raise NotImplementedError()
1705 def __delitem__(self, key: str) -> NoReturn:
1706 raise NotImplementedError()
1708 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1709 raise NotImplementedError()
1711 def clear(self) -> NoReturn:
1712 """Dictionary clear() is not implemented for
1713 :class:`_sql.ColumnCollection`."""
1714 raise NotImplementedError()
1716 def remove(self, column: Any) -> NoReturn:
1717 raise NotImplementedError()
1719 def update(self, iter_: Any) -> NoReturn:
1720 """Dictionary update() is not implemented for
1721 :class:`_sql.ColumnCollection`."""
1722 raise NotImplementedError()
1724 # https://github.com/python/mypy/issues/4266
1725 __hash__: Optional[int] = None # type: ignore
1727 def _populate_separate_keys(
1728 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1729 ) -> None:
1730 """populate from an iterator of (key, column)"""
1732 self._collection[:] = collection = [
1733 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1734 ]
1735 self._colset.update(c._deannotate() for _, c, _ in collection)
1736 self._index.update(
1737 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1738 )
1739 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1741 def add(
1742 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1743 ) -> None:
1744 """Add a column to this :class:`_sql.ColumnCollection`.
1746 .. note::
1748 This method is **not normally used by user-facing code**, as the
1749 :class:`_sql.ColumnCollection` is usually part of an existing
1750 object such as a :class:`_schema.Table`. To add a
1751 :class:`_schema.Column` to an existing :class:`_schema.Table`
1752 object, use the :meth:`_schema.Table.append_column` method.
1754 """
1755 colkey: _COLKEY
1757 if key is None:
1758 colkey = column.key # type: ignore
1759 else:
1760 colkey = key
1762 l = len(self._collection)
1764 # don't really know how this part is supposed to work w/ the
1765 # covariant thing
1767 _column = cast(_COL_co, column)
1769 self._collection.append(
1770 (colkey, _column, _ColumnMetrics(self, _column))
1771 )
1772 self._colset.add(_column._deannotate())
1773 self._index[l] = (colkey, _column)
1774 if colkey not in self._index:
1775 self._index[colkey] = (colkey, _column)
1777 def __getstate__(self) -> Dict[str, Any]:
1778 return {
1779 "_collection": [(k, c) for k, c, _ in self._collection],
1780 "_index": self._index,
1781 }
1783 def __setstate__(self, state: Dict[str, Any]) -> None:
1784 object.__setattr__(self, "_index", state["_index"])
1785 object.__setattr__(
1786 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1787 )
1788 object.__setattr__(
1789 self,
1790 "_collection",
1791 [
1792 (k, c, _ColumnMetrics(self, c))
1793 for (k, c) in state["_collection"]
1794 ],
1795 )
1796 object.__setattr__(
1797 self, "_colset", {col for k, col, _ in self._collection}
1798 )
1800 def contains_column(self, col: ColumnElement[Any]) -> bool:
1801 """Checks if a column object exists in this collection"""
1802 if col not in self._colset:
1803 if isinstance(col, str):
1804 raise exc.ArgumentError(
1805 "contains_column cannot be used with string arguments. "
1806 "Use ``col_name in table.c`` instead."
1807 )
1808 return False
1809 else:
1810 return True
1812 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1813 """Return a "read only" form of this
1814 :class:`_sql.ColumnCollection`."""
1816 return ReadOnlyColumnCollection(self)
1818 def _init_proxy_index(self) -> None:
1819 """populate the "proxy index", if empty.
1821 proxy index is added in 2.0 to provide more efficient operation
1822 for the corresponding_column() method.
1824 For reasons of both time to construct new .c collections as well as
1825 memory conservation for large numbers of large .c collections, the
1826 proxy_index is only filled if corresponding_column() is called. once
1827 filled it stays that way, and new _ColumnMetrics objects created after
1828 that point will populate it with new data. Note this case would be
1829 unusual, if not nonexistent, as it means a .c collection is being
1830 mutated after corresponding_column() were used, however it is tested in
1831 test/base/test_utils.py.
1833 """
1834 pi = self._proxy_index
1835 if pi:
1836 return
1838 for _, _, metrics in self._collection:
1839 eps = metrics.column._expanded_proxy_set
1841 for eps_col in eps:
1842 pi[eps_col].add(metrics)
1844 def corresponding_column(
1845 self, column: _COL, require_embedded: bool = False
1846 ) -> Optional[Union[_COL, _COL_co]]:
1847 """Given a :class:`_expression.ColumnElement`, return the exported
1848 :class:`_expression.ColumnElement` object from this
1849 :class:`_expression.ColumnCollection`
1850 which corresponds to that original :class:`_expression.ColumnElement`
1851 via a common
1852 ancestor column.
1854 :param column: the target :class:`_expression.ColumnElement`
1855 to be matched.
1857 :param require_embedded: only return corresponding columns for
1858 the given :class:`_expression.ColumnElement`, if the given
1859 :class:`_expression.ColumnElement`
1860 is actually present within a sub-element
1861 of this :class:`_expression.Selectable`.
1862 Normally the column will match if
1863 it merely shares a common ancestor with one of the exported
1864 columns of this :class:`_expression.Selectable`.
1866 .. seealso::
1868 :meth:`_expression.Selectable.corresponding_column`
1869 - invokes this method
1870 against the collection returned by
1871 :attr:`_expression.Selectable.exported_columns`.
1873 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1874 was moved onto the :class:`_expression.ColumnCollection` itself.
1876 """
1877 # TODO: cython candidate
1879 # don't dig around if the column is locally present
1880 if column in self._colset:
1881 return column
1883 selected_intersection, selected_metrics = None, None
1884 target_set = column.proxy_set
1886 pi = self._proxy_index
1887 if not pi:
1888 self._init_proxy_index()
1890 for current_metrics in (
1891 mm for ts in target_set if ts in pi for mm in pi[ts]
1892 ):
1893 if not require_embedded or current_metrics.embedded(target_set):
1894 if selected_metrics is None:
1895 # no corresponding column yet, pick this one.
1896 selected_metrics = current_metrics
1897 continue
1899 current_intersection = target_set.intersection(
1900 current_metrics.column._expanded_proxy_set
1901 )
1902 if selected_intersection is None:
1903 selected_intersection = target_set.intersection(
1904 selected_metrics.column._expanded_proxy_set
1905 )
1907 if len(current_intersection) > len(selected_intersection):
1908 # 'current' has a larger field of correspondence than
1909 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
1910 # matches a1.c.x->table.c.x better than
1911 # selectable.c.x->table.c.x does.
1913 selected_metrics = current_metrics
1914 selected_intersection = current_intersection
1915 elif current_intersection == selected_intersection:
1916 # they have the same field of correspondence. see
1917 # which proxy_set has fewer columns in it, which
1918 # indicates a closer relationship with the root
1919 # column. Also take into account the "weight"
1920 # attribute which CompoundSelect() uses to give
1921 # higher precedence to columns based on vertical
1922 # position in the compound statement, and discard
1923 # columns that have no reference to the target
1924 # column (also occurs with CompoundSelect)
1926 selected_col_distance = sum(
1927 [
1928 sc._annotations.get("weight", 1)
1929 for sc in (
1930 selected_metrics.column._uncached_proxy_list()
1931 )
1932 if sc.shares_lineage(column)
1933 ],
1934 )
1935 current_col_distance = sum(
1936 [
1937 sc._annotations.get("weight", 1)
1938 for sc in (
1939 current_metrics.column._uncached_proxy_list()
1940 )
1941 if sc.shares_lineage(column)
1942 ],
1943 )
1944 if current_col_distance < selected_col_distance:
1945 selected_metrics = current_metrics
1946 selected_intersection = current_intersection
1948 return selected_metrics.column if selected_metrics else None
1951_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
1954class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
1955 """A :class:`_expression.ColumnCollection`
1956 that maintains deduplicating behavior.
1958 This is useful by schema level objects such as :class:`_schema.Table` and
1959 :class:`.PrimaryKeyConstraint`. The collection includes more
1960 sophisticated mutator methods as well to suit schema objects which
1961 require mutable column collections.
1963 .. versionadded:: 1.4
1965 """
1967 def add( # type: ignore[override]
1968 self, column: _NAMEDCOL, key: Optional[str] = None
1969 ) -> None:
1970 if key is not None and column.key != key:
1971 raise exc.ArgumentError(
1972 "DedupeColumnCollection requires columns be under "
1973 "the same key as their .key"
1974 )
1975 key = column.key
1977 if key is None:
1978 raise exc.ArgumentError(
1979 "Can't add unnamed column to column collection"
1980 )
1982 if key in self._index:
1983 existing = self._index[key][1]
1985 if existing is column:
1986 return
1988 self.replace(column)
1990 # pop out memoized proxy_set as this
1991 # operation may very well be occurring
1992 # in a _make_proxy operation
1993 util.memoized_property.reset(column, "proxy_set")
1994 else:
1995 self._append_new_column(key, column)
1997 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
1998 l = len(self._collection)
1999 self._collection.append(
2000 (key, named_column, _ColumnMetrics(self, named_column))
2001 )
2002 self._colset.add(named_column._deannotate())
2003 self._index[l] = (key, named_column)
2004 self._index[key] = (key, named_column)
2006 def _populate_separate_keys(
2007 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2008 ) -> None:
2009 """populate from an iterator of (key, column)"""
2010 cols = list(iter_)
2012 replace_col = []
2013 for k, col in cols:
2014 if col.key != k:
2015 raise exc.ArgumentError(
2016 "DedupeColumnCollection requires columns be under "
2017 "the same key as their .key"
2018 )
2019 if col.name in self._index and col.key != col.name:
2020 replace_col.append(col)
2021 elif col.key in self._index:
2022 replace_col.append(col)
2023 else:
2024 self._index[k] = (k, col)
2025 self._collection.append((k, col, _ColumnMetrics(self, col)))
2026 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2028 self._index.update(
2029 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2030 )
2031 for col in replace_col:
2032 self.replace(col)
2034 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2035 self._populate_separate_keys((col.key, col) for col in iter_)
2037 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2038 if column not in self._colset:
2039 raise ValueError(
2040 "Can't remove column %r; column is not in this collection"
2041 % column
2042 )
2043 del self._index[column.key]
2044 self._colset.remove(column)
2045 self._collection[:] = [
2046 (k, c, metrics)
2047 for (k, c, metrics) in self._collection
2048 if c is not column
2049 ]
2050 for metrics in self._proxy_index.get(column, ()):
2051 metrics.dispose(self)
2053 self._index.update(
2054 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2055 )
2056 # delete higher index
2057 del self._index[len(self._collection)]
2059 def replace(
2060 self,
2061 column: _NAMEDCOL,
2062 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2063 ) -> None:
2064 """add the given column to this collection, removing unaliased
2065 versions of this column as well as existing columns with the
2066 same key.
2068 e.g.::
2070 t = Table("sometable", metadata, Column("col1", Integer))
2071 t.columns.replace(Column("col1", Integer, key="columnone"))
2073 will remove the original 'col1' from the collection, and add
2074 the new column under the name 'columnname'.
2076 Used by schema.Column to override columns during table reflection.
2078 """
2080 if extra_remove:
2081 remove_col = set(extra_remove)
2082 else:
2083 remove_col = set()
2084 # remove up to two columns based on matches of name as well as key
2085 if column.name in self._index and column.key != column.name:
2086 other = self._index[column.name][1]
2087 if other.name == other.key:
2088 remove_col.add(other)
2090 if column.key in self._index:
2091 remove_col.add(self._index[column.key][1])
2093 if not remove_col:
2094 self._append_new_column(column.key, column)
2095 return
2096 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2097 replaced = False
2098 for k, col, metrics in self._collection:
2099 if col in remove_col:
2100 if not replaced:
2101 replaced = True
2102 new_cols.append(
2103 (column.key, column, _ColumnMetrics(self, column))
2104 )
2105 else:
2106 new_cols.append((k, col, metrics))
2108 if remove_col:
2109 self._colset.difference_update(remove_col)
2111 for rc in remove_col:
2112 for metrics in self._proxy_index.get(rc, ()):
2113 metrics.dispose(self)
2115 if not replaced:
2116 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2118 self._colset.add(column._deannotate())
2119 self._collection[:] = new_cols
2121 self._index.clear()
2123 self._index.update(
2124 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2125 )
2126 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2129class ReadOnlyColumnCollection(
2130 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2131):
2132 __slots__ = ("_parent",)
2134 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2135 object.__setattr__(self, "_parent", collection)
2136 object.__setattr__(self, "_colset", collection._colset)
2137 object.__setattr__(self, "_index", collection._index)
2138 object.__setattr__(self, "_collection", collection._collection)
2139 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2141 def __getstate__(self) -> Dict[str, _COL_co]:
2142 return {"_parent": self._parent}
2144 def __setstate__(self, state: Dict[str, Any]) -> None:
2145 parent = state["_parent"]
2146 self.__init__(parent) # type: ignore
2148 def add(self, column: Any, key: Any = ...) -> Any:
2149 self._readonly()
2151 def extend(self, elements: Any) -> NoReturn:
2152 self._readonly()
2154 def remove(self, item: Any) -> NoReturn:
2155 self._readonly()
2158class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2159 def contains_column(self, col: ColumnClause[Any]) -> bool:
2160 return col in self
2162 def extend(self, cols: Iterable[Any]) -> None:
2163 for col in cols:
2164 self.add(col)
2166 def __eq__(self, other):
2167 l = []
2168 for c in other:
2169 for local in self:
2170 if c.shares_lineage(local):
2171 l.append(c == local)
2172 return elements.and_(*l)
2174 def __hash__(self) -> int: # type: ignore[override]
2175 return hash(tuple(x for x in self))
2178def _entity_namespace(
2179 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2180) -> _EntityNamespace:
2181 """Return the nearest .entity_namespace for the given entity.
2183 If not immediately available, does an iterate to find a sub-element
2184 that has one, if any.
2186 """
2187 try:
2188 return cast(_HasEntityNamespace, entity).entity_namespace
2189 except AttributeError:
2190 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2191 if _is_has_entity_namespace(elem):
2192 return elem.entity_namespace
2193 else:
2194 raise
2197def _entity_namespace_key(
2198 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2199 key: str,
2200 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2201) -> SQLCoreOperations[Any]:
2202 """Return an entry from an entity_namespace.
2205 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2206 on not found.
2208 """
2210 try:
2211 ns = _entity_namespace(entity)
2212 if default is not NO_ARG:
2213 return getattr(ns, key, default)
2214 else:
2215 return getattr(ns, key) # type: ignore
2216 except AttributeError as err:
2217 raise exc.InvalidRequestError(
2218 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2219 ) from err