Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_models.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

597 statements  

1from __future__ import annotations 

2 

3import datetime 

4import email.message 

5import json as jsonlib 

6import typing 

7import urllib.request 

8from collections.abc import Mapping 

9from http.cookiejar import Cookie, CookieJar 

10 

11from ._content import ByteStream, UnattachedStream, encode_request, encode_response 

12from ._decoders import ( 

13 SUPPORTED_DECODERS, 

14 ByteChunker, 

15 ContentDecoder, 

16 IdentityDecoder, 

17 LineDecoder, 

18 MultiDecoder, 

19 TextChunker, 

20 TextDecoder, 

21) 

22from ._exceptions import ( 

23 CookieConflict, 

24 HTTPStatusError, 

25 RequestNotRead, 

26 ResponseNotRead, 

27 StreamClosed, 

28 StreamConsumed, 

29 request_context, 

30) 

31from ._multipart import get_multipart_boundary_from_content_type 

32from ._status_codes import codes 

33from ._types import ( 

34 AsyncByteStream, 

35 CookieTypes, 

36 HeaderTypes, 

37 QueryParamTypes, 

38 RequestContent, 

39 RequestData, 

40 RequestExtensions, 

41 RequestFiles, 

42 ResponseContent, 

43 ResponseExtensions, 

44 SyncByteStream, 

45) 

46from ._urls import URL 

47from ._utils import ( 

48 is_known_encoding, 

49 normalize_header_key, 

50 normalize_header_value, 

51 obfuscate_sensitive_headers, 

52 parse_content_type_charset, 

53 parse_header_links, 

54) 

55 

56__all__ = ["Cookies", "Headers", "Request", "Response"] 

57 

58 

