Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/query_utils.py: 35%

232 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1""" 

2Various data structures used in query construction. 

3 

4Factored out from django.db.models.query to avoid making the main module very 

5large and/or so that they can be used by other modules without getting into 

6circular import difficulties. 

7""" 

8import functools 

9import inspect 

10import logging 

11from collections import namedtuple 

12 

13from django.core.exceptions import FieldError 

14from django.db import DEFAULT_DB_ALIAS, DatabaseError, connections 

15from django.db.models.constants import LOOKUP_SEP 

16from django.utils import tree 

17 

18logger = logging.getLogger("django.db.models") 

19 

20# PathInfo is used when converting lookups (fk__somecol). The contents 

21# describe the relation in Model terms (model Options and Fields for both 

22# sides of the relation. The join_field is the field backing the relation. 

23PathInfo = namedtuple( 

24 "PathInfo", 

25 "from_opts to_opts target_fields join_field m2m direct filtered_relation", 

26) 

27 

28 

29def subclasses(cls): 

30 yield cls 

31 for subclass in cls.__subclasses__(): 

32 yield from subclasses(subclass) 

33 

34 

35class Q(tree.Node): 

36 """ 

37 Encapsulate filters as objects that can then be combined logically (using 

38 `&` and `|`). 

39 """ 

40 

41 # Connection types 

42 AND = "AND" 

43 OR = "OR" 

44 XOR = "XOR" 

45 default = AND 

46 conditional = True 

47 

48 def __init__(self, *args, _connector=None, _negated=False, **kwargs): 

49 super().__init__( 

50 children=[*args, *sorted(kwargs.items())], 

51 connector=_connector, 

52 negated=_negated, 

53 ) 

54 

55 def _combine(self, other, conn): 

56 if getattr(other, "conditional", False) is False: 

57 raise TypeError(other) 

58 if not self: 

59 return other.copy() 

60 if not other and isinstance(other, Q): 

61 return self.copy() 

62 

63 obj = self.create(connector=conn) 

64 obj.add(self, conn) 

65 obj.add(other, conn) 

66 return obj 

67 

68 def __or__(self, other): 

69 return self._combine(other, self.OR) 

70 

71 def __and__(self, other): 

72 return self._combine(other, self.AND) 

73 

74 def __xor__(self, other): 

75 return self._combine(other, self.XOR) 

76 

77 def __invert__(self): 

78 obj = self.copy() 

79 obj.negate() 

80 return obj 

81 

82 def resolve_expression( 

83 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

84 ): 

85 # We must promote any new joins to left outer joins so that when Q is 

86 # used as an expression, rows aren't filtered due to joins. 

87 clause, joins = query._add_q( 

88 self, 

89 reuse, 

90 allow_joins=allow_joins, 

91 split_subq=False, 

92 check_filterable=False, 

93 summarize=summarize, 

94 ) 

95 query.promote_joins(joins) 

96 return clause 

97 

98 def flatten(self): 

99 """ 

100 Recursively yield this Q object and all subexpressions, in depth-first 

101 order. 

102 """ 

103 yield self 

104 for child in self.children: 

105 if isinstance(child, tuple): 

106 # Use the lookup. 

107 child = child[1] 

108 if hasattr(child, "flatten"): 

109 yield from child.flatten() 

110 else: 

111 yield child 

112 

113 def check(self, against, using=DEFAULT_DB_ALIAS): 

114 """ 

115 Do a database query to check if the expressions of the Q instance 

116 matches against the expressions. 

117 """ 

118 # Avoid circular imports. 

119 from django.db.models import BooleanField, Value 

120 from django.db.models.functions import Coalesce 

121 from django.db.models.sql import Query 

122 from django.db.models.sql.constants import SINGLE 

123 

124 query = Query(None) 

125 for name, value in against.items(): 

126 if not hasattr(value, "resolve_expression"): 

127 value = Value(value) 

128 query.add_annotation(value, name, select=False) 

129 query.add_annotation(Value(1), "_check") 

130 # This will raise a FieldError if a field is missing in "against". 

131 if connections[using].features.supports_comparing_boolean_expr: 

132 query.add_q(Q(Coalesce(self, True, output_field=BooleanField()))) 

133 else: 

134 query.add_q(self) 

135 compiler = query.get_compiler(using=using) 

136 try: 

137 return compiler.execute_sql(SINGLE) is not None 

138 except DatabaseError as e: 

139 logger.warning("Got a database error calling check() on %r: %s", self, e) 

140 return True 

141 

142 def deconstruct(self): 

143 path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__) 

144 if path.startswith("django.db.models.query_utils"): 

145 path = path.replace("django.db.models.query_utils", "django.db.models") 

146 args = tuple(self.children) 

147 kwargs = {} 

148 if self.connector != self.default: 

149 kwargs["_connector"] = self.connector 

150 if self.negated: 

151 kwargs["_negated"] = True 

152 return path, args, kwargs 

153 

154 

155class DeferredAttribute: 

156 """ 

157 A wrapper for a deferred-loading field. When the value is read from this 

158 object the first time, the query is executed. 

159 """ 

