1from types import NoneType
2
3from django.db.backends.utils import names_digest, split_identifier
4from django.db.models.expressions import Col, ExpressionList, F, Func, OrderBy
5from django.db.models.functions import Collate
6from django.db.models.query_utils import Q
7from django.db.models.sql import Query
8from django.utils.functional import partition
9
# Names exported by a star-import of this module; only Index is listed.
__all__ = ["Index"]
11
12
13class Index:
14 suffix = "idx"
15 # The max length of the name of the index (restricted to 30 for
16 # cross-database compatibility with Oracle)
17 max_name_length = 30
18
19 def __init__(
20 self,
21 *expressions,
22 fields=(),
23 name=None,
24 db_tablespace=None,
25 opclasses=(),
26 condition=None,
27 include=None,
28 ):
29 if opclasses and not name:
30 raise ValueError("An index must be named to use opclasses.")
31 if not isinstance(condition, (NoneType, Q)):
32 raise ValueError("Index.condition must be a Q instance.")
33 if condition and not name:
34 raise ValueError("An index must be named to use condition.")
35 if not isinstance(fields, (list, tuple)):
36 raise ValueError("Index.fields must be a list or tuple.")
37 if not isinstance(opclasses, (list, tuple)):
38 raise ValueError("Index.opclasses must be a list or tuple.")
39 if not expressions and not fields:
40 raise ValueError(
41 "At least one field or expression is required to define an index."
42 )
43 if expressions and fields:
44 raise ValueError(
45 "Index.fields and expressions are mutually exclusive.",
46 )
47 if expressions and not name:
48 raise ValueError("An index must be named to use expressions.")
49 if expressions and opclasses:
50 raise ValueError(
51 "Index.opclasses cannot be used with expressions. Use "
52 "django.contrib.postgres.indexes.OpClass() instead."
53 )
54 if opclasses and len(fields) != len(opclasses):
55 raise ValueError(
56 "Index.fields and Index.opclasses must have the same number of "
57 "elements."
58 )
59 if fields and not all(isinstance(field, str) for field in fields):
60 raise ValueError("Index.fields must contain only strings with field names.")
61 if include and not name:
62 raise ValueError("A covering index must be named.")
63 if not isinstance(include, (NoneType, list, tuple)):
64 raise ValueError("Index.include must be a list or tuple.")
65 self.fields = list(fields)
66 # A list of 2-tuple with the field name and ordering ('' or 'DESC').
67 self.fields_orders = [
68 (field_name.removeprefix("-"), "DESC" if field_name.startswith("-") else "")
69 for field_name in self.fields
70 ]
71 self.name = name or ""
72 self.db_tablespace = db_tablespace
73 self.opclasses = opclasses
74 self.condition = condition
75 self.include = tuple(include) if include else ()
76 self.expressions = tuple(
77 F(expression) if isinstance(expression, str) else expression
78 for expression in expressions
79 )
80
81 @property
82 def contains_expressions(self):
83 return bool(self.expressions)
84
85 def _get_condition_sql(self, model, schema_editor):
86 if self.condition is None:
87 return None
88 query = Query(model=model, alias_cols=False)
89 where = query.build_where(self.condition)
90 compiler = query.get_compiler(connection=schema_editor.connection)
91 sql, params = where.as_sql(compiler, schema_editor.connection)
92 return sql % tuple(schema_editor.quote_value(p) for p in params)
93
94 def create_sql(self, model, schema_editor, using="", **kwargs):
95 include = [
96 model._meta.get_field(field_name).column for field_name in self.include
97 ]
98 condition = self._get_condition_sql(model, schema_editor)
99 if self.expressions:
100 index_expressions = []
101 for expression in self.expressions:
102 index_expression = IndexExpression(expression)
103 index_expression.set_wrapper_classes(schema_editor.connection)
104 index_expressions.append(index_expression)
105 expressions = ExpressionList(*index_expressions).resolve_expression(
106 Query(model, alias_cols=False),
107 )
108 fields = None
109 col_suffixes = None
110 else:
111 fields = [
112 model._meta.get_field(field_name)
113 for field_name, _ in self.fields_orders
114 ]
115 if schema_editor.connection.features.supports_index_column_ordering:
116 col_suffixes = [order[1] for order in self.fields_orders]
117 else:
118 col_suffixes = [""] * len(self.fields_orders)
119 expressions = None
120 return schema_editor._create_index_sql(
121 model,
122 fields=fields,
123 name=self.name,
124 using=using,
125 db_tablespace=self.db_tablespace,
126 col_suffixes=col_suffixes,
127 opclasses=self.opclasses,
128 condition=condition,
129 include=include,
130 expressions=expressions,
131 **kwargs,
132 )
133
134 def remove_sql(self, model, schema_editor, **kwargs):
135 return schema_editor._delete_index_sql(model, self.name, **kwargs)
136
137 def deconstruct(self):
138 path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
139 path = path.replace("django.db.models.indexes", "django.db.models")
140 kwargs = {"name": self.name}
141 if self.fields:
142 kwargs["fields"] = self.fields
143 if self.db_tablespace is not None:
144 kwargs["db_tablespace"] = self.db_tablespace
145 if self.opclasses:
146 kwargs["opclasses"] = self.opclasses
147 if self.condition:
148 kwargs["condition"] = self.condition
149 if self.include:
150 kwargs["include"] = self.include
151 return (path, self.expressions, kwargs)
152
153 def clone(self):
154 """Create a copy of this Index."""
155 _, args, kwargs = self.deconstruct()
156 return self.__class__(*args, **kwargs)
157
158 def set_name_with_model(self, model):
159 """
160 Generate a unique name for the index.
161
162 The name is divided into 3 parts - table name (12 chars), field name
163 (8 chars) and unique hash + suffix (10 chars). Each part is made to
164 fit its size by truncating the excess length.
165 """
166 _, table_name = split_identifier(model._meta.db_table)
167 column_names = [
168 model._meta.get_field(field_name).column
169 for field_name, order in self.fields_orders
170 ]
171 column_names_with_order = [
172 (("-%s" if order else "%s") % column_name)
173 for column_name, (field_name, order) in zip(
174 column_names, self.fields_orders
175 )
176 ]
177 # The length of the parts of the name is based on the default max
178 # length of 30 characters.
179 hash_data = [table_name] + column_names_with_order + [self.suffix]
180 self.name = "%s_%s_%s" % (
181 table_name[:11],
182 column_names[0][:7],
183 "%s_%s" % (names_digest(*hash_data, length=6), self.suffix),
184 )
185 if len(self.name) > self.max_name_length:
186 raise ValueError(
187 "Index too long for multiple database support. Is self.suffix "
188 "longer than 3 characters?"
189 )
190 if self.name[0] == "_" or self.name[0].isdigit():
191 self.name = "D%s" % self.name[1:]
192
193 def __repr__(self):
194 return "<%s:%s%s%s%s%s%s%s>" % (
195 self.__class__.__qualname__,
196 "" if not self.fields else " fields=%s" % repr(self.fields),
197 "" if not self.expressions else " expressions=%s" % repr(self.expressions),
198 "" if not self.name else " name=%s" % repr(self.name),
199 (
200 ""
201 if self.db_tablespace is None
202 else " db_tablespace=%s" % repr(self.db_tablespace)
203 ),
204 "" if self.condition is None else " condition=%s" % self.condition,
205 "" if not self.include else " include=%s" % repr(self.include),
206 "" if not self.opclasses else " opclasses=%s" % repr(self.opclasses),
207 )
208
209 def __eq__(self, other):
210 if self.__class__ == other.__class__:
211 return self.deconstruct() == other.deconstruct()
212 return NotImplemented
213
214
class IndexExpression(Func):
    """Order and wrap expressions for CREATE INDEX statements."""

    template = "%(expressions)s"
    wrapper_classes = (OrderBy, Collate)

    def set_wrapper_classes(self, connection=None):
        """
        Remove Collate from this instance's wrapper classes when the
        connection reports ``collate_as_index_expression``.
        """
        # Some databases (e.g. MySQL) treat COLLATE as an indexed expression.
        if not connection or not connection.features.collate_as_index_expression:
            return
        self.wrapper_classes = tuple(
            candidate for candidate in self.wrapper_classes if candidate is not Collate
        )

    @classmethod
    def register_wrappers(cls, *wrapper_classes):
        """Replace the class-level wrapper classes with the given ones."""
        cls.wrapper_classes = wrapper_classes

    def resolve_expression(
        self,
        query=None,
        allow_joins=True,
        reuse=None,
        summarize=False,
        for_save=False,
    ):
        """
        Reorder the wrappers around the root expression, then resolve.

        Raise ValueError if a wrapper class is used more than once or if the
        wrappers are not the topmost expressions. A root expression that
        doesn't resolve to a column reference is wrapped in parentheses.
        """
        flattened = list(self.flatten())
        # Split expressions and wrappers.
        index_expressions, wrappers = partition(
            lambda expr: isinstance(expr, self.wrapper_classes),
            flattened,
        )

        def wrapper_names():
            # Comma-separated names of the wrapper classes, for error messages.
            return ", ".join(
                [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
            )

        if len({type(wrapper) for wrapper in wrappers}) != len(wrappers):
            raise ValueError(
                "Multiple references to %s can't be used in an indexed "
                "expression." % wrapper_names()
            )
        # Position 0 is this expression itself; the wrappers must follow it
        # directly.
        if flattened[1 : len(wrappers) + 1] != wrappers:
            raise ValueError(
                "%s must be topmost expressions in an indexed expression."
                % wrapper_names()
            )
        # Wrap expressions in parentheses if they are not column references.
        root_expression = index_expressions[1]
        resolved_root = root_expression.resolve_expression(
            query,
            allow_joins,
            reuse,
            summarize,
            for_save,
        )
        if not isinstance(resolved_root, Col):
            root_expression = Func(root_expression, template="(%(expressions)s)")

        if wrappers:
            # Work on copies, ordered as in wrapper_classes, and nest each one
            # inside the previous.
            ordered = [
                wrapper.copy()
                for wrapper in sorted(
                    wrappers,
                    key=lambda w: self.wrapper_classes.index(type(w)),
                )
            ]
            for outer, inner in zip(ordered, ordered[1:]):
                outer.set_source_expressions([inner])
            # The deepest wrapper holds the root expression.
            ordered[-1].set_source_expressions([root_expression])
            topmost = ordered[0]
        else:
            # Use the root expression, if there are no wrappers.
            topmost = root_expression
        self.set_source_expressions([topmost])
        return super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile with the generic ``as_sql()`` on SQLite."""
        # Casting to numeric is unnecessary.
        return self.as_sql(compiler, connection, **extra_context)