Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/protobuf/internal/containers.py: 34%

355 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# Protocol Buffers - Google's data interchange format 

2# Copyright 2008 Google Inc. All rights reserved. 

3# 

4# Use of this source code is governed by a BSD-style 

5# license that can be found in the LICENSE file or at 

6# https://developers.google.com/open-source/licenses/bsd 

7 

8"""Contains container classes to represent different protocol buffer types. 

9 

10This file defines container classes which represent categories of protocol 

11buffer field types which need extra maintenance. Currently these categories 

12are: 

13 

14- Repeated scalar fields - These are all repeated fields which aren't 

15 composite (e.g. they are of simple types like int32, string, etc). 

16- Repeated composite fields - Repeated fields which are composite. This 

17 includes groups and nested messages. 

18""" 

19 

20import collections.abc 

21import copy 

22import pickle 

23from typing import ( 

24 Any, 

25 Iterable, 

26 Iterator, 

27 List, 

28 MutableMapping, 

29 MutableSequence, 

30 NoReturn, 

31 Optional, 

32 Sequence, 

33 TypeVar, 

34 Union, 

35 overload, 

36) 

37 

38 

39_T = TypeVar('_T') 

40_K = TypeVar('_K') 

41_V = TypeVar('_V') 

42 

43 

44class BaseContainer(Sequence[_T]): 

45 """Base container class.""" 

46 

47 # Minimizes memory usage and disallows assignment to other attributes. 

48 __slots__ = ['_message_listener', '_values'] 

49 

50 def __init__(self, message_listener: Any) -> None: 

51 """ 

52 Args: 

53 message_listener: A MessageListener implementation. 

54 The RepeatedScalarFieldContainer will call this object's 

55 Modified() method when it is modified. 

56 """ 

57 self._message_listener = message_listener 

58 self._values = [] 

59 

60 @overload 

61 def __getitem__(self, key: int) -> _T: 

62 ... 

63 

64 @overload 

65 def __getitem__(self, key: slice) -> List[_T]: 

66 ... 

67 

68 def __getitem__(self, key): 

69 """Retrieves item by the specified key.""" 

70 return self._values[key] 

71 

72 def __len__(self) -> int: 

73 """Returns the number of elements in the container.""" 

74 return len(self._values) 

75 

76 def __ne__(self, other: Any) -> bool: 

77 """Checks if another instance isn't equal to this one.""" 

78 # The concrete classes should define __eq__. 

79 return not self == other 

80 

81 __hash__ = None 

82 

83 def __repr__(self) -> str: 

84 return repr(self._values) 

85 

86 def sort(self, *args, **kwargs) -> None: 

87 # Continue to support the old sort_function keyword argument. 

88 # This is expected to be a rare occurrence, so use LBYL to avoid 

89 # the overhead of actually catching KeyError. 

90 if 'sort_function' in kwargs: 

91 kwargs['cmp'] = kwargs.pop('sort_function') 

92 self._values.sort(*args, **kwargs) 

93 

94 def reverse(self) -> None: 

95 self._values.reverse() 

96 

97 

98# TODO: Remove this. BaseContainer does *not* conform to 

99# MutableSequence, only its subclasses do. 

100collections.abc.MutableSequence.register(BaseContainer) 

101 

102 

103class RepeatedScalarFieldContainer(BaseContainer[_T], MutableSequence[_T]): 

104 """Simple, type-checked, list-like container for holding repeated scalars.""" 

105 

106 # Disallows assignment to other attributes. 

107 __slots__ = ['_type_checker'] 

108 

109 def __init__( 

110 self, 

111 message_listener: Any, 

112 type_checker: Any, 

113 ) -> None: 

114 """Args: 

115 

116 message_listener: A MessageListener implementation. The 

117 RepeatedScalarFieldContainer will call this object's Modified() method 

118 when it is modified. 

119 type_checker: A type_checkers.ValueChecker instance to run on elements 

120 inserted into this container. 

121 """ 

122 super().__init__(message_listener) 

