Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pydantic/_internal/_utils.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1"""Bucket of reusable internal utilities. 

2 

3This should be reduced as much as possible with functions only used in one place, moved to that place. 

4""" 

5 

6from __future__ import annotations as _annotations 

7 

8import dataclasses 

9import keyword 

10import sys 

11import warnings 

12import weakref 

13from collections import OrderedDict, defaultdict, deque 

14from collections.abc import Callable, Iterable, Mapping 

15from collections.abc import Set as AbstractSet 

16from copy import deepcopy 

17from functools import cached_property 

18from inspect import Parameter 

19from itertools import zip_longest 

20from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType 

21from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload 

22 

23from pydantic_core import MISSING 

24from typing_extensions import TypeAlias, TypeGuard, deprecated 

25 

26from pydantic import PydanticDeprecatedSince211 

27 

28from . import _repr, _typing_extra 

29from ._import_utils import import_cached_base_model 

30 

31if TYPE_CHECKING: 

32 # TODO remove type error comments when we drop support for Python 3.9 

33 MappingIntStrAny: TypeAlias = Mapping[int, Any] | Mapping[str, Any] # pyright: ignore[reportGeneralTypeIssues] 

34 AbstractSetIntStr: TypeAlias = AbstractSet[int] | AbstractSet[str] # pyright: ignore[reportGeneralTypeIssues] 

35 from ..main import BaseModel 

36 

37 

# Types whose instances are handed back unchanged by `copy.deepcopy()`, so copying them can be skipped.
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = {
    # plain scalar values
    int,
    float,
    complex,
    str,
    bool,
    bytes,
    # classes, the `None` type and the types of the `NotImplemented` / `...` singletons
    type,
    _typing_extra.NoneType,
    type(NotImplemented),
    type(Ellipsis),
    # callables, code objects and weak references
    FunctionType,
    BuiltinFunctionType,
    LambdaType,
    CodeType,
    weakref.ref,
    # NOTE: `deepcopy()` raises an error for modules, so listing `ModuleType` here deliberately
    # deviates from it. This set is only used internally against default values of fields, and
    # the deviation makes it possible to have a field with a module as its default value.
    ModuleType,
}

60 

# Built-in collection types: an *empty* instance of one of these can be duplicated with a cheap
# `copy()` call instead of going through `deepcopy()`.
BUILTIN_COLLECTIONS: set[type[Any]] = {
    # sequences
    list,
    tuple,
    deque,
    # sets
    set,
    frozenset,
    # mappings
    dict,
    OrderedDict,
    defaultdict,
}

72 

73 

def can_be_positional(param: Parameter) -> bool:
    """Tell whether `param` is a positional-only or positional-or-keyword parameter.

    Any other parameter kind yields `False`.

    ```python {test="skip" lint="skip"}
    def func(a, /, b, *, c):
        pass

    params = inspect.signature(func).parameters
    can_be_positional(params['a'])
    #> True
    can_be_positional(params['b'])
    #> True
    can_be_positional(params['c'])
    #> False
    ```
    """
    kind = param.kind
    return kind == Parameter.POSITIONAL_ONLY or kind == Parameter.POSITIONAL_OR_KEYWORD

91 

92 

def sequence_like(v: Any) -> bool:
    """Return `True` when `v` is a list, tuple, set, frozenset, generator or deque instance."""
    accepted_types = (list, tuple, set, frozenset, GeneratorType, deque)
    return isinstance(v, accepted_types)

95 

96 

97def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool: # pragma: no cover 

98 try: 

99 return isinstance(o, class_or_tuple) # type: ignore[arg-type] 

100 except TypeError: 

101 return False 

102 

103 

def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool:  # pragma: no cover
    """Like `issubclass()`, but answer `False` for a `cls` that is not a class.

    A `TypeError` raised by the check is swallowed (answering `False`) only when `cls` is an instance
    of `_typing_extra.WithArgsTypes`; any other `TypeError` is propagated to the caller.
    """
    try:
        if not isinstance(cls, type):
            return False
        return issubclass(cls, class_or_tuple)
    except TypeError:
        if not isinstance(cls, _typing_extra.WithArgsTypes):
            raise  # pragma: no cover
        return False

111 

112 

def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]:
    """Return whether `cls` is a _proper_ subclass of `BaseModel` (i.e. not `BaseModel` itself).

    Unlike a raw call to `lenient_issubclass`, the `TypeGuard` return annotation lets type checkers
    narrow `cls` to `type[BaseModel]`.
    """
    base_model_cls = import_cached_base_model()

    if cls is base_model_cls:
        # `BaseModel` itself is not a *proper* subclass
        return False
    return lenient_issubclass(cls, base_model_cls)

