Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/query_utils.py: 35%
232 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1"""
2Various data structures used in query construction.
4Factored out from django.db.models.query to avoid making the main module very
5large and/or so that they can be used by other modules without getting into
6circular import difficulties.
7"""
8import functools
9import inspect
10import logging
11from collections import namedtuple
13from django.core.exceptions import FieldError
14from django.db import DEFAULT_DB_ALIAS, DatabaseError, connections
15from django.db.models.constants import LOOKUP_SEP
16from django.utils import tree
# Module-level logger; Q.check() uses it to report database errors.
logger = logging.getLogger("django.db.models")

# PathInfo is used when converting lookups (fk__somecol). The contents
# describe the relation in Model terms (model Options and Fields for both
# sides of the relation). The join_field is the field backing the relation.
PathInfo = namedtuple(
    "PathInfo",
    "from_opts to_opts target_fields join_field m2m direct filtered_relation",
)
def subclasses(cls):
    """
    Yield cls, then recursively every class returned by __subclasses__(),
    in depth-first order.

    No de-duplication is performed, so a class reachable through more than
    one parent is yielded once per path.
    """
    yield cls
    for subclass in cls.__subclasses__():
        yield from subclasses(subclass)
class Q(tree.Node):
    """
    Encapsulate filters as objects that can then be combined logically (using
    `&`, `|`, and `^`) and negated (using `~`).
    """

    # Connection types
    AND = "AND"
    OR = "OR"
    XOR = "XOR"
    default = AND
    conditional = True

    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
        """
        Build a node whose children are the positional arguments followed by
        the keyword filters as (name, value) pairs.

        The keyword pairs are sorted so that the child order does not depend
        on the order in which the keyword arguments were written.
        """
        super().__init__(
            children=[*args, *sorted(kwargs.items())],
            connector=_connector,
            negated=_negated,
        )

    def _combine(self, other, conn):
        """
        Return a new object joining self and other with the connector conn.

        Raise TypeError if other does not have a truthy `conditional`
        attribute. If self is empty, a copy of other is returned; if other is
        an empty Q, a copy of self is returned.
        """
        if getattr(other, "conditional", False) is False:
            raise TypeError(other)
        if not self:
            return other.copy()
        if not other and isinstance(other, Q):
            return self.copy()

        obj = self.create(connector=conn)
        obj.add(self, conn)
        obj.add(other, conn)
        return obj

    def __or__(self, other):
        return self._combine(other, self.OR)

    def __and__(self, other):
        return self._combine(other, self.AND)

    def __xor__(self, other):
        return self._combine(other, self.XOR)

    def __invert__(self):
        """Return a negated copy; self is left untouched."""
        obj = self.copy()
        obj.negate()
        return obj

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve this Q against query and return the resulting clause.

        Although query defaults to None, it is used unconditionally, so a
        query object must be supplied. for_save is accepted but not used here.
        """
        # We must promote any new joins to left outer joins so that when Q is
        # used as an expression, rows aren't filtered due to joins.
        clause, joins = query._add_q(
            self,
            reuse,
            allow_joins=allow_joins,
            split_subq=False,
            check_filterable=False,
            summarize=summarize,
        )
        query.promote_joins(joins)
        return clause

    def flatten(self):
        """
        Recursively yield this Q object and all subexpressions, in depth-first
        order.
        """
        yield self
        for child in self.children:
            if isinstance(child, tuple):
                # Use the lookup.
                child = child[1]
            if hasattr(child, "flatten"):
                yield from child.flatten()
            else:
                yield child

    def check(self, against, using=DEFAULT_DB_ALIAS):
        """
        Do a database query to check if the expressions of the Q instance
        matches against the expressions.

        `against` maps names to values; values without a resolve_expression()
        method are wrapped in Value(). Return True if the query yields a row.
        If the database raises DatabaseError, a warning is logged and True is
        returned.
        """
        # Avoid circular imports.
        from django.db.models import BooleanField, Value
        from django.db.models.functions import Coalesce
        from django.db.models.sql import Query
        from django.db.models.sql.constants import SINGLE

        query = Query(None)
        for name, value in against.items():
            if not hasattr(value, "resolve_expression"):
                value = Value(value)
            query.add_annotation(value, name, select=False)
        query.add_annotation(Value(1), "_check")
        # This will raise a FieldError if a field is missing in "against".
        if connections[using].features.supports_comparing_boolean_expr:
            query.add_q(Q(Coalesce(self, True, output_field=BooleanField())))
        else:
            query.add_q(self)
        compiler = query.get_compiler(using=using)
        try:
            return compiler.execute_sql(SINGLE) is not None
        except DatabaseError as e:
            logger.warning("Got a database error calling check() on %r: %s", self, e)
            return True

    def deconstruct(self):
        """
        Return a (path, args, kwargs) triple describing this object.

        The path is shortened from django.db.models.query_utils to
        django.db.models. `_connector` and `_negated` are only included in
        kwargs when they differ from the defaults.
        """
        path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        if path.startswith("django.db.models.query_utils"):
            path = path.replace("django.db.models.query_utils", "django.db.models")
        args = tuple(self.children)
        kwargs = {}
        if self.connector != self.default:
            kwargs["_connector"] = self.connector
        if self.negated:
            kwargs["_negated"] = True
        return path, args, kwargs
class DeferredAttribute:
    """
    A wrapper for a deferred-loading field. When the value is read from this
    object the first time, the query is executed.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Retrieve and cache the value from the datastore on the first lookup.
        Return the cached value.

        When accessed on the class (instance is None), return the descriptor
        itself.
        """
        if instance is None:
            return self
        data = instance.__dict__
        field_name = self.field.attname
        if field_name not in data:
            # Let's see if the field is part of the parent chain. If so we
            # might be able to reuse the already loaded value. Refs #18343.
            val = self._check_parent_chain(instance)
            if val is None:
                # refresh_from_db() is relied upon to store the value in
                # instance.__dict__, which `data` aliases.
                instance.refresh_from_db(fields=[field_name])
            else:
                data[field_name] = val
        return data[field_name]

    def _check_parent_chain(self, instance):
        """
        Check if the field value can be fetched from a parent field already
        loaded in the instance. This can be done if the to-be fetched
        field is a primary key field.

        Return the value read from the ancestor link attribute, or None when
        the shortcut does not apply.
        """
        opts = instance._meta
        link_field = opts.get_ancestor_link(self.field.model)
        if self.field.primary_key and self.field != link_field:
            return getattr(instance, link_field.attname)
        return None
class class_or_instance_method:
    """
    Hook used in RegisterLookupMixin to return partial functions depending on
    the caller type (instance or class of models.Field).
    """

    def __init__(self, class_method, instance_method):
        self.class_method = class_method
        self.instance_method = instance_method

    def __get__(self, instance, owner):
        """
        Return class_method bound to owner when accessed on the class, or
        instance_method bound to instance when accessed on an instance.
        """
        if instance is None:
            return functools.partial(self.class_method, owner)
        return functools.partial(self.instance_method, instance)
class RegisterLookupMixin:
    """
    Mixin providing registration and retrieval of lookups, both per class
    (stored in `class_lookups`) and per instance (stored in
    `instance_lookups`).
    """

    def _get_lookup(self, lookup_name):
        return self.get_lookups().get(lookup_name, None)

    @functools.lru_cache(maxsize=None)
    def get_class_lookups(cls):
        """Return the class lookups merged across the MRO of cls (cached)."""
        class_lookups = [
            parent.__dict__.get("class_lookups", {}) for parent in inspect.getmro(cls)
        ]
        return cls.merge_dicts(class_lookups)

    def get_instance_lookups(self):
        """
        Return the class lookups, overlaid with this instance's own lookups
        when it has any.
        """
        class_lookups = self.get_class_lookups()
        if instance_lookups := getattr(self, "instance_lookups", None):
            return {**class_lookups, **instance_lookups}
        return class_lookups

    # get_lookups() dispatches on whether it is called on the class or on an
    # instance. It must capture the plain function before get_class_lookups
    # is rebound as a classmethod on the next line.
    get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups)
    get_class_lookups = classmethod(get_class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup registered under lookup_name, deferring to
        self.output_field when nothing is found here. Return None if the
        registered object is not a Lookup subclass.
        """
        from django.db.models.lookups import Lookup

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_lookup(lookup_name)
        if found is not None and not issubclass(found, Lookup):
            return None
        return found

    def get_transform(self, lookup_name):
        """
        Return the Transform registered under lookup_name, deferring to
        self.output_field when nothing is found here. Return None if the
        registered object is not a Transform subclass.
        """
        from django.db.models.lookups import Transform

        found = self._get_lookup(lookup_name)
        if found is None and hasattr(self, "output_field"):
            return self.output_field.get_transform(lookup_name)
        if found is not None and not issubclass(found, Transform):
            return None
        return found

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts in reverse to preference the order of the original list. e.g.,
        merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'.
        """
        merged = {}
        for d in reversed(dicts):
            merged.update(d)
        return merged

    @classmethod
    def _clear_cached_class_lookups(cls):
        """Clear the get_class_lookups() cache for cls and all its subclasses."""
        for subclass in subclasses(cls):
            subclass.get_class_lookups.cache_clear()

    def register_class_lookup(cls, lookup, lookup_name=None):
        """
        Register lookup on cls under lookup_name (default: lookup.lookup_name)
        and return the lookup.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        # Give cls its own dict rather than mutating one inherited from a
        # parent class.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[lookup_name] = lookup
        cls._clear_cached_class_lookups()
        return lookup

    def register_instance_lookup(self, lookup, lookup_name=None):
        """
        Register lookup on this instance only, under lookup_name (default:
        lookup.lookup_name), and return the lookup.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        if "instance_lookups" not in self.__dict__:
            self.instance_lookups = {}
        self.instance_lookups[lookup_name] = lookup
        return lookup

    register_lookup = class_or_instance_method(
        register_class_lookup, register_instance_lookup
    )
    register_class_lookup = classmethod(register_class_lookup)

    def _unregister_class_lookup(cls, lookup, lookup_name=None):
        """
        Remove given lookup from cls lookups. For use in tests only as it's
        not thread-safe.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        del cls.class_lookups[lookup_name]
        cls._clear_cached_class_lookups()

    def _unregister_instance_lookup(self, lookup, lookup_name=None):
        """
        Remove given lookup from instance lookups. For use in tests only as
        it's not thread-safe.
        """
        if lookup_name is None:
            lookup_name = lookup.lookup_name
        del self.instance_lookups[lookup_name]

    _unregister_lookup = class_or_instance_method(
        _unregister_class_lookup, _unregister_instance_lookup
    )
    _unregister_class_lookup = classmethod(_unregister_class_lookup)
