Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/db/models/lookups.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

475 statements  

1import itertools 

2import math 

3 

4from django.core.exceptions import EmptyResultSet, FullResultSet 

5from django.db.models.expressions import Case, ColPairs, Expression, Func, Value, When 

6from django.db.models.fields import ( 

7 BooleanField, 

8 CharField, 

9 DateTimeField, 

10 Field, 

11 IntegerField, 

12 UUIDField, 

13) 

14from django.db.models.query_utils import RegisterLookupMixin 

15from django.utils.datastructures import OrderedSet 

16from django.utils.functional import cached_property 

17from django.utils.hashable import make_hashable 

18 

19 

20class Lookup(Expression): 

21 lookup_name = None 

22 prepare_rhs = True 

23 can_use_none_as_rhs = False 

24 

25 def __init__(self, lhs, rhs): 

26 self.lhs, self.rhs = lhs, rhs 

27 self.rhs = self.get_prep_lookup() 

28 self.lhs = self.get_prep_lhs() 

29 if hasattr(self.lhs, "get_bilateral_transforms"): 

30 bilateral_transforms = self.lhs.get_bilateral_transforms() 

31 else: 

32 bilateral_transforms = [] 

33 if bilateral_transforms: 

34 # Warn the user as soon as possible if they are trying to apply 

35 # a bilateral transformation on a nested QuerySet: that won't work. 

36 from django.db.models.sql.query import Query # avoid circular import 

37 

38 if isinstance(rhs, Query): 

39 raise NotImplementedError( 

40 "Bilateral transformations on nested querysets are not implemented." 

41 ) 

42 self.bilateral_transforms = bilateral_transforms 

43 

44 def apply_bilateral_transforms(self, value): 

45 for transform in self.bilateral_transforms: 

46 value = transform(value) 

47 return value 

48 

49 def __repr__(self): 

50 return f"{self.__class__.__name__}({self.lhs!r}, {self.rhs!r})" 

51 

52 def batch_process_rhs(self, compiler, connection, rhs=None): 

53 if rhs is None: 

54 rhs = self.rhs 

55 if self.bilateral_transforms: 

56 sqls, sqls_params = [], [] 

57 for p in rhs: 

58 value = Value(p, output_field=self.lhs.output_field) 

59 value = self.apply_bilateral_transforms(value) 

60 value = value.resolve_expression(compiler.query) 

61 sql, sql_params = compiler.compile(value) 

62 sqls.append(sql) 

63 sqls_params.extend(sql_params) 

64 else: 

65 _, params = self.get_db_prep_lookup(rhs, connection) 

66 sqls, sqls_params = ["%s"] * len(params), params 

67 return sqls, sqls_params 

68 

69 def get_source_expressions(self): 

70 if self.rhs_is_direct_value(): 

71 return [self.lhs] 

72 return [self.lhs, self.rhs] 

73 

74 def set_source_expressions(self, new_exprs): 

75 if len(new_exprs) == 1: 

76 self.lhs = new_exprs[0] 

77 else: 

78 self.lhs, self.rhs = new_exprs 

79 

80 def get_prep_lookup(self): 

81 if not self.prepare_rhs or hasattr(self.rhs, "resolve_expression"): 

82 return self.rhs 

83 if hasattr(self.lhs, "output_field"): 

84 if hasattr(self.lhs.output_field, "get_prep_value"): 

85 return self.lhs.output_field.get_prep_value(self.rhs) 

86 elif self.rhs_is_direct_value(): 

87 return Value(self.rhs) 

88 return self.rhs 

89 

90 def get_prep_lhs(self): 

91 if hasattr(self.lhs, "resolve_expression"): 

92 return self.lhs 

93 return Value(self.lhs) 

94 

95 def get_db_prep_lookup(self, value, connection): 

96 return ("%s", [value]) 

97 

98 def process_lhs(self, compiler, connection, lhs=None): 

99 lhs = lhs or self.lhs 

100 if hasattr(lhs, "resolve_expression"): 

101 lhs = lhs.resolve_expression(compiler.query) 

102 sql, params = compiler.compile(lhs) 

103 if isinstance(lhs, Lookup): 

104 # Wrapped in parentheses to respect operator precedence. 

105 sql = f"({sql})" 

106 return sql, params 

