Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/protobuf/internal/field_mask.py: 17%
162 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Protocol Buffers - Google's data interchange format
2# Copyright 2008 Google Inc. All rights reserved.
3# https://developers.google.com/protocol-buffers/
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met:
8#
9# * Redistributions of source code must retain the above copyright
10# notice, this list of conditions and the following disclaimer.
11# * Redistributions in binary form must reproduce the above
12# copyright notice, this list of conditions and the following disclaimer
13# in the documentation and/or other materials provided with the
14# distribution.
15# * Neither the name of Google Inc. nor the names of its
16# contributors may be used to endorse or promote products derived from
17# this software without specific prior written permission.
18#
19# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31"""Contains FieldMask class."""
33from google.protobuf.descriptor import FieldDescriptor
36class FieldMask(object):
37 """Class for FieldMask message type."""
39 __slots__ = ()
41 def ToJsonString(self):
42 """Converts FieldMask to string according to proto3 JSON spec."""
43 camelcase_paths = []
44 for path in self.paths:
45 camelcase_paths.append(_SnakeCaseToCamelCase(path))
46 return ','.join(camelcase_paths)
48 def FromJsonString(self, value):
49 """Converts string to FieldMask according to proto3 JSON spec."""
50 if not isinstance(value, str):
51 raise ValueError('FieldMask JSON value not a string: {!r}'.format(value))
52 self.Clear()
53 if value:
54 for path in value.split(','):
55 self.paths.append(_CamelCaseToSnakeCase(path))
57 def IsValidForDescriptor(self, message_descriptor):
58 """Checks whether the FieldMask is valid for Message Descriptor."""
59 for path in self.paths:
60 if not _IsValidPath(message_descriptor, path):
61 return False
62 return True
64 def AllFieldsFromDescriptor(self, message_descriptor):
65 """Gets all direct fields of Message Descriptor to FieldMask."""
66 self.Clear()
67 for field in message_descriptor.fields:
68 self.paths.append(field.name)
70 def CanonicalFormFromMask(self, mask):
71 """Converts a FieldMask to the canonical form.
73 Removes paths that are covered by another path. For example,
74 "foo.bar" is covered by "foo" and will be removed if "foo"
75 is also in the FieldMask. Then sorts all paths in alphabetical order.
77 Args:
78 mask: The original FieldMask to be converted.
79 """
80 tree = _FieldMaskTree(mask)
81 tree.ToFieldMask(self)
83 def Union(self, mask1, mask2):
84 """Merges mask1 and mask2 into this FieldMask."""
85 _CheckFieldMaskMessage(mask1)
86 _CheckFieldMaskMessage(mask2)
87 tree = _FieldMaskTree(mask1)
88 tree.MergeFromFieldMask(mask2)
89 tree.ToFieldMask(self)
91 def Intersect(self, mask1, mask2):
92 """Intersects mask1 and mask2 into this FieldMask."""
93 _CheckFieldMaskMessage(mask1)
94 _CheckFieldMaskMessage(mask2)
95 tree = _FieldMaskTree(mask1)
96 intersection = _FieldMaskTree()
97 for path in mask2.paths:
98 tree.IntersectPath(path, intersection)
99 intersection.ToFieldMask(self)
101 def MergeMessage(
102 self, source, destination,
103 replace_message_field=False, replace_repeated_field=False):
104 """Merges fields specified in FieldMask from source to destination.
106 Args:
107 source: Source message.
108 destination: The destination message to be merged into.
109 replace_message_field: Replace message field if True. Merge message
110 field if False.
111 replace_repeated_field: Replace repeated field if True. Append
112 elements of repeated field if False.
113 """
114 tree = _FieldMaskTree(self)
115 tree.MergeMessage(
116 source, destination, replace_message_field, replace_repeated_field)
119def _IsValidPath(message_descriptor, path):
120 """Checks whether the path is valid for Message Descriptor."""
121 parts = path.split('.')
122 last = parts.pop()
123 for name in parts:
124 field = message_descriptor.fields_by_name.get(name)
125 if (field is None or
126 field.label == FieldDescriptor.LABEL_REPEATED or
127 field.type != FieldDescriptor.TYPE_MESSAGE):
128 return False
129 message_descriptor = field.message_type
130 return last in message_descriptor.fields_by_name
133def _CheckFieldMaskMessage(message):
134 """Raises ValueError if message is not a FieldMask."""
135 message_descriptor = message.DESCRIPTOR
136 if (message_descriptor.name != 'FieldMask' or
137 message_descriptor.file.name != 'google/protobuf/field_mask.proto'):
138 raise ValueError('Message {0} is not a FieldMask.'.format(
139 message_descriptor.full_name))
142def _SnakeCaseToCamelCase(path_name):
143 """Converts a path name from snake_case to camelCase."""
144 result = []
145 after_underscore = False
146 for c in path_name:
147 if c.isupper():
148 raise ValueError(
149 'Fail to print FieldMask to Json string: Path name '
150 '{0} must not contain uppercase letters.'.format(path_name))
151 if after_underscore:
152 if c.islower():
153 result.append(c.upper())
154 after_underscore = False
155 else:
156 raise ValueError(
157 'Fail to print FieldMask to Json string: The '
158 'character after a "_" must be a lowercase letter '
159 'in path name {0}.'.format(path_name))
160 elif c == '_':
161 after_underscore = True
162 else:
163 result += c
165 if after_underscore:
166 raise ValueError('Fail to print FieldMask to Json string: Trailing "_" '
167 'in path name {0}.'.format(path_name))
168 return ''.join(result)
171def _CamelCaseToSnakeCase(path_name):
172 """Converts a field name from camelCase to snake_case."""
173 result = []
174 for c in path_name:
175 if c == '_':
176 raise ValueError('Fail to parse FieldMask: Path name '
177 '{0} must not contain "_"s.'.format(path_name))
178 if c.isupper():
179 result += '_'
180 result += c.lower()
181 else:
182 result += c
183 return ''.join(result)
186class _FieldMaskTree(object):
187 """Represents a FieldMask in a tree structure.
189 For example, given a FieldMask "foo.bar,foo.baz,bar.baz",
190 the FieldMaskTree will be:
191 [_root] -+- foo -+- bar
192 | |
193 | +- baz
194 |
195 +- bar --- baz
196 In the tree, each leaf node represents a field path.
197 """
199 __slots__ = ('_root',)
201 def __init__(self, field_mask=None):
202 """Initializes the tree by FieldMask."""
203 self._root = {}
204 if field_mask:
205 self.MergeFromFieldMask(field_mask)
207 def MergeFromFieldMask(self, field_mask):
208 """Merges a FieldMask to the tree."""
209 for path in field_mask.paths:
210 self.AddPath(path)
212 def AddPath(self, path):
213 """Adds a field path into the tree.
215 If the field path to add is a sub-path of an existing field path
216 in the tree (i.e., a leaf node), it means the tree already matches
217 the given path so nothing will be added to the tree. If the path
218 matches an existing non-leaf node in the tree, that non-leaf node
219 will be turned into a leaf node with all its children removed because
220 the path matches all the node's children. Otherwise, a new path will
221 be added.
223 Args:
224 path: The field path to add.
225 """
226 node = self._root
227 for name in path.split('.'):
228 if name not in node:
229 node[name] = {}
230 elif not node[name]:
231 # Pre-existing empty node implies we already have this entire tree.
232 return
233 node = node[name]
234 # Remove any sub-trees we might have had.
235 node.clear()
237 def ToFieldMask(self, field_mask):
238 """Converts the tree to a FieldMask."""
239 field_mask.Clear()
240 _AddFieldPaths(self._root, '', field_mask)
242 def IntersectPath(self, path, intersection):
243 """Calculates the intersection part of a field path with this tree.
245 Args:
246 path: The field path to calculates.
247 intersection: The out tree to record the intersection part.
248 """
249 node = self._root
250 for name in path.split('.'):
251 if name not in node:
252 return
253 elif not node[name]:
254 intersection.AddPath(path)
255 return
256 node = node[name]
257 intersection.AddLeafNodes(path, node)
259 def AddLeafNodes(self, prefix, node):
260 """Adds leaf nodes begin with prefix to this tree."""
261 if not node:
262 self.AddPath(prefix)
263 for name in node:
264 child_path = prefix + '.' + name
265 self.AddLeafNodes(child_path, node[name])
267 def MergeMessage(
268 self, source, destination,
269 replace_message, replace_repeated):
270 """Merge all fields specified by this tree from source to destination."""
271 _MergeMessage(
272 self._root, source, destination, replace_message, replace_repeated)
275def _StrConvert(value):
276 """Converts value to str if it is not."""
277 # This file is imported by c extension and some methods like ClearField
278 # requires string for the field name. py2/py3 has different text
279 # type and may use unicode.
280 if not isinstance(value, str):
281 return value.encode('utf-8')
282 return value
285def _MergeMessage(
286 node, source, destination, replace_message, replace_repeated):
287 """Merge all fields specified by a sub-tree from source to destination."""
288 source_descriptor = source.DESCRIPTOR
289 for name in node:
290 child = node[name]
291 field = source_descriptor.fields_by_name[name]
292 if field is None:
293 raise ValueError('Error: Can\'t find field {0} in message {1}.'.format(
294 name, source_descriptor.full_name))
295 if child:
296 # Sub-paths are only allowed for singular message fields.
297 if (field.label == FieldDescriptor.LABEL_REPEATED or
298 field.cpp_type != FieldDescriptor.CPPTYPE_MESSAGE):
299 raise ValueError('Error: Field {0} in message {1} is not a singular '
300 'message field and cannot have sub-fields.'.format(
301 name, source_descriptor.full_name))
302 if source.HasField(name):
303 _MergeMessage(
304 child, getattr(source, name), getattr(destination, name),
305 replace_message, replace_repeated)
306 continue
307 if field.label == FieldDescriptor.LABEL_REPEATED:
308 if replace_repeated:
309 destination.ClearField(_StrConvert(name))
310 repeated_source = getattr(source, name)
311 repeated_destination = getattr(destination, name)
312 repeated_destination.MergeFrom(repeated_source)
313 else:
314 if field.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE:
315 if replace_message:
316 destination.ClearField(_StrConvert(name))
317 if source.HasField(name):
318 getattr(destination, name).MergeFrom(getattr(source, name))
319 else:
320 setattr(destination, name, getattr(source, name))
323def _AddFieldPaths(node, prefix, field_mask):
324 """Adds the field paths descended from node to field_mask."""
325 if not node and prefix:
326 field_mask.paths.append(prefix)
327 return
328 for name in sorted(node):
329 if prefix:
330 child_path = prefix + '.' + name
331 else:
332 child_path = name
333 _AddFieldPaths(node[name], child_path, field_mask)