Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/protobuf/internal/field_mask.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

178 statements  

1# Protocol Buffers - Google's data interchange format 

2# Copyright 2008 Google Inc. All rights reserved. 

3# 

4# Use of this source code is governed by a BSD-style 

5# license that can be found in the LICENSE file or at 

6# https://developers.google.com/open-source/licenses/bsd 

7 

8"""Contains FieldMask class.""" 

9 

10from google.protobuf.descriptor import FieldDescriptor 

11 

12 

class FieldMask(object):
  """Helper methods for the FieldMask message type.

  The methods below operate on ``self.paths`` and call ``self.Clear()``;
  neither is defined in this class, so both must be supplied by the object
  these methods are attached to.
  """

  __slots__ = ()

  def ToJsonString(self):
    """Returns the paths as a comma-separated string of camelCase names."""
    return ','.join(_SnakeCaseToCamelCase(path) for path in self.paths)

  def FromJsonString(self, value):
    """Replaces the paths with those parsed from a ProtoJSON string.

    Args:
      value: Comma-separated camelCase paths. An empty string yields an
        empty mask.

    Raises:
      ValueError: If value is not a str, or a path cannot be converted to
        snake_case.
    """
    if not isinstance(value, str):
      raise ValueError('FieldMask JSON value not a string: {!r}'.format(value))
    self.Clear()
    if not value:
      return
    # Paths are appended one at a time, so paths converted before a failing
    # one stay in the mask when the conversion raises.
    for camelcase_path in value.split(','):
      self.paths.append(_CamelCaseToSnakeCase(camelcase_path))

  def IsValidForDescriptor(self, message_descriptor):
    """Returns True if every path resolves against message_descriptor."""
    return all(
        _IsValidPath(message_descriptor, path) for path in self.paths)

  def AllFieldsFromDescriptor(self, message_descriptor):
    """Resets the mask to the names of the descriptor's direct fields."""
    self.Clear()
    for descriptor_field in message_descriptor.fields:
      self.paths.append(descriptor_field.name)

  def CanonicalFormFromMask(self, mask):
    """Stores the canonical form of mask in this FieldMask.

    A path covered by another path is dropped: "foo.bar" is covered by "foo"
    and is removed when "foo" is also present. The remaining paths are
    written in sorted order.

    Args:
      mask: The original FieldMask to be converted.
    """
    _FieldMaskTree(mask).ToFieldMask(self)

  def Union(self, mask1, mask2):
    """Stores the union of mask1 and mask2 in this FieldMask.

    Raises:
      ValueError: If mask1 or mask2 is not a FieldMask message.
    """
    for candidate in (mask1, mask2):
      _CheckFieldMaskMessage(candidate)
    merged = _FieldMaskTree(mask1)
    merged.MergeFromFieldMask(mask2)
    merged.ToFieldMask(self)

  def Intersect(self, mask1, mask2):
    """Stores the intersection of mask1 and mask2 in this FieldMask.

    Raises:
      ValueError: If mask1 or mask2 is not a FieldMask message.
    """
    for candidate in (mask1, mask2):
      _CheckFieldMaskMessage(candidate)
    first_tree = _FieldMaskTree(mask1)
    common = _FieldMaskTree()
    for path in mask2.paths:
      first_tree.IntersectPath(path, common)
    common.ToFieldMask(self)

  def MergeMessage(
      self, source, destination,
      replace_message_field=False, replace_repeated_field=False):
    """Copies the fields selected by this mask from source to destination.

    Args:
      source: Source message.
      destination: The destination message to be merged into.
      replace_message_field: Replace message field if True. Merge message
        field if False.
      replace_repeated_field: Replace repeated field if True. Append
        elements of repeated field if False.
    """
    _FieldMaskTree(self).MergeMessage(
        source, destination, replace_message_field, replace_repeated_field)

94 

95 

96def _IsValidPath(message_descriptor, path): 

97 """Checks whether the path is valid for Message Descriptor.""" 

98 parts = path.split('.') 

99 last = parts.pop() 

100 for name in parts: 

101 field = message_descriptor.fields_by_name.get(name) 

