Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/indexes.py: 20%
142 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1from django.db.backends.utils import names_digest, split_identifier
2from django.db.models.expressions import Col, ExpressionList, F, Func, OrderBy
3from django.db.models.functions import Collate
4from django.db.models.query_utils import Q
5from django.db.models.sql import Query
6from django.utils.functional import partition
8__all__ = ["Index"]
11class Index:
12 suffix = "idx"
13 # The max length of the name of the index (restricted to 30 for
14 # cross-database compatibility with Oracle)
15 max_name_length = 30
17 def __init__(
18 self,
19 *expressions,
20 fields=(),
21 name=None,
22 db_tablespace=None,
23 opclasses=(),
24 condition=None,
25 include=None,
26 ):
27 if opclasses and not name:
28 raise ValueError("An index must be named to use opclasses.")
29 if not isinstance(condition, (type(None), Q)):
30 raise ValueError("Index.condition must be a Q instance.")
31 if condition and not name:
32 raise ValueError("An index must be named to use condition.")
33 if not isinstance(fields, (list, tuple)):
34 raise ValueError("Index.fields must be a list or tuple.")
35 if not isinstance(opclasses, (list, tuple)):
36 raise ValueError("Index.opclasses must be a list or tuple.")
37 if not expressions and not fields:
38 raise ValueError(
39 "At least one field or expression is required to define an index."
40 )
41 if expressions and fields:
42 raise ValueError(
43 "Index.fields and expressions are mutually exclusive.",
44 )
45 if expressions and not name:
46 raise ValueError("An index must be named to use expressions.")
47 if expressions and opclasses:
48 raise ValueError(
49 "Index.opclasses cannot be used with expressions. Use "
50 "django.contrib.postgres.indexes.OpClass() instead."
51 )
52 if opclasses and len(fields) != len(opclasses):
53 raise ValueError(
54 "Index.fields and Index.opclasses must have the same number of "
55 "elements."
56 )
57 if fields and not all(isinstance(field, str) for field in fields):
58 raise ValueError("Index.fields must contain only strings with field names.")
59 if include and not name:
60 raise ValueError("A covering index must be named.")
61 if not isinstance(include, (type(None), list, tuple)):
62 raise ValueError("Index.include must be a list or tuple.")
63 self.fields = list(fields)
64 # A list of 2-tuple with the field name and ordering ('' or 'DESC').
65 self.fields_orders = [
66 (field_name[1:], "DESC") if field_name.startswith("-") else (field_name, "")
67 for field_name in self.fields
68 ]
69 self.name = name or ""
70 self.db_tablespace = db_tablespace
71 self.opclasses = opclasses
72 self.condition = condition
73 self.include = tuple(include) if include else ()
74 self.expressions = tuple(
75 F(expression) if isinstance(expression, str) else expression
76 for expression in expressions
77 )
79 @property
80 def contains_expressions(self):
81 return bool(self.expressions)
83 def _get_condition_sql(self, model, schema_editor):
84 if self.condition is None:
85 return None
86 query = Query(model=model, alias_cols=False)
87 where = query.build_where(self.condition)
88 compiler = query.get_compiler(connection=schema_editor.connection)
89 sql, params = where.as_sql(compiler, schema_editor.connection)
90 return sql % tuple(schema_editor.quote_value(p) for p in params)
92 def create_sql(self, model, schema_editor, using="", **kwargs):
93 include = [
94 model._meta.get_field(field_name).column for field_name in self.include
95 ]
96 condition = self._get_condition_sql(model, schema_editor)
97 if self.expressions:
98 index_expressions = []
99 for expression in self.expressions:
100 index_expression = IndexExpression(expression)
101 index_expression.set_wrapper_classes(schema_editor.connection)
102 index_expressions.append(index_expression)
103 expressions = ExpressionList(*index_expressions).resolve_expression(
104 Query(model, alias_cols=False),
105 )
106 fields = None
107 col_suffixes = None
108 else:
109 fields = [
110 model._meta.get_field(field_name)
111 for field_name, _ in self.fields_orders
112 ]
113 if schema_editor.connection.features.supports_index_column_ordering:
114 col_suffixes = [order[1] for order in self.fields_orders]
115 else:
116 col_suffixes = [""] * len(self.fields_orders)
117 expressions = None
118 return schema_editor._create_index_sql(
119 model,
120 fields=fields,
121 name=self.name,
122 using=using,
123 db_tablespace=self.db_tablespace,
124 col_suffixes=col_suffixes,
125 opclasses=self.opclasses,
126 condition=condition,
127 include=include,
128 expressions=expressions,
129 **kwargs,
130 )
132 def remove_sql(self, model, schema_editor, **kwargs):
133 return schema_editor._delete_index_sql(model, self.name, **kwargs)
135 def deconstruct(self):
136 path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
137 path = path.replace("django.db.models.indexes", "django.db.models")
138 kwargs = {"name": self.name}
139 if self.fields:
140 kwargs["fields"] = self.fields
141 if self.db_tablespace is not None:
142 kwargs["db_tablespace"] = self.db_tablespace
143 if self.opclasses:
144 kwargs["opclasses"] = self.opclasses
145 if self.condition:
146 kwargs["condition"] = self.condition
147 if self.include:
148 kwargs["include"] = self.include
149 return (path, self.expressions, kwargs)
151 def clone(self):
152 """Create a copy of this Index."""
153 _, args, kwargs = self.deconstruct()
154 return self.__class__(*args, **kwargs)
156 def set_name_with_model(self, model):
157 """
158 Generate a unique name for the index.
160 The name is divided into 3 parts - table name (12 chars), field name
161 (8 chars) and unique hash + suffix (10 chars). Each part is made to
162 fit its size by truncating the excess length.
163 """
164 _, table_name = split_identifier(model._meta.db_table)
165 column_names = [
166 model._meta.get_field(field_name).column
167 for field_name, order in self.fields_orders
168 ]
169 column_names_with_order = [
170 (("-%s" if order else "%s") % column_name)
171 for column_name, (field_name, order) in zip(
172 column_names, self.fields_orders
173 )
174 ]
175 # The length of the parts of the name is based on the default max
176 # length of 30 characters.
177 hash_data = [table_name] + column_names_with_order + [self.suffix]
178 self.name = "%s_%s_%s" % (
179 table_name[:11],
180 column_names[0][:7],
181 "%s_%s" % (names_digest(*hash_data, length=6), self.suffix),
182 )
183 if len(self.name) > self.max_name_length:
184 raise ValueError(
185 "Index too long for multiple database support. Is self.suffix "
186 "longer than 3 characters?"
187 )
188 if self.name[0] == "_" or self.name[0].isdigit():
189 self.name = "D%s" % self.name[1:]
191 def __repr__(self):
192 return "<%s:%s%s%s%s%s%s%s>" % (
193 self.__class__.__qualname__,
194 "" if not self.fields else " fields=%s" % repr(self.fields),
195 "" if not self.expressions else " expressions=%s" % repr(self.expressions),
196 "" if not self.name else " name=%s" % repr(self.name),
197 ""
198 if self.db_tablespace is None
199 else " db_tablespace=%s" % repr(self.db_tablespace),
200 "" if self.condition is None else " condition=%s" % self.condition,
201 "" if not self.include else " include=%s" % repr(self.include),
202 "" if not self.opclasses else " opclasses=%s" % repr(self.opclasses),
203 )
205 def __eq__(self, other):
206 if self.__class__ == other.__class__:
207 return self.deconstruct() == other.deconstruct()
208 return NotImplemented
211class IndexExpression(Func):
212 """Order and wrap expressions for CREATE INDEX statements."""
214 template = "%(expressions)s"
215 wrapper_classes = (OrderBy, Collate)
217 def set_wrapper_classes(self, connection=None):
218 # Some databases (e.g. MySQL) treats COLLATE as an indexed expression.
219 if connection and connection.features.collate_as_index_expression:
220 self.wrapper_classes = tuple(
221 [
222 wrapper_cls
223 for wrapper_cls in self.wrapper_classes
224 if wrapper_cls is not Collate
225 ]
226 )
228 @classmethod
229 def register_wrappers(cls, *wrapper_classes):
230 cls.wrapper_classes = wrapper_classes
232 def resolve_expression(
233 self,
234 query=None,
235 allow_joins=True,
236 reuse=None,
237 summarize=False,
238 for_save=False,
239 ):
240 expressions = list(self.flatten())
241 # Split expressions and wrappers.
242 index_expressions, wrappers = partition(
243 lambda e: isinstance(e, self.wrapper_classes),
244 expressions,
245 )
246 wrapper_types = [type(wrapper) for wrapper in wrappers]
247 if len(wrapper_types) != len(set(wrapper_types)):
248 raise ValueError(
249 "Multiple references to %s can't be used in an indexed "
250 "expression."
251 % ", ".join(
252 [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
253 )
254 )
255 if expressions[1 : len(wrappers) + 1] != wrappers:
256 raise ValueError(
257 "%s must be topmost expressions in an indexed expression."
258 % ", ".join(
259 [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
260 )
261 )
262 # Wrap expressions in parentheses if they are not column references.
263 root_expression = index_expressions[1]
264 resolve_root_expression = root_expression.resolve_expression(
265 query,
266 allow_joins,
267 reuse,
268 summarize,
269 for_save,
270 )
271 if not isinstance(resolve_root_expression, Col):
272 root_expression = Func(root_expression, template="(%(expressions)s)")
274 if wrappers:
275 # Order wrappers and set their expressions.
276 wrappers = sorted(
277 wrappers,
278 key=lambda w: self.wrapper_classes.index(type(w)),
279 )
280 wrappers = [wrapper.copy() for wrapper in wrappers]
281 for i, wrapper in enumerate(wrappers[:-1]):
282 wrapper.set_source_expressions([wrappers[i + 1]])
283 # Set the root expression on the deepest wrapper.
284 wrappers[-1].set_source_expressions([root_expression])
285 self.set_source_expressions([wrappers[0]])
286 else:
287 # Use the root expression, if there are no wrappers.
288 self.set_source_expressions([root_expression])
289 return super().resolve_expression(
290 query, allow_joins, reuse, summarize, for_save
291 )
293 def as_sqlite(self, compiler, connection, **extra_context):
294 # Casting to numeric is unnecessary.
295 return self.as_sql(compiler, connection, **extra_context)