1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Foundational utilities common to many sql modules."""
10
11
12from __future__ import annotations
13
14import collections
15from enum import Enum
16import itertools
17from itertools import zip_longest
18import operator
19import re
20from typing import Any
21from typing import Callable
22from typing import cast
23from typing import Dict
24from typing import FrozenSet
25from typing import Generic
26from typing import Iterable
27from typing import Iterator
28from typing import List
29from typing import Mapping
30from typing import MutableMapping
31from typing import NamedTuple
32from typing import NoReturn
33from typing import Optional
34from typing import overload
35from typing import Protocol
36from typing import Sequence
37from typing import Set
38from typing import Tuple
39from typing import Type
40from typing import TYPE_CHECKING
41from typing import TypeVar
42from typing import Union
43
44from . import roles
45from . import visitors
46from .cache_key import HasCacheKey # noqa
47from .cache_key import MemoizedHasCacheKey # noqa
48from .traversals import HasCopyInternals # noqa
49from .visitors import ClauseVisitor
50from .visitors import ExtendedInternalTraversal
51from .visitors import ExternallyTraversible
52from .visitors import InternalTraversal
53from .. import event
54from .. import exc
55from .. import util
56from ..util import HasMemoized as HasMemoized
57from ..util import hybridmethod
58from ..util.typing import Self
59from ..util.typing import TypeGuard
60from ..util.typing import TypeVarTuple
61from ..util.typing import Unpack
62
63if TYPE_CHECKING:
64 from . import coercions
65 from . import elements
66 from . import type_api
67 from ._orm_types import DMLStrategyArgument
68 from ._orm_types import SynchronizeSessionArgument
69 from ._typing import _CLE
70 from .compiler import SQLCompiler
71 from .dml import Delete
72 from .dml import Insert
73 from .dml import Update
74 from .elements import BindParameter
75 from .elements import ClauseElement
76 from .elements import ClauseList
77 from .elements import ColumnClause # noqa
78 from .elements import ColumnElement
79 from .elements import NamedColumn
80 from .elements import SQLCoreOperations
81 from .elements import TextClause
82 from .schema import Column
83 from .schema import DefaultGenerator
84 from .selectable import _JoinTargetElement
85 from .selectable import _SelectIterable
86 from .selectable import FromClause
87 from .selectable import Select
88 from ..engine import Connection
89 from ..engine import CursorResult
90 from ..engine.interfaces import _CoreMultiExecuteParams
91 from ..engine.interfaces import _ExecuteOptions
92 from ..engine.interfaces import _ImmutableExecuteOptions
93 from ..engine.interfaces import CacheStats
94 from ..engine.interfaces import Compiled
95 from ..engine.interfaces import CompiledCacheType
96 from ..engine.interfaces import CoreExecuteOptionsParameter
97 from ..engine.interfaces import Dialect
98 from ..engine.interfaces import IsolationLevel
99 from ..engine.interfaces import SchemaTranslateMapType
100 from ..event import dispatcher
101
102if not TYPE_CHECKING:
103 coercions = None # noqa
104 elements = None # noqa
105 type_api = None # noqa
106
107
108_Ts = TypeVarTuple("_Ts")
109
110
111class _NoArg(Enum):
112 NO_ARG = 0
113
114 def __repr__(self):
115 return f"_NoArg.{self.name}"
116
117
118NO_ARG = _NoArg.NO_ARG
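
# Illustrative sketch (not part of the module): the NO_ARG sentinel lets an
# API distinguish "argument not passed at all" from an explicit None, e.g.
#
#     def configure(name: Union[str, None, _NoArg] = NO_ARG) -> None:
#         if name is NO_ARG:
#             ...  # caller did not pass the argument
#         elif name is None:
#             ...  # caller explicitly passed None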
119
120
121class _NoneName(Enum):
122 NONE_NAME = 0
123 """indicate a 'deferred' name that was ultimately the value None."""
124
125
126_NONE_NAME = _NoneName.NONE_NAME
127
128_T = TypeVar("_T", bound=Any)
129
130_Fn = TypeVar("_Fn", bound=Callable[..., Any])
131
132_AmbiguousTableNameMap = MutableMapping[str, str]
133
134
135class _DefaultDescriptionTuple(NamedTuple):
136 arg: Any
137 is_scalar: Optional[bool]
138 is_callable: Optional[bool]
139 is_sentinel: Optional[bool]
140
141 @classmethod
142 def _from_column_default(
143 cls, default: Optional[DefaultGenerator]
144 ) -> _DefaultDescriptionTuple:
145 return (
146 _DefaultDescriptionTuple(
147 default.arg, # type: ignore
148 default.is_scalar,
149 default.is_callable,
150 default.is_sentinel,
151 )
152 if default
153 and (
154 default.has_arg
155 or (not default.for_update and default.is_sentinel)
156 )
157 else _DefaultDescriptionTuple(None, None, None, None)
158 )
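
# Illustrative sketch (assumption, not part of the module): for a Column with
# a plain scalar default, the tuple captures the default's argument and its
# characterization flags, e.g.
#
#     col = Column("x", Integer, default=7)
#     _DefaultDescriptionTuple._from_column_default(col.default)
#     # -> _DefaultDescriptionTuple(arg=7, is_scalar=True,
#     #                             is_callable=False, is_sentinel=False)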
159
160
161_never_select_column = operator.attrgetter("_omit_from_statements")
162
163
164class _EntityNamespace(Protocol):
165 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
166
167
168class _HasEntityNamespace(Protocol):
169 @util.ro_non_memoized_property
170 def entity_namespace(self) -> _EntityNamespace: ...
171
172
173def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
174 return hasattr(element, "entity_namespace")
175
176
177# Remove when https://github.com/python/mypy/issues/14640 will be fixed
178_Self = TypeVar("_Self", bound=Any)
179
180
181class Immutable:
182 """mark a ClauseElement as 'immutable' when expressions are cloned.
183
    "Immutable" here refers to the "mutability" of an object in the
    context of SQL DQL and DML generation.  For example, in DQL one can
    compose a SELECT or subquery of varied forms, but one cannot modify
    the structure of a specific table or column within DQL.
188 :class:`.Immutable` is mostly intended to follow this concept, and as
189 such the primary "immutable" objects are :class:`.ColumnClause`,
190 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
191
192 """
193
194 __slots__ = ()
195
196 _is_immutable = True
197
198 def unique_params(self, *optionaldict, **kwargs):
199 raise NotImplementedError("Immutable objects do not support copying")
200
201 def params(self, *optionaldict, **kwargs):
202 raise NotImplementedError("Immutable objects do not support copying")
203
204 def _clone(self: _Self, **kw: Any) -> _Self:
205 return self
206
207 def _copy_internals(
208 self, *, omit_attrs: Iterable[str] = (), **kw: Any
209 ) -> None:
210 pass
211
212
213class SingletonConstant(Immutable):
214 """Represent SQL constants like NULL, TRUE, FALSE"""
215
216 _is_singleton_constant = True
217
218 _singleton: SingletonConstant
219
220 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
221 return cast(_T, cls._singleton)
222
223 @util.non_memoized_property
224 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
225 raise NotImplementedError()
226
227 @classmethod
228 def _create_singleton(cls):
229 obj = object.__new__(cls)
230 obj.__init__() # type: ignore
231
232 # for a long time this was an empty frozenset, meaning
233 # a SingletonConstant would never be a "corresponding column" in
234 # a statement. This referred to #6259. However, in #7154 we see
235 # that we do in fact need "correspondence" to work when matching cols
236 # in result sets, so the non-correspondence was moved to a more
237 # specific level when we are actually adapting expressions for SQL
238 # render only.
239 obj.proxy_set = frozenset([obj])
240 cls._singleton = obj
241
242
243def _from_objects(
244 *elements: Union[
245 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
246 ]
247) -> Iterator[FromClause]:
248 return itertools.chain.from_iterable(
249 [element._from_objects for element in elements]
250 )
251
252
253def _select_iterables(
254 elements: Iterable[roles.ColumnsClauseRole],
255) -> _SelectIterable:
256 """expand tables into individual columns in the
257 given list of column expressions.
258
259 """
260 return itertools.chain.from_iterable(
261 [c._select_iterable for c in elements]
262 )
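
# Illustrative sketch (assumption, not part of the module): given a Table "t"
# with columns "a" and "b", _select_iterables() flattens the table into its
# individual column expressions, e.g.
#
#     list(_select_iterables([t]))      # -> [t.c.a, t.c.b]
#     list(_select_iterables([t.c.a]))  # -> [t.c.a]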
263
264
265_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
266
267
268class _GenerativeType(Protocol):
269 def _generate(self) -> Self: ...
270
271
272def _generative(fn: _Fn) -> _Fn:
273 """non-caching _generative() decorator.
274
275 This is basically the legacy decorator that copies the object and
276 runs a method on the new copy.
277
278 """
279
280 @util.decorator
281 def _generative(
282 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
283 ) -> _SelfGenerativeType:
284 """Mark a method as generative."""
285
286 self = self._generate()
287 x = fn(self, *args, **kw)
288 assert x is self, "generative methods must return self"
289 return self
290
291 decorated = _generative(fn)
292 decorated.non_generative = fn # type: ignore
293 return decorated
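
# Illustrative sketch (assumption, not part of the module): a generative
# method copies the object via _generate(), mutates the copy, and must return
# it, leaving the original untouched, e.g.
#
#     class MyStatement(Generative):
#         _limit = None
#
#         @_generative
#         def limit(self, value):
#             self._limit = value
#             return self
#
#     stmt2 = stmt.limit(5)  # stmt unchanged; stmt2 carries _limit=5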
294
295
296def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
297 msgs = kw.pop("msgs", {})
298
299 defaults = kw.pop("defaults", {})
300
301 getters = [
302 (name, operator.attrgetter(name), defaults.get(name, None))
303 for name in names
304 ]
305
306 @util.decorator
307 def check(fn, *args, **kw):
308 # make pylance happy by not including "self" in the argument
309 # list
310 self = args[0]
311 args = args[1:]
312 for name, getter, default_ in getters:
313 if getter(self) is not default_:
314 msg = msgs.get(
315 name,
316 "Method %s() has already been invoked on this %s construct"
317 % (fn.__name__, self.__class__),
318 )
319 raise exc.InvalidRequestError(msg)
320 return fn(self, *args, **kw)
321
322 return check
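
# Illustrative sketch (assumption, not part of the module): guard a generative
# method so it may only be called while an attribute still has its default
# value, e.g.
#
#     class MyInsert(Generative):
#         _select_names = None
#
#         @_generative
#         @_exclusive_against(
#             "_select_names",
#             msgs={"_select_names": "This construct already inserts from SELECT"},
#         )
#         def values(self, **kw):
#             ...
#             return self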
323
324
325def _clone(element, **kw):
326 return element._clone(**kw)
327
328
329def _expand_cloned(
330 elements: Iterable[_CLE],
331) -> Iterable[_CLE]:
332 """expand the given set of ClauseElements to be the set of all 'cloned'
333 predecessors.
334
335 """
336 # TODO: cython candidate
337 return itertools.chain(*[x._cloned_set for x in elements])
338
339
340def _de_clone(
341 elements: Iterable[_CLE],
342) -> Iterable[_CLE]:
343 for x in elements:
344 while x._is_clone_of is not None:
345 x = x._is_clone_of
346 yield x
347
348
349def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
350 """return the intersection of sets a and b, counting
351 any overlap between 'cloned' predecessors.
352
353 The returned set is in terms of the entities present within 'a'.
354
355 """
356 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
357 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
358
359
360def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
361 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
362 return {
363 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
364 }
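
# Illustrative sketch (assumption, not part of the module): if elem2 was
# produced from elem during a cloning traversal (elem2._is_clone_of is elem),
# the two count as "the same" element for these set operations, e.g.
#
#     elem2._cloned_set                        # -> {elem2, elem}
#     _cloned_intersection([elem2], [elem])    # -> {elem2}  (in terms of 'a')
#     _cloned_difference([elem2], [elem])      # -> set()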
365
366
367class _DialectArgView(MutableMapping[str, Any]):
368 """A dictionary view of dialect-level arguments in the form
369 <dialectname>_<argument_name>.
370
371 """
372
373 __slots__ = ("obj",)
374
375 def __init__(self, obj):
376 self.obj = obj
377
378 def _key(self, key):
379 try:
380 dialect, value_key = key.split("_", 1)
381 except ValueError as err:
382 raise KeyError(key) from err
383 else:
384 return dialect, value_key
385
386 def __getitem__(self, key):
387 dialect, value_key = self._key(key)
388
389 try:
390 opt = self.obj.dialect_options[dialect]
391 except exc.NoSuchModuleError as err:
392 raise KeyError(key) from err
393 else:
394 return opt[value_key]
395
396 def __setitem__(self, key, value):
397 try:
398 dialect, value_key = self._key(key)
399 except KeyError as err:
400 raise exc.ArgumentError(
401 "Keys must be of the form <dialectname>_<argname>"
402 ) from err
403 else:
404 self.obj.dialect_options[dialect][value_key] = value
405
406 def __delitem__(self, key):
407 dialect, value_key = self._key(key)
408 del self.obj.dialect_options[dialect][value_key]
409
410 def __len__(self):
411 return sum(
412 len(args._non_defaults)
413 for args in self.obj.dialect_options.values()
414 )
415
416 def __iter__(self):
417 return (
418 "%s_%s" % (dialect_name, value_name)
419 for dialect_name in self.obj.dialect_options
420 for value_name in self.obj.dialect_options[
421 dialect_name
422 ]._non_defaults
423 )
424
425
426class _DialectArgDict(MutableMapping[str, Any]):
427 """A dictionary view of dialect-level arguments for a specific
428 dialect.
429
430 Maintains a separate collection of user-specified arguments
431 and dialect-specified default arguments.
432
433 """
434
435 def __init__(self):
436 self._non_defaults = {}
437 self._defaults = {}
438
439 def __len__(self):
440 return len(set(self._non_defaults).union(self._defaults))
441
442 def __iter__(self):
443 return iter(set(self._non_defaults).union(self._defaults))
444
445 def __getitem__(self, key):
446 if key in self._non_defaults:
447 return self._non_defaults[key]
448 else:
449 return self._defaults[key]
450
451 def __setitem__(self, key, value):
452 self._non_defaults[key] = value
453
454 def __delitem__(self, key):
455 del self._non_defaults[key]
456
457
458@util.preload_module("sqlalchemy.dialects")
459def _kw_reg_for_dialect(dialect_name):
460 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
461 if dialect_cls.construct_arguments is None:
462 return None
463 return dict(dialect_cls.construct_arguments)
464
465
466class DialectKWArgs:
467 """Establish the ability for a class to have dialect-specific arguments
468 with defaults and constructor validation.
469
470 The :class:`.DialectKWArgs` interacts with the
471 :attr:`.DefaultDialect.construct_arguments` present on a dialect.
472
473 .. seealso::
474
475 :attr:`.DefaultDialect.construct_arguments`
476
477 """
478
479 __slots__ = ()
480
481 _dialect_kwargs_traverse_internals = [
482 ("dialect_options", InternalTraversal.dp_dialect_options)
483 ]
484
485 @classmethod
486 def argument_for(cls, dialect_name, argument_name, default):
487 """Add a new kind of dialect-specific keyword argument for this class.
488
489 E.g.::
490
491 Index.argument_for("mydialect", "length", None)
492
493 some_index = Index("a", "b", mydialect_length=5)
494
495 The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
497 :attr:`.DefaultDialect.construct_arguments` dictionary. This
498 dictionary provides a list of argument names accepted by various
499 schema-level constructs on behalf of a dialect.
500
501 New dialects should typically specify this dictionary all at once as a
502 data member of the dialect class. The use case for ad-hoc addition of
503 argument names is typically for end-user code that is also using
504 a custom compilation scheme which consumes the additional arguments.
505
506 :param dialect_name: name of a dialect. The dialect must be
507 locatable, else a :class:`.NoSuchModuleError` is raised. The
508 dialect must also include an existing
509 :attr:`.DefaultDialect.construct_arguments` collection, indicating
510 that it participates in the keyword-argument validation and default
511 system, else :class:`.ArgumentError` is raised. If the dialect does
512 not include this collection, then any keyword argument can be
513 specified on behalf of this dialect already. All dialects packaged
514 within SQLAlchemy include this collection, however for third party
515 dialects, support may vary.
516
517 :param argument_name: name of the parameter.
518
519 :param default: default value of the parameter.
520
521 """
522
523 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
524 if construct_arg_dictionary is None:
525 raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
528 )
529 if cls not in construct_arg_dictionary:
530 construct_arg_dictionary[cls] = {}
531 construct_arg_dictionary[cls][argument_name] = default
532
533 @property
534 def dialect_kwargs(self):
535 """A collection of keyword arguments specified as dialect-specific
536 options to this construct.
537
538 The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included,
540 unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
541 contains all options known by this dialect including defaults.
542
543 The collection is also writable; keys are accepted of the
544 form ``<dialect>_<kwarg>`` where the value will be assembled
545 into the list of options.
546
547 .. seealso::
548
549 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
550
551 """
552 return _DialectArgView(self)
553
554 @property
555 def kwargs(self):
556 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
557 return self.dialect_kwargs
558
559 _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
560
561 @classmethod
562 def _kw_reg_for_dialect_cls(cls, dialect_name):
563 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
564 d = _DialectArgDict()
565
566 if construct_arg_dictionary is None:
567 d._defaults.update({"*": None})
568 else:
569 for cls in reversed(cls.__mro__):
570 if cls in construct_arg_dictionary:
571 d._defaults.update(construct_arg_dictionary[cls])
572 return d
573
574 @util.memoized_property
575 def dialect_options(self):
576 """A collection of keyword arguments specified as dialect-specific
577 options to this construct.
578
579 This is a two-level nested registry, keyed to ``<dialect_name>``
580 and ``<argument_name>``. For example, the ``postgresql_where``
581 argument would be locatable as::
582
583 arg = my_object.dialect_options["postgresql"]["where"]
584
585 .. versionadded:: 0.9.2
586
587 .. seealso::
588
589 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
590
591 """
592
593 return util.PopulateDict(self._kw_reg_for_dialect_cls)
594
595 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
596 # validate remaining kwargs that they all specify DB prefixes
597
598 if not kwargs:
599 return
600
601 for k in kwargs:
602 m = re.match("^(.+?)_(.+)$", k)
603 if not m:
604 raise TypeError(
605 "Additional arguments should be "
606 "named <dialectname>_<argument>, got '%s'" % k
607 )
608 dialect_name, arg_name = m.group(1, 2)
609
610 try:
611 construct_arg_dictionary = self.dialect_options[dialect_name]
612 except exc.NoSuchModuleError:
613 util.warn(
614 "Can't validate argument %r; can't "
615 "locate any SQLAlchemy dialect named %r"
616 % (k, dialect_name)
617 )
618 self.dialect_options[dialect_name] = d = _DialectArgDict()
619 d._defaults.update({"*": None})
620 d._non_defaults[arg_name] = kwargs[k]
621 else:
622 if (
623 "*" not in construct_arg_dictionary
624 and arg_name not in construct_arg_dictionary
625 ):
626 raise exc.ArgumentError(
627 "Argument %r is not accepted by "
628 "dialect %r on behalf of %r"
629 % (k, dialect_name, self.__class__)
630 )
631 else:
632 construct_arg_dictionary[arg_name] = kwargs[k]
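
# Illustrative sketch (assumption, not part of the module): dialect-specific
# keyword arguments pass through _validate_dialect_kwargs() and become visible
# in both collection forms, e.g.
#
#     idx = Index("my_idx", table.c.data, mysql_length=10)
#     idx.dialect_options["mysql"]["length"]  # -> 10
#     idx.dialect_kwargs["mysql_length"]      # -> 10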
633
634
635class CompileState:
636 """Produces additional object state necessary for a statement to be
637 compiled.
638
639 the :class:`.CompileState` class is at the base of classes that assemble
640 state for a particular statement object that is then used by the
641 compiler. This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes; however, there is an emphasis
643 on converting raw user intent into more organized structures rather than
644 producing string output. The top-level :class:`.CompileState` for the
645 statement being executed is also accessible when the execution context
646 works with invoking the statement and collecting results.
647
648 The production of :class:`.CompileState` is specific to the compiler, such
649 as within the :meth:`.SQLCompiler.visit_insert`,
650 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
651 responsible for associating the :class:`.CompileState` with the
652 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
653 i.e. the outermost SQL statement that's actually being executed.
654 There can be other :class:`.CompileState` objects that are not the
655 toplevel, such as when a SELECT subquery or CTE-nested
656 INSERT/UPDATE/DELETE is generated.
657
658 .. versionadded:: 1.4
659
660 """
661
662 __slots__ = ("statement", "_ambiguous_table_name_map")
663
664 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
665
666 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
667
668 @classmethod
669 def create_for_statement(
670 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
671 ) -> CompileState:
672 # factory construction.
673
674 if statement._propagate_attrs:
675 plugin_name = statement._propagate_attrs.get(
676 "compile_state_plugin", "default"
677 )
678 klass = cls.plugins.get(
679 (plugin_name, statement._effective_plugin_target), None
680 )
681 if klass is None:
682 klass = cls.plugins[
683 ("default", statement._effective_plugin_target)
684 ]
685
686 else:
687 klass = cls.plugins[
688 ("default", statement._effective_plugin_target)
689 ]
690
691 if klass is cls:
692 return cls(statement, compiler, **kw)
693 else:
694 return klass.create_for_statement(statement, compiler, **kw)
695
696 def __init__(self, statement, compiler, **kw):
697 self.statement = statement
698
699 @classmethod
700 def get_plugin_class(
701 cls, statement: Executable
702 ) -> Optional[Type[CompileState]]:
703 plugin_name = statement._propagate_attrs.get(
704 "compile_state_plugin", None
705 )
706
707 if plugin_name:
708 key = (plugin_name, statement._effective_plugin_target)
709 if key in cls.plugins:
710 return cls.plugins[key]
711
712 # there's no case where we call upon get_plugin_class() and want
713 # to get None back, there should always be a default. return that
714 # if there was no plugin-specific class (e.g. Insert with "orm"
715 # plugin)
716 try:
717 return cls.plugins[("default", statement._effective_plugin_target)]
718 except KeyError:
719 return None
720
721 @classmethod
722 def _get_plugin_class_for_plugin(
723 cls, statement: Executable, plugin_name: str
724 ) -> Optional[Type[CompileState]]:
725 try:
726 return cls.plugins[
727 (plugin_name, statement._effective_plugin_target)
728 ]
729 except KeyError:
730 return None
731
732 @classmethod
733 def plugin_for(
734 cls, plugin_name: str, visit_name: str
735 ) -> Callable[[_Fn], _Fn]:
736 def decorate(cls_to_decorate):
737 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
738 return cls_to_decorate
739
740 return decorate
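
# Illustrative sketch (assumption, not part of the module): a plugin registers
# a CompileState subclass for a (plugin_name, visit_name) combination; then
# create_for_statement() dispatches to it based on the statement's
# _propagate_attrs, e.g.
#
#     @CompileState.plugin_for("my_plugin", "select")
#     class MySelectCompileState(CompileState):
#         def __init__(self, statement, compiler, **kw):
#             super().__init__(statement, compiler, **kw)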
741
742
743class Generative(HasMemoized):
744 """Provide a method-chaining pattern in conjunction with the
745 @_generative decorator."""
746
747 def _generate(self) -> Self:
748 skip = self._memoized_keys
749 cls = self.__class__
750 s = cls.__new__(cls)
751 if skip:
752 # ensure this iteration remains atomic
753 s.__dict__ = {
754 k: v for k, v in self.__dict__.copy().items() if k not in skip
755 }
756 else:
757 s.__dict__ = self.__dict__.copy()
758 return s
759
760
761class InPlaceGenerative(HasMemoized):
762 """Provide a method-chaining pattern in conjunction with the
763 @_generative decorator that mutates in place."""
764
765 __slots__ = ()
766
767 def _generate(self):
768 skip = self._memoized_keys
769 # note __dict__ needs to be in __slots__ if this is used
770 for k in skip:
771 self.__dict__.pop(k, None)
772 return self
773
774
775class HasCompileState(Generative):
776 """A class that has a :class:`.CompileState` associated with it."""
777
778 _compile_state_plugin: Optional[Type[CompileState]] = None
779
780 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
781
782 _compile_state_factory = CompileState.create_for_statement
783
784
785class _MetaOptions(type):
786 """metaclass for the Options class.
787
788 This metaclass is actually necessary despite the availability of the
789 ``__init_subclass__()`` hook as this type also provides custom class-level
790 behavior for the ``__add__()`` method.
791
792 """
793
794 _cache_attrs: Tuple[str, ...]
795
796 def __add__(self, other):
797 o1 = self()
798
799 if set(other).difference(self._cache_attrs):
800 raise TypeError(
801 "dictionary contains attributes not covered by "
802 "Options class %s: %r"
803 % (self, set(other).difference(self._cache_attrs))
804 )
805
806 o1.__dict__.update(other)
807 return o1
808
809 if TYPE_CHECKING:
810
811 def __getattr__(self, key: str) -> Any: ...
812
813 def __setattr__(self, key: str, value: Any) -> None: ...
814
815 def __delattr__(self, key: str) -> None: ...
816
817
818class Options(metaclass=_MetaOptions):
819 """A cacheable option dictionary with defaults."""
820
821 __slots__ = ()
822
823 _cache_attrs: Tuple[str, ...]
824
825 def __init_subclass__(cls) -> None:
826 dict_ = cls.__dict__
827 cls._cache_attrs = tuple(
828 sorted(
829 d
830 for d in dict_
831 if not d.startswith("__")
832 and d not in ("_cache_key_traversal",)
833 )
834 )
835 super().__init_subclass__()
836
837 def __init__(self, **kw):
838 self.__dict__.update(kw)
839
840 def __add__(self, other):
841 o1 = self.__class__.__new__(self.__class__)
842 o1.__dict__.update(self.__dict__)
843
844 if set(other).difference(self._cache_attrs):
845 raise TypeError(
846 "dictionary contains attributes not covered by "
847 "Options class %s: %r"
848 % (self, set(other).difference(self._cache_attrs))
849 )
850
851 o1.__dict__.update(other)
852 return o1
853
854 def __eq__(self, other):
855 # TODO: very inefficient. This is used only in test suites
856 # right now.
857 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
858 if getattr(self, a) != getattr(other, b):
859 return False
860 return True
861
862 def __repr__(self):
863 # TODO: fairly inefficient, used only in debugging right now.
864
865 return "%s(%s)" % (
866 self.__class__.__name__,
867 ", ".join(
868 "%s=%r" % (k, self.__dict__[k])
869 for k in self._cache_attrs
870 if k in self.__dict__
871 ),
872 )
873
874 @classmethod
875 def isinstance(cls, klass: Type[Any]) -> bool:
876 return issubclass(cls, klass)
877
878 @hybridmethod
879 def add_to_element(self, name, value):
880 return self + {name: getattr(self, name) + value}
881
882 @hybridmethod
883 def _state_dict_inst(self) -> Mapping[str, Any]:
884 return self.__dict__
885
886 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
887
888 @_state_dict_inst.classlevel
889 def _state_dict(cls) -> Mapping[str, Any]:
890 return cls._state_dict_const
891
892 @classmethod
893 def safe_merge(cls, other):
894 d = other._state_dict()
895
896 # only support a merge with another object of our class
897 # and which does not have attrs that we don't. otherwise
898 # we risk having state that might not be part of our cache
899 # key strategy
900
901 if (
902 cls is not other.__class__
903 and other._cache_attrs
904 and set(other._cache_attrs).difference(cls._cache_attrs)
905 ):
906 raise TypeError(
907 "other element %r is not empty, is not of type %s, "
908 "and contains attributes not covered here %r"
909 % (
910 other,
911 cls,
912 set(other._cache_attrs).difference(cls._cache_attrs),
913 )
914 )
915 return cls + d
916
917 @classmethod
918 def from_execution_options(
919 cls, key, attrs, exec_options, statement_exec_options
920 ):
921 """process Options argument in terms of execution options.
922
923
924 e.g.::
925
926 (
927 load_options,
928 execution_options,
929 ) = QueryContext.default_load_options.from_execution_options(
930 "_sa_orm_load_options",
931 {"populate_existing", "autoflush", "yield_per"},
932 execution_options,
933 statement._execution_options,
934 )
935
936 get back the Options and refresh "_sa_orm_load_options" in the
937 exec options dict w/ the Options as well
938
939 """
940
941 # common case is that no options we are looking for are
        # in either dictionary, so check for that first
943 check_argnames = attrs.intersection(
944 set(exec_options).union(statement_exec_options)
945 )
946
947 existing_options = exec_options.get(key, cls)
948
949 if check_argnames:
950 result = {}
951 for argname in check_argnames:
952 local = "_" + argname
953 if argname in exec_options:
954 result[local] = exec_options[argname]
955 elif argname in statement_exec_options:
956 result[local] = statement_exec_options[argname]
957
958 new_options = existing_options + result
959 exec_options = util.immutabledict().merge_with(
960 exec_options, {key: new_options}
961 )
962 return new_options, exec_options
963
964 else:
965 return existing_options, exec_options
966
967 if TYPE_CHECKING:
968
969 def __getattr__(self, key: str) -> Any: ...
970
971 def __setattr__(self, key: str, value: Any) -> None: ...
972
973 def __delattr__(self, key: str) -> None: ...
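
# Illustrative sketch (assumption, not part of the module): Options subclasses
# declare their defaults as class attributes; adding a plain dictionary
# produces a new instance with those keys overridden, e.g.
#
#     class MyLoadOptions(Options):
#         _autoflush = True
#         _yield_per = None
#
#     opts = MyLoadOptions + {"_yield_per": 100}
#     opts._yield_per   # -> 100
#     opts._autoflush   # -> True (class-level default)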
974
975
976class CacheableOptions(Options, HasCacheKey):
977 __slots__ = ()
978
979 @hybridmethod
980 def _gen_cache_key_inst(self, anon_map, bindparams):
981 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
982
983 @_gen_cache_key_inst.classlevel
984 def _gen_cache_key(cls, anon_map, bindparams):
985 return (cls, ())
986
987 @hybridmethod
988 def _generate_cache_key(self):
989 return HasCacheKey._generate_cache_key_for_object(self)
990
991
992class ExecutableOption(HasCopyInternals):
993 __slots__ = ()
994
995 _annotations = util.EMPTY_DICT
996
997 __visit_name__ = "executable_option"
998
999 _is_has_cache_key = False
1000
1001 _is_core = True
1002
1003 def _clone(self, **kw):
1004 """Create a shallow copy of this ExecutableOption."""
1005 c = self.__class__.__new__(self.__class__)
1006 c.__dict__ = dict(self.__dict__) # type: ignore
1007 return c
1008
1009
1010_L = TypeVar("_L", bound=str)
1011
1012
1013class HasSyntaxExtensions(Generic[_L]):
1014
1015 _position_map: Mapping[_L, str]
1016
1017 @_generative
1018 def ext(self, extension: SyntaxExtension) -> Self:
        """Apply a SQL syntax extension to this statement.

        SQL syntax extensions are :class:`.ClauseElement` objects that define
        some vendor-specific syntactical construct that takes place in
        specific parts of a SQL statement.  Examples include vendor
        extensions like PostgreSQL / SQLite's "ON CONFLICT", PostgreSQL's
        "DISTINCT ON", and MySQL's "LIMIT" as applied to UPDATE and DELETE
        statements.
1027
1028 .. seealso::
1029
1030 :ref:`examples_syntax_extensions`
1031
1032 :func:`_mysql.limit` - DML LIMIT for MySQL
1033
1034 :func:`_postgresql.distinct_on` - DISTINCT ON for PostgreSQL
1035
1036 .. versionadded:: 2.1
1037
1038 """
1039 extension = coercions.expect(
1040 roles.SyntaxExtensionRole, extension, apply_propagate_attrs=self
1041 )
1042 self._apply_syntax_extension_to_self(extension)
1043 return self
1044
1045 @util.preload_module("sqlalchemy.sql.elements")
1046 def apply_syntax_extension_point(
1047 self,
1048 apply_fn: Callable[[Sequence[ClauseElement]], Sequence[ClauseElement]],
1049 position: _L,
1050 ) -> None:
1051 """Apply a :class:`.SyntaxExtension` to a known extension point.
1052
1053 Should be used only internally by :class:`.SyntaxExtension`.
1054
1055 E.g.::
1056
1057 class Qualify(SyntaxExtension, ClauseElement):
1058
1059 # ...
1060
1061 def apply_to_select(self, select_stmt: Select) -> None:
1062 # append self to existing
                    select_stmt.apply_syntax_extension_point(
1064 lambda existing: [*existing, self], "post_criteria"
1065 )
1066
1067
1068 class ReplaceExt(SyntaxExtension, ClauseElement):
1069
1070 # ...
1071
1072 def apply_to_select(self, select_stmt: Select) -> None:
1073 # replace any existing elements regardless of type
                    select_stmt.apply_syntax_extension_point(
1075 lambda existing: [self], "post_criteria"
1076 )
1077
1078
1079 class ReplaceOfTypeExt(SyntaxExtension, ClauseElement):
1080
1081 # ...
1082
1083 def apply_to_select(self, select_stmt: Select) -> None:
1084 # replace any existing elements of the same type
                    select_stmt.apply_syntax_extension_point(
1086 self.append_replacing_same_type, "post_criteria"
1087 )
1088
        :param apply_fn: callable function that will receive a sequence of
          :class:`.ClauseElement` that is already populating the extension
          point (the sequence is empty if there isn't one), and should return
          a new sequence of :class:`.ClauseElement` that will newly populate
          that point.  The function typically can choose to concatenate the
          existing values with the new one, or to replace the values that are
          there with a new one by returning a list of a single element, or to
          perform more complex operations like removing only elements of the
          same type from the input list or merging already existing elements
          of the same type.  Some examples are shown above.
1099 :param position: string name of the position to apply to. This
1100 varies per statement type. IDEs should show the possible values
1101 for each statement type as it's typed with a ``typing.Literal`` per
1102 statement.
1103
1104 .. seealso::
1105
1106 :ref:`examples_syntax_extensions`
1107
1108
1109 """ # noqa: E501
1110
1111 try:
1112 attrname = self._position_map[position]
1113 except KeyError as ke:
1114 raise ValueError(
1115 f"Unknown position {position!r} for {self.__class__} "
1116 f"construct; known positions: "
1117 f"{', '.join(repr(k) for k in self._position_map)}"
1118 ) from ke
1119 else:
1120 ElementList = util.preloaded.sql_elements.ElementList
1121 existing: Optional[ClauseElement] = getattr(self, attrname, None)
1122 if existing is None:
1123 input_seq: Tuple[ClauseElement, ...] = ()
1124 elif isinstance(existing, ElementList):
1125 input_seq = existing.clauses
1126 else:
1127 input_seq = (existing,)
1128
1129 new_seq = apply_fn(input_seq)
1130 assert new_seq, "cannot return empty sequence"
1131 new = new_seq[0] if len(new_seq) == 1 else ElementList(new_seq)
1132 setattr(self, attrname, new)
1133
1134 def _apply_syntax_extension_to_self(
1135 self, extension: SyntaxExtension
1136 ) -> None:
1137 raise NotImplementedError()
1138
1139 def _get_syntax_extensions_as_dict(self) -> Mapping[_L, SyntaxExtension]:
1140 res: Dict[_L, SyntaxExtension] = {}
1141 for name, attr in self._position_map.items():
1142 value = getattr(self, attr)
1143 if value is not None:
1144 res[name] = value
1145 return res
1146
1147 def _set_syntax_extensions(self, **extensions: SyntaxExtension) -> None:
1148 for name, value in extensions.items():
1149 setattr(self, self._position_map[name], value) # type: ignore[index] # noqa: E501
1150
1151
1152class SyntaxExtension(roles.SyntaxExtensionRole):
1153 """Defines a unit that when also extending from :class:`.ClauseElement`
1154 can be applied to SQLAlchemy statements :class:`.Select`,
1155 :class:`_sql.Insert`, :class:`.Update` and :class:`.Delete` making use of
1156 pre-established SQL insertion points within these constructs.
1157
1158 .. versionadded:: 2.1
1159
1160 .. seealso::
1161
1162 :ref:`examples_syntax_extensions`
1163
1164 """
1165
1166 def append_replacing_same_type(
1167 self, existing: Sequence[ClauseElement]
1168 ) -> Sequence[ClauseElement]:
1169 """Utility function that can be used as
        :paramref:`_sql.HasSyntaxExtensions.apply_syntax_extension_point.apply_fn`
        to remove any other element of the same type in ``existing`` and
        append ``self`` to the list.
1173
1174 This is equivalent to::
1175
            stmt.apply_syntax_extension_point(
1177 lambda existing: [
1178 *(e for e in existing if not isinstance(e, ReplaceOfTypeExt)),
1179 self,
1180 ],
1181 "post_criteria",
1182 )
1183
1184 .. seealso::
1185
1186 :ref:`examples_syntax_extensions`
1187
1188 :meth:`_sql.HasSyntaxExtensions.apply_syntax_extension_point`
1189
1190 """ # noqa: E501
1191 cls = type(self)
1192 return [*(e for e in existing if not isinstance(e, cls)), self] # type: ignore[list-item] # noqa: E501
1193
1194 def apply_to_select(self, select_stmt: Select[Unpack[_Ts]]) -> None:
1195 """Apply this :class:`.SyntaxExtension` to a :class:`.Select`"""
1196 raise NotImplementedError(
1197 f"Extension {type(self).__name__} cannot be applied to select"
1198 )
1199
1200 def apply_to_update(self, update_stmt: Update) -> None:
1201 """Apply this :class:`.SyntaxExtension` to an :class:`.Update`"""
1202 raise NotImplementedError(
1203 f"Extension {type(self).__name__} cannot be applied to update"
1204 )
1205
1206 def apply_to_delete(self, delete_stmt: Delete) -> None:
1207 """Apply this :class:`.SyntaxExtension` to a :class:`.Delete`"""
1208 raise NotImplementedError(
1209 f"Extension {type(self).__name__} cannot be applied to delete"
1210 )
1211
1212 def apply_to_insert(self, insert_stmt: Insert) -> None:
1213 """Apply this :class:`.SyntaxExtension` to an
1214 :class:`_sql.Insert`"""
1215 raise NotImplementedError(
1216 f"Extension {type(self).__name__} cannot be applied to insert"
1217 )
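
# Illustrative sketch (assumption, not part of the module): a dialect helper
# such as the MySQL ``limit()`` construct referenced in the docstrings above
# builds a SyntaxExtension, and the statement applies it via .ext(), which in
# turn invokes the appropriate apply_to_*() hook, e.g.
#
#     stmt = table.delete().where(table.c.x > 5).ext(limit(10))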
1218
1219
1220class Executable(roles.StatementRole):
1221 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1222
1223 :class:`.Executable` is a superclass for all "statement" types
1224 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1225 :func:`insert`, :func:`text`.
1226
1227 """
1228
1229 supports_execution: bool = True
1230 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1231 _is_default_generator = False
1232 _with_options: Tuple[ExecutableOption, ...] = ()
1233 _compile_state_funcs: Tuple[
1234 Tuple[Callable[[CompileState], None], Any], ...
1235 ] = ()
1236 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1237
1238 _executable_traverse_internals = [
1239 ("_with_options", InternalTraversal.dp_executable_options),
1240 (
1241 "_compile_state_funcs",
1242 ExtendedInternalTraversal.dp_compile_state_funcs,
1243 ),
1244 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1245 ]
1246
1247 is_select = False
1248 is_from_statement = False
1249 is_update = False
1250 is_insert = False
1251 is_text = False
1252 is_delete = False
1253 is_dml = False
1254
1255 if TYPE_CHECKING:
1256 __visit_name__: str
1257
1258 def _compile_w_cache(
1259 self,
1260 dialect: Dialect,
1261 *,
1262 compiled_cache: Optional[CompiledCacheType],
1263 column_keys: List[str],
1264 for_executemany: bool = False,
1265 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1266 **kw: Any,
1267 ) -> Tuple[
1268 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1269 ]: ...
1270
1271 def _execute_on_connection(
1272 self,
1273 connection: Connection,
1274 distilled_params: _CoreMultiExecuteParams,
1275 execution_options: CoreExecuteOptionsParameter,
1276 ) -> CursorResult[Any]: ...
1277
1278 def _execute_on_scalar(
1279 self,
1280 connection: Connection,
1281 distilled_params: _CoreMultiExecuteParams,
1282 execution_options: CoreExecuteOptionsParameter,
1283 ) -> Any: ...
1284
1285 @util.ro_non_memoized_property
1286 def _all_selected_columns(self):
1287 raise NotImplementedError()
1288
1289 @property
1290 def _effective_plugin_target(self) -> str:
1291 return self.__visit_name__
1292
1293 @_generative
1294 def options(self, *options: ExecutableOption) -> Self:
1295 """Apply options to this statement.
1296
1297 In the general sense, options are any kind of Python object
1298 that can be interpreted by systems that consume the statement outside
1299 of the regular SQL compiler chain. Specifically, these options are
1300 the ORM level options that apply "eager load" and other loading
1301 behaviors to an ORM query.
1302
1303 For background on specific kinds of options for specific kinds of
1304 statements, refer to the documentation for those option objects.
1305
1306 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1307 Core statement objects towards the goal of allowing unified
1308 Core / ORM querying capabilities.
1309
1310 .. seealso::
1311
1312 :ref:`loading_columns` - refers to options specific to the usage
1313 of ORM queries
1314
1315 :ref:`relationship_loader_options` - refers to options specific
1316 to the usage of ORM queries
1317
1318 """
1319 self._with_options += tuple(
1320 coercions.expect(roles.ExecutableOptionRole, opt)
1321 for opt in options
1322 )
1323 return self
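
    # Illustrative sketch (assumption, not part of the module): ORM loader
    # options are the typical consumers of .options(), e.g.
    #
    #     stmt = select(User).options(selectinload(User.addresses))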
1324
1325 @_generative
1326 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1327 """Assign the compile options to a new value.
1328
1329 :param compile_options: appropriate CacheableOptions structure
1330
1331 """
1332
1333 self._compile_options = compile_options
1334 return self
1335
1336 @_generative
1337 def _update_compile_options(self, options: CacheableOptions) -> Self:
1338 """update the _compile_options with new keys."""
1339
1340 assert self._compile_options is not None
1341 self._compile_options += options
1342 return self
1343
1344 @_generative
1345 def _add_compile_state_func(
1346 self,
1347 callable_: Callable[[CompileState], None],
1348 cache_args: Any,
1349 ) -> Self:
1350 """Add a compile state function to this statement.
1351
1352 When using the ORM only, these are callable functions that will
1353 be given the CompileState object upon compilation.
1354
1355 A second argument cache_args is required, which will be combined with
1356 the ``__code__`` identity of the function itself in order to produce a
1357 cache key.
1358
1359 """
1360 self._compile_state_funcs += ((callable_, cache_args),)
1361 return self
1362
1363 @overload
1364 def execution_options(
1365 self,
1366 *,
1367 compiled_cache: Optional[CompiledCacheType] = ...,
1368 logging_token: str = ...,
1369 isolation_level: IsolationLevel = ...,
1370 no_parameters: bool = False,
1371 stream_results: bool = False,
1372 max_row_buffer: int = ...,
1373 yield_per: int = ...,
1374 driver_column_names: bool = ...,
1375 insertmanyvalues_page_size: int = ...,
1376 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1377 populate_existing: bool = False,
1378 autoflush: bool = False,
1379 synchronize_session: SynchronizeSessionArgument = ...,
1380 dml_strategy: DMLStrategyArgument = ...,
1381 render_nulls: bool = ...,
1382 is_delete_using: bool = ...,
1383 is_update_from: bool = ...,
1384 preserve_rowcount: bool = False,
1385 **opt: Any,
1386 ) -> Self: ...
1387
1388 @overload
1389 def execution_options(self, **opt: Any) -> Self: ...
1390
1391 @_generative
1392 def execution_options(self, **kw: Any) -> Self:
1393 """Set non-SQL options for the statement which take effect during
1394 execution.
1395
1396 Execution options can be set at many scopes, including per-statement,
1397 per-connection, or per execution, using methods such as
1398 :meth:`_engine.Connection.execution_options` and parameters which
1399 accept a dictionary of options such as
1400 :paramref:`_engine.Connection.execute.execution_options` and
1401 :paramref:`_orm.Session.execute.execution_options`.
1402
1403 The primary characteristic of an execution option, as opposed to
1404 other kinds of options such as ORM loader options, is that
1405 **execution options never affect the compiled SQL of a query, only
1406 things that affect how the SQL statement itself is invoked or how
1407 results are fetched**. That is, execution options are not part of
1408 what's accommodated by SQL compilation nor are they considered part of
1409 the cached state of a statement.
1410
1411 The :meth:`_sql.Executable.execution_options` method is
1412 :term:`generative`, as
1413 is the case for the method as applied to the :class:`_engine.Engine`
1414 and :class:`_orm.Query` objects, which means when the method is called,
1415 a copy of the object is returned, which applies the given parameters to
1416 that new copy, but leaves the original unchanged::
1417
1418 statement = select(table.c.x, table.c.y)
1419 new_statement = statement.execution_options(my_option=True)
1420
1421 An exception to this behavior is the :class:`_engine.Connection`
1422 object, where the :meth:`_engine.Connection.execution_options` method
1423 is explicitly **not** generative.
1424
1425 The kinds of options that may be passed to
1426 :meth:`_sql.Executable.execution_options` and other related methods and
1427 parameter dictionaries include parameters that are explicitly consumed
1428 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1429 defined by SQLAlchemy, which means the methods and/or parameter
1430 dictionaries may be used for user-defined parameters that interact with
1431 custom code, which may access the parameters using methods such as
1432 :meth:`_sql.Executable.get_execution_options` and
1433 :meth:`_engine.Connection.get_execution_options`, or within selected
1434 event hooks using a dedicated ``execution_options`` event parameter
1435 such as
1436 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1437 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1438
1439 from sqlalchemy import event
1440
1441
1442 @event.listens_for(some_engine, "before_execute")
1443 def _process_opt(conn, statement, multiparams, params, execution_options):
1444 "run a SQL function before invoking a statement"
1445
1446 if execution_options.get("do_special_thing", False):
1447 conn.exec_driver_sql("run_special_function()")
1448
1449 Within the scope of options that are explicitly recognized by
1450 SQLAlchemy, most apply to specific classes of objects and not others.
1451 The most common execution options include:
1452
1453 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1454 sets the isolation level for a connection or a class of connections
1455 via an :class:`_engine.Engine`. This option is accepted only
1456 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1457
1458 * :paramref:`_engine.Connection.execution_options.stream_results` -
1459 indicates results should be fetched using a server side cursor;
1460 this option is accepted by :class:`_engine.Connection`, by the
1461 :paramref:`_engine.Connection.execute.execution_options` parameter
1462 on :meth:`_engine.Connection.execute`, and additionally by
1463 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1464 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1465
1466 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1467 indicates a dictionary that will serve as the
1468 :ref:`SQL compilation cache <sql_caching>`
1469 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1470 well as for ORM methods like :meth:`_orm.Session.execute`.
1471 Can be passed as ``None`` to disable caching for statements.
1472 This option is not accepted by
1473 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1474 carry along a compilation cache within a statement object.
1475
1476 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1477 - a mapping of schema names used by the
1478 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1479 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1480 :class:`_sql.Executable`, as well as by ORM constructs
1481 like :meth:`_orm.Session.execute`.
1482
1483 .. seealso::
1484
1485 :meth:`_engine.Connection.execution_options`
1486
1487 :paramref:`_engine.Connection.execute.execution_options`
1488
1489 :paramref:`_orm.Session.execute.execution_options`
1490
1491 :ref:`orm_queryguide_execution_options` - documentation on all
1492 ORM-specific execution options
1493
1494 """ # noqa: E501
1495 if "isolation_level" in kw:
1496 raise exc.ArgumentError(
1497 "'isolation_level' execution option may only be specified "
1498 "on Connection.execution_options(), or "
1499 "per-engine using the isolation_level "
1500 "argument to create_engine()."
1501 )
1502 if "compiled_cache" in kw:
1503 raise exc.ArgumentError(
1504 "'compiled_cache' execution option may only be specified "
1505 "on Connection.execution_options(), not per statement."
1506 )
1507 self._execution_options = self._execution_options.union(kw)
1508 return self
1509
1510 def get_execution_options(self) -> _ExecuteOptions:
1511 """Get the non-SQL options which will take effect during execution.
1512
1513 .. seealso::
1514
1515 :meth:`.Executable.execution_options`
1516 """
1517 return self._execution_options
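
# Illustrative sketch (assumption, not part of the module): execution options
# set on a statement are stored generatively and merged with connection-level
# options at execution time, e.g.
#
#     stmt = select(table).execution_options(stream_results=True, yield_per=500)
#     stmt.get_execution_options()
#     # -> immutabledict({'stream_results': True, 'yield_per': 500})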
1518
1519
1520class SchemaEventTarget(event.EventTarget):
1521 """Base class for elements that are the targets of :class:`.DDLEvents`
1522 events.
1523
1524 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1525
1526 """
1527
1528 dispatch: dispatcher[SchemaEventTarget]
1529
1530 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1531 """Associate with this SchemaEvent's parent object."""
1532
1533 def _set_parent_with_dispatch(
1534 self, parent: SchemaEventTarget, **kw: Any
1535 ) -> None:
1536 self.dispatch.before_parent_attach(self, parent)
1537 self._set_parent(parent, **kw)
1538 self.dispatch.after_parent_attach(self, parent)
1539
1540
1541class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1542 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1543
1544 .. versionadded:: 2.0.41
1545
1546 """
1547
1548
1549class SchemaVisitor(ClauseVisitor):
1550 """Define the visiting for ``SchemaItem`` and more
1551 generally ``SchemaVisitable`` objects.
1552
1553 """
1554
1555 __traverse_options__ = {"schema_visitor": True}
1556
1557
1558class _SentinelDefaultCharacterization(Enum):
1559 NONE = "none"
1560 UNKNOWN = "unknown"
1561 CLIENTSIDE = "clientside"
1562 SENTINEL_DEFAULT = "sentinel_default"
1563 SERVERSIDE = "serverside"
1564 IDENTITY = "identity"
1565 SEQUENCE = "sequence"
1566
1567
1568class _SentinelColumnCharacterization(NamedTuple):
1569 columns: Optional[Sequence[Column[Any]]] = None
1570 is_explicit: bool = False
1571 is_autoinc: bool = False
1572 default_characterization: _SentinelDefaultCharacterization = (
1573 _SentinelDefaultCharacterization.NONE
1574 )
1575
1576
1577_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1578
1579_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1580_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1581
1582
1583class _ColumnMetrics(Generic[_COL_co]):
1584 __slots__ = ("column",)
1585
1586 column: _COL_co
1587
1588 def __init__(
1589 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1590 ):
1591 self.column = col
1592
1593 # proxy_index being non-empty means it was initialized.
1594 # so we need to update it
1595 pi = collection._proxy_index
1596 if pi:
1597 for eps_col in col._expanded_proxy_set:
1598 pi[eps_col].add(self)
1599
1600 def get_expanded_proxy_set(self):
1601 return self.column._expanded_proxy_set
1602
1603 def dispose(self, collection):
1604 pi = collection._proxy_index
1605 if not pi:
1606 return
1607 for col in self.column._expanded_proxy_set:
1608 colset = pi.get(col, None)
1609 if colset:
1610 colset.discard(self)
1611 if colset is not None and not colset:
1612 del pi[col]
1613
1614 def embedded(
1615 self,
1616 target_set: Union[
1617 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1618 ],
1619 ) -> bool:
1620 expanded_proxy_set = self.column._expanded_proxy_set
1621 for t in target_set.difference(expanded_proxy_set):
1622 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1623 return False
1624 return True
1625
1626
1627class ColumnCollection(Generic[_COLKEY, _COL_co]):
1628 """Collection of :class:`_expression.ColumnElement` instances,
1629 typically for
1630 :class:`_sql.FromClause` objects.
1631
1632 The :class:`_sql.ColumnCollection` object is most commonly available
1633 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1634 on the :class:`_schema.Table` object, introduced at
1635 :ref:`metadata_tables_and_columns`.
1636
1637 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1638 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1639 :class:`_schema.Column` objects, which are then accessible both via mapping
1640 style access as well as attribute access style.
1641
1642 To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as
    below, where a column named ``employee_name`` is accessed::
1645
1646 >>> employee_table.c.employee_name
1647
1648 To access columns that have names with special characters or spaces,
    index-style access is used, such as below, where a column named
    ``employee ' payment`` is accessed::
1651
1652 >>> employee_table.c["employee ' payment"]
1653
1654 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1655 interface, common dictionary method names like
1656 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1657 and :meth:`_sql.ColumnCollection.items` are available, which means that
1658 database columns that are keyed under these names also need to use indexed
1659 access::
1660
1661 >>> employee_table.c["values"]
1662
1663
1664 The name for which a :class:`_schema.Column` would be present is normally
1665 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1666 such as a :class:`_sql.Select` object that uses a label style set
1667 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1668 key may instead be represented under a particular label name such
1669 as ``tablename_columnname``::
1670
1671 >>> from sqlalchemy import select, column, table
1672 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1673 >>> t = table("t", column("c"))
1674 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1675 >>> subq = stmt.subquery()
1676 >>> subq.c.t_c
1677 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1678
1679 :class:`.ColumnCollection` also indexes the columns in order and allows
1680 them to be accessible by their integer position::
1681
1682 >>> cc[0]
1683 Column('x', Integer(), table=None)
1684 >>> cc[1]
1685 Column('y', Integer(), table=None)
1686
1687 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1688 allows integer-based
1689 index access to the collection.
1690
1691 Iterating the collection yields the column expressions in order::
1692
1693 >>> list(cc)
1694 [Column('x', Integer(), table=None),
1695 Column('y', Integer(), table=None)]
1696
1697 The base :class:`_expression.ColumnCollection` object can store
1698 duplicates, which can
1699 mean either two columns with the same key, in which case the column
1700 returned by key access is **arbitrary**::
1701
1702 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1703 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1704 >>> list(cc)
1705 [Column('x', Integer(), table=None),
1706 Column('x', Integer(), table=None)]
1707 >>> cc["x"] is x1
1708 False
1709 >>> cc["x"] is x2
1710 True
1711
1712 Or it can also mean the same column multiple times. These cases are
1713 supported as :class:`_expression.ColumnCollection`
1714 is used to represent the columns in
1715 a SELECT statement which may include duplicates.
1716
1717 A special subclass :class:`.DedupeColumnCollection` exists which instead
1718 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1719 collection is used for schema level objects like :class:`_schema.Table`
1720 and
1721 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1722 :class:`.DedupeColumnCollection` class also has additional mutation methods
1723 as the schema constructs have more use cases that require removal and
1724 replacement of columns.
1725
1726 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1727 now stores duplicate
1728 column keys as well as the same column in multiple positions. The
1729 :class:`.DedupeColumnCollection` class is added to maintain the
1730 former behavior in those cases where deduplication as well as
1731 additional replace/remove operations are needed.
1732
1733
1734 """
1735
1736 __slots__ = "_collection", "_index", "_colset", "_proxy_index"
1737
1738 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1739 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1740 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1741 _colset: Set[_COL_co]
1742
1743 def __init__(
1744 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1745 ):
1746 object.__setattr__(self, "_colset", set())
1747 object.__setattr__(self, "_index", {})
1748 object.__setattr__(
1749 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1750 )
1751 object.__setattr__(self, "_collection", [])
1752 if columns:
1753 self._initial_populate(columns)
1754
1755 @util.preload_module("sqlalchemy.sql.elements")
1756 def __clause_element__(self) -> ClauseList:
1757 elements = util.preloaded.sql_elements
1758
1759 return elements.ClauseList(
1760 _literal_as_text_role=roles.ColumnsClauseRole,
1761 group=False,
1762 *self._all_columns,
1763 )
1764
1765 def _initial_populate(
1766 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1767 ) -> None:
1768 self._populate_separate_keys(iter_)
1769
1770 @property
1771 def _all_columns(self) -> List[_COL_co]:
1772 return [col for (_, col, _) in self._collection]
1773
1774 def keys(self) -> List[_COLKEY]:
1775 """Return a sequence of string key names for all columns in this
1776 collection."""
1777 return [k for (k, _, _) in self._collection]
1778
1779 def values(self) -> List[_COL_co]:
1780 """Return a sequence of :class:`_sql.ColumnClause` or
1781 :class:`_schema.Column` objects for all columns in this
1782 collection."""
1783 return [col for (_, col, _) in self._collection]
1784
1785 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1786 """Return a sequence of (key, column) tuples for all columns in this
1787 collection each consisting of a string key name and a
1788 :class:`_sql.ColumnClause` or
1789 :class:`_schema.Column` object.
1790 """
1791
1792 return [(k, col) for (k, col, _) in self._collection]
1793
1794 def __bool__(self) -> bool:
1795 return bool(self._collection)
1796
1797 def __len__(self) -> int:
1798 return len(self._collection)
1799
1800 def __iter__(self) -> Iterator[_COL_co]:
        # materialize into a list first so that iteration remains stable
        # if the collection changes
1802 return iter([col for _, col, _ in self._collection])
1803
1804 @overload
1805 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1806
1807 @overload
1808 def __getitem__(
1809 self, key: Tuple[Union[str, int], ...]
1810 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1811
1812 @overload
1813 def __getitem__(
1814 self, key: slice
1815 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1816
1817 def __getitem__(
1818 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1819 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
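        # scalar string / integer keys return the column itself; tuples of
        # keys and slices return a new read-only sub-collection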
1820 try:
1821 if isinstance(key, (tuple, slice)):
1822 if isinstance(key, slice):
1823 cols = (
1824 (sub_key, col)
1825 for (sub_key, col, _) in self._collection[key]
1826 )
1827 else:
1828 cols = (self._index[sub_key] for sub_key in key)
1829
1830 return ColumnCollection(cols).as_readonly()
1831 else:
1832 return self._index[key][1]
1833 except KeyError as err:
1834 if isinstance(err.args[0], int):
1835 raise IndexError(err.args[0]) from err
1836 else:
1837 raise
1838
1839 def __getattr__(self, key: str) -> _COL_co:
1840 try:
1841 return self._index[key][1]
1842 except KeyError as err:
1843 raise AttributeError(key) from err
1844
1845 def __contains__(self, key: str) -> bool:
1846 if key not in self._index:
1847 if not isinstance(key, str):
1848 raise exc.ArgumentError(
1849 "__contains__ requires a string argument"
1850 )
1851 return False
1852 else:
1853 return True
1854
1855 def compare(self, other: ColumnCollection[Any, Any]) -> bool:
1856 """Compare this :class:`_expression.ColumnCollection` to another
1857 based on the names of the keys"""
1858
1859 for l, r in zip_longest(self, other):
1860 if l is not r:
1861 return False
1862 else:
1863 return True
1864
1865 def __eq__(self, other: Any) -> bool:
1866 return self.compare(other)
1867
1868 @overload
1869 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1870
1871 @overload
1872 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1873
1874 def get(
1875 self, key: str, default: Optional[_COL] = None
1876 ) -> Optional[Union[_COL_co, _COL]]:
1877 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1878 based on a string key name from this
1879 :class:`_expression.ColumnCollection`."""
1880
1881 if key in self._index:
1882 return self._index[key][1]
1883 else:
1884 return default
1885
1886 def __str__(self) -> str:
1887 return "%s(%s)" % (
1888 self.__class__.__name__,
1889 ", ".join(str(c) for c in self),
1890 )
1891
1892 def __setitem__(self, key: str, value: Any) -> NoReturn:
1893 raise NotImplementedError()
1894
1895 def __delitem__(self, key: str) -> NoReturn:
1896 raise NotImplementedError()
1897
1898 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1899 raise NotImplementedError()
1900
1901 def clear(self) -> NoReturn:
1902 """Dictionary clear() is not implemented for
1903 :class:`_sql.ColumnCollection`."""
1904 raise NotImplementedError()
1905
    def remove(self, column: Any) -> None:
        """remove() is not implemented for the base
        :class:`_sql.ColumnCollection`; see :class:`.DedupeColumnCollection`."""
        raise NotImplementedError()
1908
1909 def update(self, iter_: Any) -> NoReturn:
1910 """Dictionary update() is not implemented for
1911 :class:`_sql.ColumnCollection`."""
1912 raise NotImplementedError()
1913
1914 # https://github.com/python/mypy/issues/4266
1915 __hash__ = None # type: ignore
1916
1917 def _populate_separate_keys(
1918 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1919 ) -> None:
1920 """populate from an iterator of (key, column)"""
1921
1922 self._collection[:] = collection = [
1923 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1924 ]
1925 self._colset.update(c._deannotate() for _, c, _ in collection)
1926 self._index.update(
1927 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1928 )
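        # iterate in reverse so that, for duplicate keys, the entry that
        # appears first in the collection is the one kept in the keyed index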
1929 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1930
1931 def add(
1932 self,
1933 column: ColumnElement[Any],
1934 key: Optional[_COLKEY] = None,
1935 ) -> None:
1936 """Add a column to this :class:`_sql.ColumnCollection`.
1937
1938 .. note::
1939
1940 This method is **not normally used by user-facing code**, as the
1941 :class:`_sql.ColumnCollection` is usually part of an existing
1942 object such as a :class:`_schema.Table`. To add a
1943 :class:`_schema.Column` to an existing :class:`_schema.Table`
1944 object, use the :meth:`_schema.Table.append_column` method.
1945
1946 """
1947 colkey: _COLKEY
1948
1949 if key is None:
1950 colkey = column.key # type: ignore
1951 else:
1952 colkey = key
1953
1954 l = len(self._collection)
1955
        # the covariant _COL_co element type can't be cleanly reconciled
        # with the ColumnElement[Any] argument accepted here; cast for now
1958
1959 _column = cast(_COL_co, column)
1960
1961 self._collection.append(
1962 (colkey, _column, _ColumnMetrics(self, _column))
1963 )
1964 self._colset.add(_column._deannotate())
1965
1966 self._index[l] = (colkey, _column)
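        # only add a keyed entry if the key is not already present, so that
        # the first column added under a given key remains the keyed target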
1967 if colkey not in self._index:
1968 self._index[colkey] = (colkey, _column)
1969
1970 def __getstate__(self) -> Dict[str, Any]:
1971 return {
1972 "_collection": [(k, c) for k, c, _ in self._collection],
1973 "_index": self._index,
1974 }
1975
1976 def __setstate__(self, state: Dict[str, Any]) -> None:
1977 object.__setattr__(self, "_index", state["_index"])
1978 object.__setattr__(
1979 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1980 )
1981 object.__setattr__(
1982 self,
1983 "_collection",
1984 [
1985 (k, c, _ColumnMetrics(self, c))
1986 for (k, c) in state["_collection"]
1987 ],
1988 )
1989 object.__setattr__(
1990 self, "_colset", {col for k, col, _ in self._collection}
1991 )
1992
1993 def contains_column(self, col: ColumnElement[Any]) -> bool:
1994 """Checks if a column object exists in this collection"""
1995 if col not in self._colset:
1996 if isinstance(col, str):
1997 raise exc.ArgumentError(
1998 "contains_column cannot be used with string arguments. "
1999 "Use ``col_name in table.c`` instead."
2000 )
2001 return False
2002 else:
2003 return True
2004
2005 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
2006 """Return a "read only" form of this
2007 :class:`_sql.ColumnCollection`."""
2008
2009 return ReadOnlyColumnCollection(self)
2010
2011 def _init_proxy_index(self):
2012 """populate the "proxy index", if empty.
2013
2014 proxy index is added in 2.0 to provide more efficient operation
2015 for the corresponding_column() method.
2016
2017 For reasons of both time to construct new .c collections as well as
2018 memory conservation for large numbers of large .c collections, the
2019 proxy_index is only filled if corresponding_column() is called. once
2020 filled it stays that way, and new _ColumnMetrics objects created after
2021 that point will populate it with new data. Note this case would be
2022 unusual, if not nonexistent, as it means a .c collection is being
2023 mutated after corresponding_column() were used, however it is tested in
2024 test/base/test_utils.py.
2025
2026 """
2027 pi = self._proxy_index
2028 if pi:
2029 return
2030
2031 for _, _, metrics in self._collection:
2032 eps = metrics.column._expanded_proxy_set
2033
2034 for eps_col in eps:
2035 pi[eps_col].add(metrics)
2036
2037 def corresponding_column(
2038 self, column: _COL, require_embedded: bool = False
2039 ) -> Optional[Union[_COL, _COL_co]]:
2040 """Given a :class:`_expression.ColumnElement`, return the exported
2041 :class:`_expression.ColumnElement` object from this
2042 :class:`_expression.ColumnCollection`
2043 which corresponds to that original :class:`_expression.ColumnElement`
2044 via a common
2045 ancestor column.
2046
2047 :param column: the target :class:`_expression.ColumnElement`
2048 to be matched.
2049
2050 :param require_embedded: only return corresponding columns for
2051 the given :class:`_expression.ColumnElement`, if the given
2052 :class:`_expression.ColumnElement`
2053 is actually present within a sub-element
2054 of this :class:`_expression.Selectable`.
2055 Normally the column will match if
2056 it merely shares a common ancestor with one of the exported
2057 columns of this :class:`_expression.Selectable`.
2058
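        For example, a minimal sketch using the lightweight ``table()`` and
        ``column()`` constructs (the names ``t`` and ``a`` are purely
        illustrative)::

            >>> from sqlalchemy import column, table
            >>> t = table("t", column("x"))
            >>> a = t.alias()
            >>> a.c.corresponding_column(t.c.x) is a.c.x
            True
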
2059 .. seealso::
2060
2061 :meth:`_expression.Selectable.corresponding_column`
2062 - invokes this method
2063 against the collection returned by
2064 :attr:`_expression.Selectable.exported_columns`.
2065
2066 .. versionchanged:: 1.4 the implementation for ``corresponding_column``
2067 was moved onto the :class:`_expression.ColumnCollection` itself.
2068
2069 """
2070 # TODO: cython candidate
2071
2072 # don't dig around if the column is locally present
2073 if column in self._colset:
2074 return column
2075
2076 selected_intersection, selected_metrics = None, None
2077 target_set = column.proxy_set
2078
2079 pi = self._proxy_index
2080 if not pi:
2081 self._init_proxy_index()
2082
2083 for current_metrics in (
2084 mm for ts in target_set if ts in pi for mm in pi[ts]
2085 ):
2086 if not require_embedded or current_metrics.embedded(target_set):
2087 if selected_metrics is None:
2088 # no corresponding column yet, pick this one.
2089 selected_metrics = current_metrics
2090 continue
2091
2092 current_intersection = target_set.intersection(
2093 current_metrics.column._expanded_proxy_set
2094 )
2095 if selected_intersection is None:
2096 selected_intersection = target_set.intersection(
2097 selected_metrics.column._expanded_proxy_set
2098 )
2099
2100 if len(current_intersection) > len(selected_intersection):
2101 # 'current' has a larger field of correspondence than
2102 # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
2103 # matches a1.c.x->table.c.x better than
2104 # selectable.c.x->table.c.x does.
2105
2106 selected_metrics = current_metrics
2107 selected_intersection = current_intersection
2108 elif current_intersection == selected_intersection:
2109 # they have the same field of correspondence. see
2110 # which proxy_set has fewer columns in it, which
2111 # indicates a closer relationship with the root
2112 # column. Also take into account the "weight"
2113 # attribute which CompoundSelect() uses to give
2114 # higher precedence to columns based on vertical
2115 # position in the compound statement, and discard
2116 # columns that have no reference to the target
2117 # column (also occurs with CompoundSelect)
2118
2119 selected_col_distance = sum(
2120 [
2121 sc._annotations.get("weight", 1)
2122 for sc in (
2123 selected_metrics.column._uncached_proxy_list()
2124 )
2125 if sc.shares_lineage(column)
2126 ],
2127 )
2128 current_col_distance = sum(
2129 [
2130 sc._annotations.get("weight", 1)
2131 for sc in (
2132 current_metrics.column._uncached_proxy_list()
2133 )
2134 if sc.shares_lineage(column)
2135 ],
2136 )
2137 if current_col_distance < selected_col_distance:
2138 selected_metrics = current_metrics
2139 selected_intersection = current_intersection
2140
2141 return selected_metrics.column if selected_metrics else None
2142
2143
2144_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
2145
2146
2147class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
2148 """A :class:`_expression.ColumnCollection`
2149 that maintains deduplicating behavior.
2150
    This is used by schema level objects such as :class:`_schema.Table` and
2152 :class:`.PrimaryKeyConstraint`. The collection includes more
2153 sophisticated mutator methods as well to suit schema objects which
2154 require mutable column collections.
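
    As a brief sketch of the deduplicating behavior, adding a second column
    under an existing key replaces the first (``Column`` and ``Integer``
    assumed to be imported from ``sqlalchemy``)::

        >>> x1, x2 = Column("x", Integer), Column("x", Integer)
        >>> dc = DedupeColumnCollection()
        >>> dc.add(x1)
        >>> dc.add(x2)
        >>> len(dc)
        1
        >>> dc["x"] is x2
        True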
2155
2156 .. versionadded:: 1.4
2157
2158 """
2159
2160 def add( # type: ignore[override]
2161 self,
2162 column: _NAMEDCOL,
2163 key: Optional[str] = None,
2164 *,
2165 index: Optional[int] = None,
2166 ) -> None:
2167 if key is not None and column.key != key:
2168 raise exc.ArgumentError(
2169 "DedupeColumnCollection requires columns be under "
2170 "the same key as their .key"
2171 )
2172 key = column.key
2173
2174 if key is None:
2175 raise exc.ArgumentError(
2176 "Can't add unnamed column to column collection"
2177 )
2178
2179 if key in self._index:
2180 existing = self._index[key][1]
2181
2182 if existing is column:
2183 return
2184
2185 self.replace(column, index=index)
2186
2187 # pop out memoized proxy_set as this
2188 # operation may very well be occurring
2189 # in a _make_proxy operation
2190 util.memoized_property.reset(column, "proxy_set")
2191 else:
2192 self._append_new_column(key, column, index=index)
2193
2194 def _append_new_column(
2195 self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
2196 ) -> None:
2197 collection_length = len(self._collection)
2198
2199 if index is None:
2200 l = collection_length
2201 else:
2202 if index < 0:
2203 index = max(0, collection_length + index)
2204 l = index
2205
2206 if index is None:
2207 self._collection.append(
2208 (key, named_column, _ColumnMetrics(self, named_column))
2209 )
2210 else:
2211 self._collection.insert(
2212 index, (key, named_column, _ColumnMetrics(self, named_column))
2213 )
2214
2215 self._colset.add(named_column._deannotate())
2216
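        # when inserting at a position, shift the existing integer index
        # entries up by one to make room for the new column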
2217 if index is not None:
2218 for idx in reversed(range(index, collection_length)):
2219 self._index[idx + 1] = self._index[idx]
2220
2221 self._index[l] = (key, named_column)
2222 self._index[key] = (key, named_column)
2223
2224 def _populate_separate_keys(
2225 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
2226 ) -> None:
2227 """populate from an iterator of (key, column)"""
2228 cols = list(iter_)
2229
2230 replace_col = []
2231 for k, col in cols:
2232 if col.key != k:
2233 raise exc.ArgumentError(
2234 "DedupeColumnCollection requires columns be under "
2235 "the same key as their .key"
2236 )
2237 if col.name in self._index and col.key != col.name:
2238 replace_col.append(col)
2239 elif col.key in self._index:
2240 replace_col.append(col)
2241 else:
2242 self._index[k] = (k, col)
2243 self._collection.append((k, col, _ColumnMetrics(self, col)))
2244 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
2245
2246 self._index.update(
2247 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
2248 )
2249 for col in replace_col:
2250 self.replace(col)
2251
2252 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2253 self._populate_separate_keys((col.key, col) for col in iter_)
2254
2255 def remove(self, column: _NAMEDCOL) -> None:
2256 if column not in self._colset:
2257 raise ValueError(
2258 "Can't remove column %r; column is not in this collection"
2259 % column
2260 )
2261 del self._index[column.key]
2262 self._colset.remove(column)
2263 self._collection[:] = [
2264 (k, c, metrics)
2265 for (k, c, metrics) in self._collection
2266 if c is not column
2267 ]
2268 for metrics in self._proxy_index.get(column, ()):
2269 metrics.dispose(self)
2270
2271 self._index.update(
2272 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2273 )
        # drop the integer index entry that now points one past the end
2275 del self._index[len(self._collection)]
2276
2277 def replace(
2278 self,
2279 column: _NAMEDCOL,
2280 *,
2281 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2282 index: Optional[int] = None,
2283 ) -> None:
2284 """add the given column to this collection, removing unaliased
2285 versions of this column as well as existing columns with the
2286 same key.
2287
2288 e.g.::
2289
2290 t = Table("sometable", metadata, Column("col1", Integer))
2291 t.columns.replace(Column("col1", Integer, key="columnone"))
2292
        will remove the original 'col1' from the collection, and add
        the new column under the key 'columnone'.
2295
2296 Used by schema.Column to override columns during table reflection.
2297
2298 """
2299
2300 if extra_remove:
2301 remove_col = set(extra_remove)
2302 else:
2303 remove_col = set()
2304 # remove up to two columns based on matches of name as well as key
2305 if column.name in self._index and column.key != column.name:
2306 other = self._index[column.name][1]
2307 if other.name == other.key:
2308 remove_col.add(other)
2309
2310 if column.key in self._index:
2311 remove_col.add(self._index[column.key][1])
2312
2313 if not remove_col:
2314 self._append_new_column(column.key, column, index=index)
2315 return
2316 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2317 replace_index = None
2318
2319 for idx, (k, col, metrics) in enumerate(self._collection):
2320 if col in remove_col:
2321 if replace_index is None:
2322 replace_index = idx
2323 new_cols.append(
2324 (column.key, column, _ColumnMetrics(self, column))
2325 )
2326 else:
2327 new_cols.append((k, col, metrics))
2328
2329 if remove_col:
2330 self._colset.difference_update(remove_col)
2331
2332 for rc in remove_col:
2333 for metrics in self._proxy_index.get(rc, ()):
2334 metrics.dispose(self)
2335
2336 if replace_index is None:
2337 if index is not None:
2338 new_cols.insert(
2339 index, (column.key, column, _ColumnMetrics(self, column))
2340 )
2341
2342 else:
2343 new_cols.append(
2344 (column.key, column, _ColumnMetrics(self, column))
2345 )
2346 elif index is not None:
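            # the replacement entry was written at the old column's position;
            # move it to the requested index and drop the now-duplicated
            # entry, accounting for the shift caused by the insert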
2347 to_move = new_cols[replace_index]
2348 effective_positive_index = (
2349 index if index >= 0 else max(0, len(new_cols) + index)
2350 )
2351 new_cols.insert(index, to_move)
2352 if replace_index > effective_positive_index:
2353 del new_cols[replace_index + 1]
2354 else:
2355 del new_cols[replace_index]
2356
2357 self._colset.add(column._deannotate())
2358 self._collection[:] = new_cols
2359
2360 self._index.clear()
2361
2362 self._index.update(
2363 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2364 )
2365 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2366
2367
2368class ReadOnlyColumnCollection(
2369 util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
2370):
2371 __slots__ = ("_parent",)
2372
2373 def __init__(self, collection):
2374 object.__setattr__(self, "_parent", collection)
2375 object.__setattr__(self, "_colset", collection._colset)
2376 object.__setattr__(self, "_index", collection._index)
2377 object.__setattr__(self, "_collection", collection._collection)
2378 object.__setattr__(self, "_proxy_index", collection._proxy_index)
2379
2380 def __getstate__(self):
2381 return {"_parent": self._parent}
2382
2383 def __setstate__(self, state):
2384 parent = state["_parent"]
2385 self.__init__(parent) # type: ignore
2386
2387 def add(self, column: Any, key: Any = ...) -> Any:
2388 self._readonly()
2389
2390 def extend(self, elements: Any) -> NoReturn:
2391 self._readonly()
2392
2393 def remove(self, item: Any) -> NoReturn:
2394 self._readonly()
2395
2396
2397class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
2398 def contains_column(self, col):
2399 return col in self
2400
2401 def extend(self, cols):
2402 for col in cols:
2403 self.add(col)
2404
2405 def __eq__(self, other):
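        # produces a SQL boolean expression (an AND of comparisons for
        # columns that share lineage), rather than a Python bool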
2406 l = []
2407 for c in other:
2408 for local in self:
2409 if c.shares_lineage(local):
2410 l.append(c == local)
2411 return elements.and_(*l)
2412
2413 def __hash__(self): # type: ignore[override]
2414 return hash(tuple(x for x in self))
2415
2416
2417def _entity_namespace(
2418 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2419) -> _EntityNamespace:
2420 """Return the nearest .entity_namespace for the given entity.
2421
    If not immediately available, traverses the element to find a
    sub-element that has one, if any.
2424
2425 """
2426 try:
2427 return cast(_HasEntityNamespace, entity).entity_namespace
2428 except AttributeError:
2429 for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
2430 if _is_has_entity_namespace(elem):
2431 return elem.entity_namespace
2432 else:
2433 raise
2434
2435
2436def _entity_namespace_key(
2437 entity: Union[_HasEntityNamespace, ExternallyTraversible],
2438 key: str,
2439 default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
2440) -> SQLCoreOperations[Any]:
2441 """Return an entry from an entity_namespace.
2442
2443
2444 Raises :class:`_exc.InvalidRequestError` rather than attribute error
2445 on not found.
2446
2447 """
2448
2449 try:
2450 ns = _entity_namespace(entity)
2451 if default is not NO_ARG:
2452 return getattr(ns, key, default)
2453 else:
2454 return getattr(ns, key) # type: ignore
2455 except AttributeError as err:
2456 raise exc.InvalidRequestError(
2457 'Entity namespace for "%s" has no property "%s"' % (entity, key)
2458 ) from err