Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import Final
25from typing import FrozenSet
26from typing import Generator
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeGuard
44from typing import TypeVar
45from typing import Union
47from . import roles
48from . import visitors
49from .cache_key import HasCacheKey # noqa
50from .cache_key import MemoizedHasCacheKey # noqa
51from .traversals import HasCopyInternals # noqa
52from .visitors import ClauseVisitor
53from .visitors import ExtendedInternalTraversal
54from .visitors import ExternallyTraversible
55from .visitors import InternalTraversal
56from .. import event
57from .. import exc
58from .. import util
59from ..util import EMPTY_DICT
60from ..util import HasMemoized as HasMemoized
61from ..util import hybridmethod
62from ..util.typing import Self
63from ..util.typing import TypeVarTuple
64from ..util.typing import Unpack
66if TYPE_CHECKING:
67 from . import coercions
68 from . import elements
69 from . import type_api
70 from ._orm_types import DMLStrategyArgument
71 from ._orm_types import SynchronizeSessionArgument
72 from ._typing import _CLE
73 from .cache_key import CacheKey
74 from .compiler import SQLCompiler
75 from .dml import Delete
76 from .dml import Insert
77 from .dml import Update
78 from .elements import BindParameter
79 from .elements import ClauseElement
80 from .elements import ClauseList
81 from .elements import ColumnClause # noqa
82 from .elements import ColumnElement
83 from .elements import NamedColumn
84 from .elements import SQLCoreOperations
85 from .elements import TextClause
86 from .schema import Column
87 from .schema import DefaultGenerator
88 from .selectable import _JoinTargetElement
89 from .selectable import _SelectIterable
90 from .selectable import FromClause
91 from .selectable import Select
92 from .visitors import anon_map
93 from ..engine import Connection
94 from ..engine import CursorResult
95 from ..engine.interfaces import _CoreMultiExecuteParams
96 from ..engine.interfaces import _CoreSingleExecuteParams
97 from ..engine.interfaces import _ExecuteOptions
98 from ..engine.interfaces import _ImmutableExecuteOptions
99 from ..engine.interfaces import CacheStats
100 from ..engine.interfaces import Compiled
101 from ..engine.interfaces import CompiledCacheType
102 from ..engine.interfaces import CoreExecuteOptionsParameter
103 from ..engine.interfaces import Dialect
104 from ..engine.interfaces import IsolationLevel
105 from ..engine.interfaces import SchemaTranslateMapType
106 from ..event import dispatcher
# These modules are imported only under TYPE_CHECKING above; at runtime the
# names are bound to None here so that they exist in the module namespace.
# NOTE(review): presumably they are rebound to the real modules elsewhere
# before first use -- confirm.
if not TYPE_CHECKING:
    coercions = None  # noqa
    elements = None  # noqa
    type_api = None  # noqa


# variadic type variable
_Ts = TypeVarTuple("_Ts")
class _NoArg(Enum):
    """Single-member enum providing the ``NO_ARG`` sentinel."""

    NO_ARG = 0

    def __repr__(self):
        # render as "_NoArg.<member name>" rather than the default Enum repr
        return "_NoArg.%s" % self.name


# module-level alias for the sole member of _NoArg
NO_ARG: Final = _NoArg.NO_ARG
class _NoneName(Enum):
    """Single-member enum providing the ``NONE_NAME`` sentinel."""

    # indicate a 'deferred' name that was ultimately the value None.
    NONE_NAME = 0


# module-level alias for the sole member of _NoneName
_NONE_NAME: Final = _NoneName.NONE_NAME
# general purpose type variable (bound to Any)
_T = TypeVar("_T", bound=Any)

# type variable standing for any callable; used to type decorators that
# return what they were given
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# str -> str mutable mapping
# NOTE(review): going by the name, keys/values are table names -- confirm.
_AmbiguousTableNameMap = MutableMapping[str, str]
class _DefaultDescriptionTuple(NamedTuple):
    """Four-field summary of a column default.

    Each field mirrors the same-named attribute of the default object the
    tuple was built from; all four are ``None`` when there is nothing to
    describe.
    """

    arg: Any
    is_scalar: Optional[bool]
    is_callable: Optional[bool]
    is_sentinel: Optional[bool]

    @classmethod
    def _from_column_default(
        cls, default: Optional[DefaultGenerator]
    ) -> _DefaultDescriptionTuple:
        """Build the description for ``default``.

        The populated form is returned only when ``default`` is truthy and
        either has an argument or is a sentinel that is not "for update";
        every other case yields the all-``None`` tuple.
        """
        if default and (
            default.has_arg
            or (not default.for_update and default.is_sentinel)
        ):
            return _DefaultDescriptionTuple(
                default.arg,  # type: ignore
                default.is_scalar,
                default.is_callable,
                default.is_sentinel,
            )

        return _DefaultDescriptionTuple(None, None, None, None)
# attribute getter that reads ``_omit_from_statements`` from whatever object
# it is called with
_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
    "_omit_from_statements"
)
class _EntityNamespace(Protocol):
    """Structural type for an object whose attribute access by string name
    produces a :class:`.SQLCoreOperations` object."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
class _HasEntityNamespace(Protocol):
    """Structural type for an object exposing a read-only
    ``entity_namespace`` attribute that conforms to
    :class:`._EntityNamespace`."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
    """Return True if ``element`` has an ``entity_namespace`` attribute.

    Declared as a ``TypeGuard`` so that type checkers narrow ``element`` to
    :class:`._HasEntityNamespace` when the check succeeds.
    """
    return hasattr(element, "entity_namespace")
# Workaround type variable for annotating ``self``; remove once
# https://github.com/python/mypy/issues/14640 is fixed
_Self = TypeVar("_Self", bound=Any)
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" objects refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.   Such as, in DQL, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.

    Cloning an immutable object hands back the very same object, copying
    internals is a no-op, and the parameter-replacing methods refuse to
    run.

    """

    __slots__ = ()

    _is_immutable: bool = True

    def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        """Return this same object; keyword arguments are ignored."""
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        """Do nothing; there are no internals to copy."""
        return None
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE

    Each subclass carries one shared instance in ``_singleton``; calling
    the class returns that instance rather than creating a new object.
    """

    _is_singleton_constant: bool = True

    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction always hands back the pre-built instance
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # class-level placeholder; _create_singleton() assigns the real
        # value on the instance
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls) -> None:
        """Build the single instance for ``cls`` and store it on the class."""
        # go through object.__new__ directly, since cls.__new__ would try to
        # return a _singleton that does not exist yet
        instance = object.__new__(cls)
        instance.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement.  This referred to #6259.  However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        instance.proxy_set = frozenset((instance,))
        cls._singleton = instance
def _from_objects(
    *elements: Union[
        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
    ]
) -> Iterator[FromClause]:
    """Return one iterator chaining the ``_from_objects`` collection of
    every given element, in argument order.
    """
    # collect the per-element collections up front, then flatten lazily
    per_element = [elem._from_objects for elem in elements]
    return itertools.chain.from_iterable(per_element)
def _select_iterables(
    elements: Iterable[roles.ColumnsClauseRole],
) -> _SelectIterable:
    """expand tables into individual columns in the
    given list of column expressions.

    Each element contributes its ``_select_iterable``; the contributions
    are chained together in input order.

    """
    # collect the per-element iterables up front, then flatten lazily
    expansions = [clause._select_iterable for clause in elements]
    return itertools.chain.from_iterable(expansions)
