Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import Final
25from typing import FrozenSet
26from typing import Generator
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeGuard
44from typing import TypeVar
45from typing import Union
47from . import roles
48from . import visitors
49from .cache_key import HasCacheKey # noqa
50from .cache_key import MemoizedHasCacheKey # noqa
51from .traversals import HasCopyInternals # noqa
52from .visitors import ClauseVisitor
53from .visitors import ExtendedInternalTraversal
54from .visitors import ExternallyTraversible
55from .visitors import InternalTraversal
56from .. import event
57from .. import exc
58from .. import util
59from ..util import HasMemoized as HasMemoized
60from ..util import hybridmethod
61from ..util.typing import Self
62from ..util.typing import TypeVarTuple
63from ..util.typing import Unpack
65if TYPE_CHECKING:
66 from . import coercions
67 from . import elements
68 from . import type_api
69 from ._orm_types import DMLStrategyArgument
70 from ._orm_types import SynchronizeSessionArgument
71 from ._typing import _CLE
72 from .cache_key import CacheKey
73 from .compiler import SQLCompiler
74 from .dml import Delete
75 from .dml import Insert
76 from .dml import Update
77 from .elements import BindParameter
78 from .elements import ClauseElement
79 from .elements import ClauseList
80 from .elements import ColumnClause # noqa
81 from .elements import ColumnElement
82 from .elements import NamedColumn
83 from .elements import SQLCoreOperations
84 from .elements import TextClause
85 from .schema import Column
86 from .schema import DefaultGenerator
87 from .selectable import _JoinTargetElement
88 from .selectable import _SelectIterable
89 from .selectable import FromClause
90 from .selectable import Select
91 from .visitors import anon_map
92 from ..engine import Connection
93 from ..engine import CursorResult
94 from ..engine.interfaces import _CoreMultiExecuteParams
95 from ..engine.interfaces import _ExecuteOptions
96 from ..engine.interfaces import _ImmutableExecuteOptions
97 from ..engine.interfaces import CacheStats
98 from ..engine.interfaces import Compiled
99 from ..engine.interfaces import CompiledCacheType
100 from ..engine.interfaces import CoreExecuteOptionsParameter
101 from ..engine.interfaces import Dialect
102 from ..engine.interfaces import IsolationLevel
103 from ..engine.interfaces import SchemaTranslateMapType
104 from ..event import dispatcher
106if not TYPE_CHECKING:
107 coercions = None # noqa
108 elements = None # noqa
109 type_api = None # noqa
112_Ts = TypeVarTuple("_Ts")
class _NoArg(Enum):
    """Single-member enumeration used as a "no argument given" marker."""

    NO_ARG = 0

    def __repr__(self):
        # render as ``_NoArg.<member name>`` rather than the Enum default
        return "_NoArg." + self.name


NO_ARG: Final = _NoArg.NO_ARG
class _NoneName(Enum):
    """Single-member enumeration standing in for a name that was ``None``."""

    # indicate a 'deferred' name that was ultimately the value None.
    NONE_NAME = 0


_NONE_NAME: Final = _NoneName.NONE_NAME
132_T = TypeVar("_T", bound=Any)
134_Fn = TypeVar("_Fn", bound=Callable[..., Any])
136_AmbiguousTableNameMap = MutableMapping[str, str]
class _DefaultDescriptionTuple(NamedTuple):
    """Four-field record describing a column default: its argument plus
    the ``is_scalar`` / ``is_callable`` / ``is_sentinel`` flags.

    """

    arg: Any
    is_scalar: Optional[bool]
    is_callable: Optional[bool]
    is_sentinel: Optional[bool]

    @classmethod
    def _from_column_default(
        cls, default: Optional[DefaultGenerator]
    ) -> _DefaultDescriptionTuple:
        """Build the record from a default generator.

        The generator's values are copied only when the generator is
        truthy and either has an argument, or is a sentinel that is not
        "for update"; in every other case all four fields are ``None``.

        """
        if default and (
            default.has_arg
            or (not default.for_update and default.is_sentinel)
        ):
            return _DefaultDescriptionTuple(
                default.arg,  # type: ignore
                default.is_scalar,
                default.is_callable,
                default.is_sentinel,
            )

        return _DefaultDescriptionTuple(None, None, None, None)
165_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
166 "_omit_from_statements"
167)
class _EntityNamespace(Protocol):
    """Structural type for a namespace whose attributes are SQL
    expression objects."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]:
        """Look up the expression stored under the attribute ``key``."""
class _HasEntityNamespace(Protocol):
    """Structural type for objects exposing ``entity_namespace``."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace:
        """The namespace of expressions belonging to this object."""
179def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
180 return hasattr(element, "entity_namespace")
183# Remove when https://github.com/python/mypy/issues/14640 will be fixed
184_Self = TypeVar("_Self", bound=Any)
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" here is about the "mutability" of an object in the
    context of SQL DQL and DML generation.  In DQL, for example, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    Cloning an immutable object hands back the very same object, and the
    parameter-replacing methods refuse to run.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return ``self``; no copy is made."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; there are no internals to copy."""
        return None
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE

    Constructing the class always yields the one instance stored by
    :meth:`._create_singleton`.

    """

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # arguments are ignored; the stored instance is handed back
        singleton = cls._singleton
        return cast(_T, singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Create the single instance and store it on the class."""
        # go around our own __new__, which would look for _singleton
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset((instance,))
        cls._singleton = instance