123 self._type_checker = type_checker 

124 

125 def append(self, value: _T) -> None: 

126 """Appends an item to the list. Similar to list.append().""" 

127 self._values.append(self._type_checker.CheckValue(value)) 

128 if not self._message_listener.dirty: 

129 self._message_listener.Modified() 

130 

131 def insert(self, key: int, value: _T) -> None: 

132 """Inserts the item at the specified position. Similar to list.insert().""" 

133 self._values.insert(key, self._type_checker.CheckValue(value)) 

134 if not self._message_listener.dirty: 

135 self._message_listener.Modified() 

136 

137 def extend(self, elem_seq: Iterable[_T]) -> None: 

138 """Extends by appending the given iterable. Similar to list.extend().""" 

139# TODO: Change OSS to raise error too 

140 if elem_seq is None: 

141 return 

142 try: 

143 elem_seq_iter = iter(elem_seq) 

144 except TypeError: 

145 if not elem_seq: 

146 warnings.warn('Value is not iterable. Please remove the wrong ' 

147 'usage. This will be changed to raise TypeError soon.') 

148 return 

149 raise 

150 new_values = [self._type_checker.CheckValue(elem) for elem in elem_seq_iter] 

151 if new_values: 

152 self._values.extend(new_values) 

153 self._message_listener.Modified() 

154 

155 def MergeFrom( 

156 self, 

157 other: Union['RepeatedScalarFieldContainer[_T]', Iterable[_T]], 

158 ) -> None: 

159 """Appends the contents of another repeated field of the same type to this 

160 one. We do not check the types of the individual fields. 

161 """ 

162 self._values.extend(other) 

163 self._message_listener.Modified() 

164 

165 def remove(self, elem: _T): 

166 """Removes an item from the list. Similar to list.remove().""" 

167 self._values.remove(elem) 

168 self._message_listener.Modified() 

169 

170 def pop(self, key: Optional[int] = -1) -> _T: 

171 """Removes and returns an item at a given index. Similar to list.pop().""" 

172 value = self._values[key] 

173 self.__delitem__(key) 

174 return value 

175 

176 @overload 

177 def __setitem__(self, key: int, value: _T) -> None: 

178 ... 

179 

180 @overload 

181 def __setitem__(self, key: slice, value: Iterable[_T]) -> None: 

182 ... 

183 

184 def __setitem__(self, key, value) -> None: 

185 """Sets the item on the specified position.""" 

186 if isinstance(key, slice): 

187 if key.step is not None: 

188 raise ValueError('Extended slices not supported') 

189 self._values[key] = map(self._type_checker.CheckValue, value) 

190 self._message_listener.Modified() 

191 else: 

192 self._values[key] = self._type_checker.CheckValue(value) 

193 self._message_listener.Modified() 

194 

195 def __delitem__(self, key: Union[int, slice]) -> None: 

196 """Deletes the item at the specified position.""" 

197 del self._values[key] 

198 self._message_listener.Modified() 

199 

200 def __eq__(self, other: Any) -> bool: 

201 """Compares the current instance with another one.""" 

202 if self is other: 

203 return True 

204 # Special case for the same type which should be common and fast. 

205 if isinstance(other, self.__class__): 

206 return other._values == self._values 

207 # We are presumably comparing against some other sequence type. 

208 return other == self._values 

209 

210 def __deepcopy__( 

211 self, 

212 unused_memo: Any = None, 

213 ) -> 'RepeatedScalarFieldContainer[_T]': 

214 clone = RepeatedScalarFieldContainer( 

215 copy.deepcopy(self._message_listener), self._type_checker) 

216 clone.MergeFrom(self) 

217 return clone 

218 

219 def __reduce__(self, **kwargs) -> NoReturn: 

220 raise pickle.PickleError( 

221 "Can't pickle repeated scalar fields, convert to list first") 

222 

223 

224# TODO: Constrain T to be a subtype of Message. 

225class RepeatedCompositeFieldContainer(BaseContainer[_T], MutableSequence[_T]): 