107 

108 def process_rhs(self, compiler, connection): 

109 value = self.rhs 

110 if self.bilateral_transforms: 

111 if self.rhs_is_direct_value(): 

112 # Do not call get_db_prep_lookup here as the value will be 

113 # transformed before being used for lookup 

114 value = Value(value, output_field=self.lhs.output_field) 

115 value = self.apply_bilateral_transforms(value) 

116 value = value.resolve_expression(compiler.query) 

117 if hasattr(value, "as_sql"): 

118 sql, params = compiler.compile(value) 

119 if isinstance(value, ColPairs): 

120 raise ValueError( 

121 "CompositePrimaryKey cannot be used as a lookup value." 

122 ) 

123 # Ensure expression is wrapped in parentheses to respect operator 

124 # precedence but avoid double wrapping as it can be misinterpreted 

125 # on some backends (e.g. subqueries on SQLite). 

126 if not isinstance(value, Value) and sql and sql[0] != "(": 

127 sql = "(%s)" % sql 

128 return sql, params 

129 else: 

130 return self.get_db_prep_lookup(value, connection) 

131 

132 def rhs_is_direct_value(self): 

133 return not hasattr(self.rhs, "as_sql") 

134 

135 def get_group_by_cols(self): 

136 cols = [] 

137 for source in self.get_source_expressions(): 

138 cols.extend(source.get_group_by_cols()) 

139 return cols 

140 

141 def as_oracle(self, compiler, connection): 

142 # Oracle doesn't allow EXISTS() and filters to be compared to another 

143 # expression unless they're wrapped in a CASE WHEN. 

144 wrapped = False 

145 exprs = [] 

146 for expr in (self.lhs, self.rhs): 

147 if connection.ops.conditional_expression_supported_in_where_clause(expr): 

148 expr = Case(When(expr, then=True), default=False) 

149 wrapped = True 

150 exprs.append(expr) 

151 lookup = type(self)(*exprs) if wrapped else self 

152 return lookup.as_sql(compiler, connection) 

153 

154 @cached_property 

155 def output_field(self): 

156 return BooleanField() 

157 

158 @property 

159 def identity(self): 

160 return self.__class__, self.lhs, self.rhs 

161 

162 def __eq__(self, other): 

163 if not isinstance(other, Lookup): 

164 return NotImplemented 

165 return self.identity == other.identity 

166 

167 def __hash__(self): 

168 return hash(make_hashable(self.identity)) 

169 

170 def resolve_expression( 

171 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

172 ): 

173 c = self.copy() 

174 c.is_summary = summarize 

175 c.lhs = self.lhs.resolve_expression( 

176 query, allow_joins, reuse, summarize, for_save 

177 ) 

178 if hasattr(self.rhs, "resolve_expression"): 

179 c.rhs = self.rhs.resolve_expression( 

180 query, allow_joins, reuse, summarize, for_save 

181 ) 

182 return c 

183 

184 def select_format(self, compiler, sql, params): 

185 # Wrap filters with a CASE WHEN expression if a database backend 

186 # (e.g. Oracle) doesn't support boolean expression in SELECT or GROUP 

187 # BY list. 

188 if not compiler.connection.features.supports_boolean_expr_in_select_clause: 

189 sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END" 

190 return sql, params 

191 

192 @cached_property 

193 def allowed_default(self): 

194 return self.lhs.allowed_default and self.rhs.allowed_default 

195 

196 

197class Transform(RegisterLookupMixin, Func): 

198 """ 

199 RegisterLookupMixin() is first so that get_lookup() and get_transform() 

200 first examine self and then check output_field. 

201 """ 

202 

203 bilateral = False 

204 arity = 1 

205 

206 @property 

207 def lhs(self): 

208 return self.get_source_expressions()[0] 

209 

210 def get_bilateral_transforms(self): 

211 if hasattr(self.lhs, "get_bilateral_transforms"): 

212 bilateral_transforms = self.lhs.get_bilateral_transforms() 

213 else: 

214 bilateral_transforms = [] 

215 if self.bilateral: 

216 bilateral_transforms.append(self.__class__) 

217 return bilateral_transforms 

218 

219 

220class BuiltinLookup(Lookup): 

221 def process_lhs(self, compiler, connection, lhs=None): 