class Headers(typing.MutableMapping[str, str]):
    """
    HTTP headers, as a case-insensitive multi-dict.

    Every header is held in `self._list` as a `(raw_key, lookup_key, value)`
    triple of bytes. `raw_key` is what `.raw` reports, `lookup_key` is the
    lowercased form that all key comparisons are made against.
    """

    def __init__(
        self,
        headers: HeaderTypes | None = None,
        encoding: str | None = None,
    ) -> None:
        """
        Build the header list.

        `headers` may be `None` (empty), another `Headers` instance (its
        internal list is copied), a mapping, or an iterable of
        `(key, value)` pairs. `encoding` is handed to the key/value
        normalizers and stored as the explicit encoding; when it is `None`
        the `.encoding` property detects one lazily.
        """
        if headers is None:
            self._list: list[tuple[bytes, bytes, bytes]] = []
        elif isinstance(headers, Headers):
            # Must be checked before `Mapping`: `Headers` is itself a mapping,
            # but copying the triples keeps duplicate keys intact.
            self._list = list(headers._list)
        else:
            pairs = headers.items() if isinstance(headers, Mapping) else headers
            self._list = [
                (
                    normalize_header_key(k, lower=False, encoding=encoding),
                    normalize_header_key(k, lower=True, encoding=encoding),
                    normalize_header_value(v, encoding),
                )
                for k, v in pairs
            ]

        self._encoding = encoding

    @property
    def encoding(self) -> str:
        """
        Header encoding is mandated as ascii, but we allow fallbacks to utf-8
        or iso-8859-1.

        The detected value is cached on first access.
        """
        if self._encoding is None:
            for encoding in ["ascii", "utf-8"]:
                for key, value in self.raw:
                    try:
                        key.decode(encoding)
                        value.decode(encoding)
                    except UnicodeDecodeError:
                        break
                else:
                    # The else block runs if 'break' did not occur, meaning
                    # all values fitted the encoding.
                    self._encoding = encoding
                    break
            else:
                # The ISO-8859-1 encoding covers all 256 code points in a byte,
                # so will never raise decode errors.
                self._encoding = "iso-8859-1"
        return self._encoding

    @encoding.setter
    def encoding(self, value: str) -> None:
        self._encoding = value

    @property
    def raw(self) -> list[tuple[bytes, bytes]]:
        """
        Returns a list of the raw header items, as byte pairs.
        """
        return [(raw_key, value) for raw_key, _, value in self._list]

    def _encode_lookup_key(self, key: typing.Any) -> bytes | None:
        """
        Return the lowercased, encoded form of `key` used for comparisons.

        Returns `None` when `key` is not a `str` or cannot be represented in
        the current header encoding; such a key cannot match any stored
        header, so callers treat it as "not present" instead of letting an
        `AttributeError` / `UnicodeEncodeError` escape from a plain lookup.
        """
        if not isinstance(key, str):
            return None
        try:
            return key.lower().encode(self.encoding)
        except UnicodeEncodeError:
            return None

    def _merged(self) -> dict[str, str]:
        """
        Return `{lookup_key: value}` decoded to `str`, joining the values of
        repeated keys with `", "` in the order they were stored.
        """
        merged: dict[str, str] = {}
        for _, key, value in self._list:
            str_key = key.decode(self.encoding)
            str_value = value.decode(self.encoding)
            if str_key in merged:
                merged[str_key] += f", {str_value}"
            else:
                merged[str_key] = str_value
        return merged

    def keys(self) -> typing.KeysView[str]:
        """
        Return the distinct lowercased header names, in first-seen order.
        """
        return {key.decode(self.encoding): None for _, key, value in self._list}.keys()

    def values(self) -> typing.ValuesView[str]:
        """
        Return header values, one per distinct key, with repeated keys
        concatenated into a single comma separated value.
        """
        return self._merged().values()

    def items(self) -> typing.ItemsView[str, str]:
        """
        Return `(key, value)` items of headers. Concatenate headers
        into a single comma separated value when a key occurs multiple times.
        """
        return self._merged().items()

    def multi_items(self) -> list[tuple[str, str]]:
        """
        Return a list of `(key, value)` pairs of headers. Allow multiple
        occurrences of the same key without concatenating into a single
        comma separated value.
        """
        return [
            (key.decode(self.encoding), value.decode(self.encoding))
            for _, key, value in self._list
        ]

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        """
        Return a header value. If multiple occurrences of the header occur
        then concatenate them together with commas.

        Returns `default` when the header is absent.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def get_list(self, key: str, split_commas: bool = False) -> list[str]:
        """
        Return a list of all header values for a given key.
        If `split_commas=True` is passed, then any comma separated header
        values are split into multiple return strings.
        """
        get_header_key = self._encode_lookup_key(key)
        if get_header_key is None:
            return []

        values = [
            item_value.decode(self.encoding)
            for _, item_key, item_value in self._list
            if item_key.lower() == get_header_key
        ]

        if not split_commas:
            return values

        split_values = []
        for value in values:
            split_values.extend([item.strip() for item in value.split(",")])
        return split_values

    def update(self, headers: HeaderTypes | None = None) -> None:  # type: ignore
        """
        Replace any existing headers whose key appears in `headers`, then
        append every entry of `headers`.
        """
        headers = Headers(headers)
        for key in headers.keys():
            if key in self:
                self.pop(key)
        self._list.extend(headers._list)

    def copy(self) -> Headers:
        """
        Return a new `Headers` with the same entries and encoding.
        """
        return Headers(self, encoding=self.encoding)

    def __getitem__(self, key: str) -> str:
        """
        Return a single header value.

        If there are multiple headers with the same key, then we concatenate
        them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2

        Raises `KeyError` when no header matches, including when `key` cannot
        be encoded with the header encoding.
        """
        normalized_key = self._encode_lookup_key(key)

        if normalized_key is not None:
            items = [
                header_value.decode(self.encoding)
                for _, header_key, header_value in self._list
                if header_key == normalized_key
            ]

            if items:
                return ", ".join(items)

        raise KeyError(key)

    def __setitem__(self, key: str, value: str) -> None:
        """
        Set the header `key` to `value`, removing any duplicate entries.
        Retains insertion order.
        """
        # Uses the explicit encoding if one was given or already detected,
        # otherwise utf-8, rather than triggering detection here.
        set_key = key.encode(self._encoding or "utf-8")
        set_value = value.encode(self._encoding or "utf-8")
        lookup_key = set_key.lower()

        found_indexes = [
            idx
            for idx, (_, item_key, _) in enumerate(self._list)
            if item_key == lookup_key
        ]

        # Drop every occurrence except the first, back to front so the
        # remaining indexes stay valid.
        for idx in reversed(found_indexes[1:]):
            del self._list[idx]

        if found_indexes:
            idx = found_indexes[0]
            self._list[idx] = (set_key, lookup_key, set_value)
        else:
            self._list.append((set_key, lookup_key, set_value))

    def __delitem__(self, key: str) -> None:
        """
        Remove the header `key`.

        Raises `KeyError` when no header matches.
        """
        del_key = self._encode_lookup_key(key)
        if del_key is None:
            raise KeyError(key)

        pop_indexes = [
            idx
            for idx, (_, item_key, _) in enumerate(self._list)
            if item_key.lower() == del_key
        ]

        if not pop_indexes:
            raise KeyError(key)

        # Delete back to front so earlier indexes are not shifted.
        for idx in reversed(pop_indexes):
            del self._list[idx]

    def __contains__(self, key: typing.Any) -> bool:
        """
        Return `True` if a header with this name (case-insensitive) is stored.
        """
        header_key = self._encode_lookup_key(key)
        if header_key is None:
            return False
        return any(item_key == header_key for _, item_key, _ in self._list)

    def __iter__(self) -> typing.Iterator[typing.Any]:
        return iter(self.keys())

    def __len__(self) -> int:
        # Counts stored entries, so a repeated header is counted each time.
        return len(self._list)

    def __eq__(self, other: typing.Any) -> bool:
        """
        Compare as unordered collections of `(lookup_key, value)` pairs.

        Anything that cannot be interpreted as headers compares unequal.
        """
        try:
            other_headers = Headers(other)
        except (TypeError, ValueError):
            # TypeError: `other` is not iterable at all (e.g. an int).
            # ValueError: its items do not unpack into (key, value) pairs.
            return False

        self_list = [(key, value) for _, key, value in self._list]
        other_list = [(key, value) for _, key, value in other_headers._list]
        return sorted(self_list) == sorted(other_list)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__

        encoding_str = ""
        if self.encoding != "ascii":
            encoding_str = f", encoding={self.encoding!r}"

        as_list = list(obfuscate_sensitive_headers(self.multi_items()))
        as_dict = dict(as_list)

        # A dict is only a faithful rendering when no key is repeated.
        no_duplicate_keys = len(as_dict) == len(as_list)
        if no_duplicate_keys:
            return f"{class_name}({as_dict!r}{encoding_str})"
        return f"{class_name}({as_list!r}{encoding_str})"

308 

309 

class Request:
    """
    An HTTP request: method, URL, headers, extensions and a body stream.

    When built from `content`/`data`/`files`/`json` the body is encoded by
    `encode_request` and default headers are filled in. When built from
    `stream` the stream is attached as-is and no headers are added.
    """

    def __init__(
        self,
        method: str | bytes,
        url: URL | str,
        *,
        params: QueryParamTypes | None = None,
        headers: HeaderTypes | None = None,
        cookies: CookieTypes | None = None,
        content: RequestContent | None = None,
        data: RequestData | None = None,
        files: RequestFiles | None = None,
        json: typing.Any | None = None,
        stream: SyncByteStream | AsyncByteStream | None = None,
        extensions: RequestExtensions | None = None,
    ) -> None:
        """
        Create the request.

        `method` is upper-cased (bytes are decoded as ascii first). `params`,
        when given, are merged into the URL. `cookies`, when truthy, are
        written onto the request as a cookie header. If `stream` is `None`
        the body is built from `content`/`data`/`files`/`json`; otherwise
        `stream` is used directly.
        """
        self.method = (
            method.decode("ascii").upper()
            if isinstance(method, bytes)
            else method.upper()
        )
        self.url = URL(url)
        if params is not None:
            self.url = self.url.copy_merge_params(params=params)
        self.headers = Headers(headers)
        self.extensions = {} if extensions is None else extensions

        if cookies:
            Cookies(cookies).set_cookie_header(self)

        if stream is None:
            content_type: str | None = self.headers.get("content-type")
            headers, stream = encode_request(
                content=content,
                data=data,
                files=files,
                json=json,
                boundary=get_multipart_boundary_from_content_type(
                    content_type=content_type.encode(self.headers.encoding)
                    if content_type
                    else None
                ),
            )
            self._prepare(headers)
            self.stream = stream
            # Load the request body, except for streaming content.
            if isinstance(stream, ByteStream):
                self.read()
        else:
            # There's an important distinction between `Request(content=...)`,
            # and `Request(stream=...)`.
            #
            # Using `content=...` implies automatically populated `Host` and content
            # headers, of either `Content-Length: ...` or `Transfer-Encoding: chunked`.
            #
            # Using `stream=...` will not automatically include *any*
            # auto-populated headers.
            #
            # As an end-user you don't really need `stream=...`. It's only
            # useful when:
            #
            # * Preserving the request stream when copying requests, eg for redirects.
            # * Creating request instances on the *server-side* of the transport API.
            self.stream = stream

    def _prepare(self, default_headers: dict[str, str]) -> None:
        """
        Apply `default_headers` without overriding headers already set, then
        prepend an automatic `Host` and/or `Content-Length: 0` where missing.
        """
        for key, value in default_headers.items():
            # Ignore Transfer-Encoding if the Content-Length has been set explicitly.
            if key.lower() == "transfer-encoding" and "Content-Length" in self.headers:
                continue
            self.headers.setdefault(key, value)

        auto_headers: list[tuple[bytes, bytes]] = []

        has_host = "Host" in self.headers
        has_content_length = (
            "Content-Length" in self.headers or "Transfer-Encoding" in self.headers
        )

        if not has_host and self.url.host:
            auto_headers.append((b"Host", self.url.netloc))
        if not has_content_length and self.method in ("POST", "PUT", "PATCH"):
            auto_headers.append((b"Content-Length", b"0"))

        # Automatic headers are placed ahead of the existing ones.
        self.headers = Headers(auto_headers + self.headers.raw)

    @property
    def content(self) -> bytes:
        """
        The request body as bytes.

        Raises `RequestNotRead` if the body has not been loaded yet.
        """
        if not hasattr(self, "_content"):
            raise RequestNotRead()
        return self._content

    def read(self) -> bytes:
        """
        Read and return the request content.
        """
        if not hasattr(self, "_content"):
            assert isinstance(self.stream, typing.Iterable)
            self._content = b"".join(self.stream)
            if not isinstance(self.stream, ByteStream):
                # If a streaming request has been read entirely into memory, then
                # we can replace the stream with a raw bytes implementation,
                # to ensure that any non-replayable streams can still be used.
                self.stream = ByteStream(self._content)
        return self._content

    async def aread(self) -> bytes:
        """
        Read and return the request content.
        """
        if not hasattr(self, "_content"):
            assert isinstance(self.stream, typing.AsyncIterable)
            self._content = b"".join([part async for part in self.stream])
            if not isinstance(self.stream, ByteStream):
                # If a streaming request has been read entirely into memory, then
                # we can replace the stream with a raw bytes implementation,
                # to ensure that any non-replayable streams can still be used.
                self.stream = ByteStream(self._content)
        return self._content

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        url = str(self.url)
        return f"<{class_name}({self.method!r}, {url!r})>"

    def __getstate__(self) -> dict[str, typing.Any]:
        """
        Return the instance attributes for pickling, leaving out
        `extensions` and `stream`.
        """
        return {
            name: value
            for name, value in self.__dict__.items()
            if name not in ["extensions", "stream"]
        }

    def __setstate__(self, state: dict[str, typing.Any]) -> None:
        """
        Restore pickled attributes; `extensions` is reset to an empty dict
        and `stream` to an `UnattachedStream`.
        """
        for name, value in state.items():
            setattr(self, name, value)
        self.extensions = {}
        self.stream = UnattachedStream()

447 

448 

449class Response: 

450 def __init__( 

451 self, 

452 status_code: int, 

453 *, 

454 headers: HeaderTypes | None = None, 

455 content: ResponseContent | None = None, 

456 text: str | None = None, 

457 html: str | None = None, 

458 json: typing.Any = None, 

459 stream: SyncByteStream | AsyncByteStream | None = None, 

460 request: Request | None = None, 

461 extensions: ResponseExtensions | None = None, 

462 history: list[Response] | None = None, 

463 default_encoding: str | typing.Callable[[bytes], str] = "utf-8", 

464 ) -> None: 

465 self.status_code = status_code 

466 self.headers = Headers(headers) 

467 

468 self._request: Request | None = request 

469 

470 # When follow_redirects=False and a redirect is received, 

471 # the client will set `response.next_request`. 

472 self.next_request: Request | None = None 

473 

474 self.extensions: ResponseExtensions = {} if extensions is None else extensions 

475 self.history = [] if history is None else list(history) 

476 

477 self.is_closed = False 

478 self.is_stream_consumed = False 

479 

480 self.default_encoding = default_encoding 

481 

482 if stream is None: 

483 headers, stream = encode_response(content, text, html, json) 

484 self._prepare(headers) 

485 self.stream = stream 

486 if isinstance(stream, ByteStream): 

487 # Load the response body, except for streaming content. 

488 self.read() 

489 else: 

490 # There's an important distinction between `Response(content=...)`, 

491 # and `Response(stream=...)`. 

492 # 

493 # Using `content=...` implies automatically populated content headers, 

494 # of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

495 # 

496 # Using `stream=...` will not automatically include any content headers. 

497 # 

498 # As an end-user you don't really need `stream=...`. It's only 

499 # useful when creating response instances having received a stream 

500 # from the transport API. 

501 self.stream = stream 

502 

503 self._num_bytes_downloaded = 0 

504 

505 def _prepare(self, default_headers: dict[str, str]) -> None: 

506 for key, value in default_headers.items(): 

507 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

508 if key.lower() == "transfer-encoding" and "content-length" in self.headers: 

509 continue 

510 self.headers.setdefault(key, value) 

511 

512 @property 

513 def elapsed(self) -> datetime.timedelta: 

514 """ 

