Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict-6.0.6.dev0-py3.8-linux-x86_64.egg/multidict/_multidict_py.py: 73%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

341 statements  

1import sys 

2import types 

3from array import array 

4from collections import abc 

5 

6from ._abc import MultiMapping, MutableMultiMapping 

7 

8_marker = object() 

9 

if sys.version_info >= (3, 9):
    # Python 3.9+ ships a real generic-alias type.
    GenericAlias = types.GenericAlias
else:

    def GenericAlias(cls, *_params):
        """Fallback for interpreters without ``types.GenericAlias``.

        The classes below install this through ``classmethod(GenericAlias)``
        as ``__class_getitem__``, so it is called as
        ``GenericAlias(cls, item)``.  The subscript parameters are accepted
        and ignored; the class itself is returned unparameterised.
        """
        return cls

16 

17 

class istr(str):
    """Case insensitive str."""

    # Class-level flag that identifies istr instances.
    __is_istr__ = True


# Legacy alias kept for relaxing backward compatibility problems.
upstr = istr

26 

27 

def getversion(md):
    """Return the version stamp of the storage behind *md*.

    Raises TypeError when *md* is not a ``_Base`` instance (i.e. neither a
    multidict nor a proxy).
    """
    if isinstance(md, _Base):
        return md._impl._version
    raise TypeError("Parameter should be multidict or proxy")

32 

33 

# Module-wide counter; a one-element unsigned array shared by every _Impl.
_version = array("Q", [0])


class _Impl:
    """Storage shared by a multidict and its proxies.

    ``_items`` is the list of entries; ``_version`` is the value the
    module-wide counter had the last time this instance was changed.
    """

    __slots__ = ("_items", "_version")

    def __init__(self):
        self._items = []
        self.incr_version()

    def incr_version(self):
        """Bump the module-wide counter and record the new value here."""
        # The array is mutated in place, so no ``global`` statement is needed.
        _version[0] += 1
        self._version = _version[0]

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            """Return the object's own size plus the size of the item list."""
            own_size = object.__sizeof__(self)
            return own_size + sys.getsizeof(self._items)

54 

55 

class _Base:
    """Read-only behaviour shared by multidicts and their proxies.

    Subclasses provide ``self._impl``, whose ``_items`` list holds
    ``(identity, key, value)`` tuples.  Lookups compare the stored identity
    with ``self._title(key)``.
    """

    def _title(self, key):
        """Return the identity used to match *key* (here: the key itself)."""
        return key

    def getall(self, key, default=_marker):
        """Return a list of all values matching the key."""
        identity = self._title(key)
        matches = [v for i, k, v in self._impl._items if i == identity]
        if matches:
            return matches
        if default is not _marker:
            return default
        raise KeyError("Key not found: %r" % key)

    def getone(self, key, default=_marker):
        """Get first value matching the key.

        Raises KeyError if the key is not found and no default is provided.
        """
        identity = self._title(key)
        for stored_identity, _stored_key, value in self._impl._items:
            if stored_identity == identity:
                return value
        if default is _marker:
            raise KeyError("Key not found: %r" % key)
        return default

    # Mapping interface #

    def __getitem__(self, key):
        return self.getone(key)

    def get(self, key, default=None):
        """Get first value matching the key.

        If the key is not found, returns the default (or None if no default
        is provided).
        """
        return self.getone(key, default)

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self._impl._items)

    def keys(self):
        """Return a new view of the dictionary's keys."""
        return _KeysView(self._impl)

    def items(self):
        """Return a new view of the dictionary's items ((key, value) pairs)."""
        return _ItemsView(self._impl)

    def values(self):
        """Return a new view of the dictionary's values."""
        return _ValuesView(self._impl)

    def __eq__(self, other):
        if not isinstance(other, abc.Mapping):
            return NotImplemented
        mine = self._impl._items
        if isinstance(other, _Base):
            # Two multidicts: compare identities and values entry by entry,
            # in order; the original keys are not compared.
            theirs = other._impl._items
            if len(mine) != len(theirs):
                return False
            return not any(
                left_id != right_id or left_val != right_val
                for (left_id, _lk, left_val), (right_id, _rk, right_val) in zip(
                    mine, theirs
                )
            )
        # Any other mapping: same length and every (key, value) pair of ours
        # must be what the other mapping returns for that key.
        if len(mine) != len(other):
            return False
        return not any(v != other.get(k, _marker) for k, v in self.items())

    def __contains__(self, key):
        identity = self._title(key)
        return any(i == identity for i, _k, _v in self._impl._items)

    def __repr__(self):
        pairs = ("'{}': {!r}".format(k, v) for k, v in self.items())
        return "<{}({})>".format(self.__class__.__name__, ", ".join(pairs))

    __class_getitem__ = classmethod(GenericAlias)