def select_related_descend(field, restricted, requested, select_mask, reverse=False):
    """
    Return True if this field should be used to descend deeper for
    select_related() purposes. Used by both the query construction code
    (compiler.get_related_selections()) and the model instance creation code
    (compiler.klass_info).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean field, indicating if the field list has been
       manually restricted using a requested clause
     * requested - The select_related() dictionary.
     * select_mask - the dictionary of selected fields.
     * reverse - boolean, True if we are checking a reverse select related

    Raise FieldError if the field is both requested for traversal and absent
    from a non-empty select_mask.
    """
    if not field.remote_field:
        return False
    if field.remote_field.parent_link and not reverse:
        return False
    if restricted:
        if reverse and field.related_query_name() not in requested:
            return False
        if not reverse and field.name not in requested:
            return False
    if not restricted and field.null:
        return False
    if (
        restricted
        and select_mask
        and field.name in requested
        and field not in select_mask
    ):
        raise FieldError(
            f"Field {field.model._meta.object_name}.{field.name} cannot be both "
            "deferred and traversed using select_related at the same time."
        )
    return True
def refs_expression(lookup_parts, annotations):
    """
    Check if the lookup_parts contains references to the given annotations set.
    Because the LOOKUP_SEP is contained in the default annotation names, check
    each prefix of the lookup_parts for a match.

    Return (annotation_name, remaining_lookup_parts) for the shortest prefix
    that maps to a truthy value in annotations, or (None, ()) if there is no
    such prefix.
    """
    for n in range(1, len(lookup_parts) + 1):
        level_n_lookup = LOOKUP_SEP.join(lookup_parts[0:n])
        if annotations.get(level_n_lookup):
            return level_n_lookup, lookup_parts[n:]
    return None, ()
