1"""Bucket of reusable internal utilities. 
    2 
This module should be kept as small as possible: functions that are only used in one place should be moved to that place.
    4""" 
    5 
    6from __future__ import annotations as _annotations 
    7 
    8import dataclasses 
    9import keyword 
    10import sys 
    11import warnings 
    12import weakref 
    13from collections import OrderedDict, defaultdict, deque 
    14from collections.abc import Callable, Iterable, Mapping 
    15from collections.abc import Set as AbstractSet 
    16from copy import deepcopy 
    17from functools import cached_property 
    18from inspect import Parameter 
    19from itertools import zip_longest 
    20from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType 
    21from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload 
    22 
    23from typing_extensions import TypeAlias, TypeGuard, deprecated 
    24 
    25from pydantic import PydanticDeprecatedSince211 
    26 
    27from . import _repr, _typing_extra 
    28from ._import_utils import import_cached_base_model 
    29 
if TYPE_CHECKING:
    # Everything in this branch is only seen by static type checkers; it is not executed at runtime.
    # The two aliases describe the accepted shapes of include/exclude specifications:
    # a mapping keyed by int or str, or a set of ints or strs.
    # TODO remove type error comments when we drop support for Python 3.9
    MappingIntStrAny: TypeAlias = Mapping[int, Any] | Mapping[str, Any]  # pyright: ignore[reportGeneralTypeIssues]
    AbstractSetIntStr: TypeAlias = AbstractSet[int] | AbstractSet[str]  # pyright: ignore[reportGeneralTypeIssues]
    # NOTE(review): presumably imported here only to avoid a circular import at runtime -- confirm.
    from ..main import BaseModel
    35 
    36 
# These are types whose instances are returned unchanged by deepcopy.
# `smart_deepcopy()` returns objects of exactly these types as-is, without copying.
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = {
    int,
    float,
    complex,
    str,
    bool,
    bytes,
    type,
    _typing_extra.NoneType,
    FunctionType,
    BuiltinFunctionType,
    LambdaType,
    weakref.ref,
    CodeType,
    # Note: including ModuleType differs from the behaviour of deepcopy, which would produce an error.
    # It might not be a good idea in general, but considering that this set is only used internally
    # against default values of fields, it makes it possible to have a field with a module as default value.
    ModuleType,
    NotImplemented.__class__,
    Ellipsis.__class__,
}
    59 
