1# sql/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Foundational utilities common to many sql modules.
10
11"""
12
13
14from __future__ import annotations
15
16import collections
17from enum import Enum
18import itertools
19from itertools import zip_longest
20import operator
21import re
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
45
46from . import roles
47from . import visitors
48from .cache_key import HasCacheKey # noqa
49from .cache_key import MemoizedHasCacheKey # noqa
50from .traversals import HasCopyInternals # noqa
51from .visitors import ClauseVisitor
52from .visitors import ExtendedInternalTraversal
53from .visitors import ExternallyTraversible
54from .visitors import InternalTraversal
55from .. import event
56from .. import exc
57from .. import util
58from ..util import HasMemoized as HasMemoized
59from ..util import hybridmethod
60from ..util.typing import Self
61from ..util.typing import TypeGuard
62
63if TYPE_CHECKING:
64 from . import coercions
65 from . import elements
66 from . import type_api
67 from ._orm_types import DMLStrategyArgument
68 from ._orm_types import SynchronizeSessionArgument
69 from ._typing import _CLE
70 from .elements import BindParameter
71 from .elements import ClauseList
72 from .elements import ColumnClause # noqa
73 from .elements import ColumnElement
74 from .elements import NamedColumn
75 from .elements import SQLCoreOperations
76 from .elements import TextClause
77 from .schema import Column
78 from .schema import DefaultGenerator
79 from .selectable import _JoinTargetElement
80 from .selectable import _SelectIterable
81 from .selectable import FromClause
82 from ..engine import Connection
83 from ..engine import CursorResult
84 from ..engine.interfaces import _CoreMultiExecuteParams
85 from ..engine.interfaces import _ExecuteOptions
86 from ..engine.interfaces import _ImmutableExecuteOptions
87 from ..engine.interfaces import CacheStats
88 from ..engine.interfaces import Compiled
89 from ..engine.interfaces import CompiledCacheType
90 from ..engine.interfaces import CoreExecuteOptionsParameter
91 from ..engine.interfaces import Dialect
92 from ..engine.interfaces import IsolationLevel
93 from ..engine.interfaces import SchemaTranslateMapType
94 from ..event import dispatcher
95
96if not TYPE_CHECKING:
97 coercions = None # noqa
98 elements = None # noqa
99 type_api = None # noqa
100
101
102class _NoArg(Enum):
103 NO_ARG = 0
104
105 def __repr__(self):
106 return f"_NoArg.{self.name}"
107
108
# module-level alias so callers can use ``NO_ARG`` directly as a default
NO_ARG = _NoArg.NO_ARG


class _NoneName(Enum):
    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


# module-level alias for the single _NoneName member
_NONE_NAME = _NoneName.NONE_NAME

# general-purpose TypeVar used throughout this module
_T = TypeVar("_T", bound=Any)

# TypeVar constrained to callables; used by the decorator helpers below
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# maps an ambiguous table name to its disambiguated replacement name
_AmbiguousTableNameMap = MutableMapping[str, str]
124
125
126class _DefaultDescriptionTuple(NamedTuple):
127 arg: Any
128 is_scalar: Optional[bool]
129 is_callable: Optional[bool]
130 is_sentinel: Optional[bool]
131
132 @classmethod
133 def _from_column_default(
134 cls, default: Optional[DefaultGenerator]
135 ) -> _DefaultDescriptionTuple:
136 return (
137 _DefaultDescriptionTuple(
138 default.arg, # type: ignore
139 default.is_scalar,
140 default.is_callable,
141 default.is_sentinel,
142 )
143 if default
144 and (
145 default.has_arg
146 or (not default.for_update and default.is_sentinel)
147 )
148 else _DefaultDescriptionTuple(None, None, None, None)
149 )
150
151
152_never_select_column = operator.attrgetter("_omit_from_statements")
153
154
class _EntityNamespace(Protocol):
    """Protocol for a namespace whose attribute access yields SQL operator
    objects (e.g. mapped-class or table column namespaces)."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
157
158
class _HasEntityNamespace(Protocol):
    """Protocol for objects that expose an ``entity_namespace`` property."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
162
163
164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
165 return hasattr(element, "entity_namespace")
166
167
# Remove when https://github.com/python/mypy/issues/14640 will be fixed
# (plain TypeVar standing in for ``typing.Self`` on methods returning self)
_Self = TypeVar("_Self", bound=Any)
170
171
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    "immutable" here refers to mutability in the context of SQL DQL/DML
    generation: a SELECT or subquery may be composed in varied forms, but
    the structure of a specific table or column cannot be modified within
    DQL.  Accordingly the primary "immutable" objects are
    :class:`.ColumnClause`, :class:`.Column`, :class:`.TableClause`,
    :class:`.Table`.

    """

    __slots__ = ()

    # flag consulted by cloning / traversal machinery
    _is_immutable = True

    def unique_params(self, *optionaldict, **kwargs):
        """Always raise; immutable elements cannot be copied with new params."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict, **kwargs):
        """Always raise; immutable elements cannot be copied with new params."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        # cloning an immutable element is a no-op; hand back self
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        # nothing to copy; immutable elements own no mutable internals
        pass
202
203
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    # flag consulted by traversal / compilation code
    _is_singleton_constant = True

    # the per-subclass shared instance, installed by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction always hands back the one shared instance
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # overwritten per-instance in _create_singleton(); reaching this
        # means the singleton was never created for the subclass
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls):
        """Allocate and install the singleton instance for this subclass.

        Uses ``object.__new__`` directly, since ``cls.__new__`` above is
        defined to return the (not-yet-existing) singleton.
        """
        obj = object.__new__(cls)
        obj.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement. This referred to #6259. However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        obj.proxy_set = frozenset([obj])
        cls._singleton = obj