# type variable for "self" arguments that conform to _GenerativeType
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Structural type for an object providing ``_generate()``, which
    returns an object of the same type."""

    def _generate(self) -> Self: ...
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The undecorated function stays reachable as the ``non_generative``
    attribute of the returned callable.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # run the method against whatever _generate() hands back instead of
        # the object the call was made on
        self = self._generate()
        returned = fn(self, *args, **kw)
        assert returned is self, "generative methods must return self"
        return self

    wrapped = _generative(fn)
    wrapped.non_generative = fn  # type: ignore
    return wrapped
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Build a decorator that refuses to run a method once any of the named
    attributes has left its default value.

    :param names: attribute paths (as accepted by ``operator.attrgetter``)
     to inspect on the object the method is invoked on.
    :param msgs: optional keyword; dictionary of attribute name to the
     error message to use for that attribute.
    :param defaults: optional keyword; dictionary of attribute name to the
     value considered "unset".  Attributes not listed default to ``None``.
     The comparison is by identity.
    :raises: :class:`.InvalidRequestError` at call time of the decorated
     method, for the first attribute found to be non-default.
    """
    msgs: Dict[str, str] = kw.pop("msgs", {})

    defaults: Dict[str, str] = kw.pop("defaults", {})

    getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = []
    for attr_name in names:
        getters.append(
            (
                attr_name,
                operator.attrgetter(attr_name),
                defaults.get(attr_name, None),
            )
        )

    @util.decorator
    def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        remaining = args[1:]
        for name, getter, default_ in getters:
            if getter(self) is default_:
                continue
            msg = msgs.get(
                name,
                "Method %s() has already been invoked on this %s construct"
                % (fn.__name__, self.__class__),
            )
            raise exc.InvalidRequestError(msg)
        return fn(self, *remaining, **kw)

    return check
def _clone(element, **kw):
    """Return ``element._clone(**kw)``; a function-call form of the
    method."""
    return element._clone(**kw)
