Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/multidict/_multidict_py.py: 49%

335 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import sys 

2from array import array 

3from collections import abc 

4 

5from ._abc import MultiMapping, MutableMultiMapping 

6 

# Unique sentinel used as the "no default supplied" value, so that an
# explicit ``default=None`` can still be told apart from a missing argument.
_marker = object()

8 

9 

class istr(str):
    """A ``str`` subclass tagged as a case-insensitive string.

    The class adds no behaviour of its own here; it only carries the
    ``__is_istr__`` marker attribute.
    """

    __is_istr__ = True


# Legacy alias, kept to ease backward compatibility problems.
upstr = istr

18 

19 

def getversion(md):
    """Return the version stamp stored on the ``_impl`` of *md*.

    Raises:
        TypeError: if *md* is not an instance of ``_Base``.
    """
    if isinstance(md, _Base):
        return md._impl._version
    raise TypeError("Parameter should be multidict or proxy")

24 

25 

# Module-wide change counter shared by every _Impl instance.  It is kept in a
# one-element array of C ``unsigned long long`` ("Q") and bumped in place.
_version = array("Q", [0])


class _Impl:
    """Storage object referenced by a multidict, its proxies and its views.

    ``_items`` is the list of stored entries; ``_version`` is the value the
    module-level counter had the last time ``incr_version()`` was called on
    this object.
    """

    __slots__ = ("_items", "_version")

    def __init__(self):
        self._items = []
        self.incr_version()

    def incr_version(self):
        """Advance the shared counter and stamp this object with the result."""
        counter = _version
        counter[0] += 1
        self._version = counter[0]

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            """Return the object's own size plus that of the items list."""
            return object.__sizeof__(self) + sys.getsizeof(self._items)

46 

47 

class _Base:
    """Read-only behaviour shared by multidicts and their proxies.

    Subclasses are expected to set ``self._impl``.  Every stored entry is an
    ``(identity, key, value)`` triple and lookups compare the identity
    produced by ``_title()``.
    """

    def _title(self, key):
        """Return the identity used to match *key* (unchanged by default)."""
        return key

    def getall(self, key, default=_marker):
        """Return the list of every value stored under *key*.

        When nothing matches, *default* is returned if it was supplied;
        otherwise KeyError is raised.
        """
        wanted = self._title(key)
        matches = []
        for identity, _, value in self._impl._items:
            if identity == wanted:
                matches.append(value)
        if matches:
            return matches
        if default is _marker:
            raise KeyError("Key not found: %r" % key)
        return default

    def getone(self, key, default=_marker):
        """Return the first value stored under *key*.

        When nothing matches, *default* is returned if it was supplied;
        otherwise KeyError is raised.
        """
        wanted = self._title(key)
        for identity, _, value in self._impl._items:
            if identity == wanted:
                return value
        if default is _marker:
            raise KeyError("Key not found: %r" % key)
        return default

    # Mapping interface #

    def __getitem__(self, key):
        return self.getone(key)

    def get(self, key, default=None):
        """Return the first value stored under *key*.

        Delegates to getone(); a missing key yields *default* (``None``
        unless given) instead of raising.
        """
        return self.getone(key, default)

    def __iter__(self):
        keys_view = self.keys()
        return iter(keys_view)

    def __len__(self):
        entries = self._impl._items
        return len(entries)

    def keys(self):
        """Return a new keys view over the stored entries."""
        return _KeysView(self._impl)

    def items(self):
        """Return a new view of the stored (key, value) pairs."""
        return _ItemsView(self._impl)

    def values(self):
        """Return a new values view over the stored entries."""
        return _ValuesView(self._impl)

    def __eq__(self, other):
        if not isinstance(other, abc.Mapping):
            return NotImplemented
        mine = self._impl._items
        if isinstance(other, _Base):
            # Two multidict-like objects: compare identities and values
            # pairwise, in order; the stored keys are not compared.
            theirs = other._impl._items
            if len(mine) != len(theirs):
                return False
            return not any(
                i1 != i2 or v1 != v2
                for (i1, _, v1), (i2, _, v2) in zip(mine, theirs)
            )
        # Any other mapping: same length and every pair must match a lookup.
        if len(mine) != len(other):
            return False
        for key, value in self.items():
            if value != other.get(key, _marker):
                return False
        return True

    def __contains__(self, key):
        wanted = self._title(key)
        return any(identity == wanted for identity, _, _ in self._impl._items)

    def __repr__(self):
        pairs = ["'{}': {!r}".format(k, v) for k, v in self.items()]
        return "<{}({})>".format(self.__class__.__name__, ", ".join(pairs))

