Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/base.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules.
11"""
14from __future__ import annotations
16import collections
17from enum import Enum
18import itertools
19from itertools import zip_longest
20import operator
21import re
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
46from . import roles
47from . import visitors
48from .cache_key import HasCacheKey # noqa
49from .cache_key import MemoizedHasCacheKey # noqa
50from .traversals import HasCopyInternals # noqa
51from .visitors import ClauseVisitor
52from .visitors import ExtendedInternalTraversal
53from .visitors import ExternallyTraversible
54from .visitors import InternalTraversal
55from .. import event
56from .. import exc
57from .. import util
58from ..util import HasMemoized as HasMemoized
59from ..util import hybridmethod
60from ..util.typing import Self
61from ..util.typing import TypeGuard
63if TYPE_CHECKING:
64 from . import coercions
65 from . import elements
66 from . import type_api
67 from ._orm_types import DMLStrategyArgument
68 from ._orm_types import SynchronizeSessionArgument
69 from ._typing import _CLE
70 from .elements import BindParameter
71 from .elements import ClauseList
72 from .elements import ColumnClause # noqa
73 from .elements import ColumnElement
74 from .elements import NamedColumn
75 from .elements import SQLCoreOperations
76 from .elements import TextClause
77 from .schema import Column
78 from .schema import DefaultGenerator
79 from .selectable import _JoinTargetElement
80 from .selectable import _SelectIterable
81 from .selectable import FromClause
82 from ..engine import Connection
83 from ..engine import CursorResult
84 from ..engine.interfaces import _CoreMultiExecuteParams
85 from ..engine.interfaces import _ExecuteOptions
86 from ..engine.interfaces import _ImmutableExecuteOptions
87 from ..engine.interfaces import CacheStats
88 from ..engine.interfaces import Compiled
89 from ..engine.interfaces import CompiledCacheType
90 from ..engine.interfaces import CoreExecuteOptionsParameter
91 from ..engine.interfaces import Dialect
92 from ..engine.interfaces import IsolationLevel
93 from ..engine.interfaces import SchemaTranslateMapType
94 from ..event import dispatcher
96if not TYPE_CHECKING:
97 coercions = None # noqa
98 elements = None # noqa
99 type_api = None # noqa
102class _NoArg(Enum):
103 NO_ARG = 0
105 def __repr__(self):
106 return f"_NoArg.{self.name}"
109NO_ARG = _NoArg.NO_ARG
112class _NoneName(Enum):
113 NONE_NAME = 0
114 """indicate a 'deferred' name that was ultimately the value None."""
117_NONE_NAME = _NoneName.NONE_NAME
# general-purpose type variable; the ``Any`` bound places no constraint on it
_T = TypeVar("_T", bound=Any)

# type variable for an arbitrary callable, so that decorators in this module
# can be annotated as returning the same callable type they were given
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# alias for a mutable ``str -> str`` mapping; CompileState uses it to annotate
# its ``_ambiguous_table_name_map`` attribute
_AmbiguousTableNameMap = MutableMapping[str, str]
126class _DefaultDescriptionTuple(NamedTuple):
127 arg: Any
128 is_scalar: Optional[bool]
129 is_callable: Optional[bool]
130 is_sentinel: Optional[bool]
132 @classmethod
133 def _from_column_default(
134 cls, default: Optional[DefaultGenerator]
135 ) -> _DefaultDescriptionTuple:
136 return (
137 _DefaultDescriptionTuple(
138 default.arg, # type: ignore
139 default.is_scalar,
140 default.is_callable,
141 default.is_sentinel,
142 )
143 if default
144 and (
145 default.has_arg
146 or (not default.for_update and default.is_sentinel)
147 )
148 else _DefaultDescriptionTuple(None, None, None, None)
149 )
# callable that returns the ``_omit_from_statements`` attribute of the object
# passed to it
_never_select_column = operator.attrgetter("_omit_from_statements")
155class _EntityNamespace(Protocol):
156 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
class _HasEntityNamespace(Protocol):
    """Structural type for an object exposing an ``entity_namespace``
    read-only property."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """Return the namespace object for this element."""
164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
165 return hasattr(element, "entity_namespace")
# TODO: remove this TypeVar once
# https://github.com/python/mypy/issues/14640 is fixed.  The ``Any`` bound
# places no constraint on the type; Immutable._clone() uses it to annotate
# that it returns an object of the same type as ``self``.
_Self = TypeVar("_Self", bound=Any)
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "Immutable" here refers to the mutability of an object in the context
    of SQL DQL and DML generation.  For example, in DQL one can compose a
    SELECT or subquery of varied forms, but one cannot modify the structure
    of a specific table or column within DQL.  :class:`.Immutable` is mostly
    intended to follow this concept, and as such the primary "immutable"
    objects are :class:`.ColumnClause`, :class:`.Column`,
    :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable = True

    def unique_params(self, *optionaldict, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; all arguments are accepted and ignored."""
        return None
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant = True

    # the shared instance; assigned per class by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # "constructing" the class hands back the pre-built shared instance;
        # all arguments are ignored here
        shared = cls._singleton
        return cast(_T, shared)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls):
        """Build the shared instance of ``cls`` and store it on the class."""
        # go through object.__new__ directly, since our own __new__ only
        # returns an instance that already exists
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset((instance,))
        cls._singleton = instance