222 lhs_sql, params = super().process_lhs(compiler, connection, lhs) 

223 field_internal_type = self.lhs.output_field.get_internal_type() 

224 lhs_sql = ( 

225 connection.ops.lookup_cast(self.lookup_name, field_internal_type) % lhs_sql 

226 ) 

227 return lhs_sql, list(params) 

228 

229 def as_sql(self, compiler, connection): 

230 lhs_sql, params = self.process_lhs(compiler, connection) 

231 rhs_sql, rhs_params = self.process_rhs(compiler, connection) 

232 params.extend(rhs_params) 

233 rhs_sql = self.get_rhs_op(connection, rhs_sql) 

234 return "%s %s" % (lhs_sql, rhs_sql), params 

235 

236 def get_rhs_op(self, connection, rhs): 

237 return connection.operators[self.lookup_name] % rhs 

238 

239 

240class FieldGetDbPrepValueMixin: 

241 """ 

242 Some lookups require Field.get_db_prep_value() to be called on their 

243 inputs. 

244 """ 

245 

246 get_db_prep_lookup_value_is_iterable = False 

247 

248 def get_db_prep_lookup(self, value, connection): 

249 # For relational fields, use the 'target_field' attribute of the 

250 # output_field. 

251 field = getattr(self.lhs.output_field, "target_field", None) 

252 get_db_prep_value = ( 

253 getattr(field, "get_db_prep_value", None) 

254 or self.lhs.output_field.get_db_prep_value 

255 ) 

256 if not self.get_db_prep_lookup_value_is_iterable: 

257 value = [value] 

258 return ( 

259 "%s", 

260 [ 

261 ( 

262 v 

263 if hasattr(v, "as_sql") 

264 else get_db_prep_value(v, connection, prepared=True) 

265 ) 

266 for v in value 

267 ], 

268 ) 

269 

270 

271class FieldGetDbPrepValueIterableMixin(FieldGetDbPrepValueMixin): 

272 """ 

273 Some lookups require Field.get_db_prep_value() to be called on each value 

274 in an iterable. 

275 """ 

276 

277 get_db_prep_lookup_value_is_iterable = True 

278 

279 def get_prep_lookup(self): 

280 if hasattr(self.rhs, "resolve_expression"): 

281 return self.rhs 

282 prepared_values = [] 

283 for rhs_value in self.rhs: 

284 if hasattr(rhs_value, "resolve_expression"): 

285 # An expression will be handled by the database but can coexist 

286 # alongside real values. 

287 pass 

288 elif ( 

289 self.prepare_rhs 

290 and hasattr(self.lhs, "output_field") 

291 and hasattr(self.lhs.output_field, "get_prep_value") 

292 ): 

293 rhs_value = self.lhs.output_field.get_prep_value(rhs_value) 

294 prepared_values.append(rhs_value) 

295 return prepared_values 

296 

297 def process_rhs(self, compiler, connection): 

298 if self.rhs_is_direct_value(): 

299 # rhs should be an iterable of values. Use batch_process_rhs() 

300 # to prepare/transform those values. 

301 return self.batch_process_rhs(compiler, connection) 

302 else: 

303 return super().process_rhs(compiler, connection) 

304 

305 def resolve_expression_parameter(self, compiler, connection, sql, param): 

306 params = [param] 

307 if hasattr(param, "resolve_expression"): 

308 param = param.resolve_expression(compiler.query) 

309 if hasattr(param, "as_sql"): 

310 sql, params = compiler.compile(param) 

311 return sql, params 

312 

313 def batch_process_rhs(self, compiler, connection, rhs=None): 

314 pre_processed = super().batch_process_rhs(compiler, connection, rhs) 

315 # The params list may contain expressions which compile to a 

316 # sql/param pair. Zip them to get sql and param pairs that refer to the 

317 # same argument and attempt to replace them with the result of 

318 # compiling the param step. 

319 sql, params = zip( 

320 *( 

321 self.resolve_expression_parameter(compiler, connection, sql, param) 

322 for sql, param in zip(*pre_processed) 

323 ) 

324 ) 

325 params = itertools.chain.from_iterable(params) 

326 return sql, tuple(params) 

327 

328 

329class PostgresOperatorLookup(Lookup): 

