Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict/_multidict_py.py: 49%
335 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import sys
2from array import array
3from collections import abc
5from ._abc import MultiMapping, MutableMultiMapping
7_marker = object()
10class istr(str):
12 """Case insensitive str."""
14 __is_istr__ = True
17upstr = istr # for relaxing backward compatibility problems
20def getversion(md):
21 if not isinstance(md, _Base):
22 raise TypeError("Parameter should be multidict or proxy")
23 return md._impl._version
26_version = array("Q", [0])
29class _Impl:
30 __slots__ = ("_items", "_version")
32 def __init__(self):
33 self._items = []
34 self.incr_version()
36 def incr_version(self):
37 global _version
38 v = _version
39 v[0] += 1
40 self._version = v[0]
42 if sys.implementation.name != "pypy":
44 def __sizeof__(self):
45 return object.__sizeof__(self) + sys.getsizeof(self._items)
48class _Base:
49 def _title(self, key):
50 return key
52 def getall(self, key, default=_marker):
53 """Return a list of all values matching the key."""
54 identity = self._title(key)
55 res = [v for i, k, v in self._impl._items if i == identity]
56 if res:
57 return res
58 if not res and default is not _marker:
59 return default
60 raise KeyError("Key not found: %r" % key)
62 def getone(self, key, default=_marker):
63 """Get first value matching the key."""
64 identity = self._title(key)
65 for i, k, v in self._impl._items:
66 if i == identity:
67 return v
68 if default is not _marker:
69 return default
70 raise KeyError("Key not found: %r" % key)
72 # Mapping interface #
74 def __getitem__(self, key):
75 return self.getone(key)
77 def get(self, key, default=None):
78 """Get first value matching the key.
80 The method is alias for .getone().
81 """
82 return self.getone(key, default)
84 def __iter__(self):
85 return iter(self.keys())
87 def __len__(self):
88 return len(self._impl._items)
90 def keys(self):
91 """Return a new view of the dictionary's keys."""
92 return _KeysView(self._impl)
94 def items(self):
95 """Return a new view of the dictionary's items *(key, value) pairs)."""
96 return _ItemsView(self._impl)
98 def values(self):
99 """Return a new view of the dictionary's values."""
100 return _ValuesView(self._impl)
102 def __eq__(self, other):
103 if not isinstance(other, abc.Mapping):
104 return NotImplemented
105 if isinstance(other, _Base):
106 lft = self._impl._items
107 rht = other._impl._items
108 if len(lft) != len(rht):
109 return False
110 for (i1, k2, v1), (i2, k2, v2) in zip(lft, rht):
111 if i1 != i2 or v1 != v2:
112 return False
113 return True
114 if len(self._impl._items) != len(other):
115 return False
116 for k, v in self.items():
117 nv = other.get(k, _marker)
118 if v != nv:
119 return False
120 return True
122 def __contains__(self, key):
123 identity = self._title(key)
124 for i, k, v in self._impl._items:
125 if i == identity:
126 return True
127 return False
129 def __repr__(self):
130 body = ", ".join("'{}': {!r}".format(k, v) for k, v in self.items())
131 return "<{}({})>".format(self.__class__.__name__, body)
134class MultiDictProxy(_Base, MultiMapping):
135 """Read-only proxy for MultiDict instance."""
137 def __init__(self, arg):
138 if not isinstance(arg, (MultiDict, MultiDictProxy)):
139 raise TypeError(
140 "ctor requires MultiDict or MultiDictProxy instance"
141 ", not {}".format(type(arg))
142 )
144 self._impl = arg._impl
146 def __reduce__(self):
147 raise TypeError("can't pickle {} objects".format(self.__class__.__name__))
149 def copy(self):
150 """Return a copy of itself."""
151 return MultiDict(self.items())
154class CIMultiDictProxy(MultiDictProxy):
155 """Read-only proxy for CIMultiDict instance."""
157 def __init__(self, arg):
158 if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
159 raise TypeError(
160 "ctor requires CIMultiDict or CIMultiDictProxy instance"
161 ", not {}".format(type(arg))
162 )
164 self._impl = arg._impl
166 def _title(self, key):
167 return key.title()
169 def copy(self):
170 """Return a copy of itself."""
171 return CIMultiDict(self.items())
174class MultiDict(_Base, MutableMultiMapping):
175 """Dictionary with the support for duplicate keys."""
177 def __init__(self, *args, **kwargs):
178 self._impl = _Impl()
180 self._extend(args, kwargs, self.__class__.__name__, self._extend_items)
182 if sys.implementation.name != "pypy":
184 def __sizeof__(self):
185 return object.__sizeof__(self) + sys.getsizeof(self._impl)
187 def __reduce__(self):
188 return (self.__class__, (list(self.items()),))
190 def _title(self, key):
191 return key
193 def _key(self, key):
194 if isinstance(key, str):
195 return key
196 else:
197 raise TypeError(
198 "MultiDict keys should be either str " "or subclasses of str"
199 )
201 def add(self, key, value):
202 identity = self._title(key)
203 self._impl._items.append((identity, self._key(key), value))
204 self._impl.incr_version()
206 def copy(self):
207 """Return a copy of itself."""
208 cls = self.__class__
209 return cls(self.items())
211 __copy__ = copy
213 def extend(self, *args, **kwargs):
214 """Extend current MultiDict with more values.
216 This method must be used instead of update.
217 """
218 self._extend(args, kwargs, "extend", self._extend_items)
220 def _extend(self, args, kwargs, name, method):
221 if len(args) > 1:
222 raise TypeError(
223 "{} takes at most 1 positional argument"
224 " ({} given)".format(name, len(args))
225 )
226 if args:
227 arg = args[0]
228 if isinstance(args[0], (MultiDict, MultiDictProxy)) and not kwargs:
229 items = arg._impl._items
230 else:
231 if hasattr(arg, "items"):
232 arg = arg.items()
233 if kwargs:
234 arg = list(arg)
235 arg.extend(list(kwargs.items()))
236 items = []
237 for item in arg:
238 if not len(item) == 2:
239 raise TypeError(
240 "{} takes either dict or list of (key, value) "
241 "tuples".format(name)
242 )
243 items.append((self._title(item[0]), self._key(item[0]), item[1]))
245 method(items)
246 else:
247 method(
248 [
249 (self._title(key), self._key(key), value)
250 for key, value in kwargs.items()
251 ]
252 )
254 def _extend_items(self, items):
255 for identity, key, value in items:
256 self.add(key, value)
258 def clear(self):
259 """Remove all items from MultiDict."""
260 self._impl._items.clear()
261 self._impl.incr_version()
263 # Mapping interface #
265 def __setitem__(self, key, value):
266 self._replace(key, value)
268 def __delitem__(self, key):
269 identity = self._title(key)
270 items = self._impl._items
271 found = False
272 for i in range(len(items) - 1, -1, -1):
273 if items[i][0] == identity:
274 del items[i]
275 found = True
276 if not found:
277 raise KeyError(key)
278 else:
279 self._impl.incr_version()
281 def setdefault(self, key, default=None):
282 """Return value for key, set value to default if key is not present."""
283 identity = self._title(key)
284 for i, k, v in self._impl._items:
285 if i == identity:
286 return v
287 self.add(key, default)
288 return default
290 def popone(self, key, default=_marker):
291 """Remove specified key and return the corresponding value.
293 If key is not found, d is returned if given, otherwise
294 KeyError is raised.
296 """
297 identity = self._title(key)
298 for i in range(len(self._impl._items)):
299 if self._impl._items[i][0] == identity:
300 value = self._impl._items[i][2]
301 del self._impl._items[i]
302 self._impl.incr_version()
303 return value
304 if default is _marker:
305 raise KeyError(key)
306 else:
307 return default
309 pop = popone # type: ignore
311 def popall(self, key, default=_marker):
312 """Remove all occurrences of key and return the list of corresponding
313 values.
315 If key is not found, default is returned if given, otherwise
316 KeyError is raised.
318 """
319 found = False
320 identity = self._title(key)
321 ret = []
322 for i in range(len(self._impl._items) - 1, -1, -1):
323 item = self._impl._items[i]
324 if item[0] == identity:
325 ret.append(item[2])
326 del self._impl._items[i]
327 self._impl.incr_version()
328 found = True
329 if not found:
330 if default is _marker:
331 raise KeyError(key)
332 else:
333 return default
334 else:
335 ret.reverse()
336 return ret
338 def popitem(self):
339 """Remove and return an arbitrary (key, value) pair."""
340 if self._impl._items:
341 i = self._impl._items.pop(0)
342 self._impl.incr_version()
343 return i[1], i[2]
344 else:
345 raise KeyError("empty multidict")
347 def update(self, *args, **kwargs):
348 """Update the dictionary from *other*, overwriting existing keys."""
349 self._extend(args, kwargs, "update", self._update_items)
351 def _update_items(self, items):
352 if not items:
353 return
354 used_keys = {}
355 for identity, key, value in items:
356 start = used_keys.get(identity, 0)
357 for i in range(start, len(self._impl._items)):
358 item = self._impl._items[i]
359 if item[0] == identity:
360 used_keys[identity] = i + 1
361 self._impl._items[i] = (identity, key, value)
362 break
363 else:
364 self._impl._items.append((identity, key, value))
365 used_keys[identity] = len(self._impl._items)
367 # drop tails
368 i = 0
369 while i < len(self._impl._items):
370 item = self._impl._items[i]
371 identity = item[0]
372 pos = used_keys.get(identity)
373 if pos is None:
374 i += 1
375 continue
376 if i >= pos:
377 del self._impl._items[i]
378 else:
379 i += 1
381 self._impl.incr_version()
383 def _replace(self, key, value):
384 key = self._key(key)
385 identity = self._title(key)
386 items = self._impl._items
388 for i in range(len(items)):
389 item = items[i]
390 if item[0] == identity:
391 items[i] = (identity, key, value)
392 # i points to last found item
393 rgt = i
394 self._impl.incr_version()
395 break
396 else:
397 self._impl._items.append((identity, key, value))
398 self._impl.incr_version()
399 return
401 # remove all tail items
402 i = rgt + 1
403 while i < len(items):
404 item = items[i]
405 if item[0] == identity:
406 del items[i]
407 else:
408 i += 1
411class CIMultiDict(MultiDict):
412 """Dictionary with the support for duplicate case-insensitive keys."""
414 def _title(self, key):
415 return key.title()
418class _Iter:
419 __slots__ = ("_size", "_iter")
421 def __init__(self, size, iterator):
422 self._size = size
423 self._iter = iterator
425 def __iter__(self):
426 return self
428 def __next__(self):
429 return next(self._iter)
431 def __length_hint__(self):
432 return self._size
435class _ViewBase:
436 def __init__(self, impl):
437 self._impl = impl
439 def __len__(self):
440 return len(self._impl._items)
443class _ItemsView(_ViewBase, abc.ItemsView):
444 def __contains__(self, item):
445 assert isinstance(item, tuple) or isinstance(item, list)
446 assert len(item) == 2
447 for i, k, v in self._impl._items:
448 if item[0] == k and item[1] == v:
449 return True
450 return False
452 def __iter__(self):
453 return _Iter(len(self), self._iter(self._impl._version))
455 def _iter(self, version):
456 for i, k, v in self._impl._items:
457 if version != self._impl._version:
458 raise RuntimeError("Dictionary changed during iteration")
459 yield k, v
461 def __repr__(self):
462 lst = []
463 for item in self._impl._items:
464 lst.append("{!r}: {!r}".format(item[1], item[2]))
465 body = ", ".join(lst)
466 return "{}({})".format(self.__class__.__name__, body)
469class _ValuesView(_ViewBase, abc.ValuesView):
470 def __contains__(self, value):
471 for item in self._impl._items:
472 if item[2] == value:
473 return True
474 return False
476 def __iter__(self):
477 return _Iter(len(self), self._iter(self._impl._version))
479 def _iter(self, version):
480 for item in self._impl._items:
481 if version != self._impl._version:
482 raise RuntimeError("Dictionary changed during iteration")
483 yield item[2]
485 def __repr__(self):
486 lst = []
487 for item in self._impl._items:
488 lst.append("{!r}".format(item[2]))
489 body = ", ".join(lst)
490 return "{}({})".format(self.__class__.__name__, body)
493class _KeysView(_ViewBase, abc.KeysView):
494 def __contains__(self, key):
495 for item in self._impl._items:
496 if item[1] == key:
497 return True
498 return False
500 def __iter__(self):
501 return _Iter(len(self), self._iter(self._impl._version))
503 def _iter(self, version):
504 for item in self._impl._items:
505 if version != self._impl._version:
506 raise RuntimeError("Dictionary changed during iteration")
507 yield item[1]
509 def __repr__(self):
510 lst = []
511 for item in self._impl._items:
512 lst.append("{!r}".format(item[1]))
513 body = ", ".join(lst)
514 return "{}({})".format(self.__class__.__name__, body)