226 """Simple, list-like container for holding repeated composite fields.""" 

227 

228 # Disallows assignment to other attributes. 

229 __slots__ = ['_message_descriptor'] 

230 

231 def __init__(self, message_listener: Any, message_descriptor: Any) -> None: 

232 """ 

233 Note that we pass in a descriptor instead of the generated directly, 

234 since at the time we construct a _RepeatedCompositeFieldContainer we 

235 haven't yet necessarily initialized the type that will be contained in the 

236 container. 

237 

238 Args: 

239 message_listener: A MessageListener implementation. 

240 The RepeatedCompositeFieldContainer will call this object's 

241 Modified() method when it is modified. 

242 message_descriptor: A Descriptor instance describing the protocol type 

243 that should be present in this container. We'll use the 

244 _concrete_class field of this descriptor when the client calls add(). 

245 """ 

246 super().__init__(message_listener) 

247 self._message_descriptor = message_descriptor 

248 

249 def add(self, **kwargs: Any) -> _T: 

250 """Adds a new element at the end of the list and returns it. Keyword 

251 arguments may be used to initialize the element. 

252 """ 

253 new_element = self._message_descriptor._concrete_class(**kwargs) 

254 new_element._SetListener(self._message_listener) 

255 self._values.append(new_element) 

256 if not self._message_listener.dirty: 

257 self._message_listener.Modified() 

258 return new_element 

259 

260 def append(self, value: _T) -> None: 

261 """Appends one element by copying the message.""" 

262 new_element = self._message_descriptor._concrete_class() 

263 new_element._SetListener(self._message_listener) 

264 new_element.CopyFrom(value) 

265 self._values.append(new_element) 

266 if not self._message_listener.dirty: 

267 self._message_listener.Modified() 

268 

269 def insert(self, key: int, value: _T) -> None: 

270 """Inserts the item at the specified position by copying.""" 

271 new_element = self._message_descriptor._concrete_class() 

272 new_element._SetListener(self._message_listener) 

273 new_element.CopyFrom(value) 

274 self._values.insert(key, new_element) 

275 if not self._message_listener.dirty: 

276 self._message_listener.Modified() 

277 

278 def extend(self, elem_seq: Iterable[_T]) -> None: 

279 """Extends by appending the given sequence of elements of the same type 

280 

281 as this one, copying each individual message. 

282 """ 

283 message_class = self._message_descriptor._concrete_class 

284 listener = self._message_listener 

285 values = self._values 

286 for message in elem_seq: 

287 new_element = message_class() 

288 new_element._SetListener(listener) 

289 new_element.MergeFrom(message) 

290 values.append(new_element) 

291 listener.Modified() 

292 

293 def MergeFrom( 

294 self, 

295 other: Union['RepeatedCompositeFieldContainer[_T]', Iterable[_T]], 

296 ) -> None: 

297 """Appends the contents of another repeated field of the same type to this 

298 one, copying each individual message. 

299 """ 

300 self.extend(other) 

301 

302 def remove(self, elem: _T) -> None: 

303 """Removes an item from the list. Similar to list.remove().""" 

304 self._values.remove(elem) 

305 self._message_listener.Modified() 

306 

307 def pop(self, key: Optional[int] = -1) -> _T: 

308 """Removes and returns an item at a given index. Similar to list.pop().""" 

309 value = self._values[key] 

310 self.__delitem__(key) 

311 return value 

312 

313 @overload 

314 def __setitem__(self, key: int, value: _T) -> None: 

315 ... 

316 

317 @overload 

318 def __setitem__(self, key: slice, value: Iterable[_T]) -> None: 

319 ... 

320 

321 def __setitem__(self, key, value): 

322 # This method is implemented to make RepeatedCompositeFieldContainer 

323 # structurally compatible with typing.MutableSequence. It is 

324 # otherwise unsupported and will always raise an error. 

325 raise TypeError( 

326 f'{self.__class__.__name__} object does not support item assignment') 

327 

328 def __delitem__(self, key: Union[int, slice]) -> None: 