132 

133 

class MultiDictProxy(_Base, MultiMapping):
    """Read-only view that shares storage with a MultiDict.

    The proxy keeps a reference to the wrapped object's ``_impl`` and exposes
    no mutating methods of its own.
    """

    def __init__(self, arg):
        accepted = (MultiDict, MultiDictProxy)
        if not isinstance(arg, accepted):
            template = (
                "ctor requires MultiDict or MultiDictProxy instance"
                ", not {}"
            )
            raise TypeError(template.format(type(arg)))

        # Share (not copy) the underlying storage.
        self._impl = arg._impl

    def __reduce__(self):
        # Pickling a proxy is refused explicitly.
        cls_name = self.__class__.__name__
        raise TypeError("can't pickle {} objects".format(cls_name))

    def copy(self):
        """Return a new MultiDict built from this proxy's items."""
        pairs = self.items()
        return MultiDict(pairs)

152 

153 

class CIMultiDictProxy(MultiDictProxy):
    """Read-only view that shares storage with a CIMultiDict."""

    def __init__(self, arg):
        accepted = (CIMultiDict, CIMultiDictProxy)
        if not isinstance(arg, accepted):
            template = (
                "ctor requires CIMultiDict or CIMultiDictProxy instance"
                ", not {}"
            )
            raise TypeError(template.format(type(arg)))

        # Share (not copy) the underlying storage.
        self._impl = arg._impl

    def _title(self, key):
        """Return the title-cased form of *key*, used as lookup identity."""
        identity = key.title()
        return identity

    def copy(self):
        """Return a new CIMultiDict built from this proxy's items."""
        pairs = self.items()
        return CIMultiDict(pairs)

172 

173 