232
233
234def _from_objects(
235 *elements: Union[
236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
237 ]
238) -> Iterator[FromClause]:
239 return itertools.chain.from_iterable(
240 [element._from_objects for element in elements]
241 )
242
243
244def _select_iterables(
245 elements: Iterable[roles.ColumnsClauseRole],
246) -> _SelectIterable:
247 """expand tables into individual columns in the
248 given list of column expressions.
249
250 """
251 return itertools.chain.from_iterable(
252 [c._select_iterable for c in elements]
253 )
254
255
# TypeVar bound to any object implementing the _GenerativeType protocol
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Protocol for objects that can produce a chaining copy of themselves
    via ``_generate()``."""

    def _generate(self) -> Self: ...
261
262
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # work on a copy (or, for in-place variants, the same object with
        # memoizations cleared) rather than the original
        self = self._generate()
        x = fn(self, *args, **kw)
        # the wrapped method must mutate and return the copy it was given
        assert x is self, "generative methods must return self"
        return self

    decorated = _generative(fn)
    # keep the undecorated function reachable for internal callers that
    # intend to mutate in place
    decorated.non_generative = fn  # type: ignore
    return decorated
285
286
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Decorator factory asserting that, at call time, each attribute in
    *names* still holds its default value.

    :param msgs: optional dict mapping attribute name to a custom error
     message.
    :param defaults: optional dict mapping attribute name to its expected
     default value (``None`` when not present).

    The decorated method raises :class:`.InvalidRequestError` if any named
    attribute was already set, i.e. the method was invoked twice.
    """
    msgs = kw.pop("msgs", {})

    defaults = kw.pop("defaults", {})

    # precompute (name, getter, default) triples once at decoration time
    getters = [
        (name, operator.attrgetter(name), defaults.get(name, None))
        for name in names
    ]

    @util.decorator
    def check(fn, *args, **kw):
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        args = args[1:]
        for name, getter, default_ in getters:
            # identity comparison: the attribute must still be its exact
            # default object, not merely an equal value
            if getter(self) is not default_:
                msg = msgs.get(
                    name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
                raise exc.InvalidRequestError(msg)
        return fn(self, *args, **kw)

    return check
314
315
316def _clone(element, **kw):
317 return element._clone(**kw)
318
319
320def _expand_cloned(
321 elements: Iterable[_CLE],
322) -> Iterable[_CLE]:
323 """expand the given set of ClauseElements to be the set of all 'cloned'
324 predecessors.
325
326 """
327 # TODO: cython candidate
328 return itertools.chain(*[x._cloned_set for x in elements])
329
330
331def _de_clone(
332 elements: Iterable[_CLE],
333) -> Iterable[_CLE]:
334 for x in elements:
335 while x._is_clone_of is not None:
336 x = x._is_clone_of
337 yield x
338
339
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    overlap = set(_expand_cloned(a)) & set(_expand_cloned(b))
    return {
        element
        for element in a
        if overlap.intersection(element._cloned_set)
    }
349
350
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """Return members of 'a' whose cloned lineage has no overlap with 'b'."""
    overlap = set(_expand_cloned(a)) & set(_expand_cloned(b))
    return {
        element
        for element in a
        if not overlap.intersection(element._cloned_set)
    }
356
357
358class _DialectArgView(MutableMapping[str, Any]):
359 """A dictionary view of dialect-level arguments in the form
360 <dialectname>_<argument_name>.
361
362 """
363
364 def __init__(self, obj):
365 self.obj = obj
366
367 def _key(self, key):
368 try:
369 dialect, value_key = key.split("_", 1)
370 except ValueError as err:
371 raise KeyError(key) from err
372 else:
373 return dialect, value_key
374
375 def __getitem__(self, key):
376 dialect, value_key = self._key(key)
377
378 try:
379 opt = self.obj.dialect_options[dialect]
380 except exc.NoSuchModuleError as err:
381 raise KeyError(key) from err
382 else:
383 return opt[value_key]
384
385 def __setitem__(self, key, value):
386 try:
387 dialect, value_key = self._key(key)
388 except KeyError as err:
389 raise exc.ArgumentError(
390 "Keys must be of the form <dialectname>_<argname>"
391 ) from err
392 else:
393 self.obj.dialect_options[dialect][value_key] = value
394
395 def __delitem__(self, key):
396 dialect, value_key = self._key(key)
397 del self.obj.dialect_options[dialect][value_key]
398
399 def __len__(self):
400 return sum(
401 len(args._non_defaults)
402 for args in self.obj.dialect_options.values()
403 )
404
405 def __iter__(self):
406 return (
407 "%s_%s" % (dialect_name, value_name)
408 for dialect_name in self.obj.dialect_options
409 for value_name in self.obj.dialect_options[
410 dialect_name
411 ]._non_defaults
412 )
413
414
415class _DialectArgDict(MutableMapping[str, Any]):
416 """A dictionary view of dialect-level arguments for a specific
417 dialect.
418
419 Maintains a separate collection of user-specified arguments
420 and dialect-specified default arguments.
421
422 """
423
424 def __init__(self):
425 self._non_defaults = {}
426 self._defaults = {}
427
428 def __len__(self):
429 return len(set(self._non_defaults).union(self._defaults))
430
431 def __iter__(self):
432 return iter(set(self._non_defaults).union(self._defaults))
433
434 def __getitem__(self, key):
435 if key in self._non_defaults:
436 return self._non_defaults[key]
437 else:
438 return self._defaults[key]
439
440 def __setitem__(self, key, value):
441 self._non_defaults[key] = value
442
443 def __delitem__(self, key):
444 del self._non_defaults[key]
445
446
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name):
    """Resolve *dialect_name* via the dialect registry and return a mutable
    copy of its ``construct_arguments``, or ``None`` when the dialect
    declares none (meaning: accept any kwargs without validation)."""
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    if dialect_cls.construct_arguments is None:
        return None
    return dict(dialect_cls.construct_arguments)