234def _from_objects(
235 *elements: Union[
236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
237 ]
238) -> Iterator[FromClause]:
239 return itertools.chain.from_iterable(
240 [element._from_objects for element in elements]
241 )
244def _select_iterables(
245 elements: Iterable[roles.ColumnsClauseRole],
246) -> _SelectIterable:
247 """expand tables into individual columns in the
248 given list of column expressions.
250 """
251 return itertools.chain.from_iterable(
252 [c._select_iterable for c in elements]
253 )
256_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
259class _GenerativeType(Protocol):
260 def _generate(self) -> Self: ...
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The undecorated function remains reachable as the ``non_generative``
    attribute of the returned callable.

    """

    # NOTE: the inner function deliberately shadows the outer name
    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # run the method against the object produced by _generate(), never
        # against the object the caller invoked it on
        target = self._generate()
        returned = fn(target, *args, **kw)
        assert returned is target, "generative methods must return self"
        return target

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Return a decorator that refuses to run a method when any of the named
    attributes on ``self`` is no longer at its default.

    :param names: attribute names to inspect on the instance.
    :param kw: ``msgs`` maps an attribute name to a custom error message;
     ``defaults`` maps an attribute name to the value it must still be
     (by identity) for the call to proceed, ``None`` when not listed.
    :raises InvalidRequestError: from the decorated method, when one of the
     attributes is not its default.
    """
    custom_messages = kw.pop("msgs", {})

    expected_defaults = kw.pop("defaults", {})

    checks = []
    for attr_name in names:
        checks.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                expected_defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn, *args, **kw):
        # make pylance happy by not including "self" in the argument
        # list
        self, remaining = args[0], args[1:]
        for attr_name, read_attr, expected in checks:
            if read_attr(self) is expected:
                continue
            raise exc.InvalidRequestError(
                custom_messages.get(
                    attr_name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
            )
        return fn(self, *remaining, **kw)

    return check
316def _clone(element, **kw):
317 return element._clone(**kw)
320def _expand_cloned(
321 elements: Iterable[_CLE],
322) -> Iterable[_CLE]:
323 """expand the given set of ClauseElements to be the set of all 'cloned'
324 predecessors.
326 """
327 # TODO: cython candidate
328 return itertools.chain(*[x._cloned_set for x in elements])
331def _de_clone(
332 elements: Iterable[_CLE],
333) -> Iterable[_CLE]:
334 for x in elements:
335 while x._is_clone_of is not None:
336 x = x._is_clone_of
337 yield x
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    :param a: elements to filter; may be any iterable, including a one-shot
     iterator.
    :param b: elements to compare against.
    :return: the members of ``a`` whose ``_cloned_set`` shares at least one
     entry with the ``_cloned_set`` of some member of ``b``.

    """
    # 'a' is traversed twice below.  A one-shot iterator (iter(x) is x)
    # would be exhausted by the first pass and silently produce an empty
    # result, so snapshot it; real collections are used as-is.
    if iter(a) is a:
        a = list(a)

    all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
    return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the members of a that have no overlap with b, counting
    any overlap between 'cloned' predecessors.

    :param a: elements to filter; may be any iterable, including a one-shot
     iterator.
    :param b: elements to compare against.
    :return: the members of ``a`` whose ``_cloned_set`` shares no entry with
     the ``_cloned_set`` of any member of ``b``.

    """
    # 'a' is traversed twice below.  A one-shot iterator (iter(x) is x)
    # would be exhausted by the first pass and silently produce an empty
    # result, so snapshot it; real collections are used as-is.
    if iter(a) is a:
        a = list(a)

    all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
    return {
        elem for elem in a if not all_overlap.intersection(elem._cloned_set)
    }
358class _DialectArgView(MutableMapping[str, Any]):
359 """A dictionary view of dialect-level arguments in the form
360 <dialectname>_<argument_name>.
362 """
364 def __init__(self, obj):
365 self.obj = obj
367 def _key(self, key):
368 try:
369 dialect, value_key = key.split("_", 1)
370 except ValueError as err:
371 raise KeyError(key) from err
372 else:
373 return dialect, value_key
375 def __getitem__(self, key):
376 dialect, value_key = self._key(key)
378 try:
379 opt = self.obj.dialect_options[dialect]
380 except exc.NoSuchModuleError as err:
381 raise KeyError(key) from err
382 else:
383 return opt[value_key]
385 def __setitem__(self, key, value):
386 try:
387 dialect, value_key = self._key(key)
388 except KeyError as err:
389 raise exc.ArgumentError(
390 "Keys must be of the form <dialectname>_<argname>"
391 ) from err
392 else:
393 self.obj.dialect_options[dialect][value_key] = value
395 def __delitem__(self, key):
396 dialect, value_key = self._key(key)
397 del self.obj.dialect_options[dialect][value_key]
399 def __len__(self):
400 return sum(
401 len(args._non_defaults)
402 for args in self.obj.dialect_options.values()
403 )
405 def __iter__(self):
406 return (
407 "%s_%s" % (dialect_name, value_name)
408 for dialect_name in self.obj.dialect_options
409 for value_name in self.obj.dialect_options[
410 dialect_name
411 ]._non_defaults
412 )
415class _DialectArgDict(MutableMapping[str, Any]):
416 """A dictionary view of dialect-level arguments for a specific
417 dialect.
419 Maintains a separate collection of user-specified arguments
420 and dialect-specified default arguments.
422 """
424 def __init__(self):
425 self._non_defaults = {}
426 self._defaults = {}
428 def __len__(self):
429 return len(set(self._non_defaults).union(self._defaults))
431 def __iter__(self):
432 return iter(set(self._non_defaults).union(self._defaults))
434 def __getitem__(self, key):
435 if key in self._non_defaults:
436 return self._non_defaults[key]
437 else:
438 return self._defaults[key]
440 def __setitem__(self, key, value):
441 self._non_defaults[key] = value
443 def __delitem__(self, key):
444 del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name):
    """Return a dict copy of the ``construct_arguments`` of the named
    dialect, or ``None`` when the dialect's ``construct_arguments`` is
    ``None``.

    The dialect class is loaded through the dialect registry.
    """
    registry = util.preloaded.dialects.registry
    dialect_cls = registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    return None if construct_arguments is None else dict(construct_arguments)
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(cls, dialect_name, argument_name, default):
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index('a', 'b', mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        if construct_arg_dictionary is None:
            # a None registry entry means the dialect has no
            # construct_arguments collection, i.e. it does NOT take part
            # in validation; the message must say so
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @util.memoized_property
    def dialect_kwargs(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self):
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # per-dialect cache of construct_arguments, filled on first lookup of
    # each dialect name by _kw_reg_for_dialect()
    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)

    def _kw_reg_for_dialect_cls(self, dialect_name):
        """Build the :class:`._DialectArgDict` of defaults that apply to
        this object's class for the given dialect."""
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # dialect does not participate in validation: the "*" entry
            # makes _validate_dialect_kwargs() accept any argument name
            d._defaults.update({"*": None})
        else:
            # walk the MRO from the base class down, so that defaults
            # registered for more specific classes override inherited ones
            for cls in reversed(self.__class__.__mro__):
                if cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[cls])
        return d

    @util.memoized_property
    def dialect_options(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options['postgresql']['where']

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(
            util.portable_instancemethod(self._kw_reg_for_dialect_cls)
        )

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and store
        their values in :attr:`.DialectKWArgs.dialect_options`.

        :param kwargs: the remaining keyword arguments given to a construct.
        :raises TypeError: if a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: if the dialect validates its arguments and
         does not accept the given one for this class.
        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            # non-greedy first group: the dialect name ends at the first
            # underscore, everything after it is the argument name
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn rather than fail, and store the
                # value in a permissive ("*") argument dictionary
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of CompileState classes, keyed by
    # (plugin name, plugin target / visit name); filled by plugin_for()
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        """Create the compile state for ``statement``, dispatching to the
        class registered for the statement's plugin and target.

        The ``("default", target)`` registration is used when the statement
        names no plugin, or names one with no registration for the target.
        """
        # factory construction.

        propagate_attrs = statement._propagate_attrs
        klass = None
        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )

        if klass is None:
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is not cls:
            # a different class is registered; let it run its own factory
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's plugin and
        target, else the ``"default"`` class for the target, else ``None``.
        """
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        default_key = ("default", statement._effective_plugin_target)
        return cls.plugins.get(default_key)

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's target, or ``None``; there is no default fallback."""
        key = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(key)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class under
        ``(plugin_name, visit_name)`` and returns it unchanged."""

        def register(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return register
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of this class carrying a copy of this
        object's ``__dict__``, minus any keys listed in ``_memoized_keys``.

        The instance is made with ``__new__`` only; ``__init__`` is not run.
        """
        memoized = self._memoized_keys
        klass = self.__class__
        duplicate = klass.__new__(klass)

        # take the snapshot first, so that any filtering below iterates a
        # private dict; this keeps the iteration atomic
        state = self.__dict__.copy()
        if memoized:
            state = {
                key: value
                for key, value in state.items()
                if key not in memoized
            }
        duplicate.__dict__ = state
        return duplicate
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        """Discard this object's memoized values and return the object
        itself; no copy is made."""
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            # __dict__ is only touched when there is a key to discard
            self.__dict__.pop(memoized_key, None)
        return self
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional CompileState class for this construct; None by default
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # immutable, string-keyed mapping; empty by default
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # factory used to build a CompileState for a statement; by default the
    # plugin-dispatching CompileState.create_for_statement()
    _compile_state_factory = CompileState.create_for_statement