120 

121 

def is_valid_identifier(identifier: str) -> bool:
    """Tell whether a string can be used as a Python identifier, i.e. it is one and is not a keyword.

    :param identifier: The identifier to test.
    :return: True if the identifier is valid.
    """
    if not identifier.isidentifier():
        return False
    return not keyword.iskeyword(identifier)

128 

129 

KeyType = TypeVar('KeyType')


def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]:
    """Return a copy of `mapping` updated, in order, with each of `updating_mappings`.

    When a key holds a dict on both sides, the two dicts are combined recursively; in every other
    case the updating value replaces the current one. `mapping` itself is not modified.
    """
    merged = mapping.copy()
    for overrides in updating_mappings:
        for key, new_value in overrides.items():
            both_are_dicts = key in merged and isinstance(merged[key], dict) and isinstance(new_value, dict)
            merged[key] = deep_update(merged[key], new_value) if both_are_dicts else new_value
    return merged

142 

143 

def update_not_none(mapping: dict[Any, Any], **update: Any) -> None:
    """Update `mapping` in place with the keyword arguments whose value is not `None`."""
    non_null: dict[str, Any] = {}
    for key, value in update.items():
        if value is None:
            continue
        non_null[key] = value
    mapping.update(non_null)

146 

147 

148T = TypeVar('T') 

149 

150 

151def unique_list( 

152 input_list: list[T] | tuple[T, ...], 

153 *, 

154 name_factory: Callable[[T], str] = str, 

155) -> list[T]: 

156 """Make a list unique while maintaining order. 

157 We update the list if another one with the same name is set 

158 (e.g. model validator overridden in subclass). 

159 """ 

160 result: list[T] = [] 

161 result_names: list[str] = [] 

162 for v in input_list: 

163 v_name = name_factory(v) 

164 if v_name not in result_names: 

165 result_names.append(v_name) 

166 result.append(v) 

167 else: 

168 result[result_names.index(v_name)] = v 

169 

170 return result 

171 

172 

