1# sql/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Foundational utilities common to many sql modules.
10
11"""
12
13
14from __future__ import annotations
15
16import collections
17from enum import Enum
18import itertools
19from itertools import zip_longest
20import operator
21import re
22from typing import Any
23from typing import Callable
24from typing import cast
25from typing import Dict
26from typing import FrozenSet
27from typing import Generic
28from typing import Iterable
29from typing import Iterator
30from typing import List
31from typing import Mapping
32from typing import MutableMapping
33from typing import NamedTuple
34from typing import NoReturn
35from typing import Optional
36from typing import overload
37from typing import Protocol
38from typing import Sequence
39from typing import Set
40from typing import Tuple
41from typing import Type
42from typing import TYPE_CHECKING
43from typing import TypeVar
44from typing import Union
45
46from . import roles
47from . import visitors
48from .cache_key import HasCacheKey # noqa
49from .cache_key import MemoizedHasCacheKey # noqa
50from .traversals import HasCopyInternals # noqa
51from .visitors import ClauseVisitor
52from .visitors import ExtendedInternalTraversal
53from .visitors import ExternallyTraversible
54from .visitors import InternalTraversal
55from .. import event
56from .. import exc
57from .. import util
58from ..util import HasMemoized as HasMemoized
59from ..util import hybridmethod
60from ..util.typing import Self
61from ..util.typing import TypeGuard
62
63if TYPE_CHECKING:
64 from . import coercions
65 from . import elements
66 from . import type_api
67 from ._orm_types import DMLStrategyArgument
68 from ._orm_types import SynchronizeSessionArgument
69 from ._typing import _CLE
70 from .elements import BindParameter
71 from .elements import ClauseList
72 from .elements import ColumnClause # noqa
73 from .elements import ColumnElement
74 from .elements import NamedColumn
75 from .elements import SQLCoreOperations
76 from .elements import TextClause
77 from .schema import Column
78 from .schema import DefaultGenerator
79 from .selectable import _JoinTargetElement
80 from .selectable import _SelectIterable
81 from .selectable import FromClause
82 from ..engine import Connection
83 from ..engine import CursorResult
84 from ..engine.interfaces import _CoreMultiExecuteParams
85 from ..engine.interfaces import _ExecuteOptions
86 from ..engine.interfaces import _ImmutableExecuteOptions
87 from ..engine.interfaces import CacheStats
88 from ..engine.interfaces import Compiled
89 from ..engine.interfaces import CompiledCacheType
90 from ..engine.interfaces import CoreExecuteOptionsParameter
91 from ..engine.interfaces import Dialect
92 from ..engine.interfaces import IsolationLevel
93 from ..engine.interfaces import SchemaTranslateMapType
94 from ..event import dispatcher
95
# runtime placeholders for the modules imported above under TYPE_CHECKING;
# set to None here so the names exist without importing the modules at
# import time (avoiding circular imports).  Presumably re-bound to the
# real modules later via the util.preload_module machinery -- TODO confirm
if not TYPE_CHECKING:
    coercions = None  # noqa
    elements = None  # noqa
    type_api = None  # noqa
100
101
102class _NoArg(Enum):
103 NO_ARG = 0
104
105 def __repr__(self):
106 return f"_NoArg.{self.name}"
107
108
109NO_ARG = _NoArg.NO_ARG
110
111
class _NoneName(Enum):
    # sentinel distinguishing "a deferred name that resolved to None"
    # from a plain None value
    NONE_NAME = 0
    """indicate a 'deferred' name that was ultimately the value None."""


# module-level convenience alias for the sentinel member
_NONE_NAME = _NoneName.NONE_NAME

# general-purpose typevar used throughout this module
_T = TypeVar("_T", bound=Any)

# typevar for decorators that wrap and return a callable unchanged in type
_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# mapping of ambiguous table name -> disambiguated name; presumably
# consumed by the compiler when duplicate table names appear -- see
# CompileState._ambiguous_table_name_map below
_AmbiguousTableNameMap = MutableMapping[str, str]
124
125
126class _DefaultDescriptionTuple(NamedTuple):
127 arg: Any
128 is_scalar: Optional[bool]
129 is_callable: Optional[bool]
130 is_sentinel: Optional[bool]
131
132 @classmethod
133 def _from_column_default(
134 cls, default: Optional[DefaultGenerator]
135 ) -> _DefaultDescriptionTuple:
136 return (
137 _DefaultDescriptionTuple(
138 default.arg, # type: ignore
139 default.is_scalar,
140 default.is_callable,
141 default.is_sentinel,
142 )
143 if default
144 and (
145 default.has_arg
146 or (not default.for_update and default.is_sentinel)
147 )
148 else _DefaultDescriptionTuple(None, None, None, None)
149 )
150
151
# callable returning an element's ``._omit_from_statements`` flag; per the
# name, used as a predicate for columns that should never be rendered in a
# SELECT -- verify against call sites
_never_select_column = operator.attrgetter("_omit_from_statements")
153
154
class _EntityNamespace(Protocol):
    """Structural type: any object whose attribute access yields SQL
    column operator objects."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