330 """Lookup defined by operators on PostgreSQL.""" 

331 

332 postgres_operator = None 

333 

334 def as_postgresql(self, compiler, connection): 

335 lhs, lhs_params = self.process_lhs(compiler, connection) 

336 rhs, rhs_params = self.process_rhs(compiler, connection) 

337 params = tuple(lhs_params) + tuple(rhs_params) 

338 return "%s %s %s" % (lhs, self.postgres_operator, rhs), params 

339 

340 

341@Field.register_lookup 

342class Exact(FieldGetDbPrepValueMixin, BuiltinLookup): 

343 lookup_name = "exact" 

344 

345 def get_prep_lookup(self): 

346 from django.db.models.sql.query import Query # avoid circular import 

347 

348 if isinstance(self.rhs, Query): 

349 if self.rhs.has_limit_one(): 

350 if not self.rhs.has_select_fields: 

351 self.rhs.clear_select_clause() 

352 self.rhs.add_fields(["pk"]) 

353 else: 

354 raise ValueError( 

355 "The QuerySet value for an exact lookup must be limited to " 

356 "one result using slicing." 

357 ) 

358 return super().get_prep_lookup() 

359 

360 def as_sql(self, compiler, connection): 

361 # Avoid comparison against direct rhs if lhs is a boolean value. That 

362 # turns "boolfield__exact=True" into "WHERE boolean_field" instead of 

363 # "WHERE boolean_field = True" when allowed. 

364 if ( 

365 isinstance(self.rhs, bool) 

366 and getattr(self.lhs, "conditional", False) 

367 and connection.ops.conditional_expression_supported_in_where_clause( 

368 self.lhs 

369 ) 

370 ): 

371 lhs_sql, params = self.process_lhs(compiler, connection) 

372 template = "%s" if self.rhs else "NOT %s" 

373 return template % lhs_sql, params 

374 return super().as_sql(compiler, connection) 

375 

376 

377@Field.register_lookup 

378class IExact(BuiltinLookup): 

379 lookup_name = "iexact" 

380 prepare_rhs = False 

381 

382 def process_rhs(self, qn, connection): 

383 rhs, params = super().process_rhs(qn, connection) 

384 if params: 

385 params[0] = connection.ops.prep_for_iexact_query(params[0]) 

386 return rhs, params 

387 

388 

389@Field.register_lookup 

390class GreaterThan(FieldGetDbPrepValueMixin, BuiltinLookup): 

391 lookup_name = "gt" 

392 

393 

394@Field.register_lookup 

395class GreaterThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup): 

396 lookup_name = "gte" 

397 

398 

399@Field.register_lookup 

400class LessThan(FieldGetDbPrepValueMixin, BuiltinLookup): 

401 lookup_name = "lt" 

402 

403 

404@Field.register_lookup 

405class LessThanOrEqual(FieldGetDbPrepValueMixin, BuiltinLookup): 

406 lookup_name = "lte" 

407 

408 

409class IntegerFieldOverflow: 

410 underflow_exception = EmptyResultSet 

411 overflow_exception = EmptyResultSet 

412 

413 def process_rhs(self, compiler, connection): 

414 rhs = self.rhs 

415 if isinstance(rhs, int): 

416 field_internal_type = self.lhs.output_field.get_internal_type() 

417 min_value, max_value = connection.ops.integer_field_range( 

418 field_internal_type 

419 ) 

420 if min_value is not None and rhs < min_value: 

421 raise self.underflow_exception 

422 if max_value is not None and rhs > max_value: 

423 raise self.overflow_exception 

424 return super().process_rhs(compiler, connection) 

425 

426 

427class IntegerFieldFloatRounding: 

428 """ 

429 Allow floats to work as query values for IntegerField. Without this, the 

430 decimal portion of the float would always be discarded. 

431 """ 

432 

433 def get_prep_lookup(self): 

434 if isinstance(self.rhs, float): 

435 self.rhs = math.ceil(self.rhs) 

436 return super().get_prep_lookup() 

437 

438 

439@IntegerField.register_lookup 

440class IntegerFieldExact(IntegerFieldOverflow, Exact): 

441 pass 

442 

443 

444@IntegerField.register_lookup 

445class IntegerGreaterThan(IntegerFieldOverflow, GreaterThan): 

