Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/base.py: 14%
1096 statements
coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1import copy
2import inspect
3import warnings
4from functools import partialmethod
5from itertools import chain
7from asgiref.sync import sync_to_async
9import django
10from django.apps import apps
11from django.conf import settings
12from django.core import checks
13from django.core.exceptions import (
14 NON_FIELD_ERRORS,
15 FieldDoesNotExist,
16 FieldError,
17 MultipleObjectsReturned,
18 ObjectDoesNotExist,
19 ValidationError,
20)
21from django.db import (
22 DJANGO_VERSION_PICKLE_KEY,
23 DatabaseError,
24 connection,
25 connections,
26 router,
27 transaction,
28)
29from django.db.models import NOT_PROVIDED, ExpressionWrapper, IntegerField, Max, Value
30from django.db.models.constants import LOOKUP_SEP
31from django.db.models.constraints import CheckConstraint, UniqueConstraint
32from django.db.models.deletion import CASCADE, Collector
33from django.db.models.expressions import RawSQL
34from django.db.models.fields.related import (
35 ForeignObjectRel,
36 OneToOneField,
37 lazy_related_operation,
38 resolve_relation,
39)
40from django.db.models.functions import Coalesce
41from django.db.models.manager import Manager
42from django.db.models.options import Options
43from django.db.models.query import F, Q
44from django.db.models.signals import (
45 class_prepared,
46 post_init,
47 post_save,
48 pre_init,
49 pre_save,
50)
51from django.db.models.utils import AltersData, make_model_tuple
52from django.utils.encoding import force_str
53from django.utils.hashable import make_hashable
54from django.utils.text import capfirst, get_text_list
55from django.utils.translation import gettext_lazy as _
58class Deferred:
59 def __repr__(self):
60 return "<Deferred field>"
62 def __str__(self):
63 return "<Deferred field>"
66DEFERRED = Deferred()
69def subclass_exception(name, bases, module, attached_to):
70 """
71 Create exception subclass. Used by ModelBase below.
73 The exception is created in a way that allows it to be pickled, assuming
74 that the returned exception class will be added as an attribute to the
75 'attached_to' class.
76 """
77 return type(
78 name,
79 bases,
80 {
81 "__module__": module,
82 "__qualname__": "%s.%s" % (attached_to.__qualname__, name),
83 },
84 )
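# Illustrative sketch (not part of base.py): the __qualname__ assigned above is
# what keeps the generated exception picklable, because pickle looks it up as an
# attribute of the attached class. `Book` is a hypothetical model.
#
#   class Book(models.Model):
#       title = models.CharField(max_length=100)
#
#   import pickle
#   exc = Book.DoesNotExist("no such book")
#   restored = pickle.loads(pickle.dumps(exc))  # resolved as <module>.Book.DoesNotExist
#   assert isinstance(restored, Book.DoesNotExist)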
87def _has_contribute_to_class(value):
88 # Only call contribute_to_class() if it's bound.
89 return not inspect.isclass(value) and hasattr(value, "contribute_to_class")
92class ModelBase(type):
93 """Metaclass for all models."""
95 def __new__(cls, name, bases, attrs, **kwargs):
96 super_new = super().__new__
98 # Also ensure initialization is only performed for subclasses of Model
99 # (excluding Model class itself).
100 parents = [b for b in bases if isinstance(b, ModelBase)]
101 if not parents:
102 return super_new(cls, name, bases, attrs)
104 # Create the class.
105 module = attrs.pop("__module__")
106 new_attrs = {"__module__": module}
107 classcell = attrs.pop("__classcell__", None)
108 if classcell is not None:
109 new_attrs["__classcell__"] = classcell
110 attr_meta = attrs.pop("Meta", None)
111 # Pass all attrs without a (Django-specific) contribute_to_class()
112 # method to type.__new__() so that they're properly initialized
113 # (i.e. __set_name__()).
114 contributable_attrs = {}
115 for obj_name, obj in attrs.items():
116 if _has_contribute_to_class(obj):
117 contributable_attrs[obj_name] = obj
118 else:
119 new_attrs[obj_name] = obj
120 new_class = super_new(cls, name, bases, new_attrs, **kwargs)
122 abstract = getattr(attr_meta, "abstract", False)
123 meta = attr_meta or getattr(new_class, "Meta", None)
124 base_meta = getattr(new_class, "_meta", None)
126 app_label = None
128 # Look for an application configuration to attach the model to.
129 app_config = apps.get_containing_app_config(module)
131 if getattr(meta, "app_label", None) is None:
132 if app_config is None:
133 if not abstract:
134 raise RuntimeError(
135 "Model class %s.%s doesn't declare an explicit "
136 "app_label and isn't in an application in "
137 "INSTALLED_APPS." % (module, name)
138 )
140 else:
141 app_label = app_config.label
143 new_class.add_to_class("_meta", Options(meta, app_label))
144 if not abstract:
145 new_class.add_to_class(
146 "DoesNotExist",
147 subclass_exception(
148 "DoesNotExist",
149 tuple(
150 x.DoesNotExist
151 for x in parents
152 if hasattr(x, "_meta") and not x._meta.abstract
153 )
154 or (ObjectDoesNotExist,),
155 module,
156 attached_to=new_class,
157 ),
158 )
159 new_class.add_to_class(
160 "MultipleObjectsReturned",
161 subclass_exception(
162 "MultipleObjectsReturned",
163 tuple(
164 x.MultipleObjectsReturned
165 for x in parents
166 if hasattr(x, "_meta") and not x._meta.abstract
167 )
168 or (MultipleObjectsReturned,),
169 module,
170 attached_to=new_class,
171 ),
172 )
173 if base_meta and not base_meta.abstract:
174 # Non-abstract child classes inherit some attributes from their
175 # non-abstract parent (unless an ABC comes before it in the
176 # method resolution order).
177 if not hasattr(meta, "ordering"):
178 new_class._meta.ordering = base_meta.ordering
179 if not hasattr(meta, "get_latest_by"):
180 new_class._meta.get_latest_by = base_meta.get_latest_by
182 is_proxy = new_class._meta.proxy
184 # If the model is a proxy, ensure that the base class
185 # hasn't been swapped out.
186 if is_proxy and base_meta and base_meta.swapped:
187 raise TypeError(
188 "%s cannot proxy the swapped model '%s'." % (name, base_meta.swapped)
189 )
191 # Add remaining attributes (those with a contribute_to_class() method)
192 # to the class.
193 for obj_name, obj in contributable_attrs.items():
194 new_class.add_to_class(obj_name, obj)
196 # All the fields of any type declared on this model
197 new_fields = chain(
198 new_class._meta.local_fields,
199 new_class._meta.local_many_to_many,
200 new_class._meta.private_fields,
201 )
202 field_names = {f.name for f in new_fields}
204 # Basic setup for proxy models.
205 if is_proxy:
206 base = None
207 for parent in [kls for kls in parents if hasattr(kls, "_meta")]:
208 if parent._meta.abstract:
209 if parent._meta.fields:
210 raise TypeError(
211 "Abstract base class containing model fields not "
212 "permitted for proxy model '%s'." % name
213 )
214 else:
215 continue
216 if base is None:
217 base = parent
218 elif parent._meta.concrete_model is not base._meta.concrete_model:
219 raise TypeError(
220 "Proxy model '%s' has more than one non-abstract model base "
221 "class." % name
222 )
223 if base is None:
224 raise TypeError(
225 "Proxy model '%s' has no non-abstract model base class." % name
226 )
227 new_class._meta.setup_proxy(base)
228 new_class._meta.concrete_model = base._meta.concrete_model
229 else:
230 new_class._meta.concrete_model = new_class
232 # Collect the parent links for multi-table inheritance.
233 parent_links = {}
234 for base in reversed([new_class] + parents):
235 # Conceptually equivalent to `if base is Model`.
236 if not hasattr(base, "_meta"):
237 continue
238 # Skip concrete parent classes.
239 if base != new_class and not base._meta.abstract:
240 continue
241 # Locate OneToOneField instances.
242 for field in base._meta.local_fields:
243 if isinstance(field, OneToOneField) and field.remote_field.parent_link:
244 related = resolve_relation(new_class, field.remote_field.model)
245 parent_links[make_model_tuple(related)] = field
247 # Track fields inherited from base models.
248 inherited_attributes = set()
249 # Do the appropriate setup for any model parents.
250 for base in new_class.mro():
251 if base not in parents or not hasattr(base, "_meta"):
252 # Things without _meta aren't functional models, so they're
253 # uninteresting parents.
254 inherited_attributes.update(base.__dict__)
255 continue
257 parent_fields = base._meta.local_fields + base._meta.local_many_to_many
258 if not base._meta.abstract:
259 # Check for clashes between locally declared fields and those
260 # on the base classes.
261 for field in parent_fields:
262 if field.name in field_names:
263 raise FieldError(
264 "Local field %r in class %r clashes with field of "
265 "the same name from base class %r."
266 % (
267 field.name,
268 name,
269 base.__name__,
270 )
271 )
272 else:
273 inherited_attributes.add(field.name)
275 # Concrete classes...
276 base = base._meta.concrete_model
277 base_key = make_model_tuple(base)
278 if base_key in parent_links:
279 field = parent_links[base_key]
280 elif not is_proxy:
281 attr_name = "%s_ptr" % base._meta.model_name
282 field = OneToOneField(
283 base,
284 on_delete=CASCADE,
285 name=attr_name,
286 auto_created=True,
287 parent_link=True,
288 )
290 if attr_name in field_names:
291 raise FieldError(
292 "Auto-generated field '%s' in class %r for "
293 "parent_link to base class %r clashes with "
294 "declared field of the same name."
295 % (
296 attr_name,
297 name,
298 base.__name__,
299 )
300 )
302 # Only add the ptr field if it's not already present;
303 # e.g. migrations will already have it specified
304 if not hasattr(new_class, attr_name):
305 new_class.add_to_class(attr_name, field)
306 else:
307 field = None
308 new_class._meta.parents[base] = field
309 else:
310 base_parents = base._meta.parents.copy()
312 # Add fields from abstract base class if it wasn't overridden.
313 for field in parent_fields:
314 if (
315 field.name not in field_names
316 and field.name not in new_class.__dict__
317 and field.name not in inherited_attributes
318 ):
319 new_field = copy.deepcopy(field)
320 new_class.add_to_class(field.name, new_field)
321 # Replace parent links defined on this base by the new
322 # field. It will be appropriately resolved if required.
323 if field.one_to_one:
324 for parent, parent_link in base_parents.items():
325 if field == parent_link:
326 base_parents[parent] = new_field
328 # Pass any non-abstract parent classes onto child.
329 new_class._meta.parents.update(base_parents)
331 # Inherit private fields (like GenericForeignKey) from the parent
332 # class
333 for field in base._meta.private_fields:
334 if field.name in field_names:
335 if not base._meta.abstract:
336 raise FieldError(
337 "Local field %r in class %r clashes with field of "
338 "the same name from base class %r."
339 % (
340 field.name,
341 name,
342 base.__name__,
343 )
344 )
345 else:
346 field = copy.deepcopy(field)
347 if not base._meta.abstract:
348 field.mti_inherited = True
349 new_class.add_to_class(field.name, field)
351 # Copy indexes so that index names are unique when models extend an
352 # abstract model.
353 new_class._meta.indexes = [
354 copy.deepcopy(idx) for idx in new_class._meta.indexes
355 ]
357 if abstract:
358 # Abstract base models can't be instantiated and don't appear in
359 # the list of models for an app. We do the final setup for them a
360 # little differently from normal models.
361 attr_meta.abstract = False
362 new_class.Meta = attr_meta
363 return new_class
365 new_class._prepare()
366 new_class._meta.apps.register_model(new_class._meta.app_label, new_class)
367 return new_class
369 def add_to_class(cls, name, value):
370 if _has_contribute_to_class(value):
371 value.contribute_to_class(cls, name)
372 else:
373 setattr(cls, name, value)
375 def _prepare(cls):
376 """Create some methods once self._meta has been populated."""
377 opts = cls._meta
378 opts._prepare(cls)
380 if opts.order_with_respect_to:
381 cls.get_next_in_order = partialmethod(
382 cls._get_next_or_previous_in_order, is_next=True
383 )
384 cls.get_previous_in_order = partialmethod(
385 cls._get_next_or_previous_in_order, is_next=False
386 )
388 # Defer creating accessors on the foreign class until it has been
389 # created and registered. If remote_field is None, we're ordering
390 # with respect to a GenericForeignKey and don't know what the
391 # foreign class is - we'll add those accessors later in
392 # contribute_to_class().
393 if opts.order_with_respect_to.remote_field:
394 wrt = opts.order_with_respect_to
395 remote = wrt.remote_field.model
396 lazy_related_operation(make_foreign_order_accessors, cls, remote)
398 # Give the class a docstring -- its definition.
399 if cls.__doc__ is None:
400 cls.__doc__ = "%s(%s)" % (
401 cls.__name__,
402 ", ".join(f.name for f in opts.fields),
403 )
405 get_absolute_url_override = settings.ABSOLUTE_URL_OVERRIDES.get(
406 opts.label_lower
407 )
408 if get_absolute_url_override:
409 setattr(cls, "get_absolute_url", get_absolute_url_override)
411 if not opts.managers:
412 if any(f.name == "objects" for f in opts.fields):
413 raise ValueError(
414 "Model %s must specify a custom Manager, because it has a "
415 "field named 'objects'." % cls.__name__
416 )
417 manager = Manager()
418 manager.auto_created = True
419 cls.add_to_class("objects", manager)
421 # Set the name of _meta.indexes. This can't be done in
422 # Options.contribute_to_class() because fields haven't been added to
423 # the model at that point.
424 for index in cls._meta.indexes:
425 if not index.name:
426 index.set_name_with_model(cls)
428 class_prepared.send(sender=cls)
430 @property
431 def _base_manager(cls):
432 return cls._meta.base_manager
434 @property
435 def _default_manager(cls):
436 return cls._meta.default_manager
439class ModelStateFieldsCacheDescriptor:
440 def __get__(self, instance, cls=None):
441 if instance is None:
442 return self
443 res = instance.fields_cache = {}
444 return res
447class ModelState:
448 """Store model instance state."""
450 db = None
451 # If true, uniqueness validation checks will consider this a new, unsaved
452 # object. Necessary for correct validation of new instances of objects with
453 # explicit (non-auto) PKs. This impacts validation only; it has no effect
454 # on the actual save.
455 adding = True
456 fields_cache = ModelStateFieldsCacheDescriptor()
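# Illustrative sketch (not part of base.py): instance._state is commonly used in
# signal handlers to tell inserts from updates before a save happens. `Book` is
# a hypothetical model.
#
#   from django.db.models.signals import pre_save
#
#   def log_book_saves(sender, instance, **kwargs):
#       if instance._state.adding:      # True until the first save completes
#           print("about to insert a new Book")
#       else:
#           print("about to update a Book loaded from", instance._state.db)
#
#   pre_save.connect(log_book_saves, sender=Book)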
459class Model(AltersData, metaclass=ModelBase):
460 def __init__(self, *args, **kwargs):
461 # Alias some things as locals to avoid repeat global lookups
462 cls = self.__class__
463 opts = self._meta
464 _setattr = setattr
465 _DEFERRED = DEFERRED
466 if opts.abstract:
467 raise TypeError("Abstract models cannot be instantiated.")
469 pre_init.send(sender=cls, args=args, kwargs=kwargs)
471 # Set up the storage for instance state
472 self._state = ModelState()
474 # There is a rather weird disparity here; if kwargs is set, then args
475 # overrides it. It should be one or the other; don't duplicate the work.
476 # The reason for the kwargs check is that the standard iterator passes in by
477 # args, and instantiation for iteration is 33% faster.
478 if len(args) > len(opts.concrete_fields):
479 # Daft, but matches old exception sans the err msg.
480 raise IndexError("Number of args exceeds number of fields")
482 if not kwargs:
483 fields_iter = iter(opts.concrete_fields)
484 # The ordering of the zip calls matters - zip throws StopIteration
485 # when an iter throws it. So if the first iter throws it, the second
486 # is *not* consumed. We rely on this, so don't change the order
487 # without changing the logic.
488 for val, field in zip(args, fields_iter):
489 if val is _DEFERRED:
490 continue
491 _setattr(self, field.attname, val)
492 else:
493 # Slower, kwargs-ready version.
494 fields_iter = iter(opts.fields)
495 for val, field in zip(args, fields_iter):
496 if val is _DEFERRED:
497 continue
498 _setattr(self, field.attname, val)
499 if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
500 raise TypeError(
501 f"{cls.__qualname__}() got both positional and "
502 f"keyword arguments for field '{field.name}'."
503 )
505 # Now we're left with the unprocessed fields that *must* come from
506 # keywords, or default.
508 for field in fields_iter:
509 is_related_object = False
510 # Virtual field
511 if field.attname not in kwargs and field.column is None:
512 continue
513 if kwargs:
514 if isinstance(field.remote_field, ForeignObjectRel):
515 try:
516 # Assume object instance was passed in.
517 rel_obj = kwargs.pop(field.name)
518 is_related_object = True
519 except KeyError:
520 try:
521 # Object instance wasn't passed in -- must be an ID.
522 val = kwargs.pop(field.attname)
523 except KeyError:
524 val = field.get_default()
525 else:
526 try:
527 val = kwargs.pop(field.attname)
528 except KeyError:
529 # This is done with an exception rather than the
530 # default argument on pop because we don't want
531 # get_default() to be evaluated, and then not used.
532 # Refs #12057.
533 val = field.get_default()
534 else:
535 val = field.get_default()
537 if is_related_object:
538 # If we are passed a related instance, set it using the
539 # field.name instead of field.attname (e.g. "user" instead of
540 # "user_id") so that the object gets properly cached (and type
541 # checked) by the RelatedObjectDescriptor.
542 if rel_obj is not _DEFERRED:
543 _setattr(self, field.name, rel_obj)
544 else:
545 if val is not _DEFERRED:
546 _setattr(self, field.attname, val)
548 if kwargs:
549 property_names = opts._property_names
550 unexpected = ()
551 for prop, value in kwargs.items():
552 # Any remaining kwargs must correspond to properties or virtual
553 # fields.
554 if prop in property_names:
555 if value is not _DEFERRED:
556 _setattr(self, prop, value)
557 else:
558 try:
559 opts.get_field(prop)
560 except FieldDoesNotExist:
561 unexpected += (prop,)
562 else:
563 if value is not _DEFERRED:
564 _setattr(self, prop, value)
565 if unexpected:
566 unexpected_names = ", ".join(repr(n) for n in unexpected)
567 raise TypeError(
568 f"{cls.__name__}() got unexpected keyword arguments: "
569 f"{unexpected_names}"
570 )
571 super().__init__()
572 post_init.send(sender=cls, instance=self)
574 @classmethod
575 def from_db(cls, db, field_names, values):
576 if len(values) != len(cls._meta.concrete_fields):
577 values_iter = iter(values)
578 values = [
579 next(values_iter) if f.attname in field_names else DEFERRED
580 for f in cls._meta.concrete_fields
581 ]
582 new = cls(*values)
583 new._state.adding = False
584 new._state.db = db
585 return new
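# Illustrative sketch (not part of base.py): from_db() can be overridden to
# record values as they were loaded, e.g. to detect changed fields later.
# `Book` is a hypothetical model.
#
#   class Book(models.Model):
#       title = models.CharField(max_length=100)
#
#       @classmethod
#       def from_db(cls, db, field_names, values):
#           instance = super().from_db(db, field_names, values)
#           # field_names lists only the columns the query actually selected.
#           instance._loaded_values = dict(zip(field_names, values))
#           return instance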
587 def __repr__(self):
588 return "<%s: %s>" % (self.__class__.__name__, self)
590 def __str__(self):
591 return "%s object (%s)" % (self.__class__.__name__, self.pk)
593 def __eq__(self, other):
594 if not isinstance(other, Model):
595 return NotImplemented
596 if self._meta.concrete_model != other._meta.concrete_model:
597 return False
598 my_pk = self.pk
599 if my_pk is None:
600 return self is other
601 return my_pk == other.pk
603 def __hash__(self):
604 if self.pk is None:
605 raise TypeError("Model instances without primary key value are unhashable")
606 return hash(self.pk)
608 def __reduce__(self):
609 data = self.__getstate__()
610 data[DJANGO_VERSION_PICKLE_KEY] = django.__version__
611 class_id = self._meta.app_label, self._meta.object_name
612 return model_unpickle, (class_id,), data
614 def __getstate__(self):
615 """Hook to allow choosing the attributes to pickle."""
616 state = self.__dict__.copy()
617 state["_state"] = copy.copy(state["_state"])
618 state["_state"].fields_cache = state["_state"].fields_cache.copy()
619 # memoryview cannot be pickled, so cast it to bytes and store
620 # separately.
621 _memoryview_attrs = []
622 for attr, value in state.items():
623 if isinstance(value, memoryview):
624 _memoryview_attrs.append((attr, bytes(value)))
625 if _memoryview_attrs:
626 state["_memoryview_attrs"] = _memoryview_attrs
627 for attr, value in _memoryview_attrs:
628 state.pop(attr)
629 return state
631 def __setstate__(self, state):
632 pickled_version = state.get(DJANGO_VERSION_PICKLE_KEY)
633 if pickled_version:
634 if pickled_version != django.__version__:
635 warnings.warn(
636 "Pickled model instance's Django version %s does not "
637 "match the current version %s."
638 % (pickled_version, django.__version__),
639 RuntimeWarning,
640 stacklevel=2,
641 )
642 else:
643 warnings.warn(
644 "Pickled model instance's Django version is not specified.",
645 RuntimeWarning,
646 stacklevel=2,
647 )
648 if "_memoryview_attrs" in state:
649 for attr, value in state.pop("_memoryview_attrs"):
650 state[attr] = memoryview(value)
651 self.__dict__.update(state)
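# Illustrative sketch (not part of base.py): __reduce__/__getstate__/__setstate__
# make instances picklable; unpickling under a different Django version emits a
# RuntimeWarning via the DJANGO_VERSION_PICKLE_KEY recorded above. `Book` is a
# hypothetical model.
#
#   import pickle
#   book = Book.objects.first()
#   payload = pickle.dumps(book)        # embeds django.__version__
#   same_book = pickle.loads(payload)   # warns if the current version differs
#   assert same_book == book            # __eq__ compares concrete model and pk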
653 def _get_pk_val(self, meta=None):
654 meta = meta or self._meta
655 return getattr(self, meta.pk.attname)
657 def _set_pk_val(self, value):
658 for parent_link in self._meta.parents.values():
659 if parent_link and parent_link != self._meta.pk:
660 setattr(self, parent_link.target_field.attname, value)
661 return setattr(self, self._meta.pk.attname, value)
663 pk = property(_get_pk_val, _set_pk_val)
665 def get_deferred_fields(self):
666 """
667 Return a set containing names of deferred fields for this instance.
668 """
669 return {
670 f.attname
671 for f in self._meta.concrete_fields
672 if f.attname not in self.__dict__
673 }
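# Illustrative sketch (not part of base.py): fields skipped by only()/defer()
# never land in instance.__dict__, which is exactly what the set comprehension
# above detects. `Book` is a hypothetical model.
#
#   book = Book.objects.only("title").get(pk=1)
#   book.get_deferred_fields()   # e.g. {"published_on", "author_id"}, never "title"
#   book.published_on            # deferred access triggers refresh_from_db(fields=["published_on"])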
675 def refresh_from_db(self, using=None, fields=None):
676 """
677 Reload field values from the database.
679 By default, the reloading happens from the database this instance was
680 loaded from, or by the read router if this instance wasn't loaded from
681 any database. The using parameter will override the default.
683 Fields can be used to specify which fields to reload. The fields
684 should be an iterable of field attnames. If fields is None, then
685 all non-deferred fields are reloaded.
687 When accessing deferred fields of an instance, the deferred loading
688 of the field will call this method.
689 """
690 if fields is None:
691 self._prefetched_objects_cache = {}
692 else:
693 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
694 for field in fields:
695 if field in prefetched_objects_cache:
696 del prefetched_objects_cache[field]
697 fields.remove(field)
698 if not fields:
699 return
700 if any(LOOKUP_SEP in f for f in fields):
701 raise ValueError(
702 'Found "%s" in fields argument. Relations and transforms '
703 "are not allowed in fields." % LOOKUP_SEP
704 )
706 hints = {"instance": self}
707 db_instance_qs = self.__class__._base_manager.db_manager(
708 using, hints=hints
709 ).filter(pk=self.pk)
711 # Use provided fields, if not set then reload all non-deferred fields.
712 deferred_fields = self.get_deferred_fields()
713 if fields is not None:
714 fields = list(fields)
715 db_instance_qs = db_instance_qs.only(*fields)
716 elif deferred_fields:
717 fields = [
718 f.attname
719 for f in self._meta.concrete_fields
720 if f.attname not in deferred_fields
721 ]
722 db_instance_qs = db_instance_qs.only(*fields)
724 db_instance = db_instance_qs.get()
725 non_loaded_fields = db_instance.get_deferred_fields()
726 for field in self._meta.concrete_fields:
727 if field.attname in non_loaded_fields:
728 # This field wasn't refreshed - skip ahead.
729 continue
730 setattr(self, field.attname, getattr(db_instance, field.attname))
731 # Clear cached foreign keys.
732 if field.is_relation and field.is_cached(self):
733 field.delete_cached_value(self)
735 # Clear cached relations.
736 for field in self._meta.related_objects:
737 if field.is_cached(self):
738 field.delete_cached_value(self)
740 # Clear cached private relations.
741 for field in self._meta.private_fields:
742 if field.is_relation and field.is_cached(self):
743 field.delete_cached_value(self)
745 self._state.db = db_instance._state.db
747 async def arefresh_from_db(self, using=None, fields=None):
748 return await sync_to_async(self.refresh_from_db)(using=using, fields=fields)
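# Illustrative sketch (not part of base.py): typical refresh_from_db() usage
# after the row was changed elsewhere. `Book` is a hypothetical model.
#
#   book = Book.objects.get(pk=1)
#   Book.objects.filter(pk=1).update(title="Second edition")
#   book.title                              # still the stale in-memory value
#   book.refresh_from_db(fields=["title"])  # reload just that column
#   book.title                              # "Second edition"
#   await book.arefresh_from_db()           # async counterpart, same arguments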
750 def serializable_value(self, field_name):
751 """
752 Return the value of the field name for this instance. If the field is
753 a foreign key, return the id value instead of the object. If there's
754 no Field object with this name on the model, return the model
755 attribute's value.
757 Used to serialize a field's value (in the serializer, or form output,
758 for example). Normally, you would just access the attribute directly
759 and not use this method.
760 """
761 try:
762 field = self._meta.get_field(field_name)
763 except FieldDoesNotExist:
764 return getattr(self, field_name)
765 return getattr(self, field.attname)
767 def save(
768 self, force_insert=False, force_update=False, using=None, update_fields=None
769 ):
770 """
771 Save the current instance. Override this in a subclass if you want to
772 control the saving process.
774 The 'force_insert' and 'force_update' parameters can be used to insist
775 that the "save" must be an SQL insert or update (or equivalent for
776 non-SQL backends), respectively. Normally, they should not be set.
777 """
778 self._prepare_related_fields_for_save(operation_name="save")
780 using = using or router.db_for_write(self.__class__, instance=self)
781 if force_insert and (force_update or update_fields):
782 raise ValueError("Cannot force both insert and updating in model saving.")
784 deferred_fields = self.get_deferred_fields()
785 if update_fields is not None:
786 # If update_fields is empty, skip the save. We do also check for
787 # no-op saves later on for inheritance cases. This bailout is
788 # still needed for skipping signal sending.
789 if not update_fields:
790 return
792 update_fields = frozenset(update_fields)
793 field_names = self._meta._non_pk_concrete_field_names
794 non_model_fields = update_fields.difference(field_names)
796 if non_model_fields:
797 raise ValueError(
798 "The following fields do not exist in this model, are m2m "
799 "fields, or are non-concrete fields: %s"
800 % ", ".join(non_model_fields)
801 )
803 # If saving to the same database, and this model is deferred, then
804 # automatically do an "update_fields" save on the loaded fields.
805 elif not force_insert and deferred_fields and using == self._state.db:
806 field_names = set()
807 for field in self._meta.concrete_fields:
808 if not field.primary_key and not hasattr(field, "through"):
809 field_names.add(field.attname)
810 loaded_fields = field_names.difference(deferred_fields)
811 if loaded_fields:
812 update_fields = frozenset(loaded_fields)
814 self.save_base(
815 using=using,
816 force_insert=force_insert,
817 force_update=force_update,
818 update_fields=update_fields,
819 )
821 save.alters_data = True
823 async def asave(
824 self, force_insert=False, force_update=False, using=None, update_fields=None
825 ):
826 return await sync_to_async(self.save)(
827 force_insert=force_insert,
828 force_update=force_update,
829 using=using,
830 update_fields=update_fields,
831 )
833 asave.alters_data = True
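# Illustrative sketch (not part of base.py): update_fields narrows the UPDATE to
# the named columns and skips the save entirely when empty; force_insert and
# force_update/update_fields are mutually exclusive, as checked above. `Book` is
# a hypothetical model.
#
#   book = Book.objects.get(pk=1)
#   book.title = "New title"
#   book.save(update_fields=["title"])            # UPDATE ... SET title = ... only
#   book.save(update_fields=[])                   # no-op: returns before any signal fires
#   Book(title="Draft").save(force_insert=True)   # always INSERT, never UPDATE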
835 def save_base(
836 self,
837 raw=False,
838 force_insert=False,
839 force_update=False,
840 using=None,
841 update_fields=None,
842 ):
843 """
844 Handle the parts of saving which should be done only once per save,
845 yet need to be done in raw saves, too. This includes some sanity
846 checks and signal sending.
848 The 'raw' argument tells save_base() not to save any parent
849 models and not to change any values before saving. This
850 is used by fixture loading.
851 """
852 using = using or router.db_for_write(self.__class__, instance=self)
853 assert not (force_insert and (force_update or update_fields))
854 assert update_fields is None or update_fields
855 cls = origin = self.__class__
856 # Skip proxies, but keep the origin as the proxy model.
857 if cls._meta.proxy:
858 cls = cls._meta.concrete_model
859 meta = cls._meta
860 if not meta.auto_created:
861 pre_save.send(
862 sender=origin,
863 instance=self,
864 raw=raw,
865 using=using,
866 update_fields=update_fields,
867 )
868 # A transaction isn't needed if one query is issued.
869 if meta.parents:
870 context_manager = transaction.atomic(using=using, savepoint=False)
871 else:
872 context_manager = transaction.mark_for_rollback_on_error(using=using)
873 with context_manager:
874 parent_inserted = False
875 if not raw:
876 parent_inserted = self._save_parents(cls, using, update_fields)
877 updated = self._save_table(
878 raw,
879 cls,
880 force_insert or parent_inserted,
881 force_update,
882 using,
883 update_fields,
884 )
885 # Store the database on which the object was saved
886 self._state.db = using
887 # Once saved, this is no longer a to-be-added instance.
888 self._state.adding = False
890 # Signal that the save is complete
891 if not meta.auto_created:
892 post_save.send(
893 sender=origin,
894 instance=self,
895 created=(not updated),
896 update_fields=update_fields,
897 raw=raw,
898 using=using,
899 )
901 save_base.alters_data = True
903 def _save_parents(self, cls, using, update_fields):
904 """Save all the parents of cls using values from self."""
905 meta = cls._meta
906 inserted = False
907 for parent, field in meta.parents.items():
908 # Make sure the link fields are synced between parent and self.
909 if (
910 field
911 and getattr(self, parent._meta.pk.attname) is None
912 and getattr(self, field.attname) is not None
913 ):
914 setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
915 parent_inserted = self._save_parents(
916 cls=parent, using=using, update_fields=update_fields
917 )
918 updated = self._save_table(
919 cls=parent,
920 using=using,
921 update_fields=update_fields,
922 force_insert=parent_inserted,
923 )
924 if not updated:
925 inserted = True
926 # Set the parent's PK value to self.
927 if field:
928 setattr(self, field.attname, self._get_pk_val(parent._meta))
929 # Since we didn't have an instance of the parent handy, set
930 # attname directly, bypassing the descriptor. Invalidate
931 # the related object cache, in case it's been accidentally
932 # populated. A fresh instance will be re-built from the
933 # database if necessary.
934 if field.is_cached(self):
935 field.delete_cached_value(self)
936 return inserted
938 def _save_table(
939 self,
940 raw=False,
941 cls=None,
942 force_insert=False,
943 force_update=False,
944 using=None,
945 update_fields=None,
946 ):
947 """
948 Do the heavy-lifting involved in saving. Update or insert the data
949 for a single table.
950 """
951 meta = cls._meta
952 non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
954 if update_fields:
955 non_pks = [
956 f
957 for f in non_pks
958 if f.name in update_fields or f.attname in update_fields
959 ]
961 pk_val = self._get_pk_val(meta)
962 if pk_val is None:
963 pk_val = meta.pk.get_pk_value_on_save(self)
964 setattr(self, meta.pk.attname, pk_val)
965 pk_set = pk_val is not None
966 if not pk_set and (force_update or update_fields):
967 raise ValueError("Cannot force an update in save() with no primary key.")
968 updated = False
969 # Skip an UPDATE when adding an instance and primary key has a default.
970 if (
971 not raw
972 and not force_insert
973 and self._state.adding
974 and meta.pk.default
975 and meta.pk.default is not NOT_PROVIDED
976 ):
977 force_insert = True
978 # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
979 if pk_set and not force_insert:
980 base_qs = cls._base_manager.using(using)
981 values = [
982 (
983 f,
984 None,
985 (getattr(self, f.attname) if raw else f.pre_save(self, False)),
986 )
987 for f in non_pks
988 ]
989 forced_update = update_fields or force_update
990 updated = self._do_update(
991 base_qs, using, pk_val, values, update_fields, forced_update
992 )
993 if force_update and not updated:
994 raise DatabaseError("Forced update did not affect any rows.")
995 if update_fields and not updated:
996 raise DatabaseError("Save with update_fields did not affect any rows.")
997 if not updated:
998 if meta.order_with_respect_to:
999 # If this is a model with an order_with_respect_to,
1000 # autopopulate the _order field.
1001 field = meta.order_with_respect_to
1002 filter_args = field.get_filter_kwargs_for_object(self)
1003 self._order = (
1004 cls._base_manager.using(using)
1005 .filter(**filter_args)
1006 .aggregate(
1007 _order__max=Coalesce(
1008 ExpressionWrapper(
1009 Max("_order") + Value(1), output_field=IntegerField()
1010 ),
1011 Value(0),
1012 ),
1013 )["_order__max"]
1014 )
1015 fields = meta.local_concrete_fields
1016 if not pk_set:
1017 fields = [f for f in fields if f is not meta.auto_field]
1019 returning_fields = meta.db_returning_fields
1020 results = self._do_insert(
1021 cls._base_manager, using, fields, returning_fields, raw
1022 )
1023 if results:
1024 for value, field in zip(results[0], returning_fields):
1025 setattr(self, field.attname, value)
1026 return updated
1028 def _do_update(self, base_qs, using, pk_val, values, update_fields, forced_update):
1029 """
1030 Try to update the model. Return True if the model was updated (if an
1031 update query was done and a matching row was found in the DB).
1032 """
1033 filtered = base_qs.filter(pk=pk_val)
1034 if not values:
1035 # We can end up here when saving a model in inheritance chain where
1036 # update_fields doesn't target any field in current model. In that
1037 # case we just say the update succeeded. Another case ending up here
1038 # is a model with just PK - in that case check that the PK still
1039 # exists.
1040 return update_fields is not None or filtered.exists()
1041 if self._meta.select_on_save and not forced_update:
1042 return (
1043 filtered.exists()
1044 and
1045 # It may happen that the object is deleted from the DB right after
1046 # this check, causing the subsequent UPDATE to return zero matching
1047 # rows. The same result can occur in some rare cases when the
1048 # database returns zero despite the UPDATE being executed
1049 # successfully (a row is matched and updated). In order to
1050 # distinguish these two cases, the object's existence in the
1051 # database is again checked for if the UPDATE query returns 0.
1052 (filtered._update(values) > 0 or filtered.exists())
1053 )
1054 return filtered._update(values) > 0
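# Illustrative sketch (not part of base.py): Meta.select_on_save opts into the
# legacy SELECT-then-UPDATE-or-INSERT algorithm handled above, instead of the
# default "try UPDATE, INSERT if no row matched". `Book` is a hypothetical model.
#
#   class Book(models.Model):
#       title = models.CharField(max_length=100)
#
#       class Meta:
#           select_on_save = True   # save() checks for an existing row first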
1056 def _do_insert(self, manager, using, fields, returning_fields, raw):
1057 """
1058 Do an INSERT. If returning_fields is defined then this method should
1059 return the newly created data for the model.
1060 """
1061 return manager._insert(
1062 [self],
1063 fields=fields,
1064 returning_fields=returning_fields,
1065 using=using,
1066 raw=raw,
1067 )
1069 def _prepare_related_fields_for_save(self, operation_name, fields=None):
1070 # Ensure that a model instance without a PK hasn't been assigned to
1071 # a ForeignKey, GenericForeignKey or OneToOneField on this model. If
1072 # the field is nullable, allowing the save would result in silent data
1073 # loss.
1074 for field in self._meta.concrete_fields:
1075 if fields and field not in fields:
1076 continue
1077 # If the related field isn't cached, then an instance hasn't been
1078 # assigned and there's no need to worry about this check.
1079 if field.is_relation and field.is_cached(self):
1080 obj = getattr(self, field.name, None)
1081 if not obj:
1082 continue
1083 # A pk may have been assigned manually to a model instance not
1084 # saved to the database (or auto-generated in a case like
1085 # UUIDField), but we allow the save to proceed and rely on the
1086 # database to raise an IntegrityError if applicable. If
1087 # constraints aren't supported by the database, there's the
1088 # unavoidable risk of data corruption.
1089 if obj.pk is None:
1090 # Remove the object from a related instance cache.
1091 if not field.remote_field.multiple:
1092 field.remote_field.delete_cached_value(obj)
1093 raise ValueError(
1094 "%s() prohibited to prevent data loss due to unsaved "
1095 "related object '%s'." % (operation_name, field.name)
1096 )
1097 elif getattr(self, field.attname) in field.empty_values:
1098 # Set related object if it has been saved after an
1099 # assignment.
1100 setattr(self, field.name, obj)
1101 # If the relationship's pk/to_field was changed, clear the
1102 # cached relationship.
1103 if getattr(obj, field.target_field.attname) != getattr(
1104 self, field.attname
1105 ):
1106 field.delete_cached_value(self)
1107 # GenericForeignKeys are private.
1108 for field in self._meta.private_fields:
1109 if fields and field not in fields:
1110 continue
1111 if (
1112 field.is_relation
1113 and field.is_cached(self)
1114 and hasattr(field, "fk_field")
1115 ):
1116 obj = field.get_cached_value(self, default=None)
1117 if obj and obj.pk is None:
1118 raise ValueError(
1119 f"{operation_name}() prohibited to prevent data loss due to "
1120 f"unsaved related object '{field.name}'."
1121 )
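# Illustrative sketch (not part of base.py): the check above is what raises when
# an unsaved instance is assigned to a relation. `Author` and `Book` are
# hypothetical models.
#
#   author = Author(name="A. Writer")           # unsaved: pk is None
#   book = Book(title="Draft", author=author)
#   book.save()    # ValueError: save() prohibited to prevent data loss due to
#                  # unsaved related object 'author'.
#   author.save()
#   book.save()    # OK: author_id is now picked up from the saved instance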
1123 def delete(self, using=None, keep_parents=False):
1124 if self.pk is None:
1125 raise ValueError(
1126 "%s object can't be deleted because its %s attribute is set "
1127 "to None." % (self._meta.object_name, self._meta.pk.attname)
1128 )
1129 using = using or router.db_for_write(self.__class__, instance=self)
1130 collector = Collector(using=using, origin=self)
1131 collector.collect([self], keep_parents=keep_parents)
1132 return collector.delete()
1134 delete.alters_data = True
1136 async def adelete(self, using=None, keep_parents=False):
1137 return await sync_to_async(self.delete)(
1138 using=using,
1139 keep_parents=keep_parents,
1140 )
1142 adelete.alters_data = True
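# Illustrative sketch (not part of base.py): delete() returns the total number
# of rows removed plus a per-model breakdown from the Collector. `Book` is a
# hypothetical model in a hypothetical `library` app.
#
#   book = Book.objects.get(pk=1)
#   total, per_model = book.delete()
#   # e.g. (3, {"library.Book": 1, "library.Review": 2}) with cascading deletes
#   Book(title="never saved").delete()   # ValueError: its pk attribute is None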
1144 def _get_FIELD_display(self, field):
1145 value = getattr(self, field.attname)
1146 choices_dict = dict(make_hashable(field.flatchoices))
1147 # force_str() to coerce lazy strings.
1148 return force_str(
1149 choices_dict.get(make_hashable(value), value), strings_only=True
1150 )
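# Illustrative sketch (not part of base.py): for each field declared with
# choices, Field.contribute_to_class() exposes this as get_<field>_display().
# `Book` is a hypothetical model.
#
#   class Book(models.Model):
#       FORMATS = [("hc", "Hardcover"), ("pb", "Paperback")]
#       format = models.CharField(max_length=2, choices=FORMATS)
#
#   Book(format="pb").get_format_display()   # "Paperback"
#   Book(format="zz").get_format_display()   # "zz" (unknown values pass through)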
1152 def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
1153 if not self.pk:
1154 raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
1155 op = "gt" if is_next else "lt"
1156 order = "" if is_next else "-"
1157 param = getattr(self, field.attname)
1158 q = Q.create([(field.name, param), (f"pk__{op}", self.pk)], connector=Q.AND)
1159 q = Q.create([q, (f"{field.name}__{op}", param)], connector=Q.OR)
1160 qs = (
1161 self.__class__._default_manager.using(self._state.db)
1162 .filter(**kwargs)
1163 .filter(q)
1164 .order_by("%s%s" % (order, field.name), "%spk" % order)
1165 )
1166 try:
1167 return qs[0]
1168 except IndexError:
1169 raise self.DoesNotExist(
1170 "%s matching query does not exist." % self.__class__._meta.object_name
1171 )
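# Illustrative sketch (not part of base.py): non-null DateField/DateTimeField
# fields expose this as get_next_by_<field>() / get_previous_by_<field>(), and
# extra kwargs become additional filters. `Book` is a hypothetical model with a
# `published_on` date field.
#
#   book = Book.objects.get(pk=1)
#   book.get_next_by_published_on()                 # next row by date, pk as tiebreak
#   book.get_previous_by_published_on(format="pb")  # restricted by an extra filter
#   # Raises Book.DoesNotExist when there is no neighbouring row.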
1173 def _get_next_or_previous_in_order(self, is_next):
1174 cachename = "__%s_order_cache" % is_next
1175 if not hasattr(self, cachename):
1176 op = "gt" if is_next else "lt"
1177 order = "_order" if is_next else "-_order"
1178 order_field = self._meta.order_with_respect_to
1179 filter_args = order_field.get_filter_kwargs_for_object(self)
1180 obj = (
1181 self.__class__._default_manager.filter(**filter_args)
1182 .filter(
1183 **{
1184 "_order__%s"
1185 % op: self.__class__._default_manager.values("_order").filter(
1186 **{self._meta.pk.name: self.pk}
1187 )
1188 }
1189 )
1190 .order_by(order)[:1]
1191 .get()
1192 )
1193 setattr(self, cachename, obj)
1194 return getattr(self, cachename)
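# Illustrative sketch (not part of base.py): Meta.order_with_respect_to adds a
# hidden _order column plus get_next_in_order()/get_previous_in_order(), wired
# up in ModelBase._prepare() above. `Book` and `Chapter` are hypothetical models.
#
#   class Chapter(models.Model):
#       book = models.ForeignKey(Book, on_delete=models.CASCADE)
#       title = models.CharField(max_length=100)
#
#       class Meta:
#           order_with_respect_to = "book"
#
#   chapter.get_next_in_order()        # next Chapter of the same book by _order
#   book.get_chapter_order()           # ordered list of chapter pks
#   book.set_chapter_order([3, 1, 2])  # rewrite the _order values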
1196 def _get_field_value_map(self, meta, exclude=None):
1197 if exclude is None:
1198 exclude = set()
1199 meta = meta or self._meta
1200 return {
1201 field.name: Value(getattr(self, field.attname), field)
1202 for field in meta.local_concrete_fields
1203 if field.name not in exclude
1204 }
1206 def prepare_database_save(self, field):
1207 if self.pk is None:
1208 raise ValueError(
1209 "Unsaved model instance %r cannot be used in an ORM query." % self
1210 )
1211 return getattr(self, field.remote_field.get_related_field().attname)
1213 def clean(self):
1214 """
1215 Hook for doing any extra model-wide validation after clean() has been
1216 called on every field by self.clean_fields. Any ValidationError raised
1217 by this method will not be associated with a particular field; it will
1218 have a special-case association with the field defined by NON_FIELD_ERRORS.
1219 """
1220 pass
1222 def validate_unique(self, exclude=None):
1223 """
1224 Check unique constraints on the model and raise ValidationError if any
1225 failed.
1226 """
1227 unique_checks, date_checks = self._get_unique_checks(exclude=exclude)
1229 errors = self._perform_unique_checks(unique_checks)
1230 date_errors = self._perform_date_checks(date_checks)
1232 for k, v in date_errors.items():
1233 errors.setdefault(k, []).extend(v)
1235 if errors:
1236 raise ValidationError(errors)
1238 def _get_unique_checks(self, exclude=None, include_meta_constraints=False):
1239 """
1240 Return a list of checks to perform. Since validate_unique() could be
1241 called from a ModelForm, some fields may have been excluded; we can't
1242 perform a unique check on a model that is missing fields involved
1243 in that check. Fields that did not validate should also be excluded,
1244 but they need to be passed in via the exclude argument.
1245 """
1246 if exclude is None:
1247 exclude = set()
1248 unique_checks = []
1250 unique_togethers = [(self.__class__, self._meta.unique_together)]
1251 constraints = []
1252 if include_meta_constraints:
1253 constraints = [(self.__class__, self._meta.total_unique_constraints)]
1254 for parent_class in self._meta.get_parent_list():
1255 if parent_class._meta.unique_together:
1256 unique_togethers.append(
1257 (parent_class, parent_class._meta.unique_together)
1258 )
1259 if include_meta_constraints and parent_class._meta.total_unique_constraints:
1260 constraints.append(
1261 (parent_class, parent_class._meta.total_unique_constraints)
1262 )
1264 for model_class, unique_together in unique_togethers:
1265 for check in unique_together:
1266 if not any(name in exclude for name in check):
1267 # Add the check if the field isn't excluded.
1268 unique_checks.append((model_class, tuple(check)))
1270 if include_meta_constraints:
1271 for model_class, model_constraints in constraints:
1272 for constraint in model_constraints:
1273 if not any(name in exclude for name in constraint.fields):
1274 unique_checks.append((model_class, constraint.fields))
1276 # These are checks for the unique_for_<date/year/month>.
1277 date_checks = []
1279 # Gather a list of checks for fields declared as unique and add them to
1280 # the list of checks.
1282 fields_with_class = [(self.__class__, self._meta.local_fields)]
1283 for parent_class in self._meta.get_parent_list():
1284 fields_with_class.append((parent_class, parent_class._meta.local_fields))
1286 for model_class, fields in fields_with_class:
1287 for f in fields:
1288 name = f.name
1289 if name in exclude:
1290 continue
1291 if f.unique:
1292 unique_checks.append((model_class, (name,)))
1293 if f.unique_for_date and f.unique_for_date not in exclude:
1294 date_checks.append((model_class, "date", name, f.unique_for_date))
1295 if f.unique_for_year and f.unique_for_year not in exclude:
1296 date_checks.append((model_class, "year", name, f.unique_for_year))
1297 if f.unique_for_month and f.unique_for_month not in exclude:
1298 date_checks.append((model_class, "month", name, f.unique_for_month))
1299 return unique_checks, date_checks
1301 def _perform_unique_checks(self, unique_checks):
1302 errors = {}
1304 for model_class, unique_check in unique_checks:
1305 # Try to look up an existing object with the same values as this
1306 # object's values for all the unique fields.
1308 lookup_kwargs = {}
1309 for field_name in unique_check:
1310 f = self._meta.get_field(field_name)
1311 lookup_value = getattr(self, f.attname)
1312 # TODO: Handle multiple backends with different feature flags.
1313 if lookup_value is None or (
1314 lookup_value == ""
1315 and connection.features.interprets_empty_strings_as_nulls
1316 ):
1317 # no value, skip the lookup
1318 continue
1319 if f.primary_key and not self._state.adding:
1320 # no need to check for unique primary key when editing
1321 continue
1322 lookup_kwargs[str(field_name)] = lookup_value
1324 # some fields were skipped, no reason to do the check
1325 if len(unique_check) != len(lookup_kwargs):
1326 continue
1328 qs = model_class._default_manager.filter(**lookup_kwargs)
1330 # Exclude the current object from the query if we are editing an
1331 # instance (as opposed to creating a new one)
1332 # Note that we need to use the pk as defined by model_class, not
1333 # self.pk. These can be different fields because model inheritance
1334 # allows single model to have effectively multiple primary keys.
1335 # Refs #17615.
1336 model_class_pk = self._get_pk_val(model_class._meta)
1337 if not self._state.adding and model_class_pk is not None:
1338 qs = qs.exclude(pk=model_class_pk)
1339 if qs.exists():
1340 if len(unique_check) == 1:
1341 key = unique_check[0]
1342 else:
1343 key = NON_FIELD_ERRORS
1344 errors.setdefault(key, []).append(
1345 self.unique_error_message(model_class, unique_check)
1346 )
1348 return errors
1350 def _perform_date_checks(self, date_checks):
1351 errors = {}
1352 for model_class, lookup_type, field, unique_for in date_checks:
1353 lookup_kwargs = {}
1354 # there's a ticket to add a date lookup, we can remove this special
1355 # case if that makes its way in
1356 date = getattr(self, unique_for)
1357 if date is None:
1358 continue
1359 if lookup_type == "date":
1360 lookup_kwargs["%s__day" % unique_for] = date.day
1361 lookup_kwargs["%s__month" % unique_for] = date.month
1362 lookup_kwargs["%s__year" % unique_for] = date.year
1363 else:
1364 lookup_kwargs["%s__%s" % (unique_for, lookup_type)] = getattr(
1365 date, lookup_type
1366 )
1367 lookup_kwargs[field] = getattr(self, field)
1369 qs = model_class._default_manager.filter(**lookup_kwargs)
1370 # Exclude the current object from the query if we are editing an
1371 # instance (as opposed to creating a new one)
1372 if not self._state.adding and self.pk is not None:
1373 qs = qs.exclude(pk=self.pk)
1375 if qs.exists():
1376 errors.setdefault(field, []).append(
1377 self.date_error_message(lookup_type, field, unique_for)
1378 )
1379 return errors
1381 def date_error_message(self, lookup_type, field_name, unique_for):
1382 opts = self._meta
1383 field = opts.get_field(field_name)
1384 return ValidationError(
1385 message=field.error_messages["unique_for_date"],
1386 code="unique_for_date",
1387 params={
1388 "model": self,
1389 "model_name": capfirst(opts.verbose_name),
1390 "lookup_type": lookup_type,
1391 "field": field_name,
1392 "field_label": capfirst(field.verbose_name),
1393 "date_field": unique_for,
1394 "date_field_label": capfirst(opts.get_field(unique_for).verbose_name),
1395 },
1396 )
1398 def unique_error_message(self, model_class, unique_check):
1399 opts = model_class._meta
1401 params = {
1402 "model": self,
1403 "model_class": model_class,
1404 "model_name": capfirst(opts.verbose_name),
1405 "unique_check": unique_check,
1406 }
1408 # A unique field
1409 if len(unique_check) == 1:
1410 field = opts.get_field(unique_check[0])
1411 params["field_label"] = capfirst(field.verbose_name)
1412 return ValidationError(
1413 message=field.error_messages["unique"],
1414 code="unique",
1415 params=params,
1416 )
1418 # unique_together
1419 else:
1420 field_labels = [
1421 capfirst(opts.get_field(f).verbose_name) for f in unique_check
1422 ]
1423 params["field_labels"] = get_text_list(field_labels, _("and"))
1424 return ValidationError(
1425 message=_("%(model_name)s with this %(field_labels)s already exists."),
1426 code="unique_together",
1427 params=params,
1428 )
1430 def get_constraints(self):
1431 constraints = [(self.__class__, self._meta.constraints)]
1432 for parent_class in self._meta.get_parent_list():
1433 if parent_class._meta.constraints:
1434 constraints.append((parent_class, parent_class._meta.constraints))
1435 return constraints
1437 def validate_constraints(self, exclude=None):
1438 constraints = self.get_constraints()
1439 using = router.db_for_write(self.__class__, instance=self)
1441 errors = {}
1442 for model_class, model_constraints in constraints:
1443 for constraint in model_constraints:
1444 try:
1445 constraint.validate(model_class, self, exclude=exclude, using=using)
1446 except ValidationError as e:
1447 if e.code == "unique" and len(constraint.fields) == 1:
1448 errors.setdefault(constraint.fields[0], []).append(e)
1449 else:
1450 errors = e.update_error_dict(errors)
1451 if errors:
1452 raise ValidationError(errors)
1454 def full_clean(self, exclude=None, validate_unique=True, validate_constraints=True):
1455 """
1456 Call clean_fields(), clean(), validate_unique(), and
1457 validate_constraints() on the model. Raise a ValidationError for any
1458 errors that occur.
1459 """
1460 errors = {}
1461 if exclude is None:
1462 exclude = set()
1463 else:
1464 exclude = set(exclude)
1466 try:
1467 self.clean_fields(exclude=exclude)
1468 except ValidationError as e:
1469 errors = e.update_error_dict(errors)
1471 # Form.clean() is run even if other validation fails, so do the
1472 # same with Model.clean() for consistency.
1473 try:
1474 self.clean()
1475 except ValidationError as e:
1476 errors = e.update_error_dict(errors)
1478 # Run unique checks, but only for fields that passed validation.
1479 if validate_unique:
1480 for name in errors:
1481 if name != NON_FIELD_ERRORS and name not in exclude:
1482 exclude.add(name)
1483 try:
1484 self.validate_unique(exclude=exclude)
1485 except ValidationError as e:
1486 errors = e.update_error_dict(errors)
1488 # Run constraints checks, but only for fields that passed validation.
1489 if validate_constraints:
1490 for name in errors:
1491 if name != NON_FIELD_ERRORS and name not in exclude:
1492 exclude.add(name)
1493 try:
1494 self.validate_constraints(exclude=exclude)
1495 except ValidationError as e:
1496 errors = e.update_error_dict(errors)
1498 if errors:
1499 raise ValidationError(errors)
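# Illustrative sketch (not part of base.py): full_clean() aggregates field,
# model, uniqueness and constraint errors into a single ValidationError keyed by
# field name (or NON_FIELD_ERRORS); note that Model.save() never calls it for
# you. `Book` is a hypothetical model.
#
#   book = Book(title="")
#   try:
#       book.full_clean(exclude={"published_on"})
#   except ValidationError as e:
#       print(e.message_dict)   # e.g. {"title": ["This field cannot be blank."]}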
1501 def clean_fields(self, exclude=None):
1502 """
1503 Clean all fields and raise a ValidationError containing a dict
1504 of all validation errors if any occur.
1505 """
1506 if exclude is None:
1507 exclude = set()
1509 errors = {}
1510 for f in self._meta.fields:
1511 if f.name in exclude:
1512 continue
1513 # Skip validation for empty fields with blank=True. The developer
1514 # is responsible for making sure they have a valid value.
1515 raw_value = getattr(self, f.attname)
1516 if f.blank and raw_value in f.empty_values:
1517 continue
1518 try:
1519 setattr(self, f.attname, f.clean(raw_value, self))
1520 except ValidationError as e:
1521 errors[f.name] = e.error_list
1523 if errors:
1524 raise ValidationError(errors)
1526 @classmethod
1527 def check(cls, **kwargs):
1528 errors = [
1529 *cls._check_swappable(),
1530 *cls._check_model(),
1531 *cls._check_managers(**kwargs),
1532 ]
1533 if not cls._meta.swapped:
1534 databases = kwargs.get("databases") or []
1535 errors += [
1536 *cls._check_fields(**kwargs),
1537 *cls._check_m2m_through_same_relationship(),
1538 *cls._check_long_column_names(databases),
1539 ]
1540 clash_errors = (
1541 *cls._check_id_field(),
1542 *cls._check_field_name_clashes(),
1543 *cls._check_model_name_db_lookup_clashes(),
1544 *cls._check_property_name_related_field_accessor_clashes(),
1545 *cls._check_single_primary_key(),
1546 )
1547 errors.extend(clash_errors)
1548 # If there are field name clashes, hide consequent column name
1549 # clashes.
1550 if not clash_errors:
1551 errors.extend(cls._check_column_name_clashes())
1552 errors += [
1553 *cls._check_index_together(),
1554 *cls._check_unique_together(),
1555 *cls._check_indexes(databases),
1556 *cls._check_ordering(),
1557 *cls._check_constraints(databases),
1558 *cls._check_default_pk(),
1559 *cls._check_db_table_comment(databases),
1560 ]
1562 return errors
1564 @classmethod
1565 def _check_default_pk(cls):
1566 if (
1567 not cls._meta.abstract
1568 and cls._meta.pk.auto_created
1569 and
1570 # Inherited PKs are checked in parents models.
1571 not (
1572 isinstance(cls._meta.pk, OneToOneField)
1573 and cls._meta.pk.remote_field.parent_link
1574 )
1575 and not settings.is_overridden("DEFAULT_AUTO_FIELD")
1576 and cls._meta.app_config
1577 and not cls._meta.app_config._is_default_auto_field_overridden
1578 ):
1579 return [
1580 checks.Warning(
1581 f"Auto-created primary key used when not defining a "
1582 f"primary key type, by default "
1583 f"'{settings.DEFAULT_AUTO_FIELD}'.",
1584 hint=(
1585 f"Configure the DEFAULT_AUTO_FIELD setting or the "
1586 f"{cls._meta.app_config.__class__.__qualname__}."
1587 f"default_auto_field attribute to point to a subclass "
1588 f"of AutoField, e.g. 'django.db.models.BigAutoField'."
1589 ),
1590 obj=cls,
1591 id="models.W042",
1592 ),
1593 ]
1594 return []
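# Illustrative sketch (not part of base.py): models.W042 above is silenced by
# picking a default primary-key type explicitly, either project-wide or per app.
# `library` is a hypothetical app.
#
#   # settings.py
#   DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
#
#   # library/apps.py
#   class LibraryConfig(AppConfig):
#       default_auto_field = "django.db.models.BigAutoField"
#       name = "library"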
1596 @classmethod
1597 def _check_db_table_comment(cls, databases):
1598 if not cls._meta.db_table_comment:
1599 return []
1600 errors = []
1601 for db in databases:
1602 if not router.allow_migrate_model(db, cls):
1603 continue
1604 connection = connections[db]
1605 if not (
1606 connection.features.supports_comments
1607 or "supports_comments" in cls._meta.required_db_features
1608 ):
1609 errors.append(
1610 checks.Warning(
1611 f"{connection.display_name} does not support comments on "
1612 f"tables (db_table_comment).",
1613 obj=cls,
1614 id="models.W046",
1615 )
1616 )
1617 return errors
1619 @classmethod
1620 def _check_swappable(cls):
1621 """Check if the swapped model exists."""
1622 errors = []
1623 if cls._meta.swapped:
1624 try:
1625 apps.get_model(cls._meta.swapped)
1626 except ValueError:
1627 errors.append(
1628 checks.Error(
1629 "'%s' is not of the form 'app_label.app_name'."
1630 % cls._meta.swappable,
1631 id="models.E001",
1632 )
1633 )
1634 except LookupError:
1635 app_label, model_name = cls._meta.swapped.split(".")
1636 errors.append(
1637 checks.Error(
1638 "'%s' references '%s.%s', which has not been "
1639 "installed, or is abstract."
1640 % (cls._meta.swappable, app_label, model_name),
1641 id="models.E002",
1642 )
1643 )
1644 return errors
1646 @classmethod
1647 def _check_model(cls):
1648 errors = []
1649 if cls._meta.proxy:
1650 if cls._meta.local_fields or cls._meta.local_many_to_many:
1651 errors.append(
1652 checks.Error(
1653 "Proxy model '%s' contains model fields." % cls.__name__,
1654 id="models.E017",
1655 )
1656 )
1657 return errors
1659 @classmethod
1660 def _check_managers(cls, **kwargs):
1661 """Perform all manager checks."""
1662 errors = []
1663 for manager in cls._meta.managers:
1664 errors.extend(manager.check(**kwargs))
1665 return errors
1667 @classmethod
1668 def _check_fields(cls, **kwargs):
1669 """Perform all field checks."""
1670 errors = []
1671 for field in cls._meta.local_fields:
1672 errors.extend(field.check(**kwargs))
1673 for field in cls._meta.local_many_to_many:
1674 errors.extend(field.check(from_model=cls, **kwargs))
1675 return errors
1677 @classmethod
1678 def _check_m2m_through_same_relationship(cls):
1679 """Check that no relationship model is used by more than one m2m field."""
1681 errors = []
1682 seen_intermediary_signatures = []
1684 fields = cls._meta.local_many_to_many
1686 # Skip when the target model wasn't found.
1687 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
1689 # Skip when the relationship model wasn't found.
1690 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
1692 for f in fields:
1693 signature = (
1694 f.remote_field.model,
1695 cls,
1696 f.remote_field.through,
1697 f.remote_field.through_fields,
1698 )
1699 if signature in seen_intermediary_signatures:
1700 errors.append(
1701 checks.Error(
1702 "The model has two identical many-to-many relations "
1703 "through the intermediate model '%s'."
1704 % f.remote_field.through._meta.label,
1705 obj=cls,
1706 id="models.E003",
1707 )
1708 )
1709 else:
1710 seen_intermediary_signatures.append(signature)
1711 return errors
1713 @classmethod
1714 def _check_id_field(cls):
1715 """Check if `id` field is a primary key."""
1716 fields = [
1717 f for f in cls._meta.local_fields if f.name == "id" and f != cls._meta.pk
1718 ]
1719 # fields is empty or consists of the invalid "id" field
1720 if fields and not fields[0].primary_key and cls._meta.pk.name == "id":
1721 return [
1722 checks.Error(
1723 "'id' can only be used as a field name if the field also "
1724 "sets 'primary_key=True'.",
1725 obj=cls,
1726 id="models.E004",
1727 )
1728 ]
1729 else:
1730 return []
1732 @classmethod
1733 def _check_field_name_clashes(cls):
1734 """Forbid field shadowing in multi-table inheritance."""
1735 errors = []
1736 used_fields = {} # name or attname -> field
1738 # Check that multi-inheritance doesn't cause field name shadowing.
1739 for parent in cls._meta.get_parent_list():
1740 for f in parent._meta.local_fields:
1741 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1742 if clash:
1743 errors.append(
1744 checks.Error(
1745 "The field '%s' from parent model "
1746 "'%s' clashes with the field '%s' "
1747 "from parent model '%s'."
1748 % (clash.name, clash.model._meta, f.name, f.model._meta),
1749 obj=cls,
1750 id="models.E005",
1751 )
1752 )
1753 used_fields[f.name] = f
1754 used_fields[f.attname] = f
1756 # Check that fields defined in the model don't clash with fields from
1757 # parents, including auto-generated fields like multi-table inheritance
1758 # child accessors.
1759 for parent in cls._meta.get_parent_list():
1760 for f in parent._meta.get_fields():
1761 if f not in used_fields:
1762 used_fields[f.name] = f
1764 for f in cls._meta.local_fields:
1765 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1766 # Note that we may detect a clash between a user-defined non-unique
1767 # field "id" and the automatically added unique field "id", both
1768 # defined on the same model. That special case is handled by
1769 # _check_id_field, so it is ignored here.
1770 id_conflict = (
1771 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1772 )
1773 if clash and not id_conflict:
1774 errors.append(
1775 checks.Error(
1776 "The field '%s' clashes with the field '%s' "
1777 "from model '%s'." % (f.name, clash.name, clash.model._meta),
1778 obj=f,
1779 id="models.E006",
1780 )
1781 )
1782 used_fields[f.name] = f
1783 used_fields[f.attname] = f
1785 return errors
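# Illustrative sketch (hypothetical models, not part of this module): with
# multi-table inheritance, a child field named after a parent field's column
# attribute (attname) is reported as models.E006, assuming
# `from django.db import models`:
#
#     class Author(models.Model):
#         name = models.CharField(max_length=50)
#
#     class Article(models.Model):
#         author = models.ForeignKey(Author, on_delete=models.CASCADE)
#
#     class Review(Article):
#         author_id = models.IntegerField()  # clashes with Article.author's attname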
1787 @classmethod
1788 def _check_column_name_clashes(cls):
1789 # Store a list of column names which have already been used by other fields.
1790 used_column_names = []
1791 errors = []
1793 for f in cls._meta.local_fields:
1794 _, column_name = f.get_attname_column()
1796 # Ensure the column name is not already in use.
1797 if column_name and column_name in used_column_names:
1798 errors.append(
1799 checks.Error(
1800 "Field '%s' has column name '%s' that is used by "
1801 "another field." % (f.name, column_name),
1802 hint="Specify a 'db_column' for the field.",
1803 obj=cls,
1804 id="models.E007",
1805 )
1806 )
1807 else:
1808 used_column_names.append(column_name)
1810 return errors
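# Illustrative sketch (hypothetical model, not part of this module): two
# fields mapped to the same database column are reported as models.E007,
# assuming `from django.db import models`:
#
#     class Measurement(models.Model):
#         value_celsius = models.FloatField(db_column="value")
#         value_fahrenheit = models.FloatField(db_column="value")  # reuses "value"
#
# Giving each field a distinct db_column resolves the error.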
1812 @classmethod
1813 def _check_model_name_db_lookup_clashes(cls):
1814 errors = []
1815 model_name = cls.__name__
1816 if model_name.startswith("_") or model_name.endswith("_"):
1817 errors.append(
1818 checks.Error(
1819 "The model name '%s' cannot start or end with an underscore "
1820 "as it collides with the query lookup syntax." % model_name,
1821 obj=cls,
1822 id="models.E023",
1823 )
1824 )
1825 elif LOOKUP_SEP in model_name:
1826 errors.append(
1827 checks.Error(
1828 "The model name '%s' cannot contain double underscores as "
1829 "it collides with the query lookup syntax." % model_name,
1830 obj=cls,
1831 id="models.E024",
1832 )
1833 )
1834 return errors
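# Illustrative sketch (hypothetical models, not part of this module): model
# names that start/end with an underscore or contain a double underscore are
# valid Python but collide with the lookup syntax, assuming
# `from django.db import models`:
#
#     class _Draft(models.Model):        # models.E023
#         pass
#
#     class Blog__Entry(models.Model):   # models.E024
#         pass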
1836 @classmethod
1837 def _check_property_name_related_field_accessor_clashes(cls):
1838 errors = []
1839 property_names = cls._meta._property_names
1840 related_field_accessors = (
1841 f.get_attname()
1842 for f in cls._meta._get_fields(reverse=False)
1843 if f.is_relation and f.related_model is not None
1844 )
1845 for accessor in related_field_accessors:
1846 if accessor in property_names:
1847 errors.append(
1848 checks.Error(
1849 "The property '%s' clashes with a related field "
1850 "accessor." % accessor,
1851 obj=cls,
1852 id="models.E025",
1853 )
1854 )
1855 return errors
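# Illustrative sketch (hypothetical models, not part of this module): a
# property whose name matches a forward relation's attname shadows the column
# attribute and is reported as models.E025, assuming
# `from django.db import models`:
#
#     class Author(models.Model):
#         name = models.CharField(max_length=50)
#
#     class Book(models.Model):
#         author = models.ForeignKey(Author, on_delete=models.CASCADE)
#
#         @property
#         def author_id(self):  # clashes with the FK's "author_id" accessor
#             return None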
1857 @classmethod
1858 def _check_single_primary_key(cls):
1859 errors = []
1860 if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1:
1861 errors.append(
1862 checks.Error(
1863 "The model cannot have more than one field with "
1864 "'primary_key=True'.",
1865 obj=cls,
1866 id="models.E026",
1867 )
1868 )
1869 return errors
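# Illustrative sketch (hypothetical model, not part of this module): a model
# may declare only one primary key, so a second primary_key=True field is
# reported as models.E026, assuming `from django.db import models`:
#
#     class Book(models.Model):
#         isbn = models.CharField(max_length=13, primary_key=True)
#         catalog_id = models.IntegerField(primary_key=True)  # models.E026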
1871 # RemovedInDjango51Warning.
1872 @classmethod
1873 def _check_index_together(cls):
1874 """Check the value of "index_together" option."""
1875 if not isinstance(cls._meta.index_together, (tuple, list)):
1876 return [
1877 checks.Error(
1878 "'index_together' must be a list or tuple.",
1879 obj=cls,
1880 id="models.E008",
1881 )
1882 ]
1884 elif any(
1885 not isinstance(fields, (tuple, list)) for fields in cls._meta.index_together
1886 ):
1887 return [
1888 checks.Error(
1889 "All 'index_together' elements must be lists or tuples.",
1890 obj=cls,
1891 id="models.E009",
1892 )
1893 ]
1895 else:
1896 errors = []
1897 for fields in cls._meta.index_together:
1898 errors.extend(cls._check_local_fields(fields, "index_together"))
1899 return errors
1901 @classmethod
1902 def _check_unique_together(cls):
1903 """Check the value of "unique_together" option."""
1904 if not isinstance(cls._meta.unique_together, (tuple, list)):
1905 return [
1906 checks.Error(
1907 "'unique_together' must be a list or tuple.",
1908 obj=cls,
1909 id="models.E010",
1910 )
1911 ]
1913 elif any(
1914 not isinstance(fields, (tuple, list))
1915 for fields in cls._meta.unique_together
1916 ):
1917 return [
1918 checks.Error(
1919 "All 'unique_together' elements must be lists or tuples.",
1920 obj=cls,
1921 id="models.E011",
1922 )
1923 ]
1925 else:
1926 errors = []
1927 for fields in cls._meta.unique_together:
1928 errors.extend(cls._check_local_fields(fields, "unique_together"))
1929 return errors
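# Illustrative sketch (hypothetical model, not part of this module): a bare
# string instead of a list of tuples is rejected here as models.E010,
# assuming `from django.db import models`:
#
#     class Event(models.Model):
#         name = models.CharField(max_length=50)
#         venue = models.CharField(max_length=50)
#
#         class Meta:
#             unique_together = "name"  # models.E010; use [("name", "venue")]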
1931 @classmethod
1932 def _check_indexes(cls, databases):
1933 """Check fields, names, and conditions of indexes."""
1934 errors = []
1935 references = set()
1936 for index in cls._meta.indexes:
1937 # Index names can't start with an underscore or a number; this is
1938 # restricted for cross-database compatibility with Oracle.
1939 if index.name[0] == "_" or index.name[0].isdigit():
1940 errors.append(
1941 checks.Error(
1942 "The index name '%s' cannot start with an underscore "
1943 "or a number." % index.name,
1944 obj=cls,
1945 id="models.E033",
1946 ),
1947 )
1948 if len(index.name) > index.max_name_length:
1949 errors.append(
1950 checks.Error(
1951 "The index name '%s' cannot be longer than %d "
1952 "characters." % (index.name, index.max_name_length),
1953 obj=cls,
1954 id="models.E034",
1955 ),
1956 )
1957 if index.contains_expressions:
1958 for expression in index.expressions:
1959 references.update(
1960 ref[0] for ref in cls._get_expr_references(expression)
1961 )
1962 for db in databases:
1963 if not router.allow_migrate_model(db, cls):
1964 continue
1965 connection = connections[db]
1966 if not (
1967 connection.features.supports_partial_indexes
1968 or "supports_partial_indexes" in cls._meta.required_db_features
1969 ) and any(index.condition is not None for index in cls._meta.indexes):
1970 errors.append(
1971 checks.Warning(
1972 "%s does not support indexes with conditions."
1973 % connection.display_name,
1974 hint=(
1975 "Conditions will be ignored. Silence this warning "
1976 "if you don't care about it."
1977 ),
1978 obj=cls,
1979 id="models.W037",
1980 )
1981 )
1982 if not (
1983 connection.features.supports_covering_indexes
1984 or "supports_covering_indexes" in cls._meta.required_db_features
1985 ) and any(index.include for index in cls._meta.indexes):
1986 errors.append(
1987 checks.Warning(
1988 "%s does not support indexes with non-key columns."
1989 % connection.display_name,
1990 hint=(
1991 "Non-key columns will be ignored. Silence this "
1992 "warning if you don't care about it."
1993 ),
1994 obj=cls,
1995 id="models.W040",
1996 )
1997 )
1998 if not (
1999 connection.features.supports_expression_indexes
2000 or "supports_expression_indexes" in cls._meta.required_db_features
2001 ) and any(index.contains_expressions for index in cls._meta.indexes):
2002 errors.append(
2003 checks.Warning(
2004 "%s does not support indexes on expressions."
2005 % connection.display_name,
2006 hint=(
2007 "An index won't be created. Silence this warning "
2008 "if you don't care about it."
2009 ),
2010 obj=cls,
2011 id="models.W043",
2012 )
2013 )
2014 fields = [
2015 field for index in cls._meta.indexes for field, _ in index.fields_orders
2016 ]
2017 fields += [include for index in cls._meta.indexes for include in index.include]
2018 fields += references
2019 errors.extend(cls._check_local_fields(fields, "indexes"))
2020 return errors
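# Illustrative sketch (hypothetical model, not part of this module): an index
# whose name starts with an underscore (or a digit) is reported as
# models.E033, assuming `from django.db import models`:
#
#     class Invoice(models.Model):
#         number = models.CharField(max_length=20)
#
#         class Meta:
#             indexes = [
#                 models.Index(fields=["number"], name="_invoice_number_idx"),
#             ]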
2022 @classmethod
2023 def _check_local_fields(cls, fields, option):
2024 from django.db import models
2026 # In order to avoid hitting the relation tree prematurely, we use our
2027 # own fields_map instead of using get_field()
2028 forward_fields_map = {}
2029 for field in cls._meta._get_fields(reverse=False):
2030 forward_fields_map[field.name] = field
2031 if hasattr(field, "attname"):
2032 forward_fields_map[field.attname] = field
2034 errors = []
2035 for field_name in fields:
2036 try:
2037 field = forward_fields_map[field_name]
2038 except KeyError:
2039 errors.append(
2040 checks.Error(
2041 "'%s' refers to the nonexistent field '%s'."
2042 % (
2043 option,
2044 field_name,
2045 ),
2046 obj=cls,
2047 id="models.E012",
2048 )
2049 )
2050 else:
2051 if isinstance(field.remote_field, models.ManyToManyRel):
2052 errors.append(
2053 checks.Error(
2054 "'%s' refers to a ManyToManyField '%s', but "
2055 "ManyToManyFields are not permitted in '%s'."
2056 % (
2057 option,
2058 field_name,
2059 option,
2060 ),
2061 obj=cls,
2062 id="models.E013",
2063 )
2064 )
2065 elif field not in cls._meta.local_fields:
2066 errors.append(
2067 checks.Error(
2068 "'%s' refers to field '%s' which is not local to model "
2069 "'%s'." % (option, field_name, cls._meta.object_name),
2070 hint="This issue may be caused by multi-table inheritance.",
2071 obj=cls,
2072 id="models.E016",
2073 )
2074 )
2075 return errors
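# Illustrative sketch (hypothetical models, not part of this module): options
# validated through _check_local_fields() must name existing, local,
# non-many-to-many fields, assuming `from django.db import models`:
#
#     class Tag(models.Model):
#         name = models.CharField(max_length=30)
#
#     class Post(models.Model):
#         title = models.CharField(max_length=100)
#         tags = models.ManyToManyField(Tag)
#
#         class Meta:
#             unique_together = [("title", "publish_date")]  # models.E012 (no such field)
#             # [("title", "tags")] would instead be models.E013 (M2M not permitted)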
2077 @classmethod
2078 def _check_ordering(cls):
2079 """
2080 Check "ordering" option -- is it a list of strings and do all fields
2081 exist?
2082 """
2083 if cls._meta._ordering_clash:
2084 return [
2085 checks.Error(
2086 "'ordering' and 'order_with_respect_to' cannot be used together.",
2087 obj=cls,
2088 id="models.E021",
2089 ),
2090 ]
2092 if cls._meta.order_with_respect_to or not cls._meta.ordering:
2093 return []
2095 if not isinstance(cls._meta.ordering, (list, tuple)):
2096 return [
2097 checks.Error(
2098 "'ordering' must be a tuple or list (even if you want to order by "
2099 "only one field).",
2100 obj=cls,
2101 id="models.E014",
2102 )
2103 ]
2105 errors = []
2106 fields = cls._meta.ordering
2108 # Skip expressions and '?' fields.
2109 fields = (f for f in fields if isinstance(f, str) and f != "?")
2111 # Convert "-field" to "field".
2112 fields = ((f[1:] if f.startswith("-") else f) for f in fields)
2114 # Separate related fields and non-related fields.
2115 _fields = []
2116 related_fields = []
2117 for f in fields:
2118 if LOOKUP_SEP in f:
2119 related_fields.append(f)
2120 else:
2121 _fields.append(f)
2122 fields = _fields
2124 # Check related fields.
2125 for field in related_fields:
2126 _cls = cls
2127 fld = None
2128 for part in field.split(LOOKUP_SEP):
2129 try:
2130 # pk is an alias that won't be found by opts.get_field.
2131 if part == "pk":
2132 fld = _cls._meta.pk
2133 else:
2134 fld = _cls._meta.get_field(part)
2135 if fld.is_relation:
2136 _cls = fld.path_infos[-1].to_opts.model
2137 else:
2138 _cls = None
2139 except (FieldDoesNotExist, AttributeError):
2140 if fld is None or (
2141 fld.get_transform(part) is None and fld.get_lookup(part) is None
2142 ):
2143 errors.append(
2144 checks.Error(
2145 "'ordering' refers to the nonexistent field, "
2146 "related field, or lookup '%s'." % field,
2147 obj=cls,
2148 id="models.E015",
2149 )
2150 )
2152 # Skip ordering on pk. This is always a valid order_by field
2153 # but is an alias and therefore won't be found by opts.get_field.
2154 fields = {f for f in fields if f != "pk"}
2156 # Check for invalid or nonexistent fields in ordering.
2157 invalid_fields = []
2159 # Any field name that is not present in field_names does not exist.
2160 # Also, ordering by m2m fields is not allowed.
2161 opts = cls._meta
2162 valid_fields = set(
2163 chain.from_iterable(
2164 (f.name, f.attname)
2165 if not (f.auto_created and not f.concrete)
2166 else (f.field.related_query_name(),)
2167 for f in chain(opts.fields, opts.related_objects)
2168 )
2169 )
2171 invalid_fields.extend(fields - valid_fields)
2173 for invalid_field in invalid_fields:
2174 errors.append(
2175 checks.Error(
2176 "'ordering' refers to the nonexistent field, related "
2177 "field, or lookup '%s'." % invalid_field,
2178 obj=cls,
2179 id="models.E015",
2180 )
2181 )
2182 return errors
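# Illustrative sketch (hypothetical model, not part of this module), assuming
# `from django.db import models`:
#
#     class Ticket(models.Model):
#         created = models.DateTimeField()
#
#         class Meta:
#             ordering = "created"  # models.E014: must be a list or tuple
#
# With `ordering = ["-opened"]` the check would instead report models.E015,
# because no field, related field, or lookup named "opened" exists.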
2184 @classmethod
2185 def _check_long_column_names(cls, databases):
2186 """
2187 Check that any auto-generated column names are shorter than the limits
2188 for each database in which the model will be created.
2189 """
2190 if not databases:
2191 return []
2192 errors = []
2193 allowed_len = None
2194 db_alias = None
2196 # Find the minimum max allowed length among all specified db_aliases.
2197 for db in databases:
2198 # Skip databases where the model won't be created.
2199 if not router.allow_migrate_model(db, cls):
2200 continue
2201 connection = connections[db]
2202 max_name_length = connection.ops.max_name_length()
2203 if max_name_length is None or connection.features.truncates_names:
2204 continue
2205 else:
2206 if allowed_len is None:
2207 allowed_len = max_name_length
2208 db_alias = db
2209 elif max_name_length < allowed_len:
2210 allowed_len = max_name_length
2211 db_alias = db
2213 if allowed_len is None:
2214 return errors
2216 for f in cls._meta.local_fields:
2217 _, column_name = f.get_attname_column()
2219 # Check if auto-generated name for the field is too long
2220 # for the database.
2221 if (
2222 f.db_column is None
2223 and column_name is not None
2224 and len(column_name) > allowed_len
2225 ):
2226 errors.append(
2227 checks.Error(
2228 'Autogenerated column name too long for field "%s". '
2229 'Maximum length is "%s" for database "%s".'
2230 % (column_name, allowed_len, db_alias),
2231 hint="Set the column name manually using 'db_column'.",
2232 obj=cls,
2233 id="models.E018",
2234 )
2235 )
2237 for f in cls._meta.local_many_to_many:
2238 # Skip nonexistent models.
2239 if isinstance(f.remote_field.through, str):
2240 continue
2242 # Check if auto-generated name for the M2M field is too long
2243 # for the database.
2244 for m2m in f.remote_field.through._meta.local_fields:
2245 _, rel_name = m2m.get_attname_column()
2246 if (
2247 m2m.db_column is None
2248 and rel_name is not None
2249 and len(rel_name) > allowed_len
2250 ):
2251 errors.append(
2252 checks.Error(
2253 "Autogenerated column name too long for M2M field "
2254 '"%s". Maximum length is "%s" for database "%s".'
2255 % (rel_name, allowed_len, db_alias),
2256 hint=(
2257 "Use 'through' to create a separate model for "
2258 "M2M and then set column_name using 'db_column'."
2259 ),
2260 obj=cls,
2261 id="models.E019",
2262 )
2263 )
2265 return errors
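# Illustrative sketch (hypothetical model, not part of this module): on a
# backend with a short identifier limit (e.g. Oracle's 30 characters), an
# autogenerated column name that exceeds the limit is reported as models.E018,
# assuming `from django.db import models`:
#
#     class Sensor(models.Model):
#         average_measured_temperature_in_degrees_celsius = models.FloatField()
#
# Setting db_column="avg_temp_c" on the field keeps the column name within
# the backend's limit.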
2267 @classmethod
2268 def _get_expr_references(cls, expr):
2269 if isinstance(expr, Q):
2270 for child in expr.children:
2271 if isinstance(child, tuple):
2272 lookup, value = child
2273 yield tuple(lookup.split(LOOKUP_SEP))
2274 yield from cls._get_expr_references(value)
2275 else:
2276 yield from cls._get_expr_references(child)
2277 elif isinstance(expr, F):
2278 yield tuple(expr.name.split(LOOKUP_SEP))
2279 elif hasattr(expr, "get_source_expressions"):
2280 for src_expr in expr.get_source_expressions():
2281 yield from cls._get_expr_references(src_expr)
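# For example, given the hypothetical expression below, the generator yields
# one tuple per referenced field path, splitting lookups on LOOKUP_SEP:
#
#     list(cls._get_expr_references(Q(price__gt=F("discount"))))
#     # -> [("price", "gt"), ("discount",)]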
2283 @classmethod
2284 def _check_constraints(cls, databases):
2285 errors = []
2286 for db in databases:
2287 if not router.allow_migrate_model(db, cls):
2288 continue
2289 connection = connections[db]
2290 if not (
2291 connection.features.supports_table_check_constraints
2292 or "supports_table_check_constraints" in cls._meta.required_db_features
2293 ) and any(
2294 isinstance(constraint, CheckConstraint)
2295 for constraint in cls._meta.constraints
2296 ):
2297 errors.append(
2298 checks.Warning(
2299 "%s does not support check constraints."
2300 % connection.display_name,
2301 hint=(
2302 "A constraint won't be created. Silence this "
2303 "warning if you don't care about it."
2304 ),
2305 obj=cls,
2306 id="models.W027",
2307 )
2308 )
2309 if not (
2310 connection.features.supports_partial_indexes
2311 or "supports_partial_indexes" in cls._meta.required_db_features
2312 ) and any(
2313 isinstance(constraint, UniqueConstraint)
2314 and constraint.condition is not None
2315 for constraint in cls._meta.constraints
2316 ):
2317 errors.append(
2318 checks.Warning(
2319 "%s does not support unique constraints with "
2320 "conditions." % connection.display_name,
2321 hint=(
2322 "A constraint won't be created. Silence this "
2323 "warning if you don't care about it."
2324 ),
2325 obj=cls,
2326 id="models.W036",
2327 )
2328 )
2329 if not (
2330 connection.features.supports_deferrable_unique_constraints
2331 or "supports_deferrable_unique_constraints"
2332 in cls._meta.required_db_features
2333 ) and any(
2334 isinstance(constraint, UniqueConstraint)
2335 and constraint.deferrable is not None
2336 for constraint in cls._meta.constraints
2337 ):
2338 errors.append(
2339 checks.Warning(
2340 "%s does not support deferrable unique constraints."
2341 % connection.display_name,
2342 hint=(
2343 "A constraint won't be created. Silence this "
2344 "warning if you don't care about it."
2345 ),
2346 obj=cls,
2347 id="models.W038",
2348 )
2349 )
2350 if not (
2351 connection.features.supports_covering_indexes
2352 or "supports_covering_indexes" in cls._meta.required_db_features
2353 ) and any(
2354 isinstance(constraint, UniqueConstraint) and constraint.include
2355 for constraint in cls._meta.constraints
2356 ):
2357 errors.append(
2358 checks.Warning(
2359 "%s does not support unique constraints with non-key "
2360 "columns." % connection.display_name,
2361 hint=(
2362 "A constraint won't be created. Silence this "
2363 "warning if you don't care about it."
2364 ),
2365 obj=cls,
2366 id="models.W039",
2367 )
2368 )
2369 if not (
2370 connection.features.supports_expression_indexes
2371 or "supports_expression_indexes" in cls._meta.required_db_features
2372 ) and any(
2373 isinstance(constraint, UniqueConstraint)
2374 and constraint.contains_expressions
2375 for constraint in cls._meta.constraints
2376 ):
2377 errors.append(
2378 checks.Warning(
2379 "%s does not support unique constraints on "
2380 "expressions." % connection.display_name,
2381 hint=(
2382 "A constraint won't be created. Silence this "
2383 "warning if you don't care about it."
2384 ),
2385 obj=cls,
2386 id="models.W044",
2387 )
2388 )
2389 fields = set(
2390 chain.from_iterable(
2391 (*constraint.fields, *constraint.include)
2392 for constraint in cls._meta.constraints
2393 if isinstance(constraint, UniqueConstraint)
2394 )
2395 )
2396 references = set()
2397 for constraint in cls._meta.constraints:
2398 if isinstance(constraint, UniqueConstraint):
2399 if (
2400 connection.features.supports_partial_indexes
2401 or "supports_partial_indexes"
2402 not in cls._meta.required_db_features
2403 ) and isinstance(constraint.condition, Q):
2404 references.update(
2405 cls._get_expr_references(constraint.condition)
2406 )
2407 if (
2408 connection.features.supports_expression_indexes
2409 or "supports_expression_indexes"
2410 not in cls._meta.required_db_features
2411 ) and constraint.contains_expressions:
2412 for expression in constraint.expressions:
2413 references.update(cls._get_expr_references(expression))
2414 elif isinstance(constraint, CheckConstraint):
2415 if (
2416 connection.features.supports_table_check_constraints
2417 or "supports_table_check_constraints"
2418 not in cls._meta.required_db_features
2419 ):
2420 if isinstance(constraint.check, Q):
2421 references.update(
2422 cls._get_expr_references(constraint.check)
2423 )
2424 if any(
2425 isinstance(expr, RawSQL)
2426 for expr in constraint.check.flatten()
2427 ):
2428 errors.append(
2429 checks.Warning(
2430 f"Check constraint {constraint.name!r} contains "
2431 f"RawSQL() expression and won't be validated "
2432 f"during the model full_clean().",
2433 hint=(
2434 "Silence this warning if you don't care about "
2435 "it."
2436 ),
2437 obj=cls,
2438 id="models.W045",
2439 ),
2440 )
2441 for field_name, *lookups in references:
2442 # pk is an alias that won't be found by opts.get_field.
2443 if field_name != "pk":
2444 fields.add(field_name)
2445 if not lookups:
2446 # If it has no lookups it cannot result in a JOIN.
2447 continue
2448 try:
2449 if field_name == "pk":
2450 field = cls._meta.pk
2451 else:
2452 field = cls._meta.get_field(field_name)
2453 if not field.is_relation or field.many_to_many or field.one_to_many:
2454 continue
2455 except FieldDoesNotExist:
2456 continue
2457 # JOIN must happen at the first lookup.
2458 first_lookup = lookups[0]
2459 if (
2460 hasattr(field, "get_transform")
2461 and hasattr(field, "get_lookup")
2462 and field.get_transform(first_lookup) is None
2463 and field.get_lookup(first_lookup) is None
2464 ):
2465 errors.append(
2466 checks.Error(
2467 "'constraints' refers to the joined field '%s'."
2468 % LOOKUP_SEP.join([field_name] + lookups),
2469 obj=cls,
2470 id="models.E041",
2471 )
2472 )
2473 errors.extend(cls._check_local_fields(fields, "constraints"))
2474 return errors
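# Illustrative sketch (hypothetical models, not part of this module): a
# constraint whose condition spans a relation would require a JOIN, which is
# reported as models.E041, assuming `from django.db import models`:
#
#     class Author(models.Model):
#         name = models.CharField(max_length=50)
#
#     class Book(models.Model):
#         author = models.ForeignKey(Author, on_delete=models.CASCADE)
#
#         class Meta:
#             constraints = [
#                 models.CheckConstraint(
#                     check=models.Q(author__name="anonymous"),  # joined field
#                     name="no_anonymous_books",
#                 ),
#             ]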
2477 ############################################
2478 # HELPER FUNCTIONS (CURRIED MODEL METHODS) #
2479 ############################################
2481 # ORDERING METHODS #########################
2484 def method_set_order(self, ordered_obj, id_list, using=None):
2485 order_wrt = ordered_obj._meta.order_with_respect_to
2486 filter_args = order_wrt.get_forward_related_filter(self)
2487 ordered_obj.objects.db_manager(using).filter(**filter_args).bulk_update(
2488 [ordered_obj(pk=pk, _order=order) for order, pk in enumerate(id_list)],
2489 ["_order"],
2490 )
2493 def method_get_order(self, ordered_obj):
2494 order_wrt = ordered_obj._meta.order_with_respect_to
2495 filter_args = order_wrt.get_forward_related_filter(self)
2496 pk_name = ordered_obj._meta.pk.name
2497 return ordered_obj.objects.filter(**filter_args).values_list(pk_name, flat=True)
2500 def make_foreign_order_accessors(model, related_model):
2501 setattr(
2502 related_model,
2503 "get_%s_order" % model.__name__.lower(),
2504 partialmethod(method_get_order, model),
2505 )
2506 setattr(
2507 related_model,
2508 "set_%s_order" % model.__name__.lower(),
2509 partialmethod(method_set_order, model),
2510 )
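# Usage sketch (hypothetical models, not part of this module): declaring
# Meta.order_with_respect_to installs these accessors on the *related* model,
# assuming `from django.db import models`:
#
#     class Question(models.Model):
#         text = models.TextField()
#
#     class Answer(models.Model):
#         question = models.ForeignKey(Question, on_delete=models.CASCADE)
#
#         class Meta:
#             order_with_respect_to = "question"
#
#     # Given some Question instance `question`:
#     question.get_answer_order()           # pks of related answers, in _order
#     question.set_answer_order([3, 1, 2])  # bulk-updates the hidden _order column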
2513 ########
2514 # MISC #
2515 ########
2518 def model_unpickle(model_id):
2519 """Used to unpickle Model subclasses with deferred fields."""
2520 if isinstance(model_id, tuple):
2521 model = apps.get_model(*model_id)
2522 else:
2523 # Backwards compat - the model was cached directly in earlier versions.
2524 model = model_id
2525 return model.__new__(model)
2528 model_unpickle.__safe_for_unpickle__ = True
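# Round-trip sketch (hypothetical model and queryset): Model.__reduce__()
# stores an (app_label, model_name) identifier plus the instance state, so
# unpickling calls model_unpickle() to recreate a bare instance of the right
# class before its state is restored -- including instances loaded with
# deferred fields:
#
#     import pickle
#     obj = Article.objects.only("title").get(pk=1)
#     clone = pickle.loads(pickle.dumps(obj))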