157
158
class _HasEntityNamespace(Protocol):
    """Structural type for objects exposing an ``.entity_namespace``."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
162
163
164def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
165 return hasattr(element, "entity_namespace")
166
167
# Remove when https://github.com/python/mypy/issues/14640 will be fixed
# (workaround typevar standing in for typing.Self in method signatures)
_Self = TypeVar("_Self", bound=Any)
170
171
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    Here "immutable" refers to mutability in the context of SQL DQL/DML
    generation: a SELECT or subquery may be composed in many forms, but
    the structure of a particular table or column cannot be changed from
    within DQL.  :class:`.Immutable` captures that concept; the primary
    immutable objects are :class:`.ColumnClause`, :class:`.Column`,
    :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable = True

    def unique_params(self, *optionaldict, **kwargs):
        """Not supported; immutable objects cannot be copied."""
        raise NotImplementedError("Immutable objects do not support copying")

    def params(self, *optionaldict, **kwargs):
        """Not supported; immutable objects cannot be copied."""
        raise NotImplementedError("Immutable objects do not support copying")

    def _clone(self: _Self, **kw: Any) -> _Self:
        # cloning an immutable element is a no-op; hand back self
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        # nothing to copy -- internal state is shared by design
        pass
202
203
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE.

    Each subclass keeps exactly one instance at ``cls._singleton``,
    installed by ``_create_singleton()`` and returned by ``__new__``.
    """

    _is_singleton_constant = True

    # the per-class singleton instance; installed by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # constructor arguments are ignored; always return the
        # pre-built singleton for this class
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # shadowed per-instance in _create_singleton(); reaching this
        # fallback means the singleton was not initialized
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls):
        # build the one instance via object.__new__ directly, bypassing
        # the singleton-returning __new__ above
        obj = object.__new__(cls)
        obj.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement. This referred to #6259. However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        obj.proxy_set = frozenset([obj])
        cls._singleton = obj
232
233
234def _from_objects(
235 *elements: Union[
236 ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
237 ]
238) -> Iterator[FromClause]:
239 return itertools.chain.from_iterable(
240 [element._from_objects for element in elements]
241 )
242
243
244def _select_iterables(
245 elements: Iterable[roles.ColumnsClauseRole],
246) -> _SelectIterable:
247 """expand tables into individual columns in the
248 given list of column expressions.
249
250 """
251 return itertools.chain.from_iterable(
252 [c._select_iterable for c in elements]
253 )
254
255
# typevar bound to the protocol below; used by the _generative decorator
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")


class _GenerativeType(Protocol):
    """Structural type: any object providing a ``_generate()`` method
    returning an object of the same type."""

    def _generate(self) -> Self: ...
261
262
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    The decorated method is invoked against a copy produced by
    ``self._generate()``; the original object is left unmodified.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # operate on a copy, never the original
        self = self._generate()
        x = fn(self, *args, **kw)
        # generative methods must hand back the same copy they were
        # given, since callers chain off the returned object
        assert x is self, "generative methods must return self"
        return self

    decorated = _generative(fn)
    # expose the original, non-copying function for callers that want
    # to bypass the copy step
    decorated.non_generative = fn  # type: ignore
    return decorated
285
286
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Decorator factory: raise if any of the named attributes on
    ``self`` has already moved off its default value when the decorated
    method is invoked.

    :param names: attribute names to check on ``self``.
    :param msgs: (keyword) optional dict mapping attribute name to a
     custom error message.
    :param defaults: (keyword) optional dict mapping attribute name to
     its expected "unset" default; ``None`` where not given.
    """
    msgs = kw.pop("msgs", {})

    defaults = kw.pop("defaults", {})

    # precompute (name, getter, default) triples once at decoration time
    getters = [
        (name, operator.attrgetter(name), defaults.get(name, None))
        for name in names
    ]

    @util.decorator
    def check(fn, *args, **kw):
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        args = args[1:]
        for name, getter, default_ in getters:
            # identity comparison: the attribute must still be exactly
            # the default object, else the method was already invoked
            if getter(self) is not default_:
                msg = msgs.get(
                    name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
                raise exc.InvalidRequestError(msg)
        return fn(self, *args, **kw)

    return check
314
315
def _clone(element, **kw):
    """module-level convenience that invokes ``element._clone(**kw)``."""
    return element._clone(**kw)
318
319
320def _expand_cloned(
321 elements: Iterable[_CLE],
322) -> Iterable[_CLE]:
323 """expand the given set of ClauseElements to be the set of all 'cloned'
324 predecessors.
325
326 """
327 # TODO: cython candidate
328 return itertools.chain(*[x._cloned_set for x in elements])
329
330
331def _de_clone(
332 elements: Iterable[_CLE],
333) -> Iterable[_CLE]:
334 for x in elements:
335 while x._is_clone_of is not None:
336 x = x._is_clone_of
337 yield x
338
339
def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the intersection of sets a and b, counting
    any overlap between 'cloned' predecessors.

    The returned set is in terms of the entities present within 'a'.

    """
    overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
    result = set()
    for candidate in a:
        if overlap.intersection(candidate._cloned_set):
            result.add(candidate)
    return result
349
350
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
    """return the elements of 'a' whose cloned predecessors have no
    overlap with those of 'b'."""
    overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
    result = set()
    for candidate in a:
        if not overlap.intersection(candidate._cloned_set):
            result.add(candidate)
    return result