515 Returns the time taken for the complete request/response 

516 cycle to complete. 

517 """ 

518 if not hasattr(self, "_elapsed"): 

519 raise RuntimeError( 

520 "'.elapsed' may only be accessed after the response " 

521 "has been read or closed." 

522 ) 

523 return self._elapsed 

524 

525 @elapsed.setter 

526 def elapsed(self, elapsed: datetime.timedelta) -> None: 

527 self._elapsed = elapsed 

528 

529 @property 

530 def request(self) -> Request: 

531 """ 

532 Returns the request instance associated to the current response. 

533 """ 

534 if self._request is None: 

535 raise RuntimeError( 

536 "The request instance has not been set on this response." 

537 ) 

538 return self._request 

539 

540 @request.setter 

541 def request(self, value: Request) -> None: 

542 self._request = value 

543 

544 @property 

545 def http_version(self) -> str: 

546 try: 

547 http_version: bytes = self.extensions["http_version"] 

548 except KeyError: 

549 return "HTTP/1.1" 

550 else: 

551 return http_version.decode("ascii", errors="ignore") 

552 

553 @property 

554 def reason_phrase(self) -> str: 

555 try: 

556 reason_phrase: bytes = self.extensions["reason_phrase"] 

557 except KeyError: 

558 return codes.get_reason_phrase(self.status_code) 

559 else: 

560 return reason_phrase.decode("ascii", errors="ignore") 

561 

562 @property 

563 def url(self) -> URL: 

564 """ 