def _expand_cloned(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """expand the given set of ClauseElements to be the set of all 'cloned'
    predecessors.

    The result is a single iterator running through the ``_cloned_set`` of
    each element in turn; entries shared by several elements are yielded
    once per occurrence.

    """
    # TODO: cython candidate
    cloned_sets = [element._cloned_set for element in elements]
    return itertools.chain.from_iterable(cloned_sets)
def _de_clone(
    elements: Iterable[_CLE],
) -> Iterable[_CLE]:
    """Yield, for each element, the object at the root of its
    ``_is_clone_of`` chain.

    An element whose ``_is_clone_of`` is ``None`` is yielded as is.
    """
    for element in elements:
        root = element
        parent = root._is_clone_of
        # walk upwards until an object with no recorded origin is reached
        while parent is not None:
            root = parent
            parent = root._is_clone_of
        yield root
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    # everything that appears in the expanded form of both inputs
    expanded_a = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    # keep those members of ``a`` whose cloned set touches the overlap
    return {
        elem for elem in a if not all_overlap.isdisjoint(elem._cloned_set)
    }
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the members of a that have no overlap with b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    # everything that appears in the expanded form of both inputs
    expanded_a = set(_expand_cloned(a))
    all_overlap: Set[_CLE] = expanded_a.intersection(_expand_cloned(b))

    # keep those members of ``a`` whose cloned set avoids the overlap
    return {elem for elem in a if all_overlap.isdisjoint(elem._cloned_set)}
class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    Reads and writes are forwarded to the ``dialect_options`` registry of
    the wrapped object.  Length and iteration take into account only the
    ``_non_defaults`` portion of each per-dialect collection.

    """

    __slots__ = ("obj",)

    def __init__(self, obj: DialectKWArgs) -> None:
        self.obj = obj

    def _key(self, key: str) -> Tuple[str, str]:
        """Split ``key`` at its first underscore into
        ``(dialect, value_key)``; raise ``KeyError`` if there is none."""
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err
        return dialect, value_key

    def __getitem__(self, key: str) -> Any:
        dialect, value_key = self._key(key)

        try:
            options = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            # a dialect that cannot be located is reported as a missing key
            raise KeyError(key) from err
        return options[value_key]

    def __setitem__(self, key: str, value: Any) -> None:
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err
        self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key: str) -> None:
        dialect, value_key = self._key(key)
        del self.obj.dialect_options[dialect][value_key]

    def __len__(self) -> int:
        total = 0
        for args in self.obj.dialect_options.values():
            total += len(args._non_defaults)
        return total

    def __iter__(self) -> Generator[str, None, None]:
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name, args in self.obj.dialect_options.items()
            for value_name in args._non_defaults
        )
class _DialectArgDict(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments for a specific
    dialect.

    Maintains a separate collection of user-specified arguments
    and dialect-specified default arguments.

    Lookups prefer the user-specified value and fall back to the default;
    assignment and deletion touch the user-specified collection only.

    """

    def __init__(self) -> None:
        self._non_defaults: Dict[str, Any] = {}
        self._defaults: Dict[str, Any] = {}

    def _all_keys(self) -> Set[str]:
        # union of the names known to either collection
        names = set(self._non_defaults)
        names.update(self._defaults)
        return names

    def __len__(self) -> int:
        return len(self._all_keys())

    def __iter__(self) -> Iterator[str]:
        return iter(self._all_keys())

    def __getitem__(self, key: str) -> Any:
        if key in self._non_defaults:
            return self._non_defaults[key]
        return self._defaults[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._non_defaults[key] = value

    def __delitem__(self, key: str) -> None:
        del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
    """Load the named dialect and return a new dictionary copied from its
    ``construct_arguments``, or ``None`` if the dialect has that attribute
    set to ``None``.
    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    construct_arguments = dialect_cls.construct_arguments
    return None if construct_arguments is None else dict(construct_arguments)
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(
        cls, dialect_name: str, argument_name: str, default: Any
    ) -> None:
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index("a", "b", mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class.  The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect.  The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised.   The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already.  All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        :raises: :class:`.ArgumentError` if the dialect has no
         ``construct_arguments`` collection.

        """

        construct_arg_dictionary: Optional[Dict[Any, Any]] = (
            DialectKWArgs._kw_registry[dialect_name]
        )
        if construct_arg_dictionary is None:
            # the registry stores None for a dialect whose
            # construct_arguments is None, i.e. one that does not take part
            # in validation
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @property
    def dialect_kwargs(self) -> _DialectArgView:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self) -> _DialectArgView:
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> copy of that dialect's construct_arguments (or None),
    # populated on first access by _kw_reg_for_dialect; shared by all
    # subclasses since it is always read through DialectKWArgs itself
    _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
        util.PopulateDict(_kw_reg_for_dialect)
    )

    @classmethod
    def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
        """Build the default-populated argument collection for ``cls`` under
        the given dialect.

        If the dialect has no registry, a single ``"*"`` default is
        installed, which ``_validate_dialect_kwargs`` treats as "accept any
        argument".  Otherwise the defaults registered for each class in the
        MRO are merged, base classes first, so that more specific classes
        override.
        """
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            d._defaults.update({"*": None})
        else:
            for klass in reversed(cls.__mro__):
                if klass in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[klass])
        return d

    @util.memoized_property
    def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options["postgresql"]["where"]

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(self._kw_reg_for_dialect_cls)

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        """Validate ``<dialectname>_<argument>`` keyword arguments and store
        them in :attr:`.DialectKWArgs.dialect_options`.

        :param kwargs: the remaining keyword arguments of a constructor.
        :raises TypeError: if a key is not of the form
         ``<dialectname>_<argument>``.
        :raises: :class:`.ArgumentError` if the dialect validates its
         arguments and does not accept the given one.

        A dialect that cannot be located produces a warning rather than an
        error, and the argument is stored unvalidated.
        """
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k, value in kwargs.items():
            m = re.match(r"^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                # install an accept-anything collection for the unknown
                # dialect and record the value as given
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = value
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                construct_arg_dictionary[arg_name] = value
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, visit name) -> CompileState class; a single
    # dictionary shared by every subclass
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(
        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
    ) -> CompileState:
        """Factory: locate the class registered for ``statement`` and
        construct it.

        The plugin named under ``"compile_state_plugin"`` in the statement's
        ``_propagate_attrs`` is preferred; the ``"default"`` registration
        for the statement's plugin target is used when there is no such
        entry.  If the class found is not ``cls``, construction is handed to
        that class's own ``create_for_statement``.

        :raises KeyError: if no ``"default"`` class is registered for the
         statement's plugin target and no plugin-specific one was found.
        """
        target = statement._effective_plugin_target
        klass = None

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get((plugin_name, target), None)

        if klass is None:
            klass = cls.plugins[("default", target)]

        if klass is not cls:
            return klass.create_for_statement(statement, compiler, **kw)
        return cls(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        # only the statement is retained at this level
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for ``statement``: plugin-specific if
        one exists, else the ``"default"`` one, else ``None``."""
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.   return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        return cls.plugins.get(
            ("default", statement._effective_plugin_target)
        )

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the class registered for exactly ``plugin_name`` and the
        statement's plugin target, or ``None``; no default fallback."""
        return cls.plugins.get(
            (plugin_name, statement._effective_plugin_target)
        )

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Return a class decorator that registers the decorated class under
        ``(plugin_name, visit_name)`` and returns it unchanged."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a new instance of this object's class carrying a shallow
        copy of ``__dict__``, minus any keys listed in ``_memoized_keys``.

        The new instance is created with ``__new__`` only; ``__init__`` is
        not run.
        """
        skip = self._memoized_keys
        cls = self.__class__
        duplicate = cls.__new__(cls)

        # take the snapshot first so that any filtering iterates over a
        # private copy; this keeps the iteration atomic
        state = self.__dict__.copy()
        if skip:
            state = {k: v for k, v in state.items() if k not in skip}

        duplicate.__dict__ = state
        return duplicate
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self) -> Self:
        """Discard every entry named in ``_memoized_keys`` from ``__dict__``
        and return this same object."""
        # note __dict__ needs to be in __slots__ if this is used
        for memoized_name in self._memoized_keys:
            self.__dict__.pop(memoized_name, None)
        return self
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional CompileState class for this construct; None at this level
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # immutable mapping, empty by default
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # callable that builds the CompileState; defaults to the registry-based
    # CompileState.create_for_statement classmethod
    _compile_state_factory = CompileState.create_for_statement
class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        """Support ``SomeOptionsClass + {...}``: instantiate the class and
        apply the dictionary's entries to the new instance.

        :raises TypeError: if the dictionary has keys that are not listed in
         the class's ``_cache_attrs``.
        """
        instance = self()

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        instance.__dict__.update(other)
        return instance

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults.

    Defaults are the class-level attributes of a subclass; an instance
    stores only the values that were set on it.  Both a class and an
    instance support ``+ {...}``, producing a new instance.

    """

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # record, in sorted order, the names declared directly on the
        # subclass, leaving out double-underscore names and
        # "_cache_key_traversal"
        declared = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                name
                for name in declared
                if not name.startswith("__")
                and name != "_cache_key_traversal"
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    def __add__(self, other):
        """Return a new instance holding this object's values overlaid with
        the entries of the dictionary ``other``.

        :raises TypeError: if ``other`` has keys that are not listed in
         ``_cache_attrs``.
        """
        klass = self.__class__
        combined = klass.__new__(klass)
        combined.__dict__.update(self.__dict__)

        unknown = set(other).difference(self._cache_attrs)
        if unknown:
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r" % (self, unknown)
            )

        combined.__dict__.update(other)
        return combined

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        return not any(
            getattr(self, a) != getattr(other, b)
            for a, b in zip_longest(self._cache_attrs, other._cache_attrs)
        )

    def __repr__(self) -> str:
        # TODO: fairly inefficient, used only in debugging right now.

        # only values set on the instance are shown, in _cache_attrs order
        rendered = ", ".join(
            "%s=%r" % (k, self.__dict__[k])
            for k in self._cache_attrs
            if k in self.__dict__
        )
        return "%s(%s)" % (self.__class__.__name__, rendered)

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True if this class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name: str, value: str) -> Any:
        """Return new options in which attribute ``name`` is its current
        value plus ``value``."""
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        # instance level: the values actually set on this object
        return self.__dict__

    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        # class level: a constant empty mapping
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other: "Options") -> Any:
        """Return ``cls`` plus the state of ``other``, refusing to take on
        attributes that ``cls`` does not cover.

        :raises TypeError: if ``other`` is of another class and lists
         ``_cache_attrs`` entries that are absent from ``cls._cache_attrs``.
        """
        state = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.   otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if cls is not other.__class__ and other._cache_attrs:
            uncovered = set(other._cache_attrs).difference(cls._cache_attrs)
            if uncovered:
                raise TypeError(
                    "other element %r is not empty, is not of type %s, "
                    "and contains attributes not covered here %r"
                    % (other, cls, uncovered)
                )
        return cls + state

    @classmethod
    def from_execution_options(
        cls,
        key: str,
        attrs: set[str],
        exec_options: Mapping[str, Any],
        statement_exec_options: Mapping[str, Any],
    ) -> Tuple["Options", Mapping[str, Any]]:
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {"populate_existing", "autoflush", "yield_per"},
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if not check_argnames:
            return existing_options, exec_options

        # each matched option is stored under its underscore-prefixed name;
        # exec_options takes precedence over the statement's options
        result = {}
        for argname in check_argnames:
            local = "_" + argname
            if argname in exec_options:
                result[local] = exec_options[argname]
            elif argname in statement_exec_options:
                result[local] = statement_exec_options[argname]

        new_options = existing_options + result
        exec_options = util.immutabledict().merge_with(
            exec_options, {key: new_options}
        )
        return new_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
class CacheableOptions(Options, HasCacheKey):
    """An :class:`.Options` that also takes part in cache key generation.

    The cache key methods are hybrid: an instance delegates to the
    :class:`.HasCacheKey` implementation, while the class itself is
    represented by a fixed key.
    """

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(
        self, anon_map: Any, bindparams: List[BindParameter[Any]]
    ) -> Optional[Tuple[Any]]:
        # instance level: regular HasCacheKey behavior
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(
        cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
    ) -> Tuple[CacheableOptions, Any]:
        # class level: the class itself plus an empty tuple
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self) -> Optional[CacheKey]:
        return HasCacheKey._generate_cache_key(self)