356
357
358class _DialectArgView(MutableMapping[str, Any]):
359 """A dictionary view of dialect-level arguments in the form
360 <dialectname>_<argument_name>.
361
362 """
363
364 def __init__(self, obj):
365 self.obj = obj
366
367 def _key(self, key):
368 try:
369 dialect, value_key = key.split("_", 1)
370 except ValueError as err:
371 raise KeyError(key) from err
372 else:
373 return dialect, value_key
374
375 def __getitem__(self, key):
376 dialect, value_key = self._key(key)
377
378 try:
379 opt = self.obj.dialect_options[dialect]
380 except exc.NoSuchModuleError as err:
381 raise KeyError(key) from err
382 else:
383 return opt[value_key]
384
385 def __setitem__(self, key, value):
386 try:
387 dialect, value_key = self._key(key)
388 except KeyError as err:
389 raise exc.ArgumentError(
390 "Keys must be of the form <dialectname>_<argname>"
391 ) from err
392 else:
393 self.obj.dialect_options[dialect][value_key] = value
394
395 def __delitem__(self, key):
396 dialect, value_key = self._key(key)
397 del self.obj.dialect_options[dialect][value_key]
398
399 def __len__(self):
400 return sum(
401 len(args._non_defaults)
402 for args in self.obj.dialect_options.values()
403 )
404
405 def __iter__(self):
406 return (
407 "%s_%s" % (dialect_name, value_name)
408 for dialect_name in self.obj.dialect_options
409 for value_name in self.obj.dialect_options[
410 dialect_name
411 ]._non_defaults
412 )
413
414
415class _DialectArgDict(MutableMapping[str, Any]):
416 """A dictionary view of dialect-level arguments for a specific
417 dialect.
418
419 Maintains a separate collection of user-specified arguments
420 and dialect-specified default arguments.
421
422 """
423
424 def __init__(self):
425 self._non_defaults = {}
426 self._defaults = {}
427
428 def __len__(self):
429 return len(set(self._non_defaults).union(self._defaults))
430
431 def __iter__(self):
432 return iter(set(self._non_defaults).union(self._defaults))
433
434 def __getitem__(self, key):
435 if key in self._non_defaults:
436 return self._non_defaults[key]
437 else:
438 return self._defaults[key]
439
440 def __setitem__(self, key, value):
441 self._non_defaults[key] = value
442
443 def __delitem__(self, key):
444 del self._non_defaults[key]
445
446
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name):
    """Load the named dialect and return a mutable copy of its
    ``construct_arguments`` registry, or None when the dialect does not
    declare one.

    Used as the population hook for ``DialectKWArgs._kw_registry``.
    """
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    if dialect_cls.construct_arguments is None:
        return None
    return dict(dialect_cls.construct_arguments)
453
454
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(cls, dialect_name, argument_name, default):
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index('a', 'b', mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way of adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class. The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect. The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised. The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised. If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already. All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        if construct_arg_dictionary is None:
            # fixed wording: the previous message read "does have ...
            # enabled configured", inverting / garbling the meaning
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @util.memoized_property
    def dialect_kwargs(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format.  Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self):
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # dialect name -> construct_arguments dict; populated on first access
    # via _kw_reg_for_dialect()
    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)

    def _kw_reg_for_dialect_cls(self, dialect_name):
        """Assemble the per-dialect _DialectArgDict of defaults applying
        to this class, walking the MRO base-first so subclass entries
        override base-class entries."""
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # dialect does not participate in kwarg validation; accept
            # anything via the "*" wildcard
            d._defaults.update({"*": None})
        else:
            for cls in reversed(self.__class__.__mro__):
                if cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[cls])
        return d

    @util.memoized_property
    def dialect_options(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``.  For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options['postgresql']['where']

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(
            util.portable_instancemethod(self._kw_reg_for_dialect_cls)
        )

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # dialect can't be located; warn and accept the value
                # as-is under a wildcard entry
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
623
624
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler.   This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output.   The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler,  such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin_name, visit_name) -> CompileState subclass
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        # factory construction: resolve the CompileState subclass that
        # handles this statement, then build it

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
        else:
            plugin_name = "default"

        target = statement._effective_plugin_target
        klass = cls.plugins.get((plugin_name, target), None)
        if klass is None:
            # no plugin-specific class registered; the "default" entry
            # must exist (KeyError otherwise, as before)
            klass = cls.plugins[("default", target)]

        if klass is cls:
            return cls(statement, compiler, **kw)
        # delegate so that subclasses apply their own factory logic
        return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            lookup = (plugin_name, statement._effective_plugin_target)
            if lookup in cls.plugins:
                return cls.plugins[lookup]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default.  return that
        # if there was no plugin-specific class  (e.g. Insert with "orm"
        # plugin)
        try:
            return cls.plugins[
                ("default", statement._effective_plugin_target)
            ]
        except KeyError:
            return None

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        # .get() with None default replaces the try/except KeyError form
        return cls.plugins.get(
            (plugin_name, statement._effective_plugin_target), None
        )

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Class decorator registering a CompileState subclass for the
        given (plugin, visit_name) combination."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
729
730
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        """Return a copy of self with memoized attributes reset."""
        memoized = self._memoized_keys
        new_obj = self.__class__.__new__(self.__class__)
        # snapshot first so concurrent mutation of __dict__ cannot
        # affect the filtering below
        snapshot = self.__dict__.copy()
        if memoized:
            new_obj.__dict__ = {
                key: value
                for key, value in snapshot.items()
                if key not in memoized
            }
        else:
            new_obj.__dict__ = snapshot
        return new_obj
747
748
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        """Reset memoized attributes and return self (no copy made)."""
        # note __dict__ needs to be in __slots__ if this is used
        for key in self._memoized_keys:
            self.__dict__.pop(key, None)
        return self
761
762
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # plugin-specific CompileState class for this statement, if any
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # additional attributes associated with this construct; immutable
    # and empty by default
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # factory invoked at compile time to produce the CompileState
    _compile_state_factory = CompileState.create_for_statement