773class _MetaOptions(type):
774 """metaclass for the Options class.
776 This metaclass is actually necessary despite the availability of the
777 ``__init_subclass__()`` hook as this type also provides custom class-level
778 behavior for the ``__add__()`` method.
780 """
782 _cache_attrs: Tuple[str, ...]
784 def __add__(self, other):
785 o1 = self()
787 if set(other).difference(self._cache_attrs):
788 raise TypeError(
789 "dictionary contains attributes not covered by "
790 "Options class %s: %r"
791 % (self, set(other).difference(self._cache_attrs))
792 )
794 o1.__dict__.update(other)
795 return o1
797 if TYPE_CHECKING:
799 def __getattr__(self, key: str) -> Any: ...
801 def __setattr__(self, key: str, value: Any) -> None: ...
803 def __delattr__(self, key: str) -> None: ...
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    Subclasses declare their options as class attributes, which serve as
    the defaults.  The names of those attributes are collected into
    ``_cache_attrs`` when the subclass is created; values set on an instance
    live in its ``__dict__``.
    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # every name defined directly in the subclass body that is not a
        # dunder (and not the cache-key traversal spec) is an option name
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a copy of this object updated with the mapping ``other``.

        :raises TypeError: if ``other`` has keys that are not listed in
         ``_cache_attrs``.
        """
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        """Compare option values position by position against ``other``,
        which may be an Options instance or an Options class.

        Returns ``NotImplemented`` for an operand that has no
        ``_cache_attrs``, and ``False`` when the two sides have a different
        number of cache attributes.
        """
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        try:
            other_attrs = other._cache_attrs
        except AttributeError:
            # not an Options object or class; let Python try the
            # reflected comparison instead of raising here
            return NotImplemented

        for a, b in zip_longest(self._cache_attrs, other_attrs):
            if a is None or b is None:
                # zip_longest pads the shorter side with None; objects
                # with differing numbers of attributes are never equal,
                # and getattr(obj, None) would raise TypeError
                return False
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self):
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True if this Options class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name, value):
        """Return a copy in which ``value`` has been added (``+``) to the
        current value of the option ``name``."""
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        # instance level: the values explicitly set on this object
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        # class level: the constant (by default empty) state dictionary
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other):
        """Return a new instance of this class carrying the state of
        ``other``.

        :raises TypeError: if ``other`` is of a different class and has
         cache attributes that this class does not have.
        """
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls, key, attrs, exec_options, statement_exec_options
    ):
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {
                    "populate_existing",
                    "autoflush",
                    "yield_per"
                },
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        # falls back to the class itself, which stands in for "all defaults"
        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                # the option attribute is the execution option name with
                # a leading underscore; exec_options wins over the
                # statement-level options
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class CacheableOptions(Options, HasCacheKey):
    """An :class:`.Options` that also takes part in cache key generation,
    both as an instance and as the class itself."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(self, anon_map, bindparams):
        # instance level: use the regular HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(cls, anon_map, bindparams):
        # class level: the key is the class paired with an empty tuple
        class_level_key = (cls, ())
        return class_level_key

    @hybridmethod
    def _generate_cache_key(self):
        return HasCacheKey._generate_cache_key_for_object(self)