446 underflow_exception = FullResultSet 

447 

448 

449@IntegerField.register_lookup 

450class IntegerGreaterThanOrEqual( 

451 IntegerFieldOverflow, IntegerFieldFloatRounding, GreaterThanOrEqual 

452): 

453 underflow_exception = FullResultSet 

454 

455 

456@IntegerField.register_lookup 

457class IntegerLessThan(IntegerFieldOverflow, IntegerFieldFloatRounding, LessThan): 

458 overflow_exception = FullResultSet 

459 

460 

461@IntegerField.register_lookup 

462class IntegerLessThanOrEqual(IntegerFieldOverflow, LessThanOrEqual): 

463 overflow_exception = FullResultSet 

464 

465 

466@Field.register_lookup 

467class In(FieldGetDbPrepValueIterableMixin, BuiltinLookup): 

468 lookup_name = "in" 

469 

470 def get_refs(self): 

471 refs = super().get_refs() 

472 if self.rhs_is_direct_value(): 

473 for rhs in self.rhs: 

474 if get_rhs_refs := getattr(rhs, "get_refs", None): 

475 refs |= get_rhs_refs() 

476 return refs 

477 

478 def get_prep_lookup(self): 

479 from django.db.models.sql.query import Query # avoid circular import 

480 

481 if isinstance(self.rhs, Query): 

482 self.rhs.clear_ordering(clear_default=True) 

483 if not self.rhs.has_select_fields: 

484 self.rhs.clear_select_clause() 

485 self.rhs.add_fields(["pk"]) 

486 return super().get_prep_lookup() 

487 

488 def process_rhs(self, compiler, connection): 

489 db_rhs = getattr(self.rhs, "_db", None) 

490 if db_rhs is not None and db_rhs != connection.alias: 

491 raise ValueError( 

492 "Subqueries aren't allowed across different databases. Force " 

493 "the inner query to be evaluated using `list(inner_query)`." 

494 ) 

495 

496 if self.rhs_is_direct_value(): 

497 # Remove None from the list as NULL is never equal to anything. 

498 try: 

499 rhs = OrderedSet(self.rhs) 

500 rhs.discard(None) 

501 except TypeError: # Unhashable items in self.rhs 

502 rhs = [r for r in self.rhs if r is not None] 

503 

504 if not rhs: 

505 raise EmptyResultSet 

506 

507 # rhs should be an iterable; use batch_process_rhs() to 

508 # prepare/transform those values. 

509 sqls, sqls_params = self.batch_process_rhs(compiler, connection, rhs) 

510 placeholder = "(" + ", ".join(sqls) + ")" 

511 return (placeholder, sqls_params) 

512 return super().process_rhs(compiler, connection) 

513 

514 def get_rhs_op(self, connection, rhs): 

515 return "IN %s" % rhs 

516 

517 def as_sql(self, compiler, connection): 

518 max_in_list_size = connection.ops.max_in_list_size() 

519 if ( 

520 self.rhs_is_direct_value() 

521 and max_in_list_size 

522 and len(self.rhs) > max_in_list_size 

523 ): 

524 return self.split_parameter_list_as_sql(compiler, connection) 

525 return super().as_sql(compiler, connection) 

526 

527 def split_parameter_list_as_sql(self, compiler, connection): 

528 # This is a special case for databases which limit the number of 

529 # elements which can appear in an 'IN' clause. 

530 max_in_list_size = connection.ops.max_in_list_size() 

531 lhs, lhs_params = self.process_lhs(compiler, connection) 

532 rhs, rhs_params = self.batch_process_rhs(compiler, connection) 

533 in_clause_elements = ["("] 

534 params = [] 

535 for offset in range(0, len(rhs_params), max_in_list_size): 

536 if offset > 0: 

537 in_clause_elements.append(" OR ") 

538 in_clause_elements.append("%s IN (" % lhs) 

539 params.extend(lhs_params) 

540 sqls = rhs[offset : offset + max_in_list_size] 

541 sqls_params = rhs_params[offset : offset + max_in_list_size] 

542 param_group = ", ".join(sqls) 

543 in_clause_elements.append(param_group) 

544 in_clause_elements.append(")") 

545 params.extend(sqls_params) 