329 """Deletes the item at the specified position.""" 

330 del self._values[key] 

331 self._message_listener.Modified() 

332 

333 def __eq__(self, other: Any) -> bool: 

334 """Compares the current instance with another one.""" 

335 if self is other: 

336 return True 

337 if not isinstance(other, self.__class__): 

338 raise TypeError('Can only compare repeated composite fields against ' 

339 'other repeated composite fields.') 

340 return self._values == other._values 

341 

342 

343class ScalarMap(MutableMapping[_K, _V]): 

344 """Simple, type-checked, dict-like container for holding repeated scalars.""" 

345 

346 # Disallows assignment to other attributes. 

347 __slots__ = ['_key_checker', '_value_checker', '_values', '_message_listener', 

348 '_entry_descriptor'] 

349 

350 def __init__( 

351 self, 

352 message_listener: Any, 

353 key_checker: Any, 

354 value_checker: Any, 

355 entry_descriptor: Any, 

356 ) -> None: 

357 """ 

358 Args: 

359 message_listener: A MessageListener implementation. 

360 The ScalarMap will call this object's Modified() method when it 

361 is modified. 

362 key_checker: A type_checkers.ValueChecker instance to run on keys 

363 inserted into this container. 

364 value_checker: A type_checkers.ValueChecker instance to run on values 

365 inserted into this container. 

366 entry_descriptor: The MessageDescriptor of a map entry: key and value. 

367 """ 

368 self._message_listener = message_listener 

369 self._key_checker = key_checker 

370 self._value_checker = value_checker 

371 self._entry_descriptor = entry_descriptor 

372 self._values = {} 

373 

374 def __getitem__(self, key: _K) -> _V: 

375 try: 

376 return self._values[key] 

377 except KeyError: 

378 key = self._key_checker.CheckValue(key) 

379 val = self._value_checker.DefaultValue() 

380 self._values[key] = val 

381 return val 

382 

383 def __contains__(self, item: _K) -> bool: 

384 # We check the key's type to match the strong-typing flavor of the API. 

385 # Also this makes it easier to match the behavior of the C++ implementation. 

386 self._key_checker.CheckValue(item) 

387 return item in self._values 

388 

389 @overload 

390 def get(self, key: _K) -> Optional[_V]: 

391 ... 

392 

393 @overload 

394 def get(self, key: _K, default: _T) -> Union[_V, _T]: 

395 ... 

396 

397 # We need to override this explicitly, because our defaultdict-like behavior 

398 # will make the default implementation (from our base class) always insert 

399 # the key. 

400 def get(self, key, default=None): 

401 if key in self: 

402 return self[key] 

403 else: 

404 return default 

405 

406 def __setitem__(self, key: _K, value: _V) -> _T: 

407 checked_key = self._key_checker.CheckValue(key) 

408 checked_value = self._value_checker.CheckValue(value) 

409 self._values[checked_key] = checked_value 

410 self._message_listener.Modified() 

411 

412 def __delitem__(self, key: _K) -> None: 

413 del self._values[key] 

414 self._message_listener.Modified() 

415 

416 def __len__(self) -> int: 

417 return len(self._values) 

418 

419 def __iter__(self) -> Iterator[_K]: 

420 return iter(self._values) 

421 

422 def __repr__(self) -> str: 

423 return repr(self._values) 

424 

425 def MergeFrom(self, other: 'ScalarMap[_K, _V]') -> None: 

426 self._values.update(other._values) 

427 self._message_listener.Modified() 

428 

429 def InvalidateIterators(self) -> None: 

430 # It appears that the only way to reliably invalidate iterators to 

431 # self._values is to ensure that its size changes. 

432 original = self._values 

433 self._values = original.copy() 

434 original[None] = None 

435 

436 # This is defined in the abstract base, but we can do it much more cheaply. 

437 def clear(self) -> None: 

438 self._values.clear() 

439 self._message_listener.Modified() 

440 

441 def GetEntryClass(self) -> Any: 

442 return self._entry_descriptor._concrete_class 

