1# Copyright 2018 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utilities for managing / converting field paths to / from strings."""
16from __future__ import annotations
17import re
18from collections import abc
19from typing import Iterable, cast
20
# Templates for the ``KeyError`` messages raised when a field path cannot be
# resolved against nested data.
_FIELD_PATH_MISSING_TOP = "{!r} is not contained in the data"
_FIELD_PATH_MISSING_KEY = "{!r} is not contained in the data for the key {!r}"
_FIELD_PATH_WRONG_TYPE = (
    "The data at {!r} is not a dictionary, so it cannot contain the key {!r}"
)

# Characters with special meaning in the string form of a field path, and the
# escape sequences used for them inside backtick-quoted elements.
_FIELD_PATH_DELIMITER = "."
_BACKSLASH = "\\"
_BACKTICK = "`"
_ESCAPED_BACKSLASH = _BACKSLASH + _BACKSLASH
_ESCAPED_BACKTICK = _BACKSLASH + _BACKTICK

# A field name that can be written without quoting.
_SIMPLE_FIELD_NAME = re.compile("^[_a-zA-Z][_a-zA-Z0-9]*$")
# An identifier-like prefix immediately followed by one of ``~ * / [ ]``.
_LEADING_ALPHA_INVALID = re.compile(r"^[_a-zA-Z][_a-zA-Z0-9]*[~*/\[\]]")

# (group name, pattern) pairs for the lexer; they are tried in this order.
PATH_ELEMENT_TOKENS = [
    ("SIMPLE", r"[_a-zA-Z][_a-zA-Z0-9]*"),  # bare identifier element
    ("QUOTED", r"`(?:\\`|[^`])*?`"),  # backtick-quoted element, backticks kept
    ("DOT", r"\."),  # element separator
]
# One alternation with a named group per token type.
TOKENS_PATTERN = "|".join(
    "(?P<{}>{})".format(name, pattern) for name, pattern in PATH_ELEMENT_TOKENS
)
TOKENS_REGEX = re.compile(TOKENS_PATTERN)
42
43
def _tokenize_field_path(path: str):
    """Lazily lex a field path into its raw tokens.

    Tokens are matched back-to-back from the start of ``path`` using
    ``TOKENS_REGEX``; separator dots are yielded as tokens too, and quoted
    elements keep their surrounding backticks.

    Args:
        path (str): field path to be lexed.
    Yields:
        str: the text of each successive token.
    Raises:
        ValueError: once matching stops, if part of ``path`` is left over.
    """
    offset = 0
    while True:
        found = TOKENS_REGEX.match(path, offset)
        if found is None:
            break
        # ``lastgroup`` names the alternative (token type) that matched.
        yield found.group(cast(str, found.lastgroup))
        offset = found.end()

    if offset != len(path):
        raise ValueError(
            "Path {} not consumed, residue: {}".format(path, path[offset:])
        )
63
64
def split_field_path(path: str | None):
    """Break a field path into its elements, dropping the separator dots.

    Quoted elements are returned still wrapped in their backticks.

    Args:
        path (str): field path to be split; ``None`` or an empty string
            produces an empty list.
    Returns:
        List[str]: the path elements, in order.
    Raises:
        ValueError: if the path does not match the elements-interspersed-
            with-dots pattern.
    """
    if not path:
        return []

    names = []
    count = 0

    # Tokens are consumed lazily so a misplaced token is reported before the
    # tokenizer gets the chance to complain about text further along.
    for count, token in enumerate(_tokenize_field_path(path), start=1):
        is_separator = token == "."
        # Odd positions must hold elements, even positions separators.
        separator_expected = count % 2 == 0
        if is_separator != separator_expected:
            raise ValueError("Invalid path: {}".format(path))
        if not is_separator:
            names.append(token)

    # A valid path holds an odd number of tokens: it must end on an element.
    if count % 2 == 0:
        raise ValueError("Invalid path: {}".format(path))

    return names
