Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_utils.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

189 statements  

1"""Bucket of reusable internal utilities. 

2 

3This should be reduced as much as possible with functions only used in one place, moved to that place. 

4""" 

5 

6from __future__ import annotations as _annotations 

7 

8import dataclasses 

9import keyword 

10import sys 

11import warnings 

12import weakref 

13from collections import OrderedDict, defaultdict, deque 

14from collections.abc import Callable, Iterable, Mapping 

15from collections.abc import Set as AbstractSet 

16from copy import deepcopy 

17from functools import cached_property 

18from inspect import Parameter 

19from itertools import zip_longest 

20from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType 

21from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload 

22 

23from typing_extensions import TypeAlias, TypeGuard, deprecated 

24 

25from pydantic import PydanticDeprecatedSince211 

26 

27from . import _repr, _typing_extra 

28from ._import_utils import import_cached_base_model 

29 

30if TYPE_CHECKING: 

31 # TODO remove type error comments when we drop support for Python 3.9 

32 MappingIntStrAny: TypeAlias = Mapping[int, Any] | Mapping[str, Any] # pyright: ignore[reportGeneralTypeIssues] 

33 AbstractSetIntStr: TypeAlias = AbstractSet[int] | AbstractSet[str] # pyright: ignore[reportGeneralTypeIssues] 

34 from ..main import BaseModel 

35 

36 

# Types whose instances `deepcopy` returns unchanged, so they never need to be copied.
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = {
    # numbers, text and other scalar builtins
    int,
    float,
    complex,
    str,
    bool,
    bytes,
    type,
    _typing_extra.NoneType,
    # callables, code objects and weak references
    FunctionType,
    BuiltinFunctionType,
    LambdaType,
    weakref.ref,
    CodeType,
    # NOTE: `deepcopy` raises on modules, so listing ModuleType deliberately diverges from it.
    # It might not be a good idea in general, but as this set is only used internally against
    # default values of fields, it makes it possible to have a module as a field default.
    ModuleType,
    # singleton marker types
    type(NotImplemented),
    type(Ellipsis),
}

59 

# Built-in collection types: an *empty* instance of one of these can be duplicated with a
# plain `copy()` instead of the slower `deepcopy()`.
BUILTIN_COLLECTIONS: set[type[Any]] = {
    # sequences and sets
    list,
    tuple,
    set,
    frozenset,
    deque,
    # mappings
    dict,
    OrderedDict,
    defaultdict,
}

71 

72 

def can_be_positional(param: Parameter) -> bool:
    """Tell whether an argument for `param` may be passed positionally.

    This is the case for positional-only and positional-or-keyword parameters.

    ```python {test="skip" lint="skip"}
    def func(a, /, b, *, c):
        pass

    params = inspect.signature(func).parameters
    can_be_positional(params['a'])
    #> True
    can_be_positional(params['b'])
    #> True
    can_be_positional(params['c'])
    #> False
    ```
    """
    kind = param.kind
    return kind == Parameter.POSITIONAL_ONLY or kind == Parameter.POSITIONAL_OR_KEYWORD

90 

91 

def sequence_like(v: Any) -> bool:
    """Return whether `v` is a list, tuple, set, frozenset, generator or deque instance."""
    sequence_types = (list, tuple, set, frozenset, GeneratorType, deque)
    return isinstance(v, sequence_types)

94 

95 

def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool:  # pragma: no cover
    """`isinstance` that answers `False` instead of raising `TypeError` for an invalid second argument."""
    try:
        matches = isinstance(o, class_or_tuple)  # type: ignore[arg-type]
    except TypeError:
        matches = False
    return matches

101 

102 

def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool:  # pragma: no cover
    """`issubclass` that tolerates a first argument which is not a class.

    Returns `False` when `cls` is not a `type` instance. If the check itself raises `TypeError`,
    `False` is returned when `cls` is an instance of `_typing_extra.WithArgsTypes`; any other
    `TypeError` is propagated.
    """
    try:
        if not isinstance(cls, type):
            return False
        return issubclass(cls, class_or_tuple)
    except TypeError:
        if not isinstance(cls, _typing_extra.WithArgsTypes):
            raise  # pragma: no cover
        return False

110 

111 

def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]:
    """Tell whether `cls` is a _proper_ subclass of `BaseModel` (i.e. not `BaseModel` itself).

    Unlike a raw call to `lenient_issubclass`, the `TypeGuard` return type lets type checkers
    narrow `cls`.
    """
    base_model_cls = import_cached_base_model()

    if cls is base_model_cls:
        # `BaseModel` itself is not a proper subclass.
        return False
    return lenient_issubclass(cls, base_model_cls)

