1# sql/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Foundational utilities common to many sql modules.
10
11"""
12
13
14from __future__ import annotations
15
16import collections
17from enum import Enum
18import itertools
19from itertools import zip_longest
20import operator
21import re
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Sequence
38from typing import Set
39from typing import Tuple
40from typing import Type
41from typing import TYPE_CHECKING
42from typing import TypeVar
43from typing import Union
44
45from . import roles
46from . import visitors
47from .cache_key import HasCacheKey # noqa
48from .cache_key import MemoizedHasCacheKey # noqa
49from .traversals import HasCopyInternals # noqa
50from .visitors import ClauseVisitor
51from .visitors import ExtendedInternalTraversal
52from .visitors import ExternallyTraversible
53from .visitors import InternalTraversal
54from .. import event
55from .. import exc
56from .. import util
57from ..util import HasMemoized as HasMemoized
58from ..util import hybridmethod
59from ..util import typing as compat_typing
60from ..util.typing import Protocol
61from ..util.typing import Self
62from ..util.typing import TypeGuard
63
64if TYPE_CHECKING:
65 from . import coercions
66 from . import elements
67 from . import type_api
68 from ._orm_types import DMLStrategyArgument
69 from ._orm_types import SynchronizeSessionArgument
70 from ._typing import _CLE
71 from .compiler import SQLCompiler
72 from .elements import BindParameter
73 from .elements import ClauseList
74 from .elements import ColumnClause # noqa
75 from .elements import ColumnElement
76 from .elements import NamedColumn
77 from .elements import SQLCoreOperations
78 from .elements import TextClause
79 from .schema import Column
80 from .schema import DefaultGenerator
81 from .selectable import _JoinTargetElement
82 from .selectable import _SelectIterable
83 from .selectable import FromClause
84 from ..engine import Connection
85 from ..engine import CursorResult
86 from ..engine.interfaces import _CoreMultiExecuteParams
87 from ..engine.interfaces import _ExecuteOptions
88 from ..engine.interfaces import _ImmutableExecuteOptions
89 from ..engine.interfaces import CacheStats
90 from ..engine.interfaces import Compiled
91 from ..engine.interfaces import CompiledCacheType
92 from ..engine.interfaces import CoreExecuteOptionsParameter
93 from ..engine.interfaces import Dialect
94 from ..engine.interfaces import IsolationLevel
95 from ..engine.interfaces import SchemaTranslateMapType
96 from ..event import dispatcher
97
98if not TYPE_CHECKING:
99 coercions = None # noqa
100 elements = None # noqa
101 type_api = None # noqa
102
103
104class _NoArg(Enum):
105 NO_ARG = 0
106
107 def __repr__(self):
108 return f"_NoArg.{self.name}"
109
110
111NO_ARG = _NoArg.NO_ARG
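
# Illustrative sketch (not part of this module's API): ``NO_ARG`` serves as a
# default-parameter sentinel so that "no value passed" can be distinguished
# from an explicit ``None``; ``configure`` below is a hypothetical function:
#
#     def configure(value: Union[int, None, _NoArg] = NO_ARG) -> None:
#         if value is not NO_ARG:
#             ...  # a value (possibly None) was explicitly passed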
112
113
114class _NoneName(Enum):
115 NONE_NAME = 0
116 """indicate a 'deferred' name that was ultimately the value None."""
117
118
119_NONE_NAME = _NoneName.NONE_NAME
120
121_T = TypeVar("_T", bound=Any)
122
123_Fn = TypeVar("_Fn", bound=Callable[..., Any])
124
125_AmbiguousTableNameMap = MutableMapping[str, str]
126
127
128class _DefaultDescriptionTuple(NamedTuple):
129 arg: Any
130 is_scalar: Optional[bool]
131 is_callable: Optional[bool]
132 is_sentinel: Optional[bool]
133
134 @classmethod
135 def _from_column_default(
136 cls, default: Optional[DefaultGenerator]
137 ) -> _DefaultDescriptionTuple:
138 return (
139 _DefaultDescriptionTuple(
140 default.arg, # type: ignore
141 default.is_scalar,
142 default.is_callable,
143 default.is_sentinel,
144 )
145 if default
146 and (
147 default.has_arg
148 or (not default.for_update and default.is_sentinel)
149 )
150 else _DefaultDescriptionTuple(None, None, None, None)
151 )
152
153
154_never_select_column = operator.attrgetter("_omit_from_statements")
155
156
157class _EntityNamespace(Protocol):
158 def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
159
160
161class _HasEntityNamespace(Protocol):
162 @util.ro_non_memoized_property
163 def entity_namespace(self) -> _EntityNamespace: ...
164
165
166def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
167 return hasattr(element, "entity_namespace")
168
169
# Remove when https://github.com/python/mypy/issues/14640 is fixed
171_Self = TypeVar("_Self", bound=Any)
172
173
174class Immutable:
175 """mark a ClauseElement as 'immutable' when expressions are cloned.
176
    Here, "immutable" refers to the "mutability" of an object in the
    context of SQL DQL and DML generation: in DQL, for example, one can
    compose a SELECT or subquery in varied forms, but one cannot modify
    the structure of a specific table or column within that DQL.
181 :class:`.Immutable` is mostly intended to follow this concept, and as
182 such the primary "immutable" objects are :class:`.ColumnClause`,
183 :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
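
    For example, when an expression tree is copied, an :class:`.Immutable`
    element is carried along as-is rather than copied (an illustrative
    sketch; ``user_table`` is a hypothetical :class:`.Table`)::

        user_table.c.id._clone() is user_table.c.id  # True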
184
185 """
186
187 __slots__ = ()
188
189 _is_immutable = True
190
191 def unique_params(self, *optionaldict, **kwargs):
192 raise NotImplementedError("Immutable objects do not support copying")
193
194 def params(self, *optionaldict, **kwargs):
195 raise NotImplementedError("Immutable objects do not support copying")
196
197 def _clone(self: _Self, **kw: Any) -> _Self:
198 return self
199
200 def _copy_internals(
201 self, *, omit_attrs: Iterable[str] = (), **kw: Any
202 ) -> None:
203 pass
204
205
206class SingletonConstant(Immutable):
207 """Represent SQL constants like NULL, TRUE, FALSE"""
208
209 _is_singleton_constant = True
210
211 _singleton: SingletonConstant
212
213 def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
214 return cast(_T, cls._singleton)
215
216 @util.non_memoized_property
217 def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
218 raise NotImplementedError()
219
220 @classmethod
221 def _create_singleton(cls):
222 obj = object.__new__(cls)
223 obj.__init__() # type: ignore
224
225 # for a long time this was an empty frozenset, meaning
226 # a SingletonConstant would never be a "corresponding column" in
227 # a statement. This referred to #6259. However, in #7154 we see
228 # that we do in fact need "correspondence" to work when matching cols
229 # in result sets, so the non-correspondence was moved to a more
230 # specific level when we are actually adapting expressions for SQL
231 # render only.
232 obj.proxy_set = frozenset([obj])
233 cls._singleton = obj
234
235
236def _from_objects(
237 *elements: Union[
238 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
239 ]
240) -> Iterator[FromClause]:
241 return itertools.chain.from_iterable(
242 [element._from_objects for element in elements]
243 )
244
245
246def _select_iterables(
247 elements: Iterable[roles.ColumnsClauseRole],
248) -> _SelectIterable:
249 """expand tables into individual columns in the
250 given list of column expressions.
251
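    E.g., a :class:`_schema.Table` in the given list expands into its
    individual :class:`_schema.Column` objects (an illustrative sketch;
    ``user_table`` is a hypothetical table)::

        list(_select_iterables([user_table, literal_column("1")]))
        # -> [user_table.c.id, user_table.c.name, ..., literal_column("1")]
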
252 """
253 return itertools.chain.from_iterable(
254 [c._select_iterable for c in elements]
255 )
256
257
258_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
259
260
261class _GenerativeType(compat_typing.Protocol):
262 def _generate(self) -> Self: ...
263
264
265def _generative(fn: _Fn) -> _Fn:
266 """non-caching _generative() decorator.
267
268 This is basically the legacy decorator that copies the object and
269 runs a method on the new copy.
270
271 """
272
273 @util.decorator
274 def _generative(
275 fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
276 ) -> _SelfGenerativeType:
277 """Mark a method as generative."""
278
279 self = self._generate()
280 x = fn(self, *args, **kw)
281 assert x is self, "generative methods must return self"
282 return self
283
284 decorated = _generative(fn)
285 decorated.non_generative = fn # type: ignore
286 return decorated
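
# Illustrative sketch of how ``@_generative`` is typically applied; the
# ``where()`` method and ``_whereclause`` attribute below are hypothetical:
#
#     class MyStatement(Generative):
#         _whereclause = None
#
#         @_generative
#         def where(self, criterion):
#             # runs on the copy produced by _generate(); must return self
#             self._whereclause = criterion
#             return self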
287
288
289def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
290 msgs = kw.pop("msgs", {})
291
292 defaults = kw.pop("defaults", {})
293
294 getters = [
295 (name, operator.attrgetter(name), defaults.get(name, None))
296 for name in names
297 ]
298
299 @util.decorator
300 def check(fn, *args, **kw):
301 # make pylance happy by not including "self" in the argument
302 # list
303 self = args[0]
304 args = args[1:]
305 for name, getter, default_ in getters:
306 if getter(self) is not default_:
307 msg = msgs.get(
308 name,
309 "Method %s() has already been invoked on this %s construct"
310 % (fn.__name__, self.__class__),
311 )
312 raise exc.InvalidRequestError(msg)
313 return fn(self, *args, **kw)
314
315 return check
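
# Illustrative sketch: ``_exclusive_against`` guards a generative method so
# that it raises if the named attribute was already set away from its default
# (the attribute and method names below are hypothetical):
#
#     @_exclusive_against(
#         "_returning",
#         msgs={"_returning": "RETURNING has already been configured"},
#     )
#     def returning(self, *cols):
#         ...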
316
317
318def _clone(element, **kw):
319 return element._clone(**kw)
320
321
322def _expand_cloned(
323 elements: Iterable[_CLE],
324) -> Iterable[_CLE]:
325 """expand the given set of ClauseElements to be the set of all 'cloned'
326 predecessors.
327
328 """
329 # TODO: cython candidate
330 return itertools.chain(*[x._cloned_set for x in elements])
331
332
333def _de_clone(
334 elements: Iterable[_CLE],
335) -> Iterable[_CLE]:
336 for x in elements:
337 while x._is_clone_of is not None:
338 x = x._is_clone_of
339 yield x
340
341
342def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
343 """return the intersection of sets a and b, counting
344 any overlap between 'cloned' predecessors.
345
346 The returned set is in terms of the entities present within 'a'.
347
348 """
349 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
350 return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
351
352
353def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
354 all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
355 return {
356 elem for elem in a if not all_overlap.intersection(elem._cloned_set)
357 }
358
359
360class _DialectArgView(MutableMapping[str, Any]):
361 """A dictionary view of dialect-level arguments in the form
362 <dialectname>_<argument_name>.
363
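    E.g., given a construct created with a dialect-specific keyword argument
    (an illustrative sketch; any participating dialect argument behaves the
    same way)::

        idx = Index("ix_data", table.c.data, postgresql_using="gin")
        idx.dialect_kwargs["postgresql_using"]  # "gin"
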
364 """
365
366 __slots__ = ("obj",)
367
368 def __init__(self, obj):
369 self.obj = obj
370
371 def _key(self, key):
372 try:
373 dialect, value_key = key.split("_", 1)
374 except ValueError as err:
375 raise KeyError(key) from err
376 else:
377 return dialect, value_key
378
379 def __getitem__(self, key):
380 dialect, value_key = self._key(key)
381
382 try:
383 opt = self.obj.dialect_options[dialect]
384 except exc.NoSuchModuleError as err:
385 raise KeyError(key) from err
386 else:
387 return opt[value_key]
388
389 def __setitem__(self, key, value):
390 try:
391 dialect, value_key = self._key(key)
392 except KeyError as err:
393 raise exc.ArgumentError(
394 "Keys must be of the form <dialectname>_<argname>"
395 ) from err
396 else:
397 self.obj.dialect_options[dialect][value_key] = value
398
399 def __delitem__(self, key):
400 dialect, value_key = self._key(key)
401 del self.obj.dialect_options[dialect][value_key]
402
403 def __len__(self):
404 return sum(
405 len(args._non_defaults)
406 for args in self.obj.dialect_options.values()
407 )
408
409 def __iter__(self):
410 return (
411 "%s_%s" % (dialect_name, value_name)
412 for dialect_name in self.obj.dialect_options
413 for value_name in self.obj.dialect_options[
414 dialect_name
415 ]._non_defaults
416 )
417
418
419class _DialectArgDict(MutableMapping[str, Any]):
420 """A dictionary view of dialect-level arguments for a specific
421 dialect.
422
423 Maintains a separate collection of user-specified arguments
424 and dialect-specified default arguments.
425
426 """
427
428 def __init__(self):
429 self._non_defaults = {}
430 self._defaults = {}
431
432 def __len__(self):
433 return len(set(self._non_defaults).union(self._defaults))
434
435 def __iter__(self):
436 return iter(set(self._non_defaults).union(self._defaults))
437
438 def __getitem__(self, key):
439 if key in self._non_defaults:
440 return self._non_defaults[key]
441 else:
442 return self._defaults[key]
443
444 def __setitem__(self, key, value):
445 self._non_defaults[key] = value
446
447 def __delitem__(self, key):
448 del self._non_defaults[key]
449
450
451@util.preload_module("sqlalchemy.dialects")
452def _kw_reg_for_dialect(dialect_name):
453 dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
454 if dialect_cls.construct_arguments is None:
455 return None
456 return dict(dialect_cls.construct_arguments)
457
458
459class DialectKWArgs:
460 """Establish the ability for a class to have dialect-specific arguments
461 with defaults and constructor validation.
462
    The :class:`.DialectKWArgs` class interacts with the
    :attr:`.DefaultDialect.construct_arguments` collection present on a
    dialect.
465
466 .. seealso::
467
468 :attr:`.DefaultDialect.construct_arguments`
469
470 """
471
472 __slots__ = ()
473
474 _dialect_kwargs_traverse_internals = [
475 ("dialect_options", InternalTraversal.dp_dialect_options)
476 ]
477
478 @classmethod
479 def argument_for(cls, dialect_name, argument_name, default):
480 """Add a new kind of dialect-specific keyword argument for this class.
481
482 E.g.::
483
484 Index.argument_for("mydialect", "length", None)
485
486 some_index = Index("a", "b", mydialect_length=5)
487
        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
490 :attr:`.DefaultDialect.construct_arguments` dictionary. This
491 dictionary provides a list of argument names accepted by various
492 schema-level constructs on behalf of a dialect.
493
494 New dialects should typically specify this dictionary all at once as a
495 data member of the dialect class. The use case for ad-hoc addition of
496 argument names is typically for end-user code that is also using
497 a custom compilation scheme which consumes the additional arguments.
498
499 :param dialect_name: name of a dialect. The dialect must be
500 locatable, else a :class:`.NoSuchModuleError` is raised. The
501 dialect must also include an existing
502 :attr:`.DefaultDialect.construct_arguments` collection, indicating
503 that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised.  If the dialect does
         not include this collection, then any keyword argument can already
         be specified on behalf of this dialect.  All dialects packaged
         within SQLAlchemy include this collection; however, for third party
         dialects, support may vary.
509
510 :param argument_name: name of the parameter.
511
512 :param default: default value of the parameter.
513
514 """
515
516 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
517 if construct_arg_dictionary is None:
518 raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
521 )
522 if cls not in construct_arg_dictionary:
523 construct_arg_dictionary[cls] = {}
524 construct_arg_dictionary[cls][argument_name] = default
525
526 @property
527 def dialect_kwargs(self):
528 """A collection of keyword arguments specified as dialect-specific
529 options to this construct.
530
        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included,
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect, including defaults.

        The collection is also writable; keys of the form
        ``<dialect>_<kwarg>`` are accepted, and the value will be assembled
        into the list of options.
539
540 .. seealso::
541
542 :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
543
544 """
545 return _DialectArgView(self)
546
547 @property
548 def kwargs(self):
549 """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
550 return self.dialect_kwargs
551
552 _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
553
554 @classmethod
555 def _kw_reg_for_dialect_cls(cls, dialect_name):
556 construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
557 d = _DialectArgDict()
558
559 if construct_arg_dictionary is None:
560 d._defaults.update({"*": None})
561 else:
562 for cls in reversed(cls.__mro__):
563 if cls in construct_arg_dictionary:
564 d._defaults.update(construct_arg_dictionary[cls])
565 return d
566
567 @util.memoized_property
568 def dialect_options(self):
569 """A collection of keyword arguments specified as dialect-specific
570 options to this construct.
571
572 This is a two-level nested registry, keyed to ``<dialect_name>``
573 and ``<argument_name>``. For example, the ``postgresql_where``
574 argument would be locatable as::
575
576 arg = my_object.dialect_options["postgresql"]["where"]
577
578 .. versionadded:: 0.9.2
579
580 .. seealso::
581
582 :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
583
584 """
585
586 return util.PopulateDict(self._kw_reg_for_dialect_cls)
587
588 def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate that the remaining kwargs all specify dialect prefixes
590
591 if not kwargs:
592 return
593
594 for k in kwargs:
595 m = re.match("^(.+?)_(.+)$", k)
596 if not m:
597 raise TypeError(
598 "Additional arguments should be "
599 "named <dialectname>_<argument>, got '%s'" % k
600 )
601 dialect_name, arg_name = m.group(1, 2)
602
603 try:
604 construct_arg_dictionary = self.dialect_options[dialect_name]
605 except exc.NoSuchModuleError:
606 util.warn(
607 "Can't validate argument %r; can't "
608 "locate any SQLAlchemy dialect named %r"
609 % (k, dialect_name)
610 )
611 self.dialect_options[dialect_name] = d = _DialectArgDict()
612 d._defaults.update({"*": None})
613 d._non_defaults[arg_name] = kwargs[k]
614 else:
615 if (
616 "*" not in construct_arg_dictionary
617 and arg_name not in construct_arg_dictionary
618 ):
619 raise exc.ArgumentError(
620 "Argument %r is not accepted by "
621 "dialect %r on behalf of %r"
622 % (k, dialect_name, self.__class__)
623 )
624 else:
625 construct_arg_dictionary[arg_name] = kwargs[k]
626
627
628class CompileState:
629 """Produces additional object state necessary for a statement to be
630 compiled.
631
    The :class:`.CompileState` class is the base for classes that assemble
    state for a particular statement object which is then used by the
    compiler.   This process is essentially an extension of the process that
    the ``SQLCompiler.visit_XYZ()`` methods perform; however, the emphasis is
    on converting raw user intent into more organized structures rather than
    on producing string output.  The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.
640
641 The production of :class:`.CompileState` is specific to the compiler, such
642 as within the :meth:`.SQLCompiler.visit_insert`,
643 :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
644 responsible for associating the :class:`.CompileState` with the
645 :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
646 i.e. the outermost SQL statement that's actually being executed.
647 There can be other :class:`.CompileState` objects that are not the
648 toplevel, such as when a SELECT subquery or CTE-nested
649 INSERT/UPDATE/DELETE is generated.
650
651 .. versionadded:: 1.4
652
653 """
654
655 __slots__ = ("statement", "_ambiguous_table_name_map")
656
657 plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
658
659 _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
660
661 @classmethod
662 def create_for_statement(
663 cls, statement: Executable, compiler: SQLCompiler, **kw: Any
664 ) -> CompileState:
665 # factory construction.
666
667 if statement._propagate_attrs:
668 plugin_name = statement._propagate_attrs.get(
669 "compile_state_plugin", "default"
670 )
671 klass = cls.plugins.get(
672 (plugin_name, statement._effective_plugin_target), None
673 )
674 if klass is None:
675 klass = cls.plugins[
676 ("default", statement._effective_plugin_target)
677 ]
678
679 else:
680 klass = cls.plugins[
681 ("default", statement._effective_plugin_target)
682 ]
683
684 if klass is cls:
685 return cls(statement, compiler, **kw)
686 else:
687 return klass.create_for_statement(statement, compiler, **kw)
688
689 def __init__(self, statement, compiler, **kw):
690 self.statement = statement
691
692 @classmethod
693 def get_plugin_class(
694 cls, statement: Executable
695 ) -> Optional[Type[CompileState]]:
696 plugin_name = statement._propagate_attrs.get(
697 "compile_state_plugin", None
698 )
699
700 if plugin_name:
701 key = (plugin_name, statement._effective_plugin_target)
702 if key in cls.plugins:
703 return cls.plugins[key]
704
        # there's no case where we call upon get_plugin_class() and want
        # to get None back; there should always be a default.  Return that
        # default if there was no plugin-specific class (e.g. Insert with
        # "orm" plugin)
709 try:
710 return cls.plugins[("default", statement._effective_plugin_target)]
711 except KeyError:
712 return None
713
714 @classmethod
715 def _get_plugin_class_for_plugin(
716 cls, statement: Executable, plugin_name: str
717 ) -> Optional[Type[CompileState]]:
718 try:
719 return cls.plugins[
720 (plugin_name, statement._effective_plugin_target)
721 ]
722 except KeyError:
723 return None
724
725 @classmethod
726 def plugin_for(
727 cls, plugin_name: str, visit_name: str
728 ) -> Callable[[_Fn], _Fn]:
729 def decorate(cls_to_decorate):
730 cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
731 return cls_to_decorate
732
733 return decorate
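
    # Illustrative sketch of plugin registration; the class name below is
    # illustrative, and real registrations live in the Core and ORM
    # compilation modules:
    #
    #     @CompileState.plugin_for("default", "select")
    #     class SelectState(CompileState):
    #         ...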
734
735
736class Generative(HasMemoized):
737 """Provide a method-chaining pattern in conjunction with the
738 @_generative decorator."""
739
740 def _generate(self) -> Self:
741 skip = self._memoized_keys
742 cls = self.__class__
743 s = cls.__new__(cls)
744 if skip:
745 # ensure this iteration remains atomic
746 s.__dict__ = {
747 k: v for k, v in self.__dict__.copy().items() if k not in skip
748 }
749 else:
750 s.__dict__ = self.__dict__.copy()
751 return s
752
753
754class InPlaceGenerative(HasMemoized):
755 """Provide a method-chaining pattern in conjunction with the
756 @_generative decorator that mutates in place."""
757
758 __slots__ = ()
759
760 def _generate(self):
761 skip = self._memoized_keys
762 # note __dict__ needs to be in __slots__ if this is used
763 for k in skip:
764 self.__dict__.pop(k, None)
765 return self
766
767
768class HasCompileState(Generative):
769 """A class that has a :class:`.CompileState` associated with it."""
770
771 _compile_state_plugin: Optional[Type[CompileState]] = None
772
773 _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
774
775 _compile_state_factory = CompileState.create_for_statement
776
777
778class _MetaOptions(type):
779 """metaclass for the Options class.
780
    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook, as this type also provides custom
    class-level behavior for the ``__add__()`` method.
784
785 """
786
787 _cache_attrs: Tuple[str, ...]
788
789 def __add__(self, other):
790 o1 = self()
791
792 if set(other).difference(self._cache_attrs):
793 raise TypeError(
794 "dictionary contains attributes not covered by "
795 "Options class %s: %r"
796 % (self, set(other).difference(self._cache_attrs))
797 )
798
799 o1.__dict__.update(other)
800 return o1
801
802 if TYPE_CHECKING:
803
804 def __getattr__(self, key: str) -> Any: ...
805
806 def __setattr__(self, key: str, value: Any) -> None: ...
807
808 def __delattr__(self, key: str) -> None: ...
809
810
811class Options(metaclass=_MetaOptions):
812 """A cacheable option dictionary with defaults."""
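
    # Illustrative sketch: subclasses define their defaults as class
    # attributes, and new variants are produced with ``+`` (the class and
    # attribute names here are hypothetical):
    #
    #     class MyLoadOptions(Options):
    #         _only_load_props = None
    #         _refresh_state = None
    #
    #     opts = MyLoadOptions + {"_only_load_props": {"x", "y"}}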
813
814 __slots__ = ()
815
816 _cache_attrs: Tuple[str, ...]
817
818 def __init_subclass__(cls) -> None:
819 dict_ = cls.__dict__
820 cls._cache_attrs = tuple(
821 sorted(
822 d
823 for d in dict_
824 if not d.startswith("__")
825 and d not in ("_cache_key_traversal",)
826 )
827 )
828 super().__init_subclass__()
829
830 def __init__(self, **kw):
831 self.__dict__.update(kw)
832
833 def __add__(self, other):
834 o1 = self.__class__.__new__(self.__class__)
835 o1.__dict__.update(self.__dict__)
836
837 if set(other).difference(self._cache_attrs):
838 raise TypeError(
839 "dictionary contains attributes not covered by "
840 "Options class %s: %r"
841 % (self, set(other).difference(self._cache_attrs))
842 )
843
844 o1.__dict__.update(other)
845 return o1
846
847 def __eq__(self, other):
848 # TODO: very inefficient. This is used only in test suites
849 # right now.
850 for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
851 if getattr(self, a) != getattr(other, b):
852 return False
853 return True
854
855 def __repr__(self):
856 # TODO: fairly inefficient, used only in debugging right now.
857
858 return "%s(%s)" % (
859 self.__class__.__name__,
860 ", ".join(
861 "%s=%r" % (k, self.__dict__[k])
862 for k in self._cache_attrs
863 if k in self.__dict__
864 ),
865 )
866
867 @classmethod
868 def isinstance(cls, klass: Type[Any]) -> bool:
869 return issubclass(cls, klass)
870
871 @hybridmethod
872 def add_to_element(self, name, value):
873 return self + {name: getattr(self, name) + value}
874
875 @hybridmethod
876 def _state_dict_inst(self) -> Mapping[str, Any]:
877 return self.__dict__
878
879 _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
880
881 @_state_dict_inst.classlevel
882 def _state_dict(cls) -> Mapping[str, Any]:
883 return cls._state_dict_const
884
885 @classmethod
886 def safe_merge(cls, other):
887 d = other._state_dict()
888
        # only support a merge with another object of our class,
        # one which does not have attrs that we don't.   Otherwise
        # we risk carrying state that might not be part of our cache
        # key strategy
893
894 if (
895 cls is not other.__class__
896 and other._cache_attrs
897 and set(other._cache_attrs).difference(cls._cache_attrs)
898 ):
899 raise TypeError(
900 "other element %r is not empty, is not of type %s, "
901 "and contains attributes not covered here %r"
902 % (
903 other,
904 cls,
905 set(other._cache_attrs).difference(cls._cache_attrs),
906 )
907 )
908 return cls + d
909
910 @classmethod
911 def from_execution_options(
912 cls, key, attrs, exec_options, statement_exec_options
913 ):
914 """process Options argument in terms of execution options.
915
916
917 e.g.::
918
919 (
920 load_options,
921 execution_options,
922 ) = QueryContext.default_load_options.from_execution_options(
923 "_sa_orm_load_options",
924 {"populate_existing", "autoflush", "yield_per"},
925 execution_options,
926 statement._execution_options,
927 )
928
        This returns the new Options object, and also refreshes the
        "_sa_orm_load_options" key in the exec options dict with that
        Options object.
931
932 """
933
        # the common case is that none of the options we are looking for
        # are in either dictionary, so check for that case first
936 check_argnames = attrs.intersection(
937 set(exec_options).union(statement_exec_options)
938 )
939
940 existing_options = exec_options.get(key, cls)
941
942 if check_argnames:
943 result = {}
944 for argname in check_argnames:
945 local = "_" + argname
946 if argname in exec_options:
947 result[local] = exec_options[argname]
948 elif argname in statement_exec_options:
949 result[local] = statement_exec_options[argname]
950
951 new_options = existing_options + result
952 exec_options = util.immutabledict().merge_with(
953 exec_options, {key: new_options}
954 )
955 return new_options, exec_options
956
957 else:
958 return existing_options, exec_options
959
960 if TYPE_CHECKING:
961
962 def __getattr__(self, key: str) -> Any: ...
963
964 def __setattr__(self, key: str, value: Any) -> None: ...
965
966 def __delattr__(self, key: str) -> None: ...
967
968
969class CacheableOptions(Options, HasCacheKey):
970 __slots__ = ()
971
972 @hybridmethod
973 def _gen_cache_key_inst(self, anon_map, bindparams):
974 return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
975
976 @_gen_cache_key_inst.classlevel
977 def _gen_cache_key(cls, anon_map, bindparams):
978 return (cls, ())
979
980 @hybridmethod
981 def _generate_cache_key(self):
982 return HasCacheKey._generate_cache_key_for_object(self)
983
984
985class ExecutableOption(HasCopyInternals):
986 __slots__ = ()
987
988 _annotations = util.EMPTY_DICT
989
990 __visit_name__ = "executable_option"
991
992 _is_has_cache_key = False
993
994 _is_core = True
995
996 def _clone(self, **kw):
997 """Create a shallow copy of this ExecutableOption."""
998 c = self.__class__.__new__(self.__class__)
999 c.__dict__ = dict(self.__dict__) # type: ignore
1000 return c
1001
1002
1003class Executable(roles.StatementRole):
1004 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1005
1006 :class:`.Executable` is a superclass for all "statement" types
1007 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1008 :func:`insert`, :func:`text`.
1009
1010 """
1011
1012 supports_execution: bool = True
1013 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1014 _is_default_generator = False
1015 _with_options: Tuple[ExecutableOption, ...] = ()
1016 _with_context_options: Tuple[
1017 Tuple[Callable[[CompileState], None], Any], ...
1018 ] = ()
1019 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1020
1021 _executable_traverse_internals = [
1022 ("_with_options", InternalTraversal.dp_executable_options),
1023 (
1024 "_with_context_options",
1025 ExtendedInternalTraversal.dp_with_context_options,
1026 ),
1027 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1028 ]
1029
1030 is_select = False
1031 is_from_statement = False
1032 is_update = False
1033 is_insert = False
1034 is_text = False
1035 is_delete = False
1036 is_dml = False
1037
1038 if TYPE_CHECKING:
1039 __visit_name__: str
1040
1041 def _compile_w_cache(
1042 self,
1043 dialect: Dialect,
1044 *,
1045 compiled_cache: Optional[CompiledCacheType],
1046 column_keys: List[str],
1047 for_executemany: bool = False,
1048 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1049 **kw: Any,
1050 ) -> Tuple[
1051 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1052 ]: ...
1053
1054 def _execute_on_connection(
1055 self,
1056 connection: Connection,
1057 distilled_params: _CoreMultiExecuteParams,
1058 execution_options: CoreExecuteOptionsParameter,
1059 ) -> CursorResult[Any]: ...
1060
1061 def _execute_on_scalar(
1062 self,
1063 connection: Connection,
1064 distilled_params: _CoreMultiExecuteParams,
1065 execution_options: CoreExecuteOptionsParameter,
1066 ) -> Any: ...
1067
1068 @util.ro_non_memoized_property
1069 def _all_selected_columns(self):
1070 raise NotImplementedError()
1071
1072 @property
1073 def _effective_plugin_target(self) -> str:
1074 return self.__visit_name__
1075
1076 @_generative
1077 def options(self, *options: ExecutableOption) -> Self:
1078 """Apply options to this statement.
1079
1080 In the general sense, options are any kind of Python object
1081 that can be interpreted by the SQL compiler for the statement.
1082 These options can be consumed by specific dialects or specific kinds
1083 of compilers.
1084
        The most commonly known kinds of options are the ORM-level options
1086 that apply "eager load" and other loading behaviors to an ORM
1087 query. However, options can theoretically be used for many other
1088 purposes.
1089
1090 For background on specific kinds of options for specific kinds of
1091 statements, refer to the documentation for those option objects.
1092
1093 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1094 Core statement objects towards the goal of allowing unified
1095 Core / ORM querying capabilities.
1096
1097 .. seealso::
1098
1099 :ref:`loading_columns` - refers to options specific to the usage
1100 of ORM queries
1101
1102 :ref:`relationship_loader_options` - refers to options specific
1103 to the usage of ORM queries
1104
1105 """
1106 self._with_options += tuple(
1107 coercions.expect(roles.ExecutableOptionRole, opt)
1108 for opt in options
1109 )
1110 return self
1111
1112 @_generative
1113 def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
1114 """Assign the compile options to a new value.
1115
1116 :param compile_options: appropriate CacheableOptions structure
1117
1118 """
1119
1120 self._compile_options = compile_options
1121 return self
1122
1123 @_generative
1124 def _update_compile_options(self, options: CacheableOptions) -> Self:
1125 """update the _compile_options with new keys."""
1126
1127 assert self._compile_options is not None
1128 self._compile_options += options
1129 return self
1130
1131 @_generative
1132 def _add_context_option(
1133 self,
1134 callable_: Callable[[CompileState], None],
1135 cache_args: Any,
1136 ) -> Self:
1137 """Add a context option to this statement.
1138
1139 These are callable functions that will
1140 be given the CompileState object upon compilation.
1141
1142 A second argument cache_args is required, which will be combined with
1143 the ``__code__`` identity of the function itself in order to produce a
1144 cache key.
1145
1146 """
1147 self._with_context_options += ((callable_, cache_args),)
1148 return self
1149
1150 @overload
1151 def execution_options(
1152 self,
1153 *,
1154 compiled_cache: Optional[CompiledCacheType] = ...,
1155 logging_token: str = ...,
1156 isolation_level: IsolationLevel = ...,
1157 no_parameters: bool = False,
1158 stream_results: bool = False,
1159 max_row_buffer: int = ...,
1160 yield_per: int = ...,
1161 insertmanyvalues_page_size: int = ...,
1162 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
1163 populate_existing: bool = False,
1164 autoflush: bool = False,
1165 synchronize_session: SynchronizeSessionArgument = ...,
1166 dml_strategy: DMLStrategyArgument = ...,
1167 render_nulls: bool = ...,
1168 is_delete_using: bool = ...,
1169 is_update_from: bool = ...,
1170 preserve_rowcount: bool = False,
1171 **opt: Any,
1172 ) -> Self: ...
1173
1174 @overload
1175 def execution_options(self, **opt: Any) -> Self: ...
1176
1177 @_generative
1178 def execution_options(self, **kw: Any) -> Self:
1179 """Set non-SQL options for the statement which take effect during
1180 execution.
1181
1182 Execution options can be set at many scopes, including per-statement,
1183 per-connection, or per execution, using methods such as
1184 :meth:`_engine.Connection.execution_options` and parameters which
1185 accept a dictionary of options such as
1186 :paramref:`_engine.Connection.execute.execution_options` and
1187 :paramref:`_orm.Session.execute.execution_options`.
1188
1189 The primary characteristic of an execution option, as opposed to
1190 other kinds of options such as ORM loader options, is that
1191 **execution options never affect the compiled SQL of a query, only
1192 things that affect how the SQL statement itself is invoked or how
1193 results are fetched**. That is, execution options are not part of
1194 what's accommodated by SQL compilation nor are they considered part of
1195 the cached state of a statement.
1196
1197 The :meth:`_sql.Executable.execution_options` method is
1198 :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means that when the method is
        called, a copy of the object is returned with the given parameters
        applied to that new copy, leaving the original unchanged::
1203
1204 statement = select(table.c.x, table.c.y)
1205 new_statement = statement.execution_options(my_option=True)
1206
1207 An exception to this behavior is the :class:`_engine.Connection`
1208 object, where the :meth:`_engine.Connection.execution_options` method
1209 is explicitly **not** generative.
1210
1211 The kinds of options that may be passed to
1212 :meth:`_sql.Executable.execution_options` and other related methods and
1213 parameter dictionaries include parameters that are explicitly consumed
1214 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1215 defined by SQLAlchemy, which means the methods and/or parameter
1216 dictionaries may be used for user-defined parameters that interact with
1217 custom code, which may access the parameters using methods such as
1218 :meth:`_sql.Executable.get_execution_options` and
1219 :meth:`_engine.Connection.get_execution_options`, or within selected
1220 event hooks using a dedicated ``execution_options`` event parameter
1221 such as
1222 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1223 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1224
1225 from sqlalchemy import event
1226
1227
1228 @event.listens_for(some_engine, "before_execute")
1229 def _process_opt(conn, statement, multiparams, params, execution_options):
1230 "run a SQL function before invoking a statement"
1231
1232 if execution_options.get("do_special_thing", False):
1233 conn.exec_driver_sql("run_special_function()")
1234
1235 Within the scope of options that are explicitly recognized by
1236 SQLAlchemy, most apply to specific classes of objects and not others.
1237 The most common execution options include:
1238
1239 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1240 sets the isolation level for a connection or a class of connections
1241 via an :class:`_engine.Engine`. This option is accepted only
1242 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1243
1244 * :paramref:`_engine.Connection.execution_options.stream_results` -
1245 indicates results should be fetched using a server side cursor;
1246 this option is accepted by :class:`_engine.Connection`, by the
1247 :paramref:`_engine.Connection.execute.execution_options` parameter
1248 on :meth:`_engine.Connection.execute`, and additionally by
1249 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1250 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1251
1252 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1253 indicates a dictionary that will serve as the
1254 :ref:`SQL compilation cache <sql_caching>`
1255 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1256 well as for ORM methods like :meth:`_orm.Session.execute`.
1257 Can be passed as ``None`` to disable caching for statements.
1258 This option is not accepted by
1259 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1260 carry along a compilation cache within a statement object.
1261
1262 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1263 - a mapping of schema names used by the
1264 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1265 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1266 :class:`_sql.Executable`, as well as by ORM constructs
1267 like :meth:`_orm.Session.execute`.
1268
1269 .. seealso::
1270
1271 :meth:`_engine.Connection.execution_options`
1272
1273 :paramref:`_engine.Connection.execute.execution_options`
1274
1275 :paramref:`_orm.Session.execute.execution_options`
1276
1277 :ref:`orm_queryguide_execution_options` - documentation on all
1278 ORM-specific execution options
1279
1280 """ # noqa: E501
1281 if "isolation_level" in kw:
1282 raise exc.ArgumentError(
1283 "'isolation_level' execution option may only be specified "
1284 "on Connection.execution_options(), or "
1285 "per-engine using the isolation_level "
1286 "argument to create_engine()."
1287 )
1288 if "compiled_cache" in kw:
1289 raise exc.ArgumentError(
1290 "'compiled_cache' execution option may only be specified "
1291 "on Connection.execution_options(), not per statement."
1292 )
1293 self._execution_options = self._execution_options.union(kw)
1294 return self
1295
1296 def get_execution_options(self) -> _ExecuteOptions:
1297 """Get the non-SQL options which will take effect during execution.
1298
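        E.g. (an illustrative sketch; ``some_table`` is hypothetical)::

            stmt = select(some_table).execution_options(stream_results=True)
            stmt.get_execution_options()  # -> {'stream_results': True}
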
1299 .. versionadded:: 1.3
1300
1301 .. seealso::
1302
1303 :meth:`.Executable.execution_options`
1304 """
1305 return self._execution_options
1306
1307
1308class SchemaEventTarget(event.EventTarget):
1309 """Base class for elements that are the targets of :class:`.DDLEvents`
1310 events.
1311
1312 This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
1313
1314 """
1315
1316 dispatch: dispatcher[SchemaEventTarget]
1317
1318 def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
1319 """Associate with this SchemaEvent's parent object."""
1320
1321 def _set_parent_with_dispatch(
1322 self, parent: SchemaEventTarget, **kw: Any
1323 ) -> None:
1324 self.dispatch.before_parent_attach(self, parent)
1325 self._set_parent(parent, **kw)
1326 self.dispatch.after_parent_attach(self, parent)
1327
1328
1329class SchemaVisitable(SchemaEventTarget, visitors.Visitable):
1330 """Base class for elements that are targets of a :class:`.SchemaVisitor`.
1331
1332 .. versionadded:: 2.0.41
1333
1334 """
1335
1336
1337class SchemaVisitor(ClauseVisitor):
1338 """Define the visiting for ``SchemaItem`` and more
1339 generally ``SchemaVisitable`` objects.
1340
1341 """
1342
1343 __traverse_options__ = {"schema_visitor": True}
1344
1345
1346class _SentinelDefaultCharacterization(Enum):
1347 NONE = "none"
1348 UNKNOWN = "unknown"
1349 CLIENTSIDE = "clientside"
1350 SENTINEL_DEFAULT = "sentinel_default"
1351 SERVERSIDE = "serverside"
1352 IDENTITY = "identity"
1353 SEQUENCE = "sequence"
1354
1355
1356class _SentinelColumnCharacterization(NamedTuple):
1357 columns: Optional[Sequence[Column[Any]]] = None
1358 is_explicit: bool = False
1359 is_autoinc: bool = False
1360 default_characterization: _SentinelDefaultCharacterization = (
1361 _SentinelDefaultCharacterization.NONE
1362 )
1363
1364
1365_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
1366
1367_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
1368_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1369
1370
1371class _ColumnMetrics(Generic[_COL_co]):
1372 __slots__ = ("column",)
1373
1374 column: _COL_co
1375
1376 def __init__(
1377 self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
1378 ):
1379 self.column = col
1380
1381 # proxy_index being non-empty means it was initialized.
1382 # so we need to update it
1383 pi = collection._proxy_index
1384 if pi:
1385 for eps_col in col._expanded_proxy_set:
1386 pi[eps_col].add(self)
1387
1388 def get_expanded_proxy_set(self):
1389 return self.column._expanded_proxy_set
1390
1391 def dispose(self, collection):
1392 pi = collection._proxy_index
1393 if not pi:
1394 return
1395 for col in self.column._expanded_proxy_set:
1396 colset = pi.get(col, None)
1397 if colset:
1398 colset.discard(self)
1399 if colset is not None and not colset:
1400 del pi[col]
1401
1402 def embedded(
1403 self,
1404 target_set: Union[
1405 Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
1406 ],
1407 ) -> bool:
1408 expanded_proxy_set = self.column._expanded_proxy_set
1409 for t in target_set.difference(expanded_proxy_set):
1410 if not expanded_proxy_set.intersection(_expand_cloned([t])):
1411 return False
1412 return True
1413
1414
1415class ColumnCollection(Generic[_COLKEY, _COL_co]):
1416 """Collection of :class:`_expression.ColumnElement` instances,
1417 typically for
1418 :class:`_sql.FromClause` objects.
1419
1420 The :class:`_sql.ColumnCollection` object is most commonly available
1421 as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
1422 on the :class:`_schema.Table` object, introduced at
1423 :ref:`metadata_tables_and_columns`.
1424
1425 The :class:`_expression.ColumnCollection` has both mapping- and sequence-
1426 like behaviors. A :class:`_expression.ColumnCollection` usually stores
1427 :class:`_schema.Column` objects, which are then accessible both via mapping
1428 style access as well as attribute access style.
1429
    To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, as below where
    a column named ``employee_name`` is accessed::
1433
1434 >>> employee_table.c.employee_name
1435
    To access columns that have names with special characters or spaces,
    index-style access is used, as below which illustrates how a column named
    ``employee ' payment`` is accessed::
1439
1440 >>> employee_table.c["employee ' payment"]
1441
1442 As the :class:`_sql.ColumnCollection` object provides a Python dictionary
1443 interface, common dictionary method names like
1444 :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
1445 and :meth:`_sql.ColumnCollection.items` are available, which means that
1446 database columns that are keyed under these names also need to use indexed
1447 access::
1448
1449 >>> employee_table.c["values"]
1450
1451
1452 The name for which a :class:`_schema.Column` would be present is normally
1453 that of the :paramref:`_schema.Column.key` parameter. In some contexts,
1454 such as a :class:`_sql.Select` object that uses a label style set
1455 using the :meth:`_sql.Select.set_label_style` method, a column of a certain
1456 key may instead be represented under a particular label name such
1457 as ``tablename_columnname``::
1458
1459 >>> from sqlalchemy import select, column, table
1460 >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
1461 >>> t = table("t", column("c"))
1462 >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
1463 >>> subq = stmt.subquery()
1464 >>> subq.c.t_c
1465 <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
1466
    :class:`.ColumnCollection` also indexes the columns in order and makes
    them accessible by their integer position::
1469
1470 >>> cc[0]
1471 Column('x', Integer(), table=None)
1472 >>> cc[1]
1473 Column('y', Integer(), table=None)
1474
1475 .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
1476 allows integer-based
1477 index access to the collection.
1478
1479 Iterating the collection yields the column expressions in order::
1480
1481 >>> list(cc)
1482 [Column('x', Integer(), table=None),
1483 Column('y', Integer(), table=None)]
1484
1485 The base :class:`_expression.ColumnCollection` object can store
1486 duplicates, which can
1487 mean either two columns with the same key, in which case the column
1488 returned by key access is **arbitrary**::
1489
1490 >>> x1, x2 = Column("x", Integer), Column("x", Integer)
1491 >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
1492 >>> list(cc)
1493 [Column('x', Integer(), table=None),
1494 Column('x', Integer(), table=None)]
1495 >>> cc["x"] is x1
1496 False
1497 >>> cc["x"] is x2
1498 True
1499
1500 Or it can also mean the same column multiple times. These cases are
1501 supported as :class:`_expression.ColumnCollection`
1502 is used to represent the columns in
1503 a SELECT statement which may include duplicates.
1504
1505 A special subclass :class:`.DedupeColumnCollection` exists which instead
1506 maintains SQLAlchemy's older behavior of not allowing duplicates; this
1507 collection is used for schema level objects like :class:`_schema.Table`
1508 and
1509 :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
1510 :class:`.DedupeColumnCollection` class also has additional mutation methods
1511 as the schema constructs have more use cases that require removal and
1512 replacement of columns.
1513
1514 .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
1515 now stores duplicate
1516 column keys as well as the same column in multiple positions. The
1517 :class:`.DedupeColumnCollection` class is added to maintain the
1518 former behavior in those cases where deduplication as well as
1519 additional replace/remove operations are needed.
1520
1521
1522 """
1523
1524 __slots__ = "_collection", "_index", "_colset", "_proxy_index"
1525
1526 _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
1527 _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
1528 _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
1529 _colset: Set[_COL_co]
1530
1531 def __init__(
1532 self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
1533 ):
1534 object.__setattr__(self, "_colset", set())
1535 object.__setattr__(self, "_index", {})
1536 object.__setattr__(
1537 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1538 )
1539 object.__setattr__(self, "_collection", [])
1540 if columns:
1541 self._initial_populate(columns)
1542
1543 @util.preload_module("sqlalchemy.sql.elements")
1544 def __clause_element__(self) -> ClauseList:
1545 elements = util.preloaded.sql_elements
1546
1547 return elements.ClauseList(
1548 _literal_as_text_role=roles.ColumnsClauseRole,
1549 group=False,
1550 *self._all_columns,
1551 )
1552
1553 def _initial_populate(
1554 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1555 ) -> None:
1556 self._populate_separate_keys(iter_)
1557
1558 @property
1559 def _all_columns(self) -> List[_COL_co]:
1560 return [col for (_, col, _) in self._collection]
1561
1562 def keys(self) -> List[_COLKEY]:
1563 """Return a sequence of string key names for all columns in this
1564 collection."""
1565 return [k for (k, _, _) in self._collection]
1566
1567 def values(self) -> List[_COL_co]:
1568 """Return a sequence of :class:`_sql.ColumnClause` or
1569 :class:`_schema.Column` objects for all columns in this
1570 collection."""
1571 return [col for (_, col, _) in self._collection]
1572
1573 def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
1574 """Return a sequence of (key, column) tuples for all columns in this
1575 collection each consisting of a string key name and a
1576 :class:`_sql.ColumnClause` or
1577 :class:`_schema.Column` object.
1578 """
1579
1580 return [(k, col) for (k, col, _) in self._collection]
1581
1582 def __bool__(self) -> bool:
1583 return bool(self._collection)
1584
1585 def __len__(self) -> int:
1586 return len(self._collection)
1587
1588 def __iter__(self) -> Iterator[_COL_co]:
        # build a list first so that iteration is stable over a course of changes
1590 return iter([col for _, col, _ in self._collection])
1591
1592 @overload
1593 def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
1594
1595 @overload
1596 def __getitem__(
1597 self, key: Tuple[Union[str, int], ...]
1598 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1599
1600 @overload
1601 def __getitem__(
1602 self, key: slice
1603 ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
1604
1605 def __getitem__(
1606 self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
1607 ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
1608 try:
1609 if isinstance(key, (tuple, slice)):
1610 if isinstance(key, slice):
1611 cols = (
1612 (sub_key, col)
1613 for (sub_key, col, _) in self._collection[key]
1614 )
1615 else:
1616 cols = (self._index[sub_key] for sub_key in key)
1617
1618 return ColumnCollection(cols).as_readonly()
1619 else:
1620 return self._index[key][1]
1621 except KeyError as err:
1622 if isinstance(err.args[0], int):
1623 raise IndexError(err.args[0]) from err
1624 else:
1625 raise
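
    # Illustrative sketch: besides a single string or integer key,
    # __getitem__ accepts a tuple of keys or a slice and returns a read-only
    # sub-collection (``table.c`` below is hypothetical):
    #
    #     sub = table.c["id", "name"]   # ReadOnlyColumnCollection
    #     first_two = table.c[0:2]      # also a ReadOnlyColumnCollection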
1626
1627 def __getattr__(self, key: str) -> _COL_co:
1628 try:
1629 return self._index[key][1]
1630 except KeyError as err:
1631 raise AttributeError(key) from err
1632
1633 def __contains__(self, key: str) -> bool:
1634 if key not in self._index:
1635 if not isinstance(key, str):
1636 raise exc.ArgumentError(
1637 "__contains__ requires a string argument"
1638 )
1639 return False
1640 else:
1641 return True
1642
1643 def compare(self, other: ColumnCollection[Any, Any]) -> bool:
        """Compare this :class:`_expression.ColumnCollection` to another
        based on the identity and order of the member columns"""
1646
1647 for l, r in zip_longest(self, other):
1648 if l is not r:
1649 return False
1650 else:
1651 return True
1652
1653 def __eq__(self, other: Any) -> bool:
1654 return self.compare(other)
1655
1656 @overload
1657 def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
1658
1659 @overload
1660 def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
1661
1662 def get(
1663 self, key: str, default: Optional[_COL] = None
1664 ) -> Optional[Union[_COL_co, _COL]]:
1665 """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
1666 based on a string key name from this
1667 :class:`_expression.ColumnCollection`."""
1668
1669 if key in self._index:
1670 return self._index[key][1]
1671 else:
1672 return default
1673
1674 def __str__(self) -> str:
1675 return "%s(%s)" % (
1676 self.__class__.__name__,
1677 ", ".join(str(c) for c in self),
1678 )
1679
1680 def __setitem__(self, key: str, value: Any) -> NoReturn:
1681 raise NotImplementedError()
1682
1683 def __delitem__(self, key: str) -> NoReturn:
1684 raise NotImplementedError()
1685
1686 def __setattr__(self, key: str, obj: Any) -> NoReturn:
1687 raise NotImplementedError()
1688
1689 def clear(self) -> NoReturn:
1690 """Dictionary clear() is not implemented for
1691 :class:`_sql.ColumnCollection`."""
1692 raise NotImplementedError()
1693
1694 def remove(self, column: Any) -> None:
1695 raise NotImplementedError()
1696
1697 def update(self, iter_: Any) -> NoReturn:
1698 """Dictionary update() is not implemented for
1699 :class:`_sql.ColumnCollection`."""
1700 raise NotImplementedError()
1701
1702 # https://github.com/python/mypy/issues/4266
1703 __hash__ = None # type: ignore
1704
1705 def _populate_separate_keys(
1706 self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
1707 ) -> None:
1708 """populate from an iterator of (key, column)"""
1709
1710 self._collection[:] = collection = [
1711 (k, c, _ColumnMetrics(self, c)) for k, c in iter_
1712 ]
1713 self._colset.update(c._deannotate() for _, c, _ in collection)
1714 self._index.update(
1715 {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
1716 )
1717 self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
1718
1719 def add(
1720 self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
1721 ) -> None:
1722 """Add a column to this :class:`_sql.ColumnCollection`.
1723
1724 .. note::
1725
1726 This method is **not normally used by user-facing code**, as the
1727 :class:`_sql.ColumnCollection` is usually part of an existing
1728 object such as a :class:`_schema.Table`. To add a
1729 :class:`_schema.Column` to an existing :class:`_schema.Table`
1730 object, use the :meth:`_schema.Table.append_column` method.
1731
1732 """
1733 colkey: _COLKEY
1734
1735 if key is None:
1736 colkey = column.key # type: ignore
1737 else:
1738 colkey = key
1739
1740 l = len(self._collection)
1741
        # typing note: it's unclear how this assignment should be typed
        # with the covariant _COL_co TypeVar, hence the cast below
1744
1745 _column = cast(_COL_co, column)
1746
1747 self._collection.append(
1748 (colkey, _column, _ColumnMetrics(self, _column))
1749 )
1750 self._colset.add(_column._deannotate())
1751 self._index[l] = (colkey, _column)
1752 if colkey not in self._index:
1753 self._index[colkey] = (colkey, _column)
1754
1755 def __getstate__(self) -> Dict[str, Any]:
1756 return {
1757 "_collection": [(k, c) for k, c, _ in self._collection],
1758 "_index": self._index,
1759 }
1760
1761 def __setstate__(self, state: Dict[str, Any]) -> None:
1762 object.__setattr__(self, "_index", state["_index"])
1763 object.__setattr__(
1764 self, "_proxy_index", collections.defaultdict(util.OrderedSet)
1765 )
1766 object.__setattr__(
1767 self,
1768 "_collection",
1769 [
1770 (k, c, _ColumnMetrics(self, c))
1771 for (k, c) in state["_collection"]
1772 ],
1773 )
1774 object.__setattr__(
1775 self, "_colset", {col for k, col, _ in self._collection}
1776 )
1777
1778 def contains_column(self, col: ColumnElement[Any]) -> bool:
        """Check whether a column object exists in this collection."""
1780 if col not in self._colset:
1781 if isinstance(col, str):
1782 raise exc.ArgumentError(
1783 "contains_column cannot be used with string arguments. "
1784 "Use ``col_name in table.c`` instead."
1785 )
1786 return False
1787 else:
1788 return True
1789
1790 def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
1791 """Return a "read only" form of this
1792 :class:`_sql.ColumnCollection`."""
1793
1794 return ReadOnlyColumnCollection(self)
1795
1796 def _init_proxy_index(self):
1797 """populate the "proxy index", if empty.
1798
        The proxy index was added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        To save both the time needed to construct new .c collections and
        memory for large numbers of large .c collections, the proxy index is
        only filled once corresponding_column() is called. Once filled it
        stays that way, and new _ColumnMetrics objects created after that
        point will populate it with new data. Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() was used; however, it is tested
        in test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

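        # map each member of every metrics object's expanded proxy set to
        # the set of _ColumnMetrics objects that reference it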
        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
                       to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

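        For illustration, a minimal sketch assuming an existing
        :class:`_schema.Table` named ``user_table`` and the usual
        ``select()`` construct (the names here are hypothetical)::

            subq = select(user_table).subquery()

            # returns ``subq.c.id``, the column of ``subq`` which proxies
            # ``user_table.c.id``
            corresponding = subq.c.corresponding_column(user_table.c.id)
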
        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None


_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")


class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is used by schema-level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection also includes more
    sophisticated mutator methods to suit schema objects that require
    mutable column collections.

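    For illustration only, a minimal sketch of the deduplicating behavior
    (standalone use of this collection is not typical; the columns shown
    are hypothetical)::

        from sqlalchemy import Column, Integer

        cc = DedupeColumnCollection()
        cc.add(Column("x", Integer))
        cc.add(Column("x", Integer))  # replaces the previous "x" entry
        assert len(cc) == 1
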
    .. versionadded:: 1.4

    """

    def add(  # type: ignore[override]
        self, column: _NAMEDCOL, key: Optional[str] = None
    ) -> None:
        if key is not None and column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            if existing is column:
                return

            self.replace(column)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(column, "proxy_set")
        else:
            self._append_new_column(key, column)

    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
        l = len(self._collection)
        self._collection.append(
            (key, named_column, _ColumnMetrics(self, named_column))
        )
        self._colset.add(named_column._deannotate())
        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        cols = list(iter_)

        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # the collection is now one element shorter; remove the stale
        # highest positional index entry
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table("sometable", metadata, Column("col1", Integer))
            t.columns.replace(Column("col1", Integer, key="columnone"))

        will remove the original 'col1' from the collection, and add
        the new column under the key 'columnone'.

        Used by schema.Column to override columns during table reflection.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column)
            return
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replaced = False
        for k, col, metrics in self._collection:
            if col in remove_col:
                if not replaced:
                    replaced = True
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if not replaced:
            new_cols.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})


class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    __slots__ = ("_parent",)

    def __init__(self, collection):
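        # the read-only container disallows normal attribute mutation, so
        # the parent collection's internal state is shared in place via
        # object.__setattr__()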
        object.__setattr__(self, "_parent", collection)
        object.__setattr__(self, "_colset", collection._colset)
        object.__setattr__(self, "_index", collection._index)
        object.__setattr__(self, "_collection", collection._collection)
        object.__setattr__(self, "_proxy_index", collection._proxy_index)

    def __getstate__(self):
        return {"_parent": self._parent}

    def __setstate__(self, state):
        parent = state["_parent"]
        self.__init__(parent)  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()


class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    def contains_column(self, col):
        return col in self

    def extend(self, cols):
        for col in cols:
            self.add(col)

    def __eq__(self, other):
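        # unlike a typical __eq__, this returns a SQL expression: an AND of
        # column equality comparisons for each pair of columns that share
        # lineage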
        l = []
        for c in other:
            for local in self:
                if c.shares_lineage(local):
                    l.append(c == local)
        return elements.and_(*l)

    def __hash__(self):  # type: ignore[override]
        return hash(tuple(x for x in self))


def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible]
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    If not immediately available, traverses the element to find a
    sub-element that has one, if any.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
            if _is_has_entity_namespace(elem):
                return elem.entity_namespace
        else:
            # loop completed without finding a sub-element that has an
            # entity_namespace; propagate the original AttributeError
            raise


def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.

    Raises :class:`_exc.InvalidRequestError` rather than ``AttributeError``
    when the key is not found.

    """

    try:
        ns = _entity_namespace(entity)
        if default is not NO_ARG:
            return getattr(ns, key, default)
        else:
            return getattr(ns, key)  # type: ignore
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err