145 

146 

class MultiDictProxy(_Base, MultiMapping):
    """Read-only proxy for MultiDict instance."""

    def __init__(self, arg):
        """Share the storage of *arg*, a MultiDict or MultiDictProxy.

        Raises TypeError for any other argument type.
        """
        if isinstance(arg, (MultiDict, MultiDictProxy)):
            # The proxy holds the very same _impl object, not a copy.
            self._impl = arg._impl
            return
        raise TypeError(
            "ctor requires MultiDict or MultiDictProxy instance"
            ", not {}".format(type(arg))
        )

    def __reduce__(self):
        """Refuse pickling by raising TypeError."""
        cls_name = self.__class__.__name__
        raise TypeError("can't pickle {} objects".format(cls_name))

    def copy(self):
        """Return a copy of itself."""
        return MultiDict(self.items())

165 

166 

class CIMultiDictProxy(MultiDictProxy):
    """Read-only proxy for CIMultiDict instance."""

    def __init__(self, arg):
        """Share the storage of *arg*, a CIMultiDict or CIMultiDictProxy.

        Raises TypeError for any other argument type.
        """
        if isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
            # The proxy holds the very same _impl object, not a copy.
            self._impl = arg._impl
            return
        raise TypeError(
            "ctor requires CIMultiDict or CIMultiDictProxy instance"
            ", not {}".format(type(arg))
        )

    def _title(self, key):
        """Return the title-cased key, used as the lookup identity."""
        return key.title()

    def copy(self):
        """Return a copy of itself."""
        return CIMultiDict(self.items())

185 

186 

