Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/_collections.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

231 statements  

1from __future__ import annotations 

2 

3import typing 

4from collections import OrderedDict 

5from enum import Enum, auto 

6from threading import RLock 

7 

8if typing.TYPE_CHECKING: 

9 # We can only import Protocol if TYPE_CHECKING because it's a development 

10 # dependency, and is not available at runtime. 

11 from typing import Protocol 

12 

13 from typing_extensions import Self 

14 

15 class HasGettableStringKeys(Protocol): 

16 def keys(self) -> typing.Iterator[str]: ... 

17 

18 def __getitem__(self, key: str) -> str: ... 

19 

20 

21__all__ = ["RecentlyUsedContainer", "HTTPHeaderDict"] 

22 

23 

24# Key type 

25_KT = typing.TypeVar("_KT") 

26# Value type 

27_VT = typing.TypeVar("_VT") 

28# Default type 

29_DT = typing.TypeVar("_DT") 

30 

31ValidHTTPHeaderSource = typing.Union[ 

32 "HTTPHeaderDict", 

33 typing.Mapping[str, str], 

34 typing.Iterable[tuple[str, str]], 

35 "HasGettableStringKeys", 

36] 

37 

38 

39class _Sentinel(Enum): 

40 not_passed = auto() 

41 

42 

43def ensure_can_construct_http_header_dict( 

44 potential: object, 

45) -> ValidHTTPHeaderSource | None: 

46 if isinstance(potential, HTTPHeaderDict): 

47 return potential 

48 elif isinstance(potential, typing.Mapping): 

49 # Full runtime checking of the contents of a Mapping is expensive, so for the 

50 # purposes of typechecking, we assume that any Mapping is the right shape. 

51 return typing.cast(typing.Mapping[str, str], potential) 

52 elif isinstance(potential, typing.Iterable): 

53 # Similarly to Mapping, full runtime checking of the contents of an Iterable is 

54 # expensive, so for the purposes of typechecking, we assume that any Iterable 

55 # is the right shape. 

56 return typing.cast(typing.Iterable[tuple[str, str]], potential) 

57 elif hasattr(potential, "keys") and hasattr(potential, "__getitem__"): 

58 return typing.cast("HasGettableStringKeys", potential) 

59 else: 

60 return None 

61 

62 

63class RecentlyUsedContainer(typing.Generic[_KT, _VT], typing.MutableMapping[_KT, _VT]): 

64 """ 

65 Provides a thread-safe dict-like container which maintains up to 

66 ``maxsize`` keys while throwing away the least-recently-used keys beyond 

67 ``maxsize``. 

68 

69 :param maxsize: 

70 Maximum number of recent elements to retain. 

71 

72 :param dispose_func: 

73 Every time an item is evicted from the container, 

74 ``dispose_func(value)`` is called. Callback which will get called 

75 """ 

76 

77 _container: typing.OrderedDict[_KT, _VT] 

78 _maxsize: int 

79 dispose_func: typing.Callable[[_VT], None] | None 

80 lock: RLock 

81 

82 def __init__( 

83 self, 

84 maxsize: int = 10, 

85 dispose_func: typing.Callable[[_VT], None] | None = None, 

86 ) -> None: 

87 super().__init__() 

88 self._maxsize = maxsize 

89 self.dispose_func = dispose_func 

90 self._container = OrderedDict() 

91 self.lock = RLock() 

92 

93 def __getitem__(self, key: _KT) -> _VT: 

94 # Re-insert the item, moving it to the end of the eviction line. 

95 with self.lock: 

96 item = self._container.pop(key) 

97 self._container[key] = item 

98 return item 

99 

100 def __setitem__(self, key: _KT, value: _VT) -> None: 

101 evicted_item = None 

102 with self.lock: 

103 # Possibly evict the existing value of 'key' 

104 try: 

105 # If the key exists, we'll overwrite it, which won't change the 

106 # size of the pool. Because accessing a key should move it to 

107 # the end of the eviction line, we pop it out first. 

108 evicted_item = key, self._container.pop(key) 

109 self._container[key] = value 

110 except KeyError: 

111 # When the key does not exist, we insert the value first so that 

112 # evicting works in all cases, including when self._maxsize is 0 

113 self._container[key] = value 

114 if len(self._container) > self._maxsize: 

115 # If we didn't evict an existing value, and we've hit our maximum 

116 # size, then we have to evict the least recently used item from 

117 # the beginning of the container. 

118 evicted_item = self._container.popitem(last=False) 

119 

120 # After releasing the lock on the pool, dispose of any evicted value. 