565 Returns the URL for which the request was made. 

566 """ 

567 return self.request.url 

568 

569 @property 

570 def content(self) -> bytes: 

571 if not hasattr(self, "_content"): 

572 raise ResponseNotRead() 

573 return self._content 

574 

575 @property 

576 def text(self) -> str: 

577 if not hasattr(self, "_text"): 

578 content = self.content 

579 if not content: 

580 self._text = "" 

581 else: 

582 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

583 self._text = "".join([decoder.decode(self.content), decoder.flush()]) 

584 return self._text 

585 

586 @property 

587 def encoding(self) -> str | None: 

588 """ 

589 Return an encoding to use for decoding the byte content into text. 

590 The priority for determining this is given by... 

591 

592 * `.encoding = <>` has been set explicitly. 

593 * The encoding as specified by the charset parameter in the Content-Type header. 

594 * The encoding as determined by `default_encoding`, which may either be 

595 a string like "utf-8" indicating the encoding to use, or may be a callable 

596 which enables charset autodetection. 

597 """ 

598 if not hasattr(self, "_encoding"): 

599 encoding = self.charset_encoding 

600 if encoding is None or not is_known_encoding(encoding): 

601 if isinstance(self.default_encoding, str): 

602 encoding = self.default_encoding 

603 elif hasattr(self, "_content"): 

604 encoding = self.default_encoding(self._content) 

605 self._encoding = encoding or "utf-8" 

606 return self._encoding 

607 

608 @encoding.setter 

609 def encoding(self, value: str) -> None: 

610 """ 

