Coverage for /pythoncovmergedfiles/medio/medio/src/pydantic/pydantic/_internal/_utils.py: 35%

173 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-27 07:38 +0000

1""" 

2Bucket of reusable internal utilities. 

3 

4This should be reduced as much as possible with functions only used in one place, moved to that place. 

5""" 

6from __future__ import annotations as _annotations 

7 

8import keyword 

9import typing 

10import weakref 

11from collections import OrderedDict, defaultdict, deque 

12from copy import deepcopy 

13from itertools import zip_longest 

14from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType 

15from typing import Any, TypeVar 

16 

17from typing_extensions import TypeAlias, TypeGuard 

18 

19from . import _repr, _typing_extra 

20 

21if typing.TYPE_CHECKING: 

22 MappingIntStrAny: TypeAlias = 'typing.Mapping[int, Any] | typing.Mapping[str, Any]' 

23 AbstractSetIntStr: TypeAlias = 'typing.AbstractSet[int] | typing.AbstractSet[str]' 

24 from ..main import BaseModel 

25 

26 

27# these are types that are returned unchanged by deepcopy 

28IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = { 

29 int, 

30 float, 

31 complex, 

32 str, 

33 bool, 

34 bytes, 

35 type, 

36 _typing_extra.NoneType, 

37 FunctionType, 

38 BuiltinFunctionType, 

39 LambdaType, 

40 weakref.ref, 

41 CodeType, 

42 # note: including ModuleType will differ from behaviour of deepcopy by not producing error. 

43 # It might be not a good idea in general, but considering that this function used only internally 

44 # against default values of fields, this will allow to actually have a field with module as default value 

45 ModuleType, 

46 NotImplemented.__class__, 

47 Ellipsis.__class__, 

48} 

49 

50# these are types that if empty, might be copied with simple copy() instead of deepcopy() 

51BUILTIN_COLLECTIONS: set[type[Any]] = { 

52 list, 

53 set, 

54 tuple, 

55 frozenset, 

56 dict, 

57 OrderedDict, 

58 defaultdict, 

59 deque, 

60} 

61 

62 

63def sequence_like(v: Any) -> bool: 

64 return isinstance(v, (list, tuple, set, frozenset, GeneratorType, deque)) 

65 

66 

67def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool: 

68 try: 

69 return isinstance(o, class_or_tuple) # type: ignore[arg-type] 

70 except TypeError: 

71 return False 

72 

73 

74def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool: 

75 try: 

76 return isinstance(cls, type) and issubclass(cls, class_or_tuple) 

77 except TypeError: 

78 if isinstance(cls, _typing_extra.WithArgsTypes): 

79 return False 

80 raise # pragma: no cover 

81 

82 

83def is_basemodel(cls: Any) -> TypeGuard[type[BaseModel]]: 

84 """ 

85 We can remove this function and go back to using lenient_issubclass, but this is nice because it 

86 ensures that we get proper type-checking, which lenient_issubclass doesn't provide. 

87 

88 Would be nice if there was a lenient_issubclass-equivalent in typing_extensions, or otherwise 

89 a way to define such a function that would support proper type-checking; maybe we should bring it up 

90 at the typing summit.. 

91 """ 

92 from ..main import BaseModel 

93 

94 return lenient_issubclass(cls, BaseModel) 

95 

96 

97def is_valid_identifier(identifier: str) -> bool: 

98 """ 

99 Checks that a string is a valid identifier and not a Python keyword. 

100 :param identifier: The identifier to test. 

101 :return: True if the identifier is valid. 

102 """ 

103 return identifier.isidentifier() and not keyword.iskeyword(identifier) 

104 

105 

106KeyType = TypeVar('KeyType') 

107 

108 

109def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]: 

110 updated_mapping = mapping.copy() 

111 for updating_mapping in updating_mappings: 

112 for k, v in updating_mapping.items(): 

113 if k in updated_mapping and isinstance(updated_mapping[k], dict) and isinstance(v, dict): 

114 updated_mapping[k] = deep_update(updated_mapping[k], v) 

115 else: 

116 updated_mapping[k] = v 

117 return updated_mapping 

118 

119 

120def dict_not_none(__pos: dict[str, Any] | None = None, **kwargs: Any) -> dict[str, Any]: 