453
454
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(cls, dialect_name, argument_name, default):
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index('a', 'b', mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class. The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect. The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised. The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised. If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already. All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        if construct_arg_dictionary is None:
            # the dialect resolved but declares no construct_arguments
            # collection; it accepts any kwargs unvalidated and so cannot
            # participate in per-argument registration.
            # (message wording fixed; it previously read "does have
            # keyword-argument validation and defaults enabled configured")
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @util.memoized_property
    def dialect_kwargs(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format. Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self):
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # lazily-populated map of dialect name -> construct_arguments dict,
    # or None when the dialect performs no kwarg validation
    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)

    def _kw_reg_for_dialect_cls(self, dialect_name):
        """Assemble the default argument dict for this class + dialect,
        walking the MRO base-first so subclass entries win."""
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # dialect does not validate kwargs; accept anything
            d._defaults.update({"*": None})
        else:
            for cls in reversed(self.__class__.__mro__):
                if cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[cls])
        return d

    @util.memoized_property
    def dialect_options(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``. For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options['postgresql']['where']

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(
            util.portable_instancemethod(self._kw_reg_for_dialect_cls)
        )

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            # non-greedy first group: split on the FIRST underscore
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn, but accept the argument as-is
                # under a wildcard registry so it round-trips
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
623
624
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler. This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output. The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler, such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel"
    statement, i.e. the outermost SQL statement that's actually being
    executed. There can be other :class:`.CompileState` objects that are not
    the toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin name, visit name) -> CompileState subclass
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        # factory construction: resolve which CompileState subclass
        # handles this statement, falling back to the "default" plugin
        target = statement._effective_plugin_target
        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get((plugin_name, target), None)
            if klass is None:
                klass = cls.plugins[("default", target)]
        else:
            klass = cls.plugins[("default", target)]

        if klass is cls:
            return cls(statement, compiler, **kw)
        return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            lookup = (plugin_name, statement._effective_plugin_target)
            if lookup in cls.plugins:
                return cls.plugins[lookup]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default. return that
        # if there was no plugin-specific class (e.g. Insert with "orm"
        # plugin)
        default_lookup = ("default", statement._effective_plugin_target)
        return cls.plugins.get(default_lookup, None)

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        lookup = (plugin_name, statement._effective_plugin_target)
        return cls.plugins.get(lookup, None)

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Class decorator registering a CompileState subclass under
        (plugin_name, visit_name)."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
729
730
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        cls = self.__class__
        copied = cls.__new__(cls)
        omit = self._memoized_keys
        # snapshot the dict first so the transfer stays atomic
        state = self.__dict__.copy()
        if omit:
            # drop memoized values so the copy recomputes them lazily
            for key in omit:
                state.pop(key, None)
        copied.__dict__ = state
        return copied
747
748
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        # note __dict__ needs to be in __slots__ if this is used;
        # clear memoizations so they recompute after the mutation
        for memoized_key in self._memoized_keys:
            self.__dict__.pop(memoized_key, None)
        return self
761
762
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional CompileState subclass overriding the plugin lookup
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # propagated attributes (e.g. plugin info); immutable and shareable
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # default factory dispatches through the CompileState plugin registry
    _compile_state_factory = CompileState.create_for_statement
