Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/_collections.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

235 statements  

1from __future__ import annotations 

2 

3import typing 

4from collections import OrderedDict 

5from enum import Enum, auto 

6from threading import RLock 

7 

8if typing.TYPE_CHECKING: 

9 # We can only import Protocol if TYPE_CHECKING because it's a development 

10 # dependency, and is not available at runtime. 

11 from typing import Protocol 

12 

13 from typing_extensions import Self 

14 

15 class HasGettableStringKeys(Protocol): 

16 def keys(self) -> typing.Iterator[str]: 

17 ... 

18 

19 def __getitem__(self, key: str) -> str: 

20 ... 

21 

22 

23__all__ = ["RecentlyUsedContainer", "HTTPHeaderDict"] 

24 

25 

26# Key type 

27_KT = typing.TypeVar("_KT") 

28# Value type 

29_VT = typing.TypeVar("_VT") 

30# Default type 

31_DT = typing.TypeVar("_DT") 

32 

33ValidHTTPHeaderSource = typing.Union[ 

34 "HTTPHeaderDict", 

35 typing.Mapping[str, str], 

36 typing.Iterable[typing.Tuple[str, str]], 

37 "HasGettableStringKeys", 

38] 

39 

40 

41class _Sentinel(Enum): 

42 not_passed = auto() 

43 

44 

45def ensure_can_construct_http_header_dict( 

46 potential: object, 

47) -> ValidHTTPHeaderSource | None: 

48 if isinstance(potential, HTTPHeaderDict): 

49 return potential 

50 elif isinstance(potential, typing.Mapping): 

51 # Full runtime checking of the contents of a Mapping is expensive, so for the 

52 # purposes of typechecking, we assume that any Mapping is the right shape. 

53 return typing.cast(typing.Mapping[str, str], potential) 

54 elif isinstance(potential, typing.Iterable): 

55 # Similarly to Mapping, full runtime checking of the contents of an Iterable is 

56 # expensive, so for the purposes of typechecking, we assume that any Iterable 

57 # is the right shape. 

58 return typing.cast(typing.Iterable[typing.Tuple[str, str]], potential) 

59 elif hasattr(potential, "keys") and hasattr(potential, "__getitem__"): 

60 return typing.cast("HasGettableStringKeys", potential) 

61 else: 

62 return None 

63 

64 

65class RecentlyUsedContainer(typing.Generic[_KT, _VT], typing.MutableMapping[_KT, _VT]): 

66 """ 

67 Provides a thread-safe dict-like container which maintains up to 

68 ``maxsize`` keys while throwing away the least-recently-used keys beyond 

69 ``maxsize``. 

70 

71 :param maxsize: 

72 Maximum number of recent elements to retain. 

73 

74 :param dispose_func: 

75 Every time an item is evicted from the container, 

76 ``dispose_func(value)`` is called. Callback which will get called 

77 """ 

78 

79 _container: typing.OrderedDict[_KT, _VT] 

80 _maxsize: int 

81 dispose_func: typing.Callable[[_VT], None] | None 

82 lock: RLock 

83 

84 def __init__( 

85 self, 

86 maxsize: int = 10, 

87 dispose_func: typing.Callable[[_VT], None] | None = None, 

88 ) -> None: 

89 super().__init__() 

90 self._maxsize = maxsize 

91 self.dispose_func = dispose_func 

92 self._container = OrderedDict() 

93 self.lock = RLock() 

94 

95 def __getitem__(self, key: _KT) -> _VT: 

96 # Re-insert the item, moving it to the end of the eviction line. 

97 with self.lock: 

98 item = self._container.pop(key) 

99 self._container[key] = item 

100 return item 

101 

102 def __setitem__(self, key: _KT, value: _VT) -> None: 

103 evicted_item = None 

104 with self.lock: 

105 # Possibly evict the existing value of 'key' 

106 try: 

107 # If the key exists, we'll overwrite it, which won't change the 

108 # size of the pool. Because accessing a key should move it to 

109 # the end of the eviction line, we pop it out first. 

110 evicted_item = key, self._container.pop(key) 

111 self._container[key] = value 

112 except KeyError: 

113 # When the key does not exist, we insert the value first so that 

114 # evicting works in all cases, including when self._maxsize is 0 

115 self._container[key] = value 

116 if len(self._container) > self._maxsize: 

117 # If we didn't evict an existing value, and we've hit our maximum 

118 # size, then we have to evict the least recently used item from 

119 # the beginning of the container. 

120 evicted_item = self._container.popitem(last=False) 

121 

122 # After releasing the lock on the pool, dispose of any evicted value. 

