Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/db/models/fields/composite.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1import json 

2 

3from django.core import checks 

4from django.db.models import NOT_PROVIDED, Field 

5from django.db.models.expressions import ColPairs 

6from django.db.models.fields.tuple_lookups import ( 

7 TupleExact, 

8 TupleGreaterThan, 

9 TupleGreaterThanOrEqual, 

10 TupleIn, 

11 TupleIsNull, 

12 TupleLessThan, 

13 TupleLessThanOrEqual, 

14) 

15from django.utils.functional import cached_property 

16 

17 

18class AttributeSetter: 

19 def __init__(self, name, value): 

20 setattr(self, name, value) 

21 

22 

23class CompositeAttribute: 

24 def __init__(self, field): 

25 self.field = field 

26 

27 @property 

28 def attnames(self): 

29 return [field.attname for field in self.field.fields] 

30 

31 def __get__(self, instance, cls=None): 

32 return tuple(getattr(instance, attname) for attname in self.attnames) 

33 

34 def __set__(self, instance, values): 

35 attnames = self.attnames 

36 length = len(attnames) 

37 

38 if values is None: 

39 values = (None,) * length 

40 

41 if not isinstance(values, (list, tuple)): 

42 raise ValueError(f"{self.field.name!r} must be a list or a tuple.") 

43 if length != len(values): 

44 raise ValueError(f"{self.field.name!r} must have {length} elements.") 

45 

46 for attname, value in zip(attnames, values): 

47 setattr(instance, attname, value) 

48 

49 

50class CompositePrimaryKey(Field): 

51 descriptor_class = CompositeAttribute 

52 

53 def __init__(self, *args, **kwargs): 

54 if ( 

55 not args 

56 or not all(isinstance(field, str) for field in args) 

57 or len(set(args)) != len(args) 

58 ): 

59 raise ValueError("CompositePrimaryKey args must be unique strings.") 

60 if len(args) == 1: 

61 raise ValueError("CompositePrimaryKey must include at least two fields.") 

62 if kwargs.get("default", NOT_PROVIDED) is not NOT_PROVIDED: 

63 raise ValueError("CompositePrimaryKey cannot have a default.") 

64 if kwargs.get("db_default", NOT_PROVIDED) is not NOT_PROVIDED: 

65 raise ValueError("CompositePrimaryKey cannot have a database default.") 

66 if kwargs.get("db_column", None) is not None: 

67 raise ValueError("CompositePrimaryKey cannot have a db_column.") 

68 if kwargs.setdefault("editable", False): 

69 raise ValueError("CompositePrimaryKey cannot be editable.") 

70 if not kwargs.setdefault("primary_key", True): 

71 raise ValueError("CompositePrimaryKey must be a primary key.") 

72 if not kwargs.setdefault("blank", True): 

73 raise ValueError("CompositePrimaryKey must be blank.") 

74 

75 self.field_names = args 

76 super().__init__(**kwargs) 

77 

78 def deconstruct(self): 

79 # args is always [] so it can be ignored. 

80 name, path, _, kwargs = super().deconstruct() 

81 return name, path, self.field_names, kwargs 

82 

83 @cached_property 

84 def fields(self): 

85 meta = self.model._meta 

86 return tuple(meta.get_field(field_name) for field_name in self.field_names) 

87 

88 @cached_property 

89 def columns(self): 

90 return tuple(field.column for field in self.fields) 

91 

92 def contribute_to_class(self, cls, name, private_only=False): 

93 super().contribute_to_class(cls, name, private_only=private_only) 

94 cls._meta.pk = self 

95 setattr(cls, self.attname, self.descriptor_class(self)) 

96 

97 def get_attname_column(self): 

98 return self.get_attname(), None 

99 

100 def __iter__(self): 

101 return iter(self.fields) 

102 

103 def __len__(self): 

104 return len(self.field_names) 

105 

106 @cached_property 

107 def cached_col(self): 

108 return ColPairs(self.model._meta.db_table, self.fields, self.fields, self) 

109 

110 def get_col(self, alias, output_field=None): 

111 if alias == self.model._meta.db_table and ( 

112 output_field is None or output_field == self 

113 ): 

114 return self.cached_col 

115 

116 return ColPairs(alias, self.fields, self.fields, output_field) 

117 

118 def get_pk_value_on_save(self, instance): 

119 values = [] 

120 

121 for field in self.fields: 

122 value = field.value_from_object(instance) 

123 if value is None: 

124 value = field.get_pk_value_on_save(instance) 

125 values.append(value) 

126 

127 return tuple(values) 

128 

129 def _check_field_name(self): 

130 if self.name == "pk": 

131 return [] 

132 return [ 

133 checks.Error( 

134 "'CompositePrimaryKey' must be named 'pk'.", 

135 obj=self, 

136 id="fields.E013", 

137 ) 

138 ] 

139 

140 def value_to_string(self, obj): 

141 values = [] 

142 vals = self.value_from_object(obj) 

143 for field, value in zip(self.fields, vals): 

144 obj = AttributeSetter(field.attname, value) 

145 values.append(field.value_to_string(obj)) 

146 return json.dumps(values, ensure_ascii=False) 

147 

148 def to_python(self, value): 

149 if isinstance(value, str): 

150 # Assume we're deserializing. 

151 vals = json.loads(value) 

152 value = [ 

153 field.to_python(val) 

154 for field, val in zip(self.fields, vals, strict=True) 

155 ] 

156 return value 

157 

158 

159CompositePrimaryKey.register_lookup(TupleExact) 

160CompositePrimaryKey.register_lookup(TupleGreaterThan) 

161CompositePrimaryKey.register_lookup(TupleGreaterThanOrEqual) 

162CompositePrimaryKey.register_lookup(TupleLessThan) 

163CompositePrimaryKey.register_lookup(TupleLessThanOrEqual) 

164CompositePrimaryKey.register_lookup(TupleIn) 

165CompositePrimaryKey.register_lookup(TupleIsNull) 

166 

167 

168def unnest(fields): 

169 result = [] 

170 

171 for field in fields: 

172 if isinstance(field, CompositePrimaryKey): 

173 result.extend(field.fields) 

174 else: 

175 result.append(field) 

176 

177 return result