98
99
def parse_field_path(api_repr: str):
    """Parse a **field path** into a list of nested field names.

    See :func:`field_path` for more on **field paths**.

    Args:
        api_repr (str):
            The unique Firestore api representation which consists of
            either simple or UTF-8 field names. It cannot exceed
            1500 bytes, and cannot be empty. Simple field names match
            ``'^[_a-zA-Z][_a-zA-Z0-9]*$'``. All other field names are
            escaped by surrounding them with backticks.

    Returns:
        List[str, ...]: The list of field names in the field path, with
        quoting and escaping removed.
    """
    # code dredged back up from
    # https://github.com/googleapis/google-cloud-python/pull/5109/files

    def _unquote(element):
        # Simple (unquoted) elements pass through untouched.
        if not (element[0] == "`" and element[-1] == "`"):
            return element
        inner = element[1:-1]
        # Undo the backtick escapes before the backslash escapes.
        inner = inner.replace(_ESCAPED_BACKTICK, _BACKTICK)
        return inner.replace(_ESCAPED_BACKSLASH, _BACKSLASH)

    return [_unquote(element) for element in split_field_path(api_repr)]
127
128
def render_field_path(field_names: Iterable[str]):
    """Build the string form of a **field path** from nested field names.

    A **field path** is a ``.``-delimited concatenation of the field
    names. It is used to represent a nested field. For example,
    in the data

    .. code-block:: python

       data = {
          'aa': {
              'bb': {
                  'cc': 10,
              },
          },
       }

    the field path ``'aa.bb.cc'`` represents that data stored in
    ``data['aa']['bb']['cc']``.

    Names that are not simple identifiers are wrapped in backticks, with
    any backslash or backtick inside them escaped by a backslash.

    Args:
        field_names: The list of field names.

    Returns:
        str: The ``.``-delimited field path.
    """

    def _render(name):
        # Only names made entirely of identifier characters go unquoted.
        if _SIMPLE_FIELD_NAME.fullmatch(name):
            return name
        # Backslashes are escaped first so the backslashes added for
        # backticks are not escaped a second time.
        escaped = name.replace(_BACKSLASH, _ESCAPED_BACKSLASH)
        escaped = escaped.replace(_BACKTICK, _ESCAPED_BACKTICK)
        return _BACKTICK + escaped + _BACKTICK

    return _FIELD_PATH_DELIMITER.join([_render(name) for name in field_names])


get_field_path = render_field_path  # backward-compatibility
171
172
def get_nested_value(field_path: str, data: dict):
    r"""Look up a (potentially nested) value in a dictionary.

    If the data is nested, for example:

    .. code-block:: python

       >>> data
       {
           'top1': {
               'middle2': {
                   'bottom3': 20,
                   'bottom4': 22,
               },
               'middle5': True,
           },
           'top6': b'\x00\x01 foo',
       }

    a **field path** can be used to access the nested data. For
    example:

    .. code-block:: python

       >>> get_nested_value('top1', data)
       {
           'middle2': {
               'bottom3': 20,
               'bottom4': 22,
           },
           'middle5': True,
       }
       >>> get_nested_value('top1.middle2', data)
       {
           'bottom3': 20,
           'bottom4': 22,
       }
       >>> get_nested_value('top1.middle2.bottom3', data)
       20

    See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for
    more information on **field paths**.

    Args:
        field_path (str): A field path (``.``-delimited list of
            field names).
        data (Dict[str, Any]): The (possibly nested) data.

    Returns:
        Any: The value stored for the ``field_path``.

    Raises:
        KeyError: If the ``field_path`` does not match nested data.
    """
    names = parse_field_path(field_path)

    current = data
    for depth, name in enumerate(names):
        # Cannot descend into something that is not a mapping.
        if not isinstance(current, abc.Mapping):
            parent = render_field_path(names[:depth])
            raise KeyError(_FIELD_PATH_WRONG_TYPE.format(parent, name))

        if name not in current:
            # A miss at the top level has no parent path to report.
            if depth == 0:
                raise KeyError(_FIELD_PATH_MISSING_TOP.format(name))
            parent = render_field_path(names[:depth])
            raise KeyError(_FIELD_PATH_MISSING_KEY.format(name, parent))

        current = current[name]

    return current