class MultiDict(_Base, MutableMultiMapping):
    """Dictionary with the support for duplicate keys.

    Entries are stored in ``self._impl._items`` as ``(identity, key, value)``
    tuples, where ``identity`` is ``self._title(key)`` and is what lookups
    compare against.  Every mutation calls ``self._impl.incr_version()``.
    """

    def __init__(self, *args, **kwargs):
        """Create a multidict, optionally pre-filled like ``extend()``."""
        self._impl = _Impl()

        self._extend(args, kwargs, self.__class__.__name__, self._extend_items)

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            """Return the object's own size plus the size of its storage."""
            return object.__sizeof__(self) + sys.getsizeof(self._impl)

    def __reduce__(self):
        """Pickle as ``cls(list_of_(key, value)_pairs)``."""
        return (self.__class__, (list(self.items()),))

    def _title(self, key):
        """Return the identity used to match *key* (here: the key itself)."""
        return key

    def _key(self, key):
        """Return *key* unchanged if it is a str; raise TypeError otherwise."""
        if isinstance(key, str):
            return key
        raise TypeError(
            "MultiDict keys should be either str " "or subclasses of str"
        )

    def add(self, key, value):
        """Append a ``(key, value)`` pair, keeping any existing ones.

        Raises TypeError if *key* is not a str.
        """
        # Validate before deriving the identity so that a non-str key raises
        # TypeError here exactly as it does in __setitem__, even when a
        # subclass's _title() would fail on it.
        key = self._key(key)
        self._impl._items.append((self._title(key), key, value))
        self._impl.incr_version()

    def copy(self):
        """Return a copy of itself."""
        cls = self.__class__
        return cls(self.items())

    __copy__ = copy

    def extend(self, *args, **kwargs):
        """Extend current MultiDict with more values.

        This method must be used instead of update.
        """
        self._extend(args, kwargs, "extend", self._extend_items)

    def _extend(self, args, kwargs, name, method):
        """Normalise the arguments into entries and pass them to *method*.

        *args* may hold at most one positional source: a multidict or proxy,
        an object with ``items()``, or an iterable of 2-item pairs.  *kwargs*
        are appended after it.  *name* is only used in error messages.
        *method* receives a fresh list of ``(identity, key, value)`` tuples.

        Raises TypeError for more than one positional argument, for an item
        whose length is not 2, or for a non-str key.
        """
        if len(args) > 1:
            raise TypeError(
                "{} takes at most 1 positional argument"
                " ({} given)".format(name, len(args))
            )
        if not args:
            items = []
            for key, value in kwargs.items():
                key = self._key(key)
                items.append((self._title(key), key, value))
            method(items)
            return

        arg = args[0]
        if isinstance(arg, (MultiDict, MultiDictProxy)) and not kwargs:
            # Build a new list instead of handing over the source's live
            # storage: the source may be this very object (iterating a list
            # while appending to it never terminates), and its identities
            # were produced by *its* _title(), which may differ from ours.
            items = [
                (self._title(key), key, value)
                for _identity, key, value in arg._impl._items
            ]
        else:
            if hasattr(arg, "items"):
                arg = arg.items()
            if kwargs:
                arg = list(arg)
                arg.extend(list(kwargs.items()))
            items = []
            for item in arg:
                if not len(item) == 2:
                    raise TypeError(
                        "{} takes either dict or list of (key, value) "
                        "tuples".format(name)
                    )
                key = self._key(item[0])
                items.append((self._title(key), key, item[1]))

        method(items)

    def _extend_items(self, items):
        """Append every entry of *items* through ``add()``."""
        for _identity, key, value in items:
            self.add(key, value)

    def clear(self):
        """Remove all items from MultiDict."""
        self._impl._items.clear()
        self._impl.incr_version()

    # Mapping interface #

    def __setitem__(self, key, value):
        self._replace(key, value)

    def __delitem__(self, key):
        """Delete every entry matching *key*; raise KeyError if none match."""
        identity = self._title(key)
        items = self._impl._items
        found = False
        # Walk backwards so deletions do not disturb the indices still to visit.
        for i in range(len(items) - 1, -1, -1):
            if items[i][0] == identity:
                del items[i]
                found = True
        if not found:
            raise KeyError(key)
        self._impl.incr_version()

    def setdefault(self, key, default=None):
        """Return value for key, set value to default if key is not present."""
        identity = self._title(key)
        for i, _k, v in self._impl._items:
            if i == identity:
                return v
        self.add(key, default)
        return default

    def popone(self, key, default=_marker):
        """Remove specified key and return the corresponding value.

        Only the first matching entry is removed.  If key is not found,
        default is returned if given, otherwise KeyError is raised.
        """
        identity = self._title(key)
        items = self._impl._items
        for pos, entry in enumerate(items):
            if entry[0] == identity:
                value = entry[2]
                del items[pos]
                self._impl.incr_version()
                return value
        if default is _marker:
            raise KeyError(key)
        return default

    pop = popone  # type: ignore

    def popall(self, key, default=_marker):
        """Remove all occurrences of key and return the list of corresponding
        values.

        If key is not found, default is returned if given, otherwise
        KeyError is raised.
        """
        found = False
        identity = self._title(key)
        ret = []
        # Walk backwards so deletions do not disturb the indices still to visit.
        for i in range(len(self._impl._items) - 1, -1, -1):
            item = self._impl._items[i]
            if item[0] == identity:
                ret.append(item[2])
                del self._impl._items[i]
                self._impl.incr_version()
                found = True
        if not found:
            if default is _marker:
                raise KeyError(key)
            return default
        # Values were collected back to front; restore insertion order.
        ret.reverse()
        return ret

    def popitem(self):
        """Remove and return an arbitrary (key, value) pair.

        This implementation removes the first stored entry.  Raises KeyError
        when the multidict is empty.
        """
        if self._impl._items:
            i = self._impl._items.pop(0)
            self._impl.incr_version()
            return i[1], i[2]
        raise KeyError("empty multidict")

    def update(self, *args, **kwargs):
        """Update the dictionary from *other*, overwriting existing keys."""
        self._extend(args, kwargs, "update", self._update_items)

    def _update_items(self, items):
        """Overwrite existing entries with *items*, then drop leftovers.

        For each incoming entry the next not-yet-overwritten slot with the
        same identity is replaced in place; if there is none the entry is
        appended.  Afterwards every older entry of a touched identity that
        was not overwritten is removed.
        """
        if not items:
            return
        storage = self._impl._items
        # identity -> index just past the last slot written for that identity
        used_keys = {}
        for identity, key, value in items:
            start = used_keys.get(identity, 0)
            for i in range(start, len(storage)):
                if storage[i][0] == identity:
                    used_keys[identity] = i + 1
                    storage[i] = (identity, key, value)
                    break
            else:
                storage.append((identity, key, value))
                used_keys[identity] = len(storage)

        # Drop tails.  The positions in used_keys refer to the list as it is
        # right now, so decide every entry against its current index first
        # and only then rewrite the list.  Deleting while scanning would
        # shift later entries below their recorded cut-off and let stale
        # duplicates survive.
        kept = []
        for index, item in enumerate(storage):
            cutoff = used_keys.get(item[0])
            if cutoff is None or index < cutoff:
                kept.append(item)
        storage[:] = kept

        self._impl.incr_version()

    def _replace(self, key, value):
        """Set *key* to a single *value*, removing any other entries for it.

        The first matching entry is overwritten in place; if there is none
        the pair is appended.  Raises TypeError if *key* is not a str.
        """
        key = self._key(key)
        identity = self._title(key)
        items = self._impl._items

        for pos, item in enumerate(items):
            if item[0] == identity:
                items[pos] = (identity, key, value)
                self._impl.incr_version()
                break
        else:
            items.append((identity, key, value))
            self._impl.incr_version()
            return

        # Remove every later entry with the same identity.
        i = pos + 1
        while i < len(items):
            if items[i][0] == identity:
                del items[i]
            else:
                i += 1