102 if (field is None or 

103 field.is_repeated or 

104 field.type != FieldDescriptor.TYPE_MESSAGE): 

105 return False 

106 message_descriptor = field.message_type 

107 return last in message_descriptor.fields_by_name 

108 

109 

110def _CheckFieldMaskMessage(message): 

111 """Raises ValueError if message is not a FieldMask.""" 

112 message_descriptor = message.DESCRIPTOR 

113 if (message_descriptor.name != 'FieldMask' or 

114 message_descriptor.file.name != 'google/protobuf/field_mask.proto'): 

115 raise ValueError('Message {0} is not a FieldMask.'.format( 

116 message_descriptor.full_name)) 

117 

118 

119def _SnakeCaseToCamelCase(path_name): 

120 """Converts a path name from snake_case to camelCase.""" 

121 result = [] 

122 after_underscore = False 

123 for c in path_name: 

124 if c.isupper(): 

125 raise ValueError( 

126 'Fail to print FieldMask to Json string: Path name ' 

127 '{0} must not contain uppercase letters.'.format(path_name)) 

128 if after_underscore: 

129 if c.islower(): 

130 result.append(c.upper()) 

131 after_underscore = False 

132 else: 

133 raise ValueError( 

134 'Fail to print FieldMask to Json string: The ' 

135 'character after a "_" must be a lowercase letter ' 

136 'in path name {0}.'.format(path_name)) 

137 elif c == '_': 

138 after_underscore = True 

139 else: 

140 result += c 

141 

142 if after_underscore: 

143 raise ValueError('Fail to print FieldMask to Json string: Trailing "_" ' 

144 'in path name {0}.'.format(path_name)) 

145 return ''.join(result) 

146 

147 

148def _CamelCaseToSnakeCase(path_name): 

149 """Converts a field name from camelCase to snake_case.""" 

150 result = [] 

151 for c in path_name: 

152 if c == '_': 

153 raise ValueError('Fail to parse FieldMask: Path name ' 

154 '{0} must not contain "_"s.'.format(path_name)) 

155 if c.isupper(): 

156 result += '_' 

157 result += c.lower() 

158 else: 

159 result += c 

160 return ''.join(result) 

161 

162 

163class _FieldMaskTree(object): 

164 """Represents a FieldMask in a tree structure. 

165 

166 For example, given a FieldMask "foo.bar,foo.baz,bar.baz", 

167 the FieldMaskTree will be: 

168 [_root] -+- foo -+- bar 

169 | | 

170 | +- baz 

171 | 

172 +- bar --- baz 

173 In the tree, each leaf node represents a field path. 

174 """ 

175 

176 __slots__ = ('_root',) 

177 

178 def __init__(self, field_mask=None): 

179 """Initializes the tree by FieldMask.""" 

180 self._root = {} 

181 if field_mask: 

182 self.MergeFromFieldMask(field_mask) 

183 

184 def MergeFromFieldMask(self, field_mask): 

185 """Merges a FieldMask to the tree.""" 

186 for path in field_mask.paths: 

187 self.AddPath(path) 

188 

189 def AddPath(self, path): 

190 """Adds a field path into the tree. 

191 

192 If the field path to add is a sub-path of an existing field path 

193 in the tree (i.e., a leaf node), it means the tree already matches 

194 the given path so nothing will be added to the tree. If the path 

195 matches an existing non-leaf node in the tree, that non-leaf node 

196 will be turned into a leaf node with all its children removed because 

197 the path matches all the node's children. Otherwise, a new path will 

198 be added. 

199 

200 Args: 

201 path: The field path to add. 

202 """ 

203 node = self._root 

204 for name in path.split('.'): 

205 if name not in node: 

206 node[name] = {} 

207 elif not node[name]: 

208 # Pre-existing empty node implies we already have this entire tree. 

209 return 

210 node = node[name] 

211 # Remove any sub-trees we might have had. 

212 node.clear() 

213 

214 def ToFieldMask(self, field_mask): 

215 """Converts the tree to a FieldMask.""" 

216 field_mask.Clear() 

217 _AddFieldPaths(self._root, '', field_mask) 

218 