248
249
class FieldPath(object):
    """Field Path object for client use.

    A field path is a sequence of element keys, separated by periods.
    Each element key can be either a simple identifier, or a full unicode
    string.

    In the string representation of a field path, non-identifier elements
    must be quoted using backticks, with internal backticks and backslashes
    escaped with a backslash.

    Args:
        parts: (one or more strings)
            Indicating path of the key to be used.

    Raises:
        ValueError: if any part is not a string or is empty.
    """

    def __init__(self, *parts: str):
        for part in parts:
            if not isinstance(part, str) or not part:
                error = "One or more components is not a string or is empty."
                raise ValueError(error)
        # Stored as a tuple so it can be compared and sliced directly.
        self.parts = tuple(parts)

    @classmethod
    def from_api_repr(cls, api_repr: str) -> "FieldPath":
        """Factory: create a FieldPath from the string formatted per the API.

        Args:
            api_repr (str): a string path, with non-identifier elements quoted
                It cannot exceed 1500 characters, and cannot be empty.
        Returns:
            (:class:`FieldPath`) An instance parsed from ``api_repr``.
        Raises:
            ValueError if the parsing fails
        """
        api_repr = api_repr.strip()
        if not api_repr:
            raise ValueError("Field path API representation cannot be empty.")
        return cls(*parse_field_path(api_repr))

    @classmethod
    def from_string(cls, path_string: str) -> "FieldPath":
        """Factory: create a FieldPath from a unicode string representation.

        The string is first parsed as an API representation. If that fails,
        it is split on the character `.` instead, and any element made of
        an identifier-like prefix directly followed by one of `~*/[]` is
        rejected. To create a FieldPath whose components have those
        characters, call the constructor.

        Args:
            path_string (str): A unicode string which cannot contain
                `~*/[]` characters, cannot exceed 1500 bytes, and cannot be empty.

        Returns:
            (:class:`FieldPath`) An instance parsed from ``path_string``.

        Raises:
            ValueError: if an element is empty or contains an invalid
                character after a leading identifier.
        """
        try:
            return cls.from_api_repr(path_string)
        except ValueError:
            # Not a valid API representation: fall back to a plain split.
            elements = path_string.split(".")
            for element in elements:
                if not element:
                    raise ValueError("Empty element")
                if _LEADING_ALPHA_INVALID.match(element):
                    raise ValueError(
                        "Invalid char in element with leading alpha: {}".format(element)
                    )
            # Build through ``cls`` so both branches return the same type,
            # matching what ``from_api_repr`` does for subclasses.
            return cls(*elements)

    def __repr__(self):
        # Parts are wrapped in single quotes as-is (not ``repr``-escaped).
        quoted = ",".join("'" + part + "'" for part in self.parts)
        return "FieldPath({})".format(quoted)

    def __hash__(self):
        return hash(self.to_api_repr())

    def __eq__(self, other):
        if isinstance(other, FieldPath):
            return self.parts == other.parts
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, FieldPath):
            return self.parts < other.parts
        return NotImplemented

    def __add__(self, other):
        """Adds `other` field path to end of this field path.

        Args:
            other (~google.cloud.firestore_v1._helpers.FieldPath, str):
                The field path to add to the end of this `FieldPath`.
                A string is parsed with :meth:`from_string` first.

        Returns:
            A new `FieldPath` holding the parts of both paths, or
            ``NotImplemented`` for any other operand type.
        """
        if isinstance(other, FieldPath):
            return FieldPath(*(self.parts + other.parts))
        if isinstance(other, str):
            return FieldPath(*(self.parts + FieldPath.from_string(other).parts))
        return NotImplemented

    def to_api_repr(self) -> str:
        """Render a quoted string representation of the FieldPath

        Returns:
            (str) Quoted string representation of the path stored
            within this FieldPath.
        """
        return render_field_path(self.parts)

    def eq_or_parent(self, other) -> bool:
        """Check whether the two paths are equal or one is an ancestor.

        The check is symmetric: each path is truncated to the length of the
        other before comparing, so only the leading parts they share are
        compared.

        Returns:
            (bool) True IFF the paths are equal or either one is a prefix
            (ancestor) of the other, else False.
        """
        return self.parts[: len(other.parts)] == other.parts[: len(self.parts)]

    def lineage(self) -> set["FieldPath"]:
        """Return field paths for all parents.

        The path itself is not included, so a single-part path yields an
        empty set.

        Returns: Set[:class:`FieldPath`]
        """
        indexes = range(1, len(self.parts))
        return {FieldPath(*self.parts[:index]) for index in indexes}

    @staticmethod
    def document_id() -> str:
        """A special FieldPath value to refer to the ID of a document. It can be used
        in queries to sort or filter by the document ID.

        Returns: A special sentinel value to refer to the ID of a document.
        """
        return "__name__"