Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/base.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules.
11"""
14from __future__ import annotations
16import collections
17from enum import Enum
18import itertools
19from itertools import zip_longest
20import operator
21import re
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
46from . import roles
47from . import visitors
48from .cache_key import HasCacheKey # noqa
49from .cache_key import MemoizedHasCacheKey # noqa
50from .traversals import HasCopyInternals # noqa
51from .visitors import ClauseVisitor
52from .visitors import ExtendedInternalTraversal
53from .visitors import ExternallyTraversible
54from .visitors import InternalTraversal
55from .. import event
56from .. import exc
57from .. import util
58from ..util import HasMemoized as HasMemoized
59from ..util import hybridmethod
60from ..util.typing import Self
61from ..util.typing import TypeGuard
63if TYPE_CHECKING:
64 from . import coercions
65 from . import elements
66 from . import type_api
67 from ._orm_types import DMLStrategyArgument
68 from ._orm_types import SynchronizeSessionArgument
69 from ._typing import _CLE
70 from .elements import BindParameter
71 from .elements import ClauseList
72 from .elements import ColumnClause # noqa
73 from .elements import ColumnElement
74 from .elements import NamedColumn
75 from .elements import SQLCoreOperations
76 from .elements import TextClause
77 from .schema import Column
78 from .schema import DefaultGenerator
79 from .selectable import _JoinTargetElement
80 from .selectable import _SelectIterable
81 from .selectable import FromClause
82 from ..engine import Connection
83 from ..engine import CursorResult
84 from ..engine.interfaces import _CoreMultiExecuteParams
85 from ..engine.interfaces import _ExecuteOptions
86 from ..engine.interfaces import _ImmutableExecuteOptions
87 from ..engine.interfaces import CacheStats
88 from ..engine.interfaces import Compiled
89 from ..engine.interfaces import CompiledCacheType
90 from ..engine.interfaces import CoreExecuteOptionsParameter
91 from ..engine.interfaces import Dialect
92 from ..engine.interfaces import IsolationLevel
93 from ..engine.interfaces import SchemaTranslateMapType
94 from ..event import dispatcher
96if not TYPE_CHECKING:
97 coercions = None # noqa
98 elements = None # noqa
99 type_api = None # noqa
102class _NoArg(Enum):
103 NO_ARG = 0
105 def __repr__(self):
106 return f"_NoArg.{self.name}"
109NO_ARG = _NoArg.NO_ARG
112class _NoneName(Enum):
113 NONE_NAME = 0
114 """indicate a 'deferred' name that was ultimately the value None."""
117_NONE_NAME = _NoneName.NONE_NAME
119_T = TypeVar("_T", bound=Any)
121_Fn = TypeVar("_Fn", bound=Callable[..., Any])
123_AmbiguousTableNameMap = MutableMapping[str, str]
126class _DefaultDescriptionTuple(NamedTuple):
127 arg: Any
128 is_scalar: Optional[bool]
129 is_callable: Optional[bool]
130 is_sentinel: Optional[bool]
132 @classmethod
133 def _from_column_default(
134 cls, default: Optional[DefaultGenerator]
135 ) -> _DefaultDescriptionTuple:
136 return (
137 _DefaultDescriptionTuple(
138 default.arg, # type: ignore
139 default.is_scalar,
140 default.is_callable,
141 default.is_sentinel,
142 )
143 if default
144 and (
145 default.has_arg
146 or (not default.for_update and default.is_sentinel)
147 )
148 else _DefaultDescriptionTuple(None, None, None, None)
149 )
152_never_select_column = operator.attrgetter("_omit_from_statements")
155class _EntityNamespace(Protocol):
156 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
159class _HasEntityNamespace(Protocol):
160 @util.ro_non_memoized_property
161 def entity_namespace(self) -> _EntityNamespace: ...
164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
165 return hasattr(element, "entity_namespace")
168# Remove when https://github.com/python/mypy/issues/14640 will be fixed
169_Self = TypeVar("_Self", bound=Any)
172class Immutable:
173 """mark a ClauseElement as 'immutable' when expressions are cloned.
175 "immutable" objects refers to the "mutability" of an object in the
176 context of SQL DQL and DML generation. Such as, in DQL, one can
177 compose a SELECT or subquery of varied forms, but one cannot modify
178 the structure of a specific table or column within DQL.
179 :class:`.Immutable` is mostly intended to follow this concept, and as
180 such the primary "immutable" objects are :class:`.ColumnClause`,
181 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
183 """
185 __slots__ = ()
187 _is_immutable = True
189 def unique_params(self, *optionaldict, **kwargs):
190 raise NotImplementedError("Immutable objects do not support copying")
192 def params(self, *optionaldict, **kwargs):
193 raise NotImplementedError("Immutable objects do not support copying")
195 def _clone(self: _Self, **kw: Any) -> _Self:
196 return self
198 def _copy_internals(
199 self, *, omit_attrs: Iterable[str] = (), **kw: Any
200 ) -> None:
201 pass
204class SingletonConstant(Immutable):
205 """Represent SQL constants like NULL, TRUE, FALSE"""
207 _is_singleton_constant = True
209 _singleton: SingletonConstant
211 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
212 return cast(_T, cls._singleton)
214 @util.non_memoized_property
215 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
216 raise NotImplementedError()
218 @classmethod
219 def _create_singleton(cls):
220 obj = object.__new__(cls)
221 obj.__init__() # type: ignore
223 # for a long time this was an empty frozenset, meaning
224 # a SingletonConstant would never be a "corresponding column" in
225 # a statement. This referred to #6259. However, in #7154 we see
226 # that we do in fact need "correspondence" to work when matching cols
227 # in result sets, so the non-correspondence was moved to a more
228 # specific level when we are actually adapting expressions for SQL
229 # render only.
230 obj.proxy_set = frozenset([obj])
231 cls._singleton = obj
234def _from_objects(
235 *elements: Union[
236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
237 ]
238) -> Iterator[FromClause]:
239 return itertools.chain.from_iterable(
240 [element._from_objects for element in elements]
241 )
244def _select_iterables(
245 elements: Iterable[roles.ColumnsClauseRole],
246) -> _SelectIterable:
247 """expand tables into individual columns in the
248 given list of column expressions.
250 """
251 return itertools.chain.from_iterable(
252 [c._select_iterable for c in elements]
253 )
256_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
259class _GenerativeType(Protocol):
260 def _generate(self) -> Self: ...
263def _generative(fn: _Fn) -> _Fn:
264 """non-caching _generative() decorator.
266 This is basically the legacy decorator that copies the object and
267 runs a method on the new copy.
269 """
271 @util.decorator
272 def _generative(
273 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
274 ) -> _SelfGenerativeType:
275 """Mark a method as generative."""
277 self = self._generate()
278 x = fn(self, *args, **kw)
279 assert x is self, "generative methods must return self"
280 return self
282 decorated = _generative(fn)
283 decorated.non_generative = fn # type: ignore
284 return decorated
287def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
288 msgs = kw.pop("msgs", {})
290 defaults = kw.pop("defaults", {})
292 getters = [
293 (name, operator.attrgetter(name), defaults.get(name, None))
294 for name in names
295 ]
297 @util.decorator
298 def check(fn, *args, **kw):
299 # make pylance happy by not including "self" in the argument
300 # list
301 self = args[0]
302 args = args[1:]
303 for name, getter, default_ in getters:
304 if getter(self) is not default_:
305 msg = msgs.get(
306 name,
307 "Method %s() has already been invoked on this %s construct"
308 % (fn.__name__, self.__class__),
309 )
310 raise exc.InvalidRequestError(msg)
311 return fn(self, *args, **kw)
313 return check
316def _clone(element, **kw):
317 return element._clone(**kw)
320def _expand_cloned(
321 elements: Iterable[_CLE],
322) -> Iterable[_CLE]:
323 """expand the given set of ClauseElements to be the set of all 'cloned'
324 predecessors.
326 """
327 # TODO: cython candidate
328 return itertools.chain(*[x._cloned_set for x in elements])
331def _de_clone(
332 elements: Iterable[_CLE],
333) -> Iterable[_CLE]:
334 for x in elements:
335 while x._is_clone_of is not None:
336 x = x._is_clone_of
337 yield x
340def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
341 """return the intersection of sets a and b, counting
342 any overlap between 'cloned' predecessors.
344 The returned set is in terms of the entities present within 'a'.
346 """
347 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
348 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
351def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
352 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
353 return {
354 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
355 }
358class _DialectArgView(MutableMapping[str, Any]):
359 """A dictionary view of dialect-level arguments in the form
360 <dialectname>_<argument_name>.
362 """
364 def __init__(self, obj):
365 self.obj = obj
367 def _key(self, key):
368 try:
369 dialect, value_key = key.split("_", 1)
370 except ValueError as err:
371 raise KeyError(key) from err
372 else:
373 return dialect, value_key
375 def __getitem__(self, key):
376 dialect, value_key = self._key(key)
378 try:
379 opt = self.obj.dialect_options[dialect]
380 except exc.NoSuchModuleError as err:
381 raise KeyError(key) from err
382 else:
383 return opt[value_key]
385 def __setitem__(self, key, value):
386 try:
387 dialect, value_key = self._key(key)
388 except KeyError as err:
389 raise exc.ArgumentError(
390 "Keys must be of the form <dialectname>_<argname>"
391 ) from err
392 else:
393 self.obj.dialect_options[dialect][value_key] = value
395 def __delitem__(self, key):
396 dialect, value_key = self._key(key)
397 del self.obj.dialect_options[dialect][value_key]
399 def __len__(self):
400 return sum(
401 len(args._non_defaults)
402 for args in self.obj.dialect_options.values()
403 )
405 def __iter__(self):
406 return (
407 "%s_%s" % (dialect_name, value_name)
408 for dialect_name in self.obj.dialect_options
409 for value_name in self.obj.dialect_options[
410 dialect_name
411 ]._non_defaults
412 )
415class _DialectArgDict(MutableMapping[str, Any]):
416 """A dictionary view of dialect-level arguments for a specific
417 dialect.
419 Maintains a separate collection of user-specified arguments
420 and dialect-specified default arguments.
422 """
424 def __init__(self):
425 self._non_defaults = {}
426 self._defaults = {}
428 def __len__(self):
429 return len(set(self._non_defaults).union(self._defaults))
431 def __iter__(self):
432 return iter(set(self._non_defaults).union(self._defaults))
434 def __getitem__(self, key):
435 if key in self._non_defaults:
436 return self._non_defaults[key]
437 else:
438 return self._defaults[key]
440 def __setitem__(self, key, value):
441 self._non_defaults[key] = value
443 def __delitem__(self, key):
444 del self._non_defaults[key]
447@util.preload_module("sqlalchemy.dialects")
448def _kw_reg_for_dialect(dialect_name):
449 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
450 if dialect_cls.construct_arguments is None:
451 return None
452 return dict(dialect_cls.construct_arguments)
455class DialectKWArgs:
456 """Establish the ability for a class to have dialect-specific arguments
457 with defaults and constructor validation.
459 The :class:`.DialectKWArgs` interacts with the
460 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
462 .. seealso::
464 :attr:`.DefaultDialect.construct_arguments`
466 """
468 __slots__ = ()
470 _dialect_kwargs_traverse_internals = [
471 ("dialect_options", InternalTraversal.dp_dialect_options)
472 ]
474 @classmethod
475 def argument_for(cls, dialect_name, argument_name, default):
476 """Add a new kind of dialect-specific keyword argument for this class.
478 E.g.::
480 Index.argument_for("mydialect", "length", None)
482 some_index = Index('a', 'b', mydialect_length=5)
484 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
485 way adding extra arguments to the
486 :attr:`.DefaultDialect.construct_arguments` dictionary. This
487 dictionary provides a list of argument names accepted by various
488 schema-level constructs on behalf of a dialect.
490 New dialects should typically specify this dictionary all at once as a
491 data member of the dialect class. The use case for ad-hoc addition of
492 argument names is typically for end-user code that is also using
493 a custom compilation scheme which consumes the additional arguments.
495 :param dialect_name: name of a dialect. The dialect must be
496 locatable, else a :class:`.NoSuchModuleError` is raised. The
497 dialect must also include an existing
498 :attr:`.DefaultDialect.construct_arguments` collection, indicating
499 that it participates in the keyword-argument validation and default
500 system, else :class:`.ArgumentError` is raised. If the dialect does
501 not include this collection, then any keyword argument can be
502 specified on behalf of this dialect already. All dialects packaged
503 within SQLAlchemy include this collection, however for third party
504 dialects, support may vary.
506 :param argument_name: name of the parameter.
508 :param default: default value of the parameter.
510 """
512 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
513 if construct_arg_dictionary is None:
514 raise exc.ArgumentError(
515 "Dialect '%s' does have keyword-argument "
516 "validation and defaults enabled configured" % dialect_name
517 )
518 if cls not in construct_arg_dictionary:
519 construct_arg_dictionary[cls] = {}
520 construct_arg_dictionary[cls][argument_name] = default
522 @util.memoized_property
523 def dialect_kwargs(self):
524 """A collection of keyword arguments specified as dialect-specific
525 options to this construct.
527 The arguments are present here in their original ``<dialect>_<kwarg>``
528 format. Only arguments that were actually passed are included;
529 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
530 contains all options known by this dialect including defaults.
532 The collection is also writable; keys are accepted of the
533 form ``<dialect>_<kwarg>`` where the value will be assembled
534 into the list of options.
536 .. seealso::
538 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
540 """
541 return _DialectArgView(self)
543 @property
544 def kwargs(self):
545 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
546 return self.dialect_kwargs
548 _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
550 def _kw_reg_for_dialect_cls(self, dialect_name):
551 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
552 d = _DialectArgDict()
554 if construct_arg_dictionary is None:
555 d._defaults.update({"*": None})
556 else:
557 for cls in reversed(self.__class__.__mro__):
558 if cls in construct_arg_dictionary:
559 d._defaults.update(construct_arg_dictionary[cls])
560 return d
562 @util.memoized_property
563 def dialect_options(self):
564 """A collection of keyword arguments specified as dialect-specific
565 options to this construct.
567 This is a two-level nested registry, keyed to ``<dialect_name>``
568 and ``<argument_name>``. For example, the ``postgresql_where``
569 argument would be locatable as::
571 arg = my_object.dialect_options['postgresql']['where']
573 .. versionadded:: 0.9.2
575 .. seealso::
577 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
579 """
581 return util.PopulateDict(
582 util.portable_instancemethod(self._kw_reg_for_dialect_cls)
583 )
585 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
586 # validate remaining kwargs that they all specify DB prefixes
588 if not kwargs:
589 return
591 for k in kwargs:
592 m = re.match("^(.+?)_(.+)$", k)
593 if not m:
594 raise TypeError(
595 "Additional arguments should be "
596 "named <dialectname>_<argument>, got '%s'" % k
597 )
598 dialect_name, arg_name = m.group(1, 2)
600 try:
601 construct_arg_dictionary = self.dialect_options[dialect_name]
602 except exc.NoSuchModuleError:
603 util.warn(
604 "Can't validate argument %r; can't "
605 "locate any SQLAlchemy dialect named %r"
606 % (k, dialect_name)
607 )
608 self.dialect_options[dialect_name] = d = _DialectArgDict()
609 d._defaults.update({"*": None})
610 d._non_defaults[arg_name] = kwargs[k]
611 else:
612 if (
613 "*" not in construct_arg_dictionary
614 and arg_name not in construct_arg_dictionary
615 ):
616 raise exc.ArgumentError(
617 "Argument %r is not accepted by "
618 "dialect %r on behalf of %r"
619 % (k, dialect_name, self.__class__)
620 )
621 else:
622 construct_arg_dictionary[arg_name] = kwargs[k]
625class CompileState:
626 """Produces additional object state necessary for a statement to be
627 compiled.
629 the :class:`.CompileState` class is at the base of classes that assemble
630 state for a particular statement object that is then used by the
631 compiler. This process is essentially an extension of the process that
632 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
633 on converting raw user intent into more organized structures rather than
634 producing string output. The top-level :class:`.CompileState` for the
635 statement being executed is also accessible when the execution context
636 works with invoking the statement and collecting results.
638 The production of :class:`.CompileState` is specific to the compiler, such
639 as within the :meth:`.SQLCompiler.visit_insert`,
640 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
641 responsible for associating the :class:`.CompileState` with the
642 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
643 i.e. the outermost SQL statement that's actually being executed.
644 There can be other :class:`.CompileState` objects that are not the
645 toplevel, such as when a SELECT subquery or CTE-nested
646 INSERT/UPDATE/DELETE is generated.
648 .. versionadded:: 1.4
650 """
652 __slots__ = ("statement", "_ambiguous_table_name_map")
654 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
656 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
658 @classmethod
659 def create_for_statement(cls, statement, compiler, **kw):
660 # factory construction.
662 if statement._propagate_attrs:
663 plugin_name = statement._propagate_attrs.get(
664 "compile_state_plugin", "default"
665 )
666 klass = cls.plugins.get(
667 (plugin_name, statement._effective_plugin_target), None
668 )
669 if klass is None:
670 klass = cls.plugins[
671 ("default", statement._effective_plugin_target)
672 ]
674 else:
675 klass = cls.plugins[
676 ("default", statement._effective_plugin_target)
677 ]
679 if klass is cls:
680 return cls(statement, compiler, **kw)
681 else:
682 return klass.create_for_statement(statement, compiler, **kw)
684 def __init__(self, statement, compiler, **kw):
685 self.statement = statement
687 @classmethod
688 def get_plugin_class(
689 cls, statement: Executable
690 ) -> Optional[Type[CompileState]]:
691 plugin_name = statement._propagate_attrs.get(
692 "compile_state_plugin", None
693 )
695 if plugin_name:
696 key = (plugin_name, statement._effective_plugin_target)
697 if key in cls.plugins:
698 return cls.plugins[key]
700 # there's no case where we call upon get_plugin_class() and want
701 # to get None back, there should always be a default. return that
702 # if there was no plugin-specific class (e.g. Insert with "orm"
703 # plugin)
704 try:
705 return cls.plugins[("default", statement._effective_plugin_target)]
706 except KeyError:
707 return None
709 @classmethod
710 def _get_plugin_class_for_plugin(
711 cls, statement: Executable, plugin_name: str
712 ) -> Optional[Type[CompileState]]:
713 try:
714 return cls.plugins[
715 (plugin_name, statement._effective_plugin_target)
716 ]
717 except KeyError:
718 return None
720 @classmethod
721 def plugin_for(
722 cls, plugin_name: str, visit_name: str
723 ) -> Callable[[_Fn], _Fn]:
724 def decorate(cls_to_decorate):
725 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
726 return cls_to_decorate
728 return decorate
731class Generative(HasMemoized):
732 """Provide a method-chaining pattern in conjunction with the
733 @_generative decorator."""
735 def _generate(self) -> Self:
736 skip = self._memoized_keys
737 cls = self.__class__
738 s = cls.__new__(cls)
739 if skip:
740 # ensure this iteration remains atomic
741 s.__dict__ = {
742 k: v for k, v in self.__dict__.copy().items() if k not in skip
743 }
744 else:
745 s.__dict__ = self.__dict__.copy()
746 return s
749class InPlaceGenerative(HasMemoized):
750 """Provide a method-chaining pattern in conjunction with the
751 @_generative decorator that mutates in place."""
753 __slots__ = ()
755 def _generate(self):
756 skip = self._memoized_keys
757 # note __dict__ needs to be in __slots__ if this is used
758 for k in skip:
759 self.__dict__.pop(k, None)
760 return self
763class HasCompileState(Generative):
764 """A class that has a :class:`.CompileState` associated with it."""
766 _compile_state_plugin: Optional[Type[CompileState]] = None
768 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
770 _compile_state_factory = CompileState.create_for_statement
773class _MetaOptions(type):
774 """metaclass for the Options class.
776 This metaclass is actually necessary despite the availability of the
777 ``__init_subclass__()`` hook as this type also provides custom class-level
778 behavior for the ``__add__()`` method.
780 """
782 _cache_attrs: Tuple[str, ...]
784 def __add__(self, other):
785 o1 = self()
787 if set(other).difference(self._cache_attrs):
788 raise TypeError(
789 "dictionary contains attributes not covered by "
790 "Options class %s: %r"
791 % (self, set(other).difference(self._cache_attrs))
792 )
794 o1.__dict__.update(other)
795 return o1
797 if TYPE_CHECKING:
799 def __getattr__(self, key: str) -> Any: ...
801 def __setattr__(self, key: str, value: Any) -> None: ...
803 def __delattr__(self, key: str) -> None: ...
806class Options(metaclass=_MetaOptions):
807 """A cacheable option dictionary with defaults."""
809 __slots__ = ()
811 _cache_attrs: Tuple[str, ...]
813 def __init_subclass__(cls) -> None:
814 dict_ = cls.__dict__
815 cls._cache_attrs = tuple(
816 sorted(
817 d
818 for d in dict_
819 if not d.startswith("__")
820 and d not in ("_cache_key_traversal",)
821 )
822 )
823 super().__init_subclass__()
825 def __init__(self, **kw):
826 self.__dict__.update(kw)
828 def __add__(self, other):
829 o1 = self.__class__.__new__(self.__class__)
830 o1.__dict__.update(self.__dict__)
832 if set(other).difference(self._cache_attrs):
833 raise TypeError(
834 "dictionary contains attributes not covered by "
835 "Options class %s: %r"
836 % (self, set(other).difference(self._cache_attrs))
837 )
839 o1.__dict__.update(other)
840 return o1
842 def __eq__(self, other):
843 # TODO: very inefficient. This is used only in test suites
844 # right now.
845 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
846 if getattr(self, a) != getattr(other, b):
847 return False
848 return True
850 def __repr__(self):
851 # TODO: fairly inefficient, used only in debugging right now.
853 return "%s(%s)" % (
854 self.__class__.__name__,
855 ", ".join(
856 "%s=%r" % (k, self.__dict__[k])
857 for k in self._cache_attrs
858 if k in self.__dict__
859 ),
860 )
862 @classmethod
863 def isinstance(cls, klass: Type[Any]) -> bool:
864 return issubclass(cls, klass)
866 @hybridmethod
867 def add_to_element(self, name, value):
868 return self + {name: getattr(self, name) + value}
870 @hybridmethod
871 def _state_dict_inst(self) -> Mapping[str, Any]:
872 return self.__dict__
874 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
876 @_state_dict_inst.classlevel
877 def _state_dict(cls) -> Mapping[str, Any]:
878 return cls._state_dict_const
880 @classmethod
881 def safe_merge(cls, other):
882 d = other._state_dict()
884 # only support a merge with another object of our class
885 # and which does not have attrs that we don't. otherwise
886 # we risk having state that might not be part of our cache
887 # key strategy
889 if (
890 cls is not other.__class__
891 and other._cache_attrs
892 and set(other._cache_attrs).difference(cls._cache_attrs)
893 ):
894 raise TypeError(
895 "other element %r is not empty, is not of type %s, "
896 "and contains attributes not covered here %r"
897 % (
898 other,
899 cls,
900 set(other._cache_attrs).difference(cls._cache_attrs),
901 )
902 )
903 return cls + d
905 @classmethod
906 def from_execution_options(
907 cls, key, attrs, exec_options, statement_exec_options
908 ):
909 """process Options argument in terms of execution options.
912 e.g.::
914 (
915 load_options,
916 execution_options,
917 ) = QueryContext.default_load_options.from_execution_options(
918 "_sa_orm_load_options",
919 {
920 "populate_existing",
921 "autoflush",
922 "yield_per"
923 },
924 execution_options,
925 statement._execution_options,
926 )
928 get back the Options and refresh "_sa_orm_load_options" in the
929 exec options dict w/ the Options as well
931 """
933 # common case is that no options we are looking for are
934 # in either dictionary, so cancel for that first
935 check_argnames = attrs.intersection(
936 set(exec_options).union(statement_exec_options)
937 )
939 existing_options = exec_options.get(key, cls)
941 if check_argnames:
942 result = {}
943 for argname in check_argnames:
944 local = "_" + argname
945 if argname in exec_options:
946 result[local] = exec_options[argname]
947 elif argname in statement_exec_options:
948 result[local] = statement_exec_options[argname]
950 new_options = existing_options + result
951 exec_options = util.immutabledict().merge_with(
952 exec_options, {key: new_options}
953 )
954 return new_options, exec_options
956 else:
957 return existing_options, exec_options
959 if TYPE_CHECKING:
961 def __getattr__(self, key: str) -> Any: ...
963 def __setattr__(self, key: str, value: Any) -> None: ...
965 def __delattr__(self, key: str) -> None: ...
968class CacheableOptions(Options, HasCacheKey):
969 __slots__ = ()
971 @hybridmethod
972 def _gen_cache_key_inst(self, anon_map, bindparams):
973 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
975 @_gen_cache_key_inst.classlevel
976 def _gen_cache_key(cls, anon_map, bindparams):
977 return (cls, ())
979 @hybridmethod
980 def _generate_cache_key(self):
981 return HasCacheKey._generate_cache_key_for_object(self)
984class ExecutableOption(HasCopyInternals):
985 __slots__ = ()
987 _annotations = util.EMPTY_DICT
989 __visit_name__ = "executable_option"
991 _is_has_cache_key = False
993 _is_core = True
995 def _clone(self, **kw):
996 """Create a shallow copy of this ExecutableOption."""
997 c = self.__class__.__new__(self.__class__)
998 c.__dict__ = dict(self.__dict__) # type: ignore
999 return c
1002class Executable(roles.StatementRole):
1003 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1005 :class:`.Executable` is a superclass for all "statement" types
1006 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1007 :func:`insert`, :func:`text`.
1009 """
1011 supports_execution: bool = True
1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1013 _is_default_generator = False
1014 _with_options: Tuple[ExecutableOption, ...] = ()
1015 _with_context_options: Tuple[
1016 Tuple[Callable[[CompileState], None], Any], ...
1017 ] = ()
1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1020 _executable_traverse_internals = [
1021 ("_with_options", InternalTraversal.dp_executable_options),
1022 (
1023 "_with_context_options",
1024 ExtendedInternalTraversal.dp_with_context_options,
1025 ),
1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1027 ]
1029 is_select = False
1030 is_from_statement = False
1031 is_update = False
1032 is_insert = False
1033 is_text = False
1034 is_delete = False
1035 is_dml = False
1037 if TYPE_CHECKING:
1038 __visit_name__: str
1040 def _compile_w_cache(
1041 self,
1042 dialect: Dialect,
1043 *,
1044 compiled_cache: Optional[CompiledCacheType],
1045 column_keys: List[str],
1046 for_executemany: bool = False,
1047 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1048 **kw: Any,
1049 ) -> Tuple[
1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1051 ]: ...
1053 def _execute_on_connection(
1054 self,
1055 connection: Connection,
1056 distilled_params: _CoreMultiExecuteParams,
1057 execution_options: CoreExecuteOptionsParameter,
1058 ) -> CursorResult[Any]: ...
1060 def _execute_on_scalar(
1061 self,
1062 connection: Connection,
1063 distilled_params: _CoreMultiExecuteParams,
1064 execution_options: CoreExecuteOptionsParameter,
1065 ) -> Any: ...
1067 @util.ro_non_memoized_property
1068 def _all_selected_columns(self):
1069 raise NotImplementedError()
1071 @property
1072 def _effective_plugin_target(self) -> str:
1073 return self.__visit_name__
1075 @_generative
1076 def options(self, *options: ExecutableOption) -> Self:
1077 """Apply options to this statement.
1079 In the general sense, options are any kind of Python object
1080 that can be interpreted by the SQL compiler for the statement.
1081 These options can be consumed by specific dialects or specific kinds
1082 of compilers.
1084 The most commonly known kind of option are the ORM level options
1085 that apply "eager load" and other loading behaviors to an ORM
1086 query. However, options can theoretically be used for many other
1087 purposes.
1089 For background on specific kinds of options for specific kinds of
1090 statements, refer to the documentation for those option objects.
1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1093 Core statement objects towards the goal of allowing unified
1094 Core / ORM querying capabilities.
1096 .. seealso::
1098 :ref:`loading_columns` - refers to options specific to the usage
1099 of ORM queries
1101 :ref:`relationship_loader_options` - refers to options specific
1102 to the usage of ORM queries
1104 """
1105 self._with_options += tuple(
1106 coercions.expect(roles.ExecutableOptionRole, opt)
1107 for opt in options
1108 )
1109 return self
1111 @_generative
1112 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1113 """Assign the compile options to a new value.
1115 :param compile_options: appropriate CacheableOptions structure
1117 """
1119 self._compile_options = compile_options
1120 return self
1122 @_generative
1123 def _update_compile_options(self, options: CacheableOptions) -> Self:
1124 """update the _compile_options with new keys."""
1126 assert self._compile_options is not None
1127 self._compile_options += options
1128 return self
1130 @_generative
1131 def _add_context_option(
1132 self,
1133 callable_: Callable[[CompileState], None],
1134 cache_args: Any,
1135 ) -> Self:
1136 """Add a context option to this statement.
1138 These are callable functions that will
1139 be given the CompileState object upon compilation.
1141 A second argument cache_args is required, which will be combined with
1142 the ``__code__`` identity of the function itself in order to produce a
1143 cache key.
1145 """
1146 self._with_context_options += ((callable_, cache_args),)
1147 return self
1149 @overload
1150 def execution_options(
1151 self,
1152 *,
1153 compiled_cache: Optional[CompiledCacheType] = ...,
1154 logging_token: str = ...,
1155 isolation_level: IsolationLevel = ...,
1156 no_parameters: bool = False,
1157 stream_results: bool = False,
1158 max_row_buffer: int = ...,
1159 yield_per: int = ...,
1160 insertmanyvalues_page_size: int = ...,
1161 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1162 populate_existing: bool = False,
1163 autoflush: bool = False,
1164 synchronize_session: SynchronizeSessionArgument = ...,
1165 dml_strategy: DMLStrategyArgument = ...,
1166 render_nulls: bool = ...,
1167 is_delete_using: bool = ...,
1168 is_update_from: bool = ...,
1169 preserve_rowcount: bool = False,
1170 **opt: Any,
1171 ) -> Self: ...
1173 @overload
1174 def execution_options(self, **opt: Any) -> Self: ...
1176 @_generative
1177 def execution_options(self, **kw: Any) -> Self:
1178 """Set non-SQL options for the statement which take effect during
1179 execution.
1181 Execution options can be set at many scopes, including per-statement,
1182 per-connection, or per execution, using methods such as
1183 :meth:`_engine.Connection.execution_options` and parameters which
1184 accept a dictionary of options such as
1185 :paramref:`_engine.Connection.execute.execution_options` and
1186 :paramref:`_orm.Session.execute.execution_options`.
1188 The primary characteristic of an execution option, as opposed to
1189 other kinds of options such as ORM loader options, is that
1190 **execution options never affect the compiled SQL of a query, only
1191 things that affect how the SQL statement itself is invoked or how
1192 results are fetched**. That is, execution options are not part of
1193 what's accommodated by SQL compilation nor are they considered part of
1194 the cached state of a statement.
1196 The :meth:`_sql.Executable.execution_options` method is
1197 :term:`generative`, as
1198 is the case for the method as applied to the :class:`_engine.Engine`
1199 and :class:`_orm.Query` objects, which means when the method is called,
1200 a copy of the object is returned, which applies the given parameters to
1201 that new copy, but leaves the original unchanged::
1203 statement = select(table.c.x, table.c.y)
1204 new_statement = statement.execution_options(my_option=True)
1206 An exception to this behavior is the :class:`_engine.Connection`
1207 object, where the :meth:`_engine.Connection.execution_options` method
1208 is explicitly **not** generative.
1210 The kinds of options that may be passed to
1211 :meth:`_sql.Executable.execution_options` and other related methods and
1212 parameter dictionaries include parameters that are explicitly consumed
1213 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1214 defined by SQLAlchemy, which means the methods and/or parameter
1215 dictionaries may be used for user-defined parameters that interact with
1216 custom code, which may access the parameters using methods such as
1217 :meth:`_sql.Executable.get_execution_options` and
1218 :meth:`_engine.Connection.get_execution_options`, or within selected
1219 event hooks using a dedicated ``execution_options`` event parameter
1220 such as
1221 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1222 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1224 from sqlalchemy import event
1226 @event.listens_for(some_engine, "before_execute")
1227 def _process_opt(conn, statement, multiparams, params, execution_options):
1228 "run a SQL function before invoking a statement"
1230 if execution_options.get("do_special_thing", False):
1231 conn.exec_driver_sql("run_special_function()")
1233 Within the scope of options that are explicitly recognized by
1234 SQLAlchemy, most apply to specific classes of objects and not others.
1235 The most common execution options include:
1237 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1238 sets the isolation level for a connection or a class of connections
1239 via an :class:`_engine.Engine`. This option is accepted only
1240 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1242 * :paramref:`_engine.Connection.execution_options.stream_results` -
1243 indicates results should be fetched using a server side cursor;
1244 this option is accepted by :class:`_engine.Connection`, by the
1245 :paramref:`_engine.Connection.execute.execution_options` parameter
1246 on :meth:`_engine.Connection.execute`, and additionally by
1247 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1248 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1250 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1251 indicates a dictionary that will serve as the
1252 :ref:`SQL compilation cache <sql_caching>`
1253 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1254 well as for ORM methods like :meth:`_orm.Session.execute`.
1255 Can be passed as ``None`` to disable caching for statements.
1256 This option is not accepted by
1257 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1258 carry along a compilation cache within a statement object.
1260 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1261 - a mapping of schema names used by the
1262 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1263 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1264 :class:`_sql.Executable`, as well as by ORM constructs
1265 like :meth:`_orm.Session.execute`.
1267 .. seealso::
1269 :meth:`_engine.Connection.execution_options`
1271 :paramref:`_engine.Connection.execute.execution_options`
1273 :paramref:`_orm.Session.execute.execution_options`
1275 :ref:`orm_queryguide_execution_options` - documentation on all
1276 ORM-specific execution options
1278 """ # noqa: E501
1279 if "isolation_level" in kw:
1280 raise exc.ArgumentError(
1281 "'isolation_level' execution option may only be specified "
1282 "on Connection.execution_options(), or "
1283 "per-engine using the isolation_level "
1284 "argument to create_engine()."
1285 )
1286 if "compiled_cache" in kw:
1287 raise exc.ArgumentError(
1288 "'compiled_cache' execution option may only be specified "
1289 "on Connection.execution_options(), not per statement."
1290 )
1291 self._execution_options = self._execution_options.union(kw)
1292 return self
1294 def get_execution_options(self) -> _ExecuteOptions:
1295 """Get the non-SQL options which will take effect during execution.
1297 .. versionadded:: 1.3
1299 .. seealso::
1301 :meth:`.Executable.execution_options`
1302 """
1303 return self._execution_options
1306class SchemaEventTarget(event.EventTarget):
1307 """Base class for elements that are the targets of :class:`.DDLEvents`
1308 events.
1310 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1312 """
1314 dispatch: dispatcher[SchemaEventTarget]
1316 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1317 """Associate with this SchemaEvent's parent object."""
1319 def _set_parent_with_dispatch(
1320 self, parent: SchemaEventTarget, **kw: Any
1321 ) -> None:
1322 self.dispatch.before_parent_attach(self, parent)
1323 self._set_parent(parent, **kw)
1324 self.dispatch.after_parent_attach(self, parent)
1327class SchemaVisitor(ClauseVisitor):
1328 """Define the visiting for ``SchemaItem`` objects."""
1330 __traverse_options__ = {"schema_visitor": True}
1333class _SentinelDefaultCharacterization(Enum):
1334 NONE = "none"
1335 UNKNOWN = "unknown"
1336 CLIENTSIDE = "clientside"
1337 SENTINEL_DEFAULT = "sentinel_default"
1338 SERVERSIDE = "serverside"
1339 IDENTITY = "identity"
1340 SEQUENCE = "sequence"
1343class _SentinelColumnCharacterization(NamedTuple):
1344 columns: Optional[Sequence[Column[Any]]] = None
1345 is_explicit: bool = False
1346 is_autoinc: bool = False
1347 default_characterization: _SentinelDefaultCharacterization = (
1348 _SentinelDefaultCharacterization.NONE
1349 )
1352_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1354_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1355_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1358class _ColumnMetrics(Generic[_COL_co]):
1359 __slots__ = ("column",)
1361 column: _COL_co
1363 def __init__(
1364 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1365 ):
1366 self.column = col
1368 # proxy_index being non-empty means it was initialized.
1369 # so we need to update it
1370 pi = collection._proxy_index
1371 if pi:
1372 for eps_col in col._expanded_proxy_set:
1373 pi[eps_col].add(self)
1375 def get_expanded_proxy_set(self):
1376 return self.column._expanded_proxy_set
1378 def dispose(self, collection):
1379 pi = collection._proxy_index
1380 if not pi:
1381 return
1382 for col in self.column._expanded_proxy_set:
1383 colset = pi.get(col, None)
1384 if colset:
1385 colset.discard(self)
1386 if colset is not None and not colset:
1387 del pi[col]
1389 def embedded(
1390 self,
1391 target_set: Union[
1392 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1393 ],
1394 ) -> bool:
1395 expanded_proxy_set = self.column._expanded_proxy_set
1396 for t in target_set.difference(expanded_proxy_set):
1397 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1398 return False
1399 return True
1402class ColumnCollection(Generic[_COLKEY, _COL_co]):
1403 """Collection of :class:`_expression.ColumnElement` instances,
1404 typically for
1405 :class:`_sql.FromClause` objects.
1407 The :class:`_sql.ColumnCollection` object is most commonly available
1408 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1409 on the :class:`_schema.Table` object, introduced at
1410 :ref:`metadata_tables_and_columns`.
1412 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1413 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1414 :class:`_schema.Column` objects, which are then accessible both via mapping
1415 style access as well as attribute access style.
1417 To access :class:`_schema.Column` objects using ordinary attribute-style
1418 access, specify the name like any other object attribute, such as below
1419 a column named ``employee_name`` is accessed::
1421 >>> employee_table.c.employee_name
1423 To access columns that have names with special characters or spaces,
1424 index-style access is used, such as below which illustrates a column named
1425 ``employee ' payment`` is accessed::
1427 >>> employee_table.c["employee ' payment"]
1429 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1430 interface, common dictionary method names like
1431 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1432 and :meth:`_sql.ColumnCollection.items` are available, which means that
1433 database columns that are keyed under these names also need to use indexed
1434 access::
1436 >>> employee_table.c["values"]
1439 The name for which a :class:`_schema.Column` would be present is normally
1440 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1441 such as a :class:`_sql.Select` object that uses a label style set
1442 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1443 key may instead be represented under a particular label name such
1444 as ``tablename_columnname``::
1446 >>> from sqlalchemy import select, column, table
1447 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1448 >>> t = table("t", column("c"))
1449 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1450 >>> subq = stmt.subquery()
1451 >>> subq.c.t_c
1452 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1454 :class:`.ColumnCollection` also indexes the columns in order and allows
1455 them to be accessible by their integer position::
1457 >>> cc[0]
1458 Column('x', Integer(), table=None)
1459 >>> cc[1]
1460 Column('y', Integer(), table=None)
1462 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1463 allows integer-based
1464 index access to the collection.
1466 Iterating the collection yields the column expressions in order::
1468 >>> list(cc)
1469 [Column('x', Integer(), table=None),
1470 Column('y', Integer(), table=None)]
1472 The base :class:`_expression.ColumnCollection` object can store
1473 duplicates, which can
1474 mean either two columns with the same key, in which case the column
1475 returned by key access is **arbitrary**::
1477 >>> x1, x2 = Column('x', Integer), Column('x', Integer)
1478 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1479 >>> list(cc)
1480 [Column('x', Integer(), table=None),
1481 Column('x', Integer(), table=None)]
1482 >>> cc['x'] is x1
1483 False
1484 >>> cc['x'] is x2
1485 True
1487 Or it can also mean the same column multiple times. These cases are
1488 supported as :class:`_expression.ColumnCollection`
1489 is used to represent the columns in
1490 a SELECT statement which may include duplicates.
1492 A special subclass :class:`.DedupeColumnCollection` exists which instead
1493 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1494 collection is used for schema level objects like :class:`_schema.Table`
1495 and
1496 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1497 :class:`.DedupeColumnCollection` class also has additional mutation methods
1498 as the schema constructs have more use cases that require removal and
1499 replacement of columns.
1501 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1502 now stores duplicate
1503 column keys as well as the same column in multiple positions. The
1504 :class:`.DedupeColumnCollection` class is added to maintain the
1505 former behavior in those cases where deduplication as well as
1506 additional replace/remove operations are needed.
1509 """
1511 __slots__ = "_collection", "_index", "_colset", "_proxy_index"
1513 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1514 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1515 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1516 _colset: Set[_COL_co]
1518 def __init__(
1519 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1520 ):
1521 object.__setattr__(self, "_colset", set())
1522 object.__setattr__(self, "_index", {})
1523 object.__setattr__(
1524 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1525 )
1526 object.__setattr__(self, "_collection", [])
1527 if columns:
1528 self._initial_populate(columns)
1530 @util.preload_module("sqlalchemy.sql.elements")
1531 def __clause_element__(self) -> ClauseList:
1532 elements = util.preloaded.sql_elements
1534 return elements.ClauseList(
1535 _literal_as_text_role=roles.ColumnsClauseRole,
1536 group=False,
1537 *self._all_columns,
1538 )
1540 def _initial_populate(
1541 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1542 ) -> None:
1543 self._populate_separate_keys(iter_)
1545 @property
1546 def _all_columns(self) -> List[_COL_co]:
1547 return [col for (_, col, _) in self._collection]
1549 def keys(self) -> List[_COLKEY]:
1550 """Return a sequence of string key names for all columns in this
1551 collection."""
1552 return [k for (k, _, _) in self._collection]
1554 def values(self) -> List[_COL_co]:
1555 """Return a sequence of :class:`_sql.ColumnClause` or
1556 :class:`_schema.Column` objects for all columns in this
1557 collection."""
1558 return [col for (_, col, _) in self._collection]
1560 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1561 """Return a sequence of (key, column) tuples for all columns in this
1562 collection each consisting of a string key name and a
1563 :class:`_sql.ColumnClause` or
1564 :class:`_schema.Column` object.
1565 """
1567 return [(k, col) for (k, col, _) in self._collection]
1569 def __bool__(self) -> bool:
1570 return bool(self._collection)
1572 def __len__(self) -> int:
1573 return len(self._collection)
1575 def __iter__(self) -> Iterator[_COL_co]:
1576 # turn to a list first to maintain over a course of changes
1577 return iter([col for _, col, _ in self._collection])
1579 @overload
1580 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1582 @overload
1583 def __getitem__(
1584 self, key: Tuple[Union[str, int], ...]
1585 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1587 @overload
1588 def __getitem__(
1589 self, key: slice
1590 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1592 def __getitem__(
1593 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1594 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1595 try:
1596 if isinstance(key, (tuple, slice)):
1597 if isinstance(key, slice):
1598 cols = (
1599 (sub_key, col)
1600 for (sub_key, col, _) in self._collection[key]
1601 )
1602 else:
1603 cols = (self._index[sub_key] for sub_key in key)
1605 return ColumnCollection(cols).as_readonly()
1606 else:
1607 return self._index[key][1]
1608 except KeyError as err:
1609 if isinstance(err.args[0], int):
1610 raise IndexError(err.args[0]) from err
1611 else:
1612 raise
1614 def __getattr__(self, key: str) -> _COL_co:
1615 try:
1616 return self._index[key][1]
1617 except KeyError as err:
1618 raise AttributeError(key) from err
1620 def __contains__(self, key: str) -> bool:
1621 if key not in self._index:
1622 if not isinstance(key, str):
1623 raise exc.ArgumentError(
1624 "__contains__ requires a string argument"
1625 )
1626 return False
1627 else:
1628 return True
1630 def compare(self, other: ColumnCollection[Any, Any]) -> bool:
1631 """Compare this :class:`_expression.ColumnCollection` to another
1632 based on the names of the keys"""
1634 for l, r in zip_longest(self, other):
1635 if l is not r:
1636 return False
1637 else:
1638 return True
1640 def __eq__(self, other: Any) -> bool:
1641 return self.compare(other)
1643 @overload
1644 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1646 @overload
1647 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1649 def get(
1650 self, key: str, default: Optional[_COL] = None
1651 ) -> Optional[Union[_COL_co, _COL]]:
1652 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1653 based on a string key name from this
1654 :class:`_expression.ColumnCollection`."""
1656 if key in self._index:
1657 return self._index[key][1]
1658 else:
1659 return default
1661 def __str__(self) -> str:
1662 return "%s(%s)" % (
1663 self.__class__.__name__,
1664 ", ".join(str(c) for c in self),
1665 )
1667 def __setitem__(self, key: str, value: Any) -> NoReturn:
1668 raise NotImplementedError()
1670 def __delitem__(self, key: str) -> NoReturn:
1671 raise NotImplementedError()
1673 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1674 raise NotImplementedError()
1676 def clear(self) -> NoReturn:
1677 """Dictionary clear() is not implemented for
1678 :class:`_sql.ColumnCollection`."""
1679 raise NotImplementedError()
1681 def remove(self, column: Any) -> None:
1682 raise NotImplementedError()
1684 def update(self, iter_: Any) -> NoReturn:
1685 """Dictionary update() is not implemented for
1686 :class:`_sql.ColumnCollection`."""
1687 raise NotImplementedError()
1689 # https://github.com/python/mypy/issues/4266
1690 __hash__ = None # type: ignore
1692 def _populate_separate_keys(
1693 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1694 ) -> None:
1695 """populate from an iterator of (key, column)"""
1697 self._collection[:] = collection = [
1698 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1699 ]
1700 self._colset.update(c._deannotate() for _, c, _ in collection)
1701 self._index.update(
1702 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1703 )
1704 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1706 def add(
1707 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1708 ) -> None:
1709 """Add a column to this :class:`_sql.ColumnCollection`.
1711 .. note::
1713 This method is **not normally used by user-facing code**, as the
1714 :class:`_sql.ColumnCollection` is usually part of an existing
1715 object such as a :class:`_schema.Table`. To add a
1716 :class:`_schema.Column` to an existing :class:`_schema.Table`
1717 object, use the :meth:`_schema.Table.append_column` method.
1719 """
1720 colkey: _COLKEY
1722 if key is None:
1723 colkey = column.key # type: ignore
1724 else:
1725 colkey = key
1727 l = len(self._collection)
1729 # don't really know how this part is supposed to work w/ the
1730 # covariant thing
1732 _column = cast(_COL_co, column)
1734 self._collection.append(
1735 (colkey, _column, _ColumnMetrics(self, _column))
1736 )
1737 self._colset.add(_column._deannotate())
1738 self._index[l] = (colkey, _column)
1739 if colkey not in self._index:
1740 self._index[colkey] = (colkey, _column)
1742 def __getstate__(self) -> Dict[str, Any]:
1743 return {
1744 "_collection": [(k, c) for k, c, _ in self._collection],
1745 "_index": self._index,
1746 }
1748 def __setstate__(self, state: Dict[str, Any]) -> None:
1749 object.__setattr__(self, "_index", state["_index"])
1750 object.__setattr__(
1751 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1752 )
1753 object.__setattr__(
1754 self,
1755 "_collection",
1756 [
1757 (k, c, _ColumnMetrics(self, c))
1758 for (k, c) in state["_collection"]
1759 ],
1760 )
1761 object.__setattr__(
1762 self, "_colset", {col for k, col, _ in self._collection}
1763 )
1765 def contains_column(self, col: ColumnElement[Any]) -> bool:
1766 """Checks if a column object exists in this collection"""
1767 if col not in self._colset:
1768 if isinstance(col, str):
1769 raise exc.ArgumentError(
1770 "contains_column cannot be used with string arguments. "
1771 "Use ``col_name in table.c`` instead."
1772 )
1773 return False
1774 else:
1775 return True
1777 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1778 """Return a "read only" form of this
1779 :class:`_sql.ColumnCollection`."""
1781 return ReadOnlyColumnCollection(self)
1783 def _init_proxy_index(self):
1784 """populate the "proxy index", if empty.
1786 proxy index is added in 2.0 to provide more efficient operation
1787 for the corresponding_column() method.
1789 For reasons of both time to construct new .c collections as well as
1790 memory conservation for large numbers of large .c collections, the
1791 proxy_index is only filled if corresponding_column() is called. once
1792 filled it stays that way, and new _ColumnMetrics objects created after
1793 that point will populate it with new data. Note this case would be
1794 unusual, if not nonexistent, as it means a .c collection is being
1795 mutated after corresponding_column() were used, however it is tested in
1796 test/base/test_utils.py.
1798 """
1799 pi = self._proxy_index
1800 if pi:
1801 return
1803 for _, _, metrics in self._collection:
1804 eps = metrics.column._expanded_proxy_set
1806 for eps_col in eps:
1807 pi[eps_col].add(metrics)
1809 def corresponding_column(
1810 self, column: _COL, require_embedded: bool = False
1811 ) -> Optional[Union[_COL, _COL_co]]:
1812 """Given a :class:`_expression.ColumnElement`, return the exported
1813 :class:`_expression.ColumnElement` object from this
1814 :class:`_expression.ColumnCollection`
1815 which corresponds to that original :class:`_expression.ColumnElement`
1816 via a common
1817 ancestor column.
1819 :param column: the target :class:`_expression.ColumnElement`
1820 to be matched.
1822 :param require_embedded: only return corresponding columns for
1823 the given :class:`_expression.ColumnElement`, if the given
1824 :class:`_expression.ColumnElement`
1825 is actually present within a sub-element
1826 of this :class:`_expression.Selectable`.
1827 Normally the column will match if
1828 it merely shares a common ancestor with one of the exported
1829 columns of this :class:`_expression.Selectable`.
1831 .. seealso::
1833 :meth:`_expression.Selectable.corresponding_column`
1834 - invokes this method
1835 against the collection returned by
1836 :attr:`_expression.Selectable.exported_columns`.
1838 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1839 was moved onto the :class:`_expression.ColumnCollection` itself.
1841 """
1842 # TODO: cython candidate
1844 # don't dig around if the column is locally present
1845 if column in self._colset:
1846 return column
1848 selected_intersection, selected_metrics = None, None
1849 target_set = column.proxy_set
1851 pi = self._proxy_index
1852 if not pi:
1853 self._init_proxy_index()
1855 for current_metrics in (
1856 mm for ts in target_set if ts in pi for mm in pi[ts]
1857 ):
1858 if not require_embedded or current_metrics.embedded(target_set):
1859 if selected_metrics is None:
1860 # no corresponding column yet, pick this one.
1861 selected_metrics = current_metrics
1862 continue
1864 current_intersection = target_set.intersection(
1865 current_metrics.column._expanded_proxy_set
1866 )
1867 if selected_intersection is None:
1868 selected_intersection = target_set.intersection(
1869 selected_metrics.column._expanded_proxy_set
1870 )
1872 if len(current_intersection) > len(selected_intersection):
1873 # 'current' has a larger field of correspondence than
1874 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
1875 # matches a1.c.x->table.c.x better than
1876 # selectable.c.x->table.c.x does.
1878 selected_metrics = current_metrics
1879 selected_intersection = current_intersection
1880 elif current_intersection == selected_intersection:
1881 # they have the same field of correspondence. see
1882 # which proxy_set has fewer columns in it, which
1883 # indicates a closer relationship with the root
1884 # column. Also take into account the "weight"
1885 # attribute which CompoundSelect() uses to give
1886 # higher precedence to columns based on vertical
1887 # position in the compound statement, and discard
1888 # columns that have no reference to the target
1889 # column (also occurs with CompoundSelect)
1891 selected_col_distance = sum(
1892 [
1893 sc._annotations.get("weight", 1)
1894 for sc in (
1895 selected_metrics.column._uncached_proxy_list()
1896 )
1897 if sc.shares_lineage(column)
1898 ],
1899 )
1900 current_col_distance = sum(
1901 [
1902 sc._annotations.get("weight", 1)
1903 for sc in (
1904 current_metrics.column._uncached_proxy_list()
1905 )
1906 if sc.shares_lineage(column)
1907 ],
1908 )
1909 if current_col_distance < selected_col_distance:
1910 selected_metrics = current_metrics
1911 selected_intersection = current_intersection
1913 return selected_metrics.column if selected_metrics else None
1916_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
1919class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
1920 """A :class:`_expression.ColumnCollection`
1921 that maintains deduplicating behavior.
1923 This is useful by schema level objects such as :class:`_schema.Table` and
1924 :class:`.PrimaryKeyConstraint`. The collection includes more
1925 sophisticated mutator methods as well to suit schema objects which
1926 require mutable column collections.
1928 .. versionadded:: 1.4
1930 """
1932 def add( # type: ignore[override]
1933 self, column: _NAMEDCOL, key: Optional[str] = None
1934 ) -> None:
1935 if key is not None and column.key != key:
1936 raise exc.ArgumentError(
1937 "DedupeColumnCollection requires columns be under "
1938 "the same key as their .key"
1939 )
1940 key = column.key
1942 if key is None:
1943 raise exc.ArgumentError(
1944 "Can't add unnamed column to column collection"
1945 )
1947 if key in self._index:
1948 existing = self._index[key][1]
1950 if existing is column:
1951 return
1953 self.replace(column)
1955 # pop out memoized proxy_set as this
1956 # operation may very well be occurring
1957 # in a _make_proxy operation
1958 util.memoized_property.reset(column, "proxy_set")
1959 else:
1960 self._append_new_column(key, column)
1962 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
1963 l = len(self._collection)
1964 self._collection.append(
1965 (key, named_column, _ColumnMetrics(self, named_column))
1966 )
1967 self._colset.add(named_column._deannotate())
1968 self._index[l] = (key, named_column)
1969 self._index[key] = (key, named_column)
1971 def _populate_separate_keys(
1972 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
1973 ) -> None:
1974 """populate from an iterator of (key, column)"""
1975 cols = list(iter_)
1977 replace_col = []
1978 for k, col in cols:
1979 if col.key != k:
1980 raise exc.ArgumentError(
1981 "DedupeColumnCollection requires columns be under "
1982 "the same key as their .key"
1983 )
1984 if col.name in self._index and col.key != col.name:
1985 replace_col.append(col)
1986 elif col.key in self._index:
1987 replace_col.append(col)
1988 else:
1989 self._index[k] = (k, col)
1990 self._collection.append((k, col, _ColumnMetrics(self, col)))
1991 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
1993 self._index.update(
1994 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
1995 )
1996 for col in replace_col:
1997 self.replace(col)
1999 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2000 self._populate_separate_keys((col.key, col) for col in iter_)
2002 def remove(self, column: _NAMEDCOL) -> None:
2003 if column not in self._colset:
2004 raise ValueError(
2005 "Can't remove column %r; column is not in this collection"
2006 % column
2007 )
2008 del self._index[column.key]
2009 self._colset.remove(column)
2010 self._collection[:] = [
2011 (k, c, metrics)
2012 for (k, c, metrics) in self._collection
2013 if c is not column
2014 ]
2015 for metrics in self._proxy_index.get(column, ()):
2016 metrics.dispose(self)
2018 self._index.update(
2019 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2020 )
2021 # delete higher index
2022 del self._index[len(self._collection)]
2024 def replace(
2025 self,
2026 column: _NAMEDCOL,
2027 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2028 ) -> None:
2029 """add the given column to this collection, removing unaliased
2030 versions of this column as well as existing columns with the
2031 same key.
2033 e.g.::
2035 t = Table('sometable', metadata, Column('col1', Integer))
2036 t.columns.replace(Column('col1', Integer, key='columnone'))
2038 will remove the original 'col1' from the collection, and add
2039 the new column under the name 'columnname'.
2041 Used by schema.Column to override columns during table reflection.
2043 """
2045 if extra_remove:
2046 remove_col = set(extra_remove)
2047 else:
2048 remove_col = set()
2049 # remove up to two columns based on matches of name as well as key
2050 if column.name in self._index and column.key != column.name:
2051 other = self._index[column.name][1]
2052 if other.name == other.key:
2053 remove_col.add(other)
2055 if column.key in self._index:
2056 remove_col.add(self._index[column.key][1])
2058 if not remove_col:
2059 self._append_new_column(column.key, column)
2060 return
2061 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2062 replaced = False
2063 for k, col, metrics in self._collection:
2064 if col in remove_col:
2065 if not replaced:
2066 replaced = True
2067 new_cols.append(
2068 (column.key, column, _ColumnMetrics(self, column))
2069 )
2070 else:
2071 new_cols.append((k, col, metrics))
2073 if remove_col:
2074 self._colset.difference_update(remove_col)
2076 for rc in remove_col:
2077 for metrics in self._proxy_index.get(rc, ()):
2078 metrics.dispose(self)
2080 if not replaced:
2081 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2083 self._colset.add(column._deannotate())
2084 self._collection[:] = new_cols
2086 self._index.clear()
2088 self._index.update(
2089 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2090 )
2091 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2094class ReadOnlyColumnCollection(
2095 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2096):
2097 __slots__ = ("_parent",)
2099 def __init__(self, collection):
2100 object.__setattr__(self, "_parent", collection)
2101 object.__setattr__(self, "_colset", collection._colset)
2102 object.__setattr__(self, "_index", collection._index)
2103 object.__setattr__(self, "_collection", collection._collection)
2104 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2106 def __getstate__(self):
2107 return {"_parent": self._parent}
2109 def __setstate__(self, state):
2110 parent = state["_parent"]
2111 self.__init__(parent) # type: ignore
2113 def add(self, column: Any, key: Any = ...) -> Any:
2114 self._readonly()
2116 def extend(self, elements: Any) -> NoReturn:
2117 self._readonly()
2119 def remove(self, item: Any) -> NoReturn:
2120 self._readonly()
2123class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2124 def contains_column(self, col):
2125 return col in self
2127 def extend(self, cols):
2128 for col in cols:
2129 self.add(col)
2131 def __eq__(self, other):
2132 l = []
2133 for c in other:
2134 for local in self:
2135 if c.shares_lineage(local):
2136 l.append(c == local)
2137 return elements.and_(*l)
2139 def __hash__(self):
2140 return hash(tuple(x for x in self))
2143def _entity_namespace(
2144 entity: Union[_HasEntityNamespace, ExternallyTraversible]
2145) -> _EntityNamespace:
2146 """Return the nearest .entity_namespace for the given entity.
2148 If not immediately available, does an iterate to find a sub-element
2149 that has one, if any.
2151 """
2152 try:
2153 return cast(_HasEntityNamespace, entity).entity_namespace
2154 except AttributeError:
2155 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2156 if _is_has_entity_namespace(elem):
2157 return elem.entity_namespace
2158 else:
2159 raise
2162def _entity_namespace_key(
2163 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2164 key: str,
2165 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2166) -> SQLCoreOperations[Any]:
2167 """Return an entry from an entity_namespace.
2170 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2171 on not found.
2173 """
2175 try:
2176 ns = _entity_namespace(entity)
2177 if default is not NO_ARG:
2178 return getattr(ns, key, default)
2179 else:
2180 return getattr(ns, key) # type: ignore
2181 except AttributeError as err:
2182 raise exc.InvalidRequestError(
2183 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2184 ) from err