771
772
773class _MetaOptions(type):
774 """metaclass for the Options class.
775
776 This metaclass is actually necessary despite the availability of the
777 ``__init_subclass__()`` hook as this type also provides custom class-level
778 behavior for the ``__add__()`` method.
779
780 """
781
782 _cache_attrs: Tuple[str, ...]
783
784 def __add__(self, other):
785 o1 = self()
786
787 if set(other).difference(self._cache_attrs):
788 raise TypeError(
789 "dictionary contains attributes not covered by "
790 "Options class %s: %r"
791 % (self, set(other).difference(self._cache_attrs))
792 )
793
794 o1.__dict__.update(other)
795 return o1
796
797 if TYPE_CHECKING:
798
799 def __getattr__(self, key: str) -> Any: ...
800
801 def __setattr__(self, key: str, value: Any) -> None: ...
802
803 def __delattr__(self, key: str) -> None: ...
804
805
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    # tuple of attribute names participating in merges and cache keys;
    # assembled per-subclass by __init_subclass__()
    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # harvest the subclass's own declared attributes (skipping
        # dunders and the cache-key traversal spec) as its cache attrs
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __add__(self, other):
        # "instance + dict" produces a new instance with our state plus
        # the dict's entries, rejecting unknown attribute names
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        # TODO: very inefficient.  This is used only in test suites
        # right now.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self):
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        """Return True if this Options class is a subclass of ``klass``."""
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name, value):
        """Return a copy with ``value`` added (via ``+``) to the current
        value of attribute ``name``."""
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        # instance-level form of _state_dict(): the instance __dict__
        return self.__dict__

    # class-level _state_dict() result: immutable and empty
    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other):
        """Merge ``other``'s state into a new options object of this
        class, rejecting attributes this class does not declare."""
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't.  otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls, key, attrs, exec_options, statement_exec_options
    ):
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {
                    "populate_existing",
                    "autoflush",
                    "yield_per"
                },
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                local = "_" + argname
                # per-execution options take precedence over options
                # embedded in the statement
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
966
967
class CacheableOptions(Options, HasCacheKey):
    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(self, anon_map, bindparams):
        # instance-level: delegate to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(cls, anon_map, bindparams):
        # class-level: with no instance state, the class itself plus an
        # empty tuple is the cache key
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self):
        return HasCacheKey._generate_cache_key_for_object(self)
982
983
class ExecutableOption(HasCopyInternals):
    """Base class providing shallow-copy cloning for executable
    statement options."""

    __slots__ = ()

    _annotations = util.EMPTY_DICT

    __visit_name__ = "executable_option"

    _is_has_cache_key = False

    _is_core = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        klass = self.__class__
        replica = klass.__new__(klass)
        replica.__dict__ = dict(self.__dict__)  # type: ignore
        return replica
1000
1001
1002class Executable(roles.StatementRole):
1003 """Mark a :class:`_expression.ClauseElement` as supporting execution.
1004
1005 :class:`.Executable` is a superclass for all "statement" types
1006 of objects, including :func:`select`, :func:`delete`, :func:`update`,
1007 :func:`insert`, :func:`text`.
1008
1009 """
1010
1011 supports_execution: bool = True
1012 _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
1013 _is_default_generator = False
1014 _with_options: Tuple[ExecutableOption, ...] = ()
1015 _with_context_options: Tuple[
1016 Tuple[Callable[[CompileState], None], Any], ...
1017 ] = ()
1018 _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
1019
1020 _executable_traverse_internals = [
1021 ("_with_options", InternalTraversal.dp_executable_options),
1022 (
1023 "_with_context_options",
1024 ExtendedInternalTraversal.dp_with_context_options,
1025 ),
1026 ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
1027 ]
1028
1029 is_select = False
1030 is_from_statement = False
1031 is_update = False
1032 is_insert = False
1033 is_text = False
1034 is_delete = False
1035 is_dml = False
1036
1037 if TYPE_CHECKING:
1038 __visit_name__: str
1039
1040 def _compile_w_cache(
1041 self,
1042 dialect: Dialect,
1043 *,
1044 compiled_cache: Optional[CompiledCacheType],
1045 column_keys: List[str],
1046 for_executemany: bool = False,
1047 schema_translate_map: Optional[SchemaTranslateMapType] = None,
1048 **kw: Any,
1049 ) -> Tuple[
1050 Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
1051 ]: ...
1052
1053 def _execute_on_connection(
1054 self,
1055 connection: Connection,
1056 distilled_params: _CoreMultiExecuteParams,
1057 execution_options: CoreExecuteOptionsParameter,
1058 ) -> CursorResult[Any]: ...
1059
1060 def _execute_on_scalar(
1061 self,
1062 connection: Connection,
1063 distilled_params: _CoreMultiExecuteParams,
1064 execution_options: CoreExecuteOptionsParameter,
1065 ) -> Any: ...
1066
1067 @util.ro_non_memoized_property
1068 def _all_selected_columns(self):
1069 raise NotImplementedError()
1070
1071 @property
1072 def _effective_plugin_target(self) -> str:
1073 return self.__visit_name__
1074
1075 @_generative
1076 def options(self, *options: ExecutableOption) -> Self:
1077 """Apply options to this statement.
1078
1079 In the general sense, options are any kind of Python object
1080 that can be interpreted by the SQL compiler for the statement.
1081 These options can be consumed by specific dialects or specific kinds
1082 of compilers.
1083
1084 The most commonly known kind of option are the ORM level options
1085 that apply "eager load" and other loading behaviors to an ORM
1086 query. However, options can theoretically be used for many other
1087 purposes.
1088
1089 For background on specific kinds of options for specific kinds of
1090 statements, refer to the documentation for those option objects.
1091
1092 .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
1093 Core statement objects towards the goal of allowing unified
1094 Core / ORM querying capabilities.
1095
1096 .. seealso::
1097
1098 :ref:`loading_columns` - refers to options specific to the usage
1099 of ORM queries
1100
1101 :ref:`relationship_loader_options` - refers to options specific
1102 to the usage of ORM queries
1103
1104 """
1105 self._with_options += tuple(
1106 coercions.expect(roles.ExecutableOptionRole, opt)
1107 for opt in options
1108 )
1109 return self
1110
    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        # wholesale replacement; compare _update_compile_options(), which
        # merges new keys into the existing structure instead
        self._compile_options = compile_options
        return self
1121
    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        # requires that a compile options structure is already present;
        # "+" merges the new keys into a new options object
        assert self._compile_options is not None
        self._compile_options += options
        return self