546 in_clause_elements.append(")") 

547 return "".join(in_clause_elements), params 

548 

549 

550class PatternLookup(BuiltinLookup): 

551 param_pattern = "%%%s%%" 

552 prepare_rhs = False 

553 

554 def get_rhs_op(self, connection, rhs): 

555 # Assume we are in startswith. We need to produce SQL like: 

556 # col LIKE %s, ['thevalue%'] 

557 # For python values we can (and should) do that directly in Python, 

558 # but if the value is for example reference to other column, then 

559 # we need to add the % pattern match to the lookup by something like 

560 # col LIKE othercol || '%%' 

561 # So, for Python values we don't need any special pattern, but for 

562 # SQL reference values or SQL transformations we need the correct 

563 # pattern added. 

564 if hasattr(self.rhs, "as_sql") or self.bilateral_transforms: 

565 pattern = connection.pattern_ops[self.lookup_name].format( 

566 connection.pattern_esc 

567 ) 

568 return pattern.format(rhs) 

569 else: 

570 return super().get_rhs_op(connection, rhs) 

571 

572 def process_rhs(self, qn, connection): 

573 rhs, params = super().process_rhs(qn, connection) 

574 if self.rhs_is_direct_value() and params and not self.bilateral_transforms: 

575 params[0] = self.param_pattern % connection.ops.prep_for_like_query( 

576 params[0] 

577 ) 

578 return rhs, params 

579 

580 

581@Field.register_lookup 

582class Contains(PatternLookup): 

583 lookup_name = "contains" 

584 

585 

586@Field.register_lookup 

587class IContains(Contains): 

588 lookup_name = "icontains" 

589 

590 

591@Field.register_lookup 

592class StartsWith(PatternLookup): 

593 lookup_name = "startswith" 

594 param_pattern = "%s%%" 

595 

596 

597@Field.register_lookup 

598class IStartsWith(StartsWith): 

599 lookup_name = "istartswith" 

600 

601 

602@Field.register_lookup 

603class EndsWith(PatternLookup): 

604 lookup_name = "endswith" 

605 param_pattern = "%%%s" 

606 

607 

608@Field.register_lookup 

609class IEndsWith(EndsWith): 

610 lookup_name = "iendswith" 

611 

612 

613@Field.register_lookup 

614class Range(FieldGetDbPrepValueIterableMixin, BuiltinLookup): 

615 lookup_name = "range" 

616 

617 def get_rhs_op(self, connection, rhs): 

618 return "BETWEEN %s AND %s" % (rhs[0], rhs[1]) 

619 

620 

621@Field.register_lookup 

622class IsNull(BuiltinLookup): 

623 lookup_name = "isnull" 

624 prepare_rhs = False 

625 

626 def as_sql(self, compiler, connection): 

627 if not isinstance(self.rhs, bool): 

628 raise ValueError( 

629 "The QuerySet value for an isnull lookup must be True or False." 

630 ) 

631 if isinstance(self.lhs, Value): 

632 if self.lhs.value is None or ( 

633 self.lhs.value == "" 

634 and connection.features.interprets_empty_strings_as_nulls 

635 ): 

636 result_exception = FullResultSet if self.rhs else EmptyResultSet 

637 else: 

638 result_exception = EmptyResultSet if self.rhs else FullResultSet 

639 raise result_exception 

640 sql, params = self.process_lhs(compiler, connection) 

641 if self.rhs: 

642 return "%s IS NULL" % sql, params 

643 else: 

644 return "%s IS NOT NULL" % sql, params 

645 

646 

647@Field.register_lookup 

648class Regex(BuiltinLookup): 

649 lookup_name = "regex" 

650 prepare_rhs = False 

651 

652 def as_sql(self, compiler, connection): 

653 if self.lookup_name in connection.operators: 

654 return super().as_sql(compiler, connection) 

655 else: 

656 lhs, lhs_params = self.process_lhs(compiler, connection) 

657 rhs, rhs_params = self.process_rhs(compiler, connection) 

658 sql_template = connection.ops.regex_lookup(self.lookup_name) 

659 return sql_template % (lhs, rhs), lhs_params + rhs_params 

660 

661 

662@Field.register_lookup 

663class IRegex(Regex): 

664 lookup_name = "iregex" 

665 

666 