class ExecutableOption(HasCopyInternals):
    """Base for option objects; provides class-level defaults and a shallow
    ``_clone()``."""

    __slots__ = ()

    _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT

    __visit_name__: str = "executable_option"

    _is_has_cache_key: bool = False

    _is_core: bool = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        klass = self.__class__
        # __new__ only; __init__ is not run for the copy
        duplicate = klass.__new__(klass)
        duplicate.__dict__ = dict(self.__dict__)  # type: ignore
        return duplicate
# type variable restricted to str (and subtypes)
_L = TypeVar("_L", bound=str)
class HasSyntaxExtensions(Generic[_L]):
    """Mixin giving a statement named insertion points for
    :class:`.SyntaxExtension` objects.

    ``_position_map`` maps each position name to the name of the attribute
    on the statement under which the element(s) for that position are
    stored.

    """

    _position_map: Mapping[_L, str]

    @_generative
    def ext(self, extension: SyntaxExtension) -> Self:
        """Apply a SQL syntax extension to this statement.

        A SQL syntax extension is a :class:`.ClauseElement` that renders a
        vendor-specific syntactical construct at a particular place within
        a SQL statement.  Examples of such constructs include PostgreSQL /
        SQLite's "ON CONFLICT", PostgreSQL's "DISTINCT ON", and MySQL's
        "LIMIT" as applied to UPDATE and DELETE statements.

        :param extension: the extension to apply.  It is first coerced
         against :class:`.roles.SyntaxExtensionRole` and then handed to
         :meth:`_apply_syntax_extension_to_self`.

        .. seealso::

            :ref:`examples_syntax_extensions`

            :func:`_mysql.limit` - DML LIMIT for MySQL

            :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL

        .. versionadded:: 2.1

        """
        extension = coercions.expect(
            roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
        )
        self._apply_syntax_extension_to_self(extension)
        return self

    @util.preload_module("sqlalchemy.sql.elements")
    def apply_syntax_extension_point(
        self,
        apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
        position: _L,
    ) -> None:
        """Install elements at a named extension point of this statement.

        Intended to be called only by :class:`.SyntaxExtension`
        implementations.

        E.g.::

            class Qualify(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # append self to existing
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [*existing, self], "post_criteria"
                    )


            class ReplaceExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
                        lambda existing: [self], "post_criteria"
                    )


            class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):

                # ...

                def apply_to_select(self, select_stmt: Select) -> None:
                    # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
                        self.append_replacing_same_type, "post_criteria"
                    )

        :param apply_fn: callable that receives the sequence of
         :class:`.ClauseElement` objects currently occupying the extension
         point (an empty sequence when nothing is there yet) and returns
         the non-empty sequence of :class:`.ClauseElement` objects that
         should occupy it from now on.  The function may append to the
         existing values, replace them outright by returning a
         single-element list, or do something more selective such as
         dropping only elements of its own type or merging with them; see
         the examples above.
        :param position: string name of the position to apply to.  The
         available names vary per statement type; each statement type is
         typed with a ``typing.Literal`` so that IDEs can list the
         accepted values.

        :raises ValueError: if ``position`` is not a key of this
         construct's ``_position_map``.

        .. seealso::

            :ref:`examples_syntax_extensions`


        """  # noqa: E501

        try:
            attrname = self._position_map[position]
        except KeyError as ke:
            raise ValueError(
                f"Unknown position {position!r} for {self.__class__} "
                f"construct; known positions: "
                f"{', '.join(repr(k) for k in self._position_map)}"
            ) from ke

        ElementList = util.preloaded.sql_elements.ElementList
        existing: Optional[ClauseElement] = getattr(self, attrname, None)

        # normalize whatever is stored into a tuple for the callback: an
        # ElementList is unwrapped, a lone element is wrapped, and an
        # absent value becomes the empty tuple
        input_seq: Tuple[ClauseElement, ...]
        if isinstance(existing, ElementList):
            input_seq = existing.clauses
        elif existing is not None:
            input_seq = (existing,)
        else:
            input_seq = ()

        new_seq = apply_fn(input_seq)
        assert new_seq, "cannot return empty sequence"

        # a single element is stored directly; several are wrapped in an
        # ElementList
        if len(new_seq) == 1:
            new = new_seq[0]
        else:
            new = ElementList(new_seq)
        setattr(self, attrname, new)

    def _apply_syntax_extension_to_self(
        self, extension: SyntaxExtension
    ) -> None:
        """Apply ``extension`` to this statement.

        Not implemented at this level; always raises
        ``NotImplementedError``.

        """
        raise NotImplementedError()

    def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
        """Return the populated extension points keyed by position name.

        Positions whose backing attribute is ``None`` are omitted.

        """
        candidates = (
            (name, getattr(self, attr))
            for name, attr in self._position_map.items()
        )
        return {
            name: value for name, value in candidates if value is not None
        }

    def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
        """Assign each given value to the attribute backing the position
        named by its keyword.

        An unknown position name raises ``KeyError``.

        """
        for position_name in extensions:
            target_attr = self._position_map[position_name]  # type: ignore[index] # noqa: E501
            setattr(self, target_attr, extensions[position_name])
class SyntaxExtension(roles.SyntaxExtensionRole):
    """A unit that, when it also extends :class:`.ClauseElement`, can be
    applied to the SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` by way of
    pre-established SQL insertion points within those constructs.

    Each ``apply_to_<statement>`` method raises ``NotImplementedError``
    here; an extension overrides the ones for the statement types it
    supports.

    .. versionadded:: 2.1

    .. seealso::

        :ref:`examples_syntax_extensions`

    """

    def append_replacing_same_type(
        self, existing: Sequence[ClauseElement]
    ) -> Sequence[ClauseElement]:
        """Return ``existing`` minus any element of this object's type,
        with ``self`` appended.

        Suitable for passing as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`.

        For an extension class named ``ReplaceOfTypeExt`` this is
        equivalent to::

            stmt.apply_syntax_extension_point(
                lambda existing: [
                    *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
                    self,
                ],
                "post_criteria",
            )

        .. seealso::

            :ref:`examples_syntax_extensions`

            :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`

        """  # noqa: E501
        own_type = type(self)
        kept = [elem for elem in existing if not isinstance(elem, own_type)]
        return [*kept, self]  # type: ignore[list-item]

    def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to select"
        )

    def apply_to_update(self, update_stmt: Update) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to update"
        )

    def apply_to_delete(self, delete_stmt: Delete) -> None:
        """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to delete"
        )

    def apply_to_insert(self, insert_stmt: Insert) -> None:
        """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
        ext_name = type(self).__name__
        raise NotImplementedError(
            f"Extension {ext_name} cannot be applied to insert"
        )