# These are collection types that, when empty, can be duplicated with a simple copy() instead of deepcopy().
# `smart_deepcopy()` only takes that shortcut for objects of exactly these types.
BUILTIN_COLLECTIONS: set[type[Any]] = {
    list,
    set,
    tuple,
    frozenset,
    dict,
    OrderedDict,
    defaultdict,
    deque,
}
    71 
    72 
    73def can_be_positional(param: Parameter) -> bool: 
    74    """Return whether the parameter accepts a positional argument. 
    75 
    76    ```python {test="skip" lint="skip"} 
    77    def func(a, /, b, *, c): 
    78        pass 
    79 
    80    params = inspect.signature(func).parameters 
    81    can_be_positional(params['a']) 
    82    #> True 
    83    can_be_positional(params['b']) 
    84    #> True 
    85    can_be_positional(params['c']) 
    86    #> False 
    87    ``` 
    88    """ 
    89    return param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD) 
    90 
    91 
    92def sequence_like(v: Any) -> bool: 
    93    return isinstance(v, (list, tuple, set, frozenset, GeneratorType, deque)) 
    94 
    95 
    96def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool:  # pragma: no cover 
    97    try: 
    98        return isinstance(o, class_or_tuple)  # type: ignore[arg-type] 
    99    except TypeError: 
    100        return False 
    101 
    102 
    103def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool:  # pragma: no cover 
    104    try: 
    105        return isinstance(cls, type) and issubclass(cls, class_or_tuple) 
    106    except TypeError: 
    107        if isinstance(cls, _typing_extra.WithArgsTypes): 
    108            return False 
    109        raise  # pragma: no cover 
    110 
    111 
    112def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]: 
    113    """Returns true if cls is a _proper_ subclass of BaseModel, and provides proper type-checking, 
    114    unlike raw calls to lenient_issubclass. 
    115    """ 
    116    BaseModel = import_cached_base_model() 
    117 
    118    return lenient_issubclass(cls, BaseModel) and cls is not BaseModel 
    119 
    120 
    121def is_valid_identifier(identifier: str) -> bool: 
    122    """Checks that a string is a valid identifier and not a Python keyword. 
    123    :param identifier: The identifier to test. 
    124    :return: True if the identifier is valid. 
    125    """ 
    126    return identifier.isidentifier() and not keyword.iskeyword(identifier) 
    127 
    128 
    129KeyType = TypeVar('KeyType') 
    130 
    131 
    132def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]: 
    133    updated_mapping = mapping.copy() 
    134    for updating_mapping in updating_mappings: 
    135        for k, v in updating_mapping.items(): 
    136            if k in updated_mapping and isinstance(updated_mapping[k], dict) and isinstance(v, dict): 
    137                updated_mapping[k] = deep_update(updated_mapping[k], v) 
    138            else: 
    139                updated_mapping[k] = v 
    140    return updated_mapping 
    141 
    142 
    143def update_not_none(mapping: dict[Any, Any], **update: Any) -> None: 
    144    mapping.update({k: v for k, v in update.items() if v is not None}) 
    145 
    146 
    147T = TypeVar('T') 
    148 
    149 
    150def unique_list( 
    151    input_list: list[T] | tuple[T, ...], 
    152    *, 
    153    name_factory: Callable[[T], str] = str, 
    154) -> list[T]: 
    155    """Make a list unique while maintaining order. 
    156    We update the list if another one with the same name is set 
    157    (e.g. model validator overridden in subclass). 
    158    """ 
    159    result: list[T] = [] 
    160    result_names: list[str] = [] 
    161    for v in input_list: 
    162        v_name = name_factory(v) 
    163        if v_name not in result_names: 
    164            result_names.append(v_name) 
    165            result.append(v) 
    166        else: 
    167            result[result_names.index(v_name)] = v 
    168 
    169    return result 
    170 
    171 
    172class ValueItems(_repr.Representation): 
    173    """Class for more convenient calculation of excluded or included fields on values.""" 
    174 
    175    __slots__ = ('_items', '_type') 
    176 
    177    def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None: 
    178        items = self._coerce_items(items) 
    179 
    180        if isinstance(value, (list, tuple)): 
    181            items = self._normalize_indexes(items, len(value))  # type: ignore 
    182 
    183        self._items: MappingIntStrAny = items  # type: ignore 
    184 
    185    def is_excluded(self, item: Any) -> bool: 
    186        """Check if item is fully excluded. 
    187 
    188        :param item: key or index of a value 
    189        """ 
    190        return self.is_true(self._items.get(item)) 
    191 
    192    def is_included(self, item: Any) -> bool: 
    193        """Check if value is contained in self._items. 
    194 
    195        :param item: key or index of value 
    196        """ 
    197        return item in self._items 
    198 
    199    def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None: 
    200        """:param e: key or index of element on value 
    201        :return: raw values for element if self._items is dict and contain needed element 
    202        """ 
    203        item = self._items.get(e)  # type: ignore 
    204        return item if not self.is_true(item) else None 
    205 
    206    def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]: 
    207        """:param items: dict or set of indexes which will be normalized 
    208        :param v_length: length of sequence indexes of which will be 
    209 
    210        >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4) 
    211        {0: True, 2: True, 3: True} 
    212        >>> self._normalize_indexes({'__all__': True}, 4) 
    213        {0: True, 1: True, 2: True, 3: True} 
    214        """ 
    215        normalized_items: dict[int | str, Any] = {} 
    216        all_items = None 
    217        for i, v in items.items(): 
    218            if not (isinstance(v, Mapping) or isinstance(v, AbstractSet) or self.is_true(v)): 
    219                raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}') 
    220            if i == '__all__': 
    221                all_items = self._coerce_value(v) 
    222                continue 
    223            if not isinstance(i, int): 
    224                raise TypeError( 
    225                    'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: ' 
    226                    'expected integer keys or keyword "__all__"' 
    227                ) 
    228            normalized_i = v_length + i if i < 0 else i 
    229            normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i)) 
    230 
    231        if not all_items: 
    232            return normalized_items 
    233        if self.is_true(all_items): 
    234            for i in range(v_length): 
    235                normalized_items.setdefault(i, ...) 
    236            return normalized_items 
    237        for i in range(v_length): 
    238            normalized_item = normalized_items.setdefault(i, {}) 
    239            if not self.is_true(normalized_item): 
    240                normalized_items[i] = self.merge(all_items, normalized_item) 
    241        return normalized_items 
    242 
    243    @classmethod 
    244    def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any: 
    245        """Merge a `base` item with an `override` item. 
    246 
    247        Both `base` and `override` are converted to dictionaries if possible. 
    248        Sets are converted to dictionaries with the sets entries as keys and 
    249        Ellipsis as values. 
    250 
    251        Each key-value pair existing in `base` is merged with `override`, 
    252        while the rest of the key-value pairs are updated recursively with this function. 
    253 
    254        Merging takes place based on the "union" of keys if `intersect` is 
    255        set to `False` (default) and on the intersection of keys if 
    256        `intersect` is set to `True`. 
    257        """ 
    258        override = cls._coerce_value(override) 
    259        base = cls._coerce_value(base) 
    260        if override is None: 
    261            return base 
    262        if cls.is_true(base) or base is None: 
    263            return override 
    264        if cls.is_true(override): 
    265            return base if intersect else override 
    266 
    267        # intersection or union of keys while preserving ordering: 
    268        if intersect: 
    269            merge_keys = [k for k in base if k in override] + [k for k in override if k in base] 
    270        else: 
    271            merge_keys = list(base) + [k for k in override if k not in base] 
    272 
    273        merged: dict[int | str, Any] = {} 
    274        for k in merge_keys: 
    275            merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect) 
    276            if merged_item is not None: 
    277                merged[k] = merged_item 
    278 
    279        return merged 
    280 
    281    @staticmethod 
    282    def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny: 
    283        if isinstance(items, Mapping): 
    284            pass 
    285        elif isinstance(items, AbstractSet): 
    286            items = dict.fromkeys(items, ...)  # type: ignore 
    287        else: 
    288            class_name = getattr(items, '__class__', '???') 
    289            raise TypeError(f'Unexpected type of exclude value {class_name}') 
    290        return items  # type: ignore 
    291 
    292    @classmethod 
    293    def _coerce_value(cls, value: Any) -> Any: 
    294        if value is None or cls.is_true(value): 
    295            return value 
    296        return cls._coerce_items(value) 
    297 
    298    @staticmethod 
    299    def is_true(v: Any) -> bool: 
    300        return v is True or v is ... 
    301 
    302    def __repr_args__(self) -> _repr.ReprArgs: 
    303        return [(None, self._items)] 
    304 
    305 
    306if TYPE_CHECKING: 
    307 
    308    def LazyClassAttribute(name: str, get_value: Callable[[], T]) -> T: ... 
    309 
    310else: 
    311 
    312    class LazyClassAttribute: 
    313        """A descriptor exposing an attribute only accessible on a class (hidden from instances). 
    314 
    315        The attribute is lazily computed and cached during the first access. 
    316        """ 
    317 
    318        def __init__(self, name: str, get_value: Callable[[], Any]) -> None: 
    319            self.name = name 
    320            self.get_value = get_value 
    321 
    322        @cached_property 
    323        def value(self) -> Any: 
    324            return self.get_value() 
    325 
    326        def __get__(self, instance: Any, owner: type[Any]) -> None: 
    327            if instance is None: 
    328                return self.value 
    329            raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only') 
    330 
    331 
    332Obj = TypeVar('Obj') 
    333 
    334 
    335def smart_deepcopy(obj: Obj) -> Obj: 
    336    """Return type as is for immutable built-in types 
    337    Use obj.copy() for built-in empty collections 
    338    Use copy.deepcopy() for non-empty collections and unknown objects. 
    339    """ 
    340    obj_type = obj.__class__ 
    341    if obj_type in IMMUTABLE_NON_COLLECTIONS_TYPES: 
    342        return obj  # fastest case: obj is immutable and not collection therefore will not be copied anyway 
    343    try: 
    344        if not obj and obj_type in BUILTIN_COLLECTIONS: 
    345            # faster way for empty collections, no need to copy its members 
    346            return obj if obj_type is tuple else obj.copy()  # tuple doesn't have copy method  # type: ignore 
    347    except (TypeError, ValueError, RuntimeError): 
    348        # do we really dare to catch ALL errors? Seems a bit risky 
    349        pass 
    350 
    351    return deepcopy(obj)  # slowest way when we actually might need a deepcopy 
    352 
    353 
    354_SENTINEL = object() 
    355 
    356 
    357def all_identical(left: Iterable[Any], right: Iterable[Any]) -> bool: 
    358    """Check that the items of `left` are the same objects as those in `right`. 
    359 
    360    >>> a, b = object(), object() 
    361    >>> all_identical([a, b, a], [a, b, a]) 
    362    True 
    363    >>> all_identical([a, b, [a]], [a, b, [a]])  # new list object, while "equal" is not "identical" 
    364    False 
    365    """ 
    366    for left_item, right_item in zip_longest(left, right, fillvalue=_SENTINEL): 
    367        if left_item is not right_item: 
    368            return False 
    369    return True 
    370 
    371 
    372def get_first_not_none(a: Any, b: Any) -> Any: 
    373    """Return the first argument if it is not `None`, otherwise return the second argument.""" 
    374    return a if a is not None else b 
    375 
    376 
    377@dataclasses.dataclass(frozen=True) 
    378class SafeGetItemProxy: 
    379    """Wrapper redirecting `__getitem__` to `get` with a sentinel value as default 
    380 
    381    This makes is safe to use in `operator.itemgetter` when some keys may be missing 
    382    """ 
    383 
    384    # Define __slots__manually for performances 
    385    # @dataclasses.dataclass() only support slots=True in python>=3.10 
    386    __slots__ = ('wrapped',) 
    387 
    388    wrapped: Mapping[str, Any] 
    389 
    390    def __getitem__(self, key: str, /) -> Any: 
    391        return self.wrapped.get(key, _SENTINEL) 
    392 
    393    # required to pass the object to operator.itemgetter() instances due to a quirk of typeshed 
    394    # https://github.com/python/mypy/issues/13713 
    395    # https://github.com/python/typeshed/pull/8785 
    396    # Since this is typing-only, hide it in a typing.TYPE_CHECKING block 
    397    if TYPE_CHECKING: 
    398 
    399        def __contains__(self, key: str, /) -> bool: 
    400            return self.wrapped.__contains__(key) 
    401 
    402 
    403_ModelT = TypeVar('_ModelT', bound='BaseModel') 
    404_RT = TypeVar('_RT') 
    405 
    406 
    407class deprecated_instance_property(Generic[_ModelT, _RT]): 
    408    """A decorator exposing the decorated class method as a property, with a warning on instance access. 
    409 
    410    This decorator takes a class method defined on the `BaseModel` class and transforms it into 
    411    an attribute. The attribute can be accessed on both the class and instances of the class. If accessed 
    412    via an instance, a deprecation warning is emitted stating that instance access will be removed in V3. 
    413    """ 
    414 
    415    def __init__(self, fget: Callable[[type[_ModelT]], _RT], /) -> None: 
    416        # Note: fget should be a classmethod: 
    417        self.fget = fget 
    418 
    419    @overload 
    420    def __get__(self, instance: None, objtype: type[_ModelT]) -> _RT: ... 
    421    @overload 
    422    @deprecated( 
    423        'Accessing this attribute on the instance is deprecated, and will be removed in Pydantic V3. ' 
    424        'Instead, you should access this attribute from the model class.', 
    425        category=None, 
    426    ) 
    427    def __get__(self, instance: _ModelT, objtype: type[_ModelT]) -> _RT: ... 
    428    def __get__(self, instance: _ModelT | None, objtype: type[_ModelT]) -> _RT: 
    429        if instance is not None: 
    430            # fmt: off 
    431            attr_name = ( 
    432                self.fget.__name__ 
    433                if sys.version_info >= (3, 10) 
    434                else self.fget.__func__.__name__  # pyright: ignore[reportFunctionMemberAccess] 
    435            ) 
    436            # fmt: on 
    437            warnings.warn( 
    438                f'Accessing the {attr_name!r} attribute on the instance is deprecated. ' 
    439                'Instead, you should access this attribute from the model class.', 
    440                category=PydanticDeprecatedSince211, 
    441                stacklevel=2, 
    442            ) 
    443        return self.fget.__get__(instance, objtype)()