611 Set the encoding to use for decoding the byte content into text. 

612 

613 If the `text` attribute has been accessed, attempting to set the 

614 encoding will throw a ValueError. 

615 """ 

616 if hasattr(self, "_text"): 

617 raise ValueError( 

618 "Setting encoding after `text` has been accessed is not allowed." 

619 ) 

620 self._encoding = value 

621 

622 @property 

623 def charset_encoding(self) -> str | None: 

624 """ 

625 Return the encoding, as specified by the Content-Type header. 

626 """ 

627 content_type = self.headers.get("Content-Type") 

628 if content_type is None: 

629 return None 

630 

631 return parse_content_type_charset(content_type) 

632 

633 def _get_content_decoder(self) -> ContentDecoder: 

634 """ 

635 Returns a decoder instance which can be used to decode the raw byte 

636 content, depending on the Content-Encoding used in the response. 

637 """ 

638 if not hasattr(self, "_decoder"): 

639 decoders: list[ContentDecoder] = [] 

640 values = self.headers.get_list("content-encoding", split_commas=True) 

641 for value in values: 

642 value = value.strip().lower() 

643 try: 

644 decoder_cls = SUPPORTED_DECODERS[value] 

645 decoders.append(decoder_cls()) 

646 except KeyError: 

647 continue 

648 

649 if len(decoders) == 1: 

650 self._decoder = decoders[0] 

651 elif len(decoders) > 1: 

652 self._decoder = MultiDecoder(children=decoders) 

653 else: 

654 self._decoder = IdentityDecoder() 

655 

656 return self._decoder 

657 

658 @property 

659 def is_informational(self) -> bool: 

660 """ 

661 A property which is `True` for 1xx status codes, `False` otherwise. 

662 """ 

663 return codes.is_informational(self.status_code) 

664 

665 @property 

666 def is_success(self) -> bool: 

667 """ 

668 A property which is `True` for 2xx status codes, `False` otherwise. 

669 """ 

670 return codes.is_success(self.status_code) 

671 

672 @property 

673 def is_redirect(self) -> bool: 

674 """ 

675 A property which is `True` for 3xx status codes, `False` otherwise. 

676 

677 Note that not all responses with a 3xx status code indicate a URL redirect. 

678 

679 Use `response.has_redirect_location` to determine responses with a properly 

680 formed URL redirection. 

681 """ 

682 return codes.is_redirect(self.status_code) 

683 

684 @property 

685 def is_client_error(self) -> bool: 

686 """ 

687 A property which is `True` for 4xx status codes, `False` otherwise. 

688 """ 

689 return codes.is_client_error(self.status_code) 

690 

691 @property 

692 def is_server_error(self) -> bool: 

693 """ 

694 A property which is `True` for 5xx status codes, `False` otherwise. 

695 """ 

696 return codes.is_server_error(self.status_code) 

697 

698 @property 

699 def is_error(self) -> bool: 

700 """ 

701 A property which is `True` for 4xx and 5xx status codes, `False` otherwise. 

702 """ 

703 return codes.is_error(self.status_code) 

704 

705 @property 

706 def has_redirect_location(self) -> bool: 

707 """ 

708 Returns True for 3xx responses with a properly formed URL redirection, 

709 `False` otherwise. 

710 """ 

711 return ( 

712 self.status_code 

713 in ( 

714 # 301 (Cacheable redirect. Method may change to GET.) 

715 codes.MOVED_PERMANENTLY, 

716 # 302 (Uncacheable redirect. Method may change to GET.) 

717 codes.FOUND, 

718 # 303 (Client should make a GET or HEAD request.) 

719 codes.SEE_OTHER, 

720 # 307 (Equiv. 302, but retain method) 

721 codes.TEMPORARY_REDIRECT, 

722 # 308 (Equiv. 301, but retain method) 

723 codes.PERMANENT_REDIRECT, 

724 ) 

725 and "Location" in self.headers 

726 ) 

727 

728 def raise_for_status(self) -> Response: 

729 """ 

730 Raise the `HTTPStatusError` if one occurred. 

731 """ 

732 request = self._request 

733 if request is None: 

734 raise RuntimeError( 

735 "Cannot call `raise_for_status` as the request " 

736 "instance has not been set on this response." 

737 ) 

738 

739 if self.is_success: 

740 return self 

741 

742 if self.has_redirect_location: 

743 message = ( 

744 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

745 "Redirect location: '{0.headers[location]}'\n" 

746 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

747 ) 

748 else: 

749 message = ( 

750 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

751 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

752 ) 

753 

754 status_class = self.status_code // 100 

755 error_types = { 

756 1: "Informational response", 

757 3: "Redirect response", 

758 4: "Client error", 

759 5: "Server error", 

760 } 

761 error_type = error_types.get(status_class, "Invalid status code") 

762 message = message.format(self, error_type=error_type) 

763 raise HTTPStatusError(message, request=request, response=self) 

764 

765 def json(self, **kwargs: typing.Any) -> typing.Any: 

766 return jsonlib.loads(self.content, **kwargs) 

767 

768 @property 

769 def cookies(self) -> Cookies: 

770 if not hasattr(self, "_cookies"): 

771 self._cookies = Cookies() 

772 self._cookies.extract_cookies(self) 

773 return self._cookies 

774 

775 @property 

776 def links(self) -> dict[str | None, dict[str, str]]: 

777 """ 