def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that model is compatible with target_opts. Compatibility
    is OK if:
      1) model and opts match (where proxy inheritance is removed)
      2) model is parent of opts' model or the other way around
    """

    def check(opts):
        return (
            model._meta.concrete_model == opts.concrete_model
            or opts.concrete_model in model._meta.get_parent_list()
            or model in opts.get_parent_list()
        )

    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # If we didn't have the primary key check, then pk__in (== place__in) would
    # give Place's opts as the target opts, but Restaurant isn't compatible
    # with that. This logic applies only to primary keys, as when doing __in=qs,
    # we are going to turn this into __in=qs.values('pk') later on.
    return check(target_opts) or (
        getattr(field, "primary_key", False) and check(field.model._meta)
    )
class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    # NOTE(review): the default Q() is created once, when the function is
    # defined, so every instance that omits `condition` shares that same
    # object.
    def __init__(self, relation_name, *, condition=Q()):
        """
        Store relation_name and condition.

        Raise ValueError if relation_name is empty or condition is not a Q
        instance.
        """
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        self.relation_name = relation_name
        self.alias = None
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.condition = condition
        self.path = []

    # NOTE(review): __eq__ is defined without __hash__, so instances are
    # unhashable.
    def __eq__(self, other):
        """Compare by relation_name, alias, and condition; path is ignored."""
        if not isinstance(other, self.__class__):
            return NotImplemented
        return (
            self.relation_name == other.relation_name
            and self.alias == other.alias
            and self.condition == other.condition
        )

    def clone(self):
        """
        Return a new FilteredRelation with the same relation_name, alias, and
        condition object, and a shallow copy of path.
        """
        clone = FilteredRelation(self.relation_name, condition=self.condition)
        clone.alias = self.alias
        clone.path = self.path[:]
        return clone

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method).
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        # Resolve the condition in Join.filtered_relation.
        query = compiler.query
        where = query.build_filtered_relation_q(self.condition, reuse=set(self.path))
        return compiler.compile(where)
435 return compiler.compile(where)