121 if evicted_item is not None and self.dispose_func: 

122 _, evicted_value = evicted_item 

123 self.dispose_func(evicted_value) 

124 

125 def __delitem__(self, key: _KT) -> None: 

126 with self.lock: 

127 value = self._container.pop(key) 

128 

129 if self.dispose_func: 

130 self.dispose_func(value) 

131 

132 def __len__(self) -> int: 

133 with self.lock: 

134 return len(self._container) 

135 

136 def __iter__(self) -> typing.NoReturn: 

137 raise NotImplementedError( 

138 "Iteration over this class is unlikely to be threadsafe." 

139 ) 

140 

141 def clear(self) -> None: 

142 with self.lock: 

143 # Copy pointers to all values, then wipe the mapping 

144 values = list(self._container.values()) 

145 self._container.clear() 

146 

147 if self.dispose_func: 

148 for value in values: 

149 self.dispose_func(value) 

150 

151 def keys(self) -> set[_KT]: # type: ignore[override] 

152 with self.lock: 

153 return set(self._container.keys()) 

154 

155 

156class HTTPHeaderDictItemView(set[tuple[str, str]]): 

157 """ 

158 HTTPHeaderDict is unusual for a Mapping[str, str] in that it has two modes of 

159 address. 

160 

161 If we directly try to get an item with a particular name, we will get a string 

162 back that is the concatenated version of all the values: 

163 

164 >>> d['X-Header-Name'] 

165 'Value1, Value2, Value3' 

166 

167 However, if we iterate over an HTTPHeaderDict's items, we will optionally combine 

168 these values based on whether combine=True was called when building up the dictionary 

169 

170 >>> d = HTTPHeaderDict({"A": "1", "B": "foo"}) 

171 >>> d.add("A", "2", combine=True) 

172 >>> d.add("B", "bar") 

173 >>> list(d.items()) 

174 [ 

175 ('A', '1, 2'), 

176 ('B', 'foo'), 

177 ('B', 'bar'), 

178 ] 

179 

180 This class conforms to the interface required by the MutableMapping ABC while 

181 also giving us the nonstandard iteration behavior we want; items with duplicate 

182 keys, ordered by time of first insertion. 

183 """ 

184 

185 _headers: HTTPHeaderDict 

186 

187 def __init__(self, headers: HTTPHeaderDict) -> None: 

188 self._headers = headers 

189 

190 def __len__(self) -> int: 

191 return len(list(self._headers.iteritems())) 

192 

193 def __iter__(self) -> typing.Iterator[tuple[str, str]]: 

194 return self._headers.iteritems() 

195 

196 def __contains__(self, item: object) -> bool: 

197 if isinstance(item, tuple) and len(item) == 2: 

198 passed_key, passed_val = item 

199 if isinstance(passed_key, str) and isinstance(passed_val, str): 

200 return self._headers._has_value_for_header(passed_key, passed_val) 

201 return False 

202 

203 

204class HTTPHeaderDict(typing.MutableMapping[str, str]): 

205 """ 

206 :param headers: 

207 An iterable of field-value pairs. Must not contain multiple field names 

208 when compared case-insensitively. 

209 

210 :param kwargs: 

211 Additional field-value pairs to pass in to ``dict.update``. 

212 

213 A ``dict`` like container for storing HTTP Headers. 

214 

215 Field names are stored and compared case-insensitively in compliance with 

216 RFC 7230. Iteration provides the first case-sensitive key seen for each 

217 case-insensitive pair. 

218 

219 Using ``__setitem__`` syntax overwrites fields that compare equal 

220 case-insensitively in order to maintain ``dict``'s api. For fields that 

221 compare equal, instead create a new ``HTTPHeaderDict`` and use ``.add`` 

222 in a loop. 

223 

224 If multiple fields that are equal case-insensitively are passed to the 

225 constructor or ``.update``, the behavior is undefined and some will be 

226 lost. 

227 

228 >>> headers = HTTPHeaderDict() 

229 >>> headers.add('Set-Cookie', 'foo=bar') 

230 >>> headers.add('set-cookie', 'baz=quxx') 

231 >>> headers['content-length'] = '7' 

232 >>> headers['SET-cookie'] 

233 'foo=bar, baz=quxx' 

234 >>> headers['Content-Length'] 

235 '7' 

236 """ 

237 

238 _container: typing.MutableMapping[str, list[str]] 

239 

240 def __init__(self, headers: ValidHTTPHeaderSource | None = None, **kwargs: str): 

241 super().__init__() 

242 self._container = {} # 'dict' is insert-ordered 