443 

444 

445class MessageMap(MutableMapping[_K, _V]): 

446 """Simple, type-checked, dict-like container for with submessage values.""" 

447 

448 # Disallows assignment to other attributes. 

449 __slots__ = ['_key_checker', '_values', '_message_listener', 

450 '_message_descriptor', '_entry_descriptor'] 

451 

452 def __init__( 

453 self, 

454 message_listener: Any, 

455 message_descriptor: Any, 

456 key_checker: Any, 

457 entry_descriptor: Any, 

458 ) -> None: 

459 """ 

460 Args: 

461 message_listener: A MessageListener implementation. 

462 The ScalarMap will call this object's Modified() method when it 

463 is modified. 

464 key_checker: A type_checkers.ValueChecker instance to run on keys 

465 inserted into this container. 

466 value_checker: A type_checkers.ValueChecker instance to run on values 

467 inserted into this container. 

468 entry_descriptor: The MessageDescriptor of a map entry: key and value. 

469 """ 

470 self._message_listener = message_listener 

471 self._message_descriptor = message_descriptor 

472 self._key_checker = key_checker 

473 self._entry_descriptor = entry_descriptor 

474 self._values = {} 

475 

476 def __getitem__(self, key: _K) -> _V: 

477 key = self._key_checker.CheckValue(key) 

478 try: 

479 return self._values[key] 

480 except KeyError: 

481 new_element = self._message_descriptor._concrete_class() 

482 new_element._SetListener(self._message_listener) 

483 self._values[key] = new_element 

484 self._message_listener.Modified() 

485 return new_element 

486 

487 def get_or_create(self, key: _K) -> _V: 

488 """get_or_create() is an alias for getitem (ie. map[key]). 

489 

490 Args: 

491 key: The key to get or create in the map. 

492 

493 This is useful in cases where you want to be explicit that the call is 

494 mutating the map. This can avoid lint errors for statements like this 

495 that otherwise would appear to be pointless statements: 

496 

497 msg.my_map[key] 

498 """ 

499 return self[key] 

500 

501 @overload 

502 def get(self, key: _K) -> Optional[_V]: 

503 ... 

504 

505 @overload 

506 def get(self, key: _K, default: _T) -> Union[_V, _T]: 

507 ... 

508 

509 # We need to override this explicitly, because our defaultdict-like behavior 

510 # will make the default implementation (from our base class) always insert 

511 # the key. 

512 def get(self, key, default=None): 

513 if key in self: 

514 return self[key] 

515 else: 

516 return default 

517 

518 def __contains__(self, item: _K) -> bool: 

519 item = self._key_checker.CheckValue(item) 

520 return item in self._values 

521 

522 def __setitem__(self, key: _K, value: _V) -> NoReturn: 

523 raise ValueError('May not set values directly, call my_map[key].foo = 5') 

524 

525 def __delitem__(self, key: _K) -> None: 

526 key = self._key_checker.CheckValue(key) 

527 del self._values[key] 

528 self._message_listener.Modified() 

529 

530 def __len__(self) -> int: 

531 return len(self._values) 

532 

533 def __iter__(self) -> Iterator[_K]: 

534 return iter(self._values) 

535 

536 def __repr__(self) -> str: 

537 return repr(self._values) 

538 

539 def MergeFrom(self, other: 'MessageMap[_K, _V]') -> None: 

540 # pylint: disable=protected-access 

541 for key in other._values: 

542 # According to documentation: "When parsing from the wire or when merging, 

543 # if there are duplicate map keys the last key seen is used". 

544 if key in self: 

545 del self[key] 

546 self[key].CopyFrom(other[key]) 

547 # self._message_listener.Modified() not required here, because 

548 # mutations to submessages already propagate. 

549 

550 def InvalidateIterators(self) -> None: 

551 # It appears that the only way to reliably invalidate iterators to 

552 # self._values is to ensure that its size changes. 

553 original = self._values 

554 self._values = original.copy() 

555 original[None] = None 

556 