160 

161 def __init__(self, field): 

162 self.field = field 

163 

164 def __get__(self, instance, cls=None): 

165 """ 

166 Retrieve and caches the value from the datastore on the first lookup. 

167 Return the cached value. 

168 """ 

169 if instance is None: 

170 return self 

171 data = instance.__dict__ 

172 field_name = self.field.attname 

173 if field_name not in data: 

174 # Let's see if the field is part of the parent chain. If so we 

175 # might be able to reuse the already loaded value. Refs #18343. 

176 val = self._check_parent_chain(instance) 

177 if val is None: 

178 instance.refresh_from_db(fields=[field_name]) 

179 else: 

180 data[field_name] = val 

181 return data[field_name] 

182 

183 def _check_parent_chain(self, instance): 

184 """ 

185 Check if the field value can be fetched from a parent field already 

186 loaded in the instance. This can be done if the to-be fetched 

187 field is a primary key field. 

188 """ 

189 opts = instance._meta 

190 link_field = opts.get_ancestor_link(self.field.model) 

191 if self.field.primary_key and self.field != link_field: 

192 return getattr(instance, link_field.attname) 

193 return None 

194 

195 

196class class_or_instance_method: 

197 """ 

198 Hook used in RegisterLookupMixin to return partial functions depending on 

199 the caller type (instance or class of models.Field). 

200 """ 

201 

202 def __init__(self, class_method, instance_method): 

203 self.class_method = class_method 

204 self.instance_method = instance_method 

205 

206 def __get__(self, instance, owner): 

207 if instance is None: 

208 return functools.partial(self.class_method, owner) 

209 return functools.partial(self.instance_method, instance) 

210 

211 

212class RegisterLookupMixin: 

213 def _get_lookup(self, lookup_name): 

214 return self.get_lookups().get(lookup_name, None) 

215 

216 @functools.lru_cache(maxsize=None) 

217 def get_class_lookups(cls): 

218 class_lookups = [ 

219 parent.__dict__.get("class_lookups", {}) for parent in inspect.getmro(cls) 

220 ] 

221 return cls.merge_dicts(class_lookups) 

222 

223 def get_instance_lookups(self): 

224 class_lookups = self.get_class_lookups() 

225 if instance_lookups := getattr(self, "instance_lookups", None): 

226 return {**class_lookups, **instance_lookups} 

227 return class_lookups 

228 

229 get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups) 

230 get_class_lookups = classmethod(get_class_lookups) 

231 

232 def get_lookup(self, lookup_name): 

233 from django.db.models.lookups import Lookup 

234 

235 found = self._get_lookup(lookup_name) 

236 if found is None and hasattr(self, "output_field"): 

237 return self.output_field.get_lookup(lookup_name) 

238 if found is not None and not issubclass(found, Lookup): 

239 return None 

240 return found 

241 

242 def get_transform(self, lookup_name): 

243 from django.db.models.lookups import Transform 

244 

245 found = self._get_lookup(lookup_name) 

246 if found is None and hasattr(self, "output_field"): 

247 return self.output_field.get_transform(lookup_name) 

248 if found is not None and not issubclass(found, Transform): 

249 return None 

250 return found 

251 

252 @staticmethod 

253 def merge_dicts(dicts): 

254 """ 

255 Merge dicts in reverse to preference the order of the original list. e.g., 

256 merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'. 

257 """ 

258 merged = {} 

259 for d in reversed(dicts): 

260 merged.update(d) 

261 return merged 

262 

263 @classmethod 

264 def _clear_cached_class_lookups(cls): 

265 for subclass in subclasses(cls): 

266 subclass.get_class_lookups.cache_clear() 

267 

268 def register_class_lookup(cls, lookup, lookup_name=None): 

269 if lookup_name is None: 

270 lookup_name = lookup.lookup_name 

271 if "class_lookups" not in cls.__dict__: 

272 cls.class_lookups = {} 

273 cls.class_lookups[lookup_name] = lookup 

274 cls._clear_cached_class_lookups() 

275 return lookup 

276 

277 def register_instance_lookup(self, lookup, lookup_name=None): 

278 if lookup_name is None: 

279 lookup_name = lookup.lookup_name 

280 if "instance_lookups" not in self.__dict__: 

281 self.instance_lookups = {} 

282 self.instance_lookups[lookup_name] = lookup 

283 return lookup 

284 

285 register_lookup = class_or_instance_method( 

286 register_class_lookup, register_instance_lookup 

287 ) 

288 register_class_lookup = classmethod(register_class_lookup) 

289 

290 def _unregister_class_lookup(cls, lookup, lookup_name=None): 

291 """ 

292 Remove given lookup from cls lookups. For use in tests only as it's 

293 not thread-safe. 

294 """ 

295 if lookup_name is None: 

296 lookup_name = lookup.lookup_name 

297 del cls.class_lookups[lookup_name] 

298 cls._clear_cached_class_lookups() 

299 

300 def _unregister_instance_lookup(self, lookup, lookup_name=None): 