123 if evicted_item is not None and self.dispose_func: 

124 _, evicted_value = evicted_item 

125 self.dispose_func(evicted_value) 

126 

127 def __delitem__(self, key: _KT) -> None: 

128 with self.lock: 

129 value = self._container.pop(key) 

130 

131 if self.dispose_func: 

132 self.dispose_func(value) 

133 

134 def __len__(self) -> int: 

135 with self.lock: 

136 return len(self._container) 

137 

138 def __iter__(self) -> typing.NoReturn: 

139 raise NotImplementedError( 

140 "Iteration over this class is unlikely to be threadsafe." 

141 ) 

142 

143 def clear(self) -> None: 

144 with self.lock: 

145 # Copy pointers to all values, then wipe the mapping 

146 values = list(self._container.values()) 

147 self._container.clear() 

148 

149 if self.dispose_func: 

150 for value in values: 

151 self.dispose_func(value) 

152 

153 def keys(self) -> set[_KT]: # type: ignore[override] 

154 with self.lock: 

155 return set(self._container.keys()) 

156 

157 

158class HTTPHeaderDictItemView(typing.Set[typing.Tuple[str, str]]): 

159 """ 

160 HTTPHeaderDict is unusual for a Mapping[str, str] in that it has two modes of 

161 address. 

162 

163 If we directly try to get an item with a particular name, we will get a string 

164 back that is the concatenated version of all the values: 

165 

166 >>> d['X-Header-Name'] 

167 'Value1, Value2, Value3' 

168 

169 However, if we iterate over an HTTPHeaderDict's items, we will optionally combine 

170 these values based on whether combine=True was called when building up the dictionary 

171 

172 >>> d = HTTPHeaderDict({"A": "1", "B": "foo"}) 

173 >>> d.add("A", "2", combine=True) 

174 >>> d.add("B", "bar") 

175 >>> list(d.items()) 

176 [ 

177 ('A', '1, 2'), 

178 ('B', 'foo'), 

179 ('B', 'bar'), 

180 ] 

181 

182 This class conforms to the interface required by the MutableMapping ABC while 

183 also giving us the nonstandard iteration behavior we want; items with duplicate 

184 keys, ordered by time of first insertion. 

185 """ 

186 

187 _headers: HTTPHeaderDict 

188 

189 def __init__(self, headers: HTTPHeaderDict) -> None: 

190 self._headers = headers 

191 

192 def __len__(self) -> int: 

193 return len(list(self._headers.iteritems())) 

194 

195 def __iter__(self) -> typing.Iterator[tuple[str, str]]: 

196 return self._headers.iteritems() 

197 

198 def __contains__(self, item: object) -> bool: 

199 if isinstance(item, tuple) and len(item) == 2: 

200 passed_key, passed_val = item 

201 if isinstance(passed_key, str) and isinstance(passed_val, str): 

202 return self._headers._has_value_for_header(passed_key, passed_val) 

203 return False 

204 

205 

206class HTTPHeaderDict(typing.MutableMapping[str, str]): 

207 """ 

208 :param headers: 

209 An iterable of field-value pairs. Must not contain multiple field names 

210 when compared case-insensitively. 

211 

212 :param kwargs: 

213 Additional field-value pairs to pass in to ``dict.update``. 

214 

215 A ``dict`` like container for storing HTTP Headers. 

216 

217 Field names are stored and compared case-insensitively in compliance with 

218 RFC 7230. Iteration provides the first case-sensitive key seen for each 

219 case-insensitive pair. 

220 

221 Using ``__setitem__`` syntax overwrites fields that compare equal 

222 case-insensitively in order to maintain ``dict``'s api. For fields that 

223 compare equal, instead create a new ``HTTPHeaderDict`` and use ``.add`` 

224 in a loop. 

225 

226 If multiple fields that are equal case-insensitively are passed to the 

227 constructor or ``.update``, the behavior is undefined and some will be 

228 lost. 

229 

230 >>> headers = HTTPHeaderDict() 

231 >>> headers.add('Set-Cookie', 'foo=bar') 

232 >>> headers.add('set-cookie', 'baz=quxx') 

233 >>> headers['content-length'] = '7' 

234 >>> headers['SET-cookie'] 

235 'foo=bar, baz=quxx' 

236 >>> headers['Content-Length'] 

237 '7' 

238 """ 

239 

240 _container: typing.MutableMapping[str, list[str]] 

241 

242 def __init__(self, headers: ValidHTTPHeaderSource | None = None, **kwargs: str): 

243 super().__init__() 

244 self._container = {} # 'dict' is insert-ordered 

