Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/db/models/indexes.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1from types import NoneType 

2 

3from django.db.backends.utils import names_digest, split_identifier 

4from django.db.models.expressions import Col, ExpressionList, F, Func, OrderBy 

5from django.db.models.functions import Collate 

6from django.db.models.query_utils import Q 

7from django.db.models.sql import Query 

8from django.utils.functional import partition 

9 

10__all__ = ["Index"] 

11 

12 

13class Index: 

14 suffix = "idx" 

15 # The max length of the name of the index (restricted to 30 for 

16 # cross-database compatibility with Oracle) 

17 max_name_length = 30 

18 

19 def __init__( 

20 self, 

21 *expressions, 

22 fields=(), 

23 name=None, 

24 db_tablespace=None, 

25 opclasses=(), 

26 condition=None, 

27 include=None, 

28 ): 

29 if opclasses and not name: 

30 raise ValueError("An index must be named to use opclasses.") 

31 if not isinstance(condition, (NoneType, Q)): 

32 raise ValueError("Index.condition must be a Q instance.") 

33 if condition and not name: 

34 raise ValueError("An index must be named to use condition.") 

35 if not isinstance(fields, (list, tuple)): 

36 raise ValueError("Index.fields must be a list or tuple.") 

37 if not isinstance(opclasses, (list, tuple)): 

38 raise ValueError("Index.opclasses must be a list or tuple.") 

39 if not expressions and not fields: 

40 raise ValueError( 

41 "At least one field or expression is required to define an index." 

42 ) 

43 if expressions and fields: 

44 raise ValueError( 

45 "Index.fields and expressions are mutually exclusive.", 

46 ) 

47 if expressions and not name: 

48 raise ValueError("An index must be named to use expressions.") 

49 if expressions and opclasses: 

50 raise ValueError( 

51 "Index.opclasses cannot be used with expressions. Use " 

52 "django.contrib.postgres.indexes.OpClass() instead." 

53 ) 

54 if opclasses and len(fields) != len(opclasses): 

55 raise ValueError( 

56 "Index.fields and Index.opclasses must have the same number of " 

57 "elements." 

58 ) 

59 if fields and not all(isinstance(field, str) for field in fields): 

60 raise ValueError("Index.fields must contain only strings with field names.") 

61 if include and not name: 

62 raise ValueError("A covering index must be named.") 

63 if not isinstance(include, (NoneType, list, tuple)): 

64 raise ValueError("Index.include must be a list or tuple.") 

65 self.fields = list(fields) 

66 # A list of 2-tuple with the field name and ordering ('' or 'DESC'). 

67 self.fields_orders = [ 

68 (field_name.removeprefix("-"), "DESC" if field_name.startswith("-") else "") 

69 for field_name in self.fields 

70 ] 

71 self.name = name or "" 

72 self.db_tablespace = db_tablespace 

73 self.opclasses = opclasses 

74 self.condition = condition 

75 self.include = tuple(include) if include else () 

76 self.expressions = tuple( 

77 F(expression) if isinstance(expression, str) else expression 

78 for expression in expressions 

79 ) 

80 

81 @property 

82 def contains_expressions(self): 

83 return bool(self.expressions) 

84 

85 def _get_condition_sql(self, model, schema_editor): 

86 if self.condition is None: 

87 return None 

88 query = Query(model=model, alias_cols=False) 

89 where = query.build_where(self.condition) 

90 compiler = query.get_compiler(connection=schema_editor.connection) 

91 sql, params = where.as_sql(compiler, schema_editor.connection) 

92 return sql % tuple(schema_editor.quote_value(p) for p in params) 

93 

94 def create_sql(self, model, schema_editor, using="", **kwargs): 

95 include = [ 

96 model._meta.get_field(field_name).column for field_name in self.include 

97 ] 

98 condition = self._get_condition_sql(model, schema_editor) 

99 if self.expressions: 

100 index_expressions = [] 

101 for expression in self.expressions: 

102 index_expression = IndexExpression(expression) 

103 index_expression.set_wrapper_classes(schema_editor.connection) 

104 index_expressions.append(index_expression) 

105 expressions = ExpressionList(*index_expressions).resolve_expression( 

106 Query(model, alias_cols=False), 

107 ) 

108 fields = None 

109 col_suffixes = None 

110 else: 

111 fields = [ 

112 model._meta.get_field(field_name) 

113 for field_name, _ in self.fields_orders 

114 ] 

115 if schema_editor.connection.features.supports_index_column_ordering: 

116 col_suffixes = [order[1] for order in self.fields_orders] 

117 else: 

118 col_suffixes = [""] * len(self.fields_orders) 

119 expressions = None 

120 return schema_editor._create_index_sql( 

121 model, 

122 fields=fields, 

123 name=self.name, 

124 using=using, 

125 db_tablespace=self.db_tablespace, 

126 col_suffixes=col_suffixes, 

127 opclasses=self.opclasses, 

128 condition=condition, 

129 include=include, 

130 expressions=expressions, 

131 **kwargs, 

132 ) 

133 

134 def remove_sql(self, model, schema_editor, **kwargs): 

135 return schema_editor._delete_index_sql(model, self.name, **kwargs) 

136 

137 def deconstruct(self): 

138 path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__) 

139 path = path.replace("django.db.models.indexes", "django.db.models") 

140 kwargs = {"name": self.name} 

141 if self.fields: 

142 kwargs["fields"] = self.fields 

143 if self.db_tablespace is not None: 

144 kwargs["db_tablespace"] = self.db_tablespace 

145 if self.opclasses: 

146 kwargs["opclasses"] = self.opclasses 

147 if self.condition: 

148 kwargs["condition"] = self.condition 

149 if self.include: 

150 kwargs["include"] = self.include 

