Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/lookups.py: 40%
436 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1import itertools
2import math
4from django.core.exceptions import EmptyResultSet
5from django.db.models.expressions import Case, Expression, Func, Value, When
6from django.db.models.fields import (
7 BooleanField,
8 CharField,
9 DateTimeField,
10 Field,
11 IntegerField,
12 UUIDField,
13)
14from django.db.models.query_utils import RegisterLookupMixin
15from django.utils.datastructures import OrderedSet
16from django.utils.functional import cached_property
17from django.utils.hashable import make_hashable
20class Lookup(Expression):
21 lookup_name = None
22 prepare_rhs = True
23 can_use_none_as_rhs = False
25 def __init__(self, lhs, rhs):
26 self.lhs, self.rhs = lhs, rhs
27 self.rhs = self.get_prep_lookup()
28 self.lhs = self.get_prep_lhs()
29 if hasattr(self.lhs, "get_bilateral_transforms"):
30 bilateral_transforms = self.lhs.get_bilateral_transforms()
31 else:
32 bilateral_transforms = []
33 if bilateral_transforms:
34 # Warn the user as soon as possible if they are trying to apply
35 # a bilateral transformation on a nested QuerySet: that won't work.
36 from django.db.models.sql.query import Query # avoid circular import
38 if isinstance(rhs, Query):
39 raise NotImplementedError(
40 "Bilateral transformations on nested querysets are not implemented."
41 )
42 self.bilateral_transforms = bilateral_transforms
44 def apply_bilateral_transforms(self, value):
45 for transform in self.bilateral_transforms:
46 value = transform(value)
47 return value
49 def __repr__(self):
50 return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})"
52 def batch_process_rhs(self, compiler, connection, rhs=None):
53 if rhs is None:
54 rhs = self.rhs
55 if self.bilateral_transforms:
56 sqls, sqls_params = [], []
57 for p in rhs:
58 value = Value(p, output_field=self.lhs.output_field)
59 value = self.apply_bilateral_transforms(value)
60 value = value.resolve_expression(compiler.query)
61 sql, sql_params = compiler.compile(value)
62 sqls.append(sql)
63 sqls_params.extend(sql_params)
64 else:
65 _, params = self.get_db_prep_lookup(rhs, connection)
66 sqls, sqls_params = ["%s"] * len(params), params
67 return sqls, sqls_params
69 def get_source_expressions(self):
70 if self.rhs_is_direct_value():
71 return [self.lhs]
72 return [self.lhs, self.rhs]
74 def set_source_expressions(self, new_exprs):
75 if len(new_exprs) == 1:
76 self.lhs = new_exprs[0]
77 else:
78 self.lhs, self.rhs = new_exprs
80 def get_prep_lookup(self):
81 if not self.prepare_rhs or hasattr(self.rhs, "resolve_expression"):
82 return self.rhs
83 if hasattr(self.lhs, "output_field"):
84 if hasattr(self.lhs.output_field, "get_prep_value"):
85 return self.lhs.output_field.get_prep_value(self.rhs)
86 elif self.rhs_is_direct_value():
87 return Value(self.rhs)
88 return self.rhs
90 def get_prep_lhs(self):
91 if hasattr(self.lhs, "resolve_expression"):
92 return self.lhs
93 return Value(self.lhs)
95 def get_db_prep_lookup(self, value, connection):
96 return ("%s", [value])
98 def process_lhs(self, compiler, connection, lhs=None):
99 lhs = lhs or self.lhs
100 if hasattr(lhs, "resolve_expression"):
101 lhs = lhs.resolve_expression(compiler.query)
102 sql, params = compiler.compile(lhs)
103 if isinstance(lhs, Lookup):
104 # Wrapped in parentheses to respect operator precedence.
105 sql = f"({sql})"
106 return sql, params
108 def process_rhs(self, compiler, connection):
109 value = self.rhs
110 if self.bilateral_transforms:
111 if self.rhs_is_direct_value():
112 # Do not call get_db_prep_lookup here as the value will be
113 # transformed before being used for lookup
114 value = Value(value, output_field=self.lhs.output_field)
115 value = self.apply_bilateral_transforms(value)
116 value = value.resolve_expression(compiler.query)
117 if hasattr(value, "as_sql"):
118 sql, params = compiler.compile(value)
119 # Ensure expression is wrapped in parentheses to respect operator
120 # precedence but avoid double wrapping as it can be misinterpreted
121 # on some backends (e.g. subqueries on SQLite).
122 if sql and sql[0] != "(":
123 sql = "(%s)" % sql
124 return sql, params
125 else:
126 return self.get_db_prep_lookup(value, connection)
128 def rhs_is_direct_value(self):
129 return not hasattr(self.rhs, "as_sql")
131 def get_group_by_cols(self):
132 cols = []
133 for source in self.get_source_expressions():
134 cols.extend(source.get_group_by_cols())
135 return cols
137 def as_oracle(self, compiler, connection):
138 # Oracle doesn't allow EXISTS() and filters to be compared to another
139 # expression unless they're wrapped in a CASE WHEN.
140 wrapped = False
141 exprs = []
142 for expr in (self.lhs, self.rhs):
143 if connection.ops.conditional_expression_supported_in_where_clause(expr):
144 expr = Case(When(expr, then=True), default=False)
145 wrapped = True
146 exprs.append(expr)
147 lookup = type(self)(*exprs) if wrapped else self
148 return lookup.as_sql(compiler, connection)
150 @cached_property
151 def output_field(self):
152 return BooleanField()
154 @property
155 def identity(self):
156 return self.__class__, self.lhs, self.rhs
158 def __eq__(self, other):
159 if not isinstance(other, Lookup):
160 return NotImplemented
161 return self.identity == other.identity
163 def __hash__(self):
164 return hash(make_hashable(self.identity))
166 def resolve_expression(
167 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
168 ):
169 c = self.copy()
170 c.is_summary = summarize
171 c.lhs = self.lhs.resolve_expression(
172 query, allow_joins, reuse, summarize, for_save
173 )
174 if hasattr(self.rhs, "resolve_expression"):
175 c.rhs = self.rhs.resolve_expression(
176 query, allow_joins, reuse, summarize, for_save
177 )
178 return c
180 def select_format(self, compiler, sql, params):
181 # Wrap filters with a CASE WHEN expression if a database backend
182 # (e.g. Oracle) doesn't support boolean expression in SELECT or GROUP
183 # BY list.
184 if not compiler.connection.features.supports_boolean_expr_in_select_clause:
185 sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END"
186 return sql, params
189class Transform(RegisterLookupMixin, Func):
190 """
191 RegisterLookupMixin() is first so that get_lookup() and get_transform()
192 first examine self and then check output_field.
193 """
195 bilateral = False
196 arity = 1
198 @property
199 def lhs(self):
200 return self.get_source_expressions()[0]
202 def get_bilateral_transforms(self):
203 if hasattr(self.lhs, "get_bilateral_transforms"):
204 bilateral_transforms = self.lhs.get_bilateral_transforms()
205 else:
206 bilateral_transforms = []
207 if self.bilateral:
208 bilateral_transforms.append(self.__class__)
209 return bilateral_transforms
212class BuiltinLookup(Lookup):
213 def process_lhs(self, compiler, connection, lhs=None):
214 lhs_sql, params = super().process_lhs(compiler, connection, lhs)
215 field_internal_type = self.lhs.output_field.get_internal_type()
216 db_type = self.lhs.output_field.db_type(connection=connection)
217 lhs_sql = connection.ops.field_cast_sql(db_type, field_internal_type) % lhs_sql
218 lhs_sql = (
219 connection.ops.lookup_cast(self.lookup_name, field_internal_type) % lhs_sql
220 )
221 return lhs_sql, list(params)
223 def as_sql(self, compiler, connection):
224 lhs_sql, params = self.process_lhs(compiler, connection)
225 rhs_sql, rhs_params = self.process_rhs(compiler, connection)
226 params.extend(rhs_params)
227 rhs_sql = self.get_rhs_op(connection, rhs_sql)
228 return "%s %s" % (lhs_sql, rhs_sql), params
230 def get_rhs_op(self, connection, rhs):
231 return connection.operators[self.lookup_name] % rhs
234class FieldGetDbPrepValueMixin:
235 """
236 Some lookups require Field.get_db_prep_value() to be called on their
237 inputs.
238 """
240 get_db_prep_lookup_value_is_iterable = False
242 def get_db_prep_lookup(self, value, connection):
243 # For relational fields, use the 'target_field' attribute of the
244 # output_field.
245 field = getattr(self.lhs.output_field, "target_field", None)
246 get_db_prep_value = (
247 getattr(field, "get_db_prep_value", None)
248 or self.lhs.output_field.get_db_prep_value
249 )
250 return (
251 "%s",
252 [get_db_prep_value(v, connection, prepared=True) for v in value]
253 if self.get_db_prep_lookup_value_is_iterable
254 else [get_db_prep_value(value, connection, prepared=True)],
255 )
258class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin):
259 """
260 Some lookups require Field.get_db_prep_value() to be called on each value
261 in an iterable.
262 """
264 get_db_prep_lookup_value_is_iterable = True
266 def get_prep_lookup(self):
267 if hasattr(self.rhs, "resolve_expression"):
268 return self.rhs
269 prepared_values = []
270 for rhs_value in self.rhs:
271 if hasattr(rhs_value, "resolve_expression"):
272 # An expression will be handled by the database but can coexist
273 # alongside real values.
274 pass
275 elif self.prepare_rhs and hasattr(self.lhs.output_field, "get_prep_value"):
276 rhs_value = self.lhs.output_field.get_prep_value(rhs_value)
277 prepared_values.append(rhs_value)
278 return prepared_values
280 def process_rhs(self, compiler, connection):
281 if self.rhs_is_direct_value():
282 # rhs should be an iterable of values. Use batch_process_rhs()
283 # to prepare/transform those values.
284 return self.batch_process_rhs(compiler, connection)
285 else:
286 return super().process_rhs(compiler, connection)
288 def resolve_expression_parameter(self, compiler, connection, sql, param):
289 params = [param]
290 if hasattr(param, "resolve_expression"):
291 param = param.resolve_expression(compiler.query)
292 if hasattr(param, "as_sql"):
293 sql, params = compiler.compile(param)
294 return sql, params
296 def batch_process_rhs(self, compiler, connection, rhs=None):
297 pre_processed = super().batch_process_rhs(compiler, connection, rhs)
298 # The params list may contain expressions which compile to a
299 # sql/param pair. Zip them to get sql and param pairs that refer to the
300 # same argument and attempt to replace them with the result of
301 # compiling the param step.
302 sql, params = zip(
303 *(
304 self.resolve_expression_parameter(compiler, connection, sql, param)
305 for sql, param in zip(*pre_processed)
306 )
307 )
308 params = itertools.chain.from_iterable(params)
309 return sql, tuple(params)
312class PostgresOperatorLookup(Lookup):
313 """Lookup defined by operators on PostgreSQL."""
315 postgres_operator = None
317 def as_postgresql(self, compiler, connection):
318 lhs, lhs_params = self.process_lhs(compiler, connection)
319 rhs, rhs_params = self.process_rhs(compiler, connection)
320 params = tuple(lhs_params) + tuple(rhs_params)
321 return "%s %s %s" % (lhs, self.postgres_operator, rhs), params
324@Field.register_lookup
325class Exact(FieldGetDbPrepValueMixin, BuiltinLookup):
326 lookup_name = "exact"
328 def get_prep_lookup(self):
329 from django.db.models.sql.query import Query # avoid circular import
331 if isinstance(self.rhs, Query):
332 if self.rhs.has_limit_one():
333 if not self.rhs.has_select_fields:
334 self.rhs.clear_select_clause()
335 self.rhs.add_fields(["pk"])
336 else:
337 raise ValueError(
338 "The QuerySet value for an exact lookup must be limited to "
339 "one result using slicing."
340 )
341 return super().get_prep_lookup()
343 def as_sql(self, compiler, connection):
344 # Avoid comparison against direct rhs if lhs is a boolean value. That
345 # turns "boolfield__exact=True" into "WHERE boolean_field" instead of
346 # "WHERE boolean_field = True" when allowed.
347 if (
348 isinstance(self.rhs, bool)
349 and getattr(self.lhs, "conditional", False)
350 and connection.ops.conditional_expression_supported_in_where_clause(
351 self.lhs
352 )
353 ):
354 lhs_sql, params = self.process_lhs(compiler, connection)
355 template = "%s" if self.rhs else "NOT %s"
356 return template % lhs_sql, params
357 return super().as_sql(compiler, connection)
360@Field.register_lookup
361class IExact(BuiltinLookup):
362 lookup_name = "iexact"
363 prepare_rhs = False
365 def process_rhs(self, qn, connection):
366 rhs, params = super().process_rhs(qn, connection)
367 if params:
368 params[0] = connection.ops.prep_for_iexact_query(params[0])
369 return rhs, params
372@Field.register_lookup
373class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup):
374 lookup_name = "gt"
377@Field.register_lookup
378class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
379 lookup_name = "gte"
382@Field.register_lookup
383class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup):
384 lookup_name = "lt"
387@Field.register_lookup
388class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup):
389 lookup_name = "lte"
392class IntegerFieldFloatRounding:
393 """
394 Allow floats to work as query values for IntegerField. Without this, the
395 decimal portion of the float would always be discarded.
396 """
398 def get_prep_lookup(self):
399 if isinstance(self.rhs, float):
400 self.rhs = math.ceil(self.rhs)
401 return super().get_prep_lookup()
404@IntegerField.register_lookup
405class IntegerGreaterThanOrEqual(IntegerFieldFloatRounding, GreaterThanOrEqual):
406 pass
409@IntegerField.register_lookup
410class IntegerLessThan(IntegerFieldFloatRounding, LessThan):
411 pass
414@Field.register_lookup
415class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
416 lookup_name = "in"
418 def get_prep_lookup(self):
419 from django.db.models.sql.query import Query # avoid circular import
421 if isinstance(self.rhs, Query):
422 self.rhs.clear_ordering(clear_default=True)
423 if not self.rhs.has_select_fields:
424 self.rhs.clear_select_clause()
425 self.rhs.add_fields(["pk"])
426 return super().get_prep_lookup()
428 def process_rhs(self, compiler, connection):
429 db_rhs = getattr(self.rhs, "_db", None)
430 if db_rhs is not None and db_rhs != connection.alias:
431 raise ValueError(
432 "Subqueries aren't allowed across different databases. Force "
433 "the inner query to be evaluated using `list(inner_query)`."
434 )
436 if self.rhs_is_direct_value():
437 # Remove None from the list as NULL is never equal to anything.
438 try:
439 rhs = OrderedSet(self.rhs)
440 rhs.discard(None)
441 except TypeError: # Unhashable items in self.rhs
442 rhs = [r for r in self.rhs if r is not None]
444 if not rhs:
445 raise EmptyResultSet
447 # rhs should be an iterable; use batch_process_rhs() to
448 # prepare/transform those values.
449 sqls, sqls_params = self.batch_process_rhs(compiler, connection, rhs)
450 placeholder = "(" + ", ".join(sqls) + ")"
451 return (placeholder, sqls_params)
452 return super().process_rhs(compiler, connection)
454 def get_rhs_op(self, connection, rhs):
455 return "IN %s" % rhs
457 def as_sql(self, compiler, connection):
458 max_in_list_size = connection.ops.max_in_list_size()
459 if (
460 self.rhs_is_direct_value()
461 and max_in_list_size
462 and len(self.rhs) > max_in_list_size
463 ):
464 return self.split_parameter_list_as_sql(compiler, connection)
465 return super().as_sql(compiler, connection)
467 def split_parameter_list_as_sql(self, compiler, connection):
468 # This is a special case for databases which limit the number of
469 # elements which can appear in an 'IN' clause.
470 max_in_list_size = connection.ops.max_in_list_size()
471 lhs, lhs_params = self.process_lhs(compiler, connection)
472 rhs, rhs_params = self.batch_process_rhs(compiler, connection)
473 in_clause_elements = ["("]
474 params = []
475 for offset in range(0, len(rhs_params), max_in_list_size):
476 if offset > 0:
477 in_clause_elements.append(" OR ")
478 in_clause_elements.append("%s IN (" % lhs)
479 params.extend(lhs_params)
480 sqls = rhs[offset : offset + max_in_list_size]
481 sqls_params = rhs_params[offset : offset + max_in_list_size]
482 param_group = ", ".join(sqls)
483 in_clause_elements.append(param_group)
484 in_clause_elements.append(")")
485 params.extend(sqls_params)
486 in_clause_elements.append(")")
487 return "".join(in_clause_elements), params
490class PatternLookup(BuiltinLookup):
491 param_pattern = "%%%s%%"
492 prepare_rhs = False
494 def get_rhs_op(self, connection, rhs):
495 # Assume we are in startswith. We need to produce SQL like:
496 # col LIKE %s, ['thevalue%']
497 # For python values we can (and should) do that directly in Python,
498 # but if the value is for example reference to other column, then
499 # we need to add the % pattern match to the lookup by something like
500 # col LIKE othercol || '%%'
501 # So, for Python values we don't need any special pattern, but for
502 # SQL reference values or SQL transformations we need the correct
503 # pattern added.
504 if hasattr(self.rhs, "as_sql") or self.bilateral_transforms:
505 pattern = connection.pattern_ops[self.lookup_name].format(
506 connection.pattern_esc
507 )
508 return pattern.format(rhs)
509 else:
510 return super().get_rhs_op(connection, rhs)
512 def process_rhs(self, qn, connection):
513 rhs, params = super().process_rhs(qn, connection)
514 if self.rhs_is_direct_value() and params and not self.bilateral_transforms:
515 params[0] = self.param_pattern % connection.ops.prep_for_like_query(
516 params[0]
517 )
518 return rhs, params
521@Field.register_lookup
522class Contains(PatternLookup):
523 lookup_name = "contains"
526@Field.register_lookup
527class IContains(Contains):
528 lookup_name = "icontains"
531@Field.register_lookup
532class StartsWith(PatternLookup):
533 lookup_name = "startswith"
534 param_pattern = "%s%%"
537@Field.register_lookup
538class IStartsWith(StartsWith):
539 lookup_name = "istartswith"
542@Field.register_lookup
543class EndsWith(PatternLookup):
544 lookup_name = "endswith"
545 param_pattern = "%%%s"
548@Field.register_lookup
549class IEndsWith(EndsWith):
550 lookup_name = "iendswith"
553@Field.register_lookup
554class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup):
555 lookup_name = "range"
557 def get_rhs_op(self, connection, rhs):
558 return "BETWEEN %s AND %s" % (rhs[0], rhs[1])
561@Field.register_lookup
562class IsNull(BuiltinLookup):
563 lookup_name = "isnull"
564 prepare_rhs = False
566 def as_sql(self, compiler, connection):
567 if not isinstance(self.rhs, bool):
568 raise ValueError(
569 "The QuerySet value for an isnull lookup must be True or False."
570 )
571 sql, params = self.process_lhs(compiler, connection)
572 if self.rhs:
573 return "%s IS NULL" % sql, params
574 else:
575 return "%s IS NOT NULL" % sql, params
578@Field.register_lookup
579class Regex(BuiltinLookup):
580 lookup_name = "regex"
581 prepare_rhs = False
583 def as_sql(self, compiler, connection):
584 if self.lookup_name in connection.operators:
585 return super().as_sql(compiler, connection)
586 else:
587 lhs, lhs_params = self.process_lhs(compiler, connection)
588 rhs, rhs_params = self.process_rhs(compiler, connection)
589 sql_template = connection.ops.regex_lookup(self.lookup_name)
590 return sql_template % (lhs, rhs), lhs_params + rhs_params
593@Field.register_lookup
594class IRegex(Regex):
595 lookup_name = "iregex"
598class YearLookup(Lookup):
599 def year_lookup_bounds(self, connection, year):
600 from django.db.models.functions import ExtractIsoYear
602 iso_year = isinstance(self.lhs, ExtractIsoYear)
603 output_field = self.lhs.lhs.output_field
604 if isinstance(output_field, DateTimeField):
605 bounds = connection.ops.year_lookup_bounds_for_datetime_field(
606 year,
607 iso_year=iso_year,
608 )
609 else:
610 bounds = connection.ops.year_lookup_bounds_for_date_field(
611 year,
612 iso_year=iso_year,
613 )
614 return bounds
616 def as_sql(self, compiler, connection):
617 # Avoid the extract operation if the rhs is a direct value to allow
618 # indexes to be used.
619 if self.rhs_is_direct_value():
620 # Skip the extract part by directly using the originating field,
621 # that is self.lhs.lhs.
622 lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs)
623 rhs_sql, _ = self.process_rhs(compiler, connection)
624 rhs_sql = self.get_direct_rhs_sql(connection, rhs_sql)
625 start, finish = self.year_lookup_bounds(connection, self.rhs)
626 params.extend(self.get_bound_params(start, finish))
627 return "%s %s" % (lhs_sql, rhs_sql), params
628 return super().as_sql(compiler, connection)
630 def get_direct_rhs_sql(self, connection, rhs):
631 return connection.operators[self.lookup_name] % rhs
633 def get_bound_params(self, start, finish):
634 raise NotImplementedError(
635 "subclasses of YearLookup must provide a get_bound_params() method"
636 )
639class YearExact(YearLookup, Exact):
640 def get_direct_rhs_sql(self, connection, rhs):
641 return "BETWEEN %s AND %s"
643 def get_bound_params(self, start, finish):
644 return (start, finish)
647class YearGt(YearLookup, GreaterThan):
648 def get_bound_params(self, start, finish):
649 return (finish,)
652class YearGte(YearLookup, GreaterThanOrEqual):
653 def get_bound_params(self, start, finish):
654 return (start,)
657class YearLt(YearLookup, LessThan):
658 def get_bound_params(self, start, finish):
659 return (start,)
662class YearLte(YearLookup, LessThanOrEqual):
663 def get_bound_params(self, start, finish):
664 return (finish,)
667class UUIDTextMixin:
668 """
669 Strip hyphens from a value when filtering a UUIDField on backends without
670 a native datatype for UUID.
671 """
673 def process_rhs(self, qn, connection):
674 if not connection.features.has_native_uuid_field:
675 from django.db.models.functions import Replace
677 if self.rhs_is_direct_value():
678 self.rhs = Value(self.rhs)
679 self.rhs = Replace(
680 self.rhs, Value("-"), Value(""), output_field=CharField()
681 )
682 rhs, params = super().process_rhs(qn, connection)
683 return rhs, params
686@UUIDField.register_lookup
687class UUIDIExact(UUIDTextMixin, IExact):
688 pass
691@UUIDField.register_lookup
692class UUIDContains(UUIDTextMixin, Contains):
693 pass
696@UUIDField.register_lookup
697class UUIDIContains(UUIDTextMixin, IContains):
698 pass
701@UUIDField.register_lookup
702class UUIDStartsWith(UUIDTextMixin, StartsWith):
703 pass
706@UUIDField.register_lookup
707class UUIDIStartsWith(UUIDTextMixin, IStartsWith):
708 pass
711@UUIDField.register_lookup
712class UUIDEndsWith(UUIDTextMixin, EndsWith):
713 pass
716@UUIDField.register_lookup
717class UUIDIEndsWith(UUIDTextMixin, IEndsWith):
718 pass