1129
1130 @_generative
1131 def _add_context_option(
1132 self,
1133 callable_: Callable[[CompileState], None],
1134 cache_args: Any,
1135 ) -> Self:
1136 """Add a context option to this statement.
1137
1138 These are callable functions that will
1139 be given the CompileState object upon compilation.
1140
1141 A second argument cache_args is required, which will be combined with
1142 the ``__code__`` identity of the function itself in order to produce a
1143 cache key.
1144
1145 """
1146 self._with_context_options += ((callable_, cache_args),)
1147 return self
1148
    # typing-only overload enumerating the execution option keys known to
    # Core and the ORM; arbitrary user-defined keys remain permitted via
    # the catch-all **opt overload below
    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        driver_column_names: bool = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...
1176
    @_generative
    def execution_options(self, **kw: Any) -> Self:
        """Set non-SQL options for the statement which take effect during
        execution.

        Execution options can be set at many scopes, including per-statement,
        per-connection, or per execution, using methods such as
        :meth:`_engine.Connection.execution_options` and parameters which
        accept a dictionary of options such as
        :paramref:`_engine.Connection.execute.execution_options` and
        :paramref:`_orm.Session.execute.execution_options`.

        The primary characteristic of an execution option, as opposed to
        other kinds of options such as ORM loader options, is that
        **execution options never affect the compiled SQL of a query, only
        things that affect how the SQL statement itself is invoked or how
        results are fetched**.  That is, execution options are not part of
        what's accommodated by SQL compilation nor are they considered part of
        the cached state of a statement.

        The :meth:`_sql.Executable.execution_options` method is
        :term:`generative`, as
        is the case for the method as applied to the :class:`_engine.Engine`
        and :class:`_orm.Query` objects, which means when the method is called,
        a copy of the object is returned, which applies the given parameters to
        that new copy, but leaves the original unchanged::

            statement = select(table.c.x, table.c.y)
            new_statement = statement.execution_options(my_option=True)

        An exception to this behavior is the :class:`_engine.Connection`
        object, where the :meth:`_engine.Connection.execution_options` method
        is explicitly **not** generative.

        The kinds of options that may be passed to
        :meth:`_sql.Executable.execution_options` and other related methods and
        parameter dictionaries include parameters that are explicitly consumed
        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
        defined by SQLAlchemy, which means the methods and/or parameter
        dictionaries may be used for user-defined parameters that interact with
        custom code, which may access the parameters using methods such as
        :meth:`_sql.Executable.get_execution_options` and
        :meth:`_engine.Connection.get_execution_options`, or within selected
        event hooks using a dedicated ``execution_options`` event parameter
        such as
        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::

            from sqlalchemy import event

            @event.listens_for(some_engine, "before_execute")
            def _process_opt(conn, statement, multiparams, params, execution_options):
                "run a SQL function before invoking a statement"

                if execution_options.get("do_special_thing", False):
                    conn.exec_driver_sql("run_special_function()")

        Within the scope of options that are explicitly recognized by
        SQLAlchemy, most apply to specific classes of objects and not others.
        The most common execution options include:

        * :paramref:`_engine.Connection.execution_options.isolation_level` -
          sets the isolation level for a connection or a class of connections
          via an :class:`_engine.Engine`.  This option is accepted only
          by :class:`_engine.Connection` or :class:`_engine.Engine`.

        * :paramref:`_engine.Connection.execution_options.stream_results` -
          indicates results should be fetched using a server side cursor;
          this option is accepted by :class:`_engine.Connection`, by the
          :paramref:`_engine.Connection.execute.execution_options` parameter
          on :meth:`_engine.Connection.execute`, and additionally by
          :meth:`_sql.Executable.execution_options` on a SQL statement object,
          as well as by ORM constructs like :meth:`_orm.Session.execute`.

        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
          indicates a dictionary that will serve as the
          :ref:`SQL compilation cache <sql_caching>`
          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
          well as for ORM methods like :meth:`_orm.Session.execute`.
          Can be passed as ``None`` to disable caching for statements.
          This option is not accepted by
          :meth:`_sql.Executable.execution_options` as it is inadvisable to
          carry along a compilation cache within a statement object.

        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
          - a mapping of schema names used by the
          :ref:`Schema Translate Map <schema_translating>` feature, accepted
          by :class:`_engine.Connection`, :class:`_engine.Engine`,
          :class:`_sql.Executable`, as well as by ORM constructs
          like :meth:`_orm.Session.execute`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :paramref:`_engine.Connection.execute.execution_options`

            :paramref:`_orm.Session.execute.execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        """  # noqa: E501
        # reject options that only make sense at the connection / engine
        # level; carrying them on a statement would be misleading at best
        if "isolation_level" in kw:
            raise exc.ArgumentError(
                "'isolation_level' execution option may only be specified "
                "on Connection.execution_options(), or "
                "per-engine using the isolation_level "
                "argument to create_engine()."
            )
        if "compiled_cache" in kw:
            raise exc.ArgumentError(
                "'compiled_cache' execution option may only be specified "
                "on Connection.execution_options(), not per statement."
            )
        # union() produces a new mapping; the previous options object is
        # never mutated in place (supports the generative pattern)
        self._execution_options = self._execution_options.union(kw)
        return self
1294
    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        # returns the live options mapping; it is treated as immutable
        # (see the union() usage in execution_options())
        return self._execution_options