301 """ 

302 Remove given lookup from instance lookups. For use in tests only as 

303 it's not thread-safe. 

304 """ 

305 if lookup_name is None: 

306 lookup_name = lookup.lookup_name 

307 del self.instance_lookups[lookup_name] 

308 

309 _unregister_lookup = class_or_instance_method( 

310 _unregister_class_lookup, _unregister_instance_lookup 

311 ) 

312 _unregister_class_lookup = classmethod(_unregister_class_lookup) 

313 

314 

315def select_related_descend(field, restricted, requested, select_mask, reverse=False): 

316 """ 

317 Return True if this field should be used to descend deeper for 

318 select_related() purposes. Used by both the query construction code 

319 (compiler.get_related_selections()) and the model instance creation code 

320 (compiler.klass_info). 

321 

322 Arguments: 

323 * field - the field to be checked 

324 * restricted - a boolean field, indicating if the field list has been 

325 manually restricted using a requested clause) 

326 * requested - The select_related() dictionary. 

327 * select_mask - the dictionary of selected fields. 

328 * reverse - boolean, True if we are checking a reverse select related 

329 """ 

330 if not field.remote_field: 

331 return False 

332 if field.remote_field.parent_link and not reverse: 

333 return False 

334 if restricted: 

335 if reverse and field.related_query_name() not in requested: 

336 return False 

337 if not reverse and field.name not in requested: 

338 return False 

339 if not restricted and field.null: 

340 return False 

341 if ( 

342 restricted 

343 and select_mask 

344 and field.name in requested 

345 and field not in select_mask 

346 ): 

347 raise FieldError( 

348 f"Field {field.model._meta.object_name}.{field.name} cannot be both " 

349 "deferred and traversed using select_related at the same time." 

350 ) 

351 return True 

352 

353 

354def refs_expression(lookup_parts, annotations): 

355 """ 

356 Check if the lookup_parts contains references to the given annotations set. 

357 Because the LOOKUP_SEP is contained in the default annotation names, check 

358 each prefix of the lookup_parts for a match. 

359 """ 

360 for n in range(1, len(lookup_parts) + 1): 

361 level_n_lookup = LOOKUP_SEP.join(lookup_parts[0:n]) 

362 if annotations.get(level_n_lookup): 

363 return level_n_lookup, lookup_parts[n:] 

364 return None, () 

365 

366 

367def check_rel_lookup_compatibility(model, target_opts, field): 

368 """ 

369 Check that self.model is compatible with target_opts. Compatibility 

370 is OK if: 

371 1) model and opts match (where proxy inheritance is removed) 

372 2) model is parent of opts' model or the other way around 

373 """ 

374 

375 def check(opts): 

376 return ( 

377 model._meta.concrete_model == opts.concrete_model 

378 or opts.concrete_model in model._meta.get_parent_list() 

379 or model in opts.get_parent_list() 

380 ) 

381 

382 # If the field is a primary key, then doing a query against the field's 

383 # model is ok, too. Consider the case: 

384 # class Restaurant(models.Model): 

385 # place = OneToOneField(Place, primary_key=True): 

386 # Restaurant.objects.filter(pk__in=Restaurant.objects.all()). 

387 # If we didn't have the primary key check, then pk__in (== place__in) would 

388 # give Place's opts as the target opts, but Restaurant isn't compatible 

389 # with that. This logic applies only to primary keys, as when doing __in=qs, 

390 # we are going to turn this into __in=qs.values('pk') later on. 

391 return check(target_opts) or ( 

392 getattr(field, "primary_key", False) and check(field.model._meta) 

393 ) 

394 

395 

396class FilteredRelation: 

397 """Specify custom filtering in the ON clause of SQL joins.""" 

398 

399 def __init__(self, relation_name, *, condition=Q()): 

400 if not relation_name: 

401 raise ValueError("relation_name cannot be empty.") 

402 self.relation_name = relation_name 

403 self.alias = None 

404 if not isinstance(condition, Q): 

405 raise ValueError("condition argument must be a Q() instance.") 

406 self.condition = condition 

407 self.path = [] 

408 

409 def __eq__(self, other): 

410 if not isinstance(other, self.__class__): 

411 return NotImplemented 

412 return ( 

413 self.relation_name == other.relation_name 

414 and self.alias == other.alias 

415 and self.condition == other.condition 

416 ) 

417 

418 def clone(self): 

419 clone = FilteredRelation(self.relation_name, condition=self.condition) 

420 clone.alias = self.alias 

421 clone.path = self.path[:] 

422 return clone 

423 

424 def resolve_expression(self, *args, **kwargs): 

425 """ 

426 QuerySet.annotate() only accepts expression-like arguments 

427 (with a resolve_expression() method). 

428 """ 

429 raise NotImplementedError("FilteredRelation.resolve_expression() is unused.") 

430 

431 def as_sql(self, compiler, connection): 

432 # Resolve the condition in Join.filtered_relation. 

433 query = compiler.query 

434 where = query.build_filtered_relation_q(self.condition, reuse=set(self.path)) 

435 return compiler.compile(where)