121 return {k: v for k, v in (__pos or kwargs).items() if v is not None} 

122 

123 

124def update_not_none(mapping: dict[Any, Any], **update: Any) -> None: 

125 mapping.update({k: v for k, v in update.items() if v is not None}) 

126 

127 

128def almost_equal_floats(value_1: float, value_2: float, *, delta: float = 1e-8) -> bool: 

129 """ 

130 Return True if two floats are almost equal 

131 """ 

132 return abs(value_1 - value_2) <= delta 

133 

134 

135def to_camel(string: str) -> str: 

136 return ''.join(word.capitalize() for word in string.split('_')) 

137 

138 

139def to_lower_camel(string: str) -> str: 

140 if len(string) >= 1: 

141 pascal_string = to_camel(string) 

142 return pascal_string[0].lower() + pascal_string[1:] 

143 return string.lower() 

144 

145 

146T = TypeVar('T') 

147 

148 

149def unique_list( 

150 input_list: list[T] | tuple[T, ...], 

151 *, 

152 name_factory: typing.Callable[[T], str] = str, 

153) -> list[T]: 

154 """ 

155 Make a list unique while maintaining order. 

156 We update the list if another one with the same name is set 

157 (e.g. root validator overridden in subclass) 

158 """ 

159 result: list[T] = [] 

160 result_names: list[str] = [] 

161 for v in input_list: 

162 v_name = name_factory(v) 

163 if v_name not in result_names: 

164 result_names.append(v_name) 

165 result.append(v) 

166 else: 

167 result[result_names.index(v_name)] = v 

168 

169 return result 

170 

171 

172class ValueItems(_repr.Representation): 

173 """ 

174 Class for more convenient calculation of excluded or included fields on values. 

175 """ 

176 

177 __slots__ = ('_items', '_type') 

178 

179 def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None: 

180 items = self._coerce_items(items) 

181 

182 if isinstance(value, (list, tuple)): 

183 items = self._normalize_indexes(items, len(value)) # type: ignore 

184 

185 self._items: MappingIntStrAny = items # type: ignore 

186 

187 def is_excluded(self, item: Any) -> bool: 

188 """ 

189 Check if item is fully excluded. 

190 

191 :param item: key or index of a value 

192 """ 

193 return self.is_true(self._items.get(item)) 

194 

195 def is_included(self, item: Any) -> bool: 

196 """ 

197 Check if value is contained in self._items 

198 

199 :param item: key or index of value 

200 """ 

201 return item in self._items 

202 

203 def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None: 

204 """ 

205 :param e: key or index of element on value 

206 :return: raw values for element if self._items is dict and contain needed element 

207 """ 

208 

209 item = self._items.get(e) # type: ignore 

210 return item if not self.is_true(item) else None 

211 

212 def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]: 

213 """ 

214 :param items: dict or set of indexes which will be normalized 

215 :param v_length: length of sequence indexes of which will be 

216 

217 >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4) 

218 {0: True, 2: True, 3: True} 

219 >>> self._normalize_indexes({'__all__': True}, 4) 

220 {0: True, 1: True, 2: True, 3: True} 

221 """ 

222 

223 normalized_items: dict[int | str, Any] = {} 

224 all_items = None 

225 for i, v in items.items(): 

226 if not (isinstance(v, typing.Mapping) or isinstance(v, typing.AbstractSet) or self.is_true(v)): 

227 raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}') 

228 if i == '__all__': 

229 all_items = self._coerce_value(v) 

230 continue 

231 if not isinstance(i, int): 

232 raise TypeError( 

233 'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: ' 

234 'expected integer keys or keyword "__all__"' 

235 ) 

236 normalized_i = v_length + i if i < 0 else i 

237 normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i)) 

238 

239 if not all_items: 

240 return normalized_items 

241 if self.is_true(all_items): 

242 for i in range(v_length): 

243 normalized_items.setdefault(i, ...) 

244 return normalized_items 

245 for i in range(v_length): 

246 normalized_item = normalized_items.setdefault(i, {}) 

247 if not self.is_true(normalized_item): 

248 normalized_items[i] = self.merge(all_items, normalized_item) 

249 return normalized_items 

250 

251 @classmethod 

252 def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any: 

