Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/_collections.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

239 statements  

1from __future__ import annotations 

2 

3import typing 

4from collections import OrderedDict 

5from enum import Enum, auto 

6from threading import RLock 

7 

8if typing.TYPE_CHECKING: 

9 # We can only import Protocol if TYPE_CHECKING because it's a development 

10 # dependency, and is not available at runtime. 

11 from typing import Protocol 

12 

13 from typing_extensions import Self 

14 

15 class HasGettableStringKeys(Protocol): 

16 def keys(self) -> typing.Iterator[str]: ... 

17 

18 def __getitem__(self, key: str) -> str: ... 

19 

20 

21__all__ = ["RecentlyUsedContainer", "HTTPHeaderDict"] 

22 

23 

24# Key type 

25_KT = typing.TypeVar("_KT") 

26# Value type 

27_VT = typing.TypeVar("_VT") 

28# Default type 

29_DT = typing.TypeVar("_DT") 

30 

31ValidHTTPHeaderSource = typing.Union[ 

32 "HTTPHeaderDict", 

33 typing.Mapping[str, str], 

34 typing.Iterable[tuple[str, str]], 

35 "HasGettableStringKeys", 

36] 

37 

38 

39class _Sentinel(Enum): 

40 not_passed = auto() 

41 

42 

43def ensure_can_construct_http_header_dict( 

44 potential: object, 

45) -> ValidHTTPHeaderSource | None: 

46 if isinstance(potential, HTTPHeaderDict): 

47 return potential 

48 elif isinstance(potential, typing.Mapping): 

49 # Full runtime checking of the contents of a Mapping is expensive, so for the 

50 # purposes of typechecking, we assume that any Mapping is the right shape. 

51 return typing.cast(typing.Mapping[str, str], potential) 

52 elif isinstance(potential, typing.Iterable): 

53 # Similarly to Mapping, full runtime checking of the contents of an Iterable is 

54 # expensive, so for the purposes of typechecking, we assume that any Iterable 

55 # is the right shape. 

56 return typing.cast(typing.Iterable[tuple[str, str]], potential) 

57 elif hasattr(potential, "keys") and hasattr(potential, "__getitem__"): 

58 return typing.cast("HasGettableStringKeys", potential) 

59 else: 

60 return None 

61 

62 

63class RecentlyUsedContainer(typing.Generic[_KT, _VT], typing.MutableMapping[_KT, _VT]): 

64 """ 

65 Provides a thread-safe dict-like container which maintains up to 

66 ``maxsize`` keys while throwing away the least-recently-used keys beyond 

67 ``maxsize``. 

68 

69 :param maxsize: 

70 Maximum number of recent elements to retain. 

71 

72 :param dispose_func: 

73 Every time an item is evicted from the container, 

74 ``dispose_func(value)`` is called. Callback which will get called 

75 """ 

76 

77 _container: typing.OrderedDict[_KT, _VT] 

78 _maxsize: int 

79 dispose_func: typing.Callable[[_VT], None] | None 

80 lock: RLock 

81 

82 def __init__( 

83 self, 

84 maxsize: int = 10, 

85 dispose_func: typing.Callable[[_VT], None] | None = None, 

86 ) -> None: 

87 super().__init__() 

88 self._maxsize = maxsize 

89 self.dispose_func = dispose_func 

90 self._container = OrderedDict() 

91 self.lock = RLock() 

92 

93 def __getitem__(self, key: _KT) -> _VT: 

94 # Re-insert the item, moving it to the end of the eviction line. 

95 with self.lock: 

96 item = self._container.pop(key) 

97 self._container[key] = item 

98 return item 

99 

100 def __setitem__(self, key: _KT, value: _VT) -> None: 

101 evicted_item = None 

102 with self.lock: 

103 # Possibly evict the existing value of 'key' 

104 try: 

105 # If the key exists, we'll overwrite it, which won't change the 

106 # size of the pool. Because accessing a key should move it to 

107 # the end of the eviction line, we pop it out first. 

108 evicted_item = key, self._container.pop(key) 

109 self._container[key] = value 

110 except KeyError: 

111 # When the key does not exist, we insert the value first so that 

112 # evicting works in all cases, including when self._maxsize is 0 

113 self._container[key] = value 

114 if len(self._container) > self._maxsize: 

115 # If we didn't evict an existing value, and we've hit our maximum 

116 # size, then we have to evict the least recently used item from 

117 # the beginning of the container. 

118 evicted_item = self._container.popitem(last=False) 

119 

120 # After releasing the lock on the pool, dispose of any evicted value. 

121 if evicted_item is not None and self.dispose_func: 

122 _, evicted_value = evicted_item 