667class YearLookup(Lookup): 

668 def year_lookup_bounds(self, connection, year): 

669 from django.db.models.functions import ExtractIsoYear 

670 

671 iso_year = isinstance(self.lhs, ExtractIsoYear) 

672 output_field = self.lhs.lhs.output_field 

673 if isinstance(output_field, DateTimeField): 

674 bounds = connection.ops.year_lookup_bounds_for_datetime_field( 

675 year, 

676 iso_year=iso_year, 

677 ) 

678 else: 

679 bounds = connection.ops.year_lookup_bounds_for_date_field( 

680 year, 

681 iso_year=iso_year, 

682 ) 

683 return bounds 

684 

685 def as_sql(self, compiler, connection): 

686 # Avoid the extract operation if the rhs is a direct value to allow 

687 # indexes to be used. 

688 if self.rhs_is_direct_value(): 

689 # Skip the extract part by directly using the originating field, 

690 # that is self.lhs.lhs. 

691 lhs_sql, params = self.process_lhs(compiler, connection, self.lhs.lhs) 

692 rhs_sql, _ = self.process_rhs(compiler, connection) 

693 rhs_sql = self.get_direct_rhs_sql(connection, rhs_sql) 

694 start, finish = self.year_lookup_bounds(connection, self.rhs) 

695 params.extend(self.get_bound_params(start, finish)) 

696 return "%s %s" % (lhs_sql, rhs_sql), params 

697 return super().as_sql(compiler, connection) 

698 

699 def get_direct_rhs_sql(self, connection, rhs): 

700 return connection.operators[self.lookup_name] % rhs 

701 

702 def get_bound_params(self, start, finish): 

703 raise NotImplementedError( 

704 "subclasses of YearLookup must provide a get_bound_params() method" 

705 ) 

706 

707 

708class YearExact(YearLookup, Exact): 

709 def get_direct_rhs_sql(self, connection, rhs): 

710 return "BETWEEN %s AND %s" 

711 

712 def get_bound_params(self, start, finish): 

713 return (start, finish) 

714 

715 

716class YearGt(YearLookup, GreaterThan): 

717 def get_bound_params(self, start, finish): 

718 return (finish,) 

719 

720 

721class YearGte(YearLookup, GreaterThanOrEqual): 

722 def get_bound_params(self, start, finish): 

723 return (start,) 

724 

725 

726class YearLt(YearLookup, LessThan): 

727 def get_bound_params(self, start, finish): 

728 return (start,) 

729 

730 

731class YearLte(YearLookup, LessThanOrEqual): 

732 def get_bound_params(self, start, finish): 

733 return (finish,) 

734 

735 

736class UUIDTextMixin: 

737 """ 

738 Strip hyphens from a value when filtering a UUIDField on backends without 

739 a native datatype for UUID. 

740 """ 

741 

742 def process_rhs(self, qn, connection): 

743 if not connection.features.has_native_uuid_field: 

744 from django.db.models.functions import Replace 

745 

746 if self.rhs_is_direct_value(): 

747 self.rhs = Value(self.rhs) 

748 self.rhs = Replace( 

749 self.rhs, Value("-"), Value(""), output_field=CharField() 

750 ) 

751 rhs, params = super().process_rhs(qn, connection) 

752 return rhs, params 

753 

754 

755@UUIDField.register_lookup 

756class UUIDIExact(UUIDTextMixin, IExact): 

757 pass 

758 

759 

760@UUIDField.register_lookup 

761class UUIDContains(UUIDTextMixin, Contains): 

762 pass 

763 

764 

765@UUIDField.register_lookup 

766class UUIDIContains(UUIDTextMixin, IContains): 

767 pass 

768 

769 

770@UUIDField.register_lookup 

771class UUIDStartsWith(UUIDTextMixin, StartsWith): 

772 pass 

773 

774 

775@UUIDField.register_lookup 

776class UUIDIStartsWith(UUIDTextMixin, IStartsWith): 

777 pass 

778 

779 

780@UUIDField.register_lookup 

781class UUIDEndsWith(UUIDTextMixin, EndsWith): 

782 pass 

783 

784 

785@UUIDField.register_lookup 

786class UUIDIEndsWith(UUIDTextMixin, IEndsWith): 

787 pass