245 if headers is not None: 

246 if isinstance(headers, HTTPHeaderDict): 

247 self._copy_from(headers) 

248 else: 

249 self.extend(headers) 

250 if kwargs: 

251 self.extend(kwargs) 

252 

253 def __setitem__(self, key: str, val: str) -> None: 

254 # avoid a bytes/str comparison by decoding before httplib 

255 if isinstance(key, bytes): 

256 key = key.decode("latin-1") 

257 self._container[key.lower()] = [key, val] 

258 

259 def __getitem__(self, key: str) -> str: 

260 val = self._container[key.lower()] 

261 return ", ".join(val[1:]) 

262 

263 def __delitem__(self, key: str) -> None: 

264 del self._container[key.lower()] 

265 

266 def __contains__(self, key: object) -> bool: 

267 if isinstance(key, str): 

268 return key.lower() in self._container 

269 return False 

270 

271 def setdefault(self, key: str, default: str = "") -> str: 

272 return super().setdefault(key, default) 

273 

274 def __eq__(self, other: object) -> bool: 

275 maybe_constructable = ensure_can_construct_http_header_dict(other) 

276 if maybe_constructable is None: 

277 return False 

278 else: 

279 other_as_http_header_dict = type(self)(maybe_constructable) 

280 

281 return {k.lower(): v for k, v in self.itermerged()} == { 

282 k.lower(): v for k, v in other_as_http_header_dict.itermerged() 

283 } 

284 

285 def __ne__(self, other: object) -> bool: 

286 return not self.__eq__(other) 

287 

288 def __len__(self) -> int: 

289 return len(self._container) 

290 

291 def __iter__(self) -> typing.Iterator[str]: 

292 # Only provide the originally cased names 

293 for vals in self._container.values(): 

294 yield vals[0] 

295 

296 def discard(self, key: str) -> None: 

297 try: 

298 del self[key] 

299 except KeyError: 

300 pass 

301 

302 def add(self, key: str, val: str, *, combine: bool = False) -> None: 

303 """Adds a (name, value) pair, doesn't overwrite the value if it already 

304 exists. 

305 

306 If this is called with combine=True, instead of adding a new header value 

307 as a distinct item during iteration, this will instead append the value to 

308 any existing header value with a comma. If no existing header value exists 

309 for the key, then the value will simply be added, ignoring the combine parameter. 

310 

311 >>> headers = HTTPHeaderDict(foo='bar') 

312 >>> headers.add('Foo', 'baz') 

313 >>> headers['foo'] 

314 'bar, baz' 

315 >>> list(headers.items()) 

316 [('foo', 'bar'), ('foo', 'baz')] 

317 >>> headers.add('foo', 'quz', combine=True) 

318 >>> list(headers.items()) 

319 [('foo', 'bar, baz, quz')] 

320 """ 

321 # avoid a bytes/str comparison by decoding before httplib 

322 if isinstance(key, bytes): 

323 key = key.decode("latin-1") 

324 key_lower = key.lower() 

325 new_vals = [key, val] 

326 # Keep the common case aka no item present as fast as possible 

327 vals = self._container.setdefault(key_lower, new_vals) 

328 if new_vals is not vals: 

329 # if there are values here, then there is at least the initial 

330 # key/value pair 

331 assert len(vals) >= 2 

332 if combine: 

333 vals[-1] = vals[-1] + ", " + val 

334 else: 

335 vals.append(val) 

336 

337 def extend(self, *args: ValidHTTPHeaderSource, **kwargs: str) -> None: 

338 """Generic import function for any type of header-like object. 

339 Adapted version of MutableMapping.update in order to insert items 

340 with self.add instead of self.__setitem__ 

341 """ 

342 if len(args) > 1: 

343 raise TypeError( 

344 f"extend() takes at most 1 positional arguments ({len(args)} given)" 

345 ) 

346 other = args[0] if len(args) >= 1 else () 

347 

348 if isinstance(other, HTTPHeaderDict): 

349 for key, val in other.iteritems(): 

350 self.add(key, val) 

351 elif isinstance(other, typing.Mapping): 

352 for key, val in other.items(): 

353 self.add(key, val) 

354 elif isinstance(other, typing.Iterable): 

355 other = typing.cast(typing.Iterable[typing.Tuple[str, str]], other) 

356 for key, value in other: 

357 self.add(key, value) 

358 elif hasattr(other, "keys") and hasattr(other, "__getitem__"): 

359 # THIS IS NOT A TYPESAFE BRANCH 

360 # In this branch, the object has a `keys` attr but is not a Mapping or any of 