771
772
773class _MetaOptions(type):
774 """metaclass for the Options class.
775
776 This metaclass is actually necessary despite the availability of the
777 ``__init_subclass__()`` hook as this type also provides custom class-level
778 behavior for the ``__add__()`` method.
779
780 """
781
782 _cache_attrs: Tuple[str, ...]
783
784 def __add__(self, other):
785 o1 = self()
786
787 if set(other).difference(self._cache_attrs):
788 raise TypeError(
789 "dictionary contains attributes not covered by "
790 "Options class %s: %r"
791 % (self, set(other).difference(self._cache_attrs))
792 )
793
794 o1.__dict__.update(other)
795 return o1
796
797 if TYPE_CHECKING:
798
799 def __getattr__(self, key: str) -> Any: ...
800
801 def __setattr__(self, key: str, value: Any) -> None: ...
802
803 def __delattr__(self, key: str) -> None: ...
804
805
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    # populated per-subclass by __init_subclass__
    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # record the sorted cache-relevant attribute names declared
        # directly on this subclass; dunders and cache-key plumbing
        # are excluded
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __add__(self, other):
        # instance-level merge: copy self, then overlay the dict ``other``
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        # TODO: very inefficient. This is used only in test suites
        # right now.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self):
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        # note: an issubclass() test on the class itself, named
        # "isinstance" for API symmetry with instances
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name, value):
        # produce a copy with ``value`` appended to the ``name`` collection
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        # instance-level form of _state_dict(): the instance __dict__
        return self.__dict__

    # shared empty mapping returned by the class-level _state_dict()
    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other):
        """Merge the state of *other* onto this class, rejecting any
        attributes of *other* that this class does not also track."""
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't. otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls, key, attrs, exec_options, statement_exec_options
    ):
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {
                    "populate_existing",
                    "autoflush",
                    "yield_per"
                },
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                local = "_" + argname
                # per-invocation exec_options win over statement-level ones
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
966
967
class CacheableOptions(Options, HasCacheKey):
    """Options subclass that can participate in SQL statement cache keys."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(self, anon_map, bindparams):
        # instance-level: delegate to the full HasCacheKey traversal
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(cls, anon_map, bindparams):
        # class-level: the class itself (i.e. all defaults) is the key
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self):
        return HasCacheKey._generate_cache_key_for_object(self)
982
983
class ExecutableOption(HasCopyInternals):
    """Base for options that may be applied to an :class:`.Executable`."""

    __slots__ = ()

    _annotations = util.EMPTY_DICT

    __visit_name__ = "executable_option"

    _is_has_cache_key = False

    _is_core = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        klass = self.__class__
        replica = klass.__new__(klass)
        replica.__dict__ = dict(self.__dict__)  # type: ignore
        return replica
1000
1001
1002class Executable(roles.StatementRole):
1003 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1004
1005 :class:`.Executable` is a superclass for all "statement" types
1006 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1007 :func:`insert`, :func:`text`.
1008
1009 """
1010
1011 supports_execution: bool = True
1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1013 _is_default_generator = False
1014 _with_options: Tuple[ExecutableOption, ...] = ()
1015 _with_context_options: Tuple[
1016 Tuple[Callable[[CompileState], None], Any], ...
1017 ] = ()
1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1019
1020 _executable_traverse_internals = [
1021 ("_with_options", InternalTraversal.dp_executable_options),
1022 (
1023 "_with_context_options",
1024 ExtendedInternalTraversal.dp_with_context_options,
1025 ),
1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1027 ]
1028
1029 is_select = False
1030 is_from_statement = False
1031 is_update = False
1032 is_insert = False
1033 is_text = False
1034 is_delete = False
1035 is_dml = False
1036
1037 if TYPE_CHECKING:
1038 __visit_name__: str
1039
1040 def _compile_w_cache(
1041 self,
1042 dialect: Dialect,
1043 *,
1044 compiled_cache: Optional[CompiledCacheType],
1045 column_keys: List[str],
1046 for_executemany: bool = False,
1047 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1048 **kw: Any,
1049 ) -> Tuple[
1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1051 ]: ...
1052
1053 def _execute_on_connection(
1054 self,
1055 connection: Connection,
1056 distilled_params: _CoreMultiExecuteParams,
1057 execution_options: CoreExecuteOptionsParameter,
1058 ) -> CursorResult[Any]: ...
1059
1060 def _execute_on_scalar(
1061 self,
1062 connection: Connection,
1063 distilled_params: _CoreMultiExecuteParams,
1064 execution_options: CoreExecuteOptionsParameter,
1065 ) -> Any: ...
1066
1067 @util.ro_non_memoized_property
1068 def _all_selected_columns(self):
1069 raise NotImplementedError()
1070
1071 @property
1072 def _effective_plugin_target(self) -> str:
1073 return self.__visit_name__
1074
1075 @_generative
1076 def options(self, *options: ExecutableOption) -> Self:
1077 """Apply options to this statement.
1078
1079 In the general sense, options are any kind of Python object
1080 that can be interpreted by the SQL compiler for the statement.
1081 These options can be consumed by specific dialects or specific kinds
1082 of compilers.
1083
1084 The most commonly known kind of option are the ORM level options
1085 that apply "eager load" and other loading behaviors to an ORM
1086 query. However, options can theoretically be used for many other
1087 purposes.
1088
1089 For background on specific kinds of options for specific kinds of
1090 statements, refer to the documentation for those option objects.
1091
1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1093 Core statement objects towards the goal of allowing unified
1094 Core / ORM querying capabilities.
1095
1096 .. seealso::
1097
1098 :ref:`loading_columns` - refers to options specific to the usage
1099 of ORM queries
1100
1101 :ref:`relationship_loader_options` - refers to options specific
1102 to the usage of ORM queries
1103
1104 """
1105 self._with_options += tuple(
1106 coercions.expect(roles.ExecutableOptionRole, opt)
1107 for opt in options
1108 )
1109 return self
1110
    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        # full replacement of the options structure, as opposed to the
        # merging behavior of _update_compile_options()
        self._compile_options = compile_options
        return self
1121
    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        # presumably the Options "+" operation merges the new keys into the
        # existing structure — confirm against CacheableOptions.__add__
        assert self._compile_options is not None
        self._compile_options += options
        return self
1129
1130 @_generative
1131 def _add_context_option(
1132 self,
1133 callable_: Callable[[CompileState], None],
1134 cache_args: Any,
1135 ) -> Self:
1136 """Add a context option to this statement.
1137
1138 These are callable functions that will
1139 be given the CompileState object upon compilation.
1140
1141 A second argument cache_args is required, which will be combined with
1142 the ``__code__`` identity of the function itself in order to produce a
1143 cache key.
1144
1145 """
1146 self._with_context_options += ((callable_, cache_args),)
1147 return self
1148
    # typed overload enumerating the execution options explicitly known to
    # Core / ORM; all are optional and arbitrary user-defined keyword
    # arguments remain accepted via **opt
    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    # catch-all overload for purely arbitrary keyword arguments
    @overload
    def execution_options(self, **opt: Any) -> Self: ...