class ValueItems(_repr.Representation):
    """Class for more convenient calculation of excluded or included fields on values.

    The include/exclude specification is stored as a mapping from key (or sequence index) to either
    a "true" marker (`True` or `...`) or a nested specification for that element.
    """

    __slots__ = ('_items', '_type')

    def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None:
        """:param value: value the specification applies to; when it is a list or a tuple, the indexes
            of the specification are normalized against its length
        :param items: set of keys/indexes, or mapping from key/index to a nested specification
        """
        items = self._coerce_items(items)

        if isinstance(value, (list, tuple)):
            items = self._normalize_indexes(items, len(value))  # type: ignore

        self._items: MappingIntStrAny = items  # type: ignore

    def is_excluded(self, item: Any) -> bool:
        """Check if item is fully excluded.

        :param item: key or index of a value
        """
        return self.is_true(self._items.get(item))

    def is_included(self, item: Any) -> bool:
        """Check if value is contained in self._items.

        :param item: key or index of value
        """
        return item in self._items

    def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None:
        """:param e: key or index of element on value
        :return: raw values for element if self._items is dict and contain needed element;
            `None` when the element is absent or marked with `True` / `...`
        """
        item = self._items.get(e)  # type: ignore
        return item if not self.is_true(item) else None

    def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]:
        """Resolve negative indexes and the `'__all__'` key against a sequence of length `v_length`.

        :param items: mapping of indexes (or `'__all__'`) to normalize
        :param v_length: length of the sequence the indexes refer to
        :return: mapping keyed by the normalized indexes
        :raises TypeError: if a value is neither a mapping, a set, `True` nor `...`, or if a key is
            neither an integer nor `'__all__'`

        >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4)
        {0: True, 2: True, 3: True}
        >>> self._normalize_indexes({'__all__': True}, 4)
        {0: Ellipsis, 1: Ellipsis, 2: Ellipsis, 3: Ellipsis}
        """
        normalized_items: dict[int | str, Any] = {}
        all_items = None
        for i, v in items.items():
            if not (isinstance(v, (Mapping, AbstractSet)) or self.is_true(v)):
                raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}')
            if i == '__all__':
                all_items = self._coerce_value(v)
                continue
            if not isinstance(i, int):
                raise TypeError(
                    'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: '
                    'expected integer keys or keyword "__all__"'
                )
            # a negative index counts from the end of the sequence
            normalized_i = v_length + i if i < 0 else i
            # two keys may normalize to the same index (e.g. `1` and `-3` for a length of 4): merge them
            normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i))

        if not all_items:
            # no `'__all__'` key, or an empty specification for it
            return normalized_items
        if self.is_true(all_items):
            # every index is selected as a whole, unless it already carries its own specification
            for i in range(v_length):
                normalized_items.setdefault(i, ...)
            return normalized_items
        for i in range(v_length):
            normalized_item = normalized_items.setdefault(i, {})
            if not self.is_true(normalized_item):
                normalized_items[i] = self.merge(all_items, normalized_item)
        return normalized_items

    @classmethod
    def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any:
        """Merge a `base` item with an `override` item.

        Both `base` and `override` are converted to dictionaries if possible.
        Sets are converted to dictionaries with the sets entries as keys and
        Ellipsis as values.

        Each key-value pair existing in `base` is merged with `override`,
        while the rest of the key-value pairs are updated recursively with this function.

        Merging takes place based on the "union" of keys if `intersect` is
        set to `False` (default) and on the intersection of keys if
        `intersect` is set to `True`.
        """
        override = cls._coerce_value(override)
        base = cls._coerce_value(base)
        if override is None:
            return base
        if cls.is_true(base) or base is None:
            return override
        if cls.is_true(override):
            return base if intersect else override

        # intersection or union of keys while preserving ordering:
        if intersect:
            # the keys shared by both sides, in `base` order; listing them a second time in
            # `override` order would only merge every key twice without changing the result
            merge_keys = [k for k in base if k in override]
        else:
            merge_keys = list(base) + [k for k in override if k not in base]

        merged: dict[int | str, Any] = {}
        for k in merge_keys:
            merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect)
            if merged_item is not None:
                merged[k] = merged_item

        return merged

    @staticmethod
    def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny:
        """Return `items` as a mapping: mappings pass through, sets map each entry to `...`.

        :raises TypeError: if `items` is neither a mapping nor a set
        """
        if isinstance(items, Mapping):
            pass
        elif isinstance(items, AbstractSet):
            items = dict.fromkeys(items, ...)  # type: ignore
        else:
            class_name = getattr(items, '__class__', '???')
            raise TypeError(f'Unexpected type of exclude value {class_name}')
        return items  # type: ignore

    @classmethod
    def _coerce_value(cls, value: Any) -> Any:
        """Like `_coerce_items`, but let `None`, `True` and `...` through unchanged."""
        if value is None or cls.is_true(value):
            return value
        return cls._coerce_items(value)

    @staticmethod
    def is_true(v: Any) -> bool:
        """Return whether `v` is one of the two "true" markers: `True` or `...`."""
        return v is True or v is ...

    def __repr_args__(self) -> _repr.ReprArgs:
        return [(None, self._items)]

305 

306 

if TYPE_CHECKING:

    # For type checkers, the descriptor is presented as a function returning the attribute's value.
    def LazyClassAttribute(name: str, get_value: Callable[[], T]) -> T: ...

else:

    class LazyClassAttribute:
        """A descriptor exposing an attribute only accessible on a class (hidden from instances).

        The attribute is lazily computed and cached during the first access.
        """

        def __init__(self, name: str, get_value: Callable[[], Any]) -> None:
            """:param name: attribute name, only used in the error raised on instance access
            :param get_value: zero-argument callable producing the attribute value
            """
            self.name = name
            self.get_value = get_value

        @cached_property
        def value(self) -> Any:
            """The attribute value; `get_value` is called once and the result cached on the descriptor."""
            return self.get_value()

        def __get__(self, instance: Any, owner: type[Any]) -> Any:
            """Return the cached value for class access.

            :raises AttributeError: when accessed through an instance
            """
            if instance is None:
                return self.value
            raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only')

331 

332 

Obj = TypeVar('Obj')