361 # the other types indicated in the method signature. We do some stuff with 

362 # it as though it partially implements the Mapping interface, but we're not 

363 # doing that stuff safely AT ALL. 

364 for key in other.keys(): 

365 self.add(key, other[key]) 

366 

367 for key, value in kwargs.items(): 

368 self.add(key, value) 

369 

370 @typing.overload 

371 def getlist(self, key: str) -> list[str]: 

372 ... 

373 

374 @typing.overload 

375 def getlist(self, key: str, default: _DT) -> list[str] | _DT: 

376 ... 

377 

378 def getlist( 

379 self, key: str, default: _Sentinel | _DT = _Sentinel.not_passed 

380 ) -> list[str] | _DT: 

381 """Returns a list of all the values for the named field. Returns an 

382 empty list if the key doesn't exist.""" 

383 try: 

384 vals = self._container[key.lower()] 

385 except KeyError: 

386 if default is _Sentinel.not_passed: 

387 # _DT is unbound; empty list is instance of List[str] 

388 return [] 

389 # _DT is bound; default is instance of _DT 

390 return default 

391 else: 

392 # _DT may or may not be bound; vals[1:] is instance of List[str], which 

393 # meets our external interface requirement of `Union[List[str], _DT]`. 

394 return vals[1:] 

395 

396 def _prepare_for_method_change(self) -> Self: 

397 """ 

398 Remove content-specific header fields before changing the request 

399 method to GET or HEAD according to RFC 9110, Section 15.4. 

400 """ 

401 content_specific_headers = [ 

402 "Content-Encoding", 

403 "Content-Language", 

404 "Content-Location", 

405 "Content-Type", 

406 "Content-Length", 

407 "Digest", 

408 "Last-Modified", 

409 ] 

410 for header in content_specific_headers: 

411 self.discard(header) 

412 return self 

413 

414 # Backwards compatibility for httplib 

415 getheaders = getlist 

416 getallmatchingheaders = getlist 

417 iget = getlist 

418 

419 # Backwards compatibility for http.cookiejar 

420 get_all = getlist 

421 

422 def __repr__(self) -> str: 

423 return f"{type(self).__name__}({dict(self.itermerged())})" 

424 

425 def _copy_from(self, other: HTTPHeaderDict) -> None: 

426 for key in other: 

427 val = other.getlist(key) 

428 self._container[key.lower()] = [key, *val] 

429 

430 def copy(self) -> Self: 

431 clone = type(self)() 

432 clone._copy_from(self) 

433 return clone 

434 

435 def iteritems(self) -> typing.Iterator[tuple[str, str]]: 

436 """Iterate over all header lines, including duplicate ones.""" 

437 for key in self: 

438 vals = self._container[key.lower()] 

439 for val in vals[1:]: 

440 yield vals[0], val 

441 

442 def itermerged(self) -> typing.Iterator[tuple[str, str]]: 

443 """Iterate over all headers, merging duplicate ones together.""" 

444 for key in self: 

445 val = self._container[key.lower()] 

446 yield val[0], ", ".join(val[1:]) 

447 

448 def items(self) -> HTTPHeaderDictItemView: # type: ignore[override] 

449 return HTTPHeaderDictItemView(self) 

450 

451 def _has_value_for_header(self, header_name: str, potential_value: str) -> bool: 

452 if header_name in self: 

453 return potential_value in self._container[header_name.lower()][1:] 

454 return False 

455 

456 def __ior__(self, other: object) -> HTTPHeaderDict: 

457 # Supports extending a header dict in-place using operator |= 

458 # combining items with add instead of __setitem__ 

459 maybe_constructable = ensure_can_construct_http_header_dict(other) 

460 if maybe_constructable is None: 

461 return NotImplemented 

462 self.extend(maybe_constructable) 

463 return self 

464 

465 def __or__(self, other: object) -> Self: 

466 # Supports merging header dicts using operator | 

467 # combining items with add instead of __setitem__ 

468 maybe_constructable = ensure_can_construct_http_header_dict(other) 

469 if maybe_constructable is None: 

470 return NotImplemented 

471 result = self.copy() 

472 result.extend(maybe_constructable) 

473 return result 

474 

475 def __ror__(self, other: object) -> Self: 

476 # Supports merging header dicts using operator | when other is on left side 

477 # combining items with add instead of __setitem__ 

478 maybe_constructable = ensure_can_construct_http_header_dict(other) 

479 if maybe_constructable is None: 

480 return NotImplemented 

481 result = type(self)(maybe_constructable) 

482 result.extend(self) 

483 return result