Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Collection
24from typing import Dict
25from typing import Final
26from typing import FrozenSet
27from typing import Generator
28from typing import Generic
29from typing import Iterable
30from typing import Iterator
31from typing import List
32from typing import Mapping
33from typing import MutableMapping
34from typing import NamedTuple
35from typing import NoReturn
36from typing import Optional
37from typing import overload
38from typing import Protocol
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeGuard
45from typing import TypeVar
46from typing import Union
48from . import roles
49from . import visitors
50from .cache_key import HasCacheKey # noqa
51from .cache_key import MemoizedHasCacheKey # noqa
52from .traversals import HasCopyInternals # noqa
53from .visitors import ClauseVisitor
54from .visitors import ExtendedInternalTraversal
55from .visitors import ExternallyTraversible
56from .visitors import InternalTraversal
57from .. import event
58from .. import exc
59from .. import util
60from ..util import EMPTY_DICT
61from ..util import HasMemoized as HasMemoized
62from ..util import hybridmethod
63from ..util.typing import Self
64from ..util.typing import TypeVarTuple
65from ..util.typing import Unpack
67if TYPE_CHECKING:
68 from . import coercions
69 from . import elements
70 from . import type_api
71 from ._orm_types import DMLStrategyArgument
72 from ._orm_types import SynchronizeSessionArgument
73 from ._typing import _CLE
74 from .cache_key import CacheKey
75 from .compiler import SQLCompiler
76 from .dml import Delete
77 from .dml import Insert
78 from .dml import Update
79 from .elements import BindParameter
80 from .elements import ClauseElement
81 from .elements import ClauseList
82 from .elements import ColumnClause # noqa
83 from .elements import ColumnElement
84 from .elements import NamedColumn
85 from .elements import SQLCoreOperations
86 from .elements import TextClause
87 from .schema import Column
88 from .schema import DefaultGenerator
89 from .selectable import _JoinTargetElement
90 from .selectable import _SelectIterable
91 from .selectable import FromClause
92 from .selectable import Select
93 from .visitors import anon_map
94 from ..engine import Connection
95 from ..engine import CursorResult
96 from ..engine.interfaces import _CoreMultiExecuteParams
97 from ..engine.interfaces import _CoreSingleExecuteParams
98 from ..engine.interfaces import _ExecuteOptions
99 from ..engine.interfaces import _ImmutableExecuteOptions
100 from ..engine.interfaces import CacheStats
101 from ..engine.interfaces import Compiled
102 from ..engine.interfaces import CompiledCacheType
103 from ..engine.interfaces import CoreExecuteOptionsParameter
104 from ..engine.interfaces import Dialect
105 from ..engine.interfaces import IsolationLevel
106 from ..engine.interfaces import SchemaTranslateMapType
107 from ..event import dispatcher
109if not TYPE_CHECKING:
110 coercions = None # noqa
111 elements = None # noqa
112 type_api = None # noqa
115_Ts = TypeVarTuple("_Ts")
118class _NoArg(Enum):
119 NO_ARG = 0
121 def __repr__(self):
122 return f"_NoArg.{self.name}"
125NO_ARG: Final = _NoArg.NO_ARG
128class _NoneName(Enum):
129 NONE_NAME = 0
130 """indicate a 'deferred' name that was ultimately the value None."""
133_NONE_NAME: Final = _NoneName.NONE_NAME
135_T = TypeVar("_T", bound=Any)
137_Fn = TypeVar("_Fn", bound=Callable[..., Any])
139_AmbiguousTableNameMap = MutableMapping[str, str]
142class _DefaultDescriptionTuple(NamedTuple):
143 arg: Any
144 is_scalar: Optional[bool]
145 is_callable: Optional[bool]
146 is_sentinel: Optional[bool]
148 @classmethod
149 def _from_column_default(
150 cls, default: Optional[DefaultGenerator]
151 ) -> _DefaultDescriptionTuple:
152 return (
153 _DefaultDescriptionTuple(
154 default.arg, # type: ignore
155 default.is_scalar,
156 default.is_callable,
157 default.is_sentinel,
158 )
159 if default
160 and (
161 default.has_arg
162 or (not default.for_update and default.is_sentinel)
163 )
164 else _DefaultDescriptionTuple(None, None, None, None)
165 )
168_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
169 "_omit_from_statements"
170)
173class _EntityNamespace(Protocol):
174 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
177class _HasEntityNamespace(Protocol):
178 @util.ro_non_memoized_property
179 def entity_namespace(self) -> _EntityNamespace: ...
182def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
183 return hasattr(element, "entity_namespace")
186# Remove when https://github.com/python/mypy/issues/14640 will be fixed
187_Self = TypeVar("_Self", bound=Any)
190class Immutable:
191 """mark a ClauseElement as 'immutable' when expressions are cloned.
193 "immutable" objects refers to the "mutability" of an object in the
194 context of SQL DQL and DML generation. Such as, in DQL, one can
195 compose a SELECT or subquery of varied forms, but one cannot modify
196 the structure of a specific table or column within DQL.
197 :class:`.Immutable` is mostly intended to follow this concept, and as
198 such the primary "immutable" objects are :class:`.ColumnClause`,
199 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
201 """
203 __slots__ = ()
205 _is_immutable: bool = True
207 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
208 raise NotImplementedError("Immutable objects do not support copying")
210 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
211 raise NotImplementedError("Immutable objects do not support copying")
213 def _clone(self: _Self, **kw: Any) -> _Self:
214 return self
216 def _copy_internals(
217 self, *, omit_attrs: Iterable[str] = (), **kw: Any
218 ) -> None:
219 pass
222class SingletonConstant(Immutable):
223 """Represent SQL constants like NULL, TRUE, FALSE"""
225 _is_singleton_constant: bool = True
227 _singleton: SingletonConstant
229 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
230 return cast(_T, cls._singleton)
232 @util.non_memoized_property
233 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
234 raise NotImplementedError()
236 @classmethod
237 def _create_singleton(cls) -> None:
238 obj = object.__new__(cls)
239 obj.__init__() # type: ignore
241 # for a long time this was an empty frozenset, meaning
242 # a SingletonConstant would never be a "corresponding column" in
243 # a statement. This referred to #6259. However, in #7154 we see
244 # that we do in fact need "correspondence" to work when matching cols
245 # in result sets, so the non-correspondence was moved to a more
246 # specific level when we are actually adapting expressions for SQL
247 # render only.
248 obj.proxy_set = frozenset([obj])
249 cls._singleton = obj
252def _from_objects(
253 *elements: Union[
254 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
255 ]
256) -> Iterator[FromClause]:
257 return itertools.chain.from_iterable(
258 [element._from_objects for element in elements]
259 )
262def _select_iterables(
263 elements: Iterable[roles.ColumnsClauseRole],
264) -> _SelectIterable:
265 """expand tables into individual columns in the
266 given list of column expressions.
268 """
269 return itertools.chain.from_iterable(
270 [c._select_iterable for c in elements]
271 )
274_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
277class _GenerativeType(Protocol):
278 def _generate(self) -> Self: ...
281def _generative(fn: _Fn) -> _Fn:
282 """non-caching _generative() decorator.
284 This is basically the legacy decorator that copies the object and
285 runs a method on the new copy.
287 """
289 @util.decorator
290 def _generative(
291 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
292 ) -> _SelfGenerativeType:
293 """Mark a method as generative."""
295 self = self._generate()
296 x = fn(self, *args, **kw)
297 assert x is self, "generative methods must return self"
298 return self
300 decorated = _generative(fn)
301 decorated.non_generative = fn # type: ignore
302 return decorated
305def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
306 msgs: Dict[str, str] = kw.pop("msgs", {})
308 defaults: Dict[str, str] = kw.pop("defaults", {})
310 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
311 (name, operator.attrgetter(name), defaults.get(name, None))
312 for name in names
313 ]
315 @util.decorator
316 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
317 # make pylance happy by not including "self" in the argument
318 # list
319 self = args[0]
320 args = args[1:]
321 for name, getter, default_ in getters:
322 if getter(self) is not default_:
323 msg = msgs.get(
324 name,
325 "Method %s() has already been invoked on this %s construct"
326 % (fn.__name__, self.__class__),
327 )
328 raise exc.InvalidRequestError(msg)
329 return fn(self, *args, **kw)
331 return check
334def _clone(element, **kw):
335 return element._clone(**kw)
338def _expand_cloned(
339 elements: Iterable[_CLE],
340) -> Iterable[_CLE]:
341 """expand the given set of ClauseElements to be the set of all 'cloned'
342 predecessors.
344 """
345 # TODO: cython candidate
346 return itertools.chain(*[x._cloned_set for x in elements])
349def _de_clone(
350 elements: Iterable[_CLE],
351) -> Iterable[_CLE]:
352 for x in elements:
353 while x._is_clone_of is not None:
354 x = x._is_clone_of
355 yield x
358def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
359 """return the intersection of sets a and b, counting
360 any overlap between 'cloned' predecessors.
362 The returned set is in terms of the entities present within 'a'.
364 """
365 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
366 _expand_cloned(b)
367 )
368 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
371def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
372 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
373 _expand_cloned(b)
374 )
375 return {
376 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
377 }
380class _DialectArgView(MutableMapping[str, Any]):
381 """A dictionary view of dialect-level arguments in the form
382 <dialectname>_<argument_name>.
384 """
386 __slots__ = ("obj",)
388 def __init__(self, obj: DialectKWArgs) -> None:
389 self.obj = obj
391 def _key(self, key: str) -> Tuple[str, str]:
392 try:
393 dialect, value_key = key.split("_", 1)
394 except ValueError as err:
395 raise KeyError(key) from err
396 else:
397 return dialect, value_key
399 def __getitem__(self, key: str) -> Any:
400 dialect, value_key = self._key(key)
402 try:
403 opt = self.obj.dialect_options[dialect]
404 except exc.NoSuchModuleError as err:
405 raise KeyError(key) from err
406 else:
407 return opt[value_key]
409 def __setitem__(self, key: str, value: Any) -> None:
410 try:
411 dialect, value_key = self._key(key)
412 except KeyError as err:
413 raise exc.ArgumentError(
414 "Keys must be of the form <dialectname>_<argname>"
415 ) from err
416 else:
417 self.obj.dialect_options[dialect][value_key] = value
419 def __delitem__(self, key: str) -> None:
420 dialect, value_key = self._key(key)
421 del self.obj.dialect_options[dialect][value_key]
423 def __len__(self) -> int:
424 return sum(
425 len(args._non_defaults)
426 for args in self.obj.dialect_options.values()
427 )
429 def __iter__(self) -> Generator[str, None, None]:
430 return (
431 "%s_%s" % (dialect_name, value_name)
432 for dialect_name in self.obj.dialect_options
433 for value_name in self.obj.dialect_options[
434 dialect_name
435 ]._non_defaults
436 )
439class _DialectArgDict(MutableMapping[str, Any]):
440 """A dictionary view of dialect-level arguments for a specific
441 dialect.
443 Maintains a separate collection of user-specified arguments
444 and dialect-specified default arguments.
446 """
448 def __init__(self) -> None:
449 self._non_defaults: Dict[str, Any] = {}
450 self._defaults: Dict[str, Any] = {}
452 def __len__(self) -> int:
453 return len(set(self._non_defaults).union(self._defaults))
455 def __iter__(self) -> Iterator[str]:
456 return iter(set(self._non_defaults).union(self._defaults))
458 def __getitem__(self, key: str) -> Any:
459 if key in self._non_defaults:
460 return self._non_defaults[key]
461 else:
462 return self._defaults[key]
464 def __setitem__(self, key: str, value: Any) -> None:
465 self._non_defaults[key] = value
467 def __delitem__(self, key: str) -> None:
468 del self._non_defaults[key]
471@util.preload_module("sqlalchemy.dialects")
472def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
473 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
474 if dialect_cls.construct_arguments is None:
475 return None
476 return dict(dialect_cls.construct_arguments)
479class DialectKWArgs:
480 """Establish the ability for a class to have dialect-specific arguments
481 with defaults and constructor validation.
483 The :class:`.DialectKWArgs` interacts with the
484 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
486 .. seealso::
488 :attr:`.DefaultDialect.construct_arguments`
490 """
492 __slots__ = ()
494 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
495 ("dialect_options", InternalTraversal.dp_dialect_options)
496 ]
498 @classmethod
499 def argument_for(
500 cls, dialect_name: str, argument_name: str, default: Any
501 ) -> None:
502 """Add a new kind of dialect-specific keyword argument for this class.
504 E.g.::
506 Index.argument_for("mydialect", "length", None)
508 some_index = Index("a", "b", mydialect_length=5)
510 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
511 way adding extra arguments to the
512 :attr:`.DefaultDialect.construct_arguments` dictionary. This
513 dictionary provides a list of argument names accepted by various
514 schema-level constructs on behalf of a dialect.
516 New dialects should typically specify this dictionary all at once as a
517 data member of the dialect class. The use case for ad-hoc addition of
518 argument names is typically for end-user code that is also using
519 a custom compilation scheme which consumes the additional arguments.
521 :param dialect_name: name of a dialect. The dialect must be
522 locatable, else a :class:`.NoSuchModuleError` is raised. The
523 dialect must also include an existing
524 :attr:`.DefaultDialect.construct_arguments` collection, indicating
525 that it participates in the keyword-argument validation and default
526 system, else :class:`.ArgumentError` is raised. If the dialect does
527 not include this collection, then any keyword argument can be
528 specified on behalf of this dialect already. All dialects packaged
529 within SQLAlchemy include this collection, however for third party
530 dialects, support may vary.
532 :param argument_name: name of the parameter.
534 :param default: default value of the parameter.
536 """
538 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
539 DialectKWArgs._kw_registry[dialect_name]
540 )
541 if construct_arg_dictionary is None:
542 raise exc.ArgumentError(
543 "Dialect '%s' does have keyword-argument "
544 "validation and defaults enabled configured" % dialect_name
545 )
546 if cls not in construct_arg_dictionary:
547 construct_arg_dictionary[cls] = {}
548 construct_arg_dictionary[cls][argument_name] = default
550 @property
551 def dialect_kwargs(self) -> _DialectArgView:
552 """A collection of keyword arguments specified as dialect-specific
553 options to this construct.
555 The arguments are present here in their original ``<dialect>_<kwarg>``
556 format. Only arguments that were actually passed are included;
557 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
558 contains all options known by this dialect including defaults.
560 The collection is also writable; keys are accepted of the
561 form ``<dialect>_<kwarg>`` where the value will be assembled
562 into the list of options.
564 .. seealso::
566 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
568 """
569 return _DialectArgView(self)
571 @property
572 def kwargs(self) -> _DialectArgView:
573 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
574 return self.dialect_kwargs
576 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
577 util.PopulateDict(_kw_reg_for_dialect)
578 )
580 @classmethod
581 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
582 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
583 d = _DialectArgDict()
585 if construct_arg_dictionary is None:
586 d._defaults.update({"*": None})
587 else:
588 for cls in reversed(cls.__mro__):
589 if cls in construct_arg_dictionary:
590 d._defaults.update(construct_arg_dictionary[cls])
591 return d
593 @util.memoized_property
594 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
595 """A collection of keyword arguments specified as dialect-specific
596 options to this construct.
598 This is a two-level nested registry, keyed to ``<dialect_name>``
599 and ``<argument_name>``. For example, the ``postgresql_where``
600 argument would be locatable as::
602 arg = my_object.dialect_options["postgresql"]["where"]
604 .. versionadded:: 0.9.2
606 .. seealso::
608 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
610 """
612 return util.PopulateDict(self._kw_reg_for_dialect_cls)
614 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
615 # validate remaining kwargs that they all specify DB prefixes
617 if not kwargs:
618 return
620 for k in kwargs:
621 m = re.match("^(.+?)_(.+)$", k)
622 if not m:
623 raise TypeError(
624 "Additional arguments should be "
625 "named <dialectname>_<argument>, got '%s'" % k
626 )
627 dialect_name, arg_name = m.group(1, 2)
629 try:
630 construct_arg_dictionary = self.dialect_options[dialect_name]
631 except exc.NoSuchModuleError:
632 util.warn(
633 "Can't validate argument %r; can't "
634 "locate any SQLAlchemy dialect named %r"
635 % (k, dialect_name)
636 )
637 self.dialect_options[dialect_name] = d = _DialectArgDict()
638 d._defaults.update({"*": None})
639 d._non_defaults[arg_name] = kwargs[k]
640 else:
641 if (
642 "*" not in construct_arg_dictionary
643 and arg_name not in construct_arg_dictionary
644 ):
645 raise exc.ArgumentError(
646 "Argument %r is not accepted by "
647 "dialect %r on behalf of %r"
648 % (k, dialect_name, self.__class__)
649 )
650 else:
651 construct_arg_dictionary[arg_name] = kwargs[k]
654class CompileState:
655 """Produces additional object state necessary for a statement to be
656 compiled.
658 the :class:`.CompileState` class is at the base of classes that assemble
659 state for a particular statement object that is then used by the
660 compiler. This process is essentially an extension of the process that
661 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
662 on converting raw user intent into more organized structures rather than
663 producing string output. The top-level :class:`.CompileState` for the
664 statement being executed is also accessible when the execution context
665 works with invoking the statement and collecting results.
667 The production of :class:`.CompileState` is specific to the compiler, such
668 as within the :meth:`.SQLCompiler.visit_insert`,
669 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
670 responsible for associating the :class:`.CompileState` with the
671 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
672 i.e. the outermost SQL statement that's actually being executed.
673 There can be other :class:`.CompileState` objects that are not the
674 toplevel, such as when a SELECT subquery or CTE-nested
675 INSERT/UPDATE/DELETE is generated.
677 .. versionadded:: 1.4
679 """
681 __slots__ = ("statement", "_ambiguous_table_name_map")
683 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
685 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
687 @classmethod
688 def create_for_statement(
689 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
690 ) -> CompileState:
691 # factory construction.
693 if statement._propagate_attrs:
694 plugin_name = statement._propagate_attrs.get(
695 "compile_state_plugin", "default"
696 )
697 klass = cls.plugins.get(
698 (plugin_name, statement._effective_plugin_target), None
699 )
700 if klass is None:
701 klass = cls.plugins[
702 ("default", statement._effective_plugin_target)
703 ]
705 else:
706 klass = cls.plugins[
707 ("default", statement._effective_plugin_target)
708 ]
710 if klass is cls:
711 return cls(statement, compiler, **kw)
712 else:
713 return klass.create_for_statement(statement, compiler, **kw)
715 def __init__(self, statement, compiler, **kw):
716 self.statement = statement
718 @classmethod
719 def get_plugin_class(
720 cls, statement: Executable
721 ) -> Optional[Type[CompileState]]:
722 plugin_name = statement._propagate_attrs.get(
723 "compile_state_plugin", None
724 )
726 if plugin_name:
727 key = (plugin_name, statement._effective_plugin_target)
728 if key in cls.plugins:
729 return cls.plugins[key]
731 # there's no case where we call upon get_plugin_class() and want
732 # to get None back, there should always be a default. return that
733 # if there was no plugin-specific class (e.g. Insert with "orm"
734 # plugin)
735 try:
736 return cls.plugins[("default", statement._effective_plugin_target)]
737 except KeyError:
738 return None
740 @classmethod
741 def _get_plugin_class_for_plugin(
742 cls, statement: Executable, plugin_name: str
743 ) -> Optional[Type[CompileState]]:
744 try:
745 return cls.plugins[
746 (plugin_name, statement._effective_plugin_target)
747 ]
748 except KeyError:
749 return None
751 @classmethod
752 def plugin_for(
753 cls, plugin_name: str, visit_name: str
754 ) -> Callable[[_Fn], _Fn]:
755 def decorate(cls_to_decorate):
756 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
757 return cls_to_decorate
759 return decorate
762class Generative(HasMemoized):
763 """Provide a method-chaining pattern in conjunction with the
764 @_generative decorator."""
766 def _generate(self) -> Self:
767 skip = self._memoized_keys
768 cls = self.__class__
769 s = cls.__new__(cls)
770 if skip:
771 # ensure this iteration remains atomic
772 s.__dict__ = {
773 k: v for k, v in self.__dict__.copy().items() if k not in skip
774 }
775 else:
776 s.__dict__ = self.__dict__.copy()
777 return s
780class InPlaceGenerative(HasMemoized):
781 """Provide a method-chaining pattern in conjunction with the
782 @_generative decorator that mutates in place."""
784 __slots__ = ()
786 def _generate(self) -> Self:
787 skip = self._memoized_keys
788 # note __dict__ needs to be in __slots__ if this is used
789 for k in skip:
790 self.__dict__.pop(k, None)
791 return self
794class HasCompileState(Generative):
795 """A class that has a :class:`.CompileState` associated with it."""
797 _compile_state_plugin: Optional[Type[CompileState]] = None
799 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
801 _compile_state_factory = CompileState.create_for_statement
804class _MetaOptions(type):
805 """metaclass for the Options class.
807 This metaclass is actually necessary despite the availability of the
808 ``__init_subclass__()`` hook as this type also provides custom class-level
809 behavior for the ``__add__()`` method.
811 """
813 _cache_attrs: Tuple[str, ...]
815 def __add__(self, other):
816 o1 = self()
818 if set(other).difference(self._cache_attrs):
819 raise TypeError(
820 "dictionary contains attributes not covered by "
821 "Options class %s: %r"
822 % (self, set(other).difference(self._cache_attrs))
823 )
825 o1.__dict__.update(other)
826 return o1
828 if TYPE_CHECKING:
830 def __getattr__(self, key: str) -> Any: ...
832 def __setattr__(self, key: str, value: Any) -> None: ...
834 def __delattr__(self, key: str) -> None: ...
837class Options(metaclass=_MetaOptions):
838 """A cacheable option dictionary with defaults."""
840 __slots__ = ()
842 _cache_attrs: Tuple[str, ...]
844 def __init_subclass__(cls) -> None:
845 dict_ = cls.__dict__
846 cls._cache_attrs = tuple(
847 sorted(
848 d
849 for d in dict_
850 if not d.startswith("__")
851 and d not in ("_cache_key_traversal",)
852 )
853 )
854 super().__init_subclass__()
856 def __init__(self, **kw: Any) -> None:
857 self.__dict__.update(kw)
859 def __add__(self, other):
860 o1 = self.__class__.__new__(self.__class__)
861 o1.__dict__.update(self.__dict__)
863 if set(other).difference(self._cache_attrs):
864 raise TypeError(
865 "dictionary contains attributes not covered by "
866 "Options class %s: %r"
867 % (self, set(other).difference(self._cache_attrs))
868 )
870 o1.__dict__.update(other)
871 return o1
873 def __eq__(self, other):
874 # TODO: very inefficient. This is used only in test suites
875 # right now.
876 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
877 if getattr(self, a) != getattr(other, b):
878 return False
879 return True
881 def __repr__(self) -> str:
882 # TODO: fairly inefficient, used only in debugging right now.
884 return "%s(%s)" % (
885 self.__class__.__name__,
886 ", ".join(
887 "%s=%r" % (k, self.__dict__[k])
888 for k in self._cache_attrs
889 if k in self.__dict__
890 ),
891 )
893 @classmethod
894 def isinstance(cls, klass: Type[Any]) -> bool:
895 return issubclass(cls, klass)
897 @hybridmethod
898 def add_to_element(self, name: str, value: str) -> Any:
899 return self + {name: getattr(self, name) + value}
901 @hybridmethod
902 def _state_dict_inst(self) -> Mapping[str, Any]:
903 return self.__dict__
905 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
907 @_state_dict_inst.classlevel
908 def _state_dict(cls) -> Mapping[str, Any]:
909 return cls._state_dict_const
911 @classmethod
912 def safe_merge(cls, other: "Options") -> Any:
913 d = other._state_dict()
915 # only support a merge with another object of our class
916 # and which does not have attrs that we don't. otherwise
917 # we risk having state that might not be part of our cache
918 # key strategy
920 if (
921 cls is not other.__class__
922 and other._cache_attrs
923 and set(other._cache_attrs).difference(cls._cache_attrs)
924 ):
925 raise TypeError(
926 "other element %r is not empty, is not of type %s, "
927 "and contains attributes not covered here %r"
928 % (
929 other,
930 cls,
931 set(other._cache_attrs).difference(cls._cache_attrs),
932 )
933 )
934 return cls + d
936 @classmethod
937 def from_execution_options(
938 cls,
939 key: str,
940 attrs: set[str],
941 exec_options: Mapping[str, Any],
942 statement_exec_options: Mapping[str, Any],
943 ) -> Tuple["Options", Mapping[str, Any]]:
944 """process Options argument in terms of execution options.
947 e.g.::
949 (
950 load_options,
951 execution_options,
952 ) = QueryContext.default_load_options.from_execution_options(
953 "_sa_orm_load_options",
954 {"populate_existing", "autoflush", "yield_per"},
955 execution_options,
956 statement._execution_options,
957 )
959 get back the Options and refresh "_sa_orm_load_options" in the
960 exec options dict w/ the Options as well
962 """
964 # common case is that no options we are looking for are
965 # in either dictionary, so cancel for that first
966 check_argnames = attrs.intersection(
967 set(exec_options).union(statement_exec_options)
968 )
970 existing_options = exec_options.get(key, cls)
972 if check_argnames:
973 result = {}
974 for argname in check_argnames:
975 local = "_" + argname
976 if argname in exec_options:
977 result[local] = exec_options[argname]
978 elif argname in statement_exec_options:
979 result[local] = statement_exec_options[argname]
981 new_options = existing_options + result
982 exec_options = util.immutabledict().merge_with(
983 exec_options, {key: new_options}
984 )
985 return new_options, exec_options
987 else:
988 return existing_options, exec_options
990 if TYPE_CHECKING:
992 def __getattr__(self, key: str) -> Any: ...
994 def __setattr__(self, key: str, value: Any) -> None: ...
996 def __delattr__(self, key: str) -> None: ...
999class CacheableOptions(Options, HasCacheKey):
1000 __slots__ = ()
1002 @hybridmethod
1003 def _gen_cache_key_inst(
1004 self, anon_map: Any, bindparams: List[BindParameter[Any]]
1005 ) -> Optional[Tuple[Any]]:
1006 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
1008 @_gen_cache_key_inst.classlevel
1009 def _gen_cache_key(
1010 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
1011 ) -> Tuple[CacheableOptions, Any]:
1012 return (cls, ())
1014 @hybridmethod
1015 def _generate_cache_key(self) -> Optional[CacheKey]:
1016 return HasCacheKey._generate_cache_key(self)
1019class ExecutableOption(HasCopyInternals):
1020 __slots__ = ()
1022 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1024 __visit_name__: str = "executable_option"
1026 _is_has_cache_key: bool = False
1028 _is_core: bool = True
1030 def _clone(self, **kw):
1031 """Create a shallow copy of this ExecutableOption."""
1032 c = self.__class__.__new__(self.__class__)
1033 c.__dict__ = dict(self.__dict__) # type: ignore
1034 return c
1037_L = TypeVar("_L", bound=str)
1040class HasSyntaxExtensions(Generic[_L]):
1042 _position_map: Mapping[_L, str]
1044 @_generative
1045 def ext(self, extension: SyntaxExtension) -> Self:
1046 """Applies a SQL syntax extension to this statement.
1048 SQL syntax extensions are :class:`.ClauseElement` objects that define
1049 some vendor-specific syntactical construct that take place in specific
1050 parts of a SQL statement. Examples include vendor extensions like
1051 PostgreSQL / SQLite's "ON DUPLICATE KEY UPDATE", PostgreSQL's
1052 "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
1053 and DELETE statements.
1055 .. seealso::
1057 :ref:`examples_syntax_extensions`
1059 :func:`_mysql.limit` - DML LIMIT for MySQL
1061 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL
1063 .. versionadded:: 2.1
1065 """
1066 extension = coercions.expect(
1067 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
1068 )
1069 self._apply_syntax_extension_to_self(extension)
1070 return self
1072 @util.preload_module("sqlalchemy.sql.elements")
1073 def apply_syntax_extension_point(
1074 self,
1075 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
1076 position: _L,
1077 ) -> None:
1078 """Apply a :class:`.SyntaxExtension` to a known extension point.
1080 Should be used only internally by :class:`.SyntaxExtension`.
1082 E.g.::
1084 class Qualify(SyntaxExtension, ClauseElement):
1086 # ...
1088 def apply_to_select(self, select_stmt: Select) -> None:
1089 # append self to existing
1090 select_stmt.apply_extension_point(
1091 lambda existing: [*existing, self], "post_criteria"
1092 )
1095 class ReplaceExt(SyntaxExtension, ClauseElement):
1097 # ...
1099 def apply_to_select(self, select_stmt: Select) -> None:
1100 # replace any existing elements regardless of type
1101 select_stmt.apply_extension_point(
1102 lambda existing: [self], "post_criteria"
1103 )
1106 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):
1108 # ...
1110 def apply_to_select(self, select_stmt: Select) -> None:
1111 # replace any existing elements of the same type
1112 select_stmt.apply_extension_point(
1113 self.append_replacing_same_type, "post_criteria"
1114 )
1116 :param apply_fn: callable function that will receive a sequence of
1117 :class:`.ClauseElement` that is already populating the extension
1118 point (the sequence is empty if there isn't one), and should return
1119 a new sequence of :class:`.ClauseElement` that will newly populate
1120 that point. The function typically can choose to concatenate the
1121 existing values with the new one, or to replace the values that are
1122 there with a new one by returning a list of a single element, or
1123 to perform more complex operations like removing only the same
1124 type element from the input list of merging already existing elements
1125 of the same type. Some examples are shown in the examples above
1126 :param position: string name of the position to apply to. This
1127 varies per statement type. IDEs should show the possible values
1128 for each statement type as it's typed with a ``typing.Literal`` per
1129 statement.
1131 .. seealso::
1133 :ref:`examples_syntax_extensions`
1136 """ # noqa: E501
1138 try:
1139 attrname = self._position_map[position]
1140 except KeyError as ke:
1141 raise ValueError(
1142 f"Unknown position {position!r} for {self.__class__} "
1143 f"construct; known positions: "
1144 f"{', '.join(repr(k) for k in self._position_map)}"
1145 ) from ke
1146 else:
1147 ElementList = util.preloaded.sql_elements.ElementList
1148 existing: Optional[ClauseElement] = getattr(self, attrname, None)
1149 if existing is None:
1150 input_seq: Tuple[ClauseElement, ...] = ()
1151 elif isinstance(existing, ElementList):
1152 input_seq = existing.clauses
1153 else:
1154 input_seq = (existing,)
1156 new_seq = apply_fn(input_seq)
1157 assert new_seq, "cannot return empty sequence"
1158 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
1159 setattr(self, attrname, new)
1161 def _apply_syntax_extension_to_self(
1162 self, extension: SyntaxExtension
1163 ) -> None:
1164 raise NotImplementedError()
1166 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
1167 res: Dict[_L, SyntaxExtension] = {}
1168 for name, attr in self._position_map.items():
1169 value = getattr(self, attr)
1170 if value is not None:
1171 res[name] = value
1172 return res
1174 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
1175 for name, value in extensions.items():
1176 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501
1179class SyntaxExtension(roles.SyntaxExtensionRole):
1180 """Defines a unit that when also extending from :class:`.ClauseElement`
1181 can be applied to SQLAlchemy statements :class:`.Select`,
1182 :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
1183 pre-established SQL insertion points within these constructs.
1185 .. versionadded:: 2.1
1187 .. seealso::
1189 :ref:`examples_syntax_extensions`
1191 """
1193 def append_replacing_same_type(
1194 self, existing: Sequence[ClauseElement]
1195 ) -> Sequence[ClauseElement]:
1196 """Utility function that can be used as
1197 :paramref:`_sql.HasSyntaxExtensions.apply_extension_point.apply_fn`
1198 to remove any other element of the same type in existing and appending
1199 ``self`` to the list.
1201 This is equivalent to::
1203 stmt.apply_extension_point(
1204 lambda existing: [
1205 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
1206 self,
1207 ],
1208 "post_criteria",
1209 )
1211 .. seealso::
1213 :ref:`examples_syntax_extensions`
1215 :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`
1217 """ # noqa: E501
1218 cls = type(self)
1219 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501
1221 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
1222 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
1223 raise NotImplementedError(
1224 f"Extension {type(self).__name__} cannot be applied to select"
1225 )
1227 def apply_to_update(self, update_stmt: Update) -> None:
1228 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
1229 raise NotImplementedError(
1230 f"Extension {type(self).__name__} cannot be applied to update"
1231 )
1233 def apply_to_delete(self, delete_stmt: Delete) -> None:
1234 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
1235 raise NotImplementedError(
1236 f"Extension {type(self).__name__} cannot be applied to delete"
1237 )
1239 def apply_to_insert(self, insert_stmt: Insert) -> None:
1240 """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
1241 raise NotImplementedError(
1242 f"Extension {type(self).__name__} cannot be applied to insert"
1243 )
1246class Executable(roles.StatementRole):
1247 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1249 :class:`.Executable` is a superclass for all "statement" types
1250 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1251 :func:`insert`, :func:`text`.
1253 """
1255 supports_execution: bool = True
1256 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1257 _is_default_generator: bool = False
1258 _with_options: Tuple[ExecutableOption, ...] = ()
1259 _compile_state_funcs: Tuple[
1260 Tuple[Callable[[CompileState], None], Any], ...
1261 ] = ()
1262 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1264 _executable_traverse_internals = [
1265 ("_with_options", InternalTraversal.dp_executable_options),
1266 (
1267 "_compile_state_funcs",
1268 ExtendedInternalTraversal.dp_compile_state_funcs,
1269 ),
1270 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1271 ]
1273 is_select: bool = False
1274 is_from_statement: bool = False
1275 is_update: bool = False
1276 is_insert: bool = False
1277 is_text: bool = False
1278 is_delete: bool = False
1279 is_dml: bool = False
1281 if TYPE_CHECKING:
1282 __visit_name__: str
1284 def _compile_w_cache(
1285 self,
1286 dialect: Dialect,
1287 *,
1288 compiled_cache: Optional[CompiledCacheType],
1289 column_keys: List[str],
1290 for_executemany: bool = False,
1291 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1292 **kw: Any,
1293 ) -> tuple[
1294 Compiled,
1295 Sequence[BindParameter[Any]] | None,
1296 _CoreSingleExecuteParams | None,
1297 CacheStats,
1298 ]: ...
1300 def _execute_on_connection(
1301 self,
1302 connection: Connection,
1303 distilled_params: _CoreMultiExecuteParams,
1304 execution_options: CoreExecuteOptionsParameter,
1305 ) -> CursorResult[Any]: ...
1307 def _execute_on_scalar(
1308 self,
1309 connection: Connection,
1310 distilled_params: _CoreMultiExecuteParams,
1311 execution_options: CoreExecuteOptionsParameter,
1312 ) -> Any: ...
1314 @util.ro_non_memoized_property
1315 def _all_selected_columns(self) -> _SelectIterable:
1316 raise NotImplementedError()
1318 @property
1319 def _effective_plugin_target(self) -> str:
1320 return self.__visit_name__
1322 @_generative
1323 def options(self, *options: ExecutableOption) -> Self:
1324 """Apply options to this statement.
1326 In the general sense, options are any kind of Python object
1327 that can be interpreted by systems that consume the statement outside
1328 of the regular SQL compiler chain. Specifically, these options are
1329 the ORM level options that apply "eager load" and other loading
1330 behaviors to an ORM query.
1332 For background on specific kinds of options for specific kinds of
1333 statements, refer to the documentation for those option objects.
1335 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1336 Core statement objects towards the goal of allowing unified
1337 Core / ORM querying capabilities.
1339 .. seealso::
1341 :ref:`loading_columns` - refers to options specific to the usage
1342 of ORM queries
1344 :ref:`relationship_loader_options` - refers to options specific
1345 to the usage of ORM queries
1347 """
1348 self._with_options += tuple(
1349 coercions.expect(roles.ExecutableOptionRole, opt)
1350 for opt in options
1351 )
1352 return self
1354 @_generative
1355 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1356 """Assign the compile options to a new value.
1358 :param compile_options: appropriate CacheableOptions structure
1360 """
1362 self._compile_options = compile_options
1363 return self
1365 @_generative
1366 def _update_compile_options(self, options: CacheableOptions) -> Self:
1367 """update the _compile_options with new keys."""
1369 assert self._compile_options is not None
1370 self._compile_options += options
1371 return self
1373 @_generative
1374 def _add_compile_state_func(
1375 self,
1376 callable_: Callable[[CompileState], None],
1377 cache_args: Any,
1378 ) -> Self:
1379 """Add a compile state function to this statement.
1381 When using the ORM only, these are callable functions that will
1382 be given the CompileState object upon compilation.
1384 A second argument cache_args is required, which will be combined with
1385 the ``__code__`` identity of the function itself in order to produce a
1386 cache key.
1388 """
1389 self._compile_state_funcs += ((callable_, cache_args),)
1390 return self
1392 @overload
1393 def execution_options(
1394 self,
1395 *,
1396 compiled_cache: Optional[CompiledCacheType] = ...,
1397 logging_token: str = ...,
1398 isolation_level: IsolationLevel = ...,
1399 no_parameters: bool = False,
1400 stream_results: bool = False,
1401 max_row_buffer: int = ...,
1402 yield_per: int = ...,
1403 driver_column_names: bool = ...,
1404 insertmanyvalues_page_size: int = ...,
1405 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1406 populate_existing: bool = False,
1407 autoflush: bool = False,
1408 synchronize_session: SynchronizeSessionArgument = ...,
1409 dml_strategy: DMLStrategyArgument = ...,
1410 render_nulls: bool = ...,
1411 is_delete_using: bool = ...,
1412 is_update_from: bool = ...,
1413 preserve_rowcount: bool = False,
1414 **opt: Any,
1415 ) -> Self: ...
1417 @overload
1418 def execution_options(self, **opt: Any) -> Self: ...
1420 @_generative
1421 def execution_options(self, **kw: Any) -> Self:
1422 """Set non-SQL options for the statement which take effect during
1423 execution.
1425 Execution options can be set at many scopes, including per-statement,
1426 per-connection, or per execution, using methods such as
1427 :meth:`_engine.Connection.execution_options` and parameters which
1428 accept a dictionary of options such as
1429 :paramref:`_engine.Connection.execute.execution_options` and
1430 :paramref:`_orm.Session.execute.execution_options`.
1432 The primary characteristic of an execution option, as opposed to
1433 other kinds of options such as ORM loader options, is that
1434 **execution options never affect the compiled SQL of a query, only
1435 things that affect how the SQL statement itself is invoked or how
1436 results are fetched**. That is, execution options are not part of
1437 what's accommodated by SQL compilation nor are they considered part of
1438 the cached state of a statement.
1440 The :meth:`_sql.Executable.execution_options` method is
1441 :term:`generative`, as
1442 is the case for the method as applied to the :class:`_engine.Engine`
1443 and :class:`_orm.Query` objects, which means when the method is called,
1444 a copy of the object is returned, which applies the given parameters to
1445 that new copy, but leaves the original unchanged::
1447 statement = select(table.c.x, table.c.y)
1448 new_statement = statement.execution_options(my_option=True)
1450 An exception to this behavior is the :class:`_engine.Connection`
1451 object, where the :meth:`_engine.Connection.execution_options` method
1452 is explicitly **not** generative.
1454 The kinds of options that may be passed to
1455 :meth:`_sql.Executable.execution_options` and other related methods and
1456 parameter dictionaries include parameters that are explicitly consumed
1457 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1458 defined by SQLAlchemy, which means the methods and/or parameter
1459 dictionaries may be used for user-defined parameters that interact with
1460 custom code, which may access the parameters using methods such as
1461 :meth:`_sql.Executable.get_execution_options` and
1462 :meth:`_engine.Connection.get_execution_options`, or within selected
1463 event hooks using a dedicated ``execution_options`` event parameter
1464 such as
1465 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1466 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1468 from sqlalchemy import event
1471 @event.listens_for(some_engine, "before_execute")
1472 def _process_opt(conn, statement, multiparams, params, execution_options):
1473 "run a SQL function before invoking a statement"
1475 if execution_options.get("do_special_thing", False):
1476 conn.exec_driver_sql("run_special_function()")
1478 Within the scope of options that are explicitly recognized by
1479 SQLAlchemy, most apply to specific classes of objects and not others.
1480 The most common execution options include:
1482 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1483 sets the isolation level for a connection or a class of connections
1484 via an :class:`_engine.Engine`. This option is accepted only
1485 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1487 * :paramref:`_engine.Connection.execution_options.stream_results` -
1488 indicates results should be fetched using a server side cursor;
1489 this option is accepted by :class:`_engine.Connection`, by the
1490 :paramref:`_engine.Connection.execute.execution_options` parameter
1491 on :meth:`_engine.Connection.execute`, and additionally by
1492 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1493 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1495 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1496 indicates a dictionary that will serve as the
1497 :ref:`SQL compilation cache <sql_caching>`
1498 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1499 well as for ORM methods like :meth:`_orm.Session.execute`.
1500 Can be passed as ``None`` to disable caching for statements.
1501 This option is not accepted by
1502 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1503 carry along a compilation cache within a statement object.
1505 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1506 - a mapping of schema names used by the
1507 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1508 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1509 :class:`_sql.Executable`, as well as by ORM constructs
1510 like :meth:`_orm.Session.execute`.
1512 .. seealso::
1514 :meth:`_engine.Connection.execution_options`
1516 :paramref:`_engine.Connection.execute.execution_options`
1518 :paramref:`_orm.Session.execute.execution_options`
1520 :ref:`orm_queryguide_execution_options` - documentation on all
1521 ORM-specific execution options
1523 """ # noqa: E501
1524 if "isolation_level" in kw:
1525 raise exc.ArgumentError(
1526 "'isolation_level' execution option may only be specified "
1527 "on Connection.execution_options(), or "
1528 "per-engine using the isolation_level "
1529 "argument to create_engine()."
1530 )
1531 if "compiled_cache" in kw:
1532 raise exc.ArgumentError(
1533 "'compiled_cache' execution option may only be specified "
1534 "on Connection.execution_options(), not per statement."
1535 )
1536 self._execution_options = self._execution_options.union(kw)
1537 return self
1539 def get_execution_options(self) -> _ExecuteOptions:
1540 """Get the non-SQL options which will take effect during execution.
1542 .. seealso::
1544 :meth:`.Executable.execution_options`
1545 """
1546 return self._execution_options
1549class ExecutableStatement(Executable):
1550 """Executable subclass that implements a lightweight version of ``params``
1551 that avoids a full cloned traverse.
1553 .. versionadded:: 2.1
1555 """
1557 _params: util.immutabledict[str, Any] = EMPTY_DICT
1559 _executable_traverse_internals = (
1560 Executable._executable_traverse_internals
1561 + [("_params", InternalTraversal.dp_params)]
1562 )
1564 @_generative
1565 def params(
1566 self,
1567 __optionaldict: _CoreSingleExecuteParams | None = None,
1568 /,
1569 **kwargs: Any,
1570 ) -> Self:
1571 """Return a copy with the provided bindparam values.
1573 Returns a copy of this Executable with bindparam values set
1574 to the given dictionary::
1576 >>> clause = column("x") + bindparam("foo")
1577 >>> print(clause.compile().params)
1578 {'foo': None}
1579 >>> print(clause.params({"foo": 7}).compile().params)
1580 {'foo': 7}
1582 """
1583 if __optionaldict:
1584 kwargs.update(__optionaldict)
1585 self._params = (
1586 util.immutabledict(kwargs)
1587 if not self._params
1588 else self._params | kwargs
1589 )
1590 return self
1593class SchemaEventTarget(event.EventTarget):
1594 """Base class for elements that are the targets of :class:`.DDLEvents`
1595 events.
1597 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1599 """
1601 dispatch: dispatcher[SchemaEventTarget]
1603 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1604 """Associate with this SchemaEvent's parent object."""
1606 def _set_parent_with_dispatch(
1607 self, parent: SchemaEventTarget, **kw: Any
1608 ) -> None:
1609 self.dispatch.before_parent_attach(self, parent)
1610 self._set_parent(parent, **kw)
1611 self.dispatch.after_parent_attach(self, parent)
1614class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1615 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1617 .. versionadded:: 2.0.41
1619 """
1622class SchemaVisitor(ClauseVisitor):
1623 """Define the visiting for ``SchemaItem`` and more
1624 generally ``SchemaVisitable`` objects.
1626 """
1628 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1631class _SentinelDefaultCharacterization(Enum):
1632 NONE = "none"
1633 UNKNOWN = "unknown"
1634 CLIENTSIDE = "clientside"
1635 SENTINEL_DEFAULT = "sentinel_default"
1636 SERVERSIDE = "serverside"
1637 IDENTITY = "identity"
1638 SEQUENCE = "sequence"
1639 MONOTONIC_FUNCTION = "monotonic"
1642class _SentinelColumnCharacterization(NamedTuple):
1643 columns: Optional[Sequence[Column[Any]]] = None
1644 is_explicit: bool = False
1645 is_autoinc: bool = False
1646 default_characterization: _SentinelDefaultCharacterization = (
1647 _SentinelDefaultCharacterization.NONE
1648 )
1651_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1653_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1654_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1657class _ColumnMetrics(Generic[_COL_co]):
1658 __slots__ = ("column",)
1660 column: _COL_co
1662 def __init__(
1663 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1664 ) -> None:
1665 self.column = col
1667 # proxy_index being non-empty means it was initialized.
1668 # so we need to update it
1669 pi = collection._proxy_index
1670 if pi:
1671 for eps_col in col._expanded_proxy_set:
1672 pi[eps_col].add(self)
1674 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1675 return self.column._expanded_proxy_set
1677 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1678 pi = collection._proxy_index
1679 if not pi:
1680 return
1681 for col in self.column._expanded_proxy_set:
1682 colset = pi.get(col, None)
1683 if colset:
1684 colset.discard(self)
1685 if colset is not None and not colset:
1686 del pi[col]
1688 def embedded(
1689 self,
1690 target_set: Union[
1691 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1692 ],
1693 ) -> bool:
1694 expanded_proxy_set = self.column._expanded_proxy_set
1695 for t in target_set.difference(expanded_proxy_set):
1696 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1697 return False
1698 return True
1701class ColumnCollection(Generic[_COLKEY, _COL_co]):
1702 """Collection of :class:`_expression.ColumnElement` instances,
1703 typically for
1704 :class:`_sql.FromClause` objects.
1706 The :class:`_sql.ColumnCollection` object is most commonly available
1707 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1708 on the :class:`_schema.Table` object, introduced at
1709 :ref:`metadata_tables_and_columns`.
1711 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1712 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1713 :class:`_schema.Column` objects, which are then accessible both via mapping
1714 style access as well as attribute access style.
1716 To access :class:`_schema.Column` objects using ordinary attribute-style
1717 access, specify the name like any other object attribute, such as below
1718 a column named ``employee_name`` is accessed::
1720 >>> employee_table.c.employee_name
1722 To access columns that have names with special characters or spaces,
1723 index-style access is used, such as below which illustrates a column named
1724 ``employee ' payment`` is accessed::
1726 >>> employee_table.c["employee ' payment"]
1728 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1729 interface, common dictionary method names like
1730 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1731 and :meth:`_sql.ColumnCollection.items` are available, which means that
1732 database columns that are keyed under these names also need to use indexed
1733 access::
1735 >>> employee_table.c["values"]
1738 The name for which a :class:`_schema.Column` would be present is normally
1739 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1740 such as a :class:`_sql.Select` object that uses a label style set
1741 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1742 key may instead be represented under a particular label name such
1743 as ``tablename_columnname``::
1745 >>> from sqlalchemy import select, column, table
1746 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1747 >>> t = table("t", column("c"))
1748 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1749 >>> subq = stmt.subquery()
1750 >>> subq.c.t_c
1751 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1753 :class:`.ColumnCollection` also indexes the columns in order and allows
1754 them to be accessible by their integer position::
1756 >>> cc[0]
1757 Column('x', Integer(), table=None)
1758 >>> cc[1]
1759 Column('y', Integer(), table=None)
1761 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1762 allows integer-based
1763 index access to the collection.
1765 Iterating the collection yields the column expressions in order::
1767 >>> list(cc)
1768 [Column('x', Integer(), table=None),
1769 Column('y', Integer(), table=None)]
1771 The base :class:`_expression.ColumnCollection` object can store
1772 duplicates, which can
1773 mean either two columns with the same key, in which case the column
1774 returned by key access is **arbitrary**::
1776 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1777 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1778 >>> list(cc)
1779 [Column('x', Integer(), table=None),
1780 Column('x', Integer(), table=None)]
1781 >>> cc["x"] is x1
1782 False
1783 >>> cc["x"] is x2
1784 True
1786 Or it can also mean the same column multiple times. These cases are
1787 supported as :class:`_expression.ColumnCollection`
1788 is used to represent the columns in
1789 a SELECT statement which may include duplicates.
1791 A special subclass :class:`.DedupeColumnCollection` exists which instead
1792 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1793 collection is used for schema level objects like :class:`_schema.Table`
1794 and
1795 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1796 :class:`.DedupeColumnCollection` class also has additional mutation methods
1797 as the schema constructs have more use cases that require removal and
1798 replacement of columns.
1800 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1801 now stores duplicate
1802 column keys as well as the same column in multiple positions. The
1803 :class:`.DedupeColumnCollection` class is added to maintain the
1804 former behavior in those cases where deduplication as well as
1805 additional replace/remove operations are needed.
1808 """
1810 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1812 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1813 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1814 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1815 _colset: Set[_COL_co]
1817 def __init__(
1818 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1819 ):
1820 object.__setattr__(self, "_colset", set())
1821 object.__setattr__(self, "_index", {})
1822 object.__setattr__(
1823 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1824 )
1825 object.__setattr__(self, "_collection", [])
1826 if columns:
1827 self._initial_populate(columns)
1829 @util.preload_module("sqlalchemy.sql.elements")
1830 def __clause_element__(self) -> ClauseList:
1831 elements = util.preloaded.sql_elements
1833 return elements.ClauseList(
1834 _literal_as_text_role=roles.ColumnsClauseRole,
1835 group=False,
1836 *self._all_columns,
1837 )
1839 def _initial_populate(
1840 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1841 ) -> None:
1842 self._populate_separate_keys(iter_)
1844 @property
1845 def _all_columns(self) -> List[_COL_co]:
1846 return [col for (_, col, _) in self._collection]
1848 def keys(self) -> List[_COLKEY]:
1849 """Return a sequence of string key names for all columns in this
1850 collection."""
1851 return [k for (k, _, _) in self._collection]
1853 def values(self) -> List[_COL_co]:
1854 """Return a sequence of :class:`_sql.ColumnClause` or
1855 :class:`_schema.Column` objects for all columns in this
1856 collection."""
1857 return [col for (_, col, _) in self._collection]
1859 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1860 """Return a sequence of (key, column) tuples for all columns in this
1861 collection each consisting of a string key name and a
1862 :class:`_sql.ColumnClause` or
1863 :class:`_schema.Column` object.
1864 """
1866 return [(k, col) for (k, col, _) in self._collection]
1868 def __bool__(self) -> bool:
1869 return bool(self._collection)
1871 def __len__(self) -> int:
1872 return len(self._collection)
1874 def __iter__(self) -> Iterator[_COL_co]:
1875 # turn to a list first to maintain over a course of changes
1876 return iter([col for _, col, _ in self._collection])
1878 @overload
1879 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1881 @overload
1882 def __getitem__(
1883 self, key: Tuple[Union[str, int], ...]
1884 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1886 @overload
1887 def __getitem__(
1888 self, key: slice
1889 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1891 def __getitem__(
1892 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1893 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1894 try:
1895 if isinstance(key, (tuple, slice)):
1896 if isinstance(key, slice):
1897 cols = (
1898 (sub_key, col)
1899 for (sub_key, col, _) in self._collection[key]
1900 )
1901 else:
1902 cols = (self._index[sub_key] for sub_key in key)
1904 return ColumnCollection(cols).as_readonly()
1905 else:
1906 return self._index[key][1]
1907 except KeyError as err:
1908 if isinstance(err.args[0], int):
1909 raise IndexError(err.args[0]) from err
1910 else:
1911 raise
1913 def __getattr__(self, key: str) -> _COL_co:
1914 try:
1915 return self._index[key][1]
1916 except KeyError as err:
1917 raise AttributeError(key) from err
1919 def __contains__(self, key: str) -> bool:
1920 if key not in self._index:
1921 if not isinstance(key, str):
1922 raise exc.ArgumentError(
1923 "__contains__ requires a string argument"
1924 )
1925 return False
1926 else:
1927 return True
1929 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1930 """Compare this :class:`_expression.ColumnCollection` to another
1931 based on the names of the keys"""
1933 for l, r in zip_longest(self, other):
1934 if l is not r:
1935 return False
1936 else:
1937 return True
1939 def __eq__(self, other: Any) -> bool:
1940 return self.compare(other)
1942 @overload
1943 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1945 @overload
1946 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1948 def get(
1949 self, key: str, default: Optional[_COL] = None
1950 ) -> Optional[Union[_COL_co, _COL]]:
1951 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1952 based on a string key name from this
1953 :class:`_expression.ColumnCollection`."""
1955 if key in self._index:
1956 return self._index[key][1]
1957 else:
1958 return default
1960 def __str__(self) -> str:
1961 return "%s(%s)" % (
1962 self.__class__.__name__,
1963 ", ".join(str(c) for c in self),
1964 )
1966 def __setitem__(self, key: str, value: Any) -> NoReturn:
1967 raise NotImplementedError()
1969 def __delitem__(self, key: str) -> NoReturn:
1970 raise NotImplementedError()
1972 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1973 raise NotImplementedError()
1975 def clear(self) -> NoReturn:
1976 """Dictionary clear() is not implemented for
1977 :class:`_sql.ColumnCollection`."""
1978 raise NotImplementedError()
1980 def remove(self, column: Any) -> NoReturn:
1981 raise NotImplementedError()
1983 def update(self, iter_: Any) -> NoReturn:
1984 """Dictionary update() is not implemented for
1985 :class:`_sql.ColumnCollection`."""
1986 raise NotImplementedError()
1988 # https://github.com/python/mypy/issues/4266
1989 __hash__: Optional[int] = None # type: ignore
1991 def _populate_separate_keys(
1992 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1993 ) -> None:
1994 """populate from an iterator of (key, column)"""
1996 self._collection[:] = collection = [
1997 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1998 ]
1999 self._colset.update(c._deannotate() for _, c, _ in collection)
2000 self._index.update(
2001 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
2002 )
2003 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
2005 def add(
2006 self,
2007 column: ColumnElement[Any],
2008 key: Optional[_COLKEY] = None,
2009 ) -> None:
2010 """Add a column to this :class:`_sql.ColumnCollection`.
2012 .. note::
2014 This method is **not normally used by user-facing code**, as the
2015 :class:`_sql.ColumnCollection` is usually part of an existing
2016 object such as a :class:`_schema.Table`. To add a
2017 :class:`_schema.Column` to an existing :class:`_schema.Table`
2018 object, use the :meth:`_schema.Table.append_column` method.
2020 """
2021 colkey: _COLKEY
2023 if key is None:
2024 colkey = column.key # type: ignore
2025 else:
2026 colkey = key
2028 l = len(self._collection)
2030 # don't really know how this part is supposed to work w/ the
2031 # covariant thing
2033 _column = cast(_COL_co, column)
2035 self._collection.append(
2036 (colkey, _column, _ColumnMetrics(self, _column))
2037 )
2038 self._colset.add(_column._deannotate())
2040 self._index[l] = (colkey, _column)
2041 if colkey not in self._index:
2042 self._index[colkey] = (colkey, _column)
2044 def __getstate__(self) -> Dict[str, Any]:
2045 return {
2046 "_collection": [(k, c) for k, c, _ in self._collection],
2047 "_index": self._index,
2048 }
2050 def __setstate__(self, state: Dict[str, Any]) -> None:
2051 object.__setattr__(self, "_index", state["_index"])
2052 object.__setattr__(
2053 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
2054 )
2055 object.__setattr__(
2056 self,
2057 "_collection",
2058 [
2059 (k, c, _ColumnMetrics(self, c))
2060 for (k, c) in state["_collection"]
2061 ],
2062 )
2063 object.__setattr__(
2064 self, "_colset", {col for k, col, _ in self._collection}
2065 )
2067 def contains_column(self, col: ColumnElement[Any]) -> bool:
2068 """Checks if a column object exists in this collection"""
2069 if col not in self._colset:
2070 if isinstance(col, str):
2071 raise exc.ArgumentError(
2072 "contains_column cannot be used with string arguments. "
2073 "Use ``col_name in table.c`` instead."
2074 )
2075 return False
2076 else:
2077 return True
2079 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2080 """Return a "read only" form of this
2081 :class:`_sql.ColumnCollection`."""
2083 return ReadOnlyColumnCollection(self)
2085 def _init_proxy_index(self) -> None:
2086 """populate the "proxy index", if empty.
2088 proxy index is added in 2.0 to provide more efficient operation
2089 for the corresponding_column() method.
2091 For reasons of both time to construct new .c collections as well as
2092 memory conservation for large numbers of large .c collections, the
2093 proxy_index is only filled if corresponding_column() is called. once
2094 filled it stays that way, and new _ColumnMetrics objects created after
2095 that point will populate it with new data. Note this case would be
2096 unusual, if not nonexistent, as it means a .c collection is being
2097 mutated after corresponding_column() were used, however it is tested in
2098 test/base/test_utils.py.
2100 """
2101 pi = self._proxy_index
2102 if pi:
2103 return
2105 for _, _, metrics in self._collection:
2106 eps = metrics.column._expanded_proxy_set
2108 for eps_col in eps:
2109 pi[eps_col].add(metrics)
2111 def corresponding_column(
2112 self, column: _COL, require_embedded: bool = False
2113 ) -> Optional[Union[_COL, _COL_co]]:
2114 """Given a :class:`_expression.ColumnElement`, return the exported
2115 :class:`_expression.ColumnElement` object from this
2116 :class:`_expression.ColumnCollection`
2117 which corresponds to that original :class:`_expression.ColumnElement`
2118 via a common
2119 ancestor column.
2121 :param column: the target :class:`_expression.ColumnElement`
2122 to be matched.
2124 :param require_embedded: only return corresponding columns for
2125 the given :class:`_expression.ColumnElement`, if the given
2126 :class:`_expression.ColumnElement`
2127 is actually present within a sub-element
2128 of this :class:`_expression.Selectable`.
2129 Normally the column will match if
2130 it merely shares a common ancestor with one of the exported
2131 columns of this :class:`_expression.Selectable`.
2133 .. seealso::
2135 :meth:`_expression.Selectable.corresponding_column`
2136 - invokes this method
2137 against the collection returned by
2138 :attr:`_expression.Selectable.exported_columns`.
2140 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
2141 was moved onto the :class:`_expression.ColumnCollection` itself.
2143 """
2144 # TODO: cython candidate
2146 # don't dig around if the column is locally present
2147 if column in self._colset:
2148 return column
2150 selected_intersection, selected_metrics = None, None
2151 target_set = column.proxy_set
2153 pi = self._proxy_index
2154 if not pi:
2155 self._init_proxy_index()
2157 for current_metrics in (
2158 mm for ts in target_set if ts in pi for mm in pi[ts]
2159 ):
2160 if not require_embedded or current_metrics.embedded(target_set):
2161 if selected_metrics is None:
2162 # no corresponding column yet, pick this one.
2163 selected_metrics = current_metrics
2164 continue
2166 current_intersection = target_set.intersection(
2167 current_metrics.column._expanded_proxy_set
2168 )
2169 if selected_intersection is None:
2170 selected_intersection = target_set.intersection(
2171 selected_metrics.column._expanded_proxy_set
2172 )
2174 if len(current_intersection) > len(selected_intersection):
2175 # 'current' has a larger field of correspondence than
2176 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
2177 # matches a1.c.x->table.c.x better than
2178 # selectable.c.x->table.c.x does.
2180 selected_metrics = current_metrics
2181 selected_intersection = current_intersection
2182 elif current_intersection == selected_intersection:
2183 # they have the same field of correspondence. see
2184 # which proxy_set has fewer columns in it, which
2185 # indicates a closer relationship with the root
2186 # column. Also take into account the "weight"
2187 # attribute which CompoundSelect() uses to give
2188 # higher precedence to columns based on vertical
2189 # position in the compound statement, and discard
2190 # columns that have no reference to the target
2191 # column (also occurs with CompoundSelect)
2193 selected_col_distance = sum(
2194 [
2195 sc._annotations.get("weight", 1)
2196 for sc in (
2197 selected_metrics.column._uncached_proxy_list()
2198 )
2199 if sc.shares_lineage(column)
2200 ],
2201 )
2202 current_col_distance = sum(
2203 [
2204 sc._annotations.get("weight", 1)
2205 for sc in (
2206 current_metrics.column._uncached_proxy_list()
2207 )
2208 if sc.shares_lineage(column)
2209 ],
2210 )
2211 if current_col_distance < selected_col_distance:
2212 selected_metrics = current_metrics
2213 selected_intersection = current_intersection
2215 return selected_metrics.column if selected_metrics else None
2218_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2221class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
2222 """A :class:`_expression.ColumnCollection`
2223 that maintains deduplicating behavior.
2225 This is useful by schema level objects such as :class:`_schema.Table` and
2226 :class:`.PrimaryKeyConstraint`. The collection includes more
2227 sophisticated mutator methods as well to suit schema objects which
2228 require mutable column collections.
2230 .. versionadded:: 1.4
2232 """
2234 def add( # type: ignore[override]
2235 self,
2236 column: _NAMEDCOL,
2237 key: Optional[str] = None,
2238 *,
2239 index: Optional[int] = None,
2240 ) -> None:
2241 if key is not None and column.key != key:
2242 raise exc.ArgumentError(
2243 "DedupeColumnCollection requires columns be under "
2244 "the same key as their .key"
2245 )
2246 key = column.key
2248 if key is None:
2249 raise exc.ArgumentError(
2250 "Can't add unnamed column to column collection"
2251 )
2253 if key in self._index:
2254 existing = self._index[key][1]
2256 if existing is column:
2257 return
2259 self.replace(column, index=index)
2261 # pop out memoized proxy_set as this
2262 # operation may very well be occurring
2263 # in a _make_proxy operation
2264 util.memoized_property.reset(column, "proxy_set")
2265 else:
2266 self._append_new_column(key, column, index=index)
2268 def _append_new_column(
2269 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
2270 ) -> None:
2271 collection_length = len(self._collection)
2273 if index is None:
2274 l = collection_length
2275 else:
2276 if index < 0:
2277 index = max(0, collection_length + index)
2278 l = index
2280 if index is None:
2281 self._collection.append(
2282 (key, named_column, _ColumnMetrics(self, named_column))
2283 )
2284 else:
2285 self._collection.insert(
2286 index, (key, named_column, _ColumnMetrics(self, named_column))
2287 )
2289 self._colset.add(named_column._deannotate())
2291 if index is not None:
2292 for idx in reversed(range(index, collection_length)):
2293 self._index[idx + 1] = self._index[idx]
2295 self._index[l] = (key, named_column)
2296 self._index[key] = (key, named_column)
2298 def _populate_separate_keys(
2299 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2300 ) -> None:
2301 """populate from an iterator of (key, column)"""
2302 cols = list(iter_)
2304 replace_col = []
2305 for k, col in cols:
2306 if col.key != k:
2307 raise exc.ArgumentError(
2308 "DedupeColumnCollection requires columns be under "
2309 "the same key as their .key"
2310 )
2311 if col.name in self._index and col.key != col.name:
2312 replace_col.append(col)
2313 elif col.key in self._index:
2314 replace_col.append(col)
2315 else:
2316 self._index[k] = (k, col)
2317 self._collection.append((k, col, _ColumnMetrics(self, col)))
2318 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2320 self._index.update(
2321 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2322 )
2323 for col in replace_col:
2324 self.replace(col)
2326 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2327 self._populate_separate_keys((col.key, col) for col in iter_)
2329 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2330 if column not in self._colset:
2331 raise ValueError(
2332 "Can't remove column %r; column is not in this collection"
2333 % column
2334 )
2335 del self._index[column.key]
2336 self._colset.remove(column)
2337 self._collection[:] = [
2338 (k, c, metrics)
2339 for (k, c, metrics) in self._collection
2340 if c is not column
2341 ]
2342 for metrics in self._proxy_index.get(column, ()):
2343 metrics.dispose(self)
2345 self._index.update(
2346 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2347 )
2348 # delete higher index
2349 del self._index[len(self._collection)]
2351 def replace(
2352 self,
2353 column: _NAMEDCOL,
2354 *,
2355 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2356 index: Optional[int] = None,
2357 ) -> None:
2358 """add the given column to this collection, removing unaliased
2359 versions of this column as well as existing columns with the
2360 same key.
2362 e.g.::
2364 t = Table("sometable", metadata, Column("col1", Integer))
2365 t.columns.replace(Column("col1", Integer, key="columnone"))
2367 will remove the original 'col1' from the collection, and add
2368 the new column under the name 'columnname'.
2370 Used by schema.Column to override columns during table reflection.
2372 """
2374 if extra_remove:
2375 remove_col = set(extra_remove)
2376 else:
2377 remove_col = set()
2378 # remove up to two columns based on matches of name as well as key
2379 if column.name in self._index and column.key != column.name:
2380 other = self._index[column.name][1]
2381 if other.name == other.key:
2382 remove_col.add(other)
2384 if column.key in self._index:
2385 remove_col.add(self._index[column.key][1])
2387 if not remove_col:
2388 self._append_new_column(column.key, column, index=index)
2389 return
2390 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2391 replace_index = None
2393 for idx, (k, col, metrics) in enumerate(self._collection):
2394 if col in remove_col:
2395 if replace_index is None:
2396 replace_index = idx
2397 new_cols.append(
2398 (column.key, column, _ColumnMetrics(self, column))
2399 )
2400 else:
2401 new_cols.append((k, col, metrics))
2403 if remove_col:
2404 self._colset.difference_update(remove_col)
2406 for rc in remove_col:
2407 for metrics in self._proxy_index.get(rc, ()):
2408 metrics.dispose(self)
2410 if replace_index is None:
2411 if index is not None:
2412 new_cols.insert(
2413 index, (column.key, column, _ColumnMetrics(self, column))
2414 )
2416 else:
2417 new_cols.append(
2418 (column.key, column, _ColumnMetrics(self, column))
2419 )
2420 elif index is not None:
2421 to_move = new_cols[replace_index]
2422 effective_positive_index = (
2423 index if index >= 0 else max(0, len(new_cols) + index)
2424 )
2425 new_cols.insert(index, to_move)
2426 if replace_index > effective_positive_index:
2427 del new_cols[replace_index + 1]
2428 else:
2429 del new_cols[replace_index]
2431 self._colset.add(column._deannotate())
2432 self._collection[:] = new_cols
2434 self._index.clear()
2436 self._index.update(
2437 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2438 )
2439 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2442class ReadOnlyColumnCollection(
2443 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2444):
2445 __slots__ = ("_parent",)
2447 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2448 object.__setattr__(self, "_parent", collection)
2449 object.__setattr__(self, "_colset", collection._colset)
2450 object.__setattr__(self, "_index", collection._index)
2451 object.__setattr__(self, "_collection", collection._collection)
2452 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2454 def __getstate__(self) -> Dict[str, _COL_co]:
2455 return {"_parent": self._parent}
2457 def __setstate__(self, state: Dict[str, Any]) -> None:
2458 parent = state["_parent"]
2459 self.__init__(parent) # type: ignore
2461 def add(self, column: Any, key: Any = ...) -> Any:
2462 self._readonly()
2464 def extend(self, elements: Any) -> NoReturn:
2465 self._readonly()
2467 def remove(self, item: Any) -> NoReturn:
2468 self._readonly()
2471class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2472 def contains_column(self, col: ColumnClause[Any]) -> bool:
2473 return col in self
2475 def extend(self, cols: Iterable[Any]) -> None:
2476 for col in cols:
2477 self.add(col)
2479 def __eq__(self, other):
2480 l = []
2481 for c in other:
2482 for local in self:
2483 if c.shares_lineage(local):
2484 l.append(c == local)
2485 return elements.and_(*l)
2487 def __hash__(self) -> int: # type: ignore[override]
2488 return hash(tuple(x for x in self))
2491def _entity_namespace(
2492 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2493) -> _EntityNamespace:
2494 """Return the nearest .entity_namespace for the given entity.
2496 If not immediately available, does an iterate to find a sub-element
2497 that has one, if any.
2499 """
2500 try:
2501 return cast(_HasEntityNamespace, entity).entity_namespace
2502 except AttributeError:
2503 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2504 if _is_has_entity_namespace(elem):
2505 return elem.entity_namespace
2506 else:
2507 raise
2510@overload
2511def _entity_namespace_key(
2512 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2513 key: str,
2514) -> SQLCoreOperations[Any]: ...
2517@overload
2518def _entity_namespace_key(
2519 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2520 key: str,
2521 default: _NoArg,
2522) -> SQLCoreOperations[Any]: ...
2525@overload
2526def _entity_namespace_key(
2527 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2528 key: str,
2529 default: _T,
2530) -> Union[SQLCoreOperations[Any], _T]: ...
2533def _entity_namespace_key(
2534 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2535 key: str,
2536 default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
2537) -> Union[SQLCoreOperations[Any], _T]:
2538 """Return an entry from an entity_namespace.
2541 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2542 on not found.
2544 """
2546 try:
2547 ns = _entity_namespace(entity)
2548 if default is not NO_ARG:
2549 return getattr(ns, key, default)
2550 else:
2551 return getattr(ns, key) # type: ignore
2552 except AttributeError as err:
2553 raise exc.InvalidRequestError(
2554 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2555 ) from err
2558def _entity_namespace_key_search_all(
2559 entities: Collection[Any],
2560 key: str,
2561) -> SQLCoreOperations[Any]:
2562 """Search multiple entities for a key, raise if ambiguous or not found.
2564 This is used by filter_by() to search across all FROM clause entities
2565 when a single entity doesn't have the requested attribute.
2567 .. versionadded:: 2.1
2569 Raises:
2570 AmbiguousColumnError: If key exists in multiple entities
2571 InvalidRequestError: If key doesn't exist in any entity
2572 """
2574 match_: SQLCoreOperations[Any] | None = None
2576 for entity in entities:
2577 ns = _entity_namespace(entity)
2578 # Check if the attribute exists
2579 if hasattr(ns, key):
2580 if match_ is not None:
2581 entity_desc = ", ".join(str(e) for e in list(entities)[:3])
2582 if len(entities) > 3:
2583 entity_desc += f", ... ({len(entities)} total)"
2584 raise exc.AmbiguousColumnError(
2585 f'Attribute name "{key}" is ambiguous; it exists in '
2586 f"multiple FROM clause entities ({entity_desc}). "
2587 f"Use filter() with explicit column references instead "
2588 f"of filter_by()."
2589 )
2590 match_ = getattr(ns, key)
2592 if match_ is None:
2593 # No entity has this attribute
2594 entity_desc = ", ".join(str(e) for e in list(entities)[:3])
2595 if len(entities) > 3:
2596 entity_desc += f", ... ({len(entities)} total)"
2597 raise exc.InvalidRequestError(
2598 f'None of the FROM clause entities have a property "{key}". '
2599 f"Searched entities: {entity_desc}"
2600 )
2602 return match_