1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Foundational utilities common to many sql modules."""
10
11
12from __future__ import annotations
13
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import Final
25from typing import FrozenSet
26from typing import Generator
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
45
46from . import roles
47from . import visitors
48from .cache_key import HasCacheKey # noqa
49from .cache_key import MemoizedHasCacheKey # noqa
50from .traversals import HasCopyInternals # noqa
51from .visitors import ClauseVisitor
52from .visitors import ExtendedInternalTraversal
53from .visitors import ExternallyTraversible
54from .visitors import InternalTraversal
55from .. import event
56from .. import exc
57from .. import util
58from ..util import HasMemoized as HasMemoized
59from ..util import hybridmethod
60from ..util.typing import Self
61from ..util.typing import TypeGuard
62from ..util.typing import TypeVarTuple
63from ..util.typing import Unpack
64
65if TYPE_CHECKING:
66 from . import coercions
67 from . import elements
68 from . import type_api
69 from ._orm_types import DMLStrategyArgument
70 from ._orm_types import SynchronizeSessionArgument
71 from ._typing import _CLE
72 from .cache_key import CacheKey
73 from .compiler import SQLCompiler
74 from .dml import Delete
75 from .dml import Insert
76 from .dml import Update
77 from .elements import BindParameter
78 from .elements import ClauseElement
79 from .elements import ClauseList
80 from .elements import ColumnClause # noqa
81 from .elements import ColumnElement
82 from .elements import NamedColumn
83 from .elements import SQLCoreOperations
84 from .elements import TextClause
85 from .schema import Column
86 from .schema import DefaultGenerator
87 from .selectable import _JoinTargetElement
88 from .selectable import _SelectIterable
89 from .selectable import FromClause
90 from .selectable import Select
91 from .visitors import anon_map
92 from ..engine import Connection
93 from ..engine import CursorResult
94 from ..engine.interfaces import _CoreMultiExecuteParams
95 from ..engine.interfaces import _ExecuteOptions
96 from ..engine.interfaces import _ImmutableExecuteOptions
97 from ..engine.interfaces import CacheStats
98 from ..engine.interfaces import Compiled
99 from ..engine.interfaces import CompiledCacheType
100 from ..engine.interfaces import CoreExecuteOptionsParameter
101 from ..engine.interfaces import Dialect
102 from ..engine.interfaces import IsolationLevel
103 from ..engine.interfaces import SchemaTranslateMapType
104 from ..event import dispatcher
105
106if not TYPE_CHECKING:
107 coercions = None # noqa
108 elements = None # noqa
109 type_api = None # noqa
110
111
112_Ts = TypeVarTuple("_Ts")
113
114
115class _NoArg(Enum):
116 NO_ARG = 0
117
118 def __repr__(self):
119 return f"_NoArg.{self.name}"
120
121
122NO_ARG: Final = _NoArg.NO_ARG
123
124
125class _NoneName(Enum):
126 NONE_NAME = 0
127 """indicate a 'deferred' name that was ultimately the value None."""
128
129
130_NONE_NAME: Final = _NoneName.NONE_NAME
131
132_T = TypeVar("_T", bound=Any)
133
134_Fn = TypeVar("_Fn", bound=Callable[..., Any])
135
136_AmbiguousTableNameMap = MutableMapping[str, str]
137
138
139class _DefaultDescriptionTuple(NamedTuple):
140 arg: Any
141 is_scalar: Optional[bool]
142 is_callable: Optional[bool]
143 is_sentinel: Optional[bool]
144
145 @classmethod
146 def _from_column_default(
147 cls, default: Optional[DefaultGenerator]
148 ) -> _DefaultDescriptionTuple:
149 return (
150 _DefaultDescriptionTuple(
151 default.arg, # type: ignore
152 default.is_scalar,
153 default.is_callable,
154 default.is_sentinel,
155 )
156 if default
157 and (
158 default.has_arg
159 or (not default.for_update and default.is_sentinel)
160 )
161 else _DefaultDescriptionTuple(None, None, None, None)
162 )
163
164
165_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
166 "_omit_from_statements"
167)
168
169
170class _EntityNamespace(Protocol):
171 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
172
173
174class _HasEntityNamespace(Protocol):
175 @util.ro_non_memoized_property
176 def entity_namespace(self) -> _EntityNamespace: ...
177
178
179def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
180 return hasattr(element, "entity_namespace")
181
182
# Remove when https://github.com/python/mypy/issues/14640 is fixed
184_Self = TypeVar("_Self", bound=Any)
185
186
187class Immutable:
188 """mark a ClauseElement as 'immutable' when expressions are cloned.
189
    "Immutable" here refers to the "mutability" of an object in the
    context of SQL DQL and DML generation: in DQL, for example, one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within that DQL.
    :class:`.Immutable` is mostly intended to follow this concept, and as
    such the primary "immutable" objects are :class:`.ColumnClause`,
    :class:`.Column`, :class:`.TableClause`, and :class:`.Table`.
197
198 """
199
200 __slots__ = ()
201
202 _is_immutable: bool = True
203
204 def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
205 raise NotImplementedError("Immutable objects do not support copying")
206
207 def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
208 raise NotImplementedError("Immutable objects do not support copying")
209
210 def _clone(self: _Self, **kw: Any) -> _Self:
211 return self
212
213 def _copy_internals(
214 self, *, omit_attrs: Iterable[str] = (), **kw: Any
215 ) -> None:
216 pass
217
218
219class SingletonConstant(Immutable):
220 """Represent SQL constants like NULL, TRUE, FALSE"""
221
222 _is_singleton_constant: bool = True
223
224 _singleton: SingletonConstant
225
226 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
227 return cast(_T, cls._singleton)
228
229 @util.non_memoized_property
230 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
231 raise NotImplementedError()
232
233 @classmethod
234 def _create_singleton(cls) -> None:
235 obj = object.__new__(cls)
236 obj.__init__() # type: ignore
237
238 # for a long time this was an empty frozenset, meaning
239 # a SingletonConstant would never be a "corresponding column" in
240 # a statement. This referred to #6259. However, in #7154 we see
241 # that we do in fact need "correspondence" to work when matching cols
242 # in result sets, so the non-correspondence was moved to a more
243 # specific level when we are actually adapting expressions for SQL
244 # render only.
245 obj.proxy_set = frozenset([obj])
246 cls._singleton = obj
247
248
249def _from_objects(
250 *elements: Union[
251 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
252 ]
253) -> Iterator[FromClause]:
254 return itertools.chain.from_iterable(
255 [element._from_objects for element in elements]
256 )
257
258
259def _select_iterables(
260 elements: Iterable[roles.ColumnsClauseRole],
261) -> _SelectIterable:
262 """expand tables into individual columns in the
263 given list of column expressions.
264
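    E.g. (an illustrative description): given a :class:`_schema.Table`
    ``t`` with columns ``a`` and ``b``, ``list(_select_iterables([t]))``
    yields the individual column expressions ``t.c.a`` and ``t.c.b``;
    individual column expressions are passed through as themselves.
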
265 """
266 return itertools.chain.from_iterable(
267 [c._select_iterable for c in elements]
268 )
269
270
271_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
272
273
274class _GenerativeType(Protocol):
275 def _generate(self) -> Self: ...
276
277
278def _generative(fn: _Fn) -> _Fn:
279 """non-caching _generative() decorator.
280
281 This is basically the legacy decorator that copies the object and
282 runs a method on the new copy.
283
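    A minimal usage sketch (illustrative only; ``Limitable`` and its
    ``_limit`` attribute are hypothetical)::

        class Limitable(Generative):
            _limit = None

            @_generative
            def limit(self, value):
                self._limit = value
                return self


        stmt = Limitable()
        new_stmt = stmt.limit(5)  # a copy; ``stmt`` itself is unchanged
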
284 """
285
286 @util.decorator
287 def _generative(
288 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
289 ) -> _SelfGenerativeType:
290 """Mark a method as generative."""
291
292 self = self._generate()
293 x = fn(self, *args, **kw)
294 assert x is self, "generative methods must return self"
295 return self
296
297 decorated = _generative(fn)
298 decorated.non_generative = fn # type: ignore
299 return decorated
300
301
302def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
303 msgs: Dict[str, str] = kw.pop("msgs", {})
304
305 defaults: Dict[str, str] = kw.pop("defaults", {})
306
307 getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
308 (name, operator.attrgetter(name), defaults.get(name, None))
309 for name in names
310 ]
311
312 @util.decorator
313 def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
314 # make pylance happy by not including "self" in the argument
315 # list
316 self = args[0]
317 args = args[1:]
318 for name, getter, default_ in getters:
319 if getter(self) is not default_:
320 msg = msgs.get(
321 name,
322 "Method %s() has already been invoked on this %s construct"
323 % (fn.__name__, self.__class__),
324 )
325 raise exc.InvalidRequestError(msg)
326 return fn(self, *args, **kw)
327
328 return check
329
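# An illustrative (hypothetical) use of _exclusive_against: guard a
# generative method so that it raises InvalidRequestError if the given
# attribute has already been changed from its default of None, e.g.:
#
#     class MyDML(Generative):
#         _my_values = None
#
#         @_generative
#         @_exclusive_against(
#             "_my_values",
#             msgs={"_my_values": "This construct already has values set"},
#         )
#         def values(self, values):
#             self._my_values = values
#             return self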
330
331def _clone(element, **kw):
332 return element._clone(**kw)
333
334
335def _expand_cloned(
336 elements: Iterable[_CLE],
337) -> Iterable[_CLE]:
338 """expand the given set of ClauseElements to be the set of all 'cloned'
339 predecessors.
340
341 """
342 # TODO: cython candidate
343 return itertools.chain(*[x._cloned_set for x in elements])
344
345
346def _de_clone(
347 elements: Iterable[_CLE],
348) -> Iterable[_CLE]:
349 for x in elements:
350 while x._is_clone_of is not None:
351 x = x._is_clone_of
352 yield x
353
354
355def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
356 """return the intersection of sets a and b, counting
357 any overlap between 'cloned' predecessors.
358
359 The returned set is in terms of the entities present within 'a'.
360
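    For example (illustrative): if ``a1`` was produced by cloning ``a``,
    then ``_cloned_intersection([a1], [a])`` returns ``{a1}``, since the
    cloned set of ``a1`` includes its predecessor ``a``.
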
361 """
362 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
363 _expand_cloned(b)
364 )
365 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
366
367
368def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
369 all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
370 _expand_cloned(b)
371 )
372 return {
373 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
374 }
375
376
377class _DialectArgView(MutableMapping[str, Any]):
378 """A dictionary view of dialect-level arguments in the form
379 <dialectname>_<argument_name>.
380
381 """
382
383 __slots__ = ("obj",)
384
385 def __init__(self, obj: DialectKWArgs) -> None:
386 self.obj = obj
387
388 def _key(self, key: str) -> Tuple[str, str]:
389 try:
390 dialect, value_key = key.split("_", 1)
391 except ValueError as err:
392 raise KeyError(key) from err
393 else:
394 return dialect, value_key
395
396 def __getitem__(self, key: str) -> Any:
397 dialect, value_key = self._key(key)
398
399 try:
400 opt = self.obj.dialect_options[dialect]
401 except exc.NoSuchModuleError as err:
402 raise KeyError(key) from err
403 else:
404 return opt[value_key]
405
406 def __setitem__(self, key: str, value: Any) -> None:
407 try:
408 dialect, value_key = self._key(key)
409 except KeyError as err:
410 raise exc.ArgumentError(
411 "Keys must be of the form <dialectname>_<argname>"
412 ) from err
413 else:
414 self.obj.dialect_options[dialect][value_key] = value
415
416 def __delitem__(self, key: str) -> None:
417 dialect, value_key = self._key(key)
418 del self.obj.dialect_options[dialect][value_key]
419
420 def __len__(self) -> int:
421 return sum(
422 len(args._non_defaults)
423 for args in self.obj.dialect_options.values()
424 )
425
426 def __iter__(self) -> Generator[str, None, None]:
427 return (
428 "%s_%s" % (dialect_name, value_name)
429 for dialect_name in self.obj.dialect_options
430 for value_name in self.obj.dialect_options[
431 dialect_name
432 ]._non_defaults
433 )
434
435
436class _DialectArgDict(MutableMapping[str, Any]):
437 """A dictionary view of dialect-level arguments for a specific
438 dialect.
439
440 Maintains a separate collection of user-specified arguments
441 and dialect-specified default arguments.
442
443 """
444
445 def __init__(self) -> None:
446 self._non_defaults: Dict[str, Any] = {}
447 self._defaults: Dict[str, Any] = {}
448
449 def __len__(self) -> int:
450 return len(set(self._non_defaults).union(self._defaults))
451
452 def __iter__(self) -> Iterator[str]:
453 return iter(set(self._non_defaults).union(self._defaults))
454
455 def __getitem__(self, key: str) -> Any:
456 if key in self._non_defaults:
457 return self._non_defaults[key]
458 else:
459 return self._defaults[key]
460
461 def __setitem__(self, key: str, value: Any) -> None:
462 self._non_defaults[key] = value
463
464 def __delitem__(self, key: str) -> None:
465 del self._non_defaults[key]
466
467
468@util.preload_module("sqlalchemy.dialects")
469def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
470 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
471 if dialect_cls.construct_arguments is None:
472 return None
473 return dict(dialect_cls.construct_arguments)
474
475
476class DialectKWArgs:
477 """Establish the ability for a class to have dialect-specific arguments
478 with defaults and constructor validation.
479
480 The :class:`.DialectKWArgs` interacts with the
481 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
482
483 .. seealso::
484
485 :attr:`.DefaultDialect.construct_arguments`
486
487 """
488
489 __slots__ = ()
490
491 _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
492 ("dialect_options", InternalTraversal.dp_dialect_options)
493 ]
494
495 @classmethod
496 def argument_for(
497 cls, dialect_name: str, argument_name: str, default: Any
498 ) -> None:
499 """Add a new kind of dialect-specific keyword argument for this class.
500
501 E.g.::
502
503 Index.argument_for("mydialect", "length", None)
504
505 some_index = Index("a", "b", mydialect_length=5)
506
507 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
509 :attr:`.DefaultDialect.construct_arguments` dictionary. This
510 dictionary provides a list of argument names accepted by various
511 schema-level constructs on behalf of a dialect.
512
513 New dialects should typically specify this dictionary all at once as a
514 data member of the dialect class. The use case for ad-hoc addition of
515 argument names is typically for end-user code that is also using
516 a custom compilation scheme which consumes the additional arguments.
517
518 :param dialect_name: name of a dialect. The dialect must be
519 locatable, else a :class:`.NoSuchModuleError` is raised. The
520 dialect must also include an existing
521 :attr:`.DefaultDialect.construct_arguments` collection, indicating
522 that it participates in the keyword-argument validation and default
523 system, else :class:`.ArgumentError` is raised. If the dialect does
        not include this collection, then any keyword argument can already
        be specified on behalf of this dialect. All dialects packaged
526 within SQLAlchemy include this collection, however for third party
527 dialects, support may vary.
528
529 :param argument_name: name of the parameter.
530
531 :param default: default value of the parameter.
532
533 """
534
535 construct_arg_dictionary: Optional[Dict[Any, Any]] = (
536 DialectKWArgs._kw_registry[dialect_name]
537 )
538 if construct_arg_dictionary is None:
539 raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
542 )
543 if cls not in construct_arg_dictionary:
544 construct_arg_dictionary[cls] = {}
545 construct_arg_dictionary[cls][argument_name] = default
546
547 @property
548 def dialect_kwargs(self) -> _DialectArgView:
549 """A collection of keyword arguments specified as dialect-specific
550 options to this construct.
551
552 The arguments are present here in their original ``<dialect>_<kwarg>``
553 format. Only arguments that were actually passed are included;
554 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
555 contains all options known by this dialect including defaults.
556
557 The collection is also writable; keys are accepted of the
558 form ``<dialect>_<kwarg>`` where the value will be assembled
559 into the list of options.
560
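        For example, using the ``postgresql_using`` argument of
        :class:`_schema.Index`, which is registered by the PostgreSQL
        dialect (``table`` is assumed to be an existing
        :class:`_schema.Table`)::

            idx = Index("my_idx", table.c.data, postgresql_using="gin")

            idx.dialect_kwargs["postgresql_using"]  # "gin"
            idx.kwargs["postgresql_using"] = "hash"  # collection is writable
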
561 .. seealso::
562
563 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
564
565 """
566 return _DialectArgView(self)
567
568 @property
569 def kwargs(self) -> _DialectArgView:
570 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
571 return self.dialect_kwargs
572
573 _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
574 util.PopulateDict(_kw_reg_for_dialect)
575 )
576
577 @classmethod
578 def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
579 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
580 d = _DialectArgDict()
581
582 if construct_arg_dictionary is None:
583 d._defaults.update({"*": None})
584 else:
585 for cls in reversed(cls.__mro__):
586 if cls in construct_arg_dictionary:
587 d._defaults.update(construct_arg_dictionary[cls])
588 return d
589
590 @util.memoized_property
591 def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
592 """A collection of keyword arguments specified as dialect-specific
593 options to this construct.
594
595 This is a two-level nested registry, keyed to ``<dialect_name>``
596 and ``<argument_name>``. For example, the ``postgresql_where``
597 argument would be locatable as::
598
599 arg = my_object.dialect_options["postgresql"]["where"]
600
601 .. versionadded:: 0.9.2
602
603 .. seealso::
604
605 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
606
607 """
608
609 return util.PopulateDict(self._kw_reg_for_dialect_cls)
610
611 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate that the remaining kwargs each use a
        # <dialectname>_<argument> prefix
613
614 if not kwargs:
615 return
616
617 for k in kwargs:
618 m = re.match("^(.+?)_(.+)$", k)
619 if not m:
620 raise TypeError(
621 "Additional arguments should be "
622 "named <dialectname>_<argument>, got '%s'" % k
623 )
624 dialect_name, arg_name = m.group(1, 2)
625
626 try:
627 construct_arg_dictionary = self.dialect_options[dialect_name]
628 except exc.NoSuchModuleError:
629 util.warn(
630 "Can't validate argument %r; can't "
631 "locate any SQLAlchemy dialect named %r"
632 % (k, dialect_name)
633 )
634 self.dialect_options[dialect_name] = d = _DialectArgDict()
635 d._defaults.update({"*": None})
636 d._non_defaults[arg_name] = kwargs[k]
637 else:
638 if (
639 "*" not in construct_arg_dictionary
640 and arg_name not in construct_arg_dictionary
641 ):
642 raise exc.ArgumentError(
643 "Argument %r is not accepted by "
644 "dialect %r on behalf of %r"
645 % (k, dialect_name, self.__class__)
646 )
647 else:
648 construct_arg_dictionary[arg_name] = kwargs[k]
649
650
651class CompileState:
652 """Produces additional object state necessary for a statement to be
653 compiled.
654
    The :class:`.CompileState` class is the base of classes that assemble
    state for a particular statement object, which is then used by the
    compiler. This process is essentially an extension of what the
    SQLCompiler.visit_XYZ() methods do; however, the emphasis is on
    converting raw user intent into more organized structures rather than
    on producing string output. The top-level :class:`.CompileState` for
    the statement being executed is also accessible when the execution
    context works with invoking the statement and collecting results.
663
664 The production of :class:`.CompileState` is specific to the compiler, such
665 as within the :meth:`.SQLCompiler.visit_insert`,
666 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
667 responsible for associating the :class:`.CompileState` with the
668 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
669 i.e. the outermost SQL statement that's actually being executed.
670 There can be other :class:`.CompileState` objects that are not the
671 toplevel, such as when a SELECT subquery or CTE-nested
672 INSERT/UPDATE/DELETE is generated.
673
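    Plugin-specific :class:`.CompileState` classes are registered using the
    :meth:`.CompileState.plugin_for` decorator; a simplified sketch of how
    a plugin such as the ORM participates::

        @CompileState.plugin_for("orm", "select")
        class ORMSelectCompileState(CompileState):
            # assemble ORM-specific state for a Select statement
            ...
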
674 .. versionadded:: 1.4
675
676 """
677
678 __slots__ = ("statement", "_ambiguous_table_name_map")
679
680 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
681
682 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
683
684 @classmethod
685 def create_for_statement(
686 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
687 ) -> CompileState:
688 # factory construction.
689
690 if statement._propagate_attrs:
691 plugin_name = statement._propagate_attrs.get(
692 "compile_state_plugin", "default"
693 )
694 klass = cls.plugins.get(
695 (plugin_name, statement._effective_plugin_target), None
696 )
697 if klass is None:
698 klass = cls.plugins[
699 ("default", statement._effective_plugin_target)
700 ]
701
702 else:
703 klass = cls.plugins[
704 ("default", statement._effective_plugin_target)
705 ]
706
707 if klass is cls:
708 return cls(statement, compiler, **kw)
709 else:
710 return klass.create_for_statement(statement, compiler, **kw)
711
712 def __init__(self, statement, compiler, **kw):
713 self.statement = statement
714
715 @classmethod
716 def get_plugin_class(
717 cls, statement: Executable
718 ) -> Optional[Type[CompileState]]:
719 plugin_name = statement._propagate_attrs.get(
720 "compile_state_plugin", None
721 )
722
723 if plugin_name:
724 key = (plugin_name, statement._effective_plugin_target)
725 if key in cls.plugins:
726 return cls.plugins[key]
727
        # there's no case where we call upon get_plugin_class() and want
        # to get None back; there should always be a default.  Return the
        # default if there was no plugin-specific class (e.g. Insert with
        # the "orm" plugin)
732 try:
733 return cls.plugins[("default", statement._effective_plugin_target)]
734 except KeyError:
735 return None
736
737 @classmethod
738 def _get_plugin_class_for_plugin(
739 cls, statement: Executable, plugin_name: str
740 ) -> Optional[Type[CompileState]]:
741 try:
742 return cls.plugins[
743 (plugin_name, statement._effective_plugin_target)
744 ]
745 except KeyError:
746 return None
747
748 @classmethod
749 def plugin_for(
750 cls, plugin_name: str, visit_name: str
751 ) -> Callable[[_Fn], _Fn]:
752 def decorate(cls_to_decorate):
753 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
754 return cls_to_decorate
755
756 return decorate
757
758
759class Generative(HasMemoized):
760 """Provide a method-chaining pattern in conjunction with the
761 @_generative decorator."""
762
763 def _generate(self) -> Self:
764 skip = self._memoized_keys
765 cls = self.__class__
766 s = cls.__new__(cls)
767 if skip:
768 # ensure this iteration remains atomic
769 s.__dict__ = {
770 k: v for k, v in self.__dict__.copy().items() if k not in skip
771 }
772 else:
773 s.__dict__ = self.__dict__.copy()
774 return s
775
776
777class InPlaceGenerative(HasMemoized):
778 """Provide a method-chaining pattern in conjunction with the
779 @_generative decorator that mutates in place."""
780
781 __slots__ = ()
782
783 def _generate(self):
784 skip = self._memoized_keys
785 # note __dict__ needs to be in __slots__ if this is used
786 for k in skip:
787 self.__dict__.pop(k, None)
788 return self
789
790
791class HasCompileState(Generative):
792 """A class that has a :class:`.CompileState` associated with it."""
793
794 _compile_state_plugin: Optional[Type[CompileState]] = None
795
796 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
797
798 _compile_state_factory = CompileState.create_for_statement
799
800
801class _MetaOptions(type):
802 """metaclass for the Options class.
803
804 This metaclass is actually necessary despite the availability of the
805 ``__init_subclass__()`` hook as this type also provides custom class-level
806 behavior for the ``__add__()`` method.
807
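    For example (illustrative), ``SomeOptions + {"attr": value}`` works at
    the class level as well as on an instance, producing a new
    ``SomeOptions`` instance with ``attr`` applied.
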
808 """
809
810 _cache_attrs: Tuple[str, ...]
811
812 def __add__(self, other):
813 o1 = self()
814
815 if set(other).difference(self._cache_attrs):
816 raise TypeError(
817 "dictionary contains attributes not covered by "
818 "Options class %s: %r"
819 % (self, set(other).difference(self._cache_attrs))
820 )
821
822 o1.__dict__.update(other)
823 return o1
824
825 if TYPE_CHECKING:
826
827 def __getattr__(self, key: str) -> Any: ...
828
829 def __setattr__(self, key: str, value: Any) -> None: ...
830
831 def __delattr__(self, key: str) -> None: ...
832
833
834class Options(metaclass=_MetaOptions):
835 """A cacheable option dictionary with defaults."""
836
837 __slots__ = ()
838
839 _cache_attrs: Tuple[str, ...]
840
841 def __init_subclass__(cls) -> None:
842 dict_ = cls.__dict__
843 cls._cache_attrs = tuple(
844 sorted(
845 d
846 for d in dict_
847 if not d.startswith("__")
848 and d not in ("_cache_key_traversal",)
849 )
850 )
851 super().__init_subclass__()
852
853 def __init__(self, **kw: Any) -> None:
854 self.__dict__.update(kw)
855
856 def __add__(self, other):
857 o1 = self.__class__.__new__(self.__class__)
858 o1.__dict__.update(self.__dict__)
859
860 if set(other).difference(self._cache_attrs):
861 raise TypeError(
862 "dictionary contains attributes not covered by "
863 "Options class %s: %r"
864 % (self, set(other).difference(self._cache_attrs))
865 )
866
867 o1.__dict__.update(other)
868 return o1
869
870 def __eq__(self, other):
871 # TODO: very inefficient. This is used only in test suites
872 # right now.
873 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
874 if getattr(self, a) != getattr(other, b):
875 return False
876 return True
877
878 def __repr__(self) -> str:
879 # TODO: fairly inefficient, used only in debugging right now.
880
881 return "%s(%s)" % (
882 self.__class__.__name__,
883 ", ".join(
884 "%s=%r" % (k, self.__dict__[k])
885 for k in self._cache_attrs
886 if k in self.__dict__
887 ),
888 )
889
890 @classmethod
891 def isinstance(cls, klass: Type[Any]) -> bool:
892 return issubclass(cls, klass)
893
894 @hybridmethod
895 def add_to_element(self, name: str, value: str) -> Any:
896 return self + {name: getattr(self, name) + value}
897
898 @hybridmethod
899 def _state_dict_inst(self) -> Mapping[str, Any]:
900 return self.__dict__
901
902 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
903
904 @_state_dict_inst.classlevel
905 def _state_dict(cls) -> Mapping[str, Any]:
906 return cls._state_dict_const
907
908 @classmethod
909 def safe_merge(cls, other: "Options") -> Any:
910 d = other._state_dict()
911
        # only support a merge with another object of our class
        # which does not have attrs that we don't have; otherwise
        # we risk having state that might not be part of our cache
        # key strategy
916
917 if (
918 cls is not other.__class__
919 and other._cache_attrs
920 and set(other._cache_attrs).difference(cls._cache_attrs)
921 ):
922 raise TypeError(
923 "other element %r is not empty, is not of type %s, "
924 "and contains attributes not covered here %r"
925 % (
926 other,
927 cls,
928 set(other._cache_attrs).difference(cls._cache_attrs),
929 )
930 )
931 return cls + d
932
933 @classmethod
934 def from_execution_options(
935 cls,
936 key: str,
937 attrs: set[str],
938 exec_options: Mapping[str, Any],
939 statement_exec_options: Mapping[str, Any],
940 ) -> Tuple["Options", Mapping[str, Any]]:
941 """process Options argument in terms of execution options.
942
943
944 e.g.::
945
946 (
947 load_options,
948 execution_options,
949 ) = QueryContext.default_load_options.from_execution_options(
950 "_sa_orm_load_options",
951 {"populate_existing", "autoflush", "yield_per"},
952 execution_options,
953 statement._execution_options,
954 )
955
        This gets back the Options object and also refreshes
        "_sa_orm_load_options" in the exec options dict with that Options
        object.
958
959 """
960
        # the common case is that none of the options we are looking for
        # are present in either dictionary, so check for that first
963 check_argnames = attrs.intersection(
964 set(exec_options).union(statement_exec_options)
965 )
966
967 existing_options = exec_options.get(key, cls)
968
969 if check_argnames:
970 result = {}
971 for argname in check_argnames:
972 local = "_" + argname
973 if argname in exec_options:
974 result[local] = exec_options[argname]
975 elif argname in statement_exec_options:
976 result[local] = statement_exec_options[argname]
977
978 new_options = existing_options + result
979 exec_options = util.immutabledict().merge_with(
980 exec_options, {key: new_options}
981 )
982 return new_options, exec_options
983
984 else:
985 return existing_options, exec_options
986
987 if TYPE_CHECKING:
988
989 def __getattr__(self, key: str) -> Any: ...
990
991 def __setattr__(self, key: str, value: Any) -> None: ...
992
993 def __delattr__(self, key: str) -> None: ...
994
995
996class CacheableOptions(Options, HasCacheKey):
997 __slots__ = ()
998
999 @hybridmethod
1000 def _gen_cache_key_inst(
1001 self, anon_map: Any, bindparams: List[BindParameter[Any]]
1002 ) -> Optional[Tuple[Any]]:
1003 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
1004
1005 @_gen_cache_key_inst.classlevel
1006 def _gen_cache_key(
1007 cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
1008 ) -> Tuple[CacheableOptions, Any]:
1009 return (cls, ())
1010
1011 @hybridmethod
1012 def _generate_cache_key(self) -> Optional[CacheKey]:
1013 return HasCacheKey._generate_cache_key_for_object(self)
1014
1015
1016class ExecutableOption(HasCopyInternals):
1017 __slots__ = ()
1018
1019 _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
1020
1021 __visit_name__: str = "executable_option"
1022
1023 _is_has_cache_key: bool = False
1024
1025 _is_core: bool = True
1026
1027 def _clone(self, **kw):
1028 """Create a shallow copy of this ExecutableOption."""
1029 c = self.__class__.__new__(self.__class__)
1030 c.__dict__ = dict(self.__dict__) # type: ignore
1031 return c
1032
1033
1034_L = TypeVar("_L", bound=str)
1035
1036
1037class HasSyntaxExtensions(Generic[_L]):
1038
1039 _position_map: Mapping[_L, str]
1040
1041 @_generative
1042 def ext(self, extension: SyntaxExtension) -> Self:
        """Apply a SQL syntax extension to this statement.
1044
        SQL syntax extensions are :class:`.ClauseElement` objects that define
        vendor-specific syntactical constructs that take place in specific
        parts of a SQL statement. Examples include vendor extensions like
        PostgreSQL / SQLite's "ON CONFLICT", PostgreSQL's "DISTINCT ON",
        and MySQL's "LIMIT" as applied to UPDATE and DELETE statements.
1051
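        E.g., applying the PostgreSQL ``distinct_on()`` extension to a
        :class:`.Select` (a brief sketch; ``table`` is assumed to be an
        existing :class:`_schema.Table`)::

            from sqlalchemy.dialects.postgresql import distinct_on

            stmt = select(table).ext(distinct_on(table.c.id))
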
1052 .. seealso::
1053
1054 :ref:`examples_syntax_extensions`
1055
1056 :func:`_mysql.limit` - DML LIMIT for MySQL
1057
1058 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL
1059
1060 .. versionadded:: 2.1
1061
1062 """
1063 extension = coercions.expect(
1064 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
1065 )
1066 self._apply_syntax_extension_to_self(extension)
1067 return self
1068
1069 @util.preload_module("sqlalchemy.sql.elements")
1070 def apply_syntax_extension_point(
1071 self,
1072 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
1073 position: _L,
1074 ) -> None:
1075 """Apply a :class:`.SyntaxExtension` to a known extension point.
1076
1077 Should be used only internally by :class:`.SyntaxExtension`.
1078
1079 E.g.::
1080
1081 class Qualify(SyntaxExtension, ClauseElement):
1082
1083 # ...
1084
1085 def apply_to_select(self, select_stmt: Select) -> None:
1086 # append self to existing
                    select_stmt.apply_syntax_extension_point(
1088 lambda existing: [*existing, self], "post_criteria"
1089 )
1090
1091
1092 class ReplaceExt(SyntaxExtension, ClauseElement):
1093
1094 # ...
1095
1096 def apply_to_select(self, select_stmt: Select) -> None:
1097 # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
1099 lambda existing: [self], "post_criteria"
1100 )
1101
1102
1103 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):
1104
1105 # ...
1106
1107 def apply_to_select(self, select_stmt: Select) -> None:
1108 # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
1110 self.append_replacing_same_type, "post_criteria"
1111 )
1112
        :param apply_fn: callable function that will receive a sequence of
         :class:`.ClauseElement` that is already populating the extension
         point (the sequence is empty if there isn't one), and should return
         a new sequence of :class:`.ClauseElement` that will newly populate
         that point. The function typically can choose to concatenate the
         existing values with the new one, or to replace the values that are
         there with a new one by returning a list of a single element, or
         to perform more complex operations like removing only elements of
         the same type from the input list, or merging already existing
         elements of the same type. Some examples are shown above.

        :param position: string name of the position to apply to. This
         varies per statement type. IDEs should show the possible values
         for each statement type as it's typed with a ``typing.Literal`` per
         statement.
1127
1128 .. seealso::
1129
1130 :ref:`examples_syntax_extensions`
1131
1132
1133 """ # noqa: E501
1134
1135 try:
1136 attrname = self._position_map[position]
1137 except KeyError as ke:
1138 raise ValueError(
1139 f"Unknown position {position!r} for {self.__class__} "
1140 f"construct; known positions: "
1141 f"{', '.join(repr(k) for k in self._position_map)}"
1142 ) from ke
1143 else:
1144 ElementList = util.preloaded.sql_elements.ElementList
1145 existing: Optional[ClauseElement] = getattr(self, attrname, None)
1146 if existing is None:
1147 input_seq: Tuple[ClauseElement, ...] = ()
1148 elif isinstance(existing, ElementList):
1149 input_seq = existing.clauses
1150 else:
1151 input_seq = (existing,)
1152
1153 new_seq = apply_fn(input_seq)
1154 assert new_seq, "cannot return empty sequence"
1155 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
1156 setattr(self, attrname, new)
1157
1158 def _apply_syntax_extension_to_self(
1159 self, extension: SyntaxExtension
1160 ) -> None:
1161 raise NotImplementedError()
1162
1163 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
1164 res: Dict[_L, SyntaxExtension] = {}
1165 for name, attr in self._position_map.items():
1166 value = getattr(self, attr)
1167 if value is not None:
1168 res[name] = value
1169 return res
1170
1171 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
1172 for name, value in extensions.items():
1173 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501
1174
1175
1176class SyntaxExtension(roles.SyntaxExtensionRole):
    """Defines a unit that, when also extending from :class:`.ClauseElement`,
    can be applied to the SQLAlchemy statements :class:`.Select`,
    :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete`, making use
    of pre-established SQL insertion points within these constructs.
1181
1182 .. versionadded:: 2.1
1183
1184 .. seealso::
1185
1186 :ref:`examples_syntax_extensions`
1187
1188 """
1189
1190 def append_replacing_same_type(
1191 self, existing: Sequence[ClauseElement]
1192 ) -> Sequence[ClauseElement]:
        """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other elements of the same type from ``existing``,
        appending ``self`` to the list.
1197
1198 This is equivalent to::
1199
            stmt.apply_syntax_extension_point(
1201 lambda existing: [
1202 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
1203 self,
1204 ],
1205 "post_criteria",
1206 )
1207
1208 .. seealso::
1209
1210 :ref:`examples_syntax_extensions`
1211
1212 :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`
1213
1214 """ # noqa: E501
1215 cls = type(self)
1216 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501
1217
1218 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
1219 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
1220 raise NotImplementedError(
1221 f"Extension {type(self).__name__} cannot be applied to select"
1222 )
1223
1224 def apply_to_update(self, update_stmt: Update) -> None:
1225 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
1226 raise NotImplementedError(
1227 f"Extension {type(self).__name__} cannot be applied to update"
1228 )
1229
1230 def apply_to_delete(self, delete_stmt: Delete) -> None:
1231 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
1232 raise NotImplementedError(
1233 f"Extension {type(self).__name__} cannot be applied to delete"
1234 )
1235
1236 def apply_to_insert(self, insert_stmt: Insert) -> None:
1237 """Apply this :class:`.SyntaxExtension` to an :class:`_sql.Insert`"""
1238 raise NotImplementedError(
1239 f"Extension {type(self).__name__} cannot be applied to insert"
1240 )
1241
1242
1243class Executable(roles.StatementRole):
1244 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1245
1246 :class:`.Executable` is a superclass for all "statement" types
1247 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1248 :func:`insert`, :func:`text`.
1249
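    E.g., a :func:`_sql.select` construct is an :class:`.Executable` that
    may be passed to :meth:`_engine.Connection.execute` (``table`` and
    ``connection`` are assumed to exist)::

        stmt = select(table.c.x).where(table.c.y == 5)
        result = connection.execute(stmt)
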
1250 """
1251
1252 supports_execution: bool = True
1253 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1254 _is_default_generator: bool = False
1255 _with_options: Tuple[ExecutableOption, ...] = ()
1256 _compile_state_funcs: Tuple[
1257 Tuple[Callable[[CompileState], None], Any], ...
1258 ] = ()
1259 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1260
1261 _executable_traverse_internals = [
1262 ("_with_options", InternalTraversal.dp_executable_options),
1263 (
1264 "_compile_state_funcs",
1265 ExtendedInternalTraversal.dp_compile_state_funcs,
1266 ),
1267 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1268 ]
1269
1270 is_select: bool = False
1271 is_from_statement: bool = False
1272 is_update: bool = False
1273 is_insert: bool = False
1274 is_text: bool = False
1275 is_delete: bool = False
1276 is_dml: bool = False
1277
1278 if TYPE_CHECKING:
1279 __visit_name__: str
1280
1281 def _compile_w_cache(
1282 self,
1283 dialect: Dialect,
1284 *,
1285 compiled_cache: Optional[CompiledCacheType],
1286 column_keys: List[str],
1287 for_executemany: bool = False,
1288 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1289 **kw: Any,
1290 ) -> Tuple[
1291 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1292 ]: ...
1293
1294 def _execute_on_connection(
1295 self,
1296 connection: Connection,
1297 distilled_params: _CoreMultiExecuteParams,
1298 execution_options: CoreExecuteOptionsParameter,
1299 ) -> CursorResult[Any]: ...
1300
1301 def _execute_on_scalar(
1302 self,
1303 connection: Connection,
1304 distilled_params: _CoreMultiExecuteParams,
1305 execution_options: CoreExecuteOptionsParameter,
1306 ) -> Any: ...
1307
1308 @util.ro_non_memoized_property
1309 def _all_selected_columns(self) -> _SelectIterable:
1310 raise NotImplementedError()
1311
1312 @property
1313 def _effective_plugin_target(self) -> str:
1314 return self.__visit_name__
1315
1316 @_generative
1317 def options(self, *options: ExecutableOption) -> Self:
1318 """Apply options to this statement.
1319
1320 In the general sense, options are any kind of Python object
1321 that can be interpreted by systems that consume the statement outside
1322 of the regular SQL compiler chain. Specifically, these options are
1323 the ORM level options that apply "eager load" and other loading
1324 behaviors to an ORM query.
1325
1326 For background on specific kinds of options for specific kinds of
1327 statements, refer to the documentation for those option objects.
1328
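        E.g., ORM loader options applied to a :func:`_sql.select` against a
        hypothetical mapped class ``User``::

            from sqlalchemy.orm import selectinload

            stmt = select(User).options(selectinload(User.addresses))
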
1329 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1330 Core statement objects towards the goal of allowing unified
1331 Core / ORM querying capabilities.
1332
1333 .. seealso::
1334
1335 :ref:`loading_columns` - refers to options specific to the usage
1336 of ORM queries
1337
1338 :ref:`relationship_loader_options` - refers to options specific
1339 to the usage of ORM queries
1340
1341 """
1342 self._with_options += tuple(
1343 coercions.expect(roles.ExecutableOptionRole, opt)
1344 for opt in options
1345 )
1346 return self
1347
1348 @_generative
1349 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1350 """Assign the compile options to a new value.
1351
1352 :param compile_options: appropriate CacheableOptions structure
1353
1354 """
1355
1356 self._compile_options = compile_options
1357 return self
1358
1359 @_generative
1360 def _update_compile_options(self, options: CacheableOptions) -> Self:
1361 """update the _compile_options with new keys."""
1362
1363 assert self._compile_options is not None
1364 self._compile_options += options
1365 return self
1366
1367 @_generative
1368 def _add_compile_state_func(
1369 self,
1370 callable_: Callable[[CompileState], None],
1371 cache_args: Any,
1372 ) -> Self:
1373 """Add a compile state function to this statement.
1374
1375 When using the ORM only, these are callable functions that will
1376 be given the CompileState object upon compilation.
1377
1378 A second argument cache_args is required, which will be combined with
1379 the ``__code__`` identity of the function itself in order to produce a
1380 cache key.
1381
1382 """
1383 self._compile_state_funcs += ((callable_, cache_args),)
1384 return self
1385
1386 @overload
1387 def execution_options(
1388 self,
1389 *,
1390 compiled_cache: Optional[CompiledCacheType] = ...,
1391 logging_token: str = ...,
1392 isolation_level: IsolationLevel = ...,
1393 no_parameters: bool = False,
1394 stream_results: bool = False,
1395 max_row_buffer: int = ...,
1396 yield_per: int = ...,
1397 driver_column_names: bool = ...,
1398 insertmanyvalues_page_size: int = ...,
1399 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1400 populate_existing: bool = False,
1401 autoflush: bool = False,
1402 synchronize_session: SynchronizeSessionArgument = ...,
1403 dml_strategy: DMLStrategyArgument = ...,
1404 render_nulls: bool = ...,
1405 is_delete_using: bool = ...,
1406 is_update_from: bool = ...,
1407 preserve_rowcount: bool = False,
1408 **opt: Any,
1409 ) -> Self: ...
1410
1411 @overload
1412 def execution_options(self, **opt: Any) -> Self: ...
1413
1414 @_generative
1415 def execution_options(self, **kw: Any) -> Self:
1416 """Set non-SQL options for the statement which take effect during
1417 execution.
1418
1419 Execution options can be set at many scopes, including per-statement,
1420 per-connection, or per execution, using methods such as
1421 :meth:`_engine.Connection.execution_options` and parameters which
1422 accept a dictionary of options such as
1423 :paramref:`_engine.Connection.execute.execution_options` and
1424 :paramref:`_orm.Session.execute.execution_options`.
1425
1426 The primary characteristic of an execution option, as opposed to
1427 other kinds of options such as ORM loader options, is that
1428 **execution options never affect the compiled SQL of a query, only
1429 things that affect how the SQL statement itself is invoked or how
1430 results are fetched**. That is, execution options are not part of
1431 what's accommodated by SQL compilation nor are they considered part of
1432 the cached state of a statement.
1433
1434 The :meth:`_sql.Executable.execution_options` method is
1435 :term:`generative`, as
1436 is the case for the method as applied to the :class:`_engine.Engine`
1437 and :class:`_orm.Query` objects, which means when the method is called,
1438 a copy of the object is returned, which applies the given parameters to
1439 that new copy, but leaves the original unchanged::
1440
1441 statement = select(table.c.x, table.c.y)
1442 new_statement = statement.execution_options(my_option=True)
1443
1444 An exception to this behavior is the :class:`_engine.Connection`
1445 object, where the :meth:`_engine.Connection.execution_options` method
1446 is explicitly **not** generative.
1447
1448 The kinds of options that may be passed to
1449 :meth:`_sql.Executable.execution_options` and other related methods and
1450 parameter dictionaries include parameters that are explicitly consumed
1451 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1452 defined by SQLAlchemy, which means the methods and/or parameter
1453 dictionaries may be used for user-defined parameters that interact with
1454 custom code, which may access the parameters using methods such as
1455 :meth:`_sql.Executable.get_execution_options` and
1456 :meth:`_engine.Connection.get_execution_options`, or within selected
1457 event hooks using a dedicated ``execution_options`` event parameter
1458 such as
1459 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1460 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1461
1462 from sqlalchemy import event
1463
1464
1465 @event.listens_for(some_engine, "before_execute")
1466 def _process_opt(conn, statement, multiparams, params, execution_options):
1467 "run a SQL function before invoking a statement"
1468
1469 if execution_options.get("do_special_thing", False):
1470 conn.exec_driver_sql("run_special_function()")
1471
1472 Within the scope of options that are explicitly recognized by
1473 SQLAlchemy, most apply to specific classes of objects and not others.
1474 The most common execution options include:
1475
1476 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1477 sets the isolation level for a connection or a class of connections
1478 via an :class:`_engine.Engine`. This option is accepted only
1479 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1480
1481 * :paramref:`_engine.Connection.execution_options.stream_results` -
1482 indicates results should be fetched using a server side cursor;
1483 this option is accepted by :class:`_engine.Connection`, by the
1484 :paramref:`_engine.Connection.execute.execution_options` parameter
1485 on :meth:`_engine.Connection.execute`, and additionally by
1486 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1487 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1488
1489 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1490 indicates a dictionary that will serve as the
1491 :ref:`SQL compilation cache <sql_caching>`
1492 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1493 well as for ORM methods like :meth:`_orm.Session.execute`.
1494 Can be passed as ``None`` to disable caching for statements.
1495 This option is not accepted by
1496 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1497 carry along a compilation cache within a statement object.
1498
1499 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1500 - a mapping of schema names used by the
1501 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1502 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1503 :class:`_sql.Executable`, as well as by ORM constructs
1504 like :meth:`_orm.Session.execute`.
1505
1506 .. seealso::
1507
1508 :meth:`_engine.Connection.execution_options`
1509
1510 :paramref:`_engine.Connection.execute.execution_options`
1511
1512 :paramref:`_orm.Session.execute.execution_options`
1513
1514 :ref:`orm_queryguide_execution_options` - documentation on all
1515 ORM-specific execution options
1516
1517 """ # noqa: E501
1518 if "isolation_level" in kw:
1519 raise exc.ArgumentError(
1520 "'isolation_level' execution option may only be specified "
1521 "on Connection.execution_options(), or "
1522 "per-engine using the isolation_level "
1523 "argument to create_engine()."
1524 )
1525 if "compiled_cache" in kw:
1526 raise exc.ArgumentError(
1527 "'compiled_cache' execution option may only be specified "
1528 "on Connection.execution_options(), not per statement."
1529 )
1530 self._execution_options = self._execution_options.union(kw)
1531 return self
1532
1533 def get_execution_options(self) -> _ExecuteOptions:
1534 """Get the non-SQL options which will take effect during execution.
1535
1536 .. seealso::
1537
1538 :meth:`.Executable.execution_options`
1539 """
1540 return self._execution_options
1541
1542
1543class SchemaEventTarget(event.EventTarget):
1544 """Base class for elements that are the targets of :class:`.DDLEvents`
1545 events.
1546
1547 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1548
1549 """
1550
1551 dispatch: dispatcher[SchemaEventTarget]
1552
1553 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1554 """Associate with this SchemaEvent's parent object."""
1555
1556 def _set_parent_with_dispatch(
1557 self, parent: SchemaEventTarget, **kw: Any
1558 ) -> None:
1559 self.dispatch.before_parent_attach(self, parent)
1560 self._set_parent(parent, **kw)
1561 self.dispatch.after_parent_attach(self, parent)
1562
1563
1564class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1565 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1566
1567 .. versionadded:: 2.0.41
1568
1569 """
1570
1571
1572class SchemaVisitor(ClauseVisitor):
    """Define the visiting behavior for ``SchemaItem`` and more
    generally ``SchemaVisitable`` objects.
1575
1576 """
1577
1578 __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
1579
1580
1581class _SentinelDefaultCharacterization(Enum):
1582 NONE = "none"
1583 UNKNOWN = "unknown"
1584 CLIENTSIDE = "clientside"
1585 SENTINEL_DEFAULT = "sentinel_default"
1586 SERVERSIDE = "serverside"
1587 IDENTITY = "identity"
1588 SEQUENCE = "sequence"
1589
1590
1591class _SentinelColumnCharacterization(NamedTuple):
1592 columns: Optional[Sequence[Column[Any]]] = None
1593 is_explicit: bool = False
1594 is_autoinc: bool = False
1595 default_characterization: _SentinelDefaultCharacterization = (
1596 _SentinelDefaultCharacterization.NONE
1597 )
1598
1599
1600_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1601
1602_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1603_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1604
1605
1606class _ColumnMetrics(Generic[_COL_co]):
1607 __slots__ = ("column",)
1608
1609 column: _COL_co
1610
1611 def __init__(
1612 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1613 ) -> None:
1614 self.column = col
1615
1616 # proxy_index being non-empty means it was initialized.
1617 # so we need to update it
1618 pi = collection._proxy_index
1619 if pi:
1620 for eps_col in col._expanded_proxy_set:
1621 pi[eps_col].add(self)
1622
1623 def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
1624 return self.column._expanded_proxy_set
1625
1626 def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
1627 pi = collection._proxy_index
1628 if not pi:
1629 return
1630 for col in self.column._expanded_proxy_set:
1631 colset = pi.get(col, None)
1632 if colset:
1633 colset.discard(self)
1634 if colset is not None and not colset:
1635 del pi[col]
1636
1637 def embedded(
1638 self,
1639 target_set: Union[
1640 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1641 ],
1642 ) -> bool:
1643 expanded_proxy_set = self.column._expanded_proxy_set
1644 for t in target_set.difference(expanded_proxy_set):
1645 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1646 return False
1647 return True
1648
1649
1650class ColumnCollection(Generic[_COLKEY, _COL_co]):
1651 """Collection of :class:`_expression.ColumnElement` instances,
1652 typically for
1653 :class:`_sql.FromClause` objects.
1654
1655 The :class:`_sql.ColumnCollection` object is most commonly available
1656 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1657 on the :class:`_schema.Table` object, introduced at
1658 :ref:`metadata_tables_and_columns`.
1659
1660 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1661 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1662 :class:`_schema.Column` objects, which are then accessible both via mapping
1663 style access as well as attribute access style.
1664
1665 To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as below
    where a column named ``employee_name`` is accessed::
1668
1669 >>> employee_table.c.employee_name
1670
1671 To access columns that have names with special characters or spaces,
    index-style access is used, such as below which illustrates how a column
    named ``employee ' payment`` is accessed::
1674
1675 >>> employee_table.c["employee ' payment"]
1676
1677 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1678 interface, common dictionary method names like
1679 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1680 and :meth:`_sql.ColumnCollection.items` are available, which means that
1681 database columns that are keyed under these names also need to use indexed
1682 access::
1683
1684 >>> employee_table.c["values"]
1685
1686
1687 The name for which a :class:`_schema.Column` would be present is normally
1688 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1689 such as a :class:`_sql.Select` object that uses a label style set
1690 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1691 key may instead be represented under a particular label name such
1692 as ``tablename_columnname``::
1693
1694 >>> from sqlalchemy import select, column, table
1695 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1696 >>> t = table("t", column("c"))
1697 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1698 >>> subq = stmt.subquery()
1699 >>> subq.c.t_c
1700 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1701
1702 :class:`.ColumnCollection` also indexes the columns in order and allows
1703 them to be accessible by their integer position::
1704
1705 >>> cc[0]
1706 Column('x', Integer(), table=None)
1707 >>> cc[1]
1708 Column('y', Integer(), table=None)
1709
1710 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1711 allows integer-based
1712 index access to the collection.
1713
1714 Iterating the collection yields the column expressions in order::
1715
1716 >>> list(cc)
1717 [Column('x', Integer(), table=None),
1718 Column('y', Integer(), table=None)]
1719
1720 The base :class:`_expression.ColumnCollection` object can store
1721 duplicates, which can
1722 mean either two columns with the same key, in which case the column
1723 returned by key access is **arbitrary**::
1724
1725 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1726 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1727 >>> list(cc)
1728 [Column('x', Integer(), table=None),
1729 Column('x', Integer(), table=None)]
1730 >>> cc["x"] is x1
1731 False
1732 >>> cc["x"] is x2
1733 True
1734
1735 Or it can also mean the same column multiple times. These cases are
1736 supported as :class:`_expression.ColumnCollection`
1737 is used to represent the columns in
1738 a SELECT statement which may include duplicates.
1739
1740 A special subclass :class:`.DedupeColumnCollection` exists which instead
1741 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1742 collection is used for schema level objects like :class:`_schema.Table`
1743 and
1744 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1745 :class:`.DedupeColumnCollection` class also has additional mutation methods
1746 as the schema constructs have more use cases that require removal and
1747 replacement of columns.
1748
1749 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1750 now stores duplicate
1751 column keys as well as the same column in multiple positions. The
1752 :class:`.DedupeColumnCollection` class is added to maintain the
1753 former behavior in those cases where deduplication as well as
1754 additional replace/remove operations are needed.
1755
1756
1757 """
1758
1759 __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
1760
1761 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1762 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1763 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1764 _colset: Set[_COL_co]
1765
1766 def __init__(
1767 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1768 ):
1769 object.__setattr__(self, "_colset", set())
1770 object.__setattr__(self, "_index", {})
1771 object.__setattr__(
1772 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1773 )
1774 object.__setattr__(self, "_collection", [])
1775 if columns:
1776 self._initial_populate(columns)
1777
1778 @util.preload_module("sqlalchemy.sql.elements")
1779 def __clause_element__(self) -> ClauseList:
1780 elements = util.preloaded.sql_elements
1781
1782 return elements.ClauseList(
1783 _literal_as_text_role=roles.ColumnsClauseRole,
1784 group=False,
1785 *self._all_columns,
1786 )
1787
1788 def _initial_populate(
1789 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1790 ) -> None:
1791 self._populate_separate_keys(iter_)
1792
1793 @property
1794 def _all_columns(self) -> List[_COL_co]:
1795 return [col for (_, col, _) in self._collection]
1796
1797 def keys(self) -> List[_COLKEY]:
1798 """Return a sequence of string key names for all columns in this
1799 collection."""
1800 return [k for (k, _, _) in self._collection]
1801
1802 def values(self) -> List[_COL_co]:
1803 """Return a sequence of :class:`_sql.ColumnClause` or
1804 :class:`_schema.Column` objects for all columns in this
1805 collection."""
1806 return [col for (_, col, _) in self._collection]
1807
1808 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1809 """Return a sequence of (key, column) tuples for all columns in this
1810 collection each consisting of a string key name and a
1811 :class:`_sql.ColumnClause` or
1812 :class:`_schema.Column` object.
1813 """
1814
1815 return [(k, col) for (k, col, _) in self._collection]
1816
1817 def __bool__(self) -> bool:
1818 return bool(self._collection)
1819
1820 def __len__(self) -> int:
1821 return len(self._collection)
1822
1823 def __iter__(self) -> Iterator[_COL_co]:
1824 # turn to a list first to maintain over a course of changes
1825 return iter([col for _, col, _ in self._collection])
1826
1827 @overload
1828 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1829
1830 @overload
1831 def __getitem__(
1832 self, key: Tuple[Union[str, int], ...]
1833 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1834
1835 @overload
1836 def __getitem__(
1837 self, key: slice
1838 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1839
1840 def __getitem__(
1841 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1842 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1843 try:
1844 if isinstance(key, (tuple, slice)):
1845 if isinstance(key, slice):
1846 cols = (
1847 (sub_key, col)
1848 for (sub_key, col, _) in self._collection[key]
1849 )
1850 else:
1851 cols = (self._index[sub_key] for sub_key in key)
1852
1853 return ColumnCollection(cols).as_readonly()
1854 else:
1855 return self._index[key][1]
1856 except KeyError as err:
1857 if isinstance(err.args[0], int):
1858 raise IndexError(err.args[0]) from err
1859 else:
1860 raise
1861
1862 def __getattr__(self, key: str) -> _COL_co:
1863 try:
1864 return self._index[key][1]
1865 except KeyError as err:
1866 raise AttributeError(key) from err
1867
1868 def __contains__(self, key: str) -> bool:
1869 if key not in self._index:
1870 if not isinstance(key, str):
1871 raise exc.ArgumentError(
1872 "__contains__ requires a string argument"
1873 )
1874 return False
1875 else:
1876 return True
1877
1878 def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
1879 """Compare this :class:`_expression.ColumnCollection` to another
1880 based on the names of the keys"""
1881
1882 for l, r in zip_longest(self, other):
1883 if l is not r:
1884 return False
1885 else:
1886 return True
1887
1888 def __eq__(self, other: Any) -> bool:
1889 return self.compare(other)
1890
1891 @overload
1892 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1893
1894 @overload
1895 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1896
1897 def get(
1898 self, key: str, default: Optional[_COL] = None
1899 ) -> Optional[Union[_COL_co, _COL]]:
1900 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1901 based on a string key name from this
1902 :class:`_expression.ColumnCollection`."""
1903
1904 if key in self._index:
1905 return self._index[key][1]
1906 else:
1907 return default
1908
1909 def __str__(self) -> str:
1910 return "%s(%s)" % (
1911 self.__class__.__name__,
1912 ", ".join(str(c) for c in self),
1913 )
1914
1915 def __setitem__(self, key: str, value: Any) -> NoReturn:
1916 raise NotImplementedError()
1917
1918 def __delitem__(self, key: str) -> NoReturn:
1919 raise NotImplementedError()
1920
1921 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1922 raise NotImplementedError()
1923
1924 def clear(self) -> NoReturn:
1925 """Dictionary clear() is not implemented for
1926 :class:`_sql.ColumnCollection`."""
1927 raise NotImplementedError()
1928
1929 def remove(self, column: Any) -> NoReturn:
1930 raise NotImplementedError()
1931
1932 def update(self, iter_: Any) -> NoReturn:
1933 """Dictionary update() is not implemented for
1934 :class:`_sql.ColumnCollection`."""
1935 raise NotImplementedError()
1936
1937 # https://github.com/python/mypy/issues/4266
1938 __hash__: Optional[int] = None # type: ignore
1939
1940 def _populate_separate_keys(
1941 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1942 ) -> None:
1943 """populate from an iterator of (key, column)"""
1944
1945 self._collection[:] = collection = [
1946 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1947 ]
1948 self._colset.update(c._deannotate() for _, c, _ in collection)
1949 self._index.update(
1950 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1951 )
1952 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1953
1954 def add(
1955 self,
1956 column: ColumnElement[Any],
1957 key: Optional[_COLKEY] = None,
1958 ) -> None:
1959 """Add a column to this :class:`_sql.ColumnCollection`.
1960
1961 .. note::
1962
1963 This method is **not normally used by user-facing code**, as the
1964 :class:`_sql.ColumnCollection` is usually part of an existing
1965 object such as a :class:`_schema.Table`. To add a
1966 :class:`_schema.Column` to an existing :class:`_schema.Table`
1967 object, use the :meth:`_schema.Table.append_column` method.
1968
1969 """
1970 colkey: _COLKEY
1971
1972 if key is None:
1973 colkey = column.key # type: ignore
1974 else:
1975 colkey = key
1976
1977 l = len(self._collection)
1978
1979 # don't really know how this part is supposed to work w/ the
1980 # covariant thing
1981
1982 _column = cast(_COL_co, column)
1983
1984 self._collection.append(
1985 (colkey, _column, _ColumnMetrics(self, _column))
1986 )
1987 self._colset.add(_column._deannotate())
1988
1989 self._index[l] = (colkey, _column)
1990 if colkey not in self._index:
1991 self._index[colkey] = (colkey, _column)
1992
1993 def __getstate__(self) -> Dict[str, Any]:
1994 return {
1995 "_collection": [(k, c) for k, c, _ in self._collection],
1996 "_index": self._index,
1997 }
1998
1999 def __setstate__(self, state: Dict[str, Any]) -> None:
2000 object.__setattr__(self, "_index", state["_index"])
2001 object.__setattr__(
2002 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
2003 )
2004 object.__setattr__(
2005 self,
2006 "_collection",
2007 [
2008 (k, c, _ColumnMetrics(self, c))
2009 for (k, c) in state["_collection"]
2010 ],
2011 )
2012 object.__setattr__(
2013 self, "_colset", {col for k, col, _ in self._collection}
2014 )
2015
2016 def contains_column(self, col: ColumnElement[Any]) -> bool:
2017 """Checks if a column object exists in this collection"""
2018 if col not in self._colset:
2019 if isinstance(col, str):
2020 raise exc.ArgumentError(
2021 "contains_column cannot be used with string arguments. "
2022 "Use ``col_name in table.c`` instead."
2023 )
2024 return False
2025 else:
2026 return True
2027
2028 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2029 """Return a "read only" form of this
2030 :class:`_sql.ColumnCollection`."""
2031
2032 return ReadOnlyColumnCollection(self)
2033
2034 def _init_proxy_index(self) -> None:
2035 """populate the "proxy index", if empty.
2036
2037 proxy index is added in 2.0 to provide more efficient operation
2038 for the corresponding_column() method.
2039
2040 For reasons of both time to construct new .c collections as well as
2041 memory conservation for large numbers of large .c collections, the
2042 proxy_index is only filled if corresponding_column() is called. once
2043 filled it stays that way, and new _ColumnMetrics objects created after
2044 that point will populate it with new data. Note this case would be
2045 unusual, if not nonexistent, as it means a .c collection is being
2046 mutated after corresponding_column() were used, however it is tested in
2047 test/base/test_utils.py.
2048
2049 """
2050 pi = self._proxy_index
2051 if pi:
2052 return
2053
2054 for _, _, metrics in self._collection:
2055 eps = metrics.column._expanded_proxy_set
2056
2057 for eps_col in eps:
2058 pi[eps_col].add(metrics)
2059
2060 def corresponding_column(
2061 self, column: _COL, require_embedded: bool = False
2062 ) -> Optional[Union[_COL, _COL_co]]:
2063 """Given a :class:`_expression.ColumnElement`, return the exported
2064 :class:`_expression.ColumnElement` object from this
2065 :class:`_expression.ColumnCollection`
2066 which corresponds to that original :class:`_expression.ColumnElement`
2067 via a common
2068 ancestor column.
2069
2070 :param column: the target :class:`_expression.ColumnElement`
2071 to be matched.
2072
2073 :param require_embedded: only return corresponding columns for
2074 the given :class:`_expression.ColumnElement`, if the given
2075 :class:`_expression.ColumnElement`
2076 is actually present within a sub-element
2077 of this :class:`_expression.Selectable`.
2078 Normally the column will match if
2079 it merely shares a common ancestor with one of the exported
2080 columns of this :class:`_expression.Selectable`.
2081
2082 .. seealso::
2083
2084 :meth:`_expression.Selectable.corresponding_column`
2085 - invokes this method
2086 against the collection returned by
2087 :attr:`_expression.Selectable.exported_columns`.
2088
2089 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
2090 was moved onto the :class:`_expression.ColumnCollection` itself.
2091
2092 """
2093 # TODO: cython candidate
2094
2095 # don't dig around if the column is locally present
2096 if column in self._colset:
2097 return column
2098
2099 selected_intersection, selected_metrics = None, None
2100 target_set = column.proxy_set
2101
2102 pi = self._proxy_index
2103 if not pi:
2104 self._init_proxy_index()
2105
2106 for current_metrics in (
2107 mm for ts in target_set if ts in pi for mm in pi[ts]
2108 ):
2109 if not require_embedded or current_metrics.embedded(target_set):
2110 if selected_metrics is None:
2111 # no corresponding column yet, pick this one.
2112 selected_metrics = current_metrics
2113 continue
2114
2115 current_intersection = target_set.intersection(
2116 current_metrics.column._expanded_proxy_set
2117 )
2118 if selected_intersection is None:
2119 selected_intersection = target_set.intersection(
2120 selected_metrics.column._expanded_proxy_set
2121 )
2122
2123 if len(current_intersection) > len(selected_intersection):
2124 # 'current' has a larger field of correspondence than
2125 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
2126 # matches a1.c.x->table.c.x better than
2127 # selectable.c.x->table.c.x does.
2128
2129 selected_metrics = current_metrics
2130 selected_intersection = current_intersection
2131 elif current_intersection == selected_intersection:
2132 # they have the same field of correspondence. see
2133 # which proxy_set has fewer columns in it, which
2134 # indicates a closer relationship with the root
2135 # column. Also take into account the "weight"
2136 # attribute which CompoundSelect() uses to give
2137 # higher precedence to columns based on vertical
2138 # position in the compound statement, and discard
2139 # columns that have no reference to the target
2140 # column (also occurs with CompoundSelect)
2141
2142 selected_col_distance = sum(
2143 [
2144 sc._annotations.get("weight", 1)
2145 for sc in (
2146 selected_metrics.column._uncached_proxy_list()
2147 )
2148 if sc.shares_lineage(column)
2149 ],
2150 )
2151 current_col_distance = sum(
2152 [
2153 sc._annotations.get("weight", 1)
2154 for sc in (
2155 current_metrics.column._uncached_proxy_list()
2156 )
2157 if sc.shares_lineage(column)
2158 ],
2159 )
2160 if current_col_distance < selected_col_distance:
2161 selected_metrics = current_metrics
2162 selected_intersection = current_intersection
2163
2164 return selected_metrics.column if selected_metrics else None
2165
2166
2167_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2168
2169
2170class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
2171 """A :class:`_expression.ColumnCollection`
2172 that maintains deduplicating behavior.
2173
2174 This is useful by schema level objects such as :class:`_schema.Table` and
2175 :class:`.PrimaryKeyConstraint`. The collection includes more
2176 sophisticated mutator methods as well to suit schema objects which
2177 require mutable column collections.
2178
2179 .. versionadded:: 1.4
2180
2181 """
2182
2183 def add( # type: ignore[override]
2184 self,
2185 column: _NAMEDCOL,
2186 key: Optional[str] = None,
2187 *,
2188 index: Optional[int] = None,
2189 ) -> None:
2190 if key is not None and column.key != key:
2191 raise exc.ArgumentError(
2192 "DedupeColumnCollection requires columns be under "
2193 "the same key as their .key"
2194 )
2195 key = column.key
2196
2197 if key is None:
2198 raise exc.ArgumentError(
2199 "Can't add unnamed column to column collection"
2200 )
2201
2202 if key in self._index:
2203 existing = self._index[key][1]
2204
2205 if existing is column:
2206 return
2207
2208 self.replace(column, index=index)
2209
2210 # pop out memoized proxy_set as this
2211 # operation may very well be occurring
2212 # in a _make_proxy operation
2213 util.memoized_property.reset(column, "proxy_set")
2214 else:
2215 self._append_new_column(key, column, index=index)
2216
2217 def _append_new_column(
2218 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
2219 ) -> None:
2220 collection_length = len(self._collection)
2221
2222 if index is None:
2223 l = collection_length
2224 else:
2225 if index < 0:
2226 index = max(0, collection_length + index)
2227 l = index
2228
2229 if index is None:
2230 self._collection.append(
2231 (key, named_column, _ColumnMetrics(self, named_column))
2232 )
2233 else:
2234 self._collection.insert(
2235 index, (key, named_column, _ColumnMetrics(self, named_column))
2236 )
2237
2238 self._colset.add(named_column._deannotate())
2239
2240 if index is not None:
2241 for idx in reversed(range(index, collection_length)):
2242 self._index[idx + 1] = self._index[idx]
2243
2244 self._index[l] = (key, named_column)
2245 self._index[key] = (key, named_column)
2246
2247 def _populate_separate_keys(
2248 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2249 ) -> None:
2250 """populate from an iterator of (key, column)"""
2251 cols = list(iter_)
2252
2253 replace_col = []
2254 for k, col in cols:
2255 if col.key != k:
2256 raise exc.ArgumentError(
2257 "DedupeColumnCollection requires columns be under "
2258 "the same key as their .key"
2259 )
2260 if col.name in self._index and col.key != col.name:
2261 replace_col.append(col)
2262 elif col.key in self._index:
2263 replace_col.append(col)
2264 else:
2265 self._index[k] = (k, col)
2266 self._collection.append((k, col, _ColumnMetrics(self, col)))
2267 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2268
2269 self._index.update(
2270 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2271 )
2272 for col in replace_col:
2273 self.replace(col)
2274
2275 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2276 self._populate_separate_keys((col.key, col) for col in iter_)
2277
2278 def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
2279 if column not in self._colset:
2280 raise ValueError(
2281 "Can't remove column %r; column is not in this collection"
2282 % column
2283 )
2284 del self._index[column.key]
2285 self._colset.remove(column)
2286 self._collection[:] = [
2287 (k, c, metrics)
2288 for (k, c, metrics) in self._collection
2289 if c is not column
2290 ]
2291 for metrics in self._proxy_index.get(column, ()):
2292 metrics.dispose(self)
2293
2294 self._index.update(
2295 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2296 )
2297 # delete higher index
2298 del self._index[len(self._collection)]
2299
2300 def replace(
2301 self,
2302 column: _NAMEDCOL,
2303 *,
2304 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2305 index: Optional[int] = None,
2306 ) -> None:
2307 """add the given column to this collection, removing unaliased
2308 versions of this column as well as existing columns with the
2309 same key.
2310
2311 e.g.::
2312
2313 t = Table("sometable", metadata, Column("col1", Integer))
2314 t.columns.replace(Column("col1", Integer, key="columnone"))
2315
2316 will remove the original 'col1' from the collection, and add
2317 the new column under the name 'columnname'.
2318
2319 Used by schema.Column to override columns during table reflection.
2320
2321 """
2322
2323 if extra_remove:
2324 remove_col = set(extra_remove)
2325 else:
2326 remove_col = set()
2327 # remove up to two columns based on matches of name as well as key
2328 if column.name in self._index and column.key != column.name:
2329 other = self._index[column.name][1]
2330 if other.name == other.key:
2331 remove_col.add(other)
2332
2333 if column.key in self._index:
2334 remove_col.add(self._index[column.key][1])
2335
2336 if not remove_col:
2337 self._append_new_column(column.key, column, index=index)
2338 return
2339 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2340 replace_index = None
2341
2342 for idx, (k, col, metrics) in enumerate(self._collection):
2343 if col in remove_col:
2344 if replace_index is None:
2345 replace_index = idx
2346 new_cols.append(
2347 (column.key, column, _ColumnMetrics(self, column))
2348 )
2349 else:
2350 new_cols.append((k, col, metrics))
2351
2352 if remove_col:
2353 self._colset.difference_update(remove_col)
2354
2355 for rc in remove_col:
2356 for metrics in self._proxy_index.get(rc, ()):
2357 metrics.dispose(self)
2358
2359 if replace_index is None:
2360 if index is not None:
2361 new_cols.insert(
2362 index, (column.key, column, _ColumnMetrics(self, column))
2363 )
2364
2365 else:
2366 new_cols.append(
2367 (column.key, column, _ColumnMetrics(self, column))
2368 )
2369 elif index is not None:
2370 to_move = new_cols[replace_index]
2371 effective_positive_index = (
2372 index if index >= 0 else max(0, len(new_cols) + index)
2373 )
2374 new_cols.insert(index, to_move)
2375 if replace_index > effective_positive_index:
2376 del new_cols[replace_index + 1]
2377 else:
2378 del new_cols[replace_index]
2379
2380 self._colset.add(column._deannotate())
2381 self._collection[:] = new_cols
2382
2383 self._index.clear()
2384
2385 self._index.update(
2386 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2387 )
2388 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2389
2390
2391class ReadOnlyColumnCollection(
2392 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2393):
2394 __slots__ = ("_parent",)
2395
2396 def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
2397 object.__setattr__(self, "_parent", collection)
2398 object.__setattr__(self, "_colset", collection._colset)
2399 object.__setattr__(self, "_index", collection._index)
2400 object.__setattr__(self, "_collection", collection._collection)
2401 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2402
2403 def __getstate__(self) -> Dict[str, _COL_co]:
2404 return {"_parent": self._parent}
2405
2406 def __setstate__(self, state: Dict[str, Any]) -> None:
2407 parent = state["_parent"]
2408 self.__init__(parent) # type: ignore
2409
2410 def add(self, column: Any, key: Any = ...) -> Any:
2411 self._readonly()
2412
2413 def extend(self, elements: Any) -> NoReturn:
2414 self._readonly()
2415
2416 def remove(self, item: Any) -> NoReturn:
2417 self._readonly()
2418
2419
2420class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2421 def contains_column(self, col: ColumnClause[Any]) -> bool:
2422 return col in self
2423
2424 def extend(self, cols: Iterable[Any]) -> None:
2425 for col in cols:
2426 self.add(col)
2427
2428 def __eq__(self, other):
2429 l = []
2430 for c in other:
2431 for local in self:
2432 if c.shares_lineage(local):
2433 l.append(c == local)
2434 return elements.and_(*l)
2435
2436 def __hash__(self) -> int: # type: ignore[override]
2437 return hash(tuple(x for x in self))
2438
2439
2440def _entity_namespace(
2441 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2442) -> _EntityNamespace:
2443 """Return the nearest .entity_namespace for the given entity.
2444
2445 If not immediately available, does an iterate to find a sub-element
2446 that has one, if any.
2447
2448 """
2449 try:
2450 return cast(_HasEntityNamespace, entity).entity_namespace
2451 except AttributeError:
2452 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2453 if _is_has_entity_namespace(elem):
2454 return elem.entity_namespace
2455 else:
2456 raise
2457
2458
2459@overload
2460def _entity_namespace_key(
2461 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2462 key: str,
2463) -> SQLCoreOperations[Any]: ...
2464
2465
2466@overload
2467def _entity_namespace_key(
2468 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2469 key: str,
2470 default: _NoArg,
2471) -> SQLCoreOperations[Any]: ...
2472
2473
2474@overload
2475def _entity_namespace_key(
2476 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2477 key: str,
2478 default: _T,
2479) -> Union[SQLCoreOperations[Any], _T]: ...
2480
2481
2482def _entity_namespace_key(
2483 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2484 key: str,
2485 default: Union[SQLCoreOperations[Any], _T, _NoArg] = NO_ARG,
2486) -> Union[SQLCoreOperations[Any], _T]:
2487 """Return an entry from an entity_namespace.
2488
2489
2490 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2491 on not found.
2492
2493 """
2494
2495 try:
2496 ns = _entity_namespace(entity)
2497 if default is not NO_ARG:
2498 return getattr(ns, key, default)
2499 else:
2500 return getattr(ns, key) # type: ignore
2501 except AttributeError as err:
2502 raise exc.InvalidRequestError(
2503 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2504 ) from err