Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/base.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# sql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
9"""Foundational utilities common to many sql modules."""
12from __future__ import annotations
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Collection
24from typing import Dict
25from typing import Final
26from typing import FrozenSet
27from typing import Generator
28from typing import Generic
29from typing import Iterable
30from typing import Iterator
31from typing import List
32from typing import Mapping
33from typing import MutableMapping
34from typing import NamedTuple
35from typing import NoReturn
36from typing import Optional
37from typing import overload
38from typing import Protocol
39from typing import Sequence
40from typing import Set
41from typing import Tuple
42from typing import Type
43from typing import TYPE_CHECKING
44from typing import TypeGuard
45from typing import TypeVar
46from typing import Union
48from . import roles
49from . import visitors
50from .cache_key import HasCacheKey # noqa
51from .cache_key import MemoizedHasCacheKey # noqa
52from .traversals import HasCopyInternals # noqa
53from .visitors import ClauseVisitor
54from .visitors import ExtendedInternalTraversal
55from .visitors import ExternallyTraversible
56from .visitors import InternalTraversal
57from .. import event
58from .. import exc
59from .. import util
60from ..util import EMPTY_DICT
61from ..util import HasMemoized as HasMemoized
62from ..util import hybridmethod
63from ..util.typing import Self
64from ..util.typing import TypeVarTuple
65from ..util.typing import Unpack
67if TYPE_CHECKING:
68 from . import coercions
69 from . import elements
70 from . import type_api
71 from ._orm_types import DMLStrategyArgument
72 from ._orm_types import SynchronizeSessionArgument
73 from ._typing import _CLE
74 from .cache_key import CacheKey
75 from .compiler import SQLCompiler
76 from .dml import Delete
77 from .dml import Insert
78 from .dml import Update
79 from .elements import BindParameter
80 from .elements import ClauseElement
81 from .elements import ClauseList
82 from .elements import ColumnClause # noqa
83 from .elements import ColumnElement
84 from .elements import NamedColumn
85 from .elements import SQLCoreOperations
86 from .elements import TextClause
87 from .schema import Column
88 from .schema import DefaultGenerator
89 from .selectable import _JoinTargetElement
90 from .selectable import _SelectIterable
91 from .selectable import FromClause
92 from .selectable import Select
93 from .visitors import anon_map
94 from ..engine import Connection
95 from ..engine import CursorResult
96 from ..engine.interfaces import _CoreMultiExecuteParams
97 from ..engine.interfaces import _CoreSingleExecuteParams
98 from ..engine.interfaces import _ExecuteOptions
99 from ..engine.interfaces import _ImmutableExecuteOptions
100 from ..engine.interfaces import CacheStats
101 from ..engine.interfaces import Compiled
102 from ..engine.interfaces import CompiledCacheType
103 from ..engine.interfaces import CoreExecuteOptionsParameter
104 from ..engine.interfaces import Dialect
105 from ..engine.interfaces import IsolationLevel
106 from ..engine.interfaces import SchemaTranslateMapType
107 from ..event import dispatcher
109if not TYPE_CHECKING:
110 coercions = None # noqa
111 elements = None # noqa
112 type_api = None # noqa
115_Ts = TypeVarTuple("_Ts")
118class _NoArg(Enum):
119 NO_ARG = 0
121 def __repr__(self):
122 return f"_NoArg.{self.name}"
125NO_ARG: Final = _NoArg.NO_ARG
128class _NoneName(Enum):
129 NONE_NAME = 0
130 """indicate a 'deferred' name that was ultimately the value None."""
133_NONE_NAME: Final = _NoneName.NONE_NAME
135_T = TypeVar("_T", bound=Any)
137_Fn = TypeVar("_Fn", bound=Callable[..., Any])
139_AmbiguousTableNameMap = MutableMapping[str, str]
142class _DefaultDescriptionTuple(NamedTuple):
143 arg: Any
144 is_scalar: Optional[bool]
145 is_callable: Optional[bool]
146 is_sentinel: Optional[bool]
148 @classmethod
149 def _from_column_default(
150 cls, default: Optional[DefaultGenerator]
151 ) -> _DefaultDescriptionTuple:
152 return (
153 _DefaultDescriptionTuple(
154 default.arg, # type: ignore
155 default.is_scalar,
156 default.is_callable,
157 default.is_sentinel,
158 )
159 if default
160 and (
161 default.has_arg
162 or (not default.for_update and default.is_sentinel)
163 )
164 else _DefaultDescriptionTuple(None, None, None, None)
165 )
168_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
169 "_omit_from_statements"
170)
173class _EntityNamespace(Protocol):
174 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
177class _HasEntityNamespace(Protocol):
178 @util.ro_non_memoized_property
179 def entity_namespace(self) -> _EntityNamespace: ...
182def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
183 return hasattr(element, "entity_namespace")
186# Remove when https://github.com/python/mypy/issues/14640 will be fixed
187_Self = TypeVar("_Self", bound=Any)
190class Immutable:
191 """mark a ClauseElement as 'immutable' when expressions are cloned.
193 "immutable" objects refers to the "mutability" of an object in the
194 context of SQL DQL and DML generation. Such as, in DQL, one can
195 compose a SELECT or subquery of varied forms, but one cannot modify
196 the structure of a specific table or column within DQL.
197 :class:`.Immutable` is mostly intended to follow this concept, and as
198 such the primary "immutable" objects are :class:`.ColumnClause`,
199 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
201 """
203 __slots__ = ()
205 _is_immutable: bool = True
207 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
208 raise NotImplementedError("Immutable objects do not support copying")
210 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
211 raise NotImplementedError("Immutable objects do not support copying")
213 def _clone(self: _Self, **kw: Any) -> _Self:
214 return self
216 def _copy_internals(
217 self, *, omit_attrs: Iterable[str] = (), **kw: Any
218 ) -> None:
219 pass
222class SingletonConstant(Immutable):
223 """Represent SQL constants like NULL, TRUE, FALSE"""
225 _is_singleton_constant: bool = True
227 _singleton: SingletonConstant
229 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
230 return cast(_T, cls._singleton)
232 @util.non_memoized_property
233 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
234 raise NotImplementedError()
236 @classmethod
237 def _create_singleton(cls) -> None:
238 obj = object.__new__(cls)
239 obj.__init__() # type: ignore
241 # for a long time this was an empty frozenset, meaning
242 # a SingletonConstant would never be a "corresponding column" in
243 # a statement. This referred to #6259. However, in #7154 we see
244 # that we do in fact need "correspondence" to work when matching cols
245 # in result sets, so the non-correspondence was moved to a more
246 # specific level when we are actually adapting expressions for SQL
247 # render only.
248 obj.proxy_set = frozenset([obj])
249 cls._singleton = obj
252def _from_objects(
253 *elements: Union[
254 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
255 ]
256) -> Iterator[FromClause]:
257 return itertools.chain.from_iterable(
258 [element._from_objects for element in elements]
259 )
262def _select_iterables(
263 elements: Iterable[roles.ColumnsClauseRole],
264) -> _SelectIterable:
265 """expand tables into individual columns in the
266 given list of column expressions.
268 """
269 return itertools.chain.from_iterable(
270 [c._select_iterable for c in elements]
271 )
274_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
277class _GenerativeType(Protocol):
278 def _generate(self) -> Self: ...
281def _generative(fn: _Fn) -> _Fn:
282 """non-caching _generative() decorator.
284 This is basically the legacy decorator that copies the object and
285 runs a method on the new copy.
287 """
289 @util.decorator
290 def _generative(
291 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
292 ) -> _SelfGenerativeType:
293 """Mark a method as generative."""
295 self = self._generate()
296 x = fn(self, *args, **kw)
297 assert x is self, "generative methods must return self"
298 return self
300 decorated = _generative(fn)
301 decorated.non_generative = fn # type: ignore
302 return decorated
305def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
306 msgs: Dict[str, str] = kw.pop("msgs", {})
308 defaults: Dict[str, str] = kw.pop("defaults", {})
310 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
311 (name, operator.attrgetter(name), defaults.get(name, None))
312 for name in names
313 ]
315 @util.decorator
316 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
317 # make pylance happy by not including "self" in the argument
318 # list
319 self = args[0]
320 args = args[1:]
321 for name, getter, default_ in getters:
322 if getter(self) is not default_:
323 msg = msgs.get(
324 name,
325 "Method %s() has already been invoked on this %s construct"
326 % (fn.__name__, self.__class__),
327 )
328 raise exc.InvalidRequestError(msg)
329 return fn(self, *args, **kw)
331 return check
334def _clone(element, **kw):
335 return element._clone(**kw)
338def _expand_cloned(
339 elements: Iterable[_CLE],
340) -> Iterable[_CLE]:
341 """expand the given set of ClauseElements to be the set of all 'cloned'
342 predecessors.
344 """
345 # TODO: cython candidate
346 return itertools.chain(*[x._cloned_set for x in elements])
349def _de_clone(
350 elements: Iterable[_CLE],
351) -> Iterable[_CLE]:
352 for x in elements:
353 while x._is_clone_of is not None:
354 x = x._is_clone_of
355 yield x
358def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
359 """return the intersection of sets a and b, counting
360 any overlap between 'cloned' predecessors.
362 The returned set is in terms of the entities present within 'a'.
364 """
365 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
366 _expand_cloned(b)
367 )
368 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
371def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
372 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
373 _expand_cloned(b)
374 )
375 return {
376 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
377 }
380class _DialectArgView(MutableMapping[str, Any]):
381 """A dictionary view of dialect-level arguments in the form
382 <dialectname>_<argument_name>.
384 """
386 __slots__ = ("obj",)
388 def __init__(self, obj: DialectKWArgs) -> None:
389 self.obj = obj
391 def _key(self, key: str) -> Tuple[str, str]:
392 try:
393 dialect, value_key = key.split("_", 1)
394 except ValueError as err:
395 raise KeyError(key) from err
396 else:
397 return dialect, value_key
399 def __getitem__(self, key: str) -> Any:
400 dialect, value_key = self._key(key)
402 try:
403 opt = self.obj.dialect_options[dialect]
404 except exc.NoSuchModuleError as err:
405 raise KeyError(key) from err
406 else:
407 return opt[value_key]
409 def __setitem__(self, key: str, value: Any) -> None:
410 try:
411 dialect, value_key = self._key(key)
412 except KeyError as err:
413 raise exc.ArgumentError(
414 "Keys must be of the form <dialectname>_<argname>"
415 ) from err
416 else:
417 self.obj.dialect_options[dialect][value_key] = value
419 def __delitem__(self, key: str) -> None:
420 dialect, value_key = self._key(key)
421 del self.obj.dialect_options[dialect][value_key]
423 def __len__(self) -> int:
424 return sum(
425 len(args._non_defaults)
426 for args in self.obj.dialect_options.values()
427 )
429 def __iter__(self) -> Generator[str, None, None]:
430 return (
431 "%s_%s" % (dialect_name, value_name)
432 for dialect_name in self.obj.dialect_options
433 for value_name in self.obj.dialect_options[
434 dialect_name
435 ]._non_defaults
436 )
439class _DialectArgDict(MutableMapping[str, Any]):
440 """A dictionary view of dialect-level arguments for a specific
441 dialect.
443 Maintains a separate collection of user-specified arguments
444 and dialect-specified default arguments.
446 """
448 def __init__(self) -> None:
449 self._non_defaults: Dict[str, Any] = {}
450 self._defaults: Dict[str, Any] = {}
452 def __len__(self) -> int:
453 return len(set(self._non_defaults).union(self._defaults))
455 def __iter__(self) -> Iterator[str]:
456 return iter(set(self._non_defaults).union(self._defaults))
458 def __getitem__(self, key: str) -> Any:
459 if key in self._non_defaults:
460 return self._non_defaults[key]
461 else:
462 return self._defaults[key]
464 def __setitem__(self, key: str, value: Any) -> None:
465 self._non_defaults[key] = value
467 def __delitem__(self, key: str) -> None:
468 del self._non_defaults[key]
471@util.preload_module("sqlalchemy.dialects")
472def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
473 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
474 if dialect_cls.construct_arguments is None:
475 return None
476 return dict(dialect_cls.construct_arguments)
479class DialectKWArgs:
480 """Establish the ability for a class to have dialect-specific arguments
481 with defaults and constructor validation.
483 The :class:`.DialectKWArgs` interacts with the
484 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
486 .. seealso::
488 :attr:`.DefaultDialect.construct_arguments`
490 """
492 __slots__ = ()
494 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
495 ("dialect_options", InternalTraversal.dp_dialect_options)
496 ]
498 @classmethod
499 def argument_for(
500 cls, dialect_name: str, argument_name: str, default: Any
501 ) -> None:
502 """Add a new kind of dialect-specific keyword argument for this class.
504 E.g.::
506 Index.argument_for("mydialect", "length", None)
508 some_index = Index("a", "b", mydialect_length=5)
510 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
511 way adding extra arguments to the
512 :attr:`.DefaultDialect.construct_arguments` dictionary. This
513 dictionary provides a list of argument names accepted by various
514 schema-level constructs on behalf of a dialect.
516 New dialects should typically specify this dictionary all at once as a
517 data member of the dialect class. The use case for ad-hoc addition of
518 argument names is typically for end-user code that is also using
519 a custom compilation scheme which consumes the additional arguments.
521 :param dialect_name: name of a dialect. The dialect must be
522 locatable, else a :class:`.NoSuchModuleError` is raised. The
523 dialect must also include an existing
524 :attr:`.DefaultDialect.construct_arguments` collection, indicating
525 that it participates in the keyword-argument validation and default
526 system, else :class:`.ArgumentError` is raised. If the dialect does
527 not include this collection, then any keyword argument can be
528 specified on behalf of this dialect already. All dialects packaged
529 within SQLAlchemy include this collection, however for third party
530 dialects, support may vary.
532 :param argument_name: name of the parameter.
534 :param default: default value of the parameter.
536 """
538 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
539 DialectKWArgs._kw_registry[dialect_name]
540 )
541 if construct_arg_dictionary is None:
542 raise exc.ArgumentError(
543 "Dialect '%s' does have keyword-argument "
544 "validation and defaults enabled configured" % dialect_name
545 )
546 if cls not in construct_arg_dictionary:
547 construct_arg_dictionary[cls] = {}
548 construct_arg_dictionary[cls][argument_name] = default
550 @property
551 def dialect_kwargs(self) -> _DialectArgView:
552 """A collection of keyword arguments specified as dialect-specific
553 options to this construct.
555 The arguments are present here in their original ``<dialect>_<kwarg>``
556 format. Only arguments that were actually passed are included;
557 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
558 contains all options known by this dialect including defaults.
560 The collection is also writable; keys are accepted of the
561 form ``<dialect>_<kwarg>`` where the value will be assembled
562 into the list of options.
564 .. seealso::
566 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
568 """
569 return _DialectArgView(self)
571 @property
572 def kwargs(self) -> _DialectArgView:
573 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
574 return self.dialect_kwargs
576 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
577 util.PopulateDict(_kw_reg_for_dialect)
578 )
580 @classmethod
581 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
582 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
583 d = _DialectArgDict()
585 if construct_arg_dictionary is None:
586 d._defaults.update({"*": None})
587 else:
588 for cls in reversed(cls.__mro__):
589 if cls in construct_arg_dictionary:
590 d._defaults.update(construct_arg_dictionary[cls])
591 return d
593 @util.memoized_property
594 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
595 """A collection of keyword arguments specified as dialect-specific
596 options to this construct.
598 This is a two-level nested registry, keyed to ``<dialect_name>``
599 and ``<argument_name>``. For example, the ``postgresql_where``
600 argument would be locatable as::
602 arg = my_object.dialect_options["postgresql"]["where"]
604 .. versionadded:: 0.9.2
606 .. seealso::
608 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
610 """
612 return util.PopulateDict(self._kw_reg_for_dialect_cls)
614 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
615 # validate remaining kwargs that they all specify DB prefixes
617 if not kwargs:
618 return
620 for k in kwargs:
621 m = re.match("^(.+?)_(.+)$", k)
622 if not m:
623 raise TypeError(
624 "Additional arguments should be "
625 "named <dialectname>_<argument>, got '%s'" % k
626 )
627 dialect_name, arg_name = m.group(1, 2)
629 try:
630 construct_arg_dictionary = self.dialect_options[dialect_name]
631 except exc.NoSuchModuleError:
632 util.warn(
633 "Can't validate argument %r; can't "
634 "locate any SQLAlchemy dialect named %r"
635 % (k, dialect_name)
636 )
637 self.dialect_options[dialect_name] = d = _DialectArgDict()
638 d._defaults.update({"*": None})
639 d._non_defaults[arg_name] = kwargs[k]
640 else:
641 if (
642 "*" not in construct_arg_dictionary
643 and arg_name not in construct_arg_dictionary
644 ):
645 raise exc.ArgumentError(
646 "Argument %r is not accepted by "
647 "dialect %r on behalf of %r"
648 % (k, dialect_name, self.__class__)
649 )
650 else:
651 construct_arg_dictionary[arg_name] = kwargs[k]
654class CompileState:
655 """Produces additional object state necessary for a statement to be
656 compiled.
658 the :class:`.CompileState` class is at the base of classes that assemble
659 state for a particular statement object that is then used by the
660 compiler. This process is essentially an extension of the process that
661 the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
662 on converting raw user intent into more organized structures rather than
663 producing string output. The top-level :class:`.CompileState` for the
664 statement being executed is also accessible when the execution context
665 works with invoking the statement and collecting results.
667 The production of :class:`.CompileState` is specific to the compiler, such
668 as within the :meth:`.SQLCompiler.visit_insert`,
669 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
670 responsible for associating the :class:`.CompileState` with the
671 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
672 i.e. the outermost SQL statement that's actually being executed.
673 There can be other :class:`.CompileState` objects that are not the
674 toplevel, such as when a SELECT subquery or CTE-nested
675 INSERT/UPDATE/DELETE is generated.
677 .. versionadded:: 1.4
679 """
681 __slots__ = ("statement", "_ambiguous_table_name_map")
683 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
685 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
687 @classmethod
688 def create_for_statement(
689 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
690 ) -> CompileState:
691 # factory construction.
693 if statement._propagate_attrs:
694 plugin_name = statement._propagate_attrs.get(
695 "compile_state_plugin", "default"
696 )
697 klass = cls.plugins.get(
698 (plugin_name, statement._effective_plugin_target), None
699 )
700 if klass is None:
701 klass = cls.plugins[
702 ("default", statement._effective_plugin_target)
703 ]
705 else:
706 klass = cls.plugins[
707 ("default", statement._effective_plugin_target)
708 ]
710 if klass is cls:
711 return cls(statement, compiler, **kw)
712 else:
713 return klass.create_for_statement(statement, compiler, **kw)
715 def __init__(self, statement, compiler, **kw):
716 self.statement = statement
718 @classmethod
719 def get_plugin_class(
720 cls, statement: Executable
721 ) -> Optional[Type[CompileState]]:
722 plugin_name = statement._propagate_attrs.get(
723 "compile_state_plugin", None
724 )
726 if plugin_name:
727 key = (plugin_name, statement._effective_plugin_target)
728 if key in cls.plugins:
729 return cls.plugins[key]
731 # there's no case where we call upon get_plugin_class() and want
732 # to get None back, there should always be a default. return that
733 # if there was no plugin-specific class (e.g. Insert with "orm"
734 # plugin)
735 try:
736 return cls.plugins[("default", statement._effective_plugin_target)]
737 except KeyError:
738 return None
740 @classmethod
741 def _get_plugin_class_for_plugin(
742 cls, statement: Executable, plugin_name: str
743 ) -> Optional[Type[CompileState]]:
744 try:
745 return cls.plugins[
746 (plugin_name, statement._effective_plugin_target)
747 ]
748 except KeyError:
749 return None
751 @classmethod
752 def plugin_for(
753 cls, plugin_name: str, visit_name: str
754 ) -> Callable[[_Fn], _Fn]:
755 def decorate(cls_to_decorate):
756 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
757 return cls_to_decorate
759 return decorate
762class Generative(HasMemoized):
763 """Provide a method-chaining pattern in conjunction with the
764 @_generative decorator."""
766 def _generate(self) -> Self:
767 skip = self._memoized_keys
768 cls = self.__class__
769 s = cls.__new__(cls)
770 if skip:
771 # ensure this iteration remains atomic
772 s.__dict__ = {
773 k: v for k, v in self.__dict__.copy().items() if k not in skip
774 }
775 else:
776 s.__dict__ = self.__dict__.copy()
777 return s
780class InPlaceGenerative(HasMemoized):
781 """Provide a method-chaining pattern in conjunction with the
782 @_generative decorator that mutates in place."""
784 __slots__ = ()
786 def _generate(self) -> Self:
787 skip = self._memoized_keys
788 # note __dict__ needs to be in __slots__ if this is used
789 for k in skip:
790 self.__dict__.pop(k, None)
791 return self
794class HasCompileState(Generative):
795 """A class that has a :class:`.CompileState` associated with it."""
797 _compile_state_plugin: Optional[Type[CompileState]] = None
799 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
801 _compile_state_factory = CompileState.create_for_statement
804class _MetaOptions(type):
805 """metaclass for the Options class.
807 This metaclass is actually necessary despite the availability of the
808 ``__init_subclass__()`` hook as this type also provides custom class-level
809 behavior for the ``__add__()`` method.
811 """
813 _cache_attrs: Tuple[str, ...]
815 def __add__(self, other):
816 o1 = self()
818 if set(other).difference(self._cache_attrs):
819 raise TypeError(
820 "dictionary contains attributes not covered by "
821 "Options class %s: %r"
822 % (self, set(other).difference(self._cache_attrs))
823 )
825 o1.__dict__.update(other)
826 return o1
828 if TYPE_CHECKING:
830 def __getattr__(self, key: str) -> Any: ...
832 def __setattr__(self, key: str, value: Any) -> None: ...
834 def __delattr__(self, key: str) -> None: ...
837class Options(metaclass=_MetaOptions):
838 """A cacheable option dictionary with defaults."""
840 __slots__ = ()
842 _cache_attrs: Tuple[str, ...]
844 def __init_subclass__(cls) -> None:
845 dict_ = cls.__dict__
846 cls._cache_attrs = tuple(
847 sorted(
848 d
849 for d in dict_
850 if not d.startswith("__")
851 and d not in ("_cache_key_traversal",)
852 )
853 )
854 super().__init_subclass__()
856 def __init__(self, **kw: Any) -> None:
857 self.__dict__.update(kw)
859 def __add__(self, other):
860 o1 = self.__class__.__new__(self.__class__)
861 o1.__dict__.update(self.__dict__)
863 if set(other).difference(self._cache_attrs):
864 raise TypeError(
865 "dictionary contains attributes not covered by "
866 "Options class %s: %r"
867 % (self, set(other).difference(self._cache_attrs))
868 )
870 o1.__dict__.update(other)
871 return o1
873 def __eq__(self, other):
874 # TODO: very inefficient. This is used only in test suites
875 # right now.
876 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
877 if getattr(self, a) != getattr(other, b):
878 return False
879 return True
881 def __repr__(self) -> str:
882 # TODO: fairly inefficient, used only in debugging right now.
884 return "%s(%s)" % (
885 self.__class__.__name__,
886 ", ".join(
887 "%s=%r" % (k, self.__dict__[k])
888 for k in self._cache_attrs
889 if k in self.__dict__
890 ),
891 )
893 @classmethod
894 def isinstance(cls, klass: Type[Any]) -> bool:
895 return issubclass(cls, klass)
897 @hybridmethod
898 def add_to_element(self, name: str, value: str) -> Any:
899 return self + {name: getattr(self, name) + value}
901 @hybridmethod
902 def _state_dict_inst(self) -> Mapping[str, Any]:
903 return self.__dict__
905 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
907 @_state_dict_inst.classlevel
908 def _state_dict(cls) -> Mapping[str, Any]:
909 return cls._state_dict_const
911 @classmethod
912 def safe_merge(cls, other: "Options") -> Any:
913 d = other._state_dict()
915 # only support a merge with another object of our class
916 # and which does not have attrs that we don't. otherwise
917 # we risk having state that might not be part of our cache
918 # key strategy
920 if (
921 cls is not other.__class__
922 and other._cache_attrs
923 and set(other._cache_attrs).difference(cls._cache_attrs)
924 ):
925 raise TypeError(
926 "other element %r is not empty, is not of type %s, "
927 "and contains attributes not covered here %r"
928 % (
929 other,
930 cls,
931 set(other._cache_attrs).difference(cls._cache_attrs),
932 )
933 )
934 return cls + d
936 @classmethod
937 def from_execution_options(
938 cls,
939 key: str,
940 attrs: set[str],
941 exec_options: Mapping[str, Any],
942 statement_exec_options: Mapping[str, Any],
943 ) -> Tuple["Options", Mapping[str, Any]]:
944 """process Options argument in terms of execution options.
947 e.g.::
949 (
950 load_options,
951 execution_options,
952 ) = QueryContext.default_load_options.from_execution_options(
953 "_sa_orm_load_options",
954 {"populate_existing", "autoflush", "yield_per"},
955 execution_options,
956 statement._execution_options,
957 )
959 get back the Options and refresh "_sa_orm_load_options" in the
960 exec options dict w/ the Options as well
962 """
964 # common case is that no options we are looking for are
965 # in either dictionary, so cancel for that first
966 check_argnames = attrs.intersection(
967 set(exec_options).union(statement_exec_options)
968 )
970 existing_options = exec_options.get(key, cls)
972 if check_argnames:
973 result = {}
974 for argname in check_argnames:
975 local = "_" + argname
976 if argname in exec_options:
977 result[local] = exec_options[argname]
978 elif argname in statement_exec_options:
979 result[local] = statement_exec_options[argname]
981 new_options = existing_options + result
982 exec_options = util.EMPTY_DICT.merge_with(
983 exec_options, {key: new_options}
984 )
985 return new_options, exec_options
987 else:
988 return existing_options, exec_options
990 if TYPE_CHECKING:
992 def __getattr__(self, key: str) -> Any: ...
994 def __setattr__(self, key: str, value: Any) -> None: ...
996 def __delattr__(self, key: str) -> None: ...
999class CacheableOptions(Options, HasCacheKey):
1000 __slots__ = ()
1002 @hybridmethod
1003 def _gen_cache_key_inst(
1004 self, anon_map: Any, bindparams: List[BindParameter[Any]]
1005 ) -> Optional[Tuple[Any]]:
1006 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
1008 @_gen_cache_key_inst.classlevel
1009 def _gen_cache_key(
1010 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
1011 ) -> Tuple[CacheableOptions, Any]:
1012 return (cls, ())
1014 @hybridmethod
1015 def _generate_cache_key(self) -> Optional[CacheKey]:
1016 return HasCacheKey._generate_cache_key(self)
1019class ExecutableOption(HasCopyInternals):
1020 __slots__ = ()
1022 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1024 __visit_name__: str = "executable_option"
1026 _is_has_cache_key: bool = False
1028 _is_core: bool = True
1030 def _clone(self, **kw):
1031 """Create a shallow copy of this ExecutableOption."""
1032 c = self.__class__.__new__(self.__class__)
1033 c.__dict__ = dict(self.__dict__) # type: ignore
1034 return c
1037_L = TypeVar("_L", bound=str)
1040class HasSyntaxExtensions(Generic[_L]):
1042 _position_map: Mapping[_L, str]
1044 @_generative
1045 def ext(self, extension: SyntaxExtension) -> Self:
1046 """Applies a SQL syntax extension to this statement.
1048 SQL syntax extensions are :class:`.ClauseElement` objects that define
1049 some vendor-specific syntactical construct that take place in specific
1050 parts of a SQL statement. Examples include vendor extensions like
1051 PostgreSQL / SQLite's "ON DUPLICATE KEY UPDATE", PostgreSQL's
1052 "DISTINCT ON", and MySQL's "LIMIT" that can be applied to UPDATE
1053 and DELETE statements.
1055 .. seealso::
1057 :ref:`examples_syntax_extensions`
1059 :func:`_mysql.limit` - DML LIMIT for MySQL
1061 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL
1063 .. versionadded:: 2.1
1065 """
1066 extension = coercions.expect(
1067 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
1068 )
1069 self._apply_syntax_extension_to_self(extension)
1070 return self
1072 @util.preload_module("sqlalchemy.sql.elements")
1073 def apply_syntax_extension_point(
1074 self,
1075 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
1076 position: _L,
1077 ) -> None:
1078 """Apply a :class:`.SyntaxExtension` to a known extension point.
1080 Should be used only internally by :class:`.SyntaxExtension`.
1082 E.g.::
1084 class Qualify(SyntaxExtension, ClauseElement):
1086 # ...
1088 def apply_to_select(self, select_stmt: Select) -> None:
1089 # append self to existing
1090 select_stmt.apply_extension_point(
1091 lambda existing: [*existing, self], "post_criteria"
1092 )
1095 class ReplaceExt(SyntaxExtension, ClauseElement):
1097 # ...
1099 def apply_to_select(self, select_stmt: Select) -> None:
1100 # replace any existing elements regardless of type
1101 select_stmt.apply_extension_point(
1102 lambda existing: [self], "post_criteria"
1103 )
1106 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):
1108 # ...
1110 def apply_to_select(self, select_stmt: Select) -> None:
1111 # replace any existing elements of the same type
1112 select_stmt.apply_extension_point(
1113 self.append_replacing_same_type, "post_criteria"
1114 )
1116 :param apply_fn: callable function that will receive a sequence of
1117 :class:`.ClauseElement` that is already populating the extension
1118 point (the sequence is empty if there isn't one), and should return
1119 a new sequence of :class:`.ClauseElement` that will newly populate
1120 that point. The function typically can choose to concatenate the
1121 existing values with the new one, or to replace the values that are
1122 there with a new one by returning a list of a single element, or
1123 to perform more complex operations like removing only the same
1124 type element from the input list of merging already existing elements
1125 of the same type. Some examples are shown in the examples above
1126 :param position: string name of the position to apply to. This
1127 varies per statement type. IDEs should show the possible values
1128 for each statement type as it's typed with a ``typing.Literal`` per
1129 statement.
1131 .. seealso::
1133 :ref:`examples_syntax_extensions`
1135 :meth:`.ext`
1138 """ # noqa: E501
1140 try:
1141 attrname = self._position_map[position]
1142 except KeyError as ke:
1143 raise ValueError(
1144 f"Unknown position {position!r} for {self.__class__} "
1145 f"construct; known positions: "
1146 f"{', '.join(repr(k) for k in self._position_map)}"
1147 ) from ke
1148 else:
1149 ElementList = util.preloaded.sql_elements.ElementList
1150 existing: Optional[ClauseElement] = getattr(self, attrname, None)
1151 if existing is None:
1152 input_seq: Tuple[ClauseElement, ...] = ()
1153 elif isinstance(existing, ElementList):
1154 input_seq = existing.clauses
1155 else:
1156 input_seq = (existing,)
1158 new_seq = apply_fn(input_seq)
1159 assert new_seq, "cannot return empty sequence"
1160 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
1161 setattr(self, attrname, new)
1163 def _apply_syntax_extension_to_self(
1164 self, extension: SyntaxExtension
1165 ) -> None:
1166 raise NotImplementedError()
1168 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
1169 res: Dict[_L, SyntaxExtension] = {}
1170 for name, attr in self._position_map.items():
1171 value = getattr(self, attr)
1172 if value is not None:
1173 res[name] = value
1174 return res
1176 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
1177 for name, value in extensions.items():
1178 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501
1181class SyntaxExtension(roles.SyntaxExtensionRole):
1182 """Defines a unit that when also extending from :class:`.ClauseElement`
1183 can be applied to SQLAlchemy statements :class:`.Select`,
1184 :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
1185 pre-established SQL insertion points within these constructs.
1187 .. versionadded:: 2.1
1189 .. seealso::
1191 :ref:`examples_syntax_extensions`
1193 """
1195 def append_replacing_same_type(
1196 self, existing: Sequence[ClauseElement]
1197 ) -> Sequence[ClauseElement]:
1198 """Utility function that can be used as
1199 :paramref:`_sql.Select.apply_syntax_extension_point.apply_fn`
1200 to remove any other element of the same type in existing and appending
1201 ``self`` to the list.
1203 This is equivalent to::
1205 stmt.apply_syntax_extension_point(
1206 lambda existing: [
1207 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
1208 self,
1209 ],
1210 "post_criteria",
1211 )
1213 .. seealso::
1215 :ref:`examples_syntax_extensions`
1217 :meth:`_sql.Select.apply_syntax_extension_point` and equivalents
1218 in :class:`_dml.Insert`, :class:`_dml.Delete`, :class:`_dml.Update`
1220 """ # noqa: E501
1221 cls = type(self)
1222 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501
1224 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
1225 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
1226 raise NotImplementedError(
1227 f"Extension {type(self).__name__} cannot be applied to select"
1228 )
1230 def apply_to_update(self, update_stmt: Update) -> None:
1231 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
1232 raise NotImplementedError(
1233 f"Extension {type(self).__name__} cannot be applied to update"
1234 )
1236 def apply_to_delete(self, delete_stmt: Delete) -> None:
1237 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
1238 raise NotImplementedError(
1239 f"Extension {type(self).__name__} cannot be applied to delete"
1240 )
1242 def apply_to_insert(self, insert_stmt: Insert) -> None:
1243 """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
1244 raise NotImplementedError(
1245 f"Extension {type(self).__name__} cannot be applied to insert"
1246 )
1249class Executable(roles.StatementRole):
1250 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1252 :class:`.Executable` is a superclass for all "statement" types
1253 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1254 :func:`insert`, :func:`text`.
1256 """
1258 supports_execution: bool = True
1259 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1260 _is_default_generator: bool = False
1261 _with_options: Tuple[ExecutableOption, ...] = ()
1262 _compile_state_funcs: Tuple[
1263 Tuple[Callable[[CompileState], None], Any], ...
1264 ] = ()
1265 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1267 _executable_traverse_internals = [
1268 ("_with_options", InternalTraversal.dp_executable_options),
1269 (
1270 "_compile_state_funcs",
1271 ExtendedInternalTraversal.dp_compile_state_funcs,
1272 ),
1273 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1274 ]
1276 is_select: bool = False
1277 is_from_statement: bool = False
1278 is_update: bool = False
1279 is_insert: bool = False
1280 is_text: bool = False
1281 is_delete: bool = False
1282 is_dml: bool = False
1284 if TYPE_CHECKING:
1285 __visit_name__: str
1287 def _compile_w_cache(
1288 self,
1289 dialect: Dialect,
1290 *,
1291 compiled_cache: Optional[CompiledCacheType],
1292 column_keys: List[str],
1293 for_executemany: bool = False,
1294 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1295 **kw: Any,
1296 ) -> tuple[
1297 Compiled,
1298 Sequence[BindParameter[Any]] | None,
1299 _CoreSingleExecuteParams | None,
1300 CacheStats,
1301 ]: ...
1303 def _execute_on_connection(
1304 self,
1305 connection: Connection,
1306 distilled_params: _CoreMultiExecuteParams,
1307 execution_options: CoreExecuteOptionsParameter,
1308 ) -> CursorResult[Any]: ...
1310 def _execute_on_scalar(
1311 self,
1312 connection: Connection,
1313 distilled_params: _CoreMultiExecuteParams,
1314 execution_options: CoreExecuteOptionsParameter,
1315 ) -> Any: ...
1317 @util.ro_non_memoized_property
1318 def _all_selected_columns(self) -> _SelectIterable:
1319 raise NotImplementedError()
1321 @property
1322 def _effective_plugin_target(self) -> str:
1323 return self.__visit_name__
1325 @_generative
1326 def options(self, *options: ExecutableOption) -> Self:
1327 """Apply options to this statement.
1329 In the general sense, options are any kind of Python object
1330 that can be interpreted by systems that consume the statement outside
1331 of the regular SQL compiler chain. Specifically, these options are
1332 the ORM level options that apply "eager load" and other loading
1333 behaviors to an ORM query.
1335 For background on specific kinds of options for specific kinds of
1336 statements, refer to the documentation for those option objects.
1338 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1339 Core statement objects towards the goal of allowing unified
1340 Core / ORM querying capabilities.
1342 .. seealso::
1344 :ref:`loading_columns` - refers to options specific to the usage
1345 of ORM queries
1347 :ref:`relationship_loader_options` - refers to options specific
1348 to the usage of ORM queries
1350 """
1351 self._with_options += tuple(
1352 coercions.expect(roles.ExecutableOptionRole, opt)
1353 for opt in options
1354 )
1355 return self
1357 @_generative
1358 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1359 """Assign the compile options to a new value.
1361 :param compile_options: appropriate CacheableOptions structure
1363 """
1365 self._compile_options = compile_options
1366 return self
1368 @_generative
1369 def _update_compile_options(self, options: CacheableOptions) -> Self:
1370 """update the _compile_options with new keys."""
1372 assert self._compile_options is not None
1373 self._compile_options += options
1374 return self
1376 @_generative
1377 def _add_compile_state_func(
1378 self,
1379 callable_: Callable[[CompileState], None],
1380 cache_args: Any,
1381 ) -> Self:
1382 """Add a compile state function to this statement.
1384 When using the ORM only, these are callable functions that will
1385 be given the CompileState object upon compilation.
1387 A second argument cache_args is required, which will be combined with
1388 the ``__code__`` identity of the function itself in order to produce a
1389 cache key.
1391 """
1392 self._compile_state_funcs += ((callable_, cache_args),)
1393 return self
1395 @overload
1396 def execution_options(
1397 self,
1398 *,
1399 compiled_cache: Optional[CompiledCacheType] = ...,
1400 logging_token: str = ...,
1401 isolation_level: IsolationLevel = ...,
1402 no_parameters: bool = False,
1403 stream_results: bool = False,
1404 max_row_buffer: int = ...,
1405 yield_per: int = ...,
1406 driver_column_names: bool = ...,
1407 insertmanyvalues_page_size: int = ...,
1408 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1409 populate_existing: bool = False,
1410 autoflush: bool = False,
1411 synchronize_session: SynchronizeSessionArgument = ...,
1412 dml_strategy: DMLStrategyArgument = ...,
1413 render_nulls: bool = ...,
1414 is_delete_using: bool = ...,
1415 is_update_from: bool = ...,
1416 preserve_rowcount: bool = False,
1417 **opt: Any,
1418 ) -> Self: ...
1420 @overload
1421 def execution_options(self, **opt: Any) -> Self: ...
1423 @_generative
1424 def execution_options(self, **kw: Any) -> Self:
1425 """Set non-SQL options for the statement which take effect during
1426 execution.
1428 Execution options can be set at many scopes, including per-statement,
1429 per-connection, or per execution, using methods such as
1430 :meth:`_engine.Connection.execution_options` and parameters which
1431 accept a dictionary of options such as
1432 :paramref:`_engine.Connection.execute.execution_options` and
1433 :paramref:`_orm.Session.execute.execution_options`.
1435 The primary characteristic of an execution option, as opposed to
1436 other kinds of options such as ORM loader options, is that
1437 **execution options never affect the compiled SQL of a query, only
1438 things that affect how the SQL statement itself is invoked or how
1439 results are fetched**. That is, execution options are not part of
1440 what's accommodated by SQL compilation nor are they considered part of
1441 the cached state of a statement.
1443 The :meth:`_sql.Executable.execution_options` method is
1444 :term:`generative`, as
1445 is the case for the method as applied to the :class:`_engine.Engine`
1446 and :class:`_orm.Query` objects, which means when the method is called,
1447 a copy of the object is returned, which applies the given parameters to
1448 that new copy, but leaves the original unchanged::
1450 statement = select(table.c.x, table.c.y)
1451 new_statement = statement.execution_options(my_option=True)
1453 An exception to this behavior is the :class:`_engine.Connection`
1454 object, where the :meth:`_engine.Connection.execution_options` method
1455 is explicitly **not** generative.
1457 The kinds of options that may be passed to
1458 :meth:`_sql.Executable.execution_options` and other related methods and
1459 parameter dictionaries include parameters that are explicitly consumed
1460 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1461 defined by SQLAlchemy, which means the methods and/or parameter
1462 dictionaries may be used for user-defined parameters that interact with
1463 custom code, which may access the parameters using methods such as
1464 :meth:`_sql.Executable.get_execution_options` and
1465 :meth:`_engine.Connection.get_execution_options`, or within selected
1466 event hooks using a dedicated ``execution_options`` event parameter
1467 such as
1468 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1469 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1471 from sqlalchemy import event
1474 @event.listens_for(some_engine, "before_execute")
1475 def _process_opt(conn, statement, multiparams, params, execution_options):
1476 "run a SQL function before invoking a statement"
1478 if execution_options.get("do_special_thing", False):
1479 conn.exec_driver_sql("run_special_function()")
1481 Within the scope of options that are explicitly recognized by
1482 SQLAlchemy, most apply to specific classes of objects and not others.
1483 The most common execution options include:
1485 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1486 sets the isolation level for a connection or a class of connections
1487 via an :class:`_engine.Engine`. This option is accepted only
1488 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1490 * :paramref:`_engine.Connection.execution_options.stream_results` -
1491 indicates results should be fetched using a server side cursor;
1492 this option is accepted by :class:`_engine.Connection`, by the
1493 :paramref:`_engine.Connection.execute.execution_options` parameter
1494 on :meth:`_engine.Connection.execute`, and additionally by
1495 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1496 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1498 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1499 indicates a dictionary that will serve as the
1500 :ref:`SQL compilation cache <sql_caching>`
1501 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1502 well as for ORM methods like :meth:`_orm.Session.execute`.
1503 Can be passed as ``None`` to disable caching for statements.
1504 This option is not accepted by
1505 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1506 carry along a compilation cache within a statement object.
1508 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1509 - a mapping of schema names used by the
1510 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1511 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1512 :class:`_sql.Executable`, as well as by ORM constructs
1513 like :meth:`_orm.Session.execute`.
1515 .. seealso::
1517 :meth:`_engine.Connection.execution_options`
1519 :paramref:`_engine.Connection.execute.execution_options`
1521 :paramref:`_orm.Session.execute.execution_options`
1523 :ref:`orm_queryguide_execution_options` - documentation on all
1524 ORM-specific execution options
1526 """ # noqa: E501
1527 if "isolation_level" in kw:
1528 raise exc.ArgumentError(
1529 "'isolation_level' execution option may only be specified "
1530 "on Connection.execution_options(), or "
1531 "per-engine using the isolation_level "
1532 "argument to create_engine()."
1533 )
1534 if "compiled_cache" in kw:
1535 raise exc.ArgumentError(
1536 "'compiled_cache' execution option may only be specified "
1537 "on Connection.execution_options(), not per statement."
1538 )
1539 self._execution_options = self._execution_options.union(kw)
1540 return self
1542 def get_execution_options(self) -> _ExecuteOptions:
1543 """Get the non-SQL options which will take effect during execution.
1545 .. seealso::
1547 :meth:`.Executable.execution_options`
1548 """
1549 return self._execution_options
1552class ExecutableStatement(Executable):
1553 """Executable subclass that implements a lightweight version of ``params``
1554 that avoids a full cloned traverse.
1556 .. versionadded:: 2.1
1558 """
1560 _params: util.immutabledict[str, Any] = EMPTY_DICT
1562 _executable_traverse_internals = (
1563 Executable._executable_traverse_internals
1564 + [("_params", InternalTraversal.dp_params)]
1565 )
1567 @_generative
1568 def params(
1569 self,
1570 __optionaldict: _CoreSingleExecuteParams | None = None,
1571 /,
1572 **kwargs: Any,
1573 ) -> Self:
1574 """Return a copy with the provided bindparam values.
1576 Returns a copy of this Executable with bindparam values set
1577 to the given dictionary::
1579 >>> clause = column("x") + bindparam("foo")
1580 >>> print(clause.compile().params)
1581 {'foo': None}
1582 >>> print(clause.params({"foo": 7}).compile().params)
1583 {'foo': 7}
1585 """
1586 if __optionaldict:
1587 kwargs.update(__optionaldict)
1588 self._params = (
1589 util.immutabledict(kwargs)
1590 if not self._params
1591 else self._params | kwargs
1592 )
1593 return self
1596class SchemaEventTarget(event.EventTarget):
1597 """Base class for elements that are the targets of :class:`.DDLEvents`
1598 events.
1600 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1602 """
1604 dispatch: dispatcher[SchemaEventTarget]
1606 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1607 """Associate with this SchemaEvent's parent object."""
1609 def _set_parent_with_dispatch(
1610 self, parent: SchemaEventTarget, **kw: Any
1611 ) -> None:
1612 self.dispatch.before_parent_attach(self, parent)
1613 self._set_parent(parent, **kw)
1614 self.dispatch.after_parent_attach(self, parent)
1617class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1618 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1620 .. versionadded:: 2.0.41
1622 """
1625class SchemaVisitor(ClauseVisitor):
1626 """Define the visiting for ``SchemaItem`` and more
1627 generally ``SchemaVisitable`` objects.
1629 """
1631 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1634class _SentinelDefaultCharacterization(Enum):
1635 NONE = "none"
1636 UNKNOWN = "unknown"
1637 CLIENTSIDE = "clientside"
1638 SENTINEL_DEFAULT = "sentinel_default"
1639 SERVERSIDE = "serverside"
1640 IDENTITY = "identity"
1641 SEQUENCE = "sequence"
1642 MONOTONIC_FUNCTION = "monotonic"
1645class _SentinelColumnCharacterization(NamedTuple):
1646 columns: Optional[Sequence[Column[Any]]] = None
1647 is_explicit: bool = False
1648 is_autoinc: bool = False
1649 default_characterization: _SentinelDefaultCharacterization = (
1650 _SentinelDefaultCharacterization.NONE
1651 )
1654_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1656_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1657_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1660class _ColumnMetrics(Generic[_COL_co]):
1661 __slots__ = ("column",)
1663 column: _COL_co
1665 def __init__(
1666 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1667 ) -> None:
1668 self.column = col
1670 # proxy_index being non-empty means it was initialized.
1671 # so we need to update it
1672 pi = collection._proxy_index
1673 if pi:
1674 for eps_col in col._expanded_proxy_set:
1675 pi[eps_col].add(self)
1677 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1678 return self.column._expanded_proxy_set
1680 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1681 pi = collection._proxy_index
1682 if not pi:
1683 return
1684 for col in self.column._expanded_proxy_set:
1685 colset = pi.get(col, None)
1686 if colset:
1687 colset.discard(self)
1688 if colset is not None and not colset:
1689 del pi[col]
1691 def embedded(
1692 self,
1693 target_set: Union[
1694 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1695 ],
1696 ) -> bool:
1697 expanded_proxy_set = self.column._expanded_proxy_set
1698 for t in target_set.difference(expanded_proxy_set):
1699 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1700 return False
1701 return True
1704class ColumnCollection(Generic[_COLKEY, _COL_co]):
1705 """Base class for collection of :class:`_expression.ColumnElement`
1706 instances, typically for :class:`_sql.FromClause` objects.
1708 The :class:`_sql.ColumnCollection` object is most commonly available
1709 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1710 on the :class:`_schema.Table` object, introduced at
1711 :ref:`metadata_tables_and_columns`.
1713 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1714 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1715 :class:`_schema.Column` objects, which are then accessible both via mapping
1716 style access as well as attribute access style.
1718 To access :class:`_schema.Column` objects using ordinary attribute-style
1719 access, specify the name like any other object attribute, such as below
1720 a column named ``employee_name`` is accessed::
1722 >>> employee_table.c.employee_name
1724 To access columns that have names with special characters or spaces,
1725 index-style access is used, such as below which illustrates a column named
1726 ``employee ' payment`` is accessed::
1728 >>> employee_table.c["employee ' payment"]
1730 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1731 interface, common dictionary method names like
1732 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1733 and :meth:`_sql.ColumnCollection.items` are available, which means that
1734 database columns that are keyed under these names also need to use indexed
1735 access::
1737 >>> employee_table.c["values"]
1740 The name for which a :class:`_schema.Column` would be present is normally
1741 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1742 such as a :class:`_sql.Select` object that uses a label style set
1743 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1744 key may instead be represented under a particular label name such
1745 as ``tablename_columnname``::
1747 >>> from sqlalchemy import select, column, table
1748 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1749 >>> t = table("t", column("c"))
1750 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1751 >>> subq = stmt.subquery()
1752 >>> subq.c.t_c
1753 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1755 :class:`.ColumnCollection` also indexes the columns in order and allows
1756 them to be accessible by their integer position::
1758 >>> cc[0]
1759 Column('x', Integer(), table=None)
1760 >>> cc[1]
1761 Column('y', Integer(), table=None)
1763 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1764 allows integer-based
1765 index access to the collection.
1767 Iterating the collection yields the column expressions in order::
1769 >>> list(cc)
1770 [Column('x', Integer(), table=None),
1771 Column('y', Integer(), table=None)]
1773 The :class:`_expression.ColumnCollection` base class is read-only.
1774 For mutation operations, the :class:`.WriteableColumnCollection` subclass
1775 provides methods such as :meth:`.WriteableColumnCollection.add`.
1776 A special subclass :class:`.DedupeColumnCollection` exists which
1777 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1778 collection is used for schema level objects like :class:`_schema.Table`
1779 and :class:`.PrimaryKeyConstraint` where this deduping is helpful.
1780 The :class:`.DedupeColumnCollection` class also has additional mutation
1781 methods as the schema constructs have more use cases that require removal
1782 and replacement of columns.
1784 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1785 now stores duplicate
1786 column keys as well as the same column in multiple positions. The
1787 :class:`.DedupeColumnCollection` class is added to maintain the
1788 former behavior in those cases where deduplication as well as
1789 additional replace/remove operations are needed.
1791 .. versionchanged:: 2.1 :class:`_expression.ColumnCollection` is now
1792 a read-only base class. Mutation operations are available through
1793 :class:`.WriteableColumnCollection` and :class:`.DedupeColumnCollection`
1794 subclasses.
1797 """
1799 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1801 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1802 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1803 _colset: Set[_COL_co]
1804 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1806 def __init__(self) -> None:
1807 raise TypeError(
1808 "ColumnCollection is an abstract base class and cannot be "
1809 "instantiated directly. Use WriteableColumnCollection or "
1810 "DedupeColumnCollection instead."
1811 )
1813 @util.preload_module("sqlalchemy.sql.elements")
1814 def __clause_element__(self) -> ClauseList:
1815 elements = util.preloaded.sql_elements
1817 return elements.ClauseList(
1818 _literal_as_text_role=roles.ColumnsClauseRole,
1819 group=False,
1820 *self._all_columns,
1821 )
1823 @property
1824 def _all_columns(self) -> List[_COL_co]:
1825 return [col for (_, col, _) in self._collection]
1827 def keys(self) -> List[_COLKEY]:
1828 """Return a sequence of string key names for all columns in this
1829 collection."""
1830 return [k for (k, _, _) in self._collection]
1832 def values(self) -> List[_COL_co]:
1833 """Return a sequence of :class:`_sql.ColumnClause` or
1834 :class:`_schema.Column` objects for all columns in this
1835 collection."""
1836 return [col for (_, col, _) in self._collection]
1838 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1839 """Return a sequence of (key, column) tuples for all columns in this
1840 collection each consisting of a string key name and a
1841 :class:`_sql.ColumnClause` or
1842 :class:`_schema.Column` object.
1843 """
1845 return [(k, col) for (k, col, _) in self._collection]
1847 def __bool__(self) -> bool:
1848 return bool(self._collection)
1850 def __len__(self) -> int:
1851 return len(self._collection)
1853 def __iter__(self) -> Iterator[_COL_co]:
1854 # turn to a list first to maintain over a course of changes
1855 return iter([col for _, col, _ in self._collection])
1857 @overload
1858 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1860 @overload
1861 def __getitem__(
1862 self, key: Tuple[Union[str, int], ...]
1863 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1865 @overload
1866 def __getitem__(
1867 self, key: slice
1868 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1870 def __getitem__(
1871 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1872 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1873 try:
1874 if isinstance(key, (tuple, slice)):
1875 if isinstance(key, slice):
1876 cols = (
1877 (sub_key, col)
1878 for (sub_key, col, _) in self._collection[key]
1879 )
1880 else:
1881 cols = (self._index[sub_key] for sub_key in key)
1883 return WriteableColumnCollection(cols).as_readonly()
1884 else:
1885 return self._index[key][1]
1886 except KeyError as err:
1887 if isinstance(err.args[0], int):
1888 raise IndexError(err.args[0]) from err
1889 else:
1890 raise
1892 def __getattr__(self, key: str) -> _COL_co:
1893 try:
1894 return self._index[key][1]
1895 except KeyError as err:
1896 raise AttributeError(key) from err
1898 def __contains__(self, key: str) -> bool:
1899 if key not in self._index:
1900 if not isinstance(key, str):
1901 raise exc.ArgumentError(
1902 "__contains__ requires a string argument"
1903 )
1904 return False
1905 else:
1906 return True
1908 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1909 """Compare this :class:`_expression.ColumnCollection` to another
1910 based on the names of the keys"""
1912 for l, r in zip_longest(self, other):
1913 if l is not r:
1914 return False
1915 else:
1916 return True
1918 def __eq__(self, other: Any) -> bool:
1919 return self.compare(other)
1921 @overload
1922 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1924 @overload
1925 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1927 def get(
1928 self, key: str, default: Optional[_COL] = None
1929 ) -> Optional[Union[_COL_co, _COL]]:
1930 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1931 based on a string key name from this
1932 :class:`_expression.ColumnCollection`."""
1934 if key in self._index:
1935 return self._index[key][1]
1936 else:
1937 return default
1939 def __str__(self) -> str:
1940 return "%s(%s)" % (
1941 self.__class__.__name__,
1942 ", ".join(str(c) for c in self),
1943 )
1945 # https://github.com/python/mypy/issues/4266
1946 __hash__: Optional[int] = None # type: ignore
1948 def contains_column(self, col: ColumnElement[Any]) -> bool:
1949 """Checks if a column object exists in this collection"""
1950 if col not in self._colset:
1951 if isinstance(col, str):
1952 raise exc.ArgumentError(
1953 "contains_column cannot be used with string arguments. "
1954 "Use ``col_name in table.c`` instead."
1955 )
1956 return False
1957 else:
1958 return True
1960 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1961 raise NotImplementedError()
1963 def corresponding_column(
1964 self, column: _COL, require_embedded: bool = False
1965 ) -> Optional[Union[_COL, _COL_co]]:
1966 """Given a :class:`_expression.ColumnElement`, return the exported
1967 :class:`_expression.ColumnElement` object from this
1968 :class:`_expression.ColumnCollection`
1969 which corresponds to that original :class:`_expression.ColumnElement`
1970 via a common
1971 ancestor column.
1973 :param column: the target :class:`_expression.ColumnElement`
1974 to be matched.
1976 :param require_embedded: only return corresponding columns for
1977 the given :class:`_expression.ColumnElement`, if the given
1978 :class:`_expression.ColumnElement`
1979 is actually present within a sub-element
1980 of this :class:`_expression.Selectable`.
1981 Normally the column will match if
1982 it merely shares a common ancestor with one of the exported
1983 columns of this :class:`_expression.Selectable`.
1985 .. seealso::
1987 :meth:`_expression.Selectable.corresponding_column`
1988 - invokes this method
1989 against the collection returned by
1990 :attr:`_expression.Selectable.exported_columns`.
1992 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
1993 was moved onto the :class:`_expression.ColumnCollection` itself.
1995 """
1996 raise NotImplementedError()
1999class WriteableColumnCollection(ColumnCollection[_COLKEY, _COL_co]):
2000 """A :class:`_sql.ColumnCollection` that allows mutation operations.
2002 This is the writable form of :class:`_sql.ColumnCollection` that
2003 implements methods such as :meth:`.add`, :meth:`.remove`, :meth:`.update`,
2004 and :meth:`.clear`.
2006 This class is used internally for building column collections during
2007 construction of SQL constructs. For schema-level objects that require
2008 deduplication behavior, use :class:`.DedupeColumnCollection`.
2010 .. versionadded:: 2.1
2012 """
2014 __slots__ = ()
2016 def __init__(
2017 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
2018 ):
2019 object.__setattr__(self, "_colset", set())
2020 object.__setattr__(self, "_index", {})
2021 object.__setattr__(
2022 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
2023 )
2024 object.__setattr__(self, "_collection", [])
2025 if columns:
2026 self._initial_populate(columns)
2028 def _initial_populate(
2029 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
2030 ) -> None:
2031 self._populate_separate_keys(iter_)
2033 def _populate_separate_keys(
2034 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
2035 ) -> None:
2036 """populate from an iterator of (key, column)"""
2038 self._collection[:] = collection = [
2039 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
2040 ]
2041 self._colset.update(c._deannotate() for _, c, _ in collection)
2042 self._index.update(
2043 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
2044 )
2045 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
2047 def __getstate__(self) -> Dict[str, Any]:
2048 return {
2049 "_collection": [(k, c) for k, c, _ in self._collection],
2050 "_index": self._index,
2051 }
2053 def __setstate__(self, state: Dict[str, Any]) -> None:
2054 object.__setattr__(self, "_index", state["_index"])
2055 object.__setattr__(
2056 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
2057 )
2058 object.__setattr__(
2059 self,
2060 "_collection",
2061 [
2062 (k, c, _ColumnMetrics(self, c))
2063 for (k, c) in state["_collection"]
2064 ],
2065 )
2066 object.__setattr__(
2067 self, "_colset", {col for k, col, _ in self._collection}
2068 )
2070 def add(
2071 self,
2072 column: ColumnElement[Any],
2073 key: Optional[_COLKEY] = None,
2074 ) -> None:
2075 """Add a column to this :class:`_sql.WriteableColumnCollection`.
2077 .. note::
2079 This method is **not normally used by user-facing code**, as the
2080 :class:`_sql.WriteableColumnCollection` is usually part of an
2081 existing object such as a :class:`_schema.Table`. To add a
2082 :class:`_schema.Column` to an existing :class:`_schema.Table`
2083 object, use the :meth:`_schema.Table.append_column` method.
2085 """
2086 colkey: _COLKEY
2088 if key is None:
2089 colkey = column.key # type: ignore
2090 else:
2091 colkey = key
2093 l = len(self._collection)
2095 # don't really know how this part is supposed to work w/ the
2096 # covariant thing
2098 _column = cast(_COL_co, column)
2100 self._collection.append(
2101 (colkey, _column, _ColumnMetrics(self, _column))
2102 )
2103 self._colset.add(_column._deannotate())
2105 self._index[l] = (colkey, _column)
2106 if colkey not in self._index:
2107 self._index[colkey] = (colkey, _column)
2109 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2110 return ReadOnlyColumnCollection(self)
2112 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2113 """Return a "read only" form of this
2114 :class:`_sql.WriteableColumnCollection`."""
2116 return self._as_readonly()
2118 def _init_proxy_index(self) -> None:
2119 """populate the "proxy index", if empty.
2121 proxy index is added in 2.0 to provide more efficient operation
2122 for the corresponding_column() method.
2124 For reasons of both time to construct new .c collections as well as
2125 memory conservation for large numbers of large .c collections, the
2126 proxy_index is only filled if corresponding_column() is called. once
2127 filled it stays that way, and new _ColumnMetrics objects created after
2128 that point will populate it with new data. Note this case would be
2129 unusual, if not nonexistent, as it means a .c collection is being
2130 mutated after corresponding_column() were used, however it is tested in
2131 test/base/test_utils.py.
2133 """
2134 pi = self._proxy_index
2135 if pi:
2136 return
2138 for _, _, metrics in self._collection:
2139 eps = metrics.column._expanded_proxy_set
2141 for eps_col in eps:
2142 pi[eps_col].add(metrics)
2144 def corresponding_column(
2145 self, column: _COL, require_embedded: bool = False
2146 ) -> Optional[Union[_COL, _COL_co]]:
2147 """Given a :class:`_expression.ColumnElement`, return the exported
2148 :class:`_expression.ColumnElement` object from this
2149 :class:`_expression.ColumnCollection`
2150 which corresponds to that original :class:`_expression.ColumnElement`
2151 via a common
2152 ancestor column.
2154 See :meth:`.ColumnCollection.corresponding_column` for parameter
2155 information.
2157 """
2158 # TODO: cython candidate
2160 # don't dig around if the column is locally present
2161 if column in self._colset:
2162 return column
2164 selected_intersection, selected_metrics = None, None
2165 target_set = column.proxy_set
2167 pi = self._proxy_index
2168 if not pi:
2169 self._init_proxy_index()
2171 for current_metrics in (
2172 mm for ts in target_set if ts in pi for mm in pi[ts]
2173 ):
2174 if not require_embedded or current_metrics.embedded(target_set):
2175 if selected_metrics is None:
2176 # no corresponding column yet, pick this one.
2177 selected_metrics = current_metrics
2178 continue
2180 current_intersection = target_set.intersection(
2181 current_metrics.column._expanded_proxy_set
2182 )
2183 if selected_intersection is None:
2184 selected_intersection = target_set.intersection(
2185 selected_metrics.column._expanded_proxy_set
2186 )
2188 if len(current_intersection) > len(selected_intersection):
2189 # 'current' has a larger field of correspondence than
2190 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
2191 # matches a1.c.x->table.c.x better than
2192 # selectable.c.x->table.c.x does.
2194 selected_metrics = current_metrics
2195 selected_intersection = current_intersection
2196 elif current_intersection == selected_intersection:
2197 # they have the same field of correspondence. see
2198 # which proxy_set has fewer columns in it, which
2199 # indicates a closer relationship with the root
2200 # column. Also take into account the "weight"
2201 # attribute which CompoundSelect() uses to give
2202 # higher precedence to columns based on vertical
2203 # position in the compound statement, and discard
2204 # columns that have no reference to the target
2205 # column (also occurs with CompoundSelect)
2207 selected_col_distance = sum(
2208 [
2209 sc._annotations.get("weight", 1)
2210 for sc in (
2211 selected_metrics.column._uncached_proxy_list()
2212 )
2213 if sc.shares_lineage(column)
2214 ],
2215 )
2216 current_col_distance = sum(
2217 [
2218 sc._annotations.get("weight", 1)
2219 for sc in (
2220 current_metrics.column._uncached_proxy_list()
2221 )
2222 if sc.shares_lineage(column)
2223 ],
2224 )
2225 if current_col_distance < selected_col_distance:
2226 selected_metrics = current_metrics
2227 selected_intersection = current_intersection
2229 return selected_metrics.column if selected_metrics else None
2232_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2235class DedupeColumnCollection(WriteableColumnCollection[str, _NAMEDCOL]):
2236 """A :class:`_expression.ColumnCollection`
2237 that maintains deduplicating behavior.
2239 This is useful by schema level objects such as :class:`_schema.Table` and
2240 :class:`.PrimaryKeyConstraint`. The collection includes more
2241 sophisticated mutator methods as well to suit schema objects which
2242 require mutable column collections.
2244 .. versionadded:: 1.4
2246 """
2248 def add( # type: ignore[override]
2249 self,
2250 column: _NAMEDCOL,
2251 key: Optional[str] = None,
2252 *,
2253 index: Optional[int] = None,
2254 ) -> None:
2255 if key is not None and column.key != key:
2256 raise exc.ArgumentError(
2257 "DedupeColumnCollection requires columns be under "
2258 "the same key as their .key"
2259 )
2260 key = column.key
2262 if key is None:
2263 raise exc.ArgumentError(
2264 "Can't add unnamed column to column collection"
2265 )
2267 if key in self._index:
2268 existing = self._index[key][1]
2270 if existing is column:
2271 return
2273 self.replace(column, index=index)
2275 # pop out memoized proxy_set as this
2276 # operation may very well be occurring
2277 # in a _make_proxy operation
2278 util.memoized_property.reset(column, "proxy_set")
2279 else:
2280 self._append_new_column(key, column, index=index)
2282 def _append_new_column(
2283 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
2284 ) -> None:
2285 collection_length = len(self._collection)
2287 if index is None:
2288 l = collection_length
2289 else:
2290 if index < 0:
2291 index = max(0, collection_length + index)
2292 l = index
2294 if index is None:
2295 self._collection.append(
2296 (key, named_column, _ColumnMetrics(self, named_column))
2297 )
2298 else:
2299 self._collection.insert(
2300 index, (key, named_column, _ColumnMetrics(self, named_column))
2301 )
2303 self._colset.add(named_column._deannotate())
2305 if index is not None:
2306 for idx in reversed(range(index, collection_length)):
2307 self._index[idx + 1] = self._index[idx]
2309 self._index[l] = (key, named_column)
2310 self._index[key] = (key, named_column)
2312 def _populate_separate_keys(
2313 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2314 ) -> None:
2315 """populate from an iterator of (key, column)"""
2316 cols = list(iter_)
2318 replace_col = []
2319 for k, col in cols:
2320 if col.key != k:
2321 raise exc.ArgumentError(
2322 "DedupeColumnCollection requires columns be under "
2323 "the same key as their .key"
2324 )
2325 if col.name in self._index and col.key != col.name:
2326 replace_col.append(col)
2327 elif col.key in self._index:
2328 replace_col.append(col)
2329 else:
2330 self._index[k] = (k, col)
2331 self._collection.append((k, col, _ColumnMetrics(self, col)))
2332 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2334 self._index.update(
2335 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2336 )
2337 for col in replace_col:
2338 self.replace(col)
2340 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2341 self._populate_separate_keys((col.key, col) for col in iter_)
2343 def remove(self, column: _NAMEDCOL) -> None:
2344 if column not in self._colset:
2345 raise ValueError(
2346 "Can't remove column %r; column is not in this collection"
2347 % column
2348 )
2349 del self._index[column.key]
2350 self._colset.remove(column)
2351 self._collection[:] = [
2352 (k, c, metrics)
2353 for (k, c, metrics) in self._collection
2354 if c is not column
2355 ]
2356 for metrics in self._proxy_index.get(column, ()):
2357 metrics.dispose(self)
2359 self._index.update(
2360 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2361 )
2362 # delete higher index
2363 del self._index[len(self._collection)]
2365 def replace(
2366 self,
2367 column: _NAMEDCOL,
2368 *,
2369 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2370 index: Optional[int] = None,
2371 ) -> None:
2372 """add the given column to this collection, removing unaliased
2373 versions of this column as well as existing columns with the
2374 same key.
2376 e.g.::
2378 t = Table("sometable", metadata, Column("col1", Integer))
2379 t.columns.replace(Column("col1", Integer, key="columnone"))
2381 will remove the original 'col1' from the collection, and add
2382 the new column under the name 'columnname'.
2384 Used by schema.Column to override columns during table reflection.
2386 """
2388 if extra_remove:
2389 remove_col = set(extra_remove)
2390 else:
2391 remove_col = set()
2392 # remove up to two columns based on matches of name as well as key
2393 if column.name in self._index and column.key != column.name:
2394 other = self._index[column.name][1]
2395 if other.name == other.key:
2396 remove_col.add(other)
2398 if column.key in self._index:
2399 remove_col.add(self._index[column.key][1])
2401 if not remove_col:
2402 self._append_new_column(column.key, column, index=index)
2403 return
2404 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2405 replace_index = None
2407 for idx, (k, col, metrics) in enumerate(self._collection):
2408 if col in remove_col:
2409 if replace_index is None:
2410 replace_index = idx
2411 new_cols.append(
2412 (column.key, column, _ColumnMetrics(self, column))
2413 )
2414 else:
2415 new_cols.append((k, col, metrics))
2417 if remove_col:
2418 self._colset.difference_update(remove_col)
2420 for rc in remove_col:
2421 for metrics in self._proxy_index.get(rc, ()):
2422 metrics.dispose(self)
2424 if replace_index is None:
2425 if index is not None:
2426 new_cols.insert(
2427 index, (column.key, column, _ColumnMetrics(self, column))
2428 )
2430 else:
2431 new_cols.append(
2432 (column.key, column, _ColumnMetrics(self, column))
2433 )
2434 elif index is not None:
2435 to_move = new_cols[replace_index]
2436 effective_positive_index = (
2437 index if index >= 0 else max(0, len(new_cols) + index)
2438 )
2439 new_cols.insert(index, to_move)
2440 if replace_index > effective_positive_index:
2441 del new_cols[replace_index + 1]
2442 else:
2443 del new_cols[replace_index]
2445 self._colset.add(column._deannotate())
2446 self._collection[:] = new_cols
2448 self._index.clear()
2450 self._index.update(
2451 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2452 )
2453 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2456class ReadOnlyColumnCollection(
2457 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2458):
2459 __slots__ = ("_parent",)
2461 _parent: WriteableColumnCollection[_COLKEY, _COL_co]
2463 def __init__(
2464 self, collection: WriteableColumnCollection[_COLKEY, _COL_co]
2465 ):
2466 object.__setattr__(self, "_parent", collection)
2467 object.__setattr__(self, "_index", collection._index)
2468 object.__setattr__(self, "_collection", collection._collection)
2469 object.__setattr__(self, "_colset", collection._colset)
2470 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2472 def _as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2473 return self
2475 def __getstate__(self) -> Dict[str, ColumnCollection[_COLKEY, _COL_co]]:
2476 return {"_parent": self._parent}
2478 def __setstate__(self, state: Dict[str, Any]) -> None:
2479 parent = state["_parent"]
2480 self.__init__(parent) # type: ignore
2482 def corresponding_column(
2483 self, column: _COL, require_embedded: bool = False
2484 ) -> Optional[Union[_COL, _COL_co]]:
2485 """Given a :class:`_expression.ColumnElement`, return the exported
2486 :class:`_expression.ColumnElement` object from this
2487 :class:`_expression.ColumnCollection`
2488 which corresponds to that original :class:`_expression.ColumnElement`
2489 via a common
2490 ancestor column.
2492 See :meth:`.ColumnCollection.corresponding_column` for parameter
2493 information.
2495 """
2496 return self._parent.corresponding_column(column, require_embedded)
2499class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2500 def contains_column(self, col: ColumnClause[Any]) -> bool:
2501 return col in self
2503 def extend(self, cols: Iterable[Any]) -> None:
2504 for col in cols:
2505 self.add(col)
2507 def __eq__(self, other):
2508 l = []
2509 for c in other:
2510 for local in self:
2511 if c.shares_lineage(local):
2512 l.append(c == local)
2513 return elements.and_(*l)
2515 def __hash__(self) -> int: # type: ignore[override]
2516 return hash(tuple(x for x in self))
2519def _entity_namespace(
2520 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2521) -> _EntityNamespace:
2522 """Return the nearest .entity_namespace for the given entity.
2524 If not immediately available, does an iterate to find a sub-element
2525 that has one, if any.
2527 """
2528 try:
2529 return cast(_HasEntityNamespace, entity).entity_namespace
2530 except AttributeError:
2531 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2532 if _is_has_entity_namespace(elem):
2533 return elem.entity_namespace
2534 else:
2535 raise
2538@overload
2539def _entity_namespace_key(
2540 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2541 key: str,
2542) -> SQLCoreOperations[Any]: ...
2545@overload
2546def _entity_namespace_key(
2547 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2548 key: str,
2549 default: _NoArg,
2550) -> SQLCoreOperations[Any]: ...
2553@overload
2554def _entity_namespace_key(
2555 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2556 key: str,
2557 default: _T,
2558) -> Union[SQLCoreOperations[Any], _T]: ...
2561def _entity_namespace_key(
2562 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2563 key: str,
2564 default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
2565) -> Union[SQLCoreOperations[Any], _T]:
2566 """Return an entry from an entity_namespace.
2569 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2570 on not found.
2572 """
2574 try:
2575 ns = _entity_namespace(entity)
2576 if default is not NO_ARG:
2577 return getattr(ns, key, default)
2578 else:
2579 return getattr(ns, key) # type: ignore
2580 except AttributeError as err:
2581 raise exc.InvalidRequestError(
2582 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2583 ) from err
2586def _entity_namespace_key_search_all(
2587 entities: Collection[Any],
2588 key: str,
2589) -> SQLCoreOperations[Any]:
2590 """Search multiple entities for a key, raise if ambiguous or not found.
2592 This is used by filter_by() to search across all FROM clause entities
2593 when a single entity doesn't have the requested attribute.
2595 .. versionadded:: 2.1
2597 Raises:
2598 AmbiguousColumnError: If key exists in multiple entities
2599 InvalidRequestError: If key doesn't exist in any entity
2600 """
2602 match_: SQLCoreOperations[Any] | None = None
2604 for entity in entities:
2605 ns = _entity_namespace(entity)
2606 # Check if the attribute exists
2607 if hasattr(ns, key):
2608 if match_ is not None:
2609 entity_desc = ", ".join(str(e) for e in list(entities)[:3])
2610 if len(entities) > 3:
2611 entity_desc += f", ... ({len(entities)} total)"
2612 raise exc.AmbiguousColumnError(
2613 f'Attribute name "{key}" is ambiguous; it exists in '
2614 f"multiple FROM clause entities ({entity_desc}). "
2615 f"Use filter() with explicit column references instead "
2616 f"of filter_by()."
2617 )
2618 match_ = getattr(ns, key)
2620 if match_ is None:
2621 # No entity has this attribute
2622 entity_desc = ", ".join(str(e) for e in list(entities)[:3])
2623 if len(entities) > 3:
2624 entity_desc += f", ... ({len(entities)} total)"
2625 raise exc.InvalidRequestError(
2626 f'None of the FROM clause entities have a property "{key}". '
2627 f"Searched entities: {entity_desc}"
2628 )
2630 return match_