778 Returns the parsed header links of the response, if any 

779 """ 

780 header = self.headers.get("link") 

781 if header is None: 

782 return {} 

783 

784 return { 

785 (link.get("rel") or link.get("url")): link 

786 for link in parse_header_links(header) 

787 } 

788 

789 @property 

790 def num_bytes_downloaded(self) -> int: 

791 return self._num_bytes_downloaded 

792 

793 def __repr__(self) -> str: 

794 return f"<Response [{self.status_code} {self.reason_phrase}]>" 

795 

796 def __getstate__(self) -> dict[str, typing.Any]: 

797 return { 

798 name: value 

799 for name, value in self.__dict__.items() 

800 if name not in ["extensions", "stream", "is_closed", "_decoder"] 

801 } 

802 

803 def __setstate__(self, state: dict[str, typing.Any]) -> None: 

804 for name, value in state.items(): 

805 setattr(self, name, value) 

806 self.is_closed = True 

807 self.extensions = {} 

808 self.stream = UnattachedStream() 

809 

810 def read(self) -> bytes: 

811 """ 

812 Read and return the response content. 

813 """ 

814 if not hasattr(self, "_content"): 

815 self._content = b"".join(self.iter_bytes()) 

816 return self._content 

817 

818 def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

819 """ 

820 A byte-iterator over the decoded response content. 

821 This allows us to handle gzip, deflate, brotli, and zstd encoded responses. 

822 """ 

823 if hasattr(self, "_content"): 

824 chunk_size = len(self._content) if chunk_size is None else chunk_size 

825 for i in range(0, len(self._content), max(chunk_size, 1)): 

826 yield self._content[i : i + chunk_size] 

827 else: 

828 decoder = self._get_content_decoder() 

829 chunker = ByteChunker(chunk_size=chunk_size) 

830 with request_context(request=self._request): 

831 for raw_bytes in self.iter_raw(): 

832 decoded = decoder.decode(raw_bytes) 

833 for chunk in chunker.decode(decoded): 

834 yield chunk 

835 decoded = decoder.flush() 

836 for chunk in chunker.decode(decoded): 

837 yield chunk # pragma: no cover 

838 for chunk in chunker.flush(): 

839 yield chunk 

840 

841 def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]: 

842 """ 

843 A str-iterator over the decoded response content 

844 that handles both gzip, deflate, etc but also detects the content's 

845 string encoding. 

846 """ 

847 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

848 chunker = TextChunker(chunk_size=chunk_size) 

849 with request_context(request=self._request): 

850 for byte_content in self.iter_bytes(): 

851 text_content = decoder.decode(byte_content) 

852 for chunk in chunker.decode(text_content): 

853 yield chunk 

854 text_content = decoder.flush() 

855 for chunk in chunker.decode(text_content): 

856 yield chunk # pragma: no cover 

857 for chunk in chunker.flush(): 

858 yield chunk 

859 

860 def iter_lines(self) -> typing.Iterator[str]: 

861 decoder = LineDecoder() 

862 with request_context(request=self._request): 

863 for text in self.iter_text(): 

864 for line in decoder.decode(text): 

865 yield line 

866 for line in decoder.flush(): 

867 yield line 

868 

869 def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

870 """ 

871 A byte-iterator over the raw response content. 

872 """ 

873 if self.is_stream_consumed: 

874 raise StreamConsumed() 

875 if self.is_closed: 

876 raise StreamClosed() 

877 if not isinstance(self.stream, SyncByteStream): 

878 raise RuntimeError("Attempted to call a sync iterator on an async stream.") 

879 

880 self.is_stream_consumed = True 

881 self._num_bytes_downloaded = 0 

882 chunker = ByteChunker(chunk_size=chunk_size) 

883 

884 with request_context(request=self._request): 

885 for raw_stream_bytes in self.stream: 

886 self._num_bytes_downloaded += len(raw_stream_bytes) 

887 for chunk in chunker.decode(raw_stream_bytes): 

888 yield chunk 

889 

890 for chunk in chunker.flush(): 

891 yield chunk 

892 

893 self.close() 

894 

895 def close(self) -> None: 

896 """ 

897 Close the response and release the connection. 

898 Automatically called if the response body is read to completion. 

899 """ 

900 if not isinstance(self.stream, SyncByteStream): 

901 raise RuntimeError("Attempted to call an sync close on an async stream.") 

902 

903 if not self.is_closed: 

904 self.is_closed = True 

905 with request_context(request=self._request): 

906 self.stream.close() 

907 

908 async def aread(self) -> bytes: 

909 """ 

910 Read and return the response content. 

911 """ 

912 if not hasattr(self, "_content"): 

913 self._content = b"".join([part async for part in self.aiter_bytes()]) 

914 return self._content 

915 

916 async def aiter_bytes( 

917 self, chunk_size: int | None = None 

918 ) -> typing.AsyncIterator[bytes]: 

919 """ 

920 A byte-iterator over the decoded response content. 

921 This allows us to handle gzip, deflate, brotli, and zstd encoded responses. 

