Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict/_multidict_py.py: 49%
341 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-09 06:47 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-09 06:47 +0000
1import sys
2import types
3from array import array
4from collections import abc
6from ._abc import MultiMapping, MutableMultiMapping
8_marker = object()
10if sys.version_info >= (3, 9):
11 GenericAlias = types.GenericAlias
12else:
14 def GenericAlias(cls):
15 return cls
18class istr(str):
20 """Case insensitive str."""
22 __is_istr__ = True
25upstr = istr # for relaxing backward compatibility problems
28def getversion(md):
29 if not isinstance(md, _Base):
30 raise TypeError("Parameter should be multidict or proxy")
31 return md._impl._version
34_version = array("Q", [0])
37class _Impl:
38 __slots__ = ("_items", "_version")
40 def __init__(self):
41 self._items = []
42 self.incr_version()
44 def incr_version(self):
45 global _version
46 v = _version
47 v[0] += 1
48 self._version = v[0]
50 if sys.implementation.name != "pypy":
52 def __sizeof__(self):
53 return object.__sizeof__(self) + sys.getsizeof(self._items)
56class _Base:
57 def _title(self, key):
58 return key
60 def getall(self, key, default=_marker):
61 """Return a list of all values matching the key."""
62 identity = self._title(key)
63 res = [v for i, k, v in self._impl._items if i == identity]
64 if res:
65 return res
66 if not res and default is not _marker:
67 return default
68 raise KeyError("Key not found: %r" % key)
70 def getone(self, key, default=_marker):
71 """Get first value matching the key.
73 Raises KeyError if the key is not found and no default is provided.
74 """
75 identity = self._title(key)
76 for i, k, v in self._impl._items:
77 if i == identity:
78 return v
79 if default is not _marker:
80 return default
81 raise KeyError("Key not found: %r" % key)
83 # Mapping interface #
85 def __getitem__(self, key):
86 return self.getone(key)
88 def get(self, key, default=None):
89 """Get first value matching the key.
91 If the key is not found, returns the default (or None if no default is provided)
92 """
93 return self.getone(key, default)
95 def __iter__(self):
96 return iter(self.keys())
98 def __len__(self):
99 return len(self._impl._items)
101 def keys(self):
102 """Return a new view of the dictionary's keys."""
103 return _KeysView(self._impl)
105 def items(self):
106 """Return a new view of the dictionary's items *(key, value) pairs)."""
107 return _ItemsView(self._impl)
109 def values(self):
110 """Return a new view of the dictionary's values."""
111 return _ValuesView(self._impl)
113 def __eq__(self, other):
114 if not isinstance(other, abc.Mapping):
115 return NotImplemented
116 if isinstance(other, _Base):
117 lft = self._impl._items
118 rht = other._impl._items
119 if len(lft) != len(rht):
120 return False
121 for (i1, k2, v1), (i2, k2, v2) in zip(lft, rht):
122 if i1 != i2 or v1 != v2:
123 return False
124 return True
125 if len(self._impl._items) != len(other):
126 return False
127 for k, v in self.items():
128 nv = other.get(k, _marker)
129 if v != nv:
130 return False
131 return True
133 def __contains__(self, key):
134 identity = self._title(key)
135 for i, k, v in self._impl._items:
136 if i == identity:
137 return True
138 return False
140 def __repr__(self):
141 body = ", ".join("'{}': {!r}".format(k, v) for k, v in self.items())
142 return "<{}({})>".format(self.__class__.__name__, body)
144 __class_getitem__ = classmethod(GenericAlias)
147class MultiDictProxy(_Base, MultiMapping):
148 """Read-only proxy for MultiDict instance."""
150 def __init__(self, arg):
151 if not isinstance(arg, (MultiDict, MultiDictProxy)):
152 raise TypeError(
153 "ctor requires MultiDict or MultiDictProxy instance"
154 ", not {}".format(type(arg))
155 )
157 self._impl = arg._impl
159 def __reduce__(self):
160 raise TypeError("can't pickle {} objects".format(self.__class__.__name__))
162 def copy(self):
163 """Return a copy of itself."""
164 return MultiDict(self.items())
167class CIMultiDictProxy(MultiDictProxy):
168 """Read-only proxy for CIMultiDict instance."""
170 def __init__(self, arg):
171 if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
172 raise TypeError(
173 "ctor requires CIMultiDict or CIMultiDictProxy instance"
174 ", not {}".format(type(arg))
175 )
177 self._impl = arg._impl
179 def _title(self, key):
180 return key.title()
182 def copy(self):
183 """Return a copy of itself."""
184 return CIMultiDict(self.items())
187class MultiDict(_Base, MutableMultiMapping):
188 """Dictionary with the support for duplicate keys."""
190 def __init__(self, *args, **kwargs):
191 self._impl = _Impl()
193 self._extend(args, kwargs, self.__class__.__name__, self._extend_items)
195 if sys.implementation.name != "pypy":
197 def __sizeof__(self):
198 return object.__sizeof__(self) + sys.getsizeof(self._impl)
200 def __reduce__(self):
201 return (self.__class__, (list(self.items()),))
203 def _title(self, key):
204 return key
206 def _key(self, key):
207 if isinstance(key, str):
208 return key
209 else:
210 raise TypeError(
211 "MultiDict keys should be either str " "or subclasses of str"
212 )
214 def add(self, key, value):
215 identity = self._title(key)
216 self._impl._items.append((identity, self._key(key), value))
217 self._impl.incr_version()
219 def copy(self):
220 """Return a copy of itself."""
221 cls = self.__class__
222 return cls(self.items())
224 __copy__ = copy
226 def extend(self, *args, **kwargs):
227 """Extend current MultiDict with more values.
229 This method must be used instead of update.
230 """
231 self._extend(args, kwargs, "extend", self._extend_items)
233 def _extend(self, args, kwargs, name, method):
234 if len(args) > 1:
235 raise TypeError(
236 "{} takes at most 1 positional argument"
237 " ({} given)".format(name, len(args))
238 )
239 if args:
240 arg = args[0]
241 if isinstance(args[0], (MultiDict, MultiDictProxy)) and not kwargs:
242 items = arg._impl._items
243 else:
244 if hasattr(arg, "items"):
245 arg = arg.items()
246 if kwargs:
247 arg = list(arg)
248 arg.extend(list(kwargs.items()))
249 items = []
250 for item in arg:
251 if not len(item) == 2:
252 raise TypeError(
253 "{} takes either dict or list of (key, value) "
254 "tuples".format(name)
255 )
256 items.append((self._title(item[0]), self._key(item[0]), item[1]))
258 method(items)
259 else:
260 method(
261 [
262 (self._title(key), self._key(key), value)
263 for key, value in kwargs.items()
264 ]
265 )
267 def _extend_items(self, items):
268 for identity, key, value in items:
269 self.add(key, value)
271 def clear(self):
272 """Remove all items from MultiDict."""
273 self._impl._items.clear()
274 self._impl.incr_version()
276 # Mapping interface #
278 def __setitem__(self, key, value):
279 self._replace(key, value)
281 def __delitem__(self, key):
282 identity = self._title(key)
283 items = self._impl._items
284 found = False
285 for i in range(len(items) - 1, -1, -1):
286 if items[i][0] == identity:
287 del items[i]
288 found = True
289 if not found:
290 raise KeyError(key)
291 else:
292 self._impl.incr_version()
294 def setdefault(self, key, default=None):
295 """Return value for key, set value to default if key is not present."""
296 identity = self._title(key)
297 for i, k, v in self._impl._items:
298 if i == identity:
299 return v
300 self.add(key, default)
301 return default
303 def popone(self, key, default=_marker):
304 """Remove specified key and return the corresponding value.
306 If key is not found, d is returned if given, otherwise
307 KeyError is raised.
309 """
310 identity = self._title(key)
311 for i in range(len(self._impl._items)):
312 if self._impl._items[i][0] == identity:
313 value = self._impl._items[i][2]
314 del self._impl._items[i]
315 self._impl.incr_version()
316 return value
317 if default is _marker:
318 raise KeyError(key)
319 else:
320 return default
322 pop = popone # type: ignore
324 def popall(self, key, default=_marker):
325 """Remove all occurrences of key and return the list of corresponding
326 values.
328 If key is not found, default is returned if given, otherwise
329 KeyError is raised.
331 """
332 found = False
333 identity = self._title(key)
334 ret = []
335 for i in range(len(self._impl._items) - 1, -1, -1):
336 item = self._impl._items[i]
337 if item[0] == identity:
338 ret.append(item[2])
339 del self._impl._items[i]
340 self._impl.incr_version()
341 found = True
342 if not found:
343 if default is _marker:
344 raise KeyError(key)
345 else:
346 return default
347 else:
348 ret.reverse()
349 return ret
351 def popitem(self):
352 """Remove and return an arbitrary (key, value) pair."""
353 if self._impl._items:
354 i = self._impl._items.pop(0)
355 self._impl.incr_version()
356 return i[1], i[2]
357 else:
358 raise KeyError("empty multidict")
360 def update(self, *args, **kwargs):
361 """Update the dictionary from *other*, overwriting existing keys."""
362 self._extend(args, kwargs, "update", self._update_items)
364 def _update_items(self, items):
365 if not items:
366 return
367 used_keys = {}
368 for identity, key, value in items:
369 start = used_keys.get(identity, 0)
370 for i in range(start, len(self._impl._items)):
371 item = self._impl._items[i]
372 if item[0] == identity:
373 used_keys[identity] = i + 1
374 self._impl._items[i] = (identity, key, value)
375 break
376 else:
377 self._impl._items.append((identity, key, value))
378 used_keys[identity] = len(self._impl._items)
380 # drop tails
381 i = 0
382 while i < len(self._impl._items):
383 item = self._impl._items[i]
384 identity = item[0]
385 pos = used_keys.get(identity)
386 if pos is None:
387 i += 1
388 continue
389 if i >= pos:
390 del self._impl._items[i]
391 else:
392 i += 1
394 self._impl.incr_version()
396 def _replace(self, key, value):
397 key = self._key(key)
398 identity = self._title(key)
399 items = self._impl._items
401 for i in range(len(items)):
402 item = items[i]
403 if item[0] == identity:
404 items[i] = (identity, key, value)
405 # i points to last found item
406 rgt = i
407 self._impl.incr_version()
408 break
409 else:
410 self._impl._items.append((identity, key, value))
411 self._impl.incr_version()
412 return
414 # remove all tail items
415 i = rgt + 1
416 while i < len(items):
417 item = items[i]
418 if item[0] == identity:
419 del items[i]
420 else:
421 i += 1
424class CIMultiDict(MultiDict):
425 """Dictionary with the support for duplicate case-insensitive keys."""
427 def _title(self, key):
428 return key.title()
431class _Iter:
432 __slots__ = ("_size", "_iter")
434 def __init__(self, size, iterator):
435 self._size = size
436 self._iter = iterator
438 def __iter__(self):
439 return self
441 def __next__(self):
442 return next(self._iter)
444 def __length_hint__(self):
445 return self._size
448class _ViewBase:
449 def __init__(self, impl):
450 self._impl = impl
452 def __len__(self):
453 return len(self._impl._items)
456class _ItemsView(_ViewBase, abc.ItemsView):
457 def __contains__(self, item):
458 assert isinstance(item, tuple) or isinstance(item, list)
459 assert len(item) == 2
460 for i, k, v in self._impl._items:
461 if item[0] == k and item[1] == v:
462 return True
463 return False
465 def __iter__(self):
466 return _Iter(len(self), self._iter(self._impl._version))
468 def _iter(self, version):
469 for i, k, v in self._impl._items:
470 if version != self._impl._version:
471 raise RuntimeError("Dictionary changed during iteration")
472 yield k, v
474 def __repr__(self):
475 lst = []
476 for item in self._impl._items:
477 lst.append("{!r}: {!r}".format(item[1], item[2]))
478 body = ", ".join(lst)
479 return "{}({})".format(self.__class__.__name__, body)
482class _ValuesView(_ViewBase, abc.ValuesView):
483 def __contains__(self, value):
484 for item in self._impl._items:
485 if item[2] == value:
486 return True
487 return False
489 def __iter__(self):
490 return _Iter(len(self), self._iter(self._impl._version))
492 def _iter(self, version):
493 for item in self._impl._items:
494 if version != self._impl._version:
495 raise RuntimeError("Dictionary changed during iteration")
496 yield item[2]
498 def __repr__(self):
499 lst = []
500 for item in self._impl._items:
501 lst.append("{!r}".format(item[2]))
502 body = ", ".join(lst)
503 return "{}({})".format(self.__class__.__name__, body)
506class _KeysView(_ViewBase, abc.KeysView):
507 def __contains__(self, key):
508 for item in self._impl._items:
509 if item[1] == key:
510 return True
511 return False
513 def __iter__(self):
514 return _Iter(len(self), self._iter(self._impl._version))
516 def _iter(self, version):
517 for item in self._impl._items:
518 if version != self._impl._version:
519 raise RuntimeError("Dictionary changed during iteration")
520 yield item[1]
522 def __repr__(self):
523 lst = []
524 for item in self._impl._items:
525 lst.append("{!r}".format(item[1]))
526 body = ", ".join(lst)
527 return "{}({})".format(self.__class__.__name__, body)