Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/functions/comparison.py: 38%
117 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1"""Database functions that do comparisons or type conversions."""
2from django.db import NotSupportedError
3from django.db.models.expressions import Func, Value
4from django.db.models.fields import TextField
5from django.db.models.fields.json import JSONField
6from django.utils.regex_helper import _lazy_re_compile
class Cast(Func):
    """Convert an expression to the database type of ``output_field``."""

    function = "CAST"
    template = "%(function)s(%(expressions)s AS %(db_type)s)"

    def __init__(self, expression, output_field):
        super().__init__(expression, output_field=output_field)

    def as_sql(self, compiler, connection, **extra_context):
        # The template needs the backend-specific name of the target type.
        cast_type = self.output_field.cast_db_type(connection)
        extra_context["db_type"] = cast_type
        return super().as_sql(compiler, connection, **extra_context)

    def as_sqlite(self, compiler, connection, **extra_context):
        target_type = self.output_field.db_type(connection)
        if target_type == "date":
            return super().as_sql(
                compiler,
                connection,
                template="date(%(expressions)s)",
                **extra_context,
            )
        if target_type not in {"datetime", "time"}:
            return self.as_sql(compiler, connection, **extra_context)
        # datetime/time would drop fractional seconds, so go through
        # strftime and pass the format string as the first parameter.
        sql, params = super().as_sql(
            compiler,
            connection,
            template="strftime(%%s, %(expressions)s)",
            **extra_context,
        )
        if target_type == "time":
            format_string = "%H:%M:%f"
        else:
            format_string = "%Y-%m-%d %H:%M:%f"
        params.insert(0, format_string)
        return sql, params

    def as_mysql(self, compiler, connection, **extra_context):
        internal_type = self.output_field.get_internal_type()
        if internal_type == "FloatField":
            # MySQL has no explicit cast to float.
            template = "(%(expressions)s + 0.0)"
        elif internal_type == "JSONField" and connection.mysql_is_mariadb:
            # MariaDB has no explicit cast to JSON.
            template = "JSON_EXTRACT(%(expressions)s, '$')"
        else:
            template = None
        return self.as_sql(compiler, connection, template=template, **extra_context)

    def as_postgresql(self, compiler, connection, **extra_context):
        # CAST would work as well; the :: shortcut is simply easier to read.
        # The parentheses protect 'expressions' when it is a complex
        # expression.
        shortcut_template = "(%(expressions)s)::%(db_type)s"
        return self.as_sql(
            compiler, connection, template=shortcut_template, **extra_context
        )

    def as_oracle(self, compiler, connection, **extra_context):
        if self.output_field.get_internal_type() != "JSONField":
            return self.as_sql(compiler, connection, **extra_context)
        # Oracle has no explicit cast to JSON.
        return super().as_sql(
            compiler,
            connection,
            template="JSON_QUERY(%(expressions)s, '$')",
            **extra_context,
        )
class Coalesce(Func):
    """Pick the first non-null expression, scanning from left to right."""

    function = "COALESCE"

    def __init__(self, *expressions, **extra):
        expression_count = len(expressions)
        if expression_count < 2:
            raise ValueError("Coalesce must take at least two expressions")
        super().__init__(*expressions, **extra)

    @property
    def empty_result_set_value(self):
        # Lazily scan the source expressions and stop at the first value
        # that is not None (NotImplemented counts and is returned as is).
        candidates = (
            expression.empty_result_set_value
            for expression in self.get_source_expressions()
        )
        return next((value for value in candidates if value is not None), None)

    def as_oracle(self, compiler, connection, **extra_context):
        if self.output_field.get_internal_type() != "TextField":
            return self.as_sql(compiler, connection, **extra_context)
        # Oracle refuses to mix TextField (NCLOB) with CharField (NVARCHAR2),
        # so every source expression is converted to NCLOB when a TextField
        # result is expected.
        clone = self.copy()
        wrapped = []
        for expression in self.get_source_expressions():
            wrapped.append(Func(expression, function="TO_NCLOB"))
        clone.set_source_expressions(wrapped)
        return super(Coalesce, clone).as_sql(compiler, connection, **extra_context)