123 self.dispose_func(evicted_value) 

124 

125 def __delitem__(self, key: _KT) -> None: 

126 with self.lock: 

127 value = self._container.pop(key) 

128 

129 if self.dispose_func: 

130 self.dispose_func(value) 

131 

132 def __len__(self) -> int: 

133 with self.lock: 

134 return len(self._container) 

135 

136 def __iter__(self) -> typing.NoReturn: 

137 raise NotImplementedError( 

138 "Iteration over this class is unlikely to be threadsafe." 

139 ) 

140 

141 def clear(self) -> None: 

142 with self.lock: 

143 # Copy pointers to all values, then wipe the mapping 

144 values = list(self._container.values()) 

145 self._container.clear() 

146 

147 if self.dispose_func: 

148 for value in values: 

149 self.dispose_func(value) 

150 

151 def keys(self) -> set[_KT]: # type: ignore[override] 

152 with self.lock: 

153 return set(self._container.keys()) 

154 

155 

156class HTTPHeaderDictItemView(set[tuple[str, str]]): 

157 """ 

158 HTTPHeaderDict is unusual for a Mapping[str, str] in that it has two modes of 

159 address. 

160 

161 If we directly try to get an item with a particular name, we will get a string 

162 back that is the concatenated version of all the values: 

163 

164 >>> d['X-Header-Name'] 

165 'Value1, Value2, Value3' 

166 

167 However, if we iterate over an HTTPHeaderDict's items, we will optionally combine 

168 these values based on whether combine=True was called when building up the dictionary 

169 

170 >>> d = HTTPHeaderDict({"A": "1", "B": "foo"}) 

171 >>> d.add("A", "2", combine=True) 

172 >>> d.add("B", "bar") 

173 >>> list(d.items()) 

174 [ 

175 ('A', '1, 2'), 

176 ('B', 'foo'), 

177 ('B', 'bar'), 

178 ] 

179 

180 This class conforms to the interface required by the MutableMapping ABC while 

181 also giving us the nonstandard iteration behavior we want; items with duplicate 

182 keys, ordered by time of first insertion. 

183 """ 

184 

185 _headers: HTTPHeaderDict 

186 

187 def __init__(self, headers: HTTPHeaderDict) -> None: 

188 self._headers = headers 

189 

190 def __len__(self) -> int: 

191 return len(list(self._headers.iteritems())) 

192 

193 def __iter__(self) -> typing.Iterator[tuple[str, str]]: 

194 return self._headers.iteritems() 

195 

196 def __contains__(self, item: object) -> bool: 

197 if isinstance(item, tuple) and len(item) == 2: 

198 passed_key, passed_val = item 

199 if isinstance(passed_key, str) and isinstance(passed_val, str): 

200 return self._headers._has_value_for_header(passed_key, passed_val) 

201 return False 

202 

203 

204class HTTPHeaderDict(typing.MutableMapping[str, str]): 

205 """ 

206 :param headers: 

207 An iterable of field-value pairs. Must not contain multiple field names 

208 when compared case-insensitively. 

209 

210 :param kwargs: 

211 Additional field-value pairs to pass in to ``dict.update``. 

212 

213 A ``dict`` like container for storing HTTP Headers. 

214 

215 Field names are stored and compared case-insensitively in compliance with 

216 RFC 7230. Iteration provides the first case-sensitive key seen for each 

217 case-insensitive pair. 

218 

219 Using ``__setitem__`` syntax overwrites fields that compare equal 

220 case-insensitively in order to maintain ``dict``'s api. For fields that 

221 compare equal, instead create a new ``HTTPHeaderDict`` and use ``.add`` 

222 in a loop. 

223 

224 If multiple fields that are equal case-insensitively are passed to the 

225 constructor or ``.update``, the behavior is undefined and some will be 

226 lost. 

227 

228 >>> headers = HTTPHeaderDict() 

229 >>> headers.add('Set-Cookie', 'foo=bar') 

230 >>> headers.add('set-cookie', 'baz=quxx') 

231 >>> headers['content-length'] = '7' 

232 >>> headers['SET-cookie'] 

233 'foo=bar, baz=quxx' 

234 >>> headers['Content-Length'] 

235 '7' 

236 """ 

237 

238 _container: typing.MutableMapping[str, list[str]] 

239 

240 def __init__(self, headers: ValidHTTPHeaderSource | None = None, **kwargs: str): 

241 super().__init__() 

242 self._container = {} # 'dict' is insert-ordered 

243 if headers is not None: 

244 if isinstance(headers, HTTPHeaderDict): 

245 self._copy_from(headers) 

246 else: 