def _from_objects(
    *elements: Union[
        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
    ]
) -> Iterator[FromClause]:
    """Chain together the ``_from_objects`` collection of every element.

    Each element's ``_from_objects`` attribute is read immediately; the
    collections themselves are iterated lazily by the returned chain.

    """
    return itertools.chain(
        *(element._from_objects for element in elements)
    )
def _select_iterables(
    elements: Iterable[roles.ColumnsClauseRole],
) -> _SelectIterable:
    """expand tables into individual columns in the
    given list of column expressions.

    Each element contributes its ``_select_iterable``; the attribute is
    read up front for every element and the results are chained.

    """
    per_element = []
    for column_expr in elements:
        per_element.append(column_expr._select_iterable)
    return itertools.chain.from_iterable(per_element)
271_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
class _GenerativeType(Protocol):
    """Structural type for objects that provide ``_generate()``."""

    def _generate(self) -> Self:
        """Produce the object that a generative method will operate on."""
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    The legacy approach: ``self._generate()`` supplies the object to work
    on, the wrapped method runs against that object, and that object is
    what the caller receives.  The undecorated function stays reachable
    as the ``non_generative`` attribute of the decorated one.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        generated = self._generate()
        returned = fn(generated, *args, **kw)
        assert returned is generated, "generative methods must return self"
        return generated

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Build a decorator that rejects a method call once any of the named
    attributes has moved away from its default.

    :param names: attribute paths, read from the method's ``self``.
    :param msgs: optional keyword; maps an attribute name to the error
     message to use for it.
    :param defaults: optional keyword; maps an attribute name to the value
     that counts as "not set yet" (``None`` when absent).  The comparison
     is by identity.

    The decorated method raises :class:`.InvalidRequestError` when an
    attribute is found to be set.

    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for attr_name in names:
        getters.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self, remaining = args[0], args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            fallback_msg = (
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, self.__class__)
            )
            raise exc.InvalidRequestError(msgs.get(name, fallback_msg))
        return fn(self, *remaining, **kw)

    return check
def _clone(element, **kw):
    """Call ``element._clone()`` with the given keywords and return the
    result."""
    clone_method = element._clone
    return clone_method(**kw)
