Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict/_multidict_py.py: 46%
341 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import sys
2import types
3from array import array
4from collections import abc
6from ._abc import MultiMapping, MutableMultiMapping
8_marker = object()
10if sys.version_info >= (3, 9):
11 GenericAlias = types.GenericAlias
12else:
13 def GenericAlias(cls):
14 return cls
17class istr(str):
19 """Case insensitive str."""
21 __is_istr__ = True
24upstr = istr # for relaxing backward compatibility problems
27def getversion(md):
28 if not isinstance(md, _Base):
29 raise TypeError("Parameter should be multidict or proxy")
30 return md._impl._version
33_version = array("Q", [0])
36class _Impl:
37 __slots__ = ("_items", "_version")
39 def __init__(self):
40 self._items = []
41 self.incr_version()
43 def incr_version(self):
44 global _version
45 v = _version
46 v[0] += 1
47 self._version = v[0]
49 if sys.implementation.name != "pypy":
51 def __sizeof__(self):
52 return object.__sizeof__(self) + sys.getsizeof(self._items)
55class _Base:
56 def _title(self, key):
57 return key
59 def getall(self, key, default=_marker):
60 """Return a list of all values matching the key."""
61 identity = self._title(key)
62 res = [v for i, k, v in self._impl._items if i == identity]
63 if res:
64 return res
65 if not res and default is not _marker:
66 return default
67 raise KeyError("Key not found: %r" % key)
69 def getone(self, key, default=_marker):
70 """Get first value matching the key.
72 Raises KeyError if the key is not found and no default is provided.
73 """
74 identity = self._title(key)
75 for i, k, v in self._impl._items:
76 if i == identity:
77 return v
78 if default is not _marker:
79 return default
80 raise KeyError("Key not found: %r" % key)
82 # Mapping interface #
84 def __getitem__(self, key):
85 return self.getone(key)
87 def get(self, key, default=None):
88 """Get first value matching the key.
90 If the key is not found, returns the default (or None if no default is provided)
91 """
92 return self.getone(key, default)
94 def __iter__(self):
95 return iter(self.keys())
97 def __len__(self):
98 return len(self._impl._items)
100 def keys(self):
101 """Return a new view of the dictionary's keys."""
102 return _KeysView(self._impl)
104 def items(self):
105 """Return a new view of the dictionary's items *(key, value) pairs)."""
106 return _ItemsView(self._impl)
108 def values(self):
109 """Return a new view of the dictionary's values."""
110 return _ValuesView(self._impl)
112 def __eq__(self, other):
113 if not isinstance(other, abc.Mapping):
114 return NotImplemented
115 if isinstance(other, _Base):
116 lft = self._impl._items
117 rht = other._impl._items
118 if len(lft) != len(rht):
119 return False
120 for (i1, k2, v1), (i2, k2, v2) in zip(lft, rht):
121 if i1 != i2 or v1 != v2:
122 return False
123 return True
124 if len(self._impl._items) != len(other):
125 return False
126 for k, v in self.items():
127 nv = other.get(k, _marker)
128 if v != nv:
129 return False
130 return True
132 def __contains__(self, key):
133 identity = self._title(key)
134 for i, k, v in self._impl._items:
135 if i == identity:
136 return True
137 return False
139 def __repr__(self):
140 body = ", ".join("'{}': {!r}".format(k, v) for k, v in self.items())
141 return "<{}({})>".format(self.__class__.__name__, body)
143 __class_getitem__ = classmethod(GenericAlias)
146class MultiDictProxy(_Base, MultiMapping):
147 """Read-only proxy for MultiDict instance."""
149 def __init__(self, arg):
150 if not isinstance(arg, (MultiDict, MultiDictProxy)):
151 raise TypeError(
152 "ctor requires MultiDict or MultiDictProxy instance"
153 ", not {}".format(type(arg))
154 )
156 self._impl = arg._impl
158 def __reduce__(self):
159 raise TypeError("can't pickle {} objects".format(self.__class__.__name__))
161 def copy(self):
162 """Return a copy of itself."""
163 return MultiDict(self.items())
166class CIMultiDictProxy(MultiDictProxy):
167 """Read-only proxy for CIMultiDict instance."""
169 def __init__(self, arg):
170 if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
171 raise TypeError(
172 "ctor requires CIMultiDict or CIMultiDictProxy instance"
173 ", not {}".format(type(arg))
174 )
176 self._impl = arg._impl
178 def _title(self, key):
179 return key.title()
181 def copy(self):
182 """Return a copy of itself."""
183 return CIMultiDict(self.items())
186class MultiDict(_Base, MutableMultiMapping):
187 """Dictionary with the support for duplicate keys."""
189 def __init__(self, *args, **kwargs):
190 self._impl = _Impl()
192 self._extend(args, kwargs, self.__class__.__name__, self._extend_items)
194 if sys.implementation.name != "pypy":
196 def __sizeof__(self):
197 return object.__sizeof__(self) + sys.getsizeof(self._impl)
199 def __reduce__(self):
200 return (self.__class__, (list(self.items()),))
202 def _title(self, key):
203 return key
205 def _key(self, key):
206 if isinstance(key, str):
207 return key
208 else:
209 raise TypeError(
210 "MultiDict keys should be either str " "or subclasses of str"
211 )
213 def add(self, key, value):
214 identity = self._title(key)
215 self._impl._items.append((identity, self._key(key), value))
216 self._impl.incr_version()
218 def copy(self):
219 """Return a copy of itself."""
220 cls = self.__class__
221 return cls(self.items())
223 __copy__ = copy
225 def extend(self, *args, **kwargs):
226 """Extend current MultiDict with more values.
228 This method must be used instead of update.
229 """
230 self._extend(args, kwargs, "extend", self._extend_items)
232 def _extend(self, args, kwargs, name, method):
233 if len(args) > 1:
234 raise TypeError(
235 "{} takes at most 1 positional argument"
236 " ({} given)".format(name, len(args))
237 )
238 if args:
239 arg = args[0]
240 if isinstance(args[0], (MultiDict, MultiDictProxy)) and not kwargs:
241 items = arg._impl._items
242 else:
243 if hasattr(arg, "items"):
244 arg = arg.items()
245 if kwargs:
246 arg = list(arg)
247 arg.extend(list(kwargs.items()))
248 items = []
249 for item in arg:
250 if not len(item) == 2:
251 raise TypeError(
252 "{} takes either dict or list of (key, value) "
253 "tuples".format(name)
254 )
255 items.append((self._title(item[0]), self._key(item[0]), item[1]))
257 method(items)
258 else:
259 method(
260 [
261 (self._title(key), self._key(key), value)
262 for key, value in kwargs.items()
263 ]
264 )
266 def _extend_items(self, items):
267 for identity, key, value in items:
268 self.add(key, value)
270 def clear(self):
271 """Remove all items from MultiDict."""
272 self._impl._items.clear()
273 self._impl.incr_version()
275 # Mapping interface #
277 def __setitem__(self, key, value):
278 self._replace(key, value)
280 def __delitem__(self, key):
281 identity = self._title(key)
282 items = self._impl._items
283 found = False
284 for i in range(len(items) - 1, -1, -1):
285 if items[i][0] == identity:
286 del items[i]
287 found = True
288 if not found:
289 raise KeyError(key)
290 else:
291 self._impl.incr_version()
293 def setdefault(self, key, default=None):
294 """Return value for key, set value to default if key is not present."""
295 identity = self._title(key)
296 for i, k, v in self._impl._items:
297 if i == identity:
298 return v
299 self.add(key, default)
300 return default
302 def popone(self, key, default=_marker):
303 """Remove specified key and return the corresponding value.
305 If key is not found, d is returned if given, otherwise
306 KeyError is raised.
308 """
309 identity = self._title(key)
310 for i in range(len(self._impl._items)):
311 if self._impl._items[i][0] == identity:
312 value = self._impl._items[i][2]
313 del self._impl._items[i]
314 self._impl.incr_version()
315 return value
316 if default is _marker:
317 raise KeyError(key)
318 else:
319 return default
321 pop = popone # type: ignore
323 def popall(self, key, default=_marker):
324 """Remove all occurrences of key and return the list of corresponding
325 values.
327 If key is not found, default is returned if given, otherwise
328 KeyError is raised.
330 """
331 found = False
332 identity = self._title(key)
333 ret = []
334 for i in range(len(self._impl._items) - 1, -1, -1):
335 item = self._impl._items[i]
336 if item[0] == identity:
337 ret.append(item[2])
338 del self._impl._items[i]
339 self._impl.incr_version()
340 found = True
341 if not found:
342 if default is _marker:
343 raise KeyError(key)
344 else:
345 return default
346 else:
347 ret.reverse()
348 return ret
350 def popitem(self):
351 """Remove and return an arbitrary (key, value) pair."""
352 if self._impl._items:
353 i = self._impl._items.pop(0)
354 self._impl.incr_version()
355 return i[1], i[2]
356 else:
357 raise KeyError("empty multidict")
359 def update(self, *args, **kwargs):
360 """Update the dictionary from *other*, overwriting existing keys."""
361 self._extend(args, kwargs, "update", self._update_items)
363 def _update_items(self, items):
364 if not items:
365 return
366 used_keys = {}
367 for identity, key, value in items:
368 start = used_keys.get(identity, 0)
369 for i in range(start, len(self._impl._items)):
370 item = self._impl._items[i]
371 if item[0] == identity:
372 used_keys[identity] = i + 1
373 self._impl._items[i] = (identity, key, value)
374 break
375 else:
376 self._impl._items.append((identity, key, value))
377 used_keys[identity] = len(self._impl._items)
379 # drop tails
380 i = 0
381 while i < len(self._impl._items):
382 item = self._impl._items[i]
383 identity = item[0]
384 pos = used_keys.get(identity)
385 if pos is None:
386 i += 1
387 continue
388 if i >= pos:
389 del self._impl._items[i]
390 else:
391 i += 1
393 self._impl.incr_version()
395 def _replace(self, key, value):
396 key = self._key(key)
397 identity = self._title(key)
398 items = self._impl._items
400 for i in range(len(items)):
401 item = items[i]
402 if item[0] == identity:
403 items[i] = (identity, key, value)
404 # i points to last found item
405 rgt = i
406 self._impl.incr_version()
407 break
408 else:
409 self._impl._items.append((identity, key, value))
410 self._impl.incr_version()
411 return
413 # remove all tail items
414 i = rgt + 1
415 while i < len(items):
416 item = items[i]
417 if item[0] == identity:
418 del items[i]
419 else:
420 i += 1
423class CIMultiDict(MultiDict):
424 """Dictionary with the support for duplicate case-insensitive keys."""
426 def _title(self, key):
427 return key.title()
430class _Iter:
431 __slots__ = ("_size", "_iter")
433 def __init__(self, size, iterator):
434 self._size = size
435 self._iter = iterator
437 def __iter__(self):
438 return self
440 def __next__(self):
441 return next(self._iter)
443 def __length_hint__(self):
444 return self._size
447class _ViewBase:
448 def __init__(self, impl):
449 self._impl = impl
451 def __len__(self):
452 return len(self._impl._items)
455class _ItemsView(_ViewBase, abc.ItemsView):
456 def __contains__(self, item):
457 assert isinstance(item, tuple) or isinstance(item, list)
458 assert len(item) == 2
459 for i, k, v in self._impl._items:
460 if item[0] == k and item[1] == v:
461 return True
462 return False
464 def __iter__(self):
465 return _Iter(len(self), self._iter(self._impl._version))
467 def _iter(self, version):
468 for i, k, v in self._impl._items:
469 if version != self._impl._version:
470 raise RuntimeError("Dictionary changed during iteration")
471 yield k, v
473 def __repr__(self):
474 lst = []
475 for item in self._impl._items:
476 lst.append("{!r}: {!r}".format(item[1], item[2]))
477 body = ", ".join(lst)
478 return "{}({})".format(self.__class__.__name__, body)
481class _ValuesView(_ViewBase, abc.ValuesView):
482 def __contains__(self, value):
483 for item in self._impl._items:
484 if item[2] == value:
485 return True
486 return False
488 def __iter__(self):
489 return _Iter(len(self), self._iter(self._impl._version))
491 def _iter(self, version):
492 for item in self._impl._items:
493 if version != self._impl._version:
494 raise RuntimeError("Dictionary changed during iteration")
495 yield item[2]
497 def __repr__(self):
498 lst = []
499 for item in self._impl._items:
500 lst.append("{!r}".format(item[2]))
501 body = ", ".join(lst)
502 return "{}({})".format(self.__class__.__name__, body)
505class _KeysView(_ViewBase, abc.KeysView):
506 def __contains__(self, key):
507 for item in self._impl._items:
508 if item[1] == key:
509 return True
510 return False
512 def __iter__(self):
513 return _Iter(len(self), self._iter(self._impl._version))
515 def _iter(self, version):
516 for item in self._impl._items:
517 if version != self._impl._version:
518 raise RuntimeError("Dictionary changed during iteration")
519 yield item[1]
521 def __repr__(self):
522 lst = []
523 for item in self._impl._items:
524 lst.append("{!r}".format(item[1]))
525 body = ", ".join(lst)
526 return "{}({})".format(self.__class__.__name__, body)