def smart_deepcopy(obj: Obj) -> Obj:
    """Copy `obj`, taking a shortcut whenever a full `copy.deepcopy()` is not needed.

    - `MISSING` and instances of the types in `IMMUTABLE_NON_COLLECTIONS_TYPES` are returned as is
    - empty instances of the types in `BUILTIN_COLLECTIONS` are duplicated with `obj.copy()`
      (an empty tuple is returned as is)
    - anything else goes through `copy.deepcopy()`
    """
    if obj is MISSING:
        return obj  # pyright: ignore[reportReturnType]

    cls = obj.__class__
    if cls in IMMUTABLE_NON_COLLECTIONS_TYPES:
        # fastest case: immutable and not a collection, so it would not be copied anyway
        return obj

    try:
        is_empty_builtin = not obj and cls in BUILTIN_COLLECTIONS
        if is_empty_builtin:
            # no members to copy; tuple has no `copy()` method, so hand it back unchanged
            return obj if cls is tuple else obj.copy()  # type: ignore
    except (TypeError, ValueError, RuntimeError):
        # the emptiness check or the shallow copy failed: fall back to the generic path below
        pass

    # slowest way, for when a deep copy might actually be needed
    return deepcopy(obj)

355 

356 

_SENTINEL = object()


def all_identical(left: Iterable[Any], right: Iterable[Any]) -> bool:
    """Tell whether `left` and `right` yield the very same objects, pairwise and in the same order.

    Items are compared by identity (`is`), not equality, and iterables of different lengths are
    never identical.

    >>> a, b = object(), object()
    >>> all_identical([a, b, a], [a, b, a])
    True
    >>> all_identical([a, b, [a]], [a, b, [a]])  # new list object, while "equal" is not "identical"
    False
    """
    # padding the shorter side with the sentinel guarantees a mismatch on unequal lengths
    pairs = zip_longest(left, right, fillvalue=_SENTINEL)
    return all(left_item is right_item for left_item, right_item in pairs)

373 

374 

def get_first_not_none(a: Any, b: Any) -> Any:
    """Return `a` unless it is `None`, in which case return `b`."""
    if a is None:
        return b
    return a

378 

379 

@dataclasses.dataclass(frozen=True)
class SafeGetItemProxy:
    """Mapping wrapper whose `__getitem__` never raises for a missing key.

    Item access is forwarded to the wrapped mapping's `get()` with a sentinel object as default,
    which makes the proxy safe to pass to `operator.itemgetter` when some keys may be missing.
    """

    # `__slots__` is declared by hand for performance:
    # `@dataclasses.dataclass()` only supports `slots=True` in python>=3.10
    __slots__ = ('wrapped',)

    wrapped: Mapping[str, Any]

    def __getitem__(self, key: str, /) -> Any:
        lookup = self.wrapped.get
        return lookup(key, _SENTINEL)

    # Typing-only method, hence hidden in a `typing.TYPE_CHECKING` block. It is required to pass
    # the object to `operator.itemgetter()` instances due to a quirk of typeshed:
    # https://github.com/python/mypy/issues/13713
    # https://github.com/python/typeshed/pull/8785
    if TYPE_CHECKING:

        def __contains__(self, key: str, /) -> bool:
            return self.wrapped.__contains__(key)

404 

405 

_ModelT = TypeVar('_ModelT', bound='BaseModel')
_RT = TypeVar('_RT')


class deprecated_instance_property(Generic[_ModelT, _RT]):
    """Descriptor exposing a class method as an attribute whose access through an instance is deprecated.

    The decorated class method is bound and called on every access, and its result is returned.
    Access through the class is silent; access through an instance returns the same result but
    first emits a `PydanticDeprecatedSince211` warning.
    """

    def __init__(self, fget: Callable[[type[_ModelT]], _RT], /) -> None:
        # Note: `fget` is expected to be a classmethod object
        self.fget = fget

    @overload
    def __get__(self, instance: None, objtype: type[_ModelT]) -> _RT: ...
    @overload
    @deprecated(
        'Accessing this attribute on the instance is deprecated, and will be removed in Pydantic V3. '
        'Instead, you should access this attribute from the model class.',
        category=None,
    )
    def __get__(self, instance: _ModelT, objtype: type[_ModelT]) -> _RT: ...
    def __get__(self, instance: _ModelT | None, objtype: type[_ModelT]) -> _RT:
        if instance is not None:
            # the name is read from the classmethod object itself on Python 3.10+, and from the
            # function it wraps on older versions
            if sys.version_info >= (3, 10):
                attr_name = self.fget.__name__
            else:
                attr_name = self.fget.__func__.__name__  # pyright: ignore[reportFunctionMemberAccess]
            warnings.warn(
                f'Accessing the {attr_name!r} attribute on the instance is deprecated. '
                'Instead, you should access this attribute from the model class.',
                category=PydanticDeprecatedSince211,
                stacklevel=2,
            )
        bound_getter = self.fget.__get__(instance, objtype)
        return bound_getter()