1# Copyright 2018 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import collections
16import copy
17from typing import Iterable
18
19from proto.utils import cached_property
20
21
22class Repeated(collections.abc.MutableSequence):
23 """A view around a mutable sequence in protocol buffers.
24
25 This implements the full Python MutableSequence interface, but all methods
26 modify the underlying field container directly.
27 """
28
29 def __init__(self, sequence, *, marshal, proto_type=None):
30 """Initialize a wrapper around a protobuf repeated field.
31
32 Args:
33 sequence: A protocol buffers repeated field.
34 marshal (~.MarshalRegistry): An instantiated marshal, used to
35 convert values going to and from this map.
36 """
37 self._pb = sequence
38 self._marshal = marshal
39 self._proto_type = proto_type
40
41 def __copy__(self):
42 """Copy this object and return the copy."""
43 return type(self)(self.pb[:], marshal=self._marshal)
44
45 def __delitem__(self, key):
46 """Delete the given item."""
47 del self.pb[key]
48
49 def __eq__(self, other):
50 if hasattr(other, "pb"):
51 return tuple(self.pb) == tuple(other.pb)
52 return tuple(self.pb) == tuple(other) if isinstance(other, Iterable) else False
53
54 def __getitem__(self, key):
55 """Return the given item."""
56 return self.pb[key]
57
58 def __len__(self):
59 """Return the length of the sequence."""
60 return len(self.pb)
61
62 def __ne__(self, other):
63 return not self == other
64
65 def __repr__(self):
66 return repr([*self])
67
68 def __setitem__(self, key, value):
69 self.pb[key] = value
70
71 def insert(self, index: int, value):
72 """Insert ``value`` in the sequence before ``index``."""
73 self.pb.insert(index, value)
74
75 def sort(self, *, key: str = None, reverse: bool = False):
76 """Stable sort *IN PLACE*."""
77 self.pb.sort(key=key, reverse=reverse)
78
79 @property
80 def pb(self):
81 return self._pb
82
83
84class RepeatedComposite(Repeated):
85 """A view around a mutable sequence of messages in protocol buffers.
86
87 This implements the full Python MutableSequence interface, but all methods
88 modify the underlying field container directly.
89 """
90
91 @cached_property
92 def _pb_type(self):
93 """Return the protocol buffer type for this sequence."""
94 # Provide the marshal-given proto_type, if any.
95 # Used for RepeatedComposite of Enum.
96 if self._proto_type is not None:
97 return self._proto_type
98
99 # There is no public-interface mechanism to determine the type
100 # of what should go in the list (and the C implementation seems to
101 # have no exposed mechanism at all).
102 #
103 # If the list has members, use the existing list members to
104 # determine the type.
105 if len(self.pb) > 0:
106 return type(self.pb[0])
107
108 # We have no members in the list, so we get the type from the attributes.
109 if hasattr(self.pb, "_message_descriptor") and hasattr(
110 self.pb._message_descriptor, "_concrete_class"
111 ):
112 return self.pb._message_descriptor._concrete_class
113
114 # Fallback logic in case attributes are not available
115 # In order to get the type, we create a throw-away copy and add a
116 # blank member to it.
117 canary = copy.deepcopy(self.pb).add()
118 return type(canary)
119
120 def __eq__(self, other):
121 if super().__eq__(other):
122 return True
123 return (
124 tuple([i for i in self]) == tuple(other)
125 if isinstance(other, Iterable)
126 else False
127 )
128
129 def __getitem__(self, key):
130 return self._marshal.to_python(self._pb_type, self.pb[key])
131
132 def __setitem__(self, key, value):
133 # The underlying protocol buffer does not define __setitem__, so we
134 # have to implement all the operations on our own.
135
136 # If ``key`` is an integer, as in list[index] = value:
137 if isinstance(key, int):
138 if -len(self) <= key < len(self):
139 self.pop(key) # Delete the old item.
140 self.insert(key, value) # Insert the new item in its place.
141 else:
142 raise IndexError("list assignment index out of range")
143
144 # If ``key`` is a slice object, as in list[start:stop:step] = [values]:
145 elif isinstance(key, slice):
146 start, stop, step = key.indices(len(self))
147
148 if not isinstance(value, collections.abc.Iterable):
149 raise TypeError("can only assign an iterable")
150
151 if step == 1: # Is not an extended slice.
152 # Assign all the new values to the sliced part, replacing the
153 # old values, if any, and unconditionally inserting those
154 # values whose indices already exceed the slice length.
155 for index, item in enumerate(value):
156 if start + index < stop:
157 self.pop(start + index)
158 self.insert(start + index, item)
159
160 # If there are less values than the length of the slice, remove
161 # the remaining elements so that the slice adapts to the
162 # newly provided values.
163 for _ in range(stop - start - len(value)):
164 self.pop(start + len(value))
165
166 else: # Is an extended slice.
167 indices = range(start, stop, step)
168
169 if len(value) != len(indices): # XXX: Use PEP 572 on 3.8+
170 raise ValueError(
171 f"attempt to assign sequence of size "
172 f"{len(value)} to extended slice of size "
173 f"{len(indices)}"
174 )
175
176 # Assign each value to its index, calling this function again
177 # with individual integer indexes that get processed above.
178 for index, item in zip(indices, value):
179 self[index] = item
180
181 else:
182 raise TypeError(
183 f"list indices must be integers or slices, not {type(key).__name__}"
184 )
185
186 def insert(self, index: int, value):
187 """Insert ``value`` in the sequence before ``index``."""
188 pb_value = self._marshal.to_proto(self._pb_type, value)
189 self.pb.insert(index, pb_value)