219 def IntersectPath(self, path, intersection): 

220 """Calculates the intersection part of a field path with this tree. 

221 

222 Args: 

223 path: The field path to calculates. 

224 intersection: The out tree to record the intersection part. 

225 """ 

226 node = self._root 

227 for name in path.split('.'): 

228 if name not in node: 

229 return 

230 elif not node[name]: 

231 intersection.AddPath(path) 

232 return 

233 node = node[name] 

234 intersection.AddLeafNodes(path, node) 

235 

236 def AddLeafNodes(self, prefix, node): 

237 """Adds leaf nodes begin with prefix to this tree.""" 

238 if not node: 

239 self.AddPath(prefix) 

240 return 

241 stack = [(prefix, node)] 

242 while stack: 

243 current_prefix, current_node = stack.pop() 

244 if not current_node: 

245 self.AddPath(current_prefix) 

246 continue 

247 for name in current_node: 

248 child_path = current_prefix + '.' + name 

249 stack.append((child_path, current_node[name])) 

250 

251 def MergeMessage( 

252 self, source, destination, 

253 replace_message, replace_repeated): 

254 """Merge all fields specified by this tree from source to destination.""" 

255 _MergeMessage( 

256 self._root, source, destination, replace_message, replace_repeated) 

257 

258 

259def _StrConvert(value): 

260 """Converts value to str if it is not.""" 

261 # This file is imported by c extension and some methods like ClearField 

262 # requires string for the field name. py2/py3 has different text 

263 # type and may use unicode. 

264 if not isinstance(value, str): 

265 return value.encode('utf-8') 

266 return value 

267 

268 

269def _MergeMessage( 

270 node, source, destination, replace_message, replace_repeated): 

271 """Merge all fields specified by a sub-tree from source to destination.""" 

272 stack = [(node, source, destination)] 

273 while stack: 

274 current_node, current_source, current_destination = stack.pop() 

275 source_descriptor = current_source.DESCRIPTOR 

276 for name in current_node: 

277 child = current_node[name] 

278 field = source_descriptor.fields_by_name[name] 

279 if field is None: 

280 raise ValueError('Error: Can\'t find field {0} in message {1}.'.format( 

281 name, source_descriptor.full_name)) 

282 if child: 

283 # Sub-paths are only allowed for singular message fields. 

284 if (field.is_repeated or 

285 field.cpp_type != FieldDescriptor.CPPTYPE_MESSAGE): 

286 raise ValueError('Error: Field {0} in message {1} is not a singular ' 

287 'message field and cannot have sub-fields.'.format( 

288 name, source_descriptor.full_name)) 

289 if current_source.HasField(name): 

290 stack.append( 

291 (child, getattr(current_source, name), 

292 getattr(current_destination, name))) 

293 continue 

294 if field.is_repeated: 

295 if replace_repeated: 

296 current_destination.ClearField(_StrConvert(name)) 

297 repeated_source = getattr(current_source, name) 

298 repeated_destination = getattr(current_destination, name) 

299 repeated_destination.MergeFrom(repeated_source) 

300 else: 

301 if field.cpp_type == FieldDescriptor.CPPTYPE_MESSAGE: 

302 if replace_message: 

303 current_destination.ClearField(_StrConvert(name)) 

304 if current_source.HasField(name): 

305 getattr(current_destination, name).MergeFrom( 

306 getattr(current_source, name)) 

307 elif not field.has_presence or current_source.HasField(name): 

308 setattr(current_destination, name, getattr(current_source, name)) 

309 else: 

310 current_destination.ClearField(_StrConvert(name)) 

311 

312 

313def _AddFieldPaths(node, prefix, field_mask): 

314 """Adds the field paths descended from node to field_mask.""" 

315 stack = [(node, prefix)] 

316 while stack: 

317 current_node, current_prefix = stack.pop() 

318 if not current_node and current_prefix: 

319 field_mask.paths.append(current_prefix) 

320 continue 

321 for name in sorted(current_node, reverse=True): 

322 if current_prefix: 

323 child_path = current_prefix + '.' + name 

324 else: 

325 child_path = name 

326 stack.append((current_node[name], child_path))