Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict/_multidict_py.py: 46%

341 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import sys 

2import types 

3from array import array 

4from collections import abc 

5 

6from ._abc import MultiMapping, MutableMultiMapping 

7 

# Unique sentinel meaning "no default supplied"; lets None be a real default.
_marker = object()

if sys.version_info < (3, 9):

    def GenericAlias(cls):
        """Fallback for interpreters without types.GenericAlias: return cls as is."""
        return cls

else:
    # Python 3.9+ ships the real implementation.
    GenericAlias = types.GenericAlias

15 

16 

class istr(str):
    """Case insensitive str.

    A plain ``str`` subclass that only adds the ``__is_istr__`` class flag.
    """

    # Marker attribute identifying istr instances.
    __is_istr__ = True


# Old name kept as an alias, for relaxing backward compatibility problems.
upstr = istr

25 

26 

def getversion(md):
    """Return the current version number of a multidict or proxy.

    The value is read from ``md._impl._version``.

    Raises:
        TypeError: if *md* is not an instance of ``_Base``.
    """
    if isinstance(md, _Base):
        return md._impl._version
    raise TypeError("Parameter should be multidict or proxy")

31 

32 

# Module-wide counter shared by every _Impl; slot 0 holds the last version issued.
_version = array("Q", [0])


class _Impl:
    """Storage shared between a multidict and its proxies/views.

    ``_items`` is the list of stored entries and ``_version`` is the value of
    the module-wide counter at the time of the last recorded change.
    """

    __slots__ = ("_items", "_version")

    def __init__(self):
        self._items = []
        self.incr_version()

    def incr_version(self):
        """Advance the module-wide counter and stamp this object with it."""
        counter = _version  # mutated in place, never rebound
        counter[0] += 1
        self._version = counter[0]

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            """Return the object's own size plus the size of the items list."""
            own_size = object.__sizeof__(self)
            return own_size + sys.getsizeof(self._items)

53 

54 

class _Base:
    """Read-only behaviour shared by the multidict classes and their proxies.

    Every method reads ``self._impl._items``, a list of
    ``(identity, key, value)`` triples; ``self._impl`` itself is assigned by
    the concrete classes, not here.
    """

    def _title(self, key):
        # Identity used for lookups; by default it is the key unchanged.
        return key

    def getall(self, key, default=_marker):
        """Return a list of all values matching the key."""
        wanted = self._title(key)
        matches = [
            value for ident, _, value in self._impl._items if ident == wanted
        ]
        if matches:
            return matches
        if default is _marker:
            raise KeyError("Key not found: %r" % key)
        return default

    def getone(self, key, default=_marker):
        """Get first value matching the key.

        Raises KeyError if the key is not found and no default is provided.
        """
        wanted = self._title(key)
        for ident, _, value in self._impl._items:
            if ident == wanted:
                return value
        if default is _marker:
            raise KeyError("Key not found: %r" % key)
        return default

    # Mapping interface #

    def __getitem__(self, key):
        return self.getone(key)

    def get(self, key, default=None):
        """Get first value matching the key.

        If the key is not found, returns the default (or None if no default is provided)
        """
        return self.getone(key, default)

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self._impl._items)

    def keys(self):
        """Return a new view of the dictionary's keys."""
        return _KeysView(self._impl)

    def items(self):
        """Return a new view of the dictionary's items *(key, value) pairs).""" 
        return _ItemsView(self._impl)

    def values(self):
        """Return a new view of the dictionary's values."""
        return _ValuesView(self._impl)

    def __eq__(self, other):
        if not isinstance(other, abc.Mapping):
            return NotImplemented

        mine = self._impl._items
        if isinstance(other, _Base):
            # Multidict vs multidict: compare entry by entry, in order,
            # on identity and value (the raw key spelling is ignored).
            theirs = other._impl._items
            if len(mine) != len(theirs):
                return False
            return not any(
                i1 != i2 or v1 != v2
                for (i1, _k1, v1), (i2, _k2, v2) in zip(mine, theirs)
            )

        # Plain mapping: same number of entries and every pair must match.
        if len(mine) != len(other):
            return False
        return not any(v != other.get(k, _marker) for k, v in self.items())

    def __contains__(self, key):
        wanted = self._title(key)
        return any(entry[0] == wanted for entry in self._impl._items)

    def __repr__(self):
        pairs = ("'{}': {!r}".format(k, v) for k, v in self.items())
        return "<{}({})>".format(self.__class__.__name__, ", ".join(pairs))

    __class_getitem__ = classmethod(GenericAlias)