class ExecutableOption(HasCopyInternals):
    """Base for option objects passed to :meth:`.Executable.options`."""

    __slots__ = ()

    _annotations = util.EMPTY_DICT

    __visit_name__ = "executable_option"

    _is_has_cache_key = False

    _is_core = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        klass = self.__class__
        # __new__ only, so __init__ is not run on the copy
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = dict(vars(self))  # type: ignore
        return duplicate
1002class Executable(roles.StatementRole):
1003 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1005 :class:`.Executable` is a superclass for all "statement" types
1006 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1007 :func:`insert`, :func:`text`.
1009 """
1011 supports_execution: bool = True
1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1013 _is_default_generator = False
1014 _with_options: Tuple[ExecutableOption, ...] = ()
1015 _with_context_options: Tuple[
1016 Tuple[Callable[[CompileState], None], Any], ...
1017 ] = ()
1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1020 _executable_traverse_internals = [
1021 ("_with_options", InternalTraversal.dp_executable_options),
1022 (
1023 "_with_context_options",
1024 ExtendedInternalTraversal.dp_with_context_options,
1025 ),
1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1027 ]
1029 is_select = False
1030 is_from_statement = False
1031 is_update = False
1032 is_insert = False
1033 is_text = False
1034 is_delete = False
1035 is_dml = False
1037 if TYPE_CHECKING:
1038 __visit_name__: str
1040 def _compile_w_cache(
1041 self,
1042 dialect: Dialect,
1043 *,
1044 compiled_cache: Optional[CompiledCacheType],
1045 column_keys: List[str],
1046 for_executemany: bool = False,
1047 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1048 **kw: Any,
1049 ) -> Tuple[
1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1051 ]: ...
1053 def _execute_on_connection(
1054 self,
1055 connection: Connection,
1056 distilled_params: _CoreMultiExecuteParams,
1057 execution_options: CoreExecuteOptionsParameter,
1058 ) -> CursorResult[Any]: ...
1060 def _execute_on_scalar(
1061 self,
1062 connection: Connection,
1063 distilled_params: _CoreMultiExecuteParams,
1064 execution_options: CoreExecuteOptionsParameter,
1065 ) -> Any: ...
1067 @util.ro_non_memoized_property
1068 def _all_selected_columns(self):
1069 raise NotImplementedError()
1071 @property
1072 def _effective_plugin_target(self) -> str:
1073 return self.__visit_name__
1075 @_generative
1076 def options(self, *options: ExecutableOption) -> Self:
1077 """Apply options to this statement.
1079 In the general sense, options are any kind of Python object
1080 that can be interpreted by the SQL compiler for the statement.
1081 These options can be consumed by specific dialects or specific kinds
1082 of compilers.
1084 The most commonly known kind of option are the ORM level options
1085 that apply "eager load" and other loading behaviors to an ORM
1086 query. However, options can theoretically be used for many other
1087 purposes.
1089 For background on specific kinds of options for specific kinds of
1090 statements, refer to the documentation for those option objects.
1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1093 Core statement objects towards the goal of allowing unified
1094 Core / ORM querying capabilities.
1096 .. seealso::
1098 :ref:`loading_columns` - refers to options specific to the usage
1099 of ORM queries
1101 :ref:`relationship_loader_options` - refers to options specific
1102 to the usage of ORM queries
1104 """
1105 self._with_options += tuple(
1106 coercions.expect(roles.ExecutableOptionRole, opt)
1107 for opt in options
1108 )
1109 return self
@_generative
def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
    """Assign the compile options to a new value.

    Any previously assigned compile options are replaced outright, not
    merged.

    :param compile_options: appropriate CacheableOptions structure

    :return: the object the assignment was made on.

    """

    # NOTE(review): ``_generative`` presumably arranges for ``self`` to be
    # a copy of the original statement -- confirm in its definition.
    self._compile_options = compile_options
    return self
@_generative
def _update_compile_options(self, options: CacheableOptions) -> Self:
    """update the _compile_options with new keys.

    :param options: options combined into the current
     ``_compile_options`` using the ``+=`` operator.

    :return: the object the update was made on.

    """

    # there must already be compile options to add to; note that the
    # assert is skipped when Python runs with -O.
    assert self._compile_options is not None
    self._compile_options += options
    return self
@_generative
def _add_context_option(
    self,
    callable_: Callable[[CompileState], None],
    cache_args: Any,
) -> Self:
    """Add a context option to this statement.

    A context option is a callable function that will be given the
    CompileState object upon compilation.

    The second argument, cache_args, is required; it will be combined
    with the ``__code__`` identity of the function itself in order to
    produce a cache key.

    """
    # stored as a (callable, cache_args) pair appended to the existing
    # tuple of context options
    entry = (callable_, cache_args)
    self._with_context_options += (entry,)
    return self
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    driver_column_names: bool = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    populate_existing: bool = False,
    autoflush: bool = False,
    synchronize_session: SynchronizeSessionArgument = ...,
    dml_strategy: DMLStrategyArgument = ...,
    render_nulls: bool = ...,
    is_delete_using: bool = ...,
    is_update_from: bool = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> Self: ...

@overload
def execution_options(self, **opt: Any) -> Self: ...

@_generative
def execution_options(self, **kw: Any) -> Self:
    """Set non-SQL options for the statement which take effect during
    execution.

    Execution options can be set at many scopes, including per-statement,
    per-connection, or per execution, using methods such as
    :meth:`_engine.Connection.execution_options` and parameters which
    accept a dictionary of options such as
    :paramref:`_engine.Connection.execute.execution_options` and
    :paramref:`_orm.Session.execute.execution_options`.

    The primary characteristic of an execution option, as opposed to
    other kinds of options such as ORM loader options, is that
    **execution options never affect the compiled SQL of a query, only
    things that affect how the SQL statement itself is invoked or how
    results are fetched**.  That is, execution options are not part of
    what's accommodated by SQL compilation nor are they considered part of
    the cached state of a statement.

    The :meth:`_sql.Executable.execution_options` method is
    :term:`generative`, as
    is the case for the method as applied to the :class:`_engine.Engine`
    and :class:`_orm.Query` objects, which means when the method is called,
    a copy of the object is returned, which applies the given parameters to
    that new copy, but leaves the original unchanged::

        statement = select(table.c.x, table.c.y)
        new_statement = statement.execution_options(my_option=True)

    An exception to this behavior is the :class:`_engine.Connection`
    object, where the :meth:`_engine.Connection.execution_options` method
    is explicitly **not** generative.

    The kinds of options that may be passed to
    :meth:`_sql.Executable.execution_options` and other related methods and
    parameter dictionaries include parameters that are explicitly consumed
    by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
    defined by SQLAlchemy, which means the methods and/or parameter
    dictionaries may be used for user-defined parameters that interact with
    custom code, which may access the parameters using methods such as
    :meth:`_sql.Executable.get_execution_options` and
    :meth:`_engine.Connection.get_execution_options`, or within selected
    event hooks using a dedicated ``execution_options`` event parameter
    such as
    :paramref:`_events.ConnectionEvents.before_execute.execution_options`
    or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

         from sqlalchemy import event

         @event.listens_for(some_engine, "before_execute")
         def _process_opt(conn, statement, multiparams, params, execution_options):
             "run a SQL function before invoking a statement"

             if execution_options.get("do_special_thing", False):
                 conn.exec_driver_sql("run_special_function()")

    Within the scope of options that are explicitly recognized by
    SQLAlchemy, most apply to specific classes of objects and not others.
    The most common execution options include:

    * :paramref:`_engine.Connection.execution_options.isolation_level` -
      sets the isolation level for a connection or a class of connections
      via an :class:`_engine.Engine`.  This option is accepted only
      by :class:`_engine.Connection` or :class:`_engine.Engine`.

    * :paramref:`_engine.Connection.execution_options.stream_results` -
      indicates results should be fetched using a server side cursor;
      this option is accepted by :class:`_engine.Connection`, by the
      :paramref:`_engine.Connection.execute.execution_options` parameter
      on :meth:`_engine.Connection.execute`, and additionally by
      :meth:`_sql.Executable.execution_options` on a SQL statement object,
      as well as by ORM constructs like :meth:`_orm.Session.execute`.

    * :paramref:`_engine.Connection.execution_options.compiled_cache` -
      indicates a dictionary that will serve as the
      :ref:`SQL compilation cache <sql_caching>`
      for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
      well as for ORM methods like :meth:`_orm.Session.execute`.
      Can be passed as ``None`` to disable caching for statements.
      This option is not accepted by
      :meth:`_sql.Executable.execution_options` as it is inadvisable to
      carry along a compilation cache within a statement object.

    * :paramref:`_engine.Connection.execution_options.schema_translate_map`
      - a mapping of schema names used by the
      :ref:`Schema Translate Map <schema_translating>` feature, accepted
      by :class:`_engine.Connection`, :class:`_engine.Engine`,
      :class:`_sql.Executable`, as well as by ORM constructs
      like :meth:`_orm.Session.execute`.

    .. seealso::

        :meth:`_engine.Connection.execution_options`

        :paramref:`_engine.Connection.execute.execution_options`

        :paramref:`_orm.Session.execute.execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa: E501
    # Options that are refused at the statement level, checked in this
    # order; the first one present in ``kw`` raises.
    rejected = (
        (
            "isolation_level",
            "'isolation_level' execution option may only be specified "
            "on Connection.execution_options(), or "
            "per-engine using the isolation_level "
            "argument to create_engine().",
        ),
        (
            "compiled_cache",
            "'compiled_cache' execution option may only be specified "
            "on Connection.execution_options(), not per statement.",
        ),
    )
    for option_name, message in rejected:
        if option_name in kw:
            raise exc.ArgumentError(message)

    # merge the new options over the existing ones
    merged = self._execution_options.union(kw)
    self._execution_options = merged
    return self
def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL options that will take effect during
    execution.

    The stored options object itself is returned, not a copy.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`.Executable.execution_options`
    """
    current_options = self._execution_options
    return current_options
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The implementation here does nothing; it is the hook invoked by
        :meth:`._set_parent_with_dispatch` between the two attach events.

        """

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Attach to ``parent``, firing the attach events around it.

        Calls ``dispatch.before_parent_attach``, then
        :meth:`._set_parent` with the given keyword arguments, then
        ``dispatch.after_parent_attach``, in that order.

        :param parent: the object being attached to.
        :param kw: passed through unchanged to :meth:`._set_parent`.

        """
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        # only reached if _set_parent() did not raise
        self.dispatch.after_parent_attach(self, parent)
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` objects."""

    # Options dictionary carried on the class; it marks this visitor with
    # the "schema_visitor" flag.
    # NOTE(review): presumably read by the traversal machinery in the
    # visitors module -- confirm there.
    __traverse_options__ = {"schema_visitor": True}
class _SentinelDefaultCharacterization(Enum):
    """Closed set of string-valued labels; each member's value is the
    lower-case spelling of its name.

    Used as the type of
    ``_SentinelColumnCharacterization.default_characterization``.

    """

    # NOTE(review): the members presumably describe what kind of default
    # generates the value of a "sentinel" column -- confirm against the
    # code that produces and consumes them.
    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
class _SentinelColumnCharacterization(NamedTuple):
    """Immutable four-field record; every field has a default, so it may
    be constructed with no arguments.

    """

    # sequence of Column objects, or None (the default)
    columns: Optional[Sequence[Column[Any]]] = None
    # NOTE(review): presumably True when the sentinel was explicitly
    # configured -- confirm against the producer of this record
    is_explicit: bool = False
    # NOTE(review): presumably True when the column is autoincrementing
    # -- confirm against the producer of this record
    is_autoinc: bool = False
    # one of the _SentinelDefaultCharacterization members; NONE by default
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
# Key type of a ColumnCollection: a constrained TypeVar that resolves to
# either ``Union[None, str]`` or plain ``str``.
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# Column type held by a collection.  Both are bound to ColumnElement;
# ``_COL_co`` is the covariant form used as the class-level parameter,
# ``_COL`` the invariant form used in individual method signatures.
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")
class _ColumnMetrics(Generic[_COL_co]):
    """Bookkeeping record that wraps a single column of a
    :class:`.ColumnCollection` and keeps the collection's proxy index
    in step with it.

    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ):
        """Wrap ``col``, registering with ``collection``'s proxy index if
        that index has already been filled in."""
        self.column = col

        # an empty proxy index means it was never initialized, in which
        # case there is nothing to keep up to date
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self):
        """Return the ``_expanded_proxy_set`` of the wrapped column."""
        wrapped = self.column
        return wrapped._expanded_proxy_set

    def dispose(self, collection):
        """Remove this record from ``collection``'s proxy index, dropping
        any index entry that is left empty."""
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxied in self.column._expanded_proxy_set:
                members = proxy_index.get(proxied)
                if members is None:
                    continue
                if members:
                    members.discard(self)
                if not members:
                    del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` that is absent
        from the wrapped column's expanded proxy set still intersects
        that set once expanded via ``_expand_cloned()``."""
        own_proxies = self.column._expanded_proxy_set
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in target_set.difference(own_proxies)
        )
class ColumnCollection(Generic[_COLKEY, _COL_co]):
    """Collection of :class:`_expression.ColumnElement` instances,
    typically for
    :class:`_sql.FromClause` objects.

    The :class:`_sql.ColumnCollection` object is most commonly available
    as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
    on the :class:`_schema.Table` object, introduced at
    :ref:`metadata_tables_and_columns`.

    The :class:`_expression.ColumnCollection` has both mapping- and sequence-
    like behaviors. A :class:`_expression.ColumnCollection` usually stores
    :class:`_schema.Column` objects, which are then accessible both via mapping
    style access as well as attribute access style.

    To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as below
    a column named ``employee_name`` is accessed::

        >>> employee_table.c.employee_name

    To access columns that have names with special characters or spaces,
    index-style access is used, such as below which illustrates a column named
    ``employee ' payment`` is accessed::

        >>> employee_table.c["employee ' payment"]

    As the :class:`_sql.ColumnCollection` object provides a Python dictionary
    interface, common dictionary method names like
    :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
    and :meth:`_sql.ColumnCollection.items` are available, which means that
    database columns that are keyed under these names also need to use indexed
    access::

        >>> employee_table.c["values"]


    The name for which a :class:`_schema.Column` would be present is normally
    that of the :paramref:`_schema.Column.key` parameter.  In some contexts,
    such as a :class:`_sql.Select` object that uses a label style set
    using the :meth:`_sql.Select.set_label_style` method, a column of a certain
    key may instead be represented under a particular label name such
    as ``tablename_columnname``::

        >>> from sqlalchemy import select, column, table
        >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
        >>> t = table("t", column("c"))
        >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
        >>> subq = stmt.subquery()
        >>> subq.c.t_c
        <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>

    :class:`.ColumnCollection` also indexes the columns in order and allows
    them to be accessible by their integer position::

        >>> cc[0]
        Column('x', Integer(), table=None)
        >>> cc[1]
        Column('y', Integer(), table=None)

    .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
       allows integer-based
       index access to the collection.

    Iterating the collection yields the column expressions in order::

        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('y', Integer(), table=None)]

    The base :class:`_expression.ColumnCollection` object can store
    duplicates, which can
    mean either two columns with the same key, in which case key access
    returns only one of them and should be treated as **arbitrary** (with
    the implementation in this class, the column stored first under that
    key is the one returned)::

        >>> x1, x2 = Column('x', Integer), Column('x', Integer)
        >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('x', Integer(), table=None)]
        >>> cc['x'] is x1
        True
        >>> cc['x'] is x2
        False

    Or it can also mean the same column multiple times.   These cases are
    supported as :class:`_expression.ColumnCollection`
    is used to represent the columns in
    a SELECT statement which may include duplicates.

    A special subclass :class:`.DedupeColumnCollection` exists which instead
    maintains SQLAlchemy's older behavior of not allowing duplicates; this
    collection is used for schema level objects like :class:`_schema.Table`
    and
    :class:`.PrimaryKeyConstraint` where this deduping is helpful.  The
    :class:`.DedupeColumnCollection` class also has additional mutation methods
    as the schema constructs have more use cases that require removal and
    replacement of columns.

    .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
       now stores duplicate
       column keys as well as the same column in multiple positions.  The
       :class:`.DedupeColumnCollection` class is added to maintain the
       former behavior in those cases where deduplication as well as
       additional replace/remove operations are needed.


    """

    __slots__ = "_collection", "_index", "_colset", "_proxy_index"

    # ordered (key, column, metrics) triples; the authoritative storage
    _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
    # lookup by string key as well as by integer position
    _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
    # proxied column -> metrics records; filled lazily, see
    # _init_proxy_index()
    _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
    # columns present in the collection, used for membership tests
    _colset: Set[_COL_co]

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        """Create the collection, optionally populated from an iterable
        of ``(key, column)`` tuples."""
        # __setattr__ on this class always raises, so the slots have to
        # be assigned through object.__setattr__
        object.__setattr__(self, "_colset", set())
        object.__setattr__(self, "_index", {})
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    @util.preload_module("sqlalchemy.sql.elements")
    def __clause_element__(self) -> ClauseList:
        """Return a non-grouping ``ClauseList`` made of all columns in
        this collection."""
        elements = util.preloaded.sql_elements

        return elements.ClauseList(
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
            *self._all_columns,
        )

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """Populate a newly constructed collection; delegates to
        :meth:`._populate_separate_keys`."""
        self._populate_separate_keys(iter_)

    @property
    def _all_columns(self) -> List[_COL_co]:
        """A new list of every column in order, duplicates included."""
        return [col for (_, col, _) in self._collection]

    def keys(self) -> List[_COLKEY]:
        """Return a sequence of string key names for all columns in this
        collection."""
        return [k for (k, _, _) in self._collection]

    def values(self) -> List[_COL_co]:
        """Return a sequence of :class:`_sql.ColumnClause` or
        :class:`_schema.Column` objects for all columns in this
        collection."""
        return [col for (_, col, _) in self._collection]

    def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
        """Return a sequence of (key, column) tuples for all columns in this
        collection each consisting of a string key name and a
        :class:`_sql.ColumnClause` or
        :class:`_schema.Column` object.
        """

        return [(k, col) for (k, col, _) in self._collection]

    def __bool__(self) -> bool:
        return bool(self._collection)

    def __len__(self) -> int:
        return len(self._collection)

    def __iter__(self) -> Iterator[_COL_co]:
        # turn to a list first to maintain over a course of changes
        return iter([col for _, col, _ in self._collection])

    @overload
    def __getitem__(self, key: Union[str, int]) -> _COL_co: ...

    @overload
    def __getitem__(
        self, key: Tuple[Union[str, int], ...]
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    @overload
    def __getitem__(
        self, key: slice
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    def __getitem__(
        self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
    ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
        """Return a column by string key or integer position, or a
        read-only sub-collection for a slice or a tuple of keys.

        :raises KeyError: for a missing string key.
        :raises IndexError: for a missing integer position.

        """
        try:
            if isinstance(key, (tuple, slice)):
                if isinstance(key, slice):
                    cols = (
                        (sub_key, col)
                        for (sub_key, col, _) in self._collection[key]
                    )
                else:
                    cols = (self._index[sub_key] for sub_key in key)

                return ColumnCollection(cols).as_readonly()
            else:
                return self._index[key][1]
        except KeyError as err:
            # integer positions live in the same dict as string keys;
            # translate the miss into what sequence access would raise
            if isinstance(err.args[0], int):
                raise IndexError(err.args[0]) from err
            else:
                raise

    def __getattr__(self, key: str) -> _COL_co:
        """Attribute-style access to a column by key.

        :raises AttributeError: if there is no column under ``key``.

        """
        try:
            return self._index[key][1]
        except KeyError as err:
            raise AttributeError(key) from err

    def __contains__(self, key: str) -> bool:
        """Return True if ``key`` is present in the index.

        :raises ArgumentError: if ``key`` is not found and is not a
         string.

        """
        if key not in self._index:
            if not isinstance(key, str):
                raise exc.ArgumentError(
                    "__contains__ requires a string argument"
                )
            return False
        else:
            return True

    def compare(self, other: ColumnCollection[Any, Any]) -> bool:
        """Compare this :class:`_expression.ColumnCollection` to another.

        The two collections are iterated in parallel and compare as equal
        only if they have the same length and hold the identical column
        object (``is``) at every position; key names are not consulted.

        """

        # zip_longest pads the shorter side with None, which is never
        # identical to a column, so differing lengths compare unequal
        for l, r in zip_longest(self, other):
            if l is not r:
                return False
        else:
            return True

    def __eq__(self, other: Any) -> bool:
        return self.compare(other)

    @overload
    def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...

    @overload
    def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...

    def get(
        self, key: str, default: Optional[_COL] = None
    ) -> Optional[Union[_COL_co, _COL]]:
        """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
        based on a string key name from this
        :class:`_expression.ColumnCollection`.

        :param key: the key to look up.
        :param default: value returned when ``key`` is not present.

        """

        if key in self._index:
            return self._index[key][1]
        else:
            return default

    def __str__(self) -> str:
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(str(c) for c in self),
        )

    def __setitem__(self, key: str, value: Any) -> NoReturn:
        raise NotImplementedError()

    def __delitem__(self, key: str) -> NoReturn:
        raise NotImplementedError()

    def __setattr__(self, key: str, obj: Any) -> NoReturn:
        raise NotImplementedError()

    def clear(self) -> NoReturn:
        """Dictionary clear() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    def remove(self, column: Any) -> None:
        raise NotImplementedError()

    def update(self, iter_: Any) -> NoReturn:
        """Dictionary update() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    # https://github.com/python/mypy/issues/4266
    __hash__ = None  # type: ignore

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        # built in reverse so that, for a duplicated key, the entry that
        # ends up in the dict is the column that comes first
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def add(
        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
    ) -> None:
        """Add a column to this :class:`_sql.ColumnCollection`.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.ColumnCollection` is usually part of an existing
            object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        :param column: the column to append.
        :param key: key to store the column under; when ``None``,
         ``column.key`` is used.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())
        self._index[l] = (colkey, _column)
        # an existing key is left alone, so the first column added under
        # a key remains the one returned by key access
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def __getstate__(self) -> Dict[str, Any]:
        # the metrics objects, the column set and the proxy index are not
        # included; __setstate__ rebuilds them
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        object.__setattr__(self, "_index", state["_index"])
        # the proxy index starts out empty again and is refilled on demand
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        # NOTE(review): the columns go into _colset as-is here, whereas
        # add() and _populate_separate_keys() store c._deannotate() --
        # confirm whether the difference is intentional.
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def contains_column(self, col: ColumnElement[Any]) -> bool:
        """Checks if a column object exists in this collection

        :raises ArgumentError: if ``col`` is not found and is a string.

        """
        if col not in self._colset:
            if isinstance(col, str):
                raise exc.ArgumentError(
                    "contains_column cannot be used with string arguments. "
                    "Use ``col_name in table.c`` instead."
                )
            return False
        else:
            return True

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.ColumnCollection`."""

        return ReadOnlyColumnCollection(self)

    def _init_proxy_index(self):
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data.   Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
                      to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

        :return: the matching column, the given ``column`` itself when it
         is already a member of this collection, or ``None`` when nothing
         corresponds.

        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        # fill the proxy index on first use; ``pi`` refers to the same
        # dict that _init_proxy_index() populates
        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        # candidates are the metrics records indexed under any member of
        # the target column's proxy set
        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
# Column type parameter of DedupeColumnCollection, bound to NamedColumn.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is used by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.    The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self, column: _NAMEDCOL, key: Optional[str] = None
    ) -> None:
        """Add ``column`` under its own ``.key``, replacing any different
        column already stored under that key.

        Adding the very same column object again is a no-op.

        :param column: the column to add.
        :param key: optional; when given it must equal ``column.key``.

        :raises ArgumentError: if ``key`` is given and differs from
         ``column.key``, or if ``column.key`` is ``None``.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            if existing is column:
                return

            self.replace(column)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column)

    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
        """Append a column at the end of the collection, indexing it both
        by position and by ``key``; an existing ``key`` entry in the
        index is overwritten."""
        l = len(self._collection)
        self._collection.append(
            (key, named_column, _ColumnMetrics(self, named_column))
        )
        self._colset.add(named_column._deannotate())
        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        cols = list(iter_)

        # columns that collide with an existing entry are set aside and
        # passed to replace() once the non-colliding ones are appended
        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each column from ``iter_`` under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        """Remove ``column`` from the collection.

        :raises ValueError: if ``column`` is not in this collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        # NOTE(review): dispose() discards entries from the proxy index,
        # potentially including the very set being iterated here --
        # confirm that iterating it while it shrinks is safe for
        # util.OrderedSet.
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries now that the list is shorter
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column  as well as existing columns with the
        same key.

        e.g.::

            t = Table('sometable', metadata, Column('col1', Integer))
            t.columns.replace(Column('col1', Integer, key='columnone'))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove from
         the collection as part of the same operation.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replaced = False
        # the new column takes the position of the first removed column;
        # any further removed columns are simply dropped
        for k, col, metrics in self._collection:
            if col in remove_col:
                if not replaced:
                    replaced = True
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            # NOTE(review): as in remove(), dispose() may shrink the set
            # being iterated -- confirm this is safe for util.OrderedSet.
            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        # none of the removed columns was found in the collection (e.g.
        # they came only from extra_remove), so append at the end
        if not replaced:
            new_cols.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild the index from scratch: positions first, then keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A column collection that refuses mutation through its own API.

    The instance keeps a reference to the collection it was built from in
    ``_parent`` and shares that collection's ``_colset``, ``_index``,
    ``_collection`` and ``_proxy_index`` objects instead of copying them.
    ``add()``, ``extend()`` and ``remove()`` all delegate to
    ``_readonly()``.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection):
        # attributes are assigned through object.__setattr__ directly
        set_attribute = object.__setattr__
        set_attribute(self, "_parent", collection)
        for shared in ("_colset", "_index", "_collection", "_proxy_index"):
            set_attribute(self, shared, getattr(collection, shared))

    def __getstate__(self):
        # only the parent is serialized; the shared structures are
        # re-derived from it in __setstate__
        return {"_parent": self._parent}

    def __setstate__(self, state):
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ``OrderedSet`` of columns with column-oriented helpers.

    Note that ``==`` does not return a boolean here: it produces the
    result of ``elements.and_()`` over per-column comparisons.

    """

    def contains_column(self, col):
        """Return the result of ``col in self``."""
        return col in self

    def extend(self, cols):
        """Add each member of ``cols`` to this set, in iteration order."""
        add_column = self.add
        for column in cols:
            add_column(column)

    def __eq__(self, other):
        """Combine, via ``elements.and_()``, the expression ``c == local``
        for every column ``c`` of ``other`` and every column ``local`` of
        this set for which ``c.shares_lineage(local)`` is true.

        """
        comparisons = [
            theirs == ours
            for theirs in other
            for ours in self
            if theirs.shares_lineage(ours)
        ]
        return elements.and_(*comparisons)

    def __hash__(self):  # type: ignore[override]
        """Hash the members of the set as a tuple, in iteration order."""
        return hash(tuple(self))
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible]
) -> _EntityNamespace:
    """Return the nearest ``.entity_namespace`` for the given entity.

    The entity's own ``entity_namespace`` attribute is tried first. If
    reading it raises ``AttributeError``, the elements produced by
    ``visitors.iterate()`` for the entity are scanned and the namespace of
    the first one accepted by ``_is_has_entity_namespace()`` is returned.

    :raises AttributeError: the original error is re-raised when no
     element provides a namespace.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversed = visitors.iterate(cast(ExternallyTraversible, entity))
        for candidate in traversed:
            if _is_has_entity_namespace(candidate):
                return candidate.entity_namespace
        # the loop never breaks, so reaching this point means nothing
        # matched; propagate the AttributeError being handled
        raise
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    The namespace is located with ``_entity_namespace()`` and ``key`` is
    read from it with ``getattr()``. When ``default`` is supplied (anything
    other than ``NO_ARG``) it is passed to ``getattr()`` as the fallback
    value.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found; the ``AttributeError`` is chained as the cause. This
    also covers an ``AttributeError`` raised while locating the namespace.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        message = 'Entity namespace for "%s" has no property "%s"' % (
            entity,
            key,
        )
        raise exc.InvalidRequestError(message) from err