Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/functions/comparison.py: 38%

117 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1"""Database functions that do comparisons or type conversions.""" 

2from django.db import NotSupportedError 

3from django.db.models.expressions import Func, Value 

4from django.db.models.fields import TextField 

5from django.db.models.fields.json import JSONField 

6from django.utils.regex_helper import _lazy_re_compile 

7 

8 

class Cast(Func):
    """Convert an expression to the database type of ``output_field``."""

    function = "CAST"
    template = "%(function)s(%(expressions)s AS %(db_type)s)"

    def __init__(self, expression, output_field):
        super().__init__(expression, output_field=output_field)

    def as_sql(self, compiler, connection, **extra_context):
        """Render the cast, supplying ``db_type`` from the output field."""
        target_type = self.output_field.cast_db_type(connection)
        extra_context["db_type"] = target_type
        return super().as_sql(compiler, connection, **extra_context)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render the cast on SQLite, special-casing date/time types."""
        db_type = self.output_field.db_type(connection)
        if db_type == "date":
            return super().as_sql(
                compiler,
                connection,
                template="date(%(expressions)s)",
                **extra_context,
            )
        if db_type not in {"datetime", "time"}:
            # Every other type goes through the regular CAST rendering.
            return self.as_sql(compiler, connection, **extra_context)
        # datetime/time don't keep fractional seconds, so format the value
        # with strftime instead of casting it.
        sql, params = super().as_sql(
            compiler,
            connection,
            template="strftime(%%s, %(expressions)s)",
            **extra_context,
        )
        if db_type == "time":
            format_string = "%H:%M:%f"
        else:
            format_string = "%Y-%m-%d %H:%M:%f"
        # The format string is the first placeholder in the template.
        params.insert(0, format_string)
        return sql, params

    def as_mysql(self, compiler, connection, **extra_context):
        """Render the cast on MySQL/MariaDB, working around missing casts."""
        internal_type = self.output_field.get_internal_type()
        if internal_type == "FloatField":
            # MySQL doesn't support explicit cast to float.
            template = "(%(expressions)s + 0.0)"
        elif internal_type == "JSONField" and connection.mysql_is_mariadb:
            # MariaDB doesn't support explicit cast to JSON.
            template = "JSON_EXTRACT(%(expressions)s, '$')"
        else:
            template = None
        return self.as_sql(compiler, connection, template=template, **extra_context)

    def as_postgresql(self, compiler, connection, **extra_context):
        """Render the cast on PostgreSQL using the ``::`` shortcut."""
        # CAST would work as well; the :: form is simply easier to read. The
        # parentheses around 'expressions' guard against complex expressions.
        shortcut_template = "(%(expressions)s)::%(db_type)s"
        return self.as_sql(
            compiler,
            connection,
            template=shortcut_template,
            **extra_context,
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Render the cast on Oracle, using JSON_QUERY for JSON output."""
        if self.output_field.get_internal_type() != "JSONField":
            return self.as_sql(compiler, connection, **extra_context)
        # Oracle doesn't support explicit cast to JSON.
        return super().as_sql(
            compiler,
            connection,
            template="JSON_QUERY(%(expressions)s, '$')",
            **extra_context,
        )

70 

71 

class Coalesce(Func):
    """Evaluate to the first expression, reading left to right, that isn't null."""

    function = "COALESCE"

    def __init__(self, *expressions, **extra):
        if len(expressions) <= 1:
            raise ValueError("Coalesce must take at least two expressions")
        super().__init__(*expressions, **extra)

    @property
    def empty_result_set_value(self):
        """
        Return the first source expression's empty-result value that is not
        None (NotImplemented included), or None when every one is None.
        """
        candidates = (
            expression.empty_result_set_value
            for expression in self.get_source_expressions()
        )
        # NotImplemented is not None, so it is returned as soon as it is seen.
        return next((value for value in candidates if value is not None), None)

    def as_oracle(self, compiler, connection, **extra_context):
        """Render on Oracle, wrapping the sources in TO_NCLOB for text output."""
        if self.output_field.get_internal_type() != "TextField":
            return self.as_sql(compiler, connection, **extra_context)
        # Oracle prohibits mixing TextField (NCLOB) and CharField (NVARCHAR2),
        # so every source is converted to NCLOB when that type is expected.
        clone = self.copy()
        wrapped_sources = [
            Func(expression, function="TO_NCLOB")
            for expression in self.get_source_expressions()
        ]
        clone.set_source_expressions(wrapped_sources)
        return super(Coalesce, clone).as_sql(compiler, connection, **extra_context)