1305
1306
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        """Invoke :meth:`._set_parent`, bracketed by the
        ``before_parent_attach`` / ``after_parent_attach`` event hooks."""
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
1326
1327
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` objects."""

    # traversal flag consumed by the visitor internals to indicate
    # schema-level traversal
    __traverse_options__ = {"schema_visitor": True}
1332
1333
class _SentinelDefaultCharacterization(Enum):
    """Characterizes the kind of default generation associated with a
    candidate "sentinel" column (see
    :class:`._SentinelColumnCharacterization`)."""

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
1342
1343
class _SentinelColumnCharacterization(NamedTuple):
    """Describes the sentinel column(s) determined for a table, if any."""

    # the column(s) acting as the sentinel; None if no sentinel applies
    columns: Optional[Sequence[Column[Any]]] = None
    # presumably indicates the sentinel was explicitly configured by the
    # user rather than inferred -- TODO confirm against callers
    is_explicit: bool = False
    # presumably True when the sentinel is an autoincrement column --
    # TODO confirm against callers
    is_autoinc: bool = False
    # how the sentinel column's default value is produced
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
1351
1352
# key type used by ColumnCollection: constrained to either Optional[str]
# (plain collections, where an anonymous column may have a None key) or
# strictly str (DedupeColumnCollection)
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# element type of a ColumnCollection; the covariant form is used for the
# collection's stored/returned elements, the invariant form for arguments
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="ColumnElement[Any]")
1357
1358
class _ColumnMetrics(Generic[_COL_co]):
    """Per-column bookkeeping record held by a :class:`.ColumnCollection`.

    Keeps the owning collection's "proxy index" in sync with the column's
    expanded proxy set, and provides the correspondence helpers used by
    :meth:`.ColumnCollection.corresponding_column`.

    """

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ):
        self.column = col

        # a non-empty proxy index means it has been initialized, in which
        # case this new entry must be registered with it
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxy in col._expanded_proxy_set:
                proxy_index[proxy].add(self)

    def get_expanded_proxy_set(self):
        """Return the expanded proxy set of the tracked column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection):
        """De-register this record from ``collection``'s proxy index."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            # proxy index never initialized; nothing to clean up
            return
        for proxy in self.column._expanded_proxy_set:
            bucket = proxy_index.get(proxy, None)
            if bucket:
                bucket.discard(self)
            # remove now-empty buckets so the index stays compact
            if bucket is not None and not bucket:
                del proxy_index[proxy]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if each member of ``target_set`` that falls outside
        our column's expanded proxy set still corresponds via cloned
        lineage."""
        expanded = self.column._expanded_proxy_set
        return all(
            expanded.intersection(_expand_cloned([outlier]))
            for outlier in target_set.difference(expanded)
        )
1401
1402
class ColumnCollection(Generic[_COLKEY, _COL_co]):
    """Collection of :class:`_expression.ColumnElement` instances,
    typically for
    :class:`_sql.FromClause` objects.

    The :class:`_sql.ColumnCollection` object is most commonly available
    as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
    on the :class:`_schema.Table` object, introduced at
    :ref:`metadata_tables_and_columns`.

    The :class:`_expression.ColumnCollection` has both mapping- and sequence-
    like behaviors. A :class:`_expression.ColumnCollection` usually stores
    :class:`_schema.Column` objects, which are then accessible both via mapping
    style access as well as attribute access style.

    To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as below
    a column named ``employee_name`` is accessed::

        >>> employee_table.c.employee_name

    To access columns that have names with special characters or spaces,
    index-style access is used, such as below which illustrates a column named
    ``employee ' payment`` is accessed::

        >>> employee_table.c["employee ' payment"]

    As the :class:`_sql.ColumnCollection` object provides a Python dictionary
    interface, common dictionary method names like
    :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
    and :meth:`_sql.ColumnCollection.items` are available, which means that
    database columns that are keyed under these names also need to use indexed
    access::

        >>> employee_table.c["values"]


    The name for which a :class:`_schema.Column` would be present is normally
    that of the :paramref:`_schema.Column.key` parameter.  In some contexts,
    such as a :class:`_sql.Select` object that uses a label style set
    using the :meth:`_sql.Select.set_label_style` method, a column of a certain
    key may instead be represented under a particular label name such
    as ``tablename_columnname``::

        >>> from sqlalchemy import select, column, table
        >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
        >>> t = table("t", column("c"))
        >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
        >>> subq = stmt.subquery()
        >>> subq.c.t_c
        <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>

    :class:`.ColumnCollection` also indexes the columns in order and allows
    them to be accessible by their integer position::

        >>> cc[0]
        Column('x', Integer(), table=None)
        >>> cc[1]
        Column('y', Integer(), table=None)

    .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
       allows integer-based
       index access to the collection.

    Iterating the collection yields the column expressions in order::

        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('y', Integer(), table=None)]

    The base :class:`_expression.ColumnCollection` object can store
    duplicates, which can
    mean either two columns with the same key, in which case the column
    returned by key  access is **arbitrary**::

        >>> x1, x2 = Column('x', Integer), Column('x', Integer)
        >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('x', Integer(), table=None)]
        >>> cc['x'] is x1
        False
        >>> cc['x'] is x2
        True

    Or it can also mean the same column multiple times.   These cases are
    supported as :class:`_expression.ColumnCollection`
    is used to represent the columns in
    a SELECT statement which may include duplicates.

    A special subclass :class:`.DedupeColumnCollection` exists which instead
    maintains SQLAlchemy's older behavior of not allowing duplicates; this
    collection is used for schema level objects like :class:`_schema.Table`
    and
    :class:`.PrimaryKeyConstraint` where this deduping is helpful.  The
    :class:`.DedupeColumnCollection` class also has additional mutation methods
    as the schema constructs have more use cases that require removal and
    replacement of columns.

    .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
       now stores duplicate
       column keys as well as the same column in multiple positions.  The
       :class:`.DedupeColumnCollection` class is added to maintain the
       former behavior in those cases where deduplication as well as
       additional replace/remove operations are needed.


    """

    __slots__ = "_collection", "_index", "_colset", "_proxy_index"

    # ordered storage of (key, column, metrics) triples; the authoritative
    # record of contents and ordering
    _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
    # lookup of both string keys and integer positions to (key, column)
    _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
    # lazily-built reverse index from proxy columns to metrics records;
    # see _init_proxy_index()
    _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
    # de-annotated set of member columns, for fast membership tests
    _colset: Set[_COL_co]

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        """Construct the collection, optionally from (key, column) pairs."""
        # object.__setattr__ is used because __setattr__ is overridden on
        # this class to raise NotImplementedError
        object.__setattr__(self, "_colset", set())
        object.__setattr__(self, "_index", {})
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    @util.preload_module("sqlalchemy.sql.elements")
    def __clause_element__(self) -> ClauseList:
        """Coerce the collection to a ClauseList of its member columns."""
        elements = util.preloaded.sql_elements

        return elements.ClauseList(
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
            *self._all_columns,
        )

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        # hook point; subclasses customize population via
        # _populate_separate_keys()
        self._populate_separate_keys(iter_)

    @property
    def _all_columns(self) -> List[_COL_co]:
        # the member columns in order, without keys or metrics
        return [col for (_, col, _) in self._collection]

    def keys(self) -> List[_COLKEY]:
        """Return a sequence of string key names for all columns in this
        collection."""
        return [k for (k, _, _) in self._collection]

    def values(self) -> List[_COL_co]:
        """Return a sequence of :class:`_sql.ColumnClause` or
        :class:`_schema.Column` objects for all columns in this
        collection."""
        return [col for (_, col, _) in self._collection]

    def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
        """Return a sequence of (key, column) tuples for all columns in this
        collection each consisting of a string key name and a
        :class:`_sql.ColumnClause` or
        :class:`_schema.Column` object.
        """

        return [(k, col) for (k, col, _) in self._collection]

    def __bool__(self) -> bool:
        return bool(self._collection)

    def __len__(self) -> int:
        return len(self._collection)

    def __iter__(self) -> Iterator[_COL_co]:
        # turn to a list first to maintain over a course of changes
        return iter([col for _, col, _ in self._collection])

    @overload
    def __getitem__(self, key: Union[str, int]) -> _COL_co: ...

    @overload
    def __getitem__(
        self, key: Tuple[Union[str, int], ...]
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    @overload
    def __getitem__(
        self, key: slice
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    def __getitem__(
        self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
    ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
        """Index access: a str/int key returns a single column; a slice or
        tuple of keys returns a new read-only sub-collection."""
        try:
            if isinstance(key, (tuple, slice)):
                if isinstance(key, slice):
                    cols = (
                        (sub_key, col)
                        for (sub_key, col, _) in self._collection[key]
                    )
                else:
                    cols = (self._index[sub_key] for sub_key in key)

                return ColumnCollection(cols).as_readonly()
            else:
                return self._index[key][1]
        except KeyError as err:
            # integer keys are positional; surface the conventional
            # IndexError for sequence-style access
            if isinstance(err.args[0], int):
                raise IndexError(err.args[0]) from err
            else:
                raise

    def __getattr__(self, key: str) -> _COL_co:
        # attribute-style access to columns by key; only invoked when
        # normal attribute lookup fails
        try:
            return self._index[key][1]
        except KeyError as err:
            raise AttributeError(key) from err

    def __contains__(self, key: str) -> bool:
        # membership is by string key; column-object membership is
        # provided separately by contains_column()
        if key not in self._index:
            if not isinstance(key, str):
                raise exc.ArgumentError(
                    "__contains__ requires a string argument"
                )
            return False
        else:
            return True

    def compare(self, other: ColumnCollection[Any, Any]) -> bool:
        """Compare this :class:`_expression.ColumnCollection` to another
        based on pairwise identity of the member columns, in order."""

        # zip_longest pads the shorter collection with None, so unequal
        # lengths compare as not-equal
        for l, r in zip_longest(self, other):
            if l is not r:
                return False
        else:
            return True

    def __eq__(self, other: Any) -> bool:
        return self.compare(other)

    @overload
    def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...

    @overload
    def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...

    def get(
        self, key: str, default: Optional[_COL] = None
    ) -> Optional[Union[_COL_co, _COL]]:
        """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
        based on a string key name from this
        :class:`_expression.ColumnCollection`."""

        if key in self._index:
            return self._index[key][1]
        else:
            return default

    def __str__(self) -> str:
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(str(c) for c in self),
        )

    def __setitem__(self, key: str, value: Any) -> NoReturn:
        """Not implemented; the collection is not writable via item
        assignment."""
        raise NotImplementedError()

    def __delitem__(self, key: str) -> NoReturn:
        """Not implemented; the collection is not writable via item
        deletion."""
        raise NotImplementedError()

    def __setattr__(self, key: str, obj: Any) -> NoReturn:
        """Not implemented; attributes are assigned internally via
        ``object.__setattr__`` only."""
        raise NotImplementedError()

    def clear(self) -> NoReturn:
        """Dictionary clear() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    def remove(self, column: Any) -> None:
        # removal is only supported by DedupeColumnCollection
        raise NotImplementedError()

    def update(self, iter_: Any) -> NoReturn:
        """Dictionary update() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    # https://github.com/python/mypy/issues/4266
    __hash__ = None  # type: ignore

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        # replace contents of the existing list in place
        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        # iterate reversed so that for duplicate keys, the FIRST
        # occurrence wins in the key-based index
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def add(
        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
    ) -> None:
        """Add a column to this :class:`_sql.ColumnCollection`.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.ColumnCollection` is usually part of an existing
            object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())
        # positional index always records the new entry; the key-based
        # entry is only set if the key is new (first occurrence wins)
        self._index[l] = (colkey, _column)
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def __getstate__(self) -> Dict[str, Any]:
        # metrics objects and the proxy index are not pickled; they are
        # rebuilt in __setstate__
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # object.__setattr__ bypasses the disabled __setattr__
        object.__setattr__(self, "_index", state["_index"])
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def contains_column(self, col: ColumnElement[Any]) -> bool:
        """Checks if a column object exists in this collection"""
        if col not in self._colset:
            if isinstance(col, str):
                raise exc.ArgumentError(
                    "contains_column cannot be used with string arguments. "
                    "Use ``col_name in table.c`` instead."
                )
            return False
        else:
            return True

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.ColumnCollection`."""

        return ReadOnlyColumnCollection(self)

    def _init_proxy_index(self):
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data. Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
                      to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        # candidates are all metrics records whose column's proxy set
        # overlaps the target's proxy set
        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                # compute the incumbent's intersection lazily, only once a
                # competing candidate appears
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
1915
1916
# element type for DedupeColumnCollection: columns that carry a name/key
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
1918
1919
1920class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
1921 """A :class:`_expression.ColumnCollection`
1922 that maintains deduplicating behavior.
1923
1924 This is useful by schema level objects such as :class:`_schema.Table` and
1925 :class:`.PrimaryKeyConstraint`. The collection includes more
1926 sophisticated mutator methods as well to suit schema objects which
1927 require mutable column collections.
1928
1929 .. versionadded:: 1.4
1930
1931 """
1932
1933 def add( # type: ignore[override]
1934 self, column: _NAMEDCOL, key: Optional[str] = None
1935 ) -> None:
1936 if key is not None and column.key != key:
1937 raise exc.ArgumentError(
1938 "DedupeColumnCollection requires columns be under "
1939 "the same key as their .key"
1940 )
1941 key = column.key
1942
1943 if key is None:
1944 raise exc.ArgumentError(
1945 "Can't add unnamed column to column collection"
1946 )
1947
1948 if key in self._index:
1949 existing = self._index[key][1]
1950
1951 if existing is column:
1952 return
1953
1954 self.replace(column)
1955
1956 # pop out memoized proxy_set as this
1957 # operation may very well be occurring
1958 # in a _make_proxy operation
1959 util.memoized_property.reset(column, "proxy_set")
1960 else:
1961 self._append_new_column(key, column)
1962
1963 def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
1964 l = len(self._collection)
1965 self._collection.append(
1966 (key, named_column, _ColumnMetrics(self, named_column))
1967 )
1968 self._colset.add(named_column._deannotate())
1969 self._index[l] = (key, named_column)
1970 self._index[key] = (key, named_column)
1971
    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        cols = list(iter_)

        # columns colliding with an existing name or key are deferred and
        # routed through replace() after all non-colliding columns are in
        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        # renumber the positional (integer) index to match the ordering
        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)
1999
    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        """Add multiple columns at once, deduplicating on each ``.key``."""
        self._populate_separate_keys((col.key, col) for col in iter_)
2002
    def remove(self, column: _NAMEDCOL) -> None:
        """Remove ``column`` from the collection.

        :raises ValueError: if the column object is not a member.

        """
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        # drop the removed column's metrics from the proxy index
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # renumber the positional entries to close the gap left behind
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]
2024
    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table('sometable', metadata, Column('col1', Integer))
            t.columns.replace(Column('col1', Integer, key='columnone'))

        will remove the original 'col1' from the collection, and add
        the new column under the key 'columnone'.

        Used by schema.Column to override columns during table reflection.

        :param column: the new column to install.
        :param extra_remove: optional iterable of additional columns to
         remove along with those matched by name/key.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            # only displace the name-matched column if it is "unaliased",
            # i.e. its key was not explicitly set to something else
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            # no collisions at all; this is a plain append
            self._append_new_column(column.key, column)
            return
        # rebuild the ordered collection, substituting the new column in
        # place of the FIRST removed column so ordering is preserved
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replaced = False
        for k, col, metrics in self._collection:
            if col in remove_col:
                if not replaced:
                    replaced = True
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        # NOTE(review): remove_col is always truthy here because of the
        # early return above; guard looks redundant but is preserved as-is
        if remove_col:
            self._colset.difference_update(remove_col)

            # drop proxy bookkeeping for every displaced column
            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if not replaced:
            # the displaced columns were indexed but not present in
            # _collection; append the new column at the end
            new_cols.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # rebuild the index from scratch: positional keys first, then
        # string keys (later string entries win for duplicate keys)
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
2093
2094
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """An immutable facade over a parent :class:`ColumnCollection`.

    Shares the parent's internal structures directly, so reads reflect
    mutations made to the parent; all mutator methods raise via
    ``_readonly()``.
    """

    __slots__ = ("_parent",)

    def __init__(self, collection):
        # object.__setattr__ bypasses the read-only __setattr__ supplied
        # by util.ReadOnlyContainer
        object.__setattr__(self, "_parent", collection)
        for attrname in ("_colset", "_index", "_collection", "_proxy_index"):
            object.__setattr__(self, attrname, getattr(collection, attrname))

    def __getstate__(self):
        # persist only the parent; shared state is re-derived on load
        return {"_parent": self._parent}

    def __setstate__(self, state):
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
2122
2123
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """An ordered set of column clauses with SQL-expression equality."""

    def contains_column(self, col):
        """Return True if the given column is a member of this set."""
        return col in self

    def extend(self, cols):
        """Add each of the given columns to this set, in order."""
        for member in cols:
            self.add(member)

    def __eq__(self, other):
        # rather than boolean set comparison, produce an AND of
        # column-equality expressions for lineage-sharing pairs
        criteria = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*criteria)

    def __hash__(self):  # type: ignore[override]
        # hash on membership order; required since __eq__ is overridden
        return hash(tuple(self))
2142
2143
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible]
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    If not immediately available, does an iterate to find a sub-element
    that has one, if any.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        # fall back to a traversal, returning the first sub-element
        # that exposes an entity_namespace
        for sub_element in visitors.iterate(
            cast(ExternallyTraversible, entity)
        ):
            if _is_has_entity_namespace(sub_element):
                return sub_element.entity_namespace
        # nothing found; re-raise the original AttributeError
        raise
2161
2162
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.


    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            # no fallback supplied; a missing attribute propagates below
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err