922 """ 

923 if hasattr(self, "_content"): 

924 chunk_size = len(self._content) if chunk_size is None else chunk_size 

925 for i in range(0, len(self._content), max(chunk_size, 1)): 

926 yield self._content[i : i + chunk_size] 

927 else: 

928 decoder = self._get_content_decoder() 

929 chunker = ByteChunker(chunk_size=chunk_size) 

930 with request_context(request=self._request): 

931 async for raw_bytes in self.aiter_raw(): 

932 decoded = decoder.decode(raw_bytes) 

933 for chunk in chunker.decode(decoded): 

934 yield chunk 

935 decoded = decoder.flush() 

936 for chunk in chunker.decode(decoded): 

937 yield chunk # pragma: no cover 

938 for chunk in chunker.flush(): 

939 yield chunk 

940 

941 async def aiter_text( 

942 self, chunk_size: int | None = None 

943 ) -> typing.AsyncIterator[str]: 

944 """ 

945 A str-iterator over the decoded response content 

946 that handles both gzip, deflate, etc but also detects the content's 

947 string encoding. 

948 """ 

949 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

950 chunker = TextChunker(chunk_size=chunk_size) 

951 with request_context(request=self._request): 

952 async for byte_content in self.aiter_bytes(): 

953 text_content = decoder.decode(byte_content) 

954 for chunk in chunker.decode(text_content): 

955 yield chunk 

956 text_content = decoder.flush() 

957 for chunk in chunker.decode(text_content): 

958 yield chunk # pragma: no cover 

959 for chunk in chunker.flush(): 

960 yield chunk 

961 

962 async def aiter_lines(self) -> typing.AsyncIterator[str]: 

963 decoder = LineDecoder() 

964 with request_context(request=self._request): 

965 async for text in self.aiter_text(): 

966 for line in decoder.decode(text): 

967 yield line 

968 for line in decoder.flush(): 

969 yield line 

970 

async def aiter_raw(
    self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
    """
    Asynchronously iterate over the raw response bytes.

    The stream's output is passed through a `ByteChunker` built with
    `chunk_size`, and `self._num_bytes_downloaded` is updated as data
    arrives. Once the stream is exhausted the response is closed via
    `self.aclose()`.

    Because this is an async generator, the checks below run when
    iteration starts, not when the method is called.

    Raises:
        StreamConsumed: if `self.is_stream_consumed` is already set.
        StreamClosed: if `self.is_closed` is already set.
        RuntimeError: if `self.stream` is not an `AsyncByteStream`.
    """
    if self.is_stream_consumed:
        raise StreamConsumed()
    if self.is_closed:
        raise StreamClosed()
    if not isinstance(self.stream, AsyncByteStream):
        raise RuntimeError("Attempted to call an async iterator on an sync stream.")

    # Mark the stream as used before the first read and restart the counter.
    self.is_stream_consumed = True
    self._num_bytes_downloaded = 0
    byte_chunker = ByteChunker(chunk_size=chunk_size)

    with request_context(request=self._request):
        async for block in self.stream:
            # Count what came off the stream, not what is yielded onwards.
            self._num_bytes_downloaded += len(block)
            for piece in byte_chunker.decode(block):
                yield piece

    # Emit anything the chunker is still buffering.
    for piece in byte_chunker.flush():
        yield piece

    await self.aclose()

998 

async def aclose(self) -> None:
    """
    Close the response and release the connection.
    Automatically called if the response body is read to completion.

    Calling this on an already-closed response does nothing.

    Raises:
        RuntimeError: if `self.stream` is not an `AsyncByteStream`.
    """
    if not isinstance(self.stream, AsyncByteStream):
        raise RuntimeError("Attempted to call an async close on an sync stream.")

    if self.is_closed:
        return

    # Set the flag before awaiting, so it is already set while the
    # underlying stream is being closed.
    self.is_closed = True
    with request_context(request=self._request):
        await self.stream.aclose()

1011 

1012 

class Cookies(typing.MutableMapping[str, str]):
    """
    HTTP Cookies, as a mutable mapping.

    Cookies are stored in a `http.cookiejar.CookieJar`, available as the
    `jar` attribute. The mapping interface is keyed on cookie name only;
    use `get()` / `delete()` with `domain` and `path` to address one
    specific cookie when several share a name.
    """

    def __init__(self, cookies: CookieTypes | None = None) -> None:
        """
        Build the cookie store.

        `cookies` may be:

        * `None` - start with an empty jar.
        * a `dict` of name/value pairs.
        * a `list` of `(name, value)` pairs.
        * another `Cookies` instance - its cookies are copied into a new jar.
        * anything else - used directly as the jar, without copying.
        """
        if cookies is None:
            self.jar = CookieJar()
        elif isinstance(cookies, dict):
            self.jar = CookieJar()
            for name, value in cookies.items():
                self.set(name, value)
        elif isinstance(cookies, list):
            self.jar = CookieJar()
            for name, value in cookies:
                self.set(name, value)
        elif isinstance(cookies, Cookies):
            self.jar = CookieJar()
            for existing in cookies.jar:
                self.jar.set_cookie(existing)
        else:
            # Not copied: this instance shares the object it was given.
            self.jar = cookies

    def extract_cookies(self, response: Response) -> None:
        """
        Loads any cookies based on the response `Set-Cookie` headers.
        """
        compat_response = self._CookieCompatResponse(response)
        compat_request = self._CookieCompatRequest(response.request)

        self.jar.extract_cookies(compat_response, compat_request)  # type: ignore

    def set_cookie_header(self, request: Request) -> None:
        """
        Sets an appropriate 'Cookie:' HTTP header on the `Request`.
        """
        compat_request = self._CookieCompatRequest(request)
        self.jar.add_cookie_header(compat_request)

    def set(self, name: str, value: str, domain: str = "", path: str = "/") -> None:
        """
        Set a cookie value by name. May optionally include domain and path.

        The cookie is created as a version 0, non-secure cookie with no
        expiry and no port restriction.
        """
        cookie = Cookie(
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=domain,
            domain_specified=bool(domain),
            domain_initial_dot=domain.startswith("."),
            path=path,
            path_specified=bool(path),
            secure=False,
            expires=None,
            discard=True,
            comment=None,
            comment_url=None,
            rest={"HttpOnly": None},
            rfc2109=False,
        )
        self.jar.set_cookie(cookie)

    def get(  # type: ignore
        self,
        name: str,
        default: str | None = None,
        domain: str | None = None,
        path: str | None = None,
    ) -> str | None:
        """
        Get a cookie by name. May optionally include domain and path
        in order to specify exactly which cookie to retrieve.

        Returns `default` when no cookie matches. Raises `CookieConflict`
        when more than one cookie matches.
        """
        found = None
        for cookie in self.jar:
            if cookie.name != name:
                continue
            if domain is not None and cookie.domain != domain:
                continue
            if path is not None and cookie.path != path:
                continue
            if found is not None:
                message = f"Multiple cookies exist with name={name}"
                raise CookieConflict(message)
            found = cookie.value

        return default if found is None else found

    def delete(
        self,
        name: str,
        domain: str | None = None,
        path: str | None = None,
    ) -> None:
        """
        Delete a cookie by name. May optionally include domain and path
        in order to specify exactly which cookie to delete.

        When both `domain` and `path` are given the call goes straight to
        `CookieJar.clear(domain, path, name)`. Otherwise every cookie that
        matches the supplied criteria is removed, and no match is a no-op.
        """
        if domain is not None and path is not None:
            return self.jar.clear(domain, path, name)

        # Collect matches first, so the jar is not modified while iterating.
        doomed = [
            cookie
            for cookie in self.jar
            if cookie.name == name
            and (domain is None or cookie.domain == domain)
            and (path is None or cookie.path == path)
        ]

        for cookie in doomed:
            self.jar.clear(cookie.domain, cookie.path, cookie.name)

    def clear(self, domain: str | None = None, path: str | None = None) -> None:
        """
        Delete all cookies. Optionally include a domain and path in
        order to only delete a subset of all the cookies.

        A `path` may only be given together with a `domain`.
        """
        scope = []
        if domain is not None:
            scope.append(domain)
        if path is not None:
            # `CookieJar.clear` takes its arguments positionally as
            # (domain, path), so a path on its own cannot be expressed.
            assert domain is not None
            scope.append(path)
        self.jar.clear(*scope)

    def update(self, cookies: CookieTypes | None = None) -> None:  # type: ignore
        """
        Add every cookie from `cookies` (any value accepted by the
        constructor) to this jar.
        """
        incoming = Cookies(cookies)
        for cookie in incoming.jar:
            self.jar.set_cookie(cookie)

    def __setitem__(self, name: str, value: str) -> None:
        return self.set(name, value)

    def __getitem__(self, name: str) -> str:
        value = self.get(name)
        if value is None:
            raise KeyError(name)
        return value

    def __delitem__(self, name: str) -> None:
        return self.delete(name)

    def __len__(self) -> int:
        return len(self.jar)

    def __iter__(self) -> typing.Iterator[str]:
        return (cookie.name for cookie in self.jar)

    def __bool__(self) -> bool:
        # True as soon as the jar yields a single cookie.
        return any(True for _ in self.jar)

    def __repr__(self) -> str:
        rendered = [
            f"<Cookie {cookie.name}={cookie.value} for {cookie.domain} />"
            for cookie in self.jar
        ]
        cookies_repr = ", ".join(rendered)

        return f"<Cookies[{cookies_repr}]>"

    class _CookieCompatRequest(urllib.request.Request):
        """
        Wraps a `Request` instance up in a compatibility interface suitable
        for use with `CookieJar` operations.
        """

        def __init__(self, request: Request) -> None:
            super().__init__(
                url=str(request.url),
                headers=dict(request.headers),
                method=request.method,
            )
            self.request = request

        def add_unredirected_header(self, key: str, value: str) -> None:
            # Mirror the header onto the wrapped request as well.
            super().add_unredirected_header(key, value)
            self.request.headers[key] = value

    class _CookieCompatResponse:
        """
        Wraps a `Response` instance up in a compatibility interface suitable
        for use with `CookieJar` operations.
        """

        def __init__(self, response: Response) -> None:
            self.response = response

        def info(self) -> email.message.Message:
            info = email.message.Message()
            for key, value in self.response.headers.multi_items():
                # Note that setting `info[key]` here is an "append" operation,
                # not a "replace" operation.
                # https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__
                info[key] = value
            return info