class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    # class-level defaults; the generative methods below rebind these names
    # on the object they operate on rather than mutating the shared values
    supports_execution: bool = True
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator: bool = False
    _with_options: Tuple[ExecutableOption, ...] = ()
    _compile_state_funcs: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    # declared only; no default is assigned at this level
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    # (attribute name, traversal symbol) pairs for the attributes above
    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_compile_state_funcs",
            ExtendedInternalTraversal.dp_compile_state_funcs,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-kind flags, all False at this level
    is_select: bool = False
    is_from_statement: bool = False
    is_update: bool = False
    is_insert: bool = False
    is_text: bool = False
    is_delete: bool = False
    is_dml: bool = False

    # typing-only declarations: the method bodies are ``...`` stubs
    if TYPE_CHECKING:
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> tuple[
            Compiled,
            Sequence[BindParameter[Any]] | None,
            _CoreSingleExecuteParams | None,
            CacheStats,
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...

    @util.ro_non_memoized_property
    def _all_selected_columns(self) -> _SelectIterable:
        """Not implemented at this level; always raises
        ``NotImplementedError``."""
        raise NotImplementedError()

    @property
    def _effective_plugin_target(self) -> str:
        """Return this statement's ``__visit_name__``."""
        return self.__visit_name__

    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by systems that consume the statement outside
        of the regular SQL compiler chain. Specifically, these options are
        the ORM level options that apply "eager load" and other loading
        behaviors to an ORM query.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # each option is coerced against ExecutableOptionRole, then the
        # results are appended; ``+=`` on a tuple builds a new tuple
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self

    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        self._compile_options = compile_options
        return self

    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys.

        Asserts that ``_compile_options`` is not ``None`` before combining
        it with ``options`` via ``+=``.

        """

        assert self._compile_options is not None
        self._compile_options += options
        return self

    @_generative
    def _add_compile_state_func(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a compile state function to this statement.

        When using the ORM only, these are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        # stored as a (callable, cache_args) pair appended to the tuple
        self._compile_state_funcs += ((callable_, cache_args),)
        return self

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...

    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**. That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

             from sqlalchemy import event


             @event.listens_for(some_engine, "before_execute")
             def _process_opt(conn, statement, multiparams, params, execution_options):
                 "run a SQL function before invoking a statement"

                 if execution_options.get("do_special_thing", False):
                     conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`. This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        :raises sqlalchemy.exc.ArgumentError: if ``isolation_level`` or
         ``compiled_cache`` is passed; neither is accepted per statement.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # these two options are rejected up front even though the typing
        # overload above lists them
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
        # union() result is assigned back rather than updating in place
        self._execution_options = self._execution_options.union(kw)
        return self

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        Returns the ``_execution_options`` mapping itself, not a copy.

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options
class ExecutableStatement(Executable):
    """:class:`.Executable` subclass providing a lightweight ``params``
    implementation that avoids a full cloned traverse.

    .. versionadded:: 2.1

    """

    _params: util.immutabledict[str, Any] = EMPTY_DICT

    # the parent's traversal entries plus one for ``_params``
    _executable_traverse_internals = [
        *Executable._executable_traverse_internals,
        ("_params", InternalTraversal.dp_params),
    ]

    @_generative
    def params(
        self,
        __optionaldict: _CoreSingleExecuteParams | None = None,
        /,
        **kwargs: Any,
    ) -> Self:
        """Return a copy with the provided bindparam values.

        Values may be given as a positional-only dictionary, as keyword
        arguments, or both; where a name appears in both, the dictionary
        value is used.  The values are combined with any that were set by
        an earlier call::

          >>> clause = column("x") + bindparam("foo")
          >>> print(clause.compile().params)
          {'foo': None}
          >>> print(clause.params({"foo": 7}).compile().params)
          {'foo': 7}

        """
        if __optionaldict:
            kwargs.update(__optionaldict)

        if self._params:
            self._params = self._params | kwargs
        else:
            self._params = util.immutabledict(kwargs)
        return self
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object.

        The implementation here does nothing.

        """

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Call :meth:`_set_parent`, bracketed by the
        ``before_parent_attach`` and ``after_parent_attach`` events.

        :param parent: the object this element is being attached to.
        :param kw: passed through unchanged to :meth:`_set_parent`.

        """
        # order matters: the "before" event fires, then the attachment
        # takes place, then the "after" event fires
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
    """Base class for elements that are targets of a :class:`.SchemaVisitor`.

    Combines :class:`.SchemaEventTarget` with :class:`.visitors.Visitable`
    and adds no members of its own.

    .. versionadded:: 2.0.41

    """
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.

    """

    # traversal options for this visitor: sets the ``schema_visitor`` flag
    __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
class _SentinelDefaultCharacterization(Enum):
    """String-valued enumeration used as the type of
    ``_SentinelColumnCharacterization.default_characterization``.

    Each member's value is the lower-case form of its name.
    """

    # NOTE(review): the member names suggest these classify the kind of
    # default attached to a sentinel column - confirm against the callers.
    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
class _SentinelColumnCharacterization(NamedTuple):
    """Named tuple bundling an optional sequence of columns, two boolean
    flags and a :class:`._SentinelDefaultCharacterization`.

    Every field has a default, so the tuple can be constructed with no
    arguments.
    """

    # the column(s) being characterized, or None
    columns: Optional[Sequence[Column[Any]]] = None
    is_explicit: bool = False
    is_autoinc: bool = False
    # defaults to the NONE member
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
# key type for ColumnCollection: a constrained TypeVar that resolves to
# either ``Union[None, str]`` or plain ``str``
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# column type for ColumnCollection; the ``_co`` variant is covariant, the
# plain one is invariant, and both are bound to ColumnElement[Any]
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")
class _ColumnMetrics(Generic[_COL_co]):
    """Per-column record stored next to each column inside a
    :class:`.ColumnCollection`, used to keep that collection's
    ``_proxy_index`` up to date.

    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ) -> None:
        """Record ``col`` and, when the collection's proxy index is already
        populated, register this object under each member of the column's
        expanded proxy set."""
        self.column = col

        # an empty proxy index has not been initialized yet, so there is
        # nothing to keep in sync
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in col._expanded_proxy_set:
            proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        """Return the ``_expanded_proxy_set`` of the tracked column."""
        tracked = self.column
        return tracked._expanded_proxy_set

    def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
        """Remove this object from the collection's proxy index, deleting
        any index entry that is left empty.

        Does nothing when the proxy index is empty.

        """
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            if members:
                members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` that is not in
        this column's expanded proxy set has an ``_expand_cloned()``
        expansion that intersects that set."""
        own_set = self.column._expanded_proxy_set
        return all(
            own_set.intersection(_expand_cloned([outside]))
            for outside in target_set.difference(own_set)
        )
1699class ColumnCollection(Generic[_COLKEY, _COL_co]):
1700 """Collection of :class:`_expression.ColumnElement` instances,
1701 typically for
1702 :class:`_sql.FromClause` objects.
1704 The :class:`_sql.ColumnCollection` object is most commonly available
1705 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1706 on the :class:`_schema.Table` object, introduced at
1707 :ref:`metadata_tables_and_columns`.
1709 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1710 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1711 :class:`_schema.Column` objects, which are then accessible both via mapping
1712 style access as well as attribute access style.
1714 To access :class:`_schema.Column` objects using ordinary attribute-style
1715 access, specify the name like any other object attribute, such as below
1716 a column named ``employee_name`` is accessed::
1718 >>> employee_table.c.employee_name
1720 To access columns that have names with special characters or spaces,
1721 index-style access is used, such as below which illustrates a column named
1722 ``employee ' payment`` is accessed::
1724 >>> employee_table.c["employee ' payment"]
1726 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1727 interface, common dictionary method names like
1728 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1729 and :meth:`_sql.ColumnCollection.items` are available, which means that
1730 database columns that are keyed under these names also need to use indexed
1731 access::
1733 >>> employee_table.c["values"]
1736 The name for which a :class:`_schema.Column` would be present is normally
1737 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1738 such as a :class:`_sql.Select` object that uses a label style set
1739 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1740 key may instead be represented under a particular label name such
1741 as ``tablename_columnname``::
1743 >>> from sqlalchemy import select, column, table
1744 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1745 >>> t = table("t", column("c"))
1746 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1747 >>> subq = stmt.subquery()
1748 >>> subq.c.t_c
1749 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1751 :class:`.ColumnCollection` also indexes the columns in order and allows
1752 them to be accessible by their integer position::
1754 >>> cc[0]
1755 Column('x', Integer(), table=None)
1756 >>> cc[1]
1757 Column('y', Integer(), table=None)
1759 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1760 allows integer-based
1761 index access to the collection.
1763 Iterating the collection yields the column expressions in order::
1765 >>> list(cc)
1766 [Column('x', Integer(), table=None),
1767 Column('y', Integer(), table=None)]
1769 The base :class:`_expression.ColumnCollection` object can store
1770 duplicates, which can
1771 mean either two columns with the same key, in which case the column
1772 returned by key access is **arbitrary**::
1774 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1775 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1776 >>> list(cc)
1777 [Column('x', Integer(), table=None),
1778 Column('x', Integer(), table=None)]
1779 >>> cc["x"] is x1
1780 False
1781 >>> cc["x"] is x2
1782 True
1784 Or it can also mean the same column multiple times. These cases are
1785 supported as :class:`_expression.ColumnCollection`
1786 is used to represent the columns in
1787 a SELECT statement which may include duplicates.
1789 A special subclass :class:`.DedupeColumnCollection` exists which instead
1790 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1791 collection is used for schema level objects like :class:`_schema.Table`
1792 and
1793 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1794 :class:`.DedupeColumnCollection` class also has additional mutation methods
1795 as the schema constructs have more use cases that require removal and
1796 replacement of columns.
1798 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1799 now stores duplicate
1800 column keys as well as the same column in multiple positions. The
1801 :class:`.DedupeColumnCollection` class is added to maintain the
1802 former behavior in those cases where deduplication as well as
1803 additional replace/remove operations are needed.
1806 """
1808 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1810 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1811 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1812 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1813 _colset: Set[_COL_co]
def __init__(
    self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
):
    """Create the empty internal structures, then populate them from
    ``columns``, an iterable of ``(key, column)`` pairs, if given.
    """
    # __setattr__ on this class always raises, so each slot is assigned
    # through object.__setattr__
    initial_state = (
        ("_colset", set()),
        ("_index", {}),
        ("_proxy_index", collections.defaultdict(util.OrderedSet)),
        ("_collection", []),
    )
    for slot_name, empty_value in initial_state:
        object.__setattr__(self, slot_name, empty_value)

    if columns:
        self._initial_populate(columns)
@util.preload_module("sqlalchemy.sql.elements")
def __clause_element__(self) -> ClauseList:
    """Return an ungrouped ``ClauseList`` of all columns in this
    collection, using the columns-clause role."""
    clause_list_cls = util.preloaded.sql_elements.ClauseList
    members = self._all_columns

    return clause_list_cls(
        *members,
        _literal_as_text_role=roles.ColumnsClauseRole,
        group=False,
    )
def _initial_populate(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """Populate the collection from ``(key, column)`` pairs; called by
    ``__init__`` and implemented by delegating to
    ``_populate_separate_keys()``.
    """
    self._populate_separate_keys(iter_)
1842 @property
1843 def _all_columns(self) -> List[_COL_co]:
1844 return [col for (_, col, _) in self._collection]
1846 def keys(self) -> List[_COLKEY]:
1847 """Return a sequence of string key names for all columns in this
1848 collection."""
1849 return [k for (k, _, _) in self._collection]
1851 def values(self) -> List[_COL_co]:
1852 """Return a sequence of :class:`_sql.ColumnClause` or
1853 :class:`_schema.Column` objects for all columns in this
1854 collection."""
1855 return [col for (_, col, _) in self._collection]
1857 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1858 """Return a sequence of (key, column) tuples for all columns in this
1859 collection each consisting of a string key name and a
1860 :class:`_sql.ColumnClause` or
1861 :class:`_schema.Column` object.
1862 """
1864 return [(k, col) for (k, col, _) in self._collection]
1866 def __bool__(self) -> bool:
1867 return bool(self._collection)
def __len__(self) -> int:
    """Return the number of entries in the collection; a column stored
    more than once is counted each time."""
    return len(self._collection)
1872 def __iter__(self) -> Iterator[_COL_co]:
1873 # turn to a list first to maintain over a course of changes
1874 return iter([col for _, col, _ in self._collection])
1876 @overload
1877 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1879 @overload
1880 def __getitem__(
1881 self, key: Tuple[Union[str, int], ...]
1882 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1884 @overload
1885 def __getitem__(
1886 self, key: slice
1887 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1889 def __getitem__(
1890 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1891 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1892 try:
1893 if isinstance(key, (tuple, slice)):
1894 if isinstance(key, slice):
1895 cols = (
1896 (sub_key, col)
1897 for (sub_key, col, _) in self._collection[key]
1898 )
1899 else:
1900 cols = (self._index[sub_key] for sub_key in key)
1902 return ColumnCollection(cols).as_readonly()
1903 else:
1904 return self._index[key][1]
1905 except KeyError as err:
1906 if isinstance(err.args[0], int):
1907 raise IndexError(err.args[0]) from err
1908 else:
1909 raise
1911 def __getattr__(self, key: str) -> _COL_co:
1912 try:
1913 return self._index[key][1]
1914 except KeyError as err:
1915 raise AttributeError(key) from err
1917 def __contains__(self, key: str) -> bool:
1918 if key not in self._index:
1919 if not isinstance(key, str):
1920 raise exc.ArgumentError(
1921 "__contains__ requires a string argument"
1922 )
1923 return False
1924 else:
1925 return True
1927 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1928 """Compare this :class:`_expression.ColumnCollection` to another
1929 based on the names of the keys"""
1931 for l, r in zip_longest(self, other):
1932 if l is not r:
1933 return False
1934 else:
1935 return True
def __eq__(self, other: Any) -> bool:
    """Equality is whatever :meth:`compare` returns for ``other``."""
    return self.compare(other)
1940 @overload
1941 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1943 @overload
1944 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1946 def get(
1947 self, key: str, default: Optional[_COL] = None
1948 ) -> Optional[Union[_COL_co, _COL]]:
1949 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1950 based on a string key name from this
1951 :class:`_expression.ColumnCollection`."""
1953 if key in self._index:
1954 return self._index[key][1]
1955 else:
1956 return default
1958 def __str__(self) -> str:
1959 return "%s(%s)" % (
1960 self.__class__.__name__,
1961 ", ".join(str(c) for c in self),
1962 )
def __setitem__(self, key: str, value: Any) -> NoReturn:
    """Item assignment is not implemented; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()
def __delitem__(self, key: str) -> NoReturn:
    """Item deletion is not implemented; always raises
    ``NotImplementedError``."""
    raise NotImplementedError()
def __setattr__(self, key: str, obj: Any) -> NoReturn:
    """Attribute assignment is not implemented; always raises
    ``NotImplementedError``.

    Code in this class that needs to assign a slot goes through
    ``object.__setattr__`` instead.
    """
    raise NotImplementedError()
def clear(self) -> NoReturn:
    """Dictionary clear() is not implemented for
    :class:`_sql.ColumnCollection`.

    Always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
def remove(self, column: Any) -> NoReturn:
    """Column removal is not implemented for
    :class:`_sql.ColumnCollection`.

    Always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
def update(self, iter_: Any) -> NoReturn:
    """Dictionary update() is not implemented for
    :class:`_sql.ColumnCollection`.

    Always raises ``NotImplementedError``.
    """
    raise NotImplementedError()
# Setting __hash__ to None makes instances unhashable, so they cannot be
# used as dict keys or set members.  The "type: ignore" is there because
# of https://github.com/python/mypy/issues/4266
__hash__: Optional[int] = None  # type: ignore
def _populate_separate_keys(
    self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
    """populate from an iterator of (key, column)

    The contents of ``_collection`` are replaced in place; ``_colset`` and
    ``_index`` are updated rather than cleared first.
    """

    # slice assignment keeps the same list object; ``collection`` is a
    # local reference to the newly built list
    self._collection[:] = collection = [
        (k, c, _ColumnMetrics(self, c)) for k, c in iter_
    ]
    # the column set stores the deannotated form of each column
    self._colset.update(c._deannotate() for _, c, _ in collection)
    # positional entries: integer position -> (key, column)
    self._index.update(
        {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
    )
    # keyed entries: iterating in reverse means that for a duplicated key,
    # the entry that comes first in the collection is written last and is
    # therefore the one that remains in the index
    self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
def add(
    self,
    column: ColumnElement[Any],
    key: Optional[_COLKEY] = None,
) -> None:
    """Add a column to this :class:`_sql.ColumnCollection`.

    The column is appended at the next integer position.  It is indexed
    under ``key``, or under ``column.key`` when ``key`` is None, unless
    that key is already present in the index.

    .. note::

        This method is **not normally used by user-facing code**, as the
        :class:`_sql.ColumnCollection` is usually part of an existing
        object such as a :class:`_schema.Table`. To add a
        :class:`_schema.Column` to an existing :class:`_schema.Table`
        object, use the :meth:`_schema.Table.append_column` method.

    """
    colkey: _COLKEY = column.key if key is None else key  # type: ignore

    position = len(self._collection)

    # don't really know how this part is supposed to work w/ the
    # covariant thing
    _column = cast(_COL_co, column)
    index_entry = (colkey, _column)

    self._collection.append(
        (colkey, _column, _ColumnMetrics(self, _column))
    )
    self._colset.add(_column._deannotate())

    self._index[position] = index_entry
    # an existing entry for the same key is left as is
    self._index.setdefault(colkey, index_entry)
2042 def __getstate__(self) -> Dict[str, Any]:
2043 return {
2044 "_collection": [(k, c) for k, c, _ in self._collection],
2045 "_index": self._index,
2046 }
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore from the dictionary produced by ``__getstate__()``.

    The index is taken from the state as is, the proxy index starts out
    empty, and a new ``_ColumnMetrics`` is created for each column.
    """
    assign_slot = object.__setattr__

    assign_slot(self, "_index", state["_index"])
    # must be in place before the _ColumnMetrics objects are built below,
    # as their constructor reads it from the collection
    assign_slot(
        self, "_proxy_index", collections.defaultdict(util.OrderedSet)
    )

    restored = [
        (k, c, _ColumnMetrics(self, c)) for (k, c) in state["_collection"]
    ]
    assign_slot(self, "_collection", restored)
    assign_slot(self, "_colset", {col for _key, col, _metrics in restored})
def contains_column(self, col: ColumnElement[Any]) -> bool:
    """Return True if the given column object is in this collection.

    :param col: the column object to look for.
    :raises ArgumentError: if ``col`` is a string that is not found;
     string keys are to be tested with ``col_name in table.c``.

    """
    if col in self._colset:
        return True

    if isinstance(col, str):
        raise exc.ArgumentError(
            "contains_column cannot be used with string arguments. "
            "Use ``col_name in table.c`` instead."
        )

    return False
def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
    """Return a "read only" form of this
    :class:`_sql.ColumnCollection`, built by wrapping this collection in
    a :class:`.ReadOnlyColumnCollection`."""

    readonly_view = ReadOnlyColumnCollection(self)
    return readonly_view
2083 def _init_proxy_index(self) -> None:
2084 """populate the "proxy index", if empty.
2086 proxy index is added in 2.0 to provide more efficient operation
2087 for the corresponding_column() method.
2089 For reasons of both time to construct new .c collections as well as
2090 memory conservation for large numbers of large .c collections, the
2091 proxy_index is only filled if corresponding_column() is called. once
2092 filled it stays that way, and new _ColumnMetrics objects created after
2093 that point will populate it with new data. Note this case would be
2094 unusual, if not nonexistent, as it means a .c collection is being
2095 mutated after corresponding_column() were used, however it is tested in
2096 test/base/test_utils.py.
2098 """
2099 pi = self._proxy_index
2100 if pi:
2101 return
2103 for _, _, metrics in self._collection:
2104 eps = metrics.column._expanded_proxy_set
2106 for eps_col in eps:
2107 pi[eps_col].add(metrics)
def corresponding_column(
    self, column: _COL, require_embedded: bool = False
) -> Optional[Union[_COL, _COL_co]]:
    """Given a :class:`_expression.ColumnElement`, return the exported
    :class:`_expression.ColumnElement` object from this
    :class:`_expression.ColumnCollection`
    which corresponds to that original :class:`_expression.ColumnElement`
    via a common
    ancestor column.

    :param column: the target :class:`_expression.ColumnElement`
                  to be matched.

    :param require_embedded: only return corresponding columns for
     the given :class:`_expression.ColumnElement`, if the given
     :class:`_expression.ColumnElement`
     is actually present within a sub-element
     of this :class:`_expression.Selectable`.
     Normally the column will match if
     it merely shares a common ancestor with one of the exported
     columns of this :class:`_expression.Selectable`.

    :return: the matching column of this collection, the given column
     itself when it is directly present, or ``None`` when nothing
     corresponds.

    .. seealso::

        :meth:`_expression.Selectable.corresponding_column`
        - invokes this method
        against the collection returned by
        :attr:`_expression.Selectable.exported_columns`.

    .. versionchanged:: 1.4 the implementation for ``corresponding_column``
       was moved onto the :class:`_expression.ColumnCollection` itself.

    """
    # TODO: cython candidate

    # a column that is locally present needs no further searching
    if column in self._colset:
        return column

    best_metrics = None
    best_overlap = None
    target_set = column.proxy_set

    proxy_index = self._proxy_index
    if not proxy_index:
        # fills the same mapping object that ``proxy_index`` refers to
        self._init_proxy_index()

    for target_col in target_set:
        if target_col not in proxy_index:
            continue

        for candidate in proxy_index[target_col]:
            if require_embedded and not candidate.embedded(target_set):
                continue

            if best_metrics is None:
                # no corresponding column yet, pick this one.
                best_metrics = candidate
                continue

            candidate_overlap = target_set.intersection(
                candidate.column._expanded_proxy_set
            )
            if best_overlap is None:
                # computed lazily, only once a second candidate shows up
                best_overlap = target_set.intersection(
                    best_metrics.column._expanded_proxy_set
                )

            if len(candidate_overlap) > len(best_overlap):
                # the candidate has a larger field of correspondence than
                # the current pick.  i.e. selectable.c.a1_x->a1.c.x->table.c.x
                # matches a1.c.x->table.c.x better than
                # selectable.c.x->table.c.x does.
                best_metrics = candidate
                best_overlap = candidate_overlap
            elif candidate_overlap == best_overlap:
                # they have the same field of correspondence.  see
                # which proxy_set has fewer columns in it, which
                # indicates a closer relationship with the root
                # column.  Also take into account the "weight"
                # attribute which CompoundSelect() uses to give
                # higher precedence to columns based on vertical
                # position in the compound statement, and discard
                # columns that have no reference to the target
                # column (also occurs with CompoundSelect)
                best_distance = sum(
                    proxied._annotations.get("weight", 1)
                    for proxied in best_metrics.column._uncached_proxy_list()
                    if proxied.shares_lineage(column)
                )
                candidate_distance = sum(
                    proxied._annotations.get("weight", 1)
                    for proxied in candidate.column._uncached_proxy_list()
                    if proxied.shares_lineage(column)
                )
                if candidate_distance < best_distance:
                    best_metrics = candidate
                    best_overlap = candidate_overlap

    if best_metrics:
        return best_metrics.column
    return None
2216_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`.  The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self,
        column: _NAMEDCOL,
        key: Optional[str] = None,
        *,
        index: Optional[int] = None,
    ) -> None:
        """Add a column, replacing any existing column that has the same
        key.

        :param column: the column to add; it is stored under its ``.key``.
        :param key: optional; if given it must equal ``column.key``.
        :param index: optional integer position at which to place the
         column; when omitted a new column is appended at the end.
        :raises ArgumentError: if ``key`` differs from ``column.key``, or
         if the column's key is ``None``.

        """
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            # adding the very same object again is a no-op
            if existing is column:
                return

            self.replace(column, index=index)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column, index=index)

    def _append_new_column(
        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
    ) -> None:
        """Store a column whose key is not yet present in the collection.

        :param key: the key to store the column under.
        :param named_column: the column to store.
        :param index: optional position.  ``None`` appends at the end; a
         negative value counts from the end and is floored at ``0``; a
         value beyond the end is treated as the end, which is what
         ``list.insert()`` does with it.

        """
        collection_length = len(self._collection)

        if index is None:
            l = collection_length
        else:
            if index < 0:
                index = max(0, collection_length + index)
            else:
                # list.insert() clamps an index past the end; clamp here
                # too so that the integer entry written into _index below
                # agrees with where the column actually lands.
                index = min(index, collection_length)
            l = index

        if index is None:
            self._collection.append(
                (key, named_column, _ColumnMetrics(self, named_column))
            )
        else:
            self._collection.insert(
                index, (key, named_column, _ColumnMetrics(self, named_column))
            )

        self._colset.add(named_column._deannotate())

        if index is not None:
            # shift the integer entries at and after the insertion point up
            # by one, working from the end so that nothing is overwritten
            # before it has been moved
            for idx in reversed(range(index, collection_length)):
                self._index[idx + 1] = self._index[idx]

        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)

        Columns whose key, or whose name when it differs from the key, is
        already present are passed to :meth:`.replace` after all other
        columns have been appended.

        :raises ArgumentError: if a given key differs from its column's
         ``.key``.

        """
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        # refresh the integer position entries
        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add each of the given columns under its own ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:  # type: ignore[override]
        """Remove the given column from this collection.

        :raises ValueError: if the column is not in this collection.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the integer position entries for the remaining columns
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        *,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
        index: Optional[int] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column  as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the column to add.
        :param extra_remove: optional additional columns to remove.
        :param index: optional integer position for the added column.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column, index=index)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replace_index = None

        for idx, (k, col, metrics) in enumerate(self._collection):
            if col in remove_col:
                # the new column takes the place of the first removed
                # column; any further removed columns are dropped
                if replace_index is None:
                    replace_index = idx
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        # remove_col is known to be non-empty here because of the early
        # return above
        self._colset.difference_update(remove_col)

        for rc in remove_col:
            for metrics in self._proxy_index.get(rc, ()):
                metrics.dispose(self)

        if replace_index is None:
            # none of the columns to remove were found in the collection
            if index is not None:
                new_cols.insert(
                    index, (column.key, column, _ColumnMetrics(self, column))
                )

            else:
                new_cols.append(
                    (column.key, column, _ColumnMetrics(self, column))
                )
        elif index is not None:
            # move the entry from the replaced position to the requested
            # one: insert a second reference first, then delete the
            # original, whose position shifts by one if the insertion
            # happened in front of it
            to_move = new_cols[replace_index]
            effective_positive_index = (
                index if index >= 0 else max(0, len(new_cols) + index)
            )
            new_cols.insert(index, to_move)
            if replace_index > effective_positive_index:
                del new_cols[replace_index + 1]
            else:
                del new_cols[replace_index]

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild both the integer and the key entries from scratch
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """A :class:`_sql.ColumnCollection` that refers to the internal
    structures of a parent collection and routes its mutator methods
    to ``_readonly()``.

    """

    __slots__ = ("_parent",)

    def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
        # the parent's own objects are referenced, not copied
        object.__setattr__(self, "_parent", collection)
        object.__setattr__(self, "_colset", collection._colset)
        object.__setattr__(self, "_index", collection._index)
        object.__setattr__(self, "_collection", collection._collection)
        object.__setattr__(self, "_proxy_index", collection._proxy_index)

    def __getstate__(self) -> Dict[str, Any]:
        """Return the picklable state, which is only the parent
        collection."""
        return {"_parent": self._parent}

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore by re-running ``__init__`` against the parent
        collection found in the state."""
        parent = state["_parent"]
        self.__init__(parent)  # type: ignore

    # NOTE(review): ``_readonly()`` is not defined in this class; given the
    # ``NoReturn`` annotations below it presumably always raises - confirm
    # against util.ReadOnlyContainer.

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns with column-oriented helper methods."""

    def contains_column(self, col: ColumnClause[Any]) -> bool:
        """Return True if the given column is a member of this set."""
        return col in self

    def extend(self, cols: Iterable[Any]) -> None:
        """Add each of the given columns to this set, in order."""
        for item in cols:
            self.add(item)

    def __eq__(self, other):
        """Return the ``and_()`` of ``c == local`` for every pair of a
        column from ``other`` and a column from this set for which
        ``c.shares_lineage(local)`` is true.

        """
        comparisons = [
            theirs == ours
            for theirs in other
            for ours in self
            if theirs.shares_lineage(ours)
        ]
        return elements.and_(*comparisons)

    def __hash__(self) -> int:  # type: ignore[override]
        """Hash on the tuple of member columns in iteration order."""
        return hash(tuple(self))
2489def _entity_namespace(
2490 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2491) -> _EntityNamespace:
2492 """Return the nearest .entity_namespace for the given entity.
2494 If not immediately available, does an iterate to find a sub-element
2495 that has one, if any.
2497 """
2498 try:
2499 return cast(_HasEntityNamespace, entity).entity_namespace
2500 except AttributeError:
2501 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2502 if _is_has_entity_namespace(elem):
2503 return elem.entity_namespace
2504 else:
2505 raise
@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _NoArg,
) -> SQLCoreOperations[Any]: ...


@overload
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: _T,
) -> Union[SQLCoreOperations[Any], _T]: ...


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
) -> Union[SQLCoreOperations[Any], _T]:
    """Return an entry from an entity_namespace.

    :param entity: object whose entity namespace is located via
     :func:`._entity_namespace`.
    :param key: attribute name to fetch from that namespace.
    :param default: value returned when the attribute is missing; when
     left at ``NO_ARG`` a missing attribute is an error.

    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err