119 

120 

def is_valid_identifier(identifier: str) -> bool:
    """Tell whether a string can be used as a Python identifier, i.e. it is one and is not a keyword.

    :param identifier: The identifier to test.
    :return: True if the identifier is valid.
    """
    if not identifier.isidentifier():
        return False
    return not keyword.iskeyword(identifier)

127 

128 

KeyType = TypeVar('KeyType')


def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]:
    """Return a copy of `mapping` with each of `updating_mappings` applied in order.

    When a key holds a dict on both sides, the two dicts are merged recursively; in every other
    case the updating value replaces the existing one. `mapping` itself is left unmodified.
    """
    merged = mapping.copy()
    for overrides in updating_mappings:
        for key, new_value in overrides.items():
            existing = merged.get(key)
            both_dicts = isinstance(existing, dict) and isinstance(new_value, dict)
            merged[key] = deep_update(existing, new_value) if both_dicts else new_value
    return merged

141 

142 

def update_not_none(mapping: dict[Any, Any], **update: Any) -> None:
    """Update `mapping` in place with the keyword arguments whose value is not `None`."""
    present: dict[str, Any] = {}
    for name, value in update.items():
        if value is None:
            continue
        present[name] = value
    mapping.update(present)

145 

146 

T = TypeVar('T')


def unique_list(
    input_list: list[T] | tuple[T, ...],
    *,
    name_factory: Callable[[T], str] = str,
) -> list[T]:
    """Make a list unique while maintaining order.

    Two items are considered duplicates when `name_factory` returns the same name for both.
    A later duplicate replaces the earlier item *in place*, keeping the position of the first
    occurrence (e.g. a model validator overridden in a subclass).

    :param input_list: the items to de-duplicate
    :param name_factory: callable producing the (hashable) name identifying an item, `str` by default
    :return: a new list holding the last item seen for each name, ordered by first occurrence
    """
    result: list[T] = []
    # Maps each name to the index of its item in `result`, so a duplicate is located in constant
    # time instead of scanning a list of names on every iteration.
    position_by_name: dict[str, int] = {}
    for v in input_list:
        v_name = name_factory(v)
        position = position_by_name.get(v_name)
        if position is None:
            position_by_name[v_name] = len(result)
            result.append(v)
        else:
            result[position] = v

    return result

170 

171 