103 

104 

class Collate(Func):
    """Apply a named collation to an expression."""

    function = "COLLATE"
    template = "%(expressions)s %(function)s %(collation)s"
    # Pattern inspired by the PostgreSQL identifier rules:
    # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
    collation_re = _lazy_re_compile(r"^[\w\-]+$")

    def __init__(self, expression, collation):
        # Reject empty names and anything that doesn't match collation_re.
        if not collation or not self.collation_re.match(collation):
            raise ValueError("Invalid collation name: %r." % collation)
        self.collation = collation
        super().__init__(expression)

    def as_sql(self, compiler, connection, **extra_context):
        """Render the expression, quoting the collation via the backend ops."""
        quoted_collation = connection.ops.quote_name(self.collation)
        # A caller-supplied "collation" in extra_context takes precedence.
        extra_context.setdefault("collation", quoted_collation)
        return super().as_sql(compiler, connection, **extra_context)

121 

122 

class Greatest(Func):
    """
    Evaluate to the largest of the given expressions.

    What happens when an expression is null depends on the database:
    PostgreSQL returns the largest not-null expression, whereas MySQL,
    Oracle, and SQLite return null as soon as any expression is null.
    """

    function = "GREATEST"

    def __init__(self, *expressions, **extra):
        if len(expressions) <= 1:
            raise ValueError("Greatest must take at least two expressions")
        super().__init__(*expressions, **extra)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render on SQLite with the MAX function."""
        return super().as_sqlite(
            compiler,
            connection,
            function="MAX",
            **extra_context,
        )

142 

143 

class JSONObject(Func):
    """Build a JSON object from keyword arguments mapping keys to expressions."""

    function = "JSON_OBJECT"
    output_field = JSONField()

    def __init__(self, **fields):
        # Flatten the mapping into an alternating key, value argument list.
        expressions = [
            item
            for key, value in fields.items()
            for item in (Value(key), value)
        ]
        super().__init__(*expressions)

    def as_sql(self, compiler, connection, **extra_context):
        """Render the function, or raise NotSupportedError if unavailable."""
        if connection.features.has_json_object_function:
            return super().as_sql(compiler, connection, **extra_context)
        raise NotSupportedError(
            "JSONObject() is not supported on this database backend."
        )

    def as_postgresql(self, compiler, connection, **extra_context):
        """Render on PostgreSQL with JSONB_BUILD_OBJECT and text-cast keys."""
        clone = self.copy()
        sources = []
        for position, expression in enumerate(clone.get_source_expressions()):
            # Even positions hold the keys; only those are cast to text.
            if position % 2 == 0:
                sources.append(Cast(expression, TextField()))
            else:
                sources.append(expression)
        clone.set_source_expressions(sources)
        return super(JSONObject, clone).as_sql(
            compiler,
            connection,
            function="JSONB_BUILD_OBJECT",
            **extra_context,
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Render on Oracle using the ``key VALUE value`` argument form."""

        class KeyValueJoiner:
            def join(self, args):
                # Pair each key with the value that follows it.
                pairs = zip(args[::2], args[1::2])
                return ", ".join(" VALUE ".join(pair) for pair in pairs)

        return self.as_sql(
            compiler,
            connection,
            arg_joiner=KeyValueJoiner(),
            template="%(function)s(%(expressions)s RETURNING CLOB)",
            **extra_context,
        )

189 

190 

class Least(Func):
    """
    Evaluate to the smallest of the given expressions.

    What happens when an expression is null depends on the database:
    PostgreSQL returns the smallest not-null expression, whereas MySQL,
    Oracle, and SQLite return null as soon as any expression is null.
    """

    function = "LEAST"

    def __init__(self, *expressions, **extra):
        if len(expressions) <= 1:
            raise ValueError("Least must take at least two expressions")
        super().__init__(*expressions, **extra)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render on SQLite with the MIN function."""
        return super().as_sqlite(
            compiler,
            connection,
            function="MIN",
            **extra_context,
        )

210 

211 

class NullIf(Func):
    """Two-argument NULLIF database function."""

    function = "NULLIF"
    arity = 2

    def as_oracle(self, compiler, connection, **extra_context):
        """Render on Oracle, rejecting a literal ``Value(None)`` first argument."""
        first_expression = self.get_source_expressions()[0]
        is_null_literal = (
            isinstance(first_expression, Value) and first_expression.value is None
        )
        if is_null_literal:
            raise ValueError("Oracle does not allow Value(None) for expression1.")
        return super().as_sql(compiler, connection, **extra_context)