class MultiDict(_Base, MutableMultiMapping):
    """Dictionary with the support for duplicate keys.

    Entries are kept in ``self._impl._items`` as ``(identity, key, value)``
    triples, where ``identity`` is ``self._title(key)``.  Lookups compare
    identities, so a subclass changes the matching rule by overriding
    ``_title``.
    """

    def __init__(self, *args, **kwargs):
        """Create empty storage, then add the given items as extend() does."""
        self._impl = _Impl()

        self._extend(args, kwargs, self.__class__.__name__, self._extend_items)

    if sys.implementation.name != "pypy":

        def __sizeof__(self):
            """Return the object's own size plus the size of its storage."""
            return object.__sizeof__(self) + sys.getsizeof(self._impl)

    def __reduce__(self):
        """Pickle as ``cls(list_of_(key, value)_pairs)``."""
        return (self.__class__, (list(self.items()),))

    def _title(self, key):
        """Return the identity used to match *key* (the key itself here)."""
        return key

    def _key(self, key):
        """Validate *key* and return it.

        Raises:
            TypeError: if *key* is not a ``str`` (or subclass) instance.
        """
        if isinstance(key, str):
            return key
        else:
            raise TypeError(
                "MultiDict keys should be either str " "or subclasses of str"
            )

    def add(self, key, value):
        """Append a new (key, value) entry, keeping existing ones."""
        identity = self._title(key)
        self._impl._items.append((identity, self._key(key), value))
        self._impl.incr_version()

    def copy(self):
        """Return a copy of itself."""
        cls = self.__class__
        return cls(self.items())

    __copy__ = copy

    def extend(self, *args, **kwargs):
        """Extend current MultiDict with more values.

        Unlike update(), existing entries are kept and the new ones are
        appended.
        """
        self._extend(args, kwargs, "extend", self._extend_items)

    def _extend(self, args, kwargs, name, method):
        """Normalise the arguments into triples and hand them to *method*.

        *args* may hold at most one positional source: a multidict/proxy, an
        object with ``items()``, or an iterable of 2-item sequences.  *kwargs*
        are appended after it.  *name* is only used in error messages and
        *method* receives the resulting list of ``(identity, key, value)``.

        Raises:
            TypeError: on more than one positional argument, on an element
                whose length is not 2, or on a non-``str`` key.
        """
        if len(args) > 1:
            raise TypeError(
                "{} takes at most 1 positional argument"
                " ({} given)".format(name, len(args))
            )
        if args:
            arg = args[0]
            if isinstance(arg, (MultiDict, MultiDictProxy)) and not kwargs:
                # Build a fresh list instead of handing over the source's own
                # storage list:
                #  * identities are recomputed with *this* object's _title(),
                #    because the source may use a different rule (e.g. a
                #    CIMultiDict feeding a MultiDict via update());
                #  * the snapshot keeps ``md.extend(md)`` from iterating the
                #    very list it is appending to.
                items = [
                    (self._title(key), self._key(key), value)
                    for _, key, value in arg._impl._items
                ]
            else:
                if hasattr(arg, "items"):
                    arg = arg.items()
                if kwargs:
                    arg = list(arg)
                    arg.extend(list(kwargs.items()))
                items = []
                for item in arg:
                    if not len(item) == 2:
                        raise TypeError(
                            "{} takes either dict or list of (key, value) "
                            "tuples".format(name)
                        )
                    items.append((self._title(item[0]), self._key(item[0]), item[1]))

            method(items)
        else:
            method(
                [
                    (self._title(key), self._key(key), value)
                    for key, value in kwargs.items()
                ]
            )

    def _extend_items(self, items):
        """Append every triple via add(); the identity is recomputed there."""
        for identity, key, value in items:
            self.add(key, value)

    def clear(self):
        """Remove all items from MultiDict."""
        self._impl._items.clear()
        self._impl.incr_version()

    # Mapping interface #

    def __setitem__(self, key, value):
        self._replace(key, value)

    def __delitem__(self, key):
        """Delete every entry matching *key*; KeyError if there is none."""
        identity = self._title(key)
        items = self._impl._items
        found = False
        # Walk backwards so deletions do not shift indexes still to be visited.
        for i in range(len(items) - 1, -1, -1):
            if items[i][0] == identity:
                del items[i]
                found = True
        if not found:
            raise KeyError(key)
        else:
            self._impl.incr_version()

    def setdefault(self, key, default=None):
        """Return value for key, set value to default if key is not present."""
        identity = self._title(key)
        for i, k, v in self._impl._items:
            if i == identity:
                return v
        self.add(key, default)
        return default

    def popone(self, key, default=_marker):
        """Remove the first entry matching *key* and return its value.

        If key is not found, default is returned if given, otherwise
        KeyError is raised.

        """
        identity = self._title(key)
        for i in range(len(self._impl._items)):
            if self._impl._items[i][0] == identity:
                value = self._impl._items[i][2]
                del self._impl._items[i]
                self._impl.incr_version()
                return value
        if default is _marker:
            raise KeyError(key)
        else:
            return default

    pop = popone  # type: ignore

    def popall(self, key, default=_marker):
        """Remove all occurrences of key and return the list of corresponding
        values.

        If key is not found, default is returned if given, otherwise
        KeyError is raised.

        """
        found = False
        identity = self._title(key)
        ret = []
        # Walk backwards so deletions do not shift indexes still to be visited.
        for i in range(len(self._impl._items) - 1, -1, -1):
            item = self._impl._items[i]
            if item[0] == identity:
                ret.append(item[2])
                del self._impl._items[i]
                self._impl.incr_version()
                found = True
        if not found:
            if default is _marker:
                raise KeyError(key)
            else:
                return default
        else:
            # Values were collected back to front; restore insertion order.
            ret.reverse()
            return ret

    def popitem(self):
        """Remove and return the first stored (key, value) pair.

        Raises:
            KeyError: if the multidict is empty.
        """
        if self._impl._items:
            i = self._impl._items.pop(0)
            self._impl.incr_version()
            return i[1], i[2]
        else:
            raise KeyError("empty multidict")

    def update(self, *args, **kwargs):
        """Update the dictionary from *other*, overwriting existing keys."""
        self._extend(args, kwargs, "update", self._update_items)

    def _update_items(self, items):
        """Overwrite or append each incoming triple, then drop leftovers.

        ``used_keys`` maps an identity to the index just past the last slot
        written for it.  Each incoming triple overwrites the next existing
        entry with the same identity, or is appended when none is left.
        Afterwards, entries of any touched identity that sit at or beyond
        that index are deleted.
        """
        if not items:
            return
        used_keys = {}
        for identity, key, value in items:
            start = used_keys.get(identity, 0)
            for i in range(start, len(self._impl._items)):
                item = self._impl._items[i]
                if item[0] == identity:
                    used_keys[identity] = i + 1
                    self._impl._items[i] = (identity, key, value)
                    break
            else:
                self._impl._items.append((identity, key, value))
                used_keys[identity] = len(self._impl._items)

        # drop tails
        i = 0
        while i < len(self._impl._items):
            item = self._impl._items[i]
            identity = item[0]
            pos = used_keys.get(identity)
            if pos is None:
                i += 1
                continue
            if i >= pos:
                del self._impl._items[i]
            else:
                i += 1

        self._impl.incr_version()

    def _replace(self, key, value):
        """Set *key* to a single *value*.

        The first matching entry is overwritten in place and every later
        entry with the same identity is removed; with no match the pair is
        appended.
        """
        key = self._key(key)
        identity = self._title(key)
        items = self._impl._items

        for i in range(len(items)):
            item = items[i]
            if item[0] == identity:
                items[i] = (identity, key, value)
                # i points to the first matching item
                rgt = i
                self._impl.incr_version()
                break
        else:
            self._impl._items.append((identity, key, value))
            self._impl.incr_version()
            return

        # remove all tail items
        i = rgt + 1
        while i < len(items):
            item = items[i]
            if item[0] == identity:
                del items[i]
            else:
                i += 1