151 return (path, self.expressions, kwargs) 

152 

153 def clone(self): 

154 """Create a copy of this Index.""" 

155 _, args, kwargs = self.deconstruct() 

156 return self.__class__(*args, **kwargs) 

157 

158 def set_name_with_model(self, model): 

159 """ 

160 Generate a unique name for the index. 

161 

162 The name is divided into 3 parts - table name (12 chars), field name 

163 (8 chars) and unique hash + suffix (10 chars). Each part is made to 

164 fit its size by truncating the excess length. 

165 """ 

166 _, table_name = split_identifier(model._meta.db_table) 

167 column_names = [ 

168 model._meta.get_field(field_name).column 

169 for field_name, order in self.fields_orders 

170 ] 

171 column_names_with_order = [ 

172 (("-%s" if order else "%s") % column_name) 

173 for column_name, (field_name, order) in zip( 

174 column_names, self.fields_orders 

175 ) 

176 ] 

177 # The length of the parts of the name is based on the default max 

178 # length of 30 characters. 

179 hash_data = [table_name] + column_names_with_order + [self.suffix] 

180 self.name = "%s_%s_%s" % ( 

181 table_name[:11], 

182 column_names[0][:7], 

183 "%s_%s" % (names_digest(*hash_data, length=6), self.suffix), 

184 ) 

185 if len(self.name) > self.max_name_length: 

186 raise ValueError( 

187 "Index too long for multiple database support. Is self.suffix " 

188 "longer than 3 characters?" 

189 ) 

190 if self.name[0] == "_" or self.name[0].isdigit(): 

191 self.name = "D%s" % self.name[1:] 

192 

193 def __repr__(self): 

194 return "<%s:%s%s%s%s%s%s%s>" % ( 

195 self.__class__.__qualname__, 

196 "" if not self.fields else " fields=%s" % repr(self.fields), 

197 "" if not self.expressions else " expressions=%s" % repr(self.expressions), 

198 "" if not self.name else " name=%s" % repr(self.name), 

199 ( 

200 "" 

201 if self.db_tablespace is None 

202 else " db_tablespace=%s" % repr(self.db_tablespace) 

203 ), 

204 "" if self.condition is None else " condition=%s" % self.condition, 

205 "" if not self.include else " include=%s" % repr(self.include), 

206 "" if not self.opclasses else " opclasses=%s" % repr(self.opclasses), 

207 ) 

208 

209 def __eq__(self, other): 

210 if self.__class__ == other.__class__: 

211 return self.deconstruct() == other.deconstruct() 

212 return NotImplemented 

213 

214 

class IndexExpression(Func):
    """Order and wrap expressions for CREATE INDEX statements."""

    template = "%(expressions)s"
    # Expression types that may only appear as the outermost layers of an
    # indexed expression.
    wrapper_classes = (OrderBy, Collate)

    def set_wrapper_classes(self, connection=None):
        """
        Stop treating Collate as a wrapper on this instance when the
        connection's features report ``collate_as_index_expression``.
        """
        if not connection or not connection.features.collate_as_index_expression:
            return
        # Some databases (e.g. MySQL) treat COLLATE as an indexed expression.
        self.wrapper_classes = tuple(
            candidate
            for candidate in self.wrapper_classes
            if candidate is not Collate
        )

    @classmethod
    def register_wrappers(cls, *wrapper_classes):
        """Replace the class-level tuple of wrapper classes."""
        cls.wrapper_classes = wrapper_classes

    def resolve_expression(
        self,
        query=None,
        allow_joins=True,
        reuse=None,
        summarize=False,
        for_save=False,
    ):
        """
        Rebuild the source expressions as ordered wrappers around the root
        expression, then resolve as usual.

        Raise ValueError if a wrapper type occurs more than once or if the
        wrappers are not the topmost expressions.
        """

        def wrapper_names():
            # Comma-separated wrapper class names used in both error messages.
            return ", ".join(
                [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
            )

        flattened = list(self.flatten())
        # Split expressions and wrappers.
        index_expressions, wrappers = partition(
            lambda e: isinstance(e, self.wrapper_classes),
            flattened,
        )
        seen_types = [type(wrapper) for wrapper in wrappers]
        if len(set(seen_types)) != len(seen_types):
            raise ValueError(
                "Multiple references to %s can't be used in an indexed "
                "expression." % wrapper_names()
            )
        # The flattened list starts with this expression itself, so the
        # wrappers must occupy the positions right after it.
        if flattened[1 : 1 + len(wrappers)] != wrappers:
            raise ValueError(
                "%s must be topmost expressions in an indexed expression."
                % wrapper_names()
            )
        # Wrap expressions in parentheses if they are not column references.
        root_expression = index_expressions[1]
        resolved_root = root_expression.resolve_expression(
            query,
            allow_joins,
            reuse,
            summarize,
            for_save,
        )
        if not isinstance(resolved_root, Col):
            root_expression = Func(root_expression, template="(%(expressions)s)")

        if not wrappers:
            # Use the root expression, if there are no wrappers.
            self.set_source_expressions([root_expression])
        else:
            # Order wrappers as listed in wrapper_classes and work on copies.
            ordered = [
                wrapper.copy()
                for wrapper in sorted(
                    wrappers,
                    key=lambda w: self.wrapper_classes.index(type(w)),
                )
            ]
            # Nest each wrapper around the one that follows it.
            for outer, inner in zip(ordered, ordered[1:]):
                outer.set_source_expressions([inner])
            # Set the root expression on the deepest wrapper.
            ordered[-1].set_source_expressions([root_expression])
            self.set_source_expressions([ordered[0]])
        return super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render with the generic ``as_sql()``."""
        # Casting to numeric is unnecessary.
        return self.as_sql(compiler, connection, **extra_context)