144 

145 

class MultiDictProxy(_Base, MultiMapping):
    """Read-only proxy for MultiDict instance."""

    def __init__(self, arg):
        """Share the storage of *arg* (a MultiDict or another proxy).

        Raises:
            TypeError: if *arg* is neither a MultiDict nor a MultiDictProxy.
        """
        accepted = (MultiDict, MultiDictProxy)
        if not isinstance(arg, accepted):
            message = (
                "ctor requires MultiDict or MultiDictProxy instance"
                ", not {}".format(type(arg))
            )
            raise TypeError(message)

        # The proxy holds the very same _impl object, not a copy.
        self._impl = arg._impl

    def __reduce__(self):
        # Pickling is refused outright.
        cls_name = self.__class__.__name__
        raise TypeError("can't pickle {} objects".format(cls_name))

    def copy(self):
        """Return a copy of itself."""
        pairs = self.items()
        return MultiDict(pairs)

164 

165 

class CIMultiDictProxy(MultiDictProxy):
    """Read-only proxy for CIMultiDict instance."""

    def __init__(self, arg):
        """Share the storage of *arg* (a CIMultiDict or another CI proxy).

        Raises:
            TypeError: if *arg* is neither a CIMultiDict nor a CIMultiDictProxy.
        """
        accepted = (CIMultiDict, CIMultiDictProxy)
        if not isinstance(arg, accepted):
            message = (
                "ctor requires CIMultiDict or CIMultiDictProxy instance"
                ", not {}".format(type(arg))
            )
            raise TypeError(message)

        # The proxy holds the very same _impl object, not a copy.
        self._impl = arg._impl

    def _title(self, key):
        # Lookups compare the title-cased form of the key.
        titled = key.title()
        return titled

    def copy(self):
        """Return a copy of itself."""
        pairs = self.items()
        return CIMultiDict(pairs)

184 

185 