557 # This is defined in the abstract base, but we can do it much more cheaply. 

558 def clear(self) -> None: 

559 self._values.clear() 

560 self._message_listener.Modified() 

561 

562 def GetEntryClass(self) -> Any: 

563 return self._entry_descriptor._concrete_class 

564 

565 

566class _UnknownField: 

567 """A parsed unknown field.""" 

568 

569 # Disallows assignment to other attributes. 

570 __slots__ = ['_field_number', '_wire_type', '_data'] 

571 

572 def __init__(self, field_number, wire_type, data): 

573 self._field_number = field_number 

574 self._wire_type = wire_type 

575 self._data = data 

576 return 

577 

578 def __lt__(self, other): 

579 # pylint: disable=protected-access 

580 return self._field_number < other._field_number 

581 

582 def __eq__(self, other): 

583 if self is other: 

584 return True 

585 # pylint: disable=protected-access 

586 return (self._field_number == other._field_number and 

587 self._wire_type == other._wire_type and 

588 self._data == other._data) 

589 

590 

591class UnknownFieldRef: # pylint: disable=missing-class-docstring 

592 

593 def __init__(self, parent, index): 

594 self._parent = parent 

595 self._index = index 

596 

597 def _check_valid(self): 

598 if not self._parent: 

599 raise ValueError('UnknownField does not exist. ' 

600 'The parent message might be cleared.') 

601 if self._index >= len(self._parent): 

602 raise ValueError('UnknownField does not exist. ' 

603 'The parent message might be cleared.') 

604 

605 @property 

606 def field_number(self): 

607 self._check_valid() 

608 # pylint: disable=protected-access 

609 return self._parent._internal_get(self._index)._field_number 

610 

611 @property 

612 def wire_type(self): 

613 self._check_valid() 

614 # pylint: disable=protected-access 

615 return self._parent._internal_get(self._index)._wire_type 

616 

617 @property 

618 def data(self): 

619 self._check_valid() 

620 # pylint: disable=protected-access 

621 return self._parent._internal_get(self._index)._data 

622 

623 

624class UnknownFieldSet: 

625 """UnknownField container""" 

626 

627 # Disallows assignment to other attributes. 

628 __slots__ = ['_values'] 

629 

630 def __init__(self): 

631 self._values = [] 

632 

633 def __getitem__(self, index): 

634 if self._values is None: 

635 raise ValueError('UnknownFields does not exist. ' 

636 'The parent message might be cleared.') 

637 size = len(self._values) 

638 if index < 0: 

639 index += size 

640 if index < 0 or index >= size: 

641 raise IndexError('index %d out of range'.index) 

642 

643 return UnknownFieldRef(self, index) 

644 

645 def _internal_get(self, index): 

646 return self._values[index] 

647 

648 def __len__(self): 

649 if self._values is None: 

650 raise ValueError('UnknownFields does not exist. ' 

651 'The parent message might be cleared.') 

652 return len(self._values) 

653 

654 def _add(self, field_number, wire_type, data): 

655 unknown_field = _UnknownField(field_number, wire_type, data) 

656 self._values.append(unknown_field) 

657 return unknown_field 

658 

659 def __iter__(self): 

660 for i in range(len(self)): 

661 yield UnknownFieldRef(self, i) 

662 

663 def _extend(self, other): 

664 if other is None: 

665 return 

666 # pylint: disable=protected-access 

667 self._values.extend(other._values) 

668 

669 def __eq__(self, other): 

670 if self is other: 

671 return True 

672 # Sort unknown fields because their order shouldn't 

673 # affect equality test. 

674 values = list(self._values) 

675 if other is None: 

676 return not values 

677 values.sort() 

678 # pylint: disable=protected-access 

679 other_values = sorted(other._values) 

680 return values == other_values 

681 

682 def _clear(self): 

683 for value in self._values: 

684 # pylint: disable=protected-access 

685 if isinstance(value._data, UnknownFieldSet): 

686 value._data._clear() # pylint: disable=protected-access 

687 self._values = None