243 if headers is not None: 

244 if isinstance(headers, HTTPHeaderDict): 

245 self._copy_from(headers) 

246 else: 

247 self.extend(headers) 

248 if kwargs: 

249 self.extend(kwargs) 

250 

251 def __setitem__(self, key: str, val: str) -> None: 

252 # avoid a bytes/str comparison by decoding before httplib 

253 if isinstance(key, bytes): 

254 key = key.decode("latin-1") 

255 self._container[key.lower()] = [key, val] 

256 

257 def __getitem__(self, key: str) -> str: 

258 val = self._container[key.lower()] 

259 return ", ".join(val[1:]) 

260 

261 def __delitem__(self, key: str) -> None: 

262 del self._container[key.lower()] 

263 

264 def __contains__(self, key: object) -> bool: 

265 if isinstance(key, str): 

266 return key.lower() in self._container 

267 return False 

268 

269 def setdefault(self, key: str, default: str = "") -> str: 

270 return super().setdefault(key, default) 

271 

272 def __eq__(self, other: object) -> bool: 

273 maybe_constructable = ensure_can_construct_http_header_dict(other) 

274 if maybe_constructable is None: 

275 return False 

276 else: 

277 other_as_http_header_dict = type(self)(maybe_constructable) 

278 

279 return {k.lower(): v for k, v in self.itermerged()} == { 

280 k.lower(): v for k, v in other_as_http_header_dict.itermerged() 

281 } 

282 

283 def __ne__(self, other: object) -> bool: 

284 return not self.__eq__(other) 

285 

286 def __len__(self) -> int: 

287 return len(self._container) 

288 

289 def __iter__(self) -> typing.Iterator[str]: 

290 # Only provide the originally cased names 

291 for vals in self._container.values(): 

292 yield vals[0] 

293 

294 def discard(self, key: str) -> None: 

295 try: 

296 del self[key] 

297 except KeyError: 

298 pass 

299 

300 def add(self, key: str, val: str, *, combine: bool = False) -> None: 

301 """Adds a (name, value) pair, doesn't overwrite the value if it already 

302 exists. 

303 

304 If this is called with combine=True, instead of adding a new header value 

305 as a distinct item during iteration, this will instead append the value to 

306 any existing header value with a comma. If no existing header value exists 

307 for the key, then the value will simply be added, ignoring the combine parameter. 

308 

309 >>> headers = HTTPHeaderDict(foo='bar') 

310 >>> headers.add('Foo', 'baz') 

311 >>> headers['foo'] 

312 'bar, baz' 

313 >>> list(headers.items()) 

314 [('foo', 'bar'), ('foo', 'baz')] 

315 >>> headers.add('foo', 'quz', combine=True) 

316 >>> list(headers.items()) 

317 [('foo', 'bar, baz, quz')] 

318 """ 

319 # avoid a bytes/str comparison by decoding before httplib 

320 if isinstance(key, bytes): 

321 key = key.decode("latin-1") 

322 key_lower = key.lower() 

323 new_vals = [key, val] 

324 # Keep the common case aka no item present as fast as possible 

325 vals = self._container.setdefault(key_lower, new_vals) 

326 if new_vals is not vals: 

327 # if there are values here, then there is at least the initial 

328 # key/value pair 

329 assert len(vals) >= 2 

330 if combine: 

331 vals[-1] = vals[-1] + ", " + val 

332 else: 

333 vals.append(val) 

334 

335 def extend(self, *args: ValidHTTPHeaderSource, **kwargs: str) -> None: 

336 """Generic import function for any type of header-like object. 

337 Adapted version of MutableMapping.update in order to insert items 

338 with self.add instead of self.__setitem__ 

339 """ 

340 if len(args) > 1: 

341 raise TypeError( 

342 f"extend() takes at most 1 positional arguments ({len(args)} given)" 

343 ) 

344 other = args[0] if len(args) >= 1 else () 

345 

346 if isinstance(other, HTTPHeaderDict): 

347 for key, val in other.iteritems(): 

348 self.add(key, val) 

349 elif isinstance(other, typing.Mapping): 

350 for key, val in other.items(): 

351 self.add(key, val) 

352 elif isinstance(other, typing.Iterable): 

353 other = typing.cast(typing.Iterable[tuple[str, str]], other) 

354 for key, value in other: 

355 self.add(key, value) 

356 elif hasattr(other, "keys") and hasattr(other, "__getitem__"): 

357 # THIS IS NOT A TYPESAFE BRANCH 

358 # In this branch, the object has a `keys` attr but is not a Mapping or any of 