422 

423 

class CIMultiDict(MultiDict):
    """Dictionary with the support for duplicate case-insensitive keys."""

    def _title(self, key):
        """Return the title-cased key, used as the lookup identity."""
        titled = key.title()
        return titled

429 

430 

class _Iter:
    """Iterator wrapper that also reports a length hint.

    Wraps *iterator* and answers ``__length_hint__`` with the *size* given at
    construction time.
    """

    __slots__ = ("_size", "_iter")

    def __init__(self, size, iterator):
        self._iter = iterator
        self._size = size

    def __iter__(self):
        return self

    def __next__(self):
        # Delegate straight to the wrapped iterator; StopIteration propagates.
        wrapped = self._iter
        return next(wrapped)

    def __length_hint__(self):
        return self._size

446 

447 

class _ViewBase:
    """Common base for the key, value and item views over an ``_impl``."""

    def __init__(self, impl):
        # Keep a reference to the storage object itself, not a copy of it.
        self._impl = impl

    def __len__(self):
        entries = self._impl._items
        return len(entries)

454 

455 

class _ItemsView(_ViewBase, abc.ItemsView):
    """View over the ``(key, value)`` pairs stored in an ``_impl``."""

    def __contains__(self, item):
        """Return True if *item* is a stored ``(key, value)`` pair.

        Anything that is not a 2-item tuple or list is simply not contained.
        A membership test must not depend on ``assert`` for validation:
        asserts are removed under ``-O`` and would otherwise raise
        AssertionError out of an ``in`` expression.
        """
        if not isinstance(item, (tuple, list)) or len(item) != 2:
            return False
        for _identity, key, value in self._impl._items:
            if item[0] == key and item[1] == value:
                return True
        return False

    def __iter__(self):
        return _Iter(len(self), self._iter(self._impl._version))

    def _iter(self, version):
        """Yield ``(key, value)`` pairs; fail if the storage version changes.

        *version* is the storage version captured when iteration started.
        """
        for _identity, key, value in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield key, value

    def __repr__(self):
        body = ", ".join(
            "{!r}: {!r}".format(item[1], item[2]) for item in self._impl._items
        )
        return "{}({})".format(self.__class__.__name__, body)

480 

481 

class _ValuesView(_ViewBase, abc.ValuesView):
    """View over the values stored in an ``_impl``."""

    def __contains__(self, value):
        return any(entry[2] == value for entry in self._impl._items)

    def __iter__(self):
        started_at = self._impl._version
        return _Iter(len(self), self._iter(started_at))

    def _iter(self, version):
        """Yield each value; fail if the storage version changes.

        *version* is the storage version captured when iteration started.
        """
        for _identity, _key, value in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield value

    def __repr__(self):
        body = ", ".join("{!r}".format(entry[2]) for entry in self._impl._items)
        return "{}({})".format(self.__class__.__name__, body)

504 

505 

class _KeysView(_ViewBase, abc.KeysView):
    """View over the keys stored in an ``_impl``."""

    def __contains__(self, key):
        # Compares against the stored key (entry[1]), not the identity.
        return any(entry[1] == key for entry in self._impl._items)

    def __iter__(self):
        started_at = self._impl._version
        return _Iter(len(self), self._iter(started_at))

    def _iter(self, version):
        """Yield each key; fail if the storage version changes.

        *version* is the storage version captured when iteration started.
        """
        for _identity, key, _value in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield key

    def __repr__(self):
        body = ", ".join("{!r}".format(entry[1]) for entry in self._impl._items)
        return "{}({})".format(self.__class__.__name__, body)