1175
1176 @_generative
1177 def execution_options(self, **kw: Any) -> Self:
1178 """Set non-SQL options for the statement which take effect during
1179 execution.
1180
1181 Execution options can be set at many scopes, including per-statement,
1182 per-connection, or per execution, using methods such as
1183 :meth:`_engine.Connection.execution_options` and parameters which
1184 accept a dictionary of options such as
1185 :paramref:`_engine.Connection.execute.execution_options` and
1186 :paramref:`_orm.Session.execute.execution_options`.
1187
1188 The primary characteristic of an execution option, as opposed to
1189 other kinds of options such as ORM loader options, is that
1190 **execution options never affect the compiled SQL of a query, only
1191 things that affect how the SQL statement itself is invoked or how
1192 results are fetched**. That is, execution options are not part of
1193 what's accommodated by SQL compilation nor are they considered part of
1194 the cached state of a statement.
1195
1196 The :meth:`_sql.Executable.execution_options` method is
1197 :term:`generative`, as
1198 is the case for the method as applied to the :class:`_engine.Engine`
1199 and :class:`_orm.Query` objects, which means when the method is called,
1200 a copy of the object is returned, which applies the given parameters to
1201 that new copy, but leaves the original unchanged::
1202
1203 statement = select(table.c.x, table.c.y)
1204 new_statement = statement.execution_options(my_option=True)
1205
1206 An exception to this behavior is the :class:`_engine.Connection`
1207 object, where the :meth:`_engine.Connection.execution_options` method
1208 is explicitly **not** generative.
1209
1210 The kinds of options that may be passed to
1211 :meth:`_sql.Executable.execution_options` and other related methods and
1212 parameter dictionaries include parameters that are explicitly consumed
1213 by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
1214 defined by SQLAlchemy, which means the methods and/or parameter
1215 dictionaries may be used for user-defined parameters that interact with
1216 custom code, which may access the parameters using methods such as
1217 :meth:`_sql.Executable.get_execution_options` and
1218 :meth:`_engine.Connection.get_execution_options`, or within selected
1219 event hooks using a dedicated ``execution_options`` event parameter
1220 such as
1221 :paramref:`_events.ConnectionEvents.before_execute.execution_options`
1222 or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
1223
1224 from sqlalchemy import event
1225
1226 @event.listens_for(some_engine, "before_execute")
1227 def _process_opt(conn, statement, multiparams, params, execution_options):
1228 "run a SQL function before invoking a statement"
1229
1230 if execution_options.get("do_special_thing", False):
1231 conn.exec_driver_sql("run_special_function()")
1232
1233 Within the scope of options that are explicitly recognized by
1234 SQLAlchemy, most apply to specific classes of objects and not others.
1235 The most common execution options include:
1236
1237 * :paramref:`_engine.Connection.execution_options.isolation_level` -
1238 sets the isolation level for a connection or a class of connections
1239 via an :class:`_engine.Engine`. This option is accepted only
1240 by :class:`_engine.Connection` or :class:`_engine.Engine`.
1241
1242 * :paramref:`_engine.Connection.execution_options.stream_results` -
1243 indicates results should be fetched using a server side cursor;
1244 this option is accepted by :class:`_engine.Connection`, by the
1245 :paramref:`_engine.Connection.execute.execution_options` parameter
1246 on :meth:`_engine.Connection.execute`, and additionally by
1247 :meth:`_sql.Executable.execution_options` on a SQL statement object,
1248 as well as by ORM constructs like :meth:`_orm.Session.execute`.
1249
1250 * :paramref:`_engine.Connection.execution_options.compiled_cache` -
1251 indicates a dictionary that will serve as the
1252 :ref:`SQL compilation cache <sql_caching>`
1253 for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
1254 well as for ORM methods like :meth:`_orm.Session.execute`.
1255 Can be passed as ``None`` to disable caching for statements.
1256 This option is not accepted by
1257 :meth:`_sql.Executable.execution_options` as it is inadvisable to
1258 carry along a compilation cache within a statement object.
1259
1260 * :paramref:`_engine.Connection.execution_options.schema_translate_map`
1261 - a mapping of schema names used by the
1262 :ref:`Schema Translate Map <schema_translating>` feature, accepted
1263 by :class:`_engine.Connection`, :class:`_engine.Engine`,
1264 :class:`_sql.Executable`, as well as by ORM constructs
1265 like :meth:`_orm.Session.execute`.
1266
1267 .. seealso::
1268
1269 :meth:`_engine.Connection.execution_options`
1270
1271 :paramref:`_engine.Connection.execute.execution_options`
1272
1273 :paramref:`_orm.Session.execute.execution_options`
1274
1275 :ref:`orm_queryguide_execution_options` - documentation on all
1276 ORM-specific execution options
1277
1278 """ # noqa: E501
1279 if "isolation_level" in kw:
1280 raise exc.ArgumentError(
1281 "'isolation_level' execution option may only be specified "
1282 "on Connection.execution_options(), or "
1283 "per-engine using the isolation_level "
1284 "argument to create_engine()."
1285 )
1286 if "compiled_cache" in kw:
1287 raise exc.ArgumentError(
1288 "'compiled_cache' execution option may only be specified "
1289 "on Connection.execution_options(), not per statement."
1290 )
1291 self._execution_options = self._execution_options.union(kw)
1292 return self
1293
    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        # the accumulated options previously established via
        # execution_options()
        return self._execution_options