class Collate(Func):
    """Apply a named collation to an expression.

    The collation name is validated when the object is built and is passed
    through the backend's ``quote_name()`` when the SQL is rendered.
    """

    function = "COLLATE"
    template = "%(expressions)s %(function)s %(collation)s"
    # Inspired from
    # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
    collation_re = _lazy_re_compile(r"^[\w\-]+$")

    def __init__(self, expression, collation):
        """Store ``collation`` for ``expression``.

        Raise ValueError if ``collation`` is empty or contains anything
        other than word characters and hyphens.
        """
        # fullmatch() is required here: with match(), the "$" anchor also
        # matches just before a trailing newline, so a name such as
        # "nocase\n" would slip through validation.
        if not (collation and self.collation_re.fullmatch(collation)):
            raise ValueError("Invalid collation name: %r." % collation)
        self.collation = collation
        super().__init__(expression)

    def as_sql(self, compiler, connection, **extra_context):
        """Render the SQL, supplying the quoted collation name by default."""
        # setdefault() keeps any "collation" value already supplied by the
        # caller through extra_context.
        extra_context.setdefault("collation", connection.ops.quote_name(self.collation))
        return super().as_sql(compiler, connection, **extra_context)
class Greatest(Func):
    """
    Return the largest of the given expressions.

    When any expression is null the result depends on the database:
    PostgreSQL returns the maximum not-null expression.
    MySQL, Oracle, and SQLite return null if any expression is null.
    """

    function = "GREATEST"

    def __init__(self, *expressions, **extra):
        expression_count = len(expressions)
        if expression_count < 2:
            raise ValueError("Greatest must take at least two expressions")
        super().__init__(*expressions, **extra)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render with SQLite's MAX function instead of GREATEST."""
        return super().as_sqlite(
            compiler,
            connection,
            function="MAX",
            **extra_context,
        )
class JSONObject(Func):
    """Build a JSON object from keyword arguments (key name -> expression)."""

    function = "JSON_OBJECT"
    output_field = JSONField()

    def __init__(self, **fields):
        # Flatten the mapping into key1, value1, key2, value2, ... with each
        # key wrapped in Value().
        flattened = [
            part
            for key, value in fields.items()
            for part in (Value(key), value)
        ]
        super().__init__(*flattened)

    def as_sql(self, compiler, connection, **extra_context):
        if connection.features.has_json_object_function:
            return super().as_sql(compiler, connection, **extra_context)
        raise NotSupportedError(
            "JSONObject() is not supported on this database backend."
        )

    def as_postgresql(self, compiler, connection, **extra_context):
        clone = self.copy()
        sources = []
        for position, expression in enumerate(clone.get_source_expressions()):
            # Even positions hold the keys; those are cast to text.
            if position % 2 == 0:
                sources.append(Cast(expression, TextField()))
            else:
                sources.append(expression)
        clone.set_source_expressions(sources)
        return super(JSONObject, clone).as_sql(
            compiler,
            connection,
            function="JSONB_BUILD_OBJECT",
            **extra_context,
        )

    def as_oracle(self, compiler, connection, **extra_context):
        class ArgJoiner:
            """Join alternating key/value arguments as 'key VALUE value'."""

            def join(self, args):
                keys = args[::2]
                values = args[1::2]
                return ", ".join(" VALUE ".join(pair) for pair in zip(keys, values))

        oracle_template = "%(function)s(%(expressions)s RETURNING CLOB)"
        return self.as_sql(
            compiler,
            connection,
            arg_joiner=ArgJoiner(),
            template=oracle_template,
            **extra_context,
        )
class Least(Func):
    """
    Return the smallest of the given expressions.

    When any expression is null the result depends on the database:
    PostgreSQL returns the minimum not-null expression.
    MySQL, Oracle, and SQLite return null if any expression is null.
    """

    function = "LEAST"

    def __init__(self, *expressions, **extra):
        expression_count = len(expressions)
        if expression_count < 2:
            raise ValueError("Least must take at least two expressions")
        super().__init__(*expressions, **extra)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render with SQLite's MIN function instead of LEAST."""
        return super().as_sqlite(
            compiler,
            connection,
            function="MIN",
            **extra_context,
        )
class NullIf(Func):
    """Two-argument NULLIF database function."""

    function = "NULLIF"
    arity = 2

    def as_oracle(self, compiler, connection, **extra_context):
        # A literal None as the first argument is rejected up front.
        first_expression = self.get_source_expressions()[0]
        is_null_literal = (
            isinstance(first_expression, Value) and first_expression.value is None
        )
        if is_null_literal:
            raise ValueError("Oracle does not allow Value(None) for expression1.")
        return super().as_sql(compiler, connection, **extra_context)