1# sql/annotation.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""The :class:`.Annotated` class and related routines; creates hash-equivalent
9copies of SQL constructs which contain context-specific markers and
10associations.
11
12"""
13
14from . import operators
15from .. import util
16
17
class Annotated(object):
    """Annotated stand-in for a ClauseElement.

    An instance starts out with a copy of the wrapped element's
    ``__dict__`` and additionally carries an ``_annotations`` dictionary.
    It reports the hash of the wrapped element as its own, so that it
    takes the element's place in hashed collections.

    The wrapped element stays strongly referenced for as long as the
    annotated copy exists.  This matters for the hash: were the element
    garbage collected, its hash value could be reused, causing conflicts.

    .. note:: Annotated is a separate class, rather than functionality
       placed directly within ClauseElement, for **performance**.  The
       ``__hash__()`` method is absent on plain ClauseElement, which
       significantly reduces function call overhead; sets and
       dictionaries of ClauseElement objects are prevalent, but most of
       those objects are not "annotated".

    """

    def __new__(cls, *args):
        # Called without arguments this acts as the "clone constructor":
        # the caller already holds the concrete annotated class and only
        # needs a bare instance of it.
        if args:
            element, values = args
            element_type = element.__class__
            # Use the Annotated subclass registered for the element's
            # class, generating one when the registry has no entry.
            try:
                cls = annotated_classes[element_type]
            except KeyError:
                cls = _new_annotation_type(element_type, cls)
        return object.__new__(cls)

    def __init__(self, element, values):
        # The instance dictionary is replaced wholesale first; the
        # attributes assigned afterwards land in that copied dictionary.
        state = element.__dict__.copy()
        self.__dict__ = state
        self.__element = element
        self._annotations = values
        # Cached so that __hash__() is a plain attribute read.
        self._hash = hash(element)

    def _annotate(self, values):
        """Return a copy whose annotations are the current ones updated
        with ``values``; this object's own annotations are not modified.
        """
        merged = self._annotations.copy()
        merged.update(values)
        return self._with_annotations(merged)

    def _with_annotations(self, values):
        """Return a copy of this object carrying exactly ``values`` as its
        annotations dictionary.
        """
        annotated_cls = self.__class__
        # Zero-argument __new__ is the clone constructor path.
        duplicate = annotated_cls.__new__(annotated_cls)
        duplicate.__dict__ = self.__dict__.copy()
        duplicate._annotations = values
        return duplicate

    def _deannotate(self, values=None, clone=True):
        """Remove annotations.

        With ``values`` of None the wrapped element itself is returned.
        Otherwise a copy is returned from which each key named in
        ``values`` has been dropped; keys that are not present are
        ignored.  The ``clone`` argument is accepted but not consulted
        here.
        """
        if values is None:
            return self.__element

        remaining = self._annotations.copy()
        for key in values:
            remaining.pop(key, None)
        return self._with_annotations(remaining)

    def _compiler_dispatch(self, visitor, **kw):
        # Dispatch through the wrapped element's class, but hand over
        # this annotated object as the subject.
        element_type = self.__element.__class__
        return element_type._compiler_dispatch(self, visitor, **kw)

    @property
    def _constructor(self):
        """The ``_constructor`` of the wrapped element."""
        wrapped = self.__element
        return wrapped._constructor

    def _clone(self):
        """Clone the wrapped element and re-wrap the result with the
        same annotations; returns ``self`` when the element's
        ``_clone()`` hands back the element itself.
        """
        original = self.__element
        cloned = original._clone()
        if cloned is original:
            # the element is immutable; nothing to change
            return self

        # carry over whatever has changed in this object's __dict__
        # onto the fresh clone before wrapping it again.
        cloned.__dict__.update(self.__dict__)
        return self.__class__(cloned, self._annotations)

    def __reduce__(self):
        # Rebuild through the regular constructor from the wrapped
        # element and the annotations.
        rebuild_args = (self.__element, self._annotations)
        return self.__class__, rebuild_args

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        element = self.__element
        if not isinstance(element, operators.ColumnOperators):
            # Elements that are not ColumnOperators compare by hash.
            return hash(other) == hash(self)
        # ColumnOperators elements use their own class's __eq__, applied
        # to this annotated object.
        return element.__class__.__eq__(self, other)
107
108
# Registry mapping a class to the Annotated subclass generated for it;
# entries are added by _new_annotation_type().  The generated subclasses
# are "hard-generated": each one is also bound to a module-level name in
# this module, instead of existing only as an anonymous on-the-fly type,
# so that the resulting objects are pickleable.
annotated_classes = {}
113
114
115def _deep_annotate(element, annotations, exclude=None):
116 """Deep copy the given ClauseElement, annotating each element
117 with the given annotations dictionary.
118
119 Elements within the exclude collection will be cloned but not annotated.
120
121 """
122
123 def clone(elem):
124 if (
125 exclude
126 and hasattr(elem, "proxy_set")
127 and elem.proxy_set.intersection(exclude)
128 ):
129 newelem = elem._clone()
130 elif annotations != elem._annotations:
131 newelem = elem._annotate(annotations)
132 else:
133 newelem = elem
134 newelem._copy_internals(clone=clone)
135 return newelem
136
137 if element is not None:
138 element = clone(element)
139 clone = None # remove gc cycles
140 return element
141
142
def _deep_deannotate(element, values=None):
    """Walk the given element, removing annotations from each element
    encountered.

    ``values`` is handed to each element's ``_deannotate()``.  Returns the
    resulting element, or None when ``element`` is None.

    """

    memo = util.column_dict()

    def strip_elem(elem):
        # when specific values are given, the elem has to be processed
        # each time it appears, since different annotations may remain
        # on different source elements.  when all annotations are being
        # removed, the same result can be assumed for a repeated elem,
        # so it is served from the memo.
        if not values and elem in memo:
            return memo[elem]

        stripped = elem._deannotate(values=values, clone=True)
        stripped._copy_internals(clone=strip_elem)
        if not values:
            memo[elem] = stripped
        return stripped

    deannotated = element
    if deannotated is not None:
        deannotated = strip_elem(deannotated)

    # the closure refers to itself; drop the reference to remove gc cycles
    strip_elem = None
    return deannotated
168
169
170def _shallow_annotate(element, annotations):
171 """Annotate the given ClauseElement and copy its internals so that
172 internal objects refer to the new annotated object.
173
174 Basically used to apply a "dont traverse" annotation to a
175 selectable, without digging throughout the whole
176 structure wasting time.
177 """
178 element = element._annotate(annotations)
179 element._copy_internals()
180 return element
181
182
def _new_annotation_type(cls, base_cls):
    """Return the Annotated subclass for ``cls``, generating and
    registering it when none exists yet.

    A class that already is an Annotated subclass is returned unchanged.
    A newly generated class is named ``Annotated<cls name>``, recorded in
    ``annotated_classes`` and bound under that name in this module's
    globals.
    """
    if issubclass(cls, Annotated):
        return cls
    if cls in annotated_classes:
        return annotated_classes[cls]

    # an Annotated subclass more specific than the given base_cls, such
    # as AnnotatedColumnElement, may already be registered for one of
    # the ancestors of cls; the first such match along the MRO wins.
    for ancestor in cls.__mro__:
        if ancestor in annotated_classes:
            base_cls = annotated_classes[ancestor]
            break

    anno_name = "Annotated%s" % cls.__name__
    anno_cls = type(anno_name, (base_cls, cls), {})
    annotated_classes[cls] = anno_cls
    globals()[anno_name] = anno_cls
    return anno_cls
202
203
def _prepare_annotations(target_hierarchy, base_cls):
    """Generate annotated types for ``target_hierarchy`` and for every
    class reachable from it through ``__subclasses__()``.
    """
    pending = [target_hierarchy]
    while pending:
        current = pending.pop()
        # subclasses are collected before the annotated type for the
        # current class is requested
        pending += current.__subclasses__()

        _new_annotation_type(current, base_cls)