1from collections.abc import Sequence, Hashable
2from itertools import islice, chain
3from numbers import Integral
4from typing import TypeVar, Generic
5from pyrsistent._plist import plist
6
7T_co = TypeVar('T_co', covariant=True)
8
9
10class PDeque(Generic[T_co]):
11 """
12 Persistent double ended queue (deque). Allows quick appends and pops in both ends. Implemented
13 using two persistent lists.
14
15 A maximum length can be specified to create a bounded queue.
16
17 Fully supports the Sequence and Hashable protocols including indexing and slicing but
18 if you need fast random access go for the PVector instead.
19
20 Do not instantiate directly, instead use the factory functions :py:func:`dq` or :py:func:`pdeque` to
21 create an instance.
22
23 Some examples:
24
25 >>> x = pdeque([1, 2, 3])
26 >>> x.left
27 1
28 >>> x.right
29 3
30 >>> x[0] == x.left
31 True
32 >>> x[-1] == x.right
33 True
34 >>> x.pop()
35 pdeque([1, 2])
36 >>> x.pop() == x[:-1]
37 True
38 >>> x.popleft()
39 pdeque([2, 3])
40 >>> x.append(4)
41 pdeque([1, 2, 3, 4])
42 >>> x.appendleft(4)
43 pdeque([4, 1, 2, 3])
44
45 >>> y = pdeque([1, 2, 3], maxlen=3)
46 >>> y.append(4)
47 pdeque([2, 3, 4], maxlen=3)
48 >>> y.appendleft(4)
49 pdeque([4, 1, 2], maxlen=3)
50 """
51 __slots__ = ('_left_list', '_right_list', '_length', '_maxlen', '__weakref__')
52
53 def __new__(cls, left_list, right_list, length, maxlen=None):
54 instance = super(PDeque, cls).__new__(cls)
55 instance._left_list = left_list
56 instance._right_list = right_list
57 instance._length = length
58
59 if maxlen is not None:
60 if not isinstance(maxlen, Integral):
61 raise TypeError('An integer is required as maxlen')
62
63 if maxlen < 0:
64 raise ValueError("maxlen must be non-negative")
65
66 instance._maxlen = maxlen
67 return instance
68
69 @property
70 def right(self):
71 """
72 Rightmost element in dqueue.
73 """
74 return PDeque._tip_from_lists(self._right_list, self._left_list)
75
76 @property
77 def left(self):
78 """
79 Leftmost element in dqueue.
80 """
81 return PDeque._tip_from_lists(self._left_list, self._right_list)
82
83 @staticmethod
84 def _tip_from_lists(primary_list, secondary_list):
85 if primary_list:
86 return primary_list.first
87
88 if secondary_list:
89 return secondary_list[-1]
90
91 raise IndexError('No elements in empty deque')
92
93 def __iter__(self):
94 return chain(self._left_list, self._right_list.reverse())
95
96 def __repr__(self):
97 return "pdeque({0}{1})".format(list(self),
98 ', maxlen={0}'.format(self._maxlen) if self._maxlen is not None else '')
99 __str__ = __repr__
100
101 @property
102 def maxlen(self):
103 """
104 Maximum length of the queue.
105 """
106 return self._maxlen
107
108 def pop(self, count=1):
109 """
110 Return new deque with rightmost element removed. Popping the empty queue
111 will return the empty queue. A optional count can be given to indicate the
112 number of elements to pop. Popping with a negative index is the same as
113 popleft. Executes in amortized O(k) where k is the number of elements to pop.
114
115 >>> pdeque([1, 2]).pop()
116 pdeque([1])
117 >>> pdeque([1, 2]).pop(2)
118 pdeque([])
119 >>> pdeque([1, 2]).pop(-1)
120 pdeque([2])
121 """
122 if count < 0:
123 return self.popleft(-count)
124
125 new_right_list, new_left_list = PDeque._pop_lists(self._right_list, self._left_list, count)
126 return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)
127
128 def popleft(self, count=1):
129 """
130 Return new deque with leftmost element removed. Otherwise functionally
131 equivalent to pop().
132
133 >>> pdeque([1, 2]).popleft()
134 pdeque([2])
135 """
136 if count < 0:
137 return self.pop(-count)
138
139 new_left_list, new_right_list = PDeque._pop_lists(self._left_list, self._right_list, count)
140 return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)
141
142 @staticmethod
143 def _pop_lists(primary_list, secondary_list, count):
144 new_primary_list = primary_list
145 new_secondary_list = secondary_list
146
147 while count > 0 and (new_primary_list or new_secondary_list):
148 count -= 1
149 if new_primary_list.rest:
150 new_primary_list = new_primary_list.rest
151 elif new_primary_list:
152 new_primary_list = new_secondary_list.reverse()
153 new_secondary_list = plist()
154 else:
155 new_primary_list = new_secondary_list.reverse().rest
156 new_secondary_list = plist()
157
158 return new_primary_list, new_secondary_list
159
160 def _is_empty(self):
161 return not self._left_list and not self._right_list
162
163 def __lt__(self, other):
164 if not isinstance(other, PDeque):
165 return NotImplemented
166
167 return tuple(self) < tuple(other)
168
169 def __eq__(self, other):
170 if not isinstance(other, PDeque):
171 return NotImplemented
172
173 if tuple(self) == tuple(other):
174 # Sanity check of the length value since it is redundant (there for performance)
175 assert len(self) == len(other)
176 return True
177
178 return False
179
180 def __hash__(self):
181 return hash(tuple(self))
182
183 def __len__(self):
184 return self._length
185
186 def append(self, elem):
187 """
188 Return new deque with elem as the rightmost element.
189
190 >>> pdeque([1, 2]).append(3)
191 pdeque([1, 2, 3])
192 """
193 new_left_list, new_right_list, new_length = self._append(self._left_list, self._right_list, elem)
194 return PDeque(new_left_list, new_right_list, new_length, self._maxlen)
195
196 def appendleft(self, elem):
197 """
198 Return new deque with elem as the leftmost element.
199
200 >>> pdeque([1, 2]).appendleft(3)
201 pdeque([3, 1, 2])
202 """
203 new_right_list, new_left_list, new_length = self._append(self._right_list, self._left_list, elem)
204 return PDeque(new_left_list, new_right_list, new_length, self._maxlen)
205
206 def _append(self, primary_list, secondary_list, elem):
207 if self._maxlen is not None and self._length == self._maxlen:
208 if self._maxlen == 0:
209 return primary_list, secondary_list, 0
210 new_primary_list, new_secondary_list = PDeque._pop_lists(primary_list, secondary_list, 1)
211 return new_primary_list, new_secondary_list.cons(elem), self._length
212
213 return primary_list, secondary_list.cons(elem), self._length + 1
214
215 @staticmethod
216 def _extend_list(the_list, iterable):
217 count = 0
218 for elem in iterable:
219 the_list = the_list.cons(elem)
220 count += 1
221
222 return the_list, count
223
224 def _extend(self, primary_list, secondary_list, iterable):
225 new_primary_list, extend_count = PDeque._extend_list(primary_list, iterable)
226 new_secondary_list = secondary_list
227 current_len = self._length + extend_count
228 if self._maxlen is not None and current_len > self._maxlen:
229 pop_len = current_len - self._maxlen
230 new_secondary_list, new_primary_list = PDeque._pop_lists(new_secondary_list, new_primary_list, pop_len)
231 extend_count -= pop_len
232
233 return new_primary_list, new_secondary_list, extend_count
234
235 def extend(self, iterable):
236 """
237 Return new deque with all elements of iterable appended to the right.
238
239 >>> pdeque([1, 2]).extend([3, 4])
240 pdeque([1, 2, 3, 4])
241 """
242 new_right_list, new_left_list, extend_count = self._extend(self._right_list, self._left_list, iterable)
243 return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)
244
245 def extendleft(self, iterable):
246 """
247 Return new deque with all elements of iterable appended to the left.
248
249 NB! The elements will be inserted in reverse order compared to the order in the iterable.
250
251 >>> pdeque([1, 2]).extendleft([3, 4])
252 pdeque([4, 3, 1, 2])
253 """
254 new_left_list, new_right_list, extend_count = self._extend(self._left_list, self._right_list, iterable)
255 return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)
256
257 def count(self, elem):
258 """
259 Return the number of elements equal to elem present in the queue
260
261 >>> pdeque([1, 2, 1]).count(1)
262 2
263 """
264 return self._left_list.count(elem) + self._right_list.count(elem)
265
266 def remove(self, elem):
267 """
268 Return new deque with first element from left equal to elem removed. If no such element is found
269 a ValueError is raised.
270
271 >>> pdeque([2, 1, 2]).remove(2)
272 pdeque([1, 2])
273 """
274 try:
275 return PDeque(self._left_list.remove(elem), self._right_list, self._length - 1)
276 except ValueError:
277 # Value not found in left list, try the right list
278 try:
279 # This is severely inefficient with a double reverse, should perhaps implement a remove_last()?
280 return PDeque(self._left_list,
281 self._right_list.reverse().remove(elem).reverse(), self._length - 1)
282 except ValueError as e:
283 raise ValueError('{0} not found in PDeque'.format(elem)) from e
284
285 def reverse(self):
286 """
287 Return reversed deque.
288
289 >>> pdeque([1, 2, 3]).reverse()
290 pdeque([3, 2, 1])
291
292 Also supports the standard python reverse function.
293
294 >>> reversed(pdeque([1, 2, 3]))
295 pdeque([3, 2, 1])
296 """
297 return PDeque(self._right_list, self._left_list, self._length)
298 __reversed__ = reverse
299
300 def rotate(self, steps):
301 """
302 Return deque with elements rotated steps steps.
303
304 >>> x = pdeque([1, 2, 3])
305 >>> x.rotate(1)
306 pdeque([3, 1, 2])
307 >>> x.rotate(-2)
308 pdeque([3, 1, 2])
309 """
310 popped_deque = self.pop(steps)
311 if steps >= 0:
312 return popped_deque.extendleft(islice(self.reverse(), steps))
313
314 return popped_deque.extend(islice(self, -steps))
315
316 def __reduce__(self):
317 # Pickling support
318 return pdeque, (list(self), self._maxlen)
319
320 def __getitem__(self, index):
321 if isinstance(index, slice):
322 if index.step is not None and index.step != 1:
323 # Too difficult, no structural sharing possible
324 return pdeque(tuple(self)[index], maxlen=self._maxlen)
325
326 result = self
327 if index.start is not None:
328 result = result.popleft(index.start % self._length)
329 if index.stop is not None:
330 result = result.pop(self._length - (index.stop % self._length))
331
332 return result
333
334 if not isinstance(index, Integral):
335 raise TypeError("'%s' object cannot be interpreted as an index" % type(index).__name__)
336
337 if index >= 0:
338 return self.popleft(index).left
339
340 shifted = len(self) + index
341 if shifted < 0:
342 raise IndexError(
343 "pdeque index {0} out of range {1}".format(index, len(self)),
344 )
345 return self.popleft(shifted).left
346
347 index = Sequence.index
348
349Sequence.register(PDeque)
350Hashable.register(PDeque)
351
352
353def pdeque(iterable=(), maxlen=None):
354 """
355 Return deque containing the elements of iterable. If maxlen is specified then
356 len(iterable) - maxlen elements are discarded from the left to if len(iterable) > maxlen.
357
358 >>> pdeque([1, 2, 3])
359 pdeque([1, 2, 3])
360 >>> pdeque([1, 2, 3, 4], maxlen=2)
361 pdeque([3, 4], maxlen=2)
362 """
363 t = tuple(iterable)
364 if maxlen is not None:
365 t = t[-maxlen:]
366 length = len(t)
367 pivot = int(length / 2)
368 left = plist(t[:pivot])
369 right = plist(t[pivot:], reverse=True)
370 return PDeque(left, right, length, maxlen)
371
372def dq(*elements):
373 """
374 Return deque containing all arguments.
375
376 >>> dq(1, 2, 3)
377 pdeque([1, 2, 3])
378 """
379 return pdeque(elements)