1# Copyright 2018 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Utilities for managing / converting field paths to / from strings.""" 
    16from __future__ import annotations 
    17import re 
    18from collections import abc 
    19from typing import Iterable, cast 
    20 
    21_FIELD_PATH_MISSING_TOP = "{!r} is not contained in the data" 
    22_FIELD_PATH_MISSING_KEY = "{!r} is not contained in the data for the key {!r}" 
    23_FIELD_PATH_WRONG_TYPE = ( 
    24    "The data at {!r} is not a dictionary, so it cannot contain the key {!r}" 
    25) 
    26 
    27_FIELD_PATH_DELIMITER = "." 
    28_BACKSLASH = "\\" 
    29_ESCAPED_BACKSLASH = _BACKSLASH * 2 
    30_BACKTICK = "`" 
    31_ESCAPED_BACKTICK = _BACKSLASH + _BACKTICK 
    32 
    33_SIMPLE_FIELD_NAME = re.compile("^[_a-zA-Z][_a-zA-Z0-9]*$") 
    34_LEADING_ALPHA_INVALID = re.compile(r"^[_a-zA-Z][_a-zA-Z0-9]*[~*/\[\]]") 
    35PATH_ELEMENT_TOKENS = [ 
    36    ("SIMPLE", r"[_a-zA-Z][_a-zA-Z0-9]*"),  # unquoted elements 
    37    ("QUOTED", r"`(?:\\`|[^`])*?`"),  # quoted elements, unquoted 
    38    ("DOT", r"\."),  # separator 
    39] 
    40TOKENS_PATTERN = "|".join("(?P<{}>{})".format(*pair) for pair in PATH_ELEMENT_TOKENS) 
    41TOKENS_REGEX = re.compile(TOKENS_PATTERN) 
    42 
    43 
    44def _tokenize_field_path(path: str): 
    45    """Lex a field path into tokens (including dots). 
    46 
    47    Args: 
    48        path (str): field path to be lexed. 
    49    Returns: 
    50        List(str): tokens 
    51    """ 
    52    pos = 0 
    53    get_token = TOKENS_REGEX.match 
    54    match = get_token(path) 
    55    while match is not None: 
    56        type_ = cast(str, match.lastgroup) 
    57        value = match.group(type_) 
    58        yield value 
    59        pos = match.end() 
    60        match = get_token(path, pos) 
    61    if pos != len(path): 
    62        raise ValueError("Path {} not consumed, residue: {}".format(path, path[pos:])) 
    63 
    64 
    65def split_field_path(path: str | None): 
    66    """Split a field path into valid elements (without dots). 
    67 
    68    Args: 
    69        path (str): field path to be lexed. 
    70    Returns: 
    71        List(str): tokens 
    72    Raises: 
    73        ValueError: if the path does not match the elements-interspersed- 
    74                    with-dots pattern. 
    75    """ 
    76    if not path: 
    77        return [] 
    78 
    79    elements = [] 
    80    want_dot = False 
    81 
    82    for element in _tokenize_field_path(path): 
    83        if want_dot: 
    84            if element != ".": 
    85                raise ValueError("Invalid path: {}".format(path)) 
    86            else: 
    87                want_dot = False 
    88        else: 
    89            if element == ".": 
    90                raise ValueError("Invalid path: {}".format(path)) 
    91            elements.append(element) 
    92            want_dot = True 
    93 
    94    if not want_dot or not elements: 
    95        raise ValueError("Invalid path: {}".format(path)) 
    96 
    97    return elements 
    98 
    99 
    100def parse_field_path(api_repr: str): 
    101    """Parse a **field path** from into a list of nested field names. 
    102 
    103    See :func:`field_path` for more on **field paths**. 
    104 
    105    Args: 
    106        api_repr (str): 
    107            The unique Firestore api representation which consists of 
    108            either simple or UTF-8 field names. It cannot exceed 
    109            1500 bytes, and cannot be empty. Simple field names match 
    110            ``'^[_a-zA-Z][_a-zA-Z0-9]*$'``. All other field names are 
    111            escaped by surrounding them with backticks. 
    112 
    113    Returns: 
    114        List[str, ...]: The list of field names in the field path. 
    115    """ 
    116    # code dredged back up from 
    117    # https://github.com/googleapis/google-cloud-python/pull/5109/files 
    118    field_names = [] 
    119    for field_name in split_field_path(api_repr): 
    120        # non-simple field name 
    121        if field_name[0] == "`" and field_name[-1] == "`": 
    122            field_name = field_name[1:-1] 
    123            field_name = field_name.replace(_ESCAPED_BACKTICK, _BACKTICK) 
    124            field_name = field_name.replace(_ESCAPED_BACKSLASH, _BACKSLASH) 
    125        field_names.append(field_name) 
    126    return field_names 
    127 
    128 
    129def render_field_path(field_names: Iterable[str]): 
    130    """Create a **field path** from a list of nested field names. 
    131 
    132    A **field path** is a ``.``-delimited concatenation of the field 
    133    names. It is used to represent a nested field. For example, 
    134    in the data 
    135 
    136    .. code-block:: python 
    137 
    138       data = { 
    139          'aa': { 
    140              'bb': { 
    141                  'cc': 10, 
    142              }, 
    143          }, 
    144       } 
    145 
    146    the field path ``'aa.bb.cc'`` represents that data stored in 
    147    ``data['aa']['bb']['cc']``. 
    148 
    149    Args: 
    150        field_names: The list of field names. 
    151 
    152    Returns: 
    153        str: The ``.``-delimited field path. 
    154    """ 
    155    result = [] 
    156 
    157    for field_name in field_names: 
    158        match = _SIMPLE_FIELD_NAME.match(field_name) 
    159        if match and match.group(0) == field_name: 
    160            result.append(field_name) 
    161        else: 
    162            replaced = field_name.replace(_BACKSLASH, _ESCAPED_BACKSLASH).replace( 
    163                _BACKTICK, _ESCAPED_BACKTICK 
    164            ) 
    165            result.append(_BACKTICK + replaced + _BACKTICK) 
    166 
    167    return _FIELD_PATH_DELIMITER.join(result) 
    168 
    169 
    170get_field_path = render_field_path  # backward-compatibility 
    171 
    172 
    173def get_nested_value(field_path: str, data: dict): 
    174    """Get a (potentially nested) value from a dictionary. 
    175 
    176    If the data is nested, for example: 
    177 
    178    .. code-block:: python 
    179 
    180       >>> data 
    181       { 
    182           'top1': { 
    183               'middle2': { 
    184                   'bottom3': 20, 
    185                   'bottom4': 22, 
    186               }, 
    187               'middle5': True, 
    188           }, 
    189           'top6': b'\x00\x01 foo', 
    190       } 
    191 
    192    a **field path** can be used to access the nested data. For 
    193    example: 
    194 
    195    .. code-block:: python 
    196 
    197       >>> get_nested_value('top1', data) 
    198       { 
    199           'middle2': { 
    200               'bottom3': 20, 
    201               'bottom4': 22, 
    202           }, 
    203           'middle5': True, 
    204       } 
    205       >>> get_nested_value('top1.middle2', data) 
    206       { 
    207           'bottom3': 20, 
    208           'bottom4': 22, 
    209       } 
    210       >>> get_nested_value('top1.middle2.bottom3', data) 
    211       20 
    212 
    213    See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for 
    214    more information on **field paths**. 
    215 
    216    Args: 
    217        field_path (str): A field path (``.``-delimited list of 
    218            field names). 
    219        data (Dict[str, Any]): The (possibly nested) data. 
    220 
    221    Returns: 
    222        Any: (A copy of) the value stored for the ``field_path``. 
    223 
    224    Raises: 
    225        KeyError: If the ``field_path`` does not match nested data. 
    226    """ 
    227    field_names = parse_field_path(field_path) 
    228 
    229    nested_data = data 
    230    for index, field_name in enumerate(field_names): 
    231        if isinstance(nested_data, abc.Mapping): 
    232            if field_name in nested_data: 
    233                nested_data = nested_data[field_name] 
    234            else: 
    235                if index == 0: 
    236                    msg = _FIELD_PATH_MISSING_TOP.format(field_name) 
    237                    raise KeyError(msg) 
    238                else: 
    239                    partial = render_field_path(field_names[:index]) 
    240                    msg = _FIELD_PATH_MISSING_KEY.format(field_name, partial) 
    241                    raise KeyError(msg) 
    242        else: 
    243            partial = render_field_path(field_names[:index]) 
    244            msg = _FIELD_PATH_WRONG_TYPE.format(partial, field_name) 
    245            raise KeyError(msg) 
    246 
    247    return nested_data 
    248 
    249 
    250class FieldPath(object): 
    251    """Field Path object for client use. 
    252 
    253    A field path is a sequence of element keys, separated by periods. 
    254    Each element key can be either a simple identifier, or a full unicode 
    255    string. 
    256 
    257    In the string representation of a field path, non-identifier elements 
    258    must be quoted using backticks, with internal backticks and backslashes 
    259    escaped with a backslash. 
    260 
    261    Args: 
    262        parts: (one or more strings) 
    263            Indicating path of the key to be used. 
    264    """ 
    265 
    266    def __init__(self, *parts: str): 
    267        for part in parts: 
    268            if not isinstance(part, str) or not part: 
    269                error = "One or more components is not a string or is empty." 
    270                raise ValueError(error) 
    271        self.parts = tuple(parts) 
    272 
    273    @classmethod 
    274    def from_api_repr(cls, api_repr: str) -> "FieldPath": 
    275        """Factory: create a FieldPath from the string formatted per the API. 
    276 
    277        Args: 
    278            api_repr (str): a string path, with non-identifier elements quoted 
    279            It cannot exceed 1500 characters, and cannot be empty. 
    280        Returns: 
    281            (:class:`FieldPath`) An instance parsed from ``api_repr``. 
    282        Raises: 
    283            ValueError if the parsing fails 
    284        """ 
    285        api_repr = api_repr.strip() 
    286        if not api_repr: 
    287            raise ValueError("Field path API representation cannot be empty.") 
    288        return cls(*parse_field_path(api_repr)) 
    289 
    290    @classmethod 
    291    def from_string(cls, path_string: str) -> "FieldPath": 
    292        """Factory: create a FieldPath from a unicode string representation. 
    293 
    294        This method splits on the character `.` and disallows the 
    295        characters `~*/[]`. To create a FieldPath whose components have 
    296        those characters, call the constructor. 
    297 
    298        Args: 
    299            path_string (str): A unicode string which cannot contain 
    300            `~*/[]` characters, cannot exceed 1500 bytes, and cannot be empty. 
    301 
    302        Returns: 
    303            (:class:`FieldPath`) An instance parsed from ``path_string``. 
    304        """ 
    305        try: 
    306            return cls.from_api_repr(path_string) 
    307        except ValueError: 
    308            elements = path_string.split(".") 
    309            for element in elements: 
    310                if not element: 
    311                    raise ValueError("Empty element") 
    312                if _LEADING_ALPHA_INVALID.match(element): 
    313                    raise ValueError( 
    314                        "Invalid char in element with leading alpha: {}".format(element) 
    315                    ) 
    316            return FieldPath(*elements) 
    317 
    318    def __repr__(self): 
    319        paths = "" 
    320        for part in self.parts: 
    321            paths += "'" + part + "'," 
    322        paths = paths[:-1] 
    323        return "FieldPath({})".format(paths) 
    324 
    325    def __hash__(self): 
    326        return hash(self.to_api_repr()) 
    327 
    328    def __eq__(self, other): 
    329        if isinstance(other, FieldPath): 
    330            return self.parts == other.parts 
    331        return NotImplemented 
    332 
    333    def __lt__(self, other): 
    334        if isinstance(other, FieldPath): 
    335            return self.parts < other.parts 
    336        return NotImplemented 
    337 
    338    def __add__(self, other): 
    339        """Adds `other` field path to end of this field path. 
    340 
    341        Args: 
    342            other (~google.cloud.firestore_v1._helpers.FieldPath, str): 
    343                The field path to add to the end of this `FieldPath`. 
    344        """ 
    345        if isinstance(other, FieldPath): 
    346            parts = self.parts + other.parts 
    347            return FieldPath(*parts) 
    348        elif isinstance(other, str): 
    349            parts = self.parts + FieldPath.from_string(other).parts 
    350            return FieldPath(*parts) 
    351        else: 
    352            return NotImplemented 
    353 
    354    def to_api_repr(self) -> str: 
    355        """Render a quoted string representation of the FieldPath 
    356 
    357        Returns: 
    358            (str) Quoted string representation of the path stored 
    359            within this FieldPath. 
    360        """ 
    361        return render_field_path(self.parts) 
    362 
    363    def eq_or_parent(self, other) -> bool: 
    364        """Check whether ``other`` is an ancestor. 
    365 
    366        Returns: 
    367            (bool) True IFF ``other`` is an ancestor or equal to ``self``, 
    368            else False. 
    369        """ 
    370        return self.parts[: len(other.parts)] == other.parts[: len(self.parts)] 
    371 
    372    def lineage(self) -> set["FieldPath"]: 
    373        """Return field paths for all parents. 
    374 
    375        Returns: Set[:class:`FieldPath`] 
    376        """ 
    377        indexes = range(1, len(self.parts)) 
    378        return {FieldPath(*self.parts[:index]) for index in indexes} 
    379 
    380    @staticmethod 
    381    def document_id() -> str: 
    382        """A special FieldPath value to refer to the ID of a document. It can be used 
    383           in queries to sort or filter by the document ID. 
    384 
    385        Returns: A special sentinel value to refer to the ID of a document. 
    386        """ 
    387        return "__name__"