def _expand_cloned(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """expand the given set of ClauseElements to be the set of all 'cloned'
    predecessors.

    The ``_cloned_set`` of every element is collected first, then the
    collected sets are chained into a single iterable.

    """
    # TODO: cython candidate
    cloned_sets = [element._cloned_set for element in elements]
    return itertools.chain.from_iterable(cloned_sets)
def _de_clone(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """Yield, for each element, the object found at the end of its
    ``_is_clone_of`` chain (the element itself when it is not a clone).

    """
    for element in elements:
        origin = element
        parent = origin._is_clone_of
        while parent is not None:
            origin = parent
            parent = origin._is_clone_of
        yield origin
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    expanded_a = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    kept: Set[_CLE] = set()
    for elem in a:
        # an element of "a" counts when any member of its cloned set
        # is part of the overlap
        if all_overlap.intersection(elem._cloned_set):
            kept.add(elem)
    return kept
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """Return the members of ``a`` whose cloned sets share nothing with
    the overlap between the expanded cloned sets of ``a`` and ``b``.

    """
    expanded_a = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    kept: Set[_CLE] = set()
    for elem in a:
        if all_overlap.intersection(elem._cloned_set):
            # overlaps with "b" somewhere in its clone lineage; leave out
            continue
        kept.add(elem)
    return kept
class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    All reads and writes are forwarded to the ``dialect_options``
    collection of the wrapped object; the view holds no data itself.

    """

    __slots__ = ("obj",)

    def __init__(self, obj: DialectKWArgs) -> None:
        self.obj = obj

    def _key(self, key: str) -> Tuple[str, str]:
        """Split ``key`` at its first underscore into
        ``(dialect, argument name)``; ``KeyError`` if there is none."""
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err
        return dialect, value_key

    def __getitem__(self, key: str) -> Any:
        dialect, value_key = self._key(key)

        try:
            dialect_args = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            raise KeyError(key) from err
        return dialect_args[value_key]

    def __setitem__(self, key: str, value: Any) -> None:
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err
        self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key: str) -> None:
        dialect, value_key = self._key(key)
        dialect_args = self.obj.dialect_options[dialect]
        del dialect_args[value_key]

    def __len__(self) -> int:
        # only the non-default arguments are counted
        total = 0
        for dialect_args in self.obj.dialect_options.values():
            total += len(dialect_args._non_defaults)
        return total

    def __iter__(self) -> Generator[str, None, None]:
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name, dialect_args in (
                self.obj.dialect_options.items()
            )
            for value_name in dialect_args._non_defaults
        )
class _DialectArgDict(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments for a specific
    dialect.

    Two separate collections are kept: arguments that were set through
    this mapping, and default arguments.  Lookups prefer the former;
    writes and deletes only ever touch the former.

    """

    def __init__(self) -> None:
        self._non_defaults: Dict[str, Any] = {}
        self._defaults: Dict[str, Any] = {}

    def __len__(self) -> int:
        # a key present in both collections counts once
        all_keys = set(self._non_defaults).union(self._defaults)
        return len(all_keys)

    def __iter__(self) -> Iterator[str]:
        all_keys = set(self._non_defaults).union(self._defaults)
        return iter(all_keys)

    def __getitem__(self, key: str) -> Any:
        if key not in self._non_defaults:
            return self._defaults[key]
        return self._non_defaults[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._non_defaults[key] = value

    def __delitem__(self, key: str) -> None:
        del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the named dialect class from the dialect registry and return
    a ``dict`` copy of its ``construct_arguments``, or ``None`` when the
    dialect class has that attribute set to ``None``.

    """
    registry = util.preloaded.dialects.registry
    dialect_cls = registry.load(dialect_name)
    if dialect_cls.construct_arguments is not None:
        return dict(dialect_cls.construct_arguments)
    return None
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # the registry holds None for dialects whose
            # construct_arguments is None, i.e. no validation is in place
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        construct_arg_dictionary.setdefault(cls, {})[argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the per-dialect argument dictionary for this class.

        When the dialect has no argument registry, the defaults consist of
        the single wildcard entry ``"*"``.  Otherwise the registered
        defaults of each class in the MRO are merged, base classes first,
        so that entries for more specific classes take precedence.

        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keywords and store them in
        :attr:`.dialect_options`.

        :raises TypeError: a key is not of the form
         ``<dialectname>_<argument>``.
        :raises ArgumentError: the dialect validates its arguments and does
         not accept the given one.

        A key naming a dialect that can't be located produces a warning
        and is stored without validation.

        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match(r"^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of CompileState classes keyed on (plugin name, visit name)
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Factory: locate the registered class for ``statement`` and
        build the compile state with it.

        The plugin named under ``"compile_state_plugin"`` in the
        statement's ``_propagate_attrs`` is tried first; the ``"default"``
        plugin is used when that name is absent or has no entry for the
        statement's plugin target.

        """
        propagate_attrs = statement._propagate_attrs
        if propagate_attrs:
            plugin_name = propagate_attrs.get(
                "compile_state_plugin", "default"
            )
        else:
            plugin_name = "default"

        target = statement._effective_plugin_target
        klass = cls.plugins.get((plugin_name, target), None)
        if klass is None:
            klass = cls.plugins[("default", target)]

        if klass is not cls:
            # a different class is registered; let it do the construction
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for the statement's own plugin,
        else the one for the ``"default"`` plugin, else ``None``."""
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            try:
                return cls.plugins[
                    (plugin_name, statement._effective_plugin_target)
                ]
            except KeyError:
                pass

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        default_key = ("default", statement._effective_plugin_target)
        return cls.plugins.get(default_key)

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no default fallback."""
        plugin_key = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(plugin_key)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Class decorator registering the decorated class under
        ``(plugin_name, visit_name)``; the class is returned unchanged."""
        registry_key = (plugin_name, visit_name)

        def decorate(cls_to_decorate):
            cls.plugins[registry_key] = cls_to_decorate
            return cls_to_decorate

        return decorate
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of the same class carrying a copy of this
        object's ``__dict__``, minus any keys in ``_memoized_keys``.

        ``__init__`` is not invoked on the new instance.

        """
        memoized = self._memoized_keys
        klass = self.__class__
        clone = klass.__new__(klass)

        # take the snapshot in one step so that this remains atomic
        state = self.__dict__.copy()
        if memoized:
            state = {
                key: value
                for key, value in state.items()
                if key not in memoized
            }
        clone.__dict__ = state
        return clone
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Drop the entries named by ``_memoized_keys`` from this object's
        ``__dict__`` and return the object itself; nothing is copied."""
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # no CompileState plugin class is assigned at this level
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # starts out as the shared empty dictionary
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # factory used to build the CompileState; this is the generic
    # plugin-dispatching classmethod
    _compile_state_factory = CompileState.create_for_statement
class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        """Support ``OptionsClass + {...}``: instantiate the class and
        apply the given attributes to the new instance.

        :raises TypeError: ``other`` has keys not in ``_cache_attrs``.

        """
        instance = self()

        uncovered = set(other).difference(self._cache_attrs)
        if uncovered:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, uncovered)
            )

        instance.__dict__.update(other)
        return instance

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    The defaults are the class-level attributes of a subclass; their
    names are gathered into ``_cache_attrs`` when the subclass is
    created.  Values set on an instance live in its ``__dict__``.

    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # every non-dunder name declared directly on the subclass, other
        # than "_cache_key_traversal", is an option; keep them sorted
        option_names = [
            name
            for name in cls.__dict__
            if not name.startswith("__") and name != "_cache_key_traversal"
        ]
        option_names.sort()
        cls._cache_attrs = tuple(option_names)
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a new object of the same class with this object's state
        plus the attributes given in the mapping ``other``.

        :raises TypeError: ``other`` has keys not in ``_cache_attrs``.

        """
        klass = self.__class__
        merged = klass.__new__(klass)
        merged.__dict__.update(self.__dict__)

        uncovered = set(other).difference(self._cache_attrs)
        if uncovered:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, uncovered)
            )

        merged.__dict__.update(other)
        return merged

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        paired_names = zip_longest(self._cache_attrs, other._cache_attrs)
        return not any(
            getattr(self, ours) != getattr(other, theirs)
            for ours, theirs in paired_names
        )

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        own_state = self.__dict__
        rendered = [
            "%s=%r" % (name, own_state[name])
            for name in self._cache_attrs
            if name in own_state
        ]
        return "%s(%s)" % (self.__class__.__name__, ", ".join(rendered))

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Report whether this options class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return new options in which attribute ``name`` is its current
        value with ``value`` added on via ``+``."""
        combined = getattr(self, name) + value
        return self + {name: combined}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return ``cls`` combined with the state of ``other``.

        :raises TypeError: ``other`` is of a different class and carries
         option names that this class does not have.

        """
        state = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if cls is not other.__class__ and other._cache_attrs:
            uncovered = set(other._cache_attrs).difference(cls._cache_attrs)
            if uncovered:
                raise TypeError(
                    "other element %r is not empty, is not of type %s, "
                    "and contains attributes not covered here %r"
                    % (other, cls, uncovered)
                )
        return cls + state

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        available = set(exec_options).union(statement_exec_options)
        check_argnames = attrs.intersection(available)

        existing_options = exec_options.get(key, cls)

        if not check_argnames:
            return existing_options, exec_options

        result = {}
        for argname in check_argnames:
            # option attributes are the argument name with a leading
            # underscore; exec_options wins over the statement's options
            local = "_" + argname
            if argname in exec_options:
                result[local] = exec_options[argname]
            elif argname in statement_exec_options:
                result[local] = statement_exec_options[argname]

        new_options = existing_options + result
        exec_options = util.immutabledict().merge_with(
            exec_options, {key: new_options}
        )
        return new_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class CacheableOptions(Options, HasCacheKey):
    """:class:`.Options` that also take part in cache key generation."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: use the HasCacheKey implementation
        cache_key = HasCacheKey._gen_cache_key(self, anon_map, bindparams)
        return cache_key

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the key is the class plus an empty tuple
        return cls, ()

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        cache_key = HasCacheKey._generate_cache_key_for_object(self)
        return cache_key
class ExecutableOption(HasCopyInternals):
    """Base for option objects identified by the ``executable_option``
    visit name."""

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        klass = self.__class__
        # __init__ is bypassed; only the attribute dictionary is copied
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = self.__dict__.copy()  # type: ignore
        return duplicate
1034_L = TypeVar("_L", bound=str)
class HasSyntaxExtensions(Generic[_L]):
    """Mixin for statements that expose named syntax extension points.

    ``_position_map`` maps each position name accepted by the statement to
    the name of the attribute on the statement that stores the element(s)
    applied at that position.

    """

    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Applies a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that takes place in
        specific parts of a SQL statement.   Examples include vendor
        extensions like PostgreSQL / SQLite's "ON CONFLICT", PostgreSQL's
        "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
        and DELETE statements.

        :param extension: the extension to apply; it is coerced using
         :class:`.roles.SyntaxExtensionRole` and then handed to
         ``_apply_syntax_extension_to_self()``.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Apply a :class:`.SyntaxExtension` to a known extension point.

        Should be used only internally by :class:`.SyntaxExtension`.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only the same
         type element from the input list or merging already existing
         elements of the same type. Some examples are shown above.  The
         returned sequence must not be empty.
        :param position: string name of the position to apply to. This
         varies per statement type. IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.

        :raises ValueError: if ``position`` is not a key of this statement's
         ``_position_map``.

        .. seealso::

            :ref:`examples_syntax_extensions`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke

        ElementList = util.preloaded.sql_elements.ElementList
        existing: Optional[ClauseElement] = getattr(self, attrname, None)

        # normalize whatever currently occupies the position into a tuple
        # so that apply_fn always receives a sequence
        input_seq: Tuple[ClauseElement, ...]
        if existing is None:
            input_seq = ()
        elif isinstance(existing, ElementList):
            input_seq = existing.clauses
        else:
            input_seq = (existing,)

        new_seq = apply_fn(input_seq)
        assert new_seq, "cannot return empty sequence"

        # a single element is stored as-is; several are wrapped in an
        # ElementList so that the position still holds one object
        new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
        setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Hook called by :meth:`.ext` with the coerced extension.

        The base implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return a dict of position name to the value currently stored for
        that position, omitting positions whose value is ``None``."""
        res: Dict[_L, SyntaxExtension] = {}
        for name, attr in self._position_map.items():
            value = getattr(self, attr)
            if value is not None:
                res[name] = value
        return res

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Directly assign the given values to the attributes named by
        ``_position_map`` for each position name given as a keyword."""
        for name, value in extensions.items():
            setattr(self, self._position_map[name], value)  # type: ignore[index]  # noqa: E501
class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that when also extending from :class:`.ClauseElement`
    can be applied to SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
    pre-established SQL insertion points within these constructs.

    Each ``apply_to_<statement>()`` hook raises ``NotImplementedError``
    unless overridden.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type from ``existing`` and
        append ``self`` to the list.

        This is equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, type(self))),
                    self,
                ],
                "post_criteria",
            )

        :param existing: elements currently populating the extension point.
        :return: a new list with instances of ``type(self)`` removed and
         ``self`` appended at the end.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`

        """  # noqa: E501
        own_type = type(self)
        kept = [e for e in existing if not isinstance(e, own_type)]
        return [*kept, self]  # type: ignore[list-item]

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        raise NotImplementedError(
            f"Extension {type(self).__name__} cannot be applied to insert"
        )
class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is the common superclass of every "statement"
    kind of object, among them :func:`select`, :func:`delete`,
    :func:`update`, :func:`insert` and :func:`text`.

    """

    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    # NOTE(review): the three ``...``-bodied stubs below are placed under
    # TYPE_CHECKING as typing-only declarations; confirm this nesting
    # against the canonical source.
    if TYPE_CHECKING:
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        """Not implemented on the base; raises ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        """The ``__visit_name__`` of this statement."""
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        Each given option is coerced using
        :class:`.roles.ExecutableOptionRole` and appended to the options
        already present on the statement.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        coerced = tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        self._with_options = self._with_options + coerced
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Replace the compile options with the given value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """Merge new keys into the existing ``_compile_options``, which
        must already be set (not ``None``)."""

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        new_entry = (callable_, cache_args)
        self._compile_state_funcs = self._compile_state_funcs + (new_entry,)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        What distinguishes an execution option from other kinds of options,
        such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`; passing
          it to this method raises :class:`.ArgumentError`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object; passing
          it to this method raises :class:`.ArgumentError`.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # connection-level-only options are rejected up front
        if "isolation_level" in kw:
            isolation_msg = (
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
            raise exc.ArgumentError(isolation_msg)
        if "compiled_cache" in kw:
            cache_msg = (
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
            raise exc.ArgumentError(cache_msg)

        merged = self._execution_options.union(kw)
        self._execution_options = merged
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The base implementation does nothing.
        """

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Run :meth:`._set_parent`, wrapped by the ``before_parent_attach``
        and ``after_parent_attach`` events dispatched on this object."""
        # event -> attach -> event; the order is significant
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines :class:`.SchemaEventTarget` with ``visitors.Visitable`` and
    adds no members of its own.

    .. versionadded:: 2.0.41

    """
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # traversal options carried by this visitor class
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1581class _SentinelDefaultCharacterization(Enum):
1582 NONE = "none"
1583 UNKNOWN = "unknown"
1584 CLIENTSIDE = "clientside"
1585 SENTINEL_DEFAULT = "sentinel_default"
1586 SERVERSIDE = "serverside"
1587 IDENTITY = "identity"
1588 SEQUENCE = "sequence"
class _SentinelColumnCharacterization(NamedTuple):
    """Record describing sentinel columns; every field has a default so
    the tuple can be constructed with no arguments."""

    # the column objects, or None when there are none
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
1600_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1602_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1603_COL = TypeVar("_COL", bound="ColumnElement[Any]")
class _ColumnMetrics(Generic[_COL_co]):
    """Per-column bookkeeping object stored alongside each column of a
    :class:`.ColumnCollection`.

    Holds the column and keeps the owning collection's ``_proxy_index``
    in sync, but only once that index has been populated.
    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        self.column = col

        # a non-empty proxy index has already been initialized, so this
        # new entry has to be registered in it; an empty one is left alone
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the ``_expanded_proxy_set`` of the wrapped column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from the collection's proxy index, dropping
        any index entry that becomes empty as a result."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` that is not in
        this column's expanded proxy set has at least one of its expanded
        clones in that set."""
        own_proxies = self.column._expanded_proxy_set
        outsiders = target_set.difference(own_proxies)
        return all(
            own_proxies.intersection(_expand_cloned([candidate]))
            for candidate in outsiders
        )
1650class ColumnCollection(Generic[_COLKEY, _COL_co]):
1651 """Collection of :class:`_expression.ColumnElement` instances,
1652 typically for
1653 :class:`_sql.FromClause` objects.
1655 The :class:`_sql.ColumnCollection` object is most commonly available
1656 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1657 on the :class:`_schema.Table` object, introduced at
1658 :ref:`metadata_tables_and_columns`.
1660 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1661 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1662 :class:`_schema.Column` objects, which are then accessible both via mapping
1663 style access as well as attribute access style.
1665 To access :class:`_schema.Column` objects using ordinary attribute-style
1666 access, specify the name like any other object attribute, such as below
1667 a column named ``employee_name`` is accessed::
1669 >>> employee_table.c.employee_name
1671 To access columns that have names with special characters or spaces,
1672 index-style access is used, such as below which illustrates a column named
1673 ``employee ' payment`` is accessed::
1675 >>> employee_table.c["employee ' payment"]
1677 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1678 interface, common dictionary method names like
1679 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1680 and :meth:`_sql.ColumnCollection.items` are available, which means that
1681 database columns that are keyed under these names also need to use indexed
1682 access::
1684 >>> employee_table.c["values"]
1687 The name for which a :class:`_schema.Column` would be present is normally
1688 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1689 such as a :class:`_sql.Select` object that uses a label style set
1690 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1691 key may instead be represented under a particular label name such
1692 as ``tablename_columnname``::
1694 >>> from sqlalchemy import select, column, table
1695 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1696 >>> t = table("t", column("c"))
1697 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1698 >>> subq = stmt.subquery()
1699 >>> subq.c.t_c
1700 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1702 :class:`.ColumnCollection` also indexes the columns in order and allows
1703 them to be accessible by their integer position::
1705 >>> cc[0]
1706 Column('x', Integer(), table=None)
1707 >>> cc[1]
1708 Column('y', Integer(), table=None)
1710 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1711 allows integer-based
1712 index access to the collection.
1714 Iterating the collection yields the column expressions in order::
1716 >>> list(cc)
1717 [Column('x', Integer(), table=None),
1718 Column('y', Integer(), table=None)]
1720 The base :class:`_expression.ColumnCollection` object can store
1721 duplicates, which can
1722 mean either two columns with the same key, in which case the column
1723 returned by key access is **arbitrary**::
1725 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1726 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1727 >>> list(cc)
1728 [Column('x', Integer(), table=None),
1729 Column('x', Integer(), table=None)]
1730 >>> cc["x"] is x1
1731 True
1732 >>> cc["x"] is x2
1733 False
1735 Or it can also mean the same column multiple times. These cases are
1736 supported as :class:`_expression.ColumnCollection`
1737 is used to represent the columns in
1738 a SELECT statement which may include duplicates.
1740 A special subclass :class:`.DedupeColumnCollection` exists which instead
1741 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1742 collection is used for schema level objects like :class:`_schema.Table`
1743 and
1744 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1745 :class:`.DedupeColumnCollection` class also has additional mutation methods
1746 as the schema constructs have more use cases that require removal and
1747 replacement of columns.
1749 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1750 now stores duplicate
1751 column keys as well as the same column in multiple positions. The
1752 :class:`.DedupeColumnCollection` class is added to maintain the
1753 former behavior in those cases where deduplication as well as
1754 additional replace/remove operations are needed.
1757 """
1759 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1761 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1762 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1763 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1764 _colset: Set[_COL_co]
def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Create the collection, optionally populated from an iterable of
    ``(key, column)`` tuples."""
    # __setattr__ is disabled on this class, so the slots are filled in
    # through object.__setattr__
    initial_state = (
        ("_colset", set()),
        ("_index", {}),
        ("_proxy_index", collections.defaultdict(util.OrderedSet)),
        ("_collection", []),
    )
    for slot_name, slot_value in initial_state:
        object.__setattr__(self, slot_name, slot_value)

    if columns:
        self._initial_populate(columns)
@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return an ungrouped ``ClauseList`` of all columns in the
    collection, using the columns-clause role for literal coercion."""
    clause_list_cls = util.preloaded.sql_elements.ClauseList

    return clause_list_cls(
        *self._all_columns,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )
def _initial_populate(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate a newly constructed collection from ``(key, column)``
    tuples by delegating to ``_populate_separate_keys()``."""
    self._populate_separate_keys(iter_)
1793 @property
1794 def _all_columns(self) -> List[_COL_co]:
1795 return [col for (_, col, _) in self._collection]
1797 def keys(self) -> List[_COLKEY]:
1798 """Return a sequence of string key names for all columns in this
1799 collection."""
1800 return [k for (k, _, _) in self._collection]
1802 def values(self) -> List[_COL_co]:
1803 """Return a sequence of :class:`_sql.ColumnClause` or
1804 :class:`_schema.Column` objects for all columns in this
1805 collection."""
1806 return [col for (_, col, _) in self._collection]
1808 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1809 """Return a sequence of (key, column) tuples for all columns in this
1810 collection each consisting of a string key name and a
1811 :class:`_sql.ColumnClause` or
1812 :class:`_schema.Column` object.
1813 """
1815 return [(k, col) for (k, col, _) in self._collection]
1817 def __bool__(self) -> bool:
1818 return bool(self._collection)
1820 def __len__(self) -> int:
1821 return len(self._collection)
1823 def __iter__(self) -> Iterator[_COL_co]:
1824 # turn to a list first to maintain over a course of changes
1825 return iter([col for _, col, _ in self._collection])
1827 @overload
1828 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1830 @overload
1831 def __getitem__(
1832 self, key: Tuple[Union[str, int], ...]
1833 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1835 @overload
1836 def __getitem__(
1837 self, key: slice
1838 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1840 def __getitem__(
1841 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1842 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1843 try:
1844 if isinstance(key, (tuple, slice)):
1845 if isinstance(key, slice):
1846 cols = (
1847 (sub_key, col)
1848 for (sub_key, col, _) in self._collection[key]
1849 )
1850 else:
1851 cols = (self._index[sub_key] for sub_key in key)
1853 return ColumnCollection(cols).as_readonly()
1854 else:
1855 return self._index[key][1]
1856 except KeyError as err:
1857 if isinstance(err.args[0], int):
1858 raise IndexError(err.args[0]) from err
1859 else:
1860 raise
1862 def __getattr__(self, key: str) -> _COL_co:
1863 try:
1864 return self._index[key][1]
1865 except KeyError as err:
1866 raise AttributeError(key) from err
1868 def __contains__(self, key: str) -> bool:
1869 if key not in self._index:
1870 if not isinstance(key, str):
1871 raise exc.ArgumentError(
1872 "__contains__ requires a string argument"
1873 )
1874 return False
1875 else:
1876 return True
1878 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1879 """Compare this :class:`_expression.ColumnCollection` to another
1880 based on the names of the keys"""
1882 for l, r in zip_longest(self, other):
1883 if l is not r:
1884 return False
1885 else:
1886 return True
def __eq__(self, other: Any) -> bool:
    """Equality is defined as the result of :meth:`.compare`."""
    is_same = self.compare(other)
    return is_same
1891 @overload
1892 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1894 @overload
1895 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1897 def get(
1898 self, key: str, default: Optional[_COL] = None
1899 ) -> Optional[Union[_COL_co, _COL]]:
1900 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1901 based on a string key name from this
1902 :class:`_expression.ColumnCollection`."""
1904 if key in self._index:
1905 return self._index[key][1]
1906 else:
1907 return default
1909 def __str__(self) -> str:
1910 return "%s(%s)" % (
1911 self.__class__.__name__,
1912 ", ".join(str(c) for c in self),
1913 )
1915 def __setitem__(self, key: str, value: Any) -> NoReturn:
1916 raise NotImplementedError()
1918 def __delitem__(self, key: str) -> NoReturn:
1919 raise NotImplementedError()
1921 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1922 raise NotImplementedError()
1924 def clear(self) -> NoReturn:
1925 """Dictionary clear() is not implemented for
1926 :class:`_sql.ColumnCollection`."""
1927 raise NotImplementedError()
1929 def remove(self, column: Any) -> NoReturn:
1930 raise NotImplementedError()
1932 def update(self, iter_: Any) -> NoReturn:
1933 """Dictionary update() is not implemented for
1934 :class:`_sql.ColumnCollection`."""
1935 raise NotImplementedError()
1937 # https://github.com/python/mypy/issues/4266
1938 __hash__: Optional[int] = None # type: ignore
def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """populate from an iterator of (key, column)

    The ordered collection is replaced with the new entries; the column
    set and the index are updated with them.
    """

    entries = [
        (name, column, _ColumnMetrics(self, column))
        for name, column in iter_
    ]
    self._collection[:] = entries

    self._colset.update(column._deannotate() for _, column, _ in entries)

    # integer positions first ...
    by_position = {
        position: (name, column)
        for position, (name, column, _) in enumerate(entries)
    }
    self._index.update(by_position)

    # ... then string keys; walking the entries backwards makes the
    # earliest column for a duplicated key the one that remains
    by_key = {name: (name, column) for name, column, _ in reversed(entries)}
    self._index.update(by_key)
def add(
    self,
    column: ColumnElement[Any],
    key: Optional[_COLKEY] = None,
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended at the end.  If ``key`` is omitted the
    column's own ``.key`` is used.  A key that is already present in the
    index keeps referring to the column added earlier.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    # position the new column will occupy, taken before appending
    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)

    metrics = _ColumnMetrics(self, _column)
    self._collection.append((colkey, _column, metrics))
    self._colset.add(_column._deannotate())

    self._index[position] = (colkey, _column)
    if colkey in self._index:
        return
    self._index[colkey] = (colkey, _column)
1993 def __getstate__(self) -> Dict[str, Any]:
1994 return {
1995 "_collection": [(k, c) for k, c, _ in self._collection],
1996 "_index": self._index,
1997 }
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore the collection from the dict produced by
    ``__getstate__()``, rebuilding the metrics objects, an empty proxy
    index and the column set."""
    # __setattr__ is disabled on this class, hence object.__setattr__
    assign = object.__setattr__

    assign(self, "_index", state["_index"])

    # the proxy index has to exist before the metrics objects are
    # created, since their constructor reads it from this collection
    assign(self, "_proxy_index", collections.defaultdict(util.OrderedSet))

    restored = [
        (name, column, _ColumnMetrics(self, column))
        for (name, column) in state["_collection"]
    ]
    assign(self, "_collection", restored)

    assign(self, "_colset", {column for _, column, _ in self._collection})
2016 def contains_column(self, col: ColumnElement[Any]) -> bool:
2017 """Checks if a column object exists in this collection"""
2018 if col not in self._colset:
2019 if isinstance(col, str):
2020 raise exc.ArgumentError(
2021 "contains_column cannot be used with string arguments. "
2022 "Use ``col_name in table.c`` instead."
2023 )
2024 return False
2025 else:
2026 return True
def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`."""

    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view
def _init_proxy_index(self) -> None:
    """Fill the "proxy index" if it currently has no entries.

    The proxy index was added in 2.0 so that corresponding_column() can
    operate more efficiently.

    To keep construction of new .c collections fast, and to conserve
    memory when there are many large .c collections, the index is filled
    only when corresponding_column() is called.  Once filled it stays
    filled, and _ColumnMetrics objects created afterwards add their own
    data to it.  That situation, a .c collection being mutated after
    corresponding_column() was used, would be unusual if not nonexistent,
    but it is tested in test/base/test_utils.py.

    """
    proxy_index = self._proxy_index
    if proxy_index:
        # already populated; nothing to do
        return

    for _key, _col, col_metrics in self._collection:
        # register the metrics under every column of the expanded proxy set
        for proxied in col_metrics.column._expanded_proxy_set:
            proxy_index[proxied].add(col_metrics)
def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Return the :class:`_expression.ColumnElement` exported by this
    :class:`_expression.ColumnCollection` that corresponds to the given
    :class:`_expression.ColumnElement` through a common ancestor column.

    :param column: the target :class:`_expression.ColumnElement`
     to be matched.

    :param require_embedded: only return a corresponding column if the
     given :class:`_expression.ColumnElement` is actually present within
     a sub-element of this :class:`_expression.Selectable`.
     Normally a column matches if it merely shares a common ancestor
     with one of the exported columns of this
     :class:`_expression.Selectable`.

    :return: the best matching column, or ``None`` if there is no match.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is locally present is its own correspondence
    if column in self._colset:
        return column

    target_set = column.proxy_set

    proxy_index = self._proxy_index
    if not proxy_index:
        # fills the same mapping object in place
        self._init_proxy_index()

    def weighted_distance(candidate_metrics):
        # sum of the "weight" annotation (default 1) over the proxied
        # columns of the candidate that share lineage with the target;
        # CompoundSelect() uses "weight" to rank columns by vertical
        # position in the compound statement
        return sum(
            proxied._annotations.get("weight", 1)
            for proxied in candidate_metrics.column._uncached_proxy_list()
            if proxied.shares_lineage(column)
        )

    best_metrics = None
    best_overlap = None

    for target in target_set:
        if target not in proxy_index:
            continue

        for candidate in proxy_index[target]:
            if require_embedded and not candidate.embedded(target_set):
                continue

            if best_metrics is None:
                # first acceptable candidate; take it as the baseline
                best_metrics = candidate
                continue

            candidate_overlap = target_set.intersection(
                candidate.column._expanded_proxy_set
            )
            if best_overlap is None:
                # computed lazily, only once a second candidate appears
                best_overlap = target_set.intersection(
                    best_metrics.column._expanded_proxy_set
                )

            if len(candidate_overlap) > len(best_overlap):
                # the candidate has a larger field of correspondence,
                # i.e. selectable.c.a1_x->a1.c.x->table.c.x matches
                # a1.c.x->table.c.x better than
                # selectable.c.x->table.c.x does.
                best_metrics = candidate
                best_overlap = candidate_overlap
            elif candidate_overlap == best_overlap:
                # same field of correspondence; prefer the smaller
                # weighted distance, which indicates a closer
                # relationship with the root column.  Columns with no
                # reference to the target column contribute nothing
                # (this also occurs with CompoundSelect)
                best_distance = weighted_distance(best_metrics)
                candidate_distance = weighted_distance(candidate)
                if candidate_distance < best_distance:
                    best_metrics = candidate
                    best_overlap = candidate_overlap

    if best_metrics:
        return best_metrics.column
    return None
# Type variable bound to NamedColumn[Any]; it parameterizes the column type
# held by DedupeColumnCollection.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is used by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    Columns are always stored under their own ``.key``; the mutators raise
    :class:`.ArgumentError` if a different key is requested.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add a column, replacing any existing column with the same key.

        :param column: the column to add.
        :param key: optional; if given it must equal ``column.key``.
        :param index: optional position passed along to
         :meth:`.replace` or ``_append_new_column()``.
        :raises ArgumentError: if ``key`` differs from ``column.key``, or
         if the column's key is ``None``.
        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            # adding the very same object again is a no-op
            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Store a column whose key is not yet present.

        With ``index=None`` the column is appended; otherwise it is
        inserted at ``index``.  A negative ``index`` is converted to an
        offset from the end, clamped at 0.
        """
        collection_length = len(self._collection)

        # ``l`` is the integer position under which the column is indexed
        if index is None:
            l = collection_length
        else:
            if index < 0:
                index = max(0, collection_length + index)
            l = index

        if index is None:
            self._collection.append(
                (key, named_column, _ColumnMetrics(self, named_column))
            )
        else:
            self._collection.insert(
                index, (key, named_column, _ColumnMetrics(self, named_column))
            )

        self._colset.add(named_column._deannotate())

        if index is not None:
            # shift the integer index entries at or after the insertion
            # point up by one, from the end so nothing is overwritten
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        # the column is reachable both by position and by key
        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns whose name or key is already indexed are set aside and
        passed to :meth:`.replace` after the new columns have been added.

        :raises ArgumentError: if a given key differs from the column's
         ``.key``.
        """
        cols = list(iter_)

        # columns that collide with an existing entry; handled at the end
        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        # refresh the positional entries of the index for all columns
        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each column of ``iter_`` under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove a column from the collection.

        :raises ValueError: if the column is not in the column set.
        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        # entries are filtered by identity, not equality
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries for the remaining columns
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove.
        :param index: optional position for the new column; when omitted,
         the new column takes the position of the first removed column,
         or is appended if no removed column is found in the collection.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            # only an existing column stored under its plain name is removed
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        # nothing collides; this is a plain addition
        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        # rebuild the entry list: the first removed column is swapped for
        # the new column, further removed columns are dropped
        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if replace_index is None:
            # no removed column was found in the collection, so the new
            # column has not been placed yet
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # the new column was placed at replace_index but an explicit
            # position was requested: insert it at ``index``, then delete
            # the earlier placement, whose position moves up by one when
            # the insertion happened in front of it
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # the index is rebuilt from scratch: positions first, then keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A view over an existing :class:`.ColumnCollection` whose mutator
    methods are disabled.

    The view does not copy anything: it refers to the same ``_colset``,
    ``_index``, ``_collection`` and ``_proxy_index`` objects as the parent
    collection it was created from.
    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        """Bind this view to the internal state of ``collection``."""
        # attributes are assigned through object.__setattr__ rather than
        # normal attribute assignment
        object.__setattr__(self, "_parent", collection)
        object.__setattr__(self, "_colset", collection._colset)
        object.__setattr__(self, "_index", collection._index)
        object.__setattr__(self, "_collection", collection._collection)
        object.__setattr__(self, "_proxy_index", collection._proxy_index)

    def __getstate__(self) -> Dict[str, Any]:
        """Return the pickle state: only the parent collection."""
        # the value is the parent collection, not a column, hence
        # Dict[str, Any] as on ColumnCollection.__getstate__
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Re-bind to the unpickled parent by running ``__init__`` again."""
        parent = state["_parent"]
        self.__init__(parent)  # type: ignore

    # The three mutators below only call _readonly(); per the NoReturn
    # annotations on extend() and remove() it does not return normally
    # (presumably it raises; see util.ReadOnlyContainer).

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns.

    ``==`` between two column sets does not produce a boolean: it
    returns ``elements.and_()`` applied to the ``==`` comparison of every
    pair of columns, one from each set, that share lineage.
    """

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if ``col`` is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add every column of ``cols`` to this set, in order."""
        for new_col in cols:
            self.add(new_col)

    def __eq__(self, other):
        # the outer loop runs over ``other``, the inner loop over this set
        comparisons = [
            theirs == ours
            for theirs in other
            for ours in self
            if theirs.shares_lineage(ours)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        # hash of the members taken in iteration order
        return hash(tuple(self))
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    When the entity itself has no such attribute, its sub-elements are
    iterated and the namespace of the first one that has one is returned.
    If none is found, the original AttributeError propagates.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        traversible = cast(ExternallyTraversible, entity)
        for sub_element in visitors.iterate(traversible):
            if _is_has_entity_namespace(sub_element):
                return sub_element.entity_namespace

        # no sub-element supplied a namespace; re-raise the AttributeError
        # that brought us here
        raise
@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Look up ``key`` in the entity_namespace of ``entity``.

    :param entity: object whose namespace is located with
     ``_entity_namespace()``.
    :param key: attribute name to fetch from the namespace.
    :param default: value returned when the namespace has no such
     attribute; when left as ``NO_ARG`` a missing attribute is an error.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err