class MultiDict(_Base, MutableMultiMapping):
    """Dictionary with the support for duplicate keys.

    Entries live in ``self._impl._items`` as ``(identity, key, value)``
    triples kept in insertion order.  ``identity`` is ``self._title(key)``
    and is what every lookup compares against; ``key`` is the string as
    supplied by the caller.
    """

    def __init__(self, *args, **kwargs):
        """Create the storage and fill it like :meth:`extend` would."""
        self._impl = _Impl()

        self._extend(args, kwargs, self.__class__.__name__, self._extend_items)

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            return object.__sizeof__(self) + sys.getsizeof(self._impl)

    def __reduce__(self):
        # Pickle as "class called with the list of (key, value) pairs".
        return (self.__class__, (list(self.items()),))

    def _title(self, key):
        # Identity of a key; case-sensitive here, overridden by subclasses.
        return key

    def _key(self, key):
        """Return *key* unchanged if it is a str, otherwise raise TypeError."""
        if isinstance(key, str):
            return key
        raise TypeError(
            "MultiDict keys should be either str " "or subclasses of str"
        )

    def add(self, key, value):
        """Append a new (key, value) entry, keeping any existing ones."""
        identity = self._title(key)
        self._impl._items.append((identity, self._key(key), value))
        self._impl.incr_version()

    def copy(self):
        """Return a copy of itself."""
        cls = self.__class__
        return cls(self.items())

    __copy__ = copy

    def extend(self, *args, **kwargs):
        """Extend current MultiDict with more values.

        This method must be used instead of update.
        """
        self._extend(args, kwargs, "extend", self._extend_items)

    def _extend(self, args, kwargs, name, method):
        """Normalise the arguments into triples and hand them to *method*.

        *args* may hold at most one positional source: a multidict/proxy,
        anything with an ``items()`` method, or an iterable of 2-item
        sequences.  *kwargs* are appended after it.  *name* is only used in
        error messages.  *method* receives a freshly built list of
        ``(identity, key, value)`` triples.

        Raises:
            TypeError: on more than one positional argument, on an element
                whose length is not 2, or on a non-str key.
        """
        if len(args) > 1:
            raise TypeError(
                "{} takes at most 1 positional argument"
                " ({} given)".format(name, len(args))
            )
        if args:
            arg = args[0]
            if isinstance(arg, (MultiDict, MultiDictProxy)) and not kwargs:
                # Build a new list instead of passing the source's own one:
                # * the source may share storage with self (d.extend(d)), and
                #   iterating a list while appending to it never terminates;
                # * identities must follow *this* object's _title(), which
                #   differs between case-sensitive and case-insensitive
                #   multidicts.
                # Keys taken from a multidict are already validated str.
                items = [
                    (self._title(key), key, value)
                    for _, key, value in arg._impl._items
                ]
            else:
                if hasattr(arg, "items"):
                    arg = arg.items()
                if kwargs:
                    arg = list(arg)
                    arg.extend(list(kwargs.items()))
                items = []
                for item in arg:
                    if not len(item) == 2:
                        raise TypeError(
                            "{} takes either dict or list of (key, value) "
                            "tuples".format(name)
                        )
                    items.append((self._title(item[0]), self._key(item[0]), item[1]))

            method(items)
        else:
            method(
                [
                    (self._title(key), self._key(key), value)
                    for key, value in kwargs.items()
                ]
            )

    def _extend_items(self, items):
        # The precomputed identity is ignored; add() derives it again.
        for _identity, key, value in items:
            self.add(key, value)

    def clear(self):
        """Remove all items from MultiDict."""
        self._impl._items.clear()
        self._impl.incr_version()

    # Mapping interface #

    def __setitem__(self, key, value):
        self._replace(key, value)

    def __delitem__(self, key):
        identity = self._title(key)
        items = self._impl._items
        found = False
        # Walk backwards so deletions do not disturb the indices still to visit.
        for i in range(len(items) - 1, -1, -1):
            if items[i][0] == identity:
                del items[i]
                found = True
        if not found:
            raise KeyError(key)
        self._impl.incr_version()

    def setdefault(self, key, default=None):
        """Return value for key, set value to default if key is not present."""
        identity = self._title(key)
        for i, _k, v in self._impl._items:
            if i == identity:
                return v
        self.add(key, default)
        return default

    def popone(self, key, default=_marker):
        """Remove the first entry for key and return its value.

        If key is not found, default is returned if given, otherwise
        KeyError is raised.

        """
        identity = self._title(key)
        items = self._impl._items
        for index, item in enumerate(items):
            if item[0] == identity:
                value = item[2]
                # Safe: we return right after mutating the list.
                del items[index]
                self._impl.incr_version()
                return value
        if default is _marker:
            raise KeyError(key)
        return default

    pop = popone  # type: ignore

    def popall(self, key, default=_marker):
        """Remove all occurrences of key and return the list of corresponding
        values.

        If key is not found, default is returned if given, otherwise
        KeyError is raised.

        """
        identity = self._title(key)
        items = self._impl._items
        ret = []
        # Walk backwards so deletions do not disturb the indices still to visit.
        for i in range(len(items) - 1, -1, -1):
            item = items[i]
            if item[0] == identity:
                ret.append(item[2])
                del items[i]
        if not ret:
            if default is _marker:
                raise KeyError(key)
            return default
        # One version bump per mutating call, as in __delitem__.
        self._impl.incr_version()
        ret.reverse()  # collected back-to-front; restore insertion order
        return ret

    def popitem(self):
        """Remove and return the first stored (key, value) pair.

        Raises KeyError if the multidict is empty.
        """
        if self._impl._items:
            i = self._impl._items.pop(0)
            self._impl.incr_version()
            return i[1], i[2]
        raise KeyError("empty multidict")

    def update(self, *args, **kwargs):
        """Update the dictionary from *other*, overwriting existing keys."""
        self._extend(args, kwargs, "update", self._update_items)

    def _update_items(self, items):
        """Overwrite entries in place, then drop leftovers of the updated keys.

        For each incoming triple the next not-yet-overwritten entry with the
        same identity is replaced; if there is none, the triple is appended.
        Afterwards every older entry of an updated identity that was not
        overwritten is removed.
        """
        if not items:
            return
        stored = self._impl._items
        # identity -> index just past the last slot written for that identity
        used_keys = {}
        for identity, key, value in items:
            start = used_keys.get(identity, 0)
            for i in range(start, len(stored)):
                if stored[i][0] == identity:
                    used_keys[identity] = i + 1
                    stored[i] = (identity, key, value)
                    break
            else:
                stored.append((identity, key, value))
                used_keys[identity] = len(stored)

        # Drop tails.  The positions in used_keys refer to the list as it is
        # right now, so the decision must be made on these indices; deleting
        # while scanning shifts later entries below their recorded position
        # and lets stale duplicates survive.  Rebuild in one pass instead.
        kept = []
        for index, item in enumerate(stored):
            pos = used_keys.get(item[0])
            if pos is None or index < pos:
                kept.append(item)
        stored[:] = kept  # same list object, new contents

        self._impl.incr_version()

    def _replace(self, key, value):
        """Set key to a single value: overwrite the first entry, drop the rest."""
        key = self._key(key)
        identity = self._title(key)
        items = self._impl._items

        for i in range(len(items)):
            item = items[i]
            if item[0] == identity:
                items[i] = (identity, key, value)
                # i points to last found item
                rgt = i
                self._impl.incr_version()
                break
        else:
            self._impl._items.append((identity, key, value))
            self._impl.incr_version()
            return

        # remove all tail items
        i = rgt + 1
        while i < len(items):
            item = items[i]
            if item[0] == identity:
                del items[i]
            else:
                i += 1