1304
1305
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        # emit the before/after attachment events around the actual
        # parent association performed by _set_parent()
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
1325
1326
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` objects."""

    # flag consumed by the traversal machinery to indicate schema-level
    # traversal semantics
    __traverse_options__ = {"schema_visitor": True}
1331
1332
class _SentinelDefaultCharacterization(Enum):
    """Characterizes the kind of default-value generation in use for a
    candidate "insert sentinel" column.

    NOTE(review): member semantics inferred from the member names; confirm
    against the insertmanyvalues logic that consumes this enum.

    """

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
1341
1342
class _SentinelColumnCharacterization(NamedTuple):
    """Immutable record describing the sentinel column(s) selected for a
    table, if any."""

    # the column(s) serving as sentinels, or None if none were determined
    columns: Optional[Sequence[Column[Any]]] = None
    # presumably True when the sentinel was explicitly designated by the
    # user rather than inferred — confirm against callers
    is_explicit: bool = False
    # presumably True when the sentinel is the autoincrement column —
    # confirm against callers
    is_autoinc: bool = False
    # how the sentinel column's default value is generated
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
1350
1351
# key type for ColumnCollection; the base collection allows None keys,
# while DedupeColumnCollection constrains keys to str
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# column element type stored by a ColumnCollection; the covariant form is
# used for the collection's element type, the invariant form where a
# column is accepted as an argument
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1356
1357
class _ColumnMetrics(Generic[_COL_co]):
    """Bookkeeping record that links a column stored in a
    :class:`.ColumnCollection` to the collection's proxy index."""

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ):
        self.column = col

        # a non-empty proxy index means it was initialized already, so it
        # must be kept up to date with this new column
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxied in col._expanded_proxy_set:
                proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self):
        """Return the expanded proxy set of the tracked column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection):
        """Detach this record from the given collection's proxy index,
        pruning entries that are left empty."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            metrics_set = proxy_index.get(proxied, None)
            if metrics_set is None:
                continue
            metrics_set.discard(self)
            if not metrics_set:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of ``target_set`` that lies outside
        this column's expanded proxy set still intersects it once cloned
        elements are expanded."""
        expanded = self.column._expanded_proxy_set
        return all(
            expanded.intersection(_expand_cloned([outside]))
            for outside in target_set.difference(expanded)
        )
1400
1401
class ColumnCollection(Generic[_COLKEY, _COL_co]):
    """Collection of :class:`_expression.ColumnElement` instances,
    typically for
    :class:`_sql.FromClause` objects.

    The :class:`_sql.ColumnCollection` object is most commonly available
    as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
    on the :class:`_schema.Table` object, introduced at
    :ref:`metadata_tables_and_columns`.

    The :class:`_expression.ColumnCollection` has both mapping- and sequence-
    like behaviors. A :class:`_expression.ColumnCollection` usually stores
    :class:`_schema.Column` objects, which are then accessible both via mapping
    style access as well as attribute access style.

    To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as below
    a column named ``employee_name`` is accessed::

        >>> employee_table.c.employee_name

    To access columns that have names with special characters or spaces,
    index-style access is used, such as below which illustrates a column named
    ``employee ' payment`` is accessed::

        >>> employee_table.c["employee ' payment"]

    As the :class:`_sql.ColumnCollection` object provides a Python dictionary
    interface, common dictionary method names like
    :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
    and :meth:`_sql.ColumnCollection.items` are available, which means that
    database columns that are keyed under these names also need to use indexed
    access::

        >>> employee_table.c["values"]


    The name for which a :class:`_schema.Column` would be present is normally
    that of the :paramref:`_schema.Column.key` parameter. In some contexts,
    such as a :class:`_sql.Select` object that uses a label style set
    using the :meth:`_sql.Select.set_label_style` method, a column of a certain
    key may instead be represented under a particular label name such
    as ``tablename_columnname``::

        >>> from sqlalchemy import select, column, table
        >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
        >>> t = table("t", column("c"))
        >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
        >>> subq = stmt.subquery()
        >>> subq.c.t_c
        <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>

    :class:`.ColumnCollection` also indexes the columns in order and allows
    them to be accessible by their integer position::

        >>> cc[0]
        Column('x', Integer(), table=None)
        >>> cc[1]
        Column('y', Integer(), table=None)

    .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
       allows integer-based
       index access to the collection.

    Iterating the collection yields the column expressions in order::

        >>> list(cc)
        [Column('x', Integer(), table=None),
        Column('y', Integer(), table=None)]

    The base :class:`_expression.ColumnCollection` object can store
    duplicates, which can
    mean either two columns with the same key, in which case the column
    returned by key access is **arbitrary**::

        >>> x1, x2 = Column('x', Integer), Column('x', Integer)
        >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
        >>> list(cc)
        [Column('x', Integer(), table=None),
        Column('x', Integer(), table=None)]
        >>> cc['x'] is x1
        False
        >>> cc['x'] is x2
        True

    Or it can also mean the same column multiple times.   These cases are
    supported as :class:`_expression.ColumnCollection`
    is used to represent the columns in
    a SELECT statement which may include duplicates.

    A special subclass :class:`.DedupeColumnCollection` exists which instead
    maintains SQLAlchemy's older behavior of not allowing duplicates; this
    collection is used for schema level objects like :class:`_schema.Table`
    and
    :class:`.PrimaryKeyConstraint` where this deduping is helpful.  The
    :class:`.DedupeColumnCollection` class also has additional mutation methods
    as the schema constructs have more use cases that require removal and
    replacement of columns.

    .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
       now stores duplicate
       column keys as well as the same column in multiple positions.  The
       :class:`.DedupeColumnCollection` class is added to maintain the
       former behavior in those cases where deduplication as well as
       additional replace/remove operations are needed.


    """

    __slots__ = "_collection", "_index", "_colset", "_proxy_index"

    # ordered records of (key, column, metrics) for every column, in
    # insertion order; may contain duplicate keys and duplicate columns
    _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]

    # lookup of both string keys and integer positions to (key, column);
    # for duplicate string keys, refers to one "winning" entry, which per
    # the class docstring is to be considered arbitrary
    _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]

    # reverse lookup from each element of a column's "expanded proxy set"
    # to the _ColumnMetrics records that reference it; populated lazily by
    # _init_proxy_index() in support of corresponding_column()
    _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]

    # de-annotated set of all member columns, supporting fast
    # contains_column() checks
    _colset: Set[_COL_co]

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        # object.__setattr__ is used throughout because __setattr__ on
        # this class raises NotImplementedError
        object.__setattr__(self, "_colset", set())
        object.__setattr__(self, "_index", {})
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    @util.preload_module("sqlalchemy.sql.elements")
    def __clause_element__(self) -> ClauseList:
        # supports the __clause_element__ protocol; the collection expands
        # to a ClauseList of all member columns
        elements = util.preloaded.sql_elements

        return elements.ClauseList(
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
            *self._all_columns,
        )

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        # constructor hook; subclasses may override population behavior
        # via _populate_separate_keys()
        self._populate_separate_keys(iter_)

    @property
    def _all_columns(self) -> List[_COL_co]:
        # the member columns in insertion order, including duplicates
        return [col for (_, col, _) in self._collection]

    def keys(self) -> List[_COLKEY]:
        """Return a sequence of string key names for all columns in this
        collection."""
        return [k for (k, _, _) in self._collection]

    def values(self) -> List[_COL_co]:
        """Return a sequence of :class:`_sql.ColumnClause` or
        :class:`_schema.Column` objects for all columns in this
        collection."""
        return [col for (_, col, _) in self._collection]

    def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
        """Return a sequence of (key, column) tuples for all columns in this
        collection each consisting of a string key name and a
        :class:`_sql.ColumnClause` or
        :class:`_schema.Column` object.
        """

        return [(k, col) for (k, col, _) in self._collection]

    def __bool__(self) -> bool:
        return bool(self._collection)

    def __len__(self) -> int:
        return len(self._collection)

    def __iter__(self) -> Iterator[_COL_co]:
        # turn to a list first to maintain over a course of changes
        return iter([col for _, col, _ in self._collection])

    @overload
    def __getitem__(self, key: Union[str, int]) -> _COL_co: ...

    @overload
    def __getitem__(
        self, key: Tuple[Union[str, int], ...]
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    @overload
    def __getitem__(
        self, key: slice
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    def __getitem__(
        self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
    ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
        try:
            # a slice or tuple of keys returns a new read-only
            # sub-collection; a single string/integer key returns the
            # column itself
            if isinstance(key, (tuple, slice)):
                if isinstance(key, slice):
                    cols = (
                        (sub_key, col)
                        for (sub_key, col, _) in self._collection[key]
                    )
                else:
                    cols = (self._index[sub_key] for sub_key in key)

                return ColumnCollection(cols).as_readonly()
            else:
                return self._index[key][1]
        except KeyError as err:
            # integer keys raise IndexError for sequence-like behavior
            if isinstance(err.args[0], int):
                raise IndexError(err.args[0]) from err
            else:
                raise

    def __getattr__(self, key: str) -> _COL_co:
        # attribute-style access to columns by key
        try:
            return self._index[key][1]
        except KeyError as err:
            raise AttributeError(key) from err

    def __contains__(self, key: str) -> bool:
        if key not in self._index:
            # guard against accidental column-object membership tests,
            # which are served by contains_column() instead
            if not isinstance(key, str):
                raise exc.ArgumentError(
                    "__contains__ requires a string argument"
                )
            return False
        else:
            return True

    def compare(self, other: ColumnCollection[Any, Any]) -> bool:
        """Compare this :class:`_expression.ColumnCollection` to another
        based on the names of the keys"""

        # zip_longest pads the shorter collection with None, so
        # collections of different lengths compare False
        for l, r in zip_longest(self, other):
            if l is not r:
                return False
        else:
            return True

    def __eq__(self, other: Any) -> bool:
        # equality is identity-based positional comparison via compare()
        return self.compare(other)

    @overload
    def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...

    @overload
    def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...

    def get(
        self, key: str, default: Optional[_COL] = None
    ) -> Optional[Union[_COL_co, _COL]]:
        """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
        based on a string key name from this
        :class:`_expression.ColumnCollection`."""

        if key in self._index:
            return self._index[key][1]
        else:
            return default

    def __str__(self) -> str:
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(str(c) for c in self),
        )

    # the base collection is immutable from the outside; item/attribute
    # assignment and removal are unsupported
    def __setitem__(self, key: str, value: Any) -> NoReturn:
        raise NotImplementedError()

    def __delitem__(self, key: str) -> NoReturn:
        raise NotImplementedError()

    def __setattr__(self, key: str, obj: Any) -> NoReturn:
        raise NotImplementedError()

    def clear(self) -> NoReturn:
        """Dictionary clear() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    def remove(self, column: Any) -> None:
        # removal is only supported by the DedupeColumnCollection subclass
        raise NotImplementedError()

    def update(self, iter_: Any) -> NoReturn:
        """Dictionary update() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    # https://github.com/python/mypy/issues/4266
    __hash__ = None  # type: ignore

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        # integer positions index every entry; string keys are then
        # assigned over a reversed iteration — for duplicate keys, which
        # entry wins is documented as arbitrary
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def add(
        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
    ) -> None:
        """Add a column to this :class:`_sql.ColumnCollection`.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.ColumnCollection` is usually part of an existing
            object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())
        # the integer position always indexes the new column; a string key
        # is only assigned if not already present, i.e. an existing key is
        # not replaced
        self._index[l] = (colkey, _column)
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def __getstate__(self) -> Dict[str, Any]:
        # _ColumnMetrics, _colset and the proxy index are not pickled;
        # they are rebuilt in __setstate__
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # object.__setattr__ bypasses the NotImplementedError __setattr__
        object.__setattr__(self, "_index", state["_index"])
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def contains_column(self, col: ColumnElement[Any]) -> bool:
        """Checks if a column object exists in this collection"""
        if col not in self._colset:
            # guard against accidental string membership tests, which are
            # served by __contains__ instead
            if isinstance(col, str):
                raise exc.ArgumentError(
                    "contains_column cannot be used with string arguments. "
                    "Use ``col_name in table.c`` instead."
                )
            return False
        else:
            return True

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.ColumnCollection`."""

        return ReadOnlyColumnCollection(self)

    def _init_proxy_index(self):
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data. Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
                      to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        # consider only member columns that share an ancestor with the
        # target, as located via the proxy index
        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
1914
1915
1916_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
1917
1918
class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    Unlike the base collection, keys are always strings and must match
    each column's ``.key`` attribute.

    .. versionadded:: 1.4

    """
1931
1932 def add( # type: ignore[override]
1933 self, column: _NAMEDCOL, key: Optional[str] = None
1934 ) -> None:
1935 if key is not None and column.key != key:
1936 raise exc.ArgumentError(
1937 "DedupeColumnCollection requires columns be under "
1938 "the same key as their .key"
1939 )
1940 key = column.key
1941
1942 if key is None:
1943 raise exc.ArgumentError(
1944 "Can't add unnamed column to column collection"
1945 )
1946
1947 if key in self._index:
1948 existing = self._index[key][1]
1949
1950 if existing is column:
1951 return
1952
1953 self.replace(column)
1954
1955 # pop out memoized proxy_set as this
1956 # operation may very well be occurring
1957 # in a _make_proxy operation
1958 util.memoized_property.reset(column, "proxy_set")
1959 else:
1960 self._append_new_column(key, column)
1961
1962 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
1963 l = len(self._collection)
1964 self._collection.append(
1965 (key, named_column, _ColumnMetrics(self, named_column))
1966 )
1967 self._colset.add(named_column._deannotate())
1968 self._index[l] = (key, named_column)
1969 self._index[key] = (key, named_column)
1970
1971 def _populate_separate_keys(
1972 self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
1973 ) -> None:
1974 """populate from an iterator of (key, column)"""
1975 cols = list(iter_)
1976
1977 replace_col = []
1978 for k, col in cols:
1979 if col.key != k:
1980 raise exc.ArgumentError(
1981 "DedupeColumnCollection requires columns be under "
1982 "the same key as their .key"
1983 )
1984 if col.name in self._index and col.key != col.name:
1985 replace_col.append(col)
1986 elif col.key in self._index:
1987 replace_col.append(col)
1988 else:
1989 self._index[k] = (k, col)
1990 self._collection.append((k, col, _ColumnMetrics(self, col)))
1991 self._colset.update(c._deannotate() for (k, c, _) in self._collection)
1992
1993 self._index.update(
1994 (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
1995 )
1996 for col in replace_col:
1997 self.replace(col)
1998
1999 def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
2000 self._populate_separate_keys((col.key, col) for col in iter_)
2001
    def remove(self, column: _NAMEDCOL) -> None:
        """Remove *column* from this collection, renumbering the
        positional index of the remaining columns."""
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        # detach the removed column's metrics from the proxy index
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the integer-position entries of the index
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]
2023
2024 def replace(
2025 self,
2026 column: _NAMEDCOL,
2027 extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
2028 ) -> None:
2029 """add the given column to this collection, removing unaliased
2030 versions of this column as well as existing columns with the
2031 same key.
2032
2033 e.g.::
2034
2035 t = Table('sometable', metadata, Column('col1', Integer))
2036 t.columns.replace(Column('col1', Integer, key='columnone'))
2037
2038 will remove the original 'col1' from the collection, and add
2039 the new column under the name 'columnname'.
2040
2041 Used by schema.Column to override columns during table reflection.
2042
2043 """
2044
2045 if extra_remove:
2046 remove_col = set(extra_remove)
2047 else:
2048 remove_col = set()
2049 # remove up to two columns based on matches of name as well as key
2050 if column.name in self._index and column.key != column.name:
2051 other = self._index[column.name][1]
2052 if other.name == other.key:
2053 remove_col.add(other)
2054
2055 if column.key in self._index:
2056 remove_col.add(self._index[column.key][1])
2057
2058 if not remove_col:
2059 self._append_new_column(column.key, column)
2060 return
2061 new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
2062 replaced = False
2063 for k, col, metrics in self._collection:
2064 if col in remove_col:
2065 if not replaced:
2066 replaced = True
2067 new_cols.append(
2068 (column.key, column, _ColumnMetrics(self, column))
2069 )
2070 else:
2071 new_cols.append((k, col, metrics))
2072
2073 if remove_col:
2074 self._colset.difference_update(remove_col)
2075
2076 for rc in remove_col:
2077 for metrics in self._proxy_index.get(rc, ()):
2078 metrics.dispose(self)
2079
2080 if not replaced:
2081 new_cols.append((column.key, column, _ColumnMetrics(self, column)))
2082
2083 self._colset.add(column._deannotate())
2084 self._collection[:] = new_cols
2085
2086 self._index.clear()
2087
2088 self._index.update(
2089 {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
2090 )
2091 self._index.update({k: (k, col) for (k, col, _) in self._collection})
2092
2093
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """An immutable, read-only facade over a :class:`ColumnCollection`.

    Shares the underlying state of the wrapped collection directly, so
    it reflects changes made to the parent; all mutation methods raise.
    """

    __slots__ = ("_parent",)

    def __init__(self, collection):
        # write through object.__setattr__ to bypass the
        # ReadOnlyContainer attribute guard
        object.__setattr__(self, "_parent", collection)
        for name in ("_colset", "_index", "_collection", "_proxy_index"):
            object.__setattr__(self, name, getattr(collection, name))

    def __getstate__(self):
        # pickle only the parent; the shared state is rebuilt from it
        return {"_parent": self._parent}

    def __setstate__(self, state):
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
2121
2122
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of columns supporting SQL-expression equality."""

    def contains_column(self, col):
        """Return True if *col* is a member of this set."""
        return col in self

    def extend(self, cols):
        """Add each element of *cols*, preserving insertion order."""
        for c in cols:
            self.add(c)

    def __eq__(self, other):
        # produce a SQL conjunction comparing columns that share
        # lineage, rather than a Python boolean
        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*comparisons)

    def __hash__(self):
        return hash(tuple(self))
2141
2142
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible]
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    If not immediately available, does an iterate to find a sub-element
    that has one, if any.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        # search traversible sub-elements for the first one that
        # provides an entity namespace
        for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
            if _is_has_entity_namespace(elem):
                return elem.entity_namespace
        # no sub-element had one; propagate the original error
        raise
2160
2161
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.


    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        ns = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(ns, key)  # type: ignore
        return getattr(ns, key, default)
    except AttributeError as err:
        # surface missing attributes as an informative request error
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err