359 # the other types indicated in the method signature. We do some stuff with 

360 # it as though it partially implements the Mapping interface, but we're not 

361 # doing that stuff safely AT ALL. 

362 for key in other.keys(): 

363 self.add(key, other[key]) 

364 

365 for key, value in kwargs.items(): 

366 self.add(key, value) 

367 

368 @typing.overload 

369 def getlist(self, key: str) -> list[str]: ... 

370 

371 @typing.overload 

372 def getlist(self, key: str, default: _DT) -> list[str] | _DT: ... 

373 

374 def getlist( 

375 self, key: str, default: _Sentinel | _DT = _Sentinel.not_passed 

376 ) -> list[str] | _DT: 

377 """Returns a list of all the values for the named field. Returns an 

378 empty list if the key doesn't exist.""" 

379 try: 

380 vals = self._container[key.lower()] 

381 except KeyError: 

382 if default is _Sentinel.not_passed: 

383 # _DT is unbound; empty list is instance of List[str] 

384 return [] 

385 # _DT is bound; default is instance of _DT 

386 return default 

387 else: 

388 # _DT may or may not be bound; vals[1:] is instance of List[str], which 

389 # meets our external interface requirement of `Union[List[str], _DT]`. 

390 return vals[1:] 

391 

392 def _prepare_for_method_change(self) -> Self: 

393 """ 

394 Remove content-specific header fields before changing the request 

395 method to GET or HEAD according to RFC 9110, Section 15.4. 

396 """ 

397 content_specific_headers = [ 

398 "Content-Encoding", 

399 "Content-Language", 

400 "Content-Location", 

401 "Content-Type", 

402 "Content-Length", 

403 "Digest", 

404 "Last-Modified", 

405 ] 

406 for header in content_specific_headers: 

407 self.discard(header) 

408 return self 

409 

410 # Backwards compatibility for httplib 

411 getheaders = getlist 

412 getallmatchingheaders = getlist 

413 iget = getlist 

414 

415 # Backwards compatibility for http.cookiejar 

416 get_all = getlist 

417 

418 def __repr__(self) -> str: 

419 return f"{type(self).__name__}({dict(self.itermerged())})" 

420 

421 def _copy_from(self, other: HTTPHeaderDict) -> None: 

422 for key in other: 

423 val = other.getlist(key) 

424 self._container[key.lower()] = [key, *val] 

425 

426 def copy(self) -> Self: 

427 clone = type(self)() 

428 clone._copy_from(self) 

429 return clone 

430 

431 def iteritems(self) -> typing.Iterator[tuple[str, str]]: 

432 """Iterate over all header lines, including duplicate ones.""" 

433 for key in self: 

434 vals = self._container[key.lower()] 

435 for val in vals[1:]: 

436 yield vals[0], val 

437 

438 def itermerged(self) -> typing.Iterator[tuple[str, str]]: 

439 """Iterate over all headers, merging duplicate ones together.""" 

440 for key in self: 

441 val = self._container[key.lower()] 

442 yield val[0], ", ".join(val[1:]) 

443 

444 def items(self) -> HTTPHeaderDictItemView: # type: ignore[override] 

445 return HTTPHeaderDictItemView(self) 

446 

447 def _has_value_for_header(self, header_name: str, potential_value: str) -> bool: 

448 if header_name in self: 

449 return potential_value in self._container[header_name.lower()][1:] 

450 return False 

451 

452 def __ior__(self, other: object) -> HTTPHeaderDict: 

453 # Supports extending a header dict in-place using operator |= 

454 # combining items with add instead of __setitem__ 

455 maybe_constructable = ensure_can_construct_http_header_dict(other) 

456 if maybe_constructable is None: 

457 return NotImplemented 

458 self.extend(maybe_constructable) 

459 return self 

460 

461 def __or__(self, other: object) -> Self: 

462 # Supports merging header dicts using operator | 

463 # combining items with add instead of __setitem__ 

464 maybe_constructable = ensure_can_construct_http_header_dict(other) 

465 if maybe_constructable is None: 

466 return NotImplemented 

467 result = self.copy() 

468 result.extend(maybe_constructable) 

469 return result 

470 

471 def __ror__(self, other: object) -> Self: 

472 # Supports merging header dicts using operator | when other is on left side 

473 # combining items with add instead of __setitem__ 

474 maybe_constructable = ensure_can_construct_http_header_dict(other) 

475 if maybe_constructable is None: 

476 return NotImplemented 

477 result = type(self)(maybe_constructable) 

478 result.extend(self) 

479 return result