409 

410 

class CIMultiDict(MultiDict):
    """MultiDict variant whose keys are matched through ``str.title()``.

    Keys that title-case to the same string share one identity, so they are
    treated as the same key.
    """

    def _title(self, key):
        """Return the title-cased form of *key*, used as lookup identity."""
        identity = key.title()
        return identity

416 

417 

class _Iter:
    """Thin iterator wrapper that also reports a length hint.

    *size* is returned verbatim from ``__length_hint__``; all iteration is
    delegated to *iterator*.
    """

    __slots__ = ("_size", "_iter")

    def __init__(self, size, iterator):
        self._size, self._iter = size, iterator

    def __iter__(self):
        return self

    def __next__(self):
        wrapped = self._iter
        return next(wrapped)

    def __length_hint__(self):
        return self._size

433 

434 

class _ViewBase:
    """Common base of the views: holds the storage object and its length."""

    def __init__(self, impl):
        self._impl = impl

    def __len__(self):
        entries = self._impl._items
        return len(entries)

441 

442 

class _ItemsView(_ViewBase, abc.ItemsView):
    """View over the stored entries as ``(key, value)`` pairs."""

    def __contains__(self, item):
        """Return True if *item* equals a stored ``(key, value)`` pair.

        Anything that is not a 2-element tuple or list cannot be a member, so
        the answer is False rather than an AssertionError.  An ``assert``
        would also be stripped under ``python -O``.  The comparison uses the
        stored key, not the identity.
        """
        if not isinstance(item, (tuple, list)) or len(item) != 2:
            return False
        for i, k, v in self._impl._items:
            if item[0] == k and item[1] == v:
                return True
        return False

    def __iter__(self):
        return _Iter(len(self), self._iter(self._impl._version))

    def _iter(self, version):
        """Yield (key, value) pairs, failing if the storage version changes.

        Raises:
            RuntimeError: if the storage's version differs from *version*
                when the next pair is about to be produced.
        """
        for i, k, v in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield k, v

    def __repr__(self):
        lst = []
        for item in self._impl._items:
            lst.append("{!r}: {!r}".format(item[1], item[2]))
        body = ", ".join(lst)
        return "{}({})".format(self.__class__.__name__, body)

467 

468 

class _ValuesView(_ViewBase, abc.ValuesView):
    """View over the values of the stored entries."""

    def __contains__(self, value):
        return any(entry[2] == value for entry in self._impl._items)

    def __iter__(self):
        stamp = self._impl._version
        return _Iter(len(self), self._iter(stamp))

    def _iter(self, version):
        """Yield each stored value, failing if the storage version changes."""
        for entry in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield entry[2]

    def __repr__(self):
        rendered = ["{!r}".format(entry[2]) for entry in self._impl._items]
        return "{}({})".format(self.__class__.__name__, ", ".join(rendered))

491 

492 

class _KeysView(_ViewBase, abc.KeysView):
    """View over the keys of the stored entries."""

    def __contains__(self, key):
        # Membership compares against the stored key, not the identity.
        return any(entry[1] == key for entry in self._impl._items)

    def __iter__(self):
        stamp = self._impl._version
        return _Iter(len(self), self._iter(stamp))

    def _iter(self, version):
        """Yield each stored key, failing if the storage version changes."""
        for entry in self._impl._items:
            if version != self._impl._version:
                raise RuntimeError("Dictionary changed during iteration")
            yield entry[1]

    def __repr__(self):
        rendered = ["{!r}".format(entry[1]) for entry in self._impl._items]
        return "{}({})".format(self.__class__.__name__, ", ".join(rendered))