421 

422 

class CIMultiDict(MultiDict):
    """Dictionary with the support for duplicate case-insensitive keys."""

    def _title(self, key):
        # Entries are matched on the title-cased form of the key.
        titled = key.title()
        return titled

428 

429 

class _Iter:
    """Iterator wrapper that also reports a length hint.

    Items come from the wrapped iterator; ``__length_hint__`` returns the
    size given at construction time.
    """

    __slots__ = ("_size", "_iter")

    def __init__(self, size, iterator):
        self._iter = iterator
        self._size = size

    def __iter__(self):
        return self

    def __next__(self):
        source = self._iter
        return next(source)

    def __length_hint__(self):
        return self._size

445 

446 

class _ViewBase:
    """Common part of the views: keeps the shared storage and reports its size."""

    def __init__(self, impl):
        self._impl = impl

    def __len__(self):
        entries = self._impl._items
        return len(entries)

453 

454 

class _ItemsView(_ViewBase, abc.ItemsView):
    """View over the (key, value) pairs of the shared storage."""

    def __contains__(self, item):
        """Return True if a stored entry has this exact key and value.

        *item* must be a 2-element tuple or list; anything else is simply
        not a member, so False is returned instead of failing an ``assert``
        (which is also stripped under ``-O``).  The comparison uses the
        stored key, not the lookup identity.
        """
        if not isinstance(item, (tuple, list)) or len(item) != 2:
            return False
        key, value = item
        return any(key == k and value == v for _, k, v in self._impl._items)

    def __iter__(self):
        return _Iter(len(self), self._iter(self._impl._version))

    def _iter(self, version):
        """Yield (key, value) pairs, failing if the storage version changes.

        Raises:
            RuntimeError: if ``self._impl._version`` differs from *version*
                when the next pair is about to be produced.
        """
        for _, k, v in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield k, v

    def __repr__(self):
        body = ", ".join(
            "{!r}: {!r}".format(item[1], item[2]) for item in self._impl._items
        )
        return "{}({})".format(self.__class__.__name__, body)

479 

480 

class _ValuesView(_ViewBase, abc.ValuesView):
    """View over the values of the shared storage."""

    def __contains__(self, value):
        return any(entry[2] == value for entry in self._impl._items)

    def __iter__(self):
        snapshot = self._impl._version
        return _Iter(len(self), self._iter(snapshot))

    def _iter(self, version):
        # Fail as soon as the storage version no longer matches *version*.
        for _, _, val in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield val

    def __repr__(self):
        parts = ["{!r}".format(entry[2]) for entry in self._impl._items]
        return "{}({})".format(self.__class__.__name__, ", ".join(parts))

503 

504 

class _KeysView(_ViewBase, abc.KeysView):
    """View over the keys of the shared storage."""

    def __contains__(self, key):
        # Compares against the stored key, not the lookup identity.
        return any(entry[1] == key for entry in self._impl._items)

    def __iter__(self):
        snapshot = self._impl._version
        return _Iter(len(self), self._iter(snapshot))

    def _iter(self, version):
        # Fail as soon as the storage version no longer matches *version*.
        for _, stored_key, _ in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield stored_key

    def __repr__(self):
        parts = ["{!r}".format(entry[1]) for entry in self._impl._items]
        return "{}({})".format(self.__class__.__name__, ", ".join(parts))