247 self.extend(headers) 

248 if kwargs: 

249 self.extend(kwargs) 

250 

251 def __setitem__(self, key: str, val: str) -> None: 

252 # avoid a bytes/str comparison by decoding before httplib 

253 if isinstance(key, bytes): 

254 key = key.decode("latin-1") 

255 self._container[key.lower()] = [key, val] 

256 

257 def __getitem__(self, key: str) -> str: 

258 if isinstance(key, bytes): 

259 key = key.decode("latin-1") 

260 val = self._container[key.lower()] 

261 return ", ".join(val[1:]) 

262 

263 def __delitem__(self, key: str) -> None: 

264 if isinstance(key, bytes): 

265 key = key.decode("latin-1") 

266 del self._container[key.lower()] 

267 

268 def __contains__(self, key: object) -> bool: 

269 if isinstance(key, bytes): 

270 key = key.decode("latin-1") 

271 if isinstance(key, str): 

272 return key.lower() in self._container 

273 return False 

274 

275 def setdefault(self, key: str, default: str = "") -> str: 

276 return super().setdefault(key, default) 

277 

278 def __eq__(self, other: object) -> bool: 

279 maybe_constructable = ensure_can_construct_http_header_dict(other) 

280 if maybe_constructable is None: 

281 return False 

282 else: 

283 other_as_http_header_dict = type(self)(maybe_constructable) 

284 

285 return {k.lower(): v for k, v in self.itermerged()} == { 

286 k.lower(): v for k, v in other_as_http_header_dict.itermerged() 

287 } 

288 

289 def __ne__(self, other: object) -> bool: 

290 return not self.__eq__(other) 

291 

292 def __len__(self) -> int: 

293 return len(self._container) 

294 

295 def __iter__(self) -> typing.Iterator[str]: 

296 # Only provide the originally cased names 

297 for vals in self._container.values(): 

298 yield vals[0] 

299 

300 def discard(self, key: str) -> None: 

301 try: 

302 del self[key] 

303 except KeyError: 

304 pass 

305 

306 def add(self, key: str, val: str, *, combine: bool = False) -> None: 

307 """Adds a (name, value) pair, doesn't overwrite the value if it already 

308 exists. 

309 

310 If this is called with combine=True, instead of adding a new header value 

311 as a distinct item during iteration, this will instead append the value to 

312 any existing header value with a comma. If no existing header value exists 

313 for the key, then the value will simply be added, ignoring the combine parameter. 

314 

315 >>> headers = HTTPHeaderDict(foo='bar') 

316 >>> headers.add('Foo', 'baz') 

317 >>> headers['foo'] 

318 'bar, baz' 

319 >>> list(headers.items()) 

320 [('foo', 'bar'), ('foo', 'baz')] 

321 >>> headers.add('foo', 'quz', combine=True) 

322 >>> list(headers.items()) 

323 [('foo', 'bar, baz, quz')] 

324 """ 

325 # avoid a bytes/str comparison by decoding before httplib 

326 if isinstance(key, bytes): 

327 key = key.decode("latin-1") 

328 key_lower = key.lower() 

329 new_vals = [key, val] 

330 # Keep the common case aka no item present as fast as possible 

331 vals = self._container.setdefault(key_lower, new_vals) 

332 if new_vals is not vals: 

333 # if there are values here, then there is at least the initial 

334 # key/value pair 

335 assert len(vals) >= 2 

336 if combine: 

337 vals[-1] = vals[-1] + ", " + val 

338 else: 

339 vals.append(val) 

340 

341 def extend(self, *args: ValidHTTPHeaderSource, **kwargs: str) -> None: 

342 """Generic import function for any type of header-like object. 

343 Adapted version of MutableMapping.update in order to insert items 

344 with self.add instead of self.__setitem__ 

345 """ 

346 if len(args) > 1: 

347 raise TypeError( 

348 f"extend() takes at most 1 positional arguments ({len(args)} given)" 

349 ) 

350 other = args[0] if len(args) >= 1 else () 

351 

352 if isinstance(other, HTTPHeaderDict): 

353 for key, val in other.iteritems(): 

354 self.add(key, val) 

355 elif isinstance(other, typing.Mapping): 

356 for key, val in other.items(): 

357 self.add(key, val) 

358 elif isinstance(other, typing.Iterable): 

359 other = typing.cast(typing.Iterable[tuple[str, str]], other) 

360 for key, value in other: 

361 self.add(key, value) 

362 elif hasattr(other, "keys") and hasattr(other, "__getitem__"): 

363 # THIS IS NOT A TYPESAFE BRANCH 

364 # In this branch, the object has a `keys` attr but is not a Mapping or any of 