253 """ 

254 Merge a `base` item with an `override` item. 

255 

256 Both `base` and `override` are converted to dictionaries if possible. 

257 Sets are converted to dictionaries with the sets entries as keys and 

258 Ellipsis as values. 

259 

260 Each key-value pair existing in `base` is merged with `override`, 

261 while the rest of the key-value pairs are updated recursively with this function. 

262 

263 Merging takes place based on the "union" of keys if `intersect` is 

264 set to `False` (default) and on the intersection of keys if 

265 `intersect` is set to `True`. 

266 """ 

267 override = cls._coerce_value(override) 

268 base = cls._coerce_value(base) 

269 if override is None: 

270 return base 

271 if cls.is_true(base) or base is None: 

272 return override 

273 if cls.is_true(override): 

274 return base if intersect else override 

275 

276 # intersection or union of keys while preserving ordering: 

277 if intersect: 

278 merge_keys = [k for k in base if k in override] + [k for k in override if k in base] 

279 else: 

280 merge_keys = list(base) + [k for k in override if k not in base] 

281 

282 merged: dict[int | str, Any] = {} 

283 for k in merge_keys: 

284 merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect) 

285 if merged_item is not None: 

286 merged[k] = merged_item 

287 

288 return merged 

289 

290 @staticmethod 

291 def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny: 

292 if isinstance(items, typing.Mapping): 

293 pass 

294 elif isinstance(items, typing.AbstractSet): # type: ignore 

295 items = dict.fromkeys(items, ...) # type: ignore 

296 else: 

297 class_name = getattr(items, '__class__', '???') 

298 raise TypeError(f'Unexpected type of exclude value {class_name}') 

299 return items # type: ignore 

300 

301 @classmethod 

302 def _coerce_value(cls, value: Any) -> Any: 

303 if value is None or cls.is_true(value): 

304 return value 

305 return cls._coerce_items(value) 

306 

307 @staticmethod 

308 def is_true(v: Any) -> bool: 

309 return v is True or v is ... 

310 

311 def __repr_args__(self) -> _repr.ReprArgs: 

312 return [(None, self._items)] 

313 

314 

315if typing.TYPE_CHECKING: 

316 

317 def ClassAttribute(name: str, value: T) -> T: 

318 ... 

319 

320else: 

321 

322 class ClassAttribute: 

323 """ 

324 Hide class attribute from its instances 

325 """ 

326 

327 __slots__ = 'name', 'value' 

328 

329 def __init__(self, name: str, value: Any) -> None: 

330 self.name = name 

331 self.value = value 

332 

333 def __get__(self, instance: Any, owner: type[Any]) -> None: 

334 if instance is None: 

335 return self.value 

336 raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only') 

337 

338 

339Obj = TypeVar('Obj') 

340 

341 

342def smart_deepcopy(obj: Obj) -> Obj: 

343 """ 

344 Return type as is for immutable built-in types 

345 Use obj.copy() for built-in empty collections 

346 Use copy.deepcopy() for non-empty collections and unknown objects 

347 """ 

348 

349 obj_type = obj.__class__ 

350 if obj_type in IMMUTABLE_NON_COLLECTIONS_TYPES: 

351 return obj # fastest case: obj is immutable and not collection therefore will not be copied anyway 

352 try: 

353 if not obj and obj_type in BUILTIN_COLLECTIONS: 

354 # faster way for empty collections, no need to copy its members 

355 return obj if obj_type is tuple else obj.copy() # type: ignore # tuple doesn't have copy method 

356 except (TypeError, ValueError, RuntimeError): 

357 # do we really dare to catch ALL errors? Seems a bit risky 

358 pass 

359 

360 return deepcopy(obj) # slowest way when we actually might need a deepcopy 

361 

362 

363_EMPTY = object() 

364 

365 

366def all_identical(left: typing.Iterable[Any], right: typing.Iterable[Any]) -> bool: 

367 """ 

368 Check that the items of `left` are the same objects as those in `right`. 

369 

370 >>> a, b = object(), object() 

371 >>> all_identical([a, b, a], [a, b, a]) 

372 True 

373 >>> all_identical([a, b, [a]], [a, b, [a]]) # new list object, while "equal" is not "identical" 

374 False 

375 """ 

376 for left_item, right_item in zip_longest(left, right, fillvalue=_EMPTY): 

377 if left_item is not right_item: 

378 return False 

379 return True