class ValueItems(_repr.Representation):
    """Class for more convenient calculation of excluded or included fields on values.

    The include/exclude specification is stored in `_items` as a mapping from key (or index) to
    either a "true" marker (`True` or `...`) or a nested specification.
    """

    __slots__ = ('_items', '_type')

    def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None:
        """Coerce `items` to a mapping and, when `value` is a list or tuple, normalize its indexes.

        :param value: the value the specification applies to
        :param items: set or mapping of keys/indexes
        """
        items = self._coerce_items(items)

        if isinstance(value, (list, tuple)):
            items = self._normalize_indexes(items, len(value))  # type: ignore

        self._items: MappingIntStrAny = items  # type: ignore

    def is_excluded(self, item: Any) -> bool:
        """Check if item is fully excluded.

        :param item: key or index of a value
        """
        return self.is_true(self._items.get(item))

    def is_included(self, item: Any) -> bool:
        """Check if value is contained in self._items.

        :param item: key or index of value
        """
        return item in self._items

    def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None:
        """:param e: key or index of element on value
        :return: the nested specification stored for `e`, or `None` when `e` is absent or is
            mapped to a "true" marker (`True` or `...`)
        """
        item = self._items.get(e)  # type: ignore
        return item if not self.is_true(item) else None

    def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]:
        """:param items: mapping of indexes which will be normalized
        :param v_length: length of the sequence the indexes refer to
        :return: mapping keyed by non-negative indexes, with any `'__all__'` entry expanded to
            every index of the sequence
        :raises TypeError: if a value is neither a mapping, a set nor a "true" marker, or if a key
            is neither an integer nor `'__all__'`

        >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4)
        {0: True, 2: True, 3: True}
        >>> self._normalize_indexes({'__all__': True}, 4)
        {0: True, 1: True, 2: True, 3: True}
        """
        normalized_items: dict[int | str, Any] = {}
        all_items = None
        for i, v in items.items():
            if not (isinstance(v, (Mapping, AbstractSet)) or self.is_true(v)):
                raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}')
            if i == '__all__':
                all_items = self._coerce_value(v)
                continue
            if not isinstance(i, int):
                raise TypeError(
                    'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: '
                    'expected integer keys or keyword "__all__"'
                )
            # Negative indexes count from the end of the sequence.
            normalized_i = v_length + i if i < 0 else i
            # Several raw indexes (e.g. 3 and -1) may designate the same element: merge them.
            normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i))

        if not all_items:
            return normalized_items
        if self.is_true(all_items):
            # '__all__' is a plain "true" marker: only fill in the indexes not set explicitly.
            for i in range(v_length):
                normalized_items.setdefault(i, ...)
            return normalized_items
        for i in range(v_length):
            normalized_item = normalized_items.setdefault(i, {})
            if not self.is_true(normalized_item):
                normalized_items[i] = self.merge(all_items, normalized_item)
        return normalized_items

    @classmethod
    def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any:
        """Merge a `base` item with an `override` item.

        Both `base` and `override` are converted to dictionaries if possible.
        Sets are converted to dictionaries with the sets entries as keys and
        Ellipsis as values.

        Each key-value pair existing in `base` is merged with `override`,
        while the rest of the key-value pairs are updated recursively with this function.

        Merging takes place based on the "union" of keys if `intersect` is
        set to `False` (default) and on the intersection of keys if
        `intersect` is set to `True`.
        """
        override = cls._coerce_value(override)
        base = cls._coerce_value(base)
        if override is None:
            return base
        if cls.is_true(base) or base is None:
            return override
        if cls.is_true(override):
            return base if intersect else override

        # intersection or union of keys while preserving ordering:
        if intersect:
            # Keys common to both sides, in `base` order. Each key is listed once so that the
            # recursive merge below is not run twice for the same key.
            merge_keys = [k for k in base if k in override]
        else:
            merge_keys = list(base) + [k for k in override if k not in base]

        merged: dict[int | str, Any] = {}
        for k in merge_keys:
            merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect)
            if merged_item is not None:
                merged[k] = merged_item

        return merged

    @staticmethod
    def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny:
        """Return `items` as a mapping: mappings are returned as is, sets become `{key: ...}`.

        :raises TypeError: if `items` is neither a mapping nor a set
        """
        if isinstance(items, Mapping):
            pass
        elif isinstance(items, AbstractSet):
            items = dict.fromkeys(items, ...)  # type: ignore
        else:
            class_name = getattr(items, '__class__', '???')
            raise TypeError(f'Unexpected type of exclude value {class_name}')
        return items  # type: ignore

    @classmethod
    def _coerce_value(cls, value: Any) -> Any:
        """Like `_coerce_items`, but passes `None` and "true" markers through untouched."""
        if value is None or cls.is_true(value):
            return value
        return cls._coerce_items(value)

    @staticmethod
    def is_true(v: Any) -> bool:
        """Tell whether `v` is one of the "true" markers: `True` or `...`."""
        return v is True or v is ...

    def __repr_args__(self) -> _repr.ReprArgs:
        return [(None, self._items)]

304 

305 

if TYPE_CHECKING:

    def LazyClassAttribute(name: str, get_value: Callable[[], T]) -> T: ...

else:

    class LazyClassAttribute:
        """A descriptor exposing an attribute only accessible on a class (hidden from instances).

        The attribute is lazily computed and cached during the first access.
        """

        def __init__(self, name: str, get_value: Callable[[], Any]) -> None:
            """:param name: attribute name, only used in the error raised on instance access
            :param get_value: zero-argument callable producing the attribute value
            """
            self.name = name
            self.get_value = get_value

        @cached_property
        def value(self) -> Any:
            """The attribute value: `get_value()` is called on first access and its result cached."""
            return self.get_value()

        def __get__(self, instance: Any, owner: type[Any]) -> Any:
            """Return the cached value on class access.

            :raises AttributeError: when accessed through an instance
            """
            # `instance` is None when the attribute is looked up on the class itself.
            if instance is None:
                return self.value
            raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only')

330 

331 

Obj = TypeVar('Obj')


def smart_deepcopy(obj: Obj) -> Obj:
    """Copy `obj` using the cheapest strategy that is safe for its type.

    - instances of `IMMUTABLE_NON_COLLECTIONS_TYPES` are returned as is;
    - empty instances of `BUILTIN_COLLECTIONS` are copied with `obj.copy()` (an empty tuple is returned as is);
    - anything else goes through `copy.deepcopy()`.
    """
    kind = obj.__class__
    if kind in IMMUTABLE_NON_COLLECTIONS_TYPES:
        # fastest case: obj is immutable and not a collection, so it would not be copied anyway
        return obj
    try:
        is_empty = not obj
        if is_empty and kind in BUILTIN_COLLECTIONS:
            # faster way for empty collections, no need to copy its members
            if kind is tuple:
                return obj  # tuple doesn't have copy method
            return obj.copy()  # type: ignore
    except (TypeError, ValueError, RuntimeError):
        # do we really dare to catch ALL errors? Seems a bit risky
        pass

    # slowest way when we actually might need a deepcopy
    return deepcopy(obj)

352 

353 

_SENTINEL = object()


def all_identical(left: Iterable[Any], right: Iterable[Any]) -> bool:
    """Tell whether `left` and `right` yield the very same objects, pairwise and in the same order.

    Items are compared by identity, not equality, and iterables of different lengths never match.

    >>> a, b = object(), object()
    >>> all_identical([a, b, a], [a, b, a])
    True
    >>> all_identical([a, b, [a]], [a, b, [a]])  # new list object, while "equal" is not "identical"
    False
    """
    # The shorter iterable is padded with the sentinel, which is identical to no real item.
    pairs = zip_longest(left, right, fillvalue=_SENTINEL)
    return all(left_item is right_item for left_item, right_item in pairs)

370 

371 

def get_first_not_none(a: Any, b: Any) -> Any:
    """Return `a` unless it is `None`, in which case `b` is returned."""
    if a is None:
        return b
    return a

375 

376 

@dataclasses.dataclass(frozen=True)
class SafeGetItemProxy:
    """Wrapper whose `__getitem__` delegates to the wrapped mapping's `get`, with a sentinel as default.

    This makes it safe to use in `operator.itemgetter` when some keys may be missing.
    """

    # Define __slots__ manually for performance:
    # @dataclasses.dataclass() only supports slots=True in python>=3.10
    __slots__ = ('wrapped',)

    wrapped: Mapping[str, Any]

    def __getitem__(self, key: str, /) -> Any:
        mapping = self.wrapped
        return mapping.get(key, _SENTINEL)

    # required to pass the object to operator.itemgetter() instances due to a quirk of typeshed
    # https://github.com/python/mypy/issues/13713
    # https://github.com/python/typeshed/pull/8785
    # Since this is typing-only, hide it in a typing.TYPE_CHECKING block
    if TYPE_CHECKING:

        def __contains__(self, key: str, /) -> bool:
            return self.wrapped.__contains__(key)

401 

402 

_ModelT = TypeVar('_ModelT', bound='BaseModel')
_RT = TypeVar('_RT')


class deprecated_instance_property(Generic[_ModelT, _RT]):
    """A decorator turning the decorated class method into a property that warns on instance access.

    The decorated class method (defined on the `BaseModel` class) becomes an attribute available
    on both the class and its instances. Reading it through an instance emits a deprecation
    warning, as instance access will be removed in V3.
    """

    def __init__(self, fget: Callable[[type[_ModelT]], _RT], /) -> None:
        # Note: fget should be a classmethod:
        self.fget = fget

    @overload
    def __get__(self, instance: None, objtype: type[_ModelT]) -> _RT: ...
    @overload
    @deprecated(
        'Accessing this attribute on the instance is deprecated, and will be removed in Pydantic V3. '
        'Instead, you should access this attribute from the model class.',
        category=None,
    )
    def __get__(self, instance: _ModelT, objtype: type[_ModelT]) -> _RT: ...
    def __get__(self, instance: _ModelT | None, objtype: type[_ModelT]) -> _RT:
        if instance is not None:
            # Before Python 3.10 the name is read from the function wrapped by the classmethod.
            if sys.version_info >= (3, 10):
                attr_name = self.fget.__name__
            else:
                attr_name = self.fget.__func__.__name__  # pyright: ignore[reportFunctionMemberAccess]
            warnings.warn(
                f'Accessing the {attr_name!r} attribute on the instance is deprecated. '
                'Instead, you should access this attribute from the model class.',
                category=PydanticDeprecatedSince211,
                stacklevel=2,
            )
        # Bind the class method, then call it to obtain the attribute value.
        bound_getter = self.fget.__get__(instance, objtype)
        return bound_getter()