365 # the other types indicated in the method signature. We do some stuff with 

366 # it as though it partially implements the Mapping interface, but we're not 

367 # doing that stuff safely AT ALL. 

368 for key in other.keys(): 

369 self.add(key, other[key]) 

370 

371 for key, value in kwargs.items(): 

372 self.add(key, value) 

373 

374 @typing.overload 

375 def getlist(self, key: str) -> list[str]: ... 

376 

377 @typing.overload 

378 def getlist(self, key: str, default: _DT) -> list[str] | _DT: ... 

379 

380 def getlist( 

381 self, key: str, default: _Sentinel | _DT = _Sentinel.not_passed 

382 ) -> list[str] | _DT: 

383 """Returns a list of all the values for the named field. Returns an 

384 empty list if the key doesn't exist.""" 

385 if isinstance(key, bytes): 

386 key = key.decode("latin-1") 

387 try: 

388 vals = self._container[key.lower()] 

389 except KeyError: 

390 if default is _Sentinel.not_passed: 

391 # _DT is unbound; empty list is instance of List[str] 

392 return [] 

393 # _DT is bound; default is instance of _DT 

394 return default 

395 else: 

396 # _DT may or may not be bound; vals[1:] is instance of List[str], which 

397 # meets our external interface requirement of `Union[List[str], _DT]`. 

398 return vals[1:] 

399 

400 def _prepare_for_method_change(self) -> Self: 

401 """ 

402 Remove content-specific header fields before changing the request 

403 method to GET or HEAD according to RFC 9110, Section 15.4. 

404 """ 

405 content_specific_headers = [ 

406 "Content-Encoding", 

407 "Content-Language", 

408 "Content-Location", 

409 "Content-Type", 

410 "Content-Length", 

411 "Digest", 

412 "Last-Modified", 

413 ] 

414 for header in content_specific_headers: 

415 self.discard(header) 

416 return self 

417 

418 # Backwards compatibility for httplib 

419 getheaders = getlist 

420 getallmatchingheaders = getlist 

421 iget = getlist 

422 

423 # Backwards compatibility for http.cookiejar 

424 get_all = getlist 

425 

426 def __repr__(self) -> str: 

427 return f"{type(self).__name__}({dict(self.itermerged())})" 

428 

429 def _copy_from(self, other: HTTPHeaderDict) -> None: 

430 for key in other: 

431 val = other.getlist(key) 

432 self._container[key.lower()] = [key, *val] 

433 

434 def copy(self) -> Self: 

435 clone = type(self)() 

436 clone._copy_from(self) 

437 return clone 

438 

439 def iteritems(self) -> typing.Iterator[tuple[str, str]]: 

440 """Iterate over all header lines, including duplicate ones.""" 

441 for key in self: 

442 vals = self._container[key.lower()] 

443 for val in vals[1:]: 

444 yield vals[0], val 

445 

446 def itermerged(self) -> typing.Iterator[tuple[str, str]]: 

447 """Iterate over all headers, merging duplicate ones together.""" 

448 for key in self: 

449 val = self._container[key.lower()] 

450 yield val[0], ", ".join(val[1:]) 

451 

452 def items(self) -> HTTPHeaderDictItemView: # type: ignore[override] 

453 return HTTPHeaderDictItemView(self) 

454 

455 def _has_value_for_header(self, header_name: str, potential_value: str) -> bool: 

456 if header_name in self: 

457 return potential_value in self._container[header_name.lower()][1:] 

458 return False 

459 

460 def __ior__(self, other: object) -> HTTPHeaderDict: 

461 # Supports extending a header dict in-place using operator |= 

462 # combining items with add instead of __setitem__ 

463 maybe_constructable = ensure_can_construct_http_header_dict(other) 

464 if maybe_constructable is None: 

465 return NotImplemented 

466 self.extend(maybe_constructable) 

467 return self 

468 

469 def __or__(self, other: object) -> Self: 

470 # Supports merging header dicts using operator | 

471 # combining items with add instead of __setitem__ 

472 maybe_constructable = ensure_can_construct_http_header_dict(other) 

473 if maybe_constructable is None: 

474 return NotImplemented 

475 result = self.copy() 

476 result.extend(maybe_constructable) 

477 return result 

478 

479 def __ror__(self, other: object) -> Self: 

480 # Supports merging header dicts using operator | when other is on left side 

481 # combining items with add instead of __setitem__ 

482 maybe_constructable = ensure_can_construct_http_header_dict(other) 

483 if maybe_constructable is None: 

484 return NotImplemented 

485 result = type(self)(maybe_constructable) 

486 result.extend(self) 

487 return result