Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_models.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

596 statements  

1from __future__ import annotations 

2 

3import datetime 

4import email.message 

5import json as jsonlib 

6import typing 

7import urllib.request 

8from collections.abc import Mapping 

9from http.cookiejar import Cookie, CookieJar 

10 

11from ._content import ByteStream, UnattachedStream, encode_request, encode_response 

12from ._decoders import ( 

13 SUPPORTED_DECODERS, 

14 ByteChunker, 

15 ContentDecoder, 

16 IdentityDecoder, 

17 LineDecoder, 

18 MultiDecoder, 

19 TextChunker, 

20 TextDecoder, 

21) 

22from ._exceptions import ( 

23 CookieConflict, 

24 HTTPStatusError, 

25 RequestNotRead, 

26 ResponseNotRead, 

27 StreamClosed, 

28 StreamConsumed, 

29 request_context, 

30) 

31from ._multipart import get_multipart_boundary_from_content_type 

32from ._status_codes import codes 

33from ._types import ( 

34 AsyncByteStream, 

35 CookieTypes, 

36 HeaderTypes, 

37 QueryParamTypes, 

38 RequestContent, 

39 RequestData, 

40 RequestExtensions, 

41 RequestFiles, 

42 ResponseContent, 

43 ResponseExtensions, 

44 SyncByteStream, 

45) 

46from ._urls import URL 

47from ._utils import ( 

48 is_known_encoding, 

49 normalize_header_key, 

50 normalize_header_value, 

51 obfuscate_sensitive_headers, 

52 parse_content_type_charset, 

53 parse_header_links, 

54) 

55 

56 

57class Headers(typing.MutableMapping[str, str]): 

58 """ 

59 HTTP headers, as a case-insensitive multi-dict. 

60 """ 

61 

62 def __init__( 

63 self, 

64 headers: HeaderTypes | None = None, 

65 encoding: str | None = None, 

66 ) -> None: 

67 if headers is None: 

68 self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]] 

69 elif isinstance(headers, Headers): 

70 self._list = list(headers._list) 

71 elif isinstance(headers, Mapping): 

72 self._list = [ 

73 ( 

74 normalize_header_key(k, lower=False, encoding=encoding), 

75 normalize_header_key(k, lower=True, encoding=encoding), 

76 normalize_header_value(v, encoding), 

77 ) 

78 for k, v in headers.items() 

79 ] 

80 else: 

81 self._list = [ 

82 ( 

83 normalize_header_key(k, lower=False, encoding=encoding), 

84 normalize_header_key(k, lower=True, encoding=encoding), 

85 normalize_header_value(v, encoding), 

86 ) 

87 for k, v in headers 

88 ] 

89 

90 self._encoding = encoding 

91 

92 @property 

93 def encoding(self) -> str: 

94 """ 

95 Header encoding is mandated as ascii, but we allow fallbacks to utf-8 

96 or iso-8859-1. 

97 """ 

98 if self._encoding is None: 

99 for encoding in ["ascii", "utf-8"]: 

100 for key, value in self.raw: 

101 try: 

102 key.decode(encoding) 

103 value.decode(encoding) 

104 except UnicodeDecodeError: 

105 break 

106 else: 

107 # The else block runs if 'break' did not occur, meaning 

108 # all values fitted the encoding. 

109 self._encoding = encoding 

110 break 

111 else: 

112 # The ISO-8859-1 encoding covers all 256 code points in a byte, 

113 # so will never raise decode errors. 

114 self._encoding = "iso-8859-1" 

115 return self._encoding 

116 

117 @encoding.setter 

118 def encoding(self, value: str) -> None: 

119 self._encoding = value 

120 

121 @property 

122 def raw(self) -> list[tuple[bytes, bytes]]: 

123 """ 

124 Returns a list of the raw header items, as byte pairs. 

125 """ 

126 return [(raw_key, value) for raw_key, _, value in self._list] 

127 

128 def keys(self) -> typing.KeysView[str]: 

129 return {key.decode(self.encoding): None for _, key, value in self._list}.keys() 

130 

131 def values(self) -> typing.ValuesView[str]: 

132 values_dict: dict[str, str] = {} 

133 for _, key, value in self._list: 

134 str_key = key.decode(self.encoding) 

135 str_value = value.decode(self.encoding) 

136 if str_key in values_dict: 

137 values_dict[str_key] += f", {str_value}" 

138 else: 

139 values_dict[str_key] = str_value 

140 return values_dict.values() 

141 

142 def items(self) -> typing.ItemsView[str, str]: 

143 """ 

144 Return `(key, value)` items of headers. Concatenate headers 

145 into a single comma separated value when a key occurs multiple times. 

146 """ 

147 values_dict: dict[str, str] = {} 

148 for _, key, value in self._list: 

149 str_key = key.decode(self.encoding) 

150 str_value = value.decode(self.encoding) 

151 if str_key in values_dict: 

152 values_dict[str_key] += f", {str_value}" 

153 else: 

154 values_dict[str_key] = str_value 

155 return values_dict.items() 

156 

157 def multi_items(self) -> list[tuple[str, str]]: 

158 """ 

159 Return a list of `(key, value)` pairs of headers. Allow multiple 

160 occurrences of the same key without concatenating into a single 

161 comma separated value. 

162 """ 

163 return [ 

164 (key.decode(self.encoding), value.decode(self.encoding)) 

165 for _, key, value in self._list 

166 ] 

167 

168 def get(self, key: str, default: typing.Any = None) -> typing.Any: 

169 """ 

170 Return a header value. If multiple occurrences of the header occur 

171 then concatenate them together with commas. 

172 """ 

173 try: 

174 return self[key] 

175 except KeyError: 

176 return default 

177 

178 def get_list(self, key: str, split_commas: bool = False) -> list[str]: 

179 """ 

180 Return a list of all header values for a given key. 

181 If `split_commas=True` is passed, then any comma separated header 

182 values are split into multiple return strings. 

183 """ 

184 get_header_key = key.lower().encode(self.encoding) 

185 

186 values = [ 

187 item_value.decode(self.encoding) 

188 for _, item_key, item_value in self._list 

189 if item_key.lower() == get_header_key 

190 ] 

191 

192 if not split_commas: 

193 return values 

194 

195 split_values = [] 

196 for value in values: 

197 split_values.extend([item.strip() for item in value.split(",")]) 

198 return split_values 

199 

200 def update(self, headers: HeaderTypes | None = None) -> None: # type: ignore 

201 headers = Headers(headers) 

202 for key in headers.keys(): 

203 if key in self: 

204 self.pop(key) 

205 self._list.extend(headers._list) 

206 

207 def copy(self) -> Headers: 

208 return Headers(self, encoding=self.encoding) 

209 

210 def __getitem__(self, key: str) -> str: 

211 """ 

212 Return a single header value. 

213 

214 If there are multiple headers with the same key, then we concatenate 

215 them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2 

216 """ 

217 normalized_key = key.lower().encode(self.encoding) 

218 

219 items = [ 

220 header_value.decode(self.encoding) 

221 for _, header_key, header_value in self._list 

222 if header_key == normalized_key 

223 ] 

224 

225 if items: 

226 return ", ".join(items) 

227 

228 raise KeyError(key) 

229 

230 def __setitem__(self, key: str, value: str) -> None: 

231 """ 

232 Set the header `key` to `value`, removing any duplicate entries. 

233 Retains insertion order. 

234 """ 

235 set_key = key.encode(self._encoding or "utf-8") 

236 set_value = value.encode(self._encoding or "utf-8") 

237 lookup_key = set_key.lower() 

238 

239 found_indexes = [ 

240 idx 

241 for idx, (_, item_key, _) in enumerate(self._list) 

242 if item_key == lookup_key 

243 ] 

244 

245 for idx in reversed(found_indexes[1:]): 

246 del self._list[idx] 

247 

248 if found_indexes: 

249 idx = found_indexes[0] 

250 self._list[idx] = (set_key, lookup_key, set_value) 

251 else: 

252 self._list.append((set_key, lookup_key, set_value)) 

253 

254 def __delitem__(self, key: str) -> None: 

255 """ 

256 Remove the header `key`. 

257 """ 

258 del_key = key.lower().encode(self.encoding) 

259 

260 pop_indexes = [ 

261 idx 

262 for idx, (_, item_key, _) in enumerate(self._list) 

263 if item_key.lower() == del_key 

264 ] 

265 

266 if not pop_indexes: 

267 raise KeyError(key) 

268 

269 for idx in reversed(pop_indexes): 

270 del self._list[idx] 

271 

272 def __contains__(self, key: typing.Any) -> bool: 

273 header_key = key.lower().encode(self.encoding) 

274 return header_key in [key for _, key, _ in self._list] 

275 

276 def __iter__(self) -> typing.Iterator[typing.Any]: 

277 return iter(self.keys()) 

278 

279 def __len__(self) -> int: 

280 return len(self._list) 

281 

282 def __eq__(self, other: typing.Any) -> bool: 

283 try: 

284 other_headers = Headers(other) 

285 except ValueError: 

286 return False 

287 

288 self_list = [(key, value) for _, key, value in self._list] 

289 other_list = [(key, value) for _, key, value in other_headers._list] 

290 return sorted(self_list) == sorted(other_list) 

291 

292 def __repr__(self) -> str: 

293 class_name = self.__class__.__name__ 

294 

295 encoding_str = "" 

296 if self.encoding != "ascii": 

297 encoding_str = f", encoding={self.encoding!r}" 

298 

299 as_list = list(obfuscate_sensitive_headers(self.multi_items())) 

300 as_dict = dict(as_list) 

301 

302 no_duplicate_keys = len(as_dict) == len(as_list) 

303 if no_duplicate_keys: 

304 return f"{class_name}({as_dict!r}{encoding_str})" 

305 return f"{class_name}({as_list!r}{encoding_str})" 

306 

307 

308class Request: 

309 def __init__( 

310 self, 

311 method: str | bytes, 

312 url: URL | str, 

313 *, 

314 params: QueryParamTypes | None = None, 

315 headers: HeaderTypes | None = None, 

316 cookies: CookieTypes | None = None, 

317 content: RequestContent | None = None, 

318 data: RequestData | None = None, 

319 files: RequestFiles | None = None, 

320 json: typing.Any | None = None, 

321 stream: SyncByteStream | AsyncByteStream | None = None, 

322 extensions: RequestExtensions | None = None, 

323 ) -> None: 

324 self.method = ( 

325 method.decode("ascii").upper() 

326 if isinstance(method, bytes) 

327 else method.upper() 

328 ) 

329 self.url = URL(url) 

330 if params is not None: 

331 self.url = self.url.copy_merge_params(params=params) 

332 self.headers = Headers(headers) 

333 self.extensions = {} if extensions is None else extensions 

334 

335 if cookies: 

336 Cookies(cookies).set_cookie_header(self) 

337 

338 if stream is None: 

339 content_type: str | None = self.headers.get("content-type") 

340 headers, stream = encode_request( 

341 content=content, 

342 data=data, 

343 files=files, 

344 json=json, 

345 boundary=get_multipart_boundary_from_content_type( 

346 content_type=content_type.encode(self.headers.encoding) 

347 if content_type 

348 else None 

349 ), 

350 ) 

351 self._prepare(headers) 

352 self.stream = stream 

353 # Load the request body, except for streaming content. 

354 if isinstance(stream, ByteStream): 

355 self.read() 

356 else: 

357 # There's an important distinction between `Request(content=...)`, 

358 # and `Request(stream=...)`. 

359 # 

360 # Using `content=...` implies automatically populated `Host` and content 

361 # headers, of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

362 # 

363 # Using `stream=...` will not automatically include *any* 

364 # auto-populated headers. 

365 # 

366 # As an end-user you don't really need `stream=...`. It's only 

367 # useful when: 

368 # 

369 # * Preserving the request stream when copying requests, eg for redirects. 

370 # * Creating request instances on the *server-side* of the transport API. 

371 self.stream = stream 

372 

373 def _prepare(self, default_headers: dict[str, str]) -> None: 

374 for key, value in default_headers.items(): 

375 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

376 if key.lower() == "transfer-encoding" and "Content-Length" in self.headers: 

377 continue 

378 self.headers.setdefault(key, value) 

379 

380 auto_headers: list[tuple[bytes, bytes]] = [] 

381 

382 has_host = "Host" in self.headers 

383 has_content_length = ( 

384 "Content-Length" in self.headers or "Transfer-Encoding" in self.headers 

385 ) 

386 

387 if not has_host and self.url.host: 

388 auto_headers.append((b"Host", self.url.netloc)) 

389 if not has_content_length and self.method in ("POST", "PUT", "PATCH"): 

390 auto_headers.append((b"Content-Length", b"0")) 

391 

392 self.headers = Headers(auto_headers + self.headers.raw) 

393 

394 @property 

395 def content(self) -> bytes: 

396 if not hasattr(self, "_content"): 

397 raise RequestNotRead() 

398 return self._content 

399 

400 def read(self) -> bytes: 

401 """ 

402 Read and return the request content. 

403 """ 

404 if not hasattr(self, "_content"): 

405 assert isinstance(self.stream, typing.Iterable) 

406 self._content = b"".join(self.stream) 

407 if not isinstance(self.stream, ByteStream): 

408 # If a streaming request has been read entirely into memory, then 

409 # we can replace the stream with a raw bytes implementation, 

410 # to ensure that any non-replayable streams can still be used. 

411 self.stream = ByteStream(self._content) 

412 return self._content 

413 

414 async def aread(self) -> bytes: 

415 """ 

416 Read and return the request content. 

417 """ 

418 if not hasattr(self, "_content"): 

419 assert isinstance(self.stream, typing.AsyncIterable) 

420 self._content = b"".join([part async for part in self.stream]) 

421 if not isinstance(self.stream, ByteStream): 

422 # If a streaming request has been read entirely into memory, then 

423 # we can replace the stream with a raw bytes implementation, 

424 # to ensure that any non-replayable streams can still be used. 

425 self.stream = ByteStream(self._content) 

426 return self._content 

427 

428 def __repr__(self) -> str: 

429 class_name = self.__class__.__name__ 

430 url = str(self.url) 

431 return f"<{class_name}({self.method!r}, {url!r})>" 

432 

433 def __getstate__(self) -> dict[str, typing.Any]: 

434 return { 

435 name: value 

436 for name, value in self.__dict__.items() 

437 if name not in ["extensions", "stream"] 

438 } 

439 

440 def __setstate__(self, state: dict[str, typing.Any]) -> None: 

441 for name, value in state.items(): 

442 setattr(self, name, value) 

443 self.extensions = {} 

444 self.stream = UnattachedStream() 

445 

446 

447class Response: 

448 def __init__( 

449 self, 

450 status_code: int, 

451 *, 

452 headers: HeaderTypes | None = None, 

453 content: ResponseContent | None = None, 

454 text: str | None = None, 

455 html: str | None = None, 

456 json: typing.Any = None, 

457 stream: SyncByteStream | AsyncByteStream | None = None, 

458 request: Request | None = None, 

459 extensions: ResponseExtensions | None = None, 

460 history: list[Response] | None = None, 

461 default_encoding: str | typing.Callable[[bytes], str] = "utf-8", 

462 ) -> None: 

463 self.status_code = status_code 

464 self.headers = Headers(headers) 

465 

466 self._request: Request | None = request 

467 

468 # When follow_redirects=False and a redirect is received, 

469 # the client will set `response.next_request`. 

470 self.next_request: Request | None = None 

471 

472 self.extensions: ResponseExtensions = {} if extensions is None else extensions 

473 self.history = [] if history is None else list(history) 

474 

475 self.is_closed = False 

476 self.is_stream_consumed = False 

477 

478 self.default_encoding = default_encoding 

479 

480 if stream is None: 

481 headers, stream = encode_response(content, text, html, json) 

482 self._prepare(headers) 

483 self.stream = stream 

484 if isinstance(stream, ByteStream): 

485 # Load the response body, except for streaming content. 

486 self.read() 

487 else: 

488 # There's an important distinction between `Response(content=...)`, 

489 # and `Response(stream=...)`. 

490 # 

491 # Using `content=...` implies automatically populated content headers, 

492 # of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

493 # 

494 # Using `stream=...` will not automatically include any content headers. 

495 # 

496 # As an end-user you don't really need `stream=...`. It's only 

497 # useful when creating response instances having received a stream 

498 # from the transport API. 

499 self.stream = stream 

500 

501 self._num_bytes_downloaded = 0 

502 

503 def _prepare(self, default_headers: dict[str, str]) -> None: 

504 for key, value in default_headers.items(): 

505 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

506 if key.lower() == "transfer-encoding" and "content-length" in self.headers: 

507 continue 

508 self.headers.setdefault(key, value) 

509 

510 @property 

511 def elapsed(self) -> datetime.timedelta: 

512 """ 

513 Returns the time taken for the complete request/response 

514 cycle to complete. 

515 """ 

516 if not hasattr(self, "_elapsed"): 

517 raise RuntimeError( 

518 "'.elapsed' may only be accessed after the response " 

519 "has been read or closed." 

520 ) 

521 return self._elapsed 

522 

523 @elapsed.setter 

524 def elapsed(self, elapsed: datetime.timedelta) -> None: 

525 self._elapsed = elapsed 

526 

527 @property 

528 def request(self) -> Request: 

529 """ 

530 Returns the request instance associated to the current response. 

531 """ 

532 if self._request is None: 

533 raise RuntimeError( 

534 "The request instance has not been set on this response." 

535 ) 

536 return self._request 

537 

538 @request.setter 

539 def request(self, value: Request) -> None: 

540 self._request = value 

541 

542 @property 

543 def http_version(self) -> str: 

544 try: 

545 http_version: bytes = self.extensions["http_version"] 

546 except KeyError: 

547 return "HTTP/1.1" 

548 else: 

549 return http_version.decode("ascii", errors="ignore") 

550 

551 @property 

552 def reason_phrase(self) -> str: 

553 try: 

554 reason_phrase: bytes = self.extensions["reason_phrase"] 

555 except KeyError: 

556 return codes.get_reason_phrase(self.status_code) 

557 else: 

558 return reason_phrase.decode("ascii", errors="ignore") 

559 

560 @property 

561 def url(self) -> URL: 

562 """ 

563 Returns the URL for which the request was made. 

564 """ 

565 return self.request.url 

566 

567 @property 

568 def content(self) -> bytes: 

569 if not hasattr(self, "_content"): 

570 raise ResponseNotRead() 

571 return self._content 

572 

573 @property 

574 def text(self) -> str: 

575 if not hasattr(self, "_text"): 

576 content = self.content 

577 if not content: 

578 self._text = "" 

579 else: 

580 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

581 self._text = "".join([decoder.decode(self.content), decoder.flush()]) 

582 return self._text 

583 

584 @property 

585 def encoding(self) -> str | None: 

586 """ 

587 Return an encoding to use for decoding the byte content into text. 

588 The priority for determining this is given by... 

589 

590 * `.encoding = <>` has been set explicitly. 

591 * The encoding as specified by the charset parameter in the Content-Type header. 

592 * The encoding as determined by `default_encoding`, which may either be 

593 a string like "utf-8" indicating the encoding to use, or may be a callable 

594 which enables charset autodetection. 

595 """ 

596 if not hasattr(self, "_encoding"): 

597 encoding = self.charset_encoding 

598 if encoding is None or not is_known_encoding(encoding): 

599 if isinstance(self.default_encoding, str): 

600 encoding = self.default_encoding 

601 elif hasattr(self, "_content"): 

602 encoding = self.default_encoding(self._content) 

603 self._encoding = encoding or "utf-8" 

604 return self._encoding 

605 

606 @encoding.setter 

607 def encoding(self, value: str) -> None: 

608 """ 

609 Set the encoding to use for decoding the byte content into text. 

610 

611 If the `text` attribute has been accessed, attempting to set the 

612 encoding will throw a ValueError. 

613 """ 

614 if hasattr(self, "_text"): 

615 raise ValueError( 

616 "Setting encoding after `text` has been accessed is not allowed." 

617 ) 

618 self._encoding = value 

619 

620 @property 

621 def charset_encoding(self) -> str | None: 

622 """ 

623 Return the encoding, as specified by the Content-Type header. 

624 """ 

625 content_type = self.headers.get("Content-Type") 

626 if content_type is None: 

627 return None 

628 

629 return parse_content_type_charset(content_type) 

630 

631 def _get_content_decoder(self) -> ContentDecoder: 

632 """ 

633 Returns a decoder instance which can be used to decode the raw byte 

634 content, depending on the Content-Encoding used in the response. 

635 """ 

636 if not hasattr(self, "_decoder"): 

637 decoders: list[ContentDecoder] = [] 

638 values = self.headers.get_list("content-encoding", split_commas=True) 

639 for value in values: 

640 value = value.strip().lower() 

641 try: 

642 decoder_cls = SUPPORTED_DECODERS[value] 

643 decoders.append(decoder_cls()) 

644 except KeyError: 

645 continue 

646 

647 if len(decoders) == 1: 

648 self._decoder = decoders[0] 

649 elif len(decoders) > 1: 

650 self._decoder = MultiDecoder(children=decoders) 

651 else: 

652 self._decoder = IdentityDecoder() 

653 

654 return self._decoder 

655 

656 @property 

657 def is_informational(self) -> bool: 

658 """ 

659 A property which is `True` for 1xx status codes, `False` otherwise. 

660 """ 

661 return codes.is_informational(self.status_code) 

662 

663 @property 

664 def is_success(self) -> bool: 

665 """ 

666 A property which is `True` for 2xx status codes, `False` otherwise. 

667 """ 

668 return codes.is_success(self.status_code) 

669 

670 @property 

671 def is_redirect(self) -> bool: 

672 """ 

673 A property which is `True` for 3xx status codes, `False` otherwise. 

674 

675 Note that not all responses with a 3xx status code indicate a URL redirect. 

676 

677 Use `response.has_redirect_location` to determine responses with a properly 

678 formed URL redirection. 

679 """ 

680 return codes.is_redirect(self.status_code) 

681 

682 @property 

683 def is_client_error(self) -> bool: 

684 """ 

685 A property which is `True` for 4xx status codes, `False` otherwise. 

686 """ 

687 return codes.is_client_error(self.status_code) 

688 

689 @property 

690 def is_server_error(self) -> bool: 

691 """ 

692 A property which is `True` for 5xx status codes, `False` otherwise. 

693 """ 

694 return codes.is_server_error(self.status_code) 

695 

696 @property 

697 def is_error(self) -> bool: 

698 """ 

699 A property which is `True` for 4xx and 5xx status codes, `False` otherwise. 

700 """ 

701 return codes.is_error(self.status_code) 

702 

703 @property 

704 def has_redirect_location(self) -> bool: 

705 """ 

706 Returns True for 3xx responses with a properly formed URL redirection, 

707 `False` otherwise. 

708 """ 

709 return ( 

710 self.status_code 

711 in ( 

712 # 301 (Cacheable redirect. Method may change to GET.) 

713 codes.MOVED_PERMANENTLY, 

714 # 302 (Uncacheable redirect. Method may change to GET.) 

715 codes.FOUND, 

716 # 303 (Client should make a GET or HEAD request.) 

717 codes.SEE_OTHER, 

718 # 307 (Equiv. 302, but retain method) 

719 codes.TEMPORARY_REDIRECT, 

720 # 308 (Equiv. 301, but retain method) 

721 codes.PERMANENT_REDIRECT, 

722 ) 

723 and "Location" in self.headers 

724 ) 

725 

726 def raise_for_status(self) -> Response: 

727 """ 

728 Raise the `HTTPStatusError` if one occurred. 

729 """ 

730 request = self._request 

731 if request is None: 

732 raise RuntimeError( 

733 "Cannot call `raise_for_status` as the request " 

734 "instance has not been set on this response." 

735 ) 

736 

737 if self.is_success: 

738 return self 

739 

740 if self.has_redirect_location: 

741 message = ( 

742 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

743 "Redirect location: '{0.headers[location]}'\n" 

744 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

745 ) 

746 else: 

747 message = ( 

748 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

749 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}" 

750 ) 

751 

752 status_class = self.status_code // 100 

753 error_types = { 

754 1: "Informational response", 

755 3: "Redirect response", 

756 4: "Client error", 

757 5: "Server error", 

758 } 

759 error_type = error_types.get(status_class, "Invalid status code") 

760 message = message.format(self, error_type=error_type) 

761 raise HTTPStatusError(message, request=request, response=self) 

762 

763 def json(self, **kwargs: typing.Any) -> typing.Any: 

764 return jsonlib.loads(self.content, **kwargs) 

765 

766 @property 

767 def cookies(self) -> Cookies: 

768 if not hasattr(self, "_cookies"): 

769 self._cookies = Cookies() 

770 self._cookies.extract_cookies(self) 

771 return self._cookies 

772 

773 @property 

774 def links(self) -> dict[str | None, dict[str, str]]: 

775 """ 

776 Returns the parsed header links of the response, if any 

777 """ 

778 header = self.headers.get("link") 

779 if header is None: 

780 return {} 

781 

782 return { 

783 (link.get("rel") or link.get("url")): link 

784 for link in parse_header_links(header) 

785 } 

786 

787 @property 

788 def num_bytes_downloaded(self) -> int: 

789 return self._num_bytes_downloaded 

790 

791 def __repr__(self) -> str: 

792 return f"<Response [{self.status_code} {self.reason_phrase}]>" 

793 

794 def __getstate__(self) -> dict[str, typing.Any]: 

795 return { 

796 name: value 

797 for name, value in self.__dict__.items() 

798 if name not in ["extensions", "stream", "is_closed", "_decoder"] 

799 } 

800 

801 def __setstate__(self, state: dict[str, typing.Any]) -> None: 

802 for name, value in state.items(): 

803 setattr(self, name, value) 

804 self.is_closed = True 

805 self.extensions = {} 

806 self.stream = UnattachedStream() 

807 

808 def read(self) -> bytes: 

809 """ 

810 Read and return the response content. 

811 """ 

812 if not hasattr(self, "_content"): 

813 self._content = b"".join(self.iter_bytes()) 

814 return self._content 

815 

816 def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

817 """ 

818 A byte-iterator over the decoded response content. 

819 This allows us to handle gzip, deflate, and brotli encoded responses. 

820 """ 

821 if hasattr(self, "_content"): 

822 chunk_size = len(self._content) if chunk_size is None else chunk_size 

823 for i in range(0, len(self._content), max(chunk_size, 1)): 

824 yield self._content[i : i + chunk_size] 

825 else: 

826 decoder = self._get_content_decoder() 

827 chunker = ByteChunker(chunk_size=chunk_size) 

828 with request_context(request=self._request): 

829 for raw_bytes in self.iter_raw(): 

830 decoded = decoder.decode(raw_bytes) 

831 for chunk in chunker.decode(decoded): 

832 yield chunk 

833 decoded = decoder.flush() 

834 for chunk in chunker.decode(decoded): 

835 yield chunk # pragma: no cover 

836 for chunk in chunker.flush(): 

837 yield chunk 

838 

839 def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]: 

840 """ 

841 A str-iterator over the decoded response content 

842 that handles both gzip, deflate, etc but also detects the content's 

843 string encoding. 

844 """ 

845 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

846 chunker = TextChunker(chunk_size=chunk_size) 

847 with request_context(request=self._request): 

848 for byte_content in self.iter_bytes(): 

849 text_content = decoder.decode(byte_content) 

850 for chunk in chunker.decode(text_content): 

851 yield chunk 

852 text_content = decoder.flush() 

853 for chunk in chunker.decode(text_content): 

854 yield chunk # pragma: no cover 

855 for chunk in chunker.flush(): 

856 yield chunk 

857 

858 def iter_lines(self) -> typing.Iterator[str]: 

859 decoder = LineDecoder() 

860 with request_context(request=self._request): 

861 for text in self.iter_text(): 

862 for line in decoder.decode(text): 

863 yield line 

864 for line in decoder.flush(): 

865 yield line 

866 

867 def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]: 

868 """ 

869 A byte-iterator over the raw response content. 

870 """ 

871 if self.is_stream_consumed: 

872 raise StreamConsumed() 

873 if self.is_closed: 

874 raise StreamClosed() 

875 if not isinstance(self.stream, SyncByteStream): 

876 raise RuntimeError("Attempted to call a sync iterator on an async stream.") 

877 

878 self.is_stream_consumed = True 

879 self._num_bytes_downloaded = 0 

880 chunker = ByteChunker(chunk_size=chunk_size) 

881 

882 with request_context(request=self._request): 

883 for raw_stream_bytes in self.stream: 

884 self._num_bytes_downloaded += len(raw_stream_bytes) 

885 for chunk in chunker.decode(raw_stream_bytes): 

886 yield chunk 

887 

888 for chunk in chunker.flush(): 

889 yield chunk 

890 

891 self.close() 

892 

893 def close(self) -> None: 

894 """ 

895 Close the response and release the connection. 

896 Automatically called if the response body is read to completion. 

897 """ 

898 if not isinstance(self.stream, SyncByteStream): 

899 raise RuntimeError("Attempted to call an sync close on an async stream.") 

900 

901 if not self.is_closed: 

902 self.is_closed = True 

903 with request_context(request=self._request): 

904 self.stream.close() 

905 

906 async def aread(self) -> bytes: 

907 """ 

908 Read and return the response content. 

909 """ 

910 if not hasattr(self, "_content"): 

911 self._content = b"".join([part async for part in self.aiter_bytes()]) 

912 return self._content 

913 

914 async def aiter_bytes( 

915 self, chunk_size: int | None = None 

916 ) -> typing.AsyncIterator[bytes]: 

917 """ 

918 A byte-iterator over the decoded response content. 

919 This allows us to handle gzip, deflate, and brotli encoded responses. 

920 """ 

921 if hasattr(self, "_content"): 

922 chunk_size = len(self._content) if chunk_size is None else chunk_size 

923 for i in range(0, len(self._content), max(chunk_size, 1)): 

924 yield self._content[i : i + chunk_size] 

925 else: 

926 decoder = self._get_content_decoder() 

927 chunker = ByteChunker(chunk_size=chunk_size) 

928 with request_context(request=self._request): 

929 async for raw_bytes in self.aiter_raw(): 

930 decoded = decoder.decode(raw_bytes) 

931 for chunk in chunker.decode(decoded): 

932 yield chunk 

933 decoded = decoder.flush() 

934 for chunk in chunker.decode(decoded): 

935 yield chunk # pragma: no cover 

936 for chunk in chunker.flush(): 

937 yield chunk 

938 

939 async def aiter_text( 

940 self, chunk_size: int | None = None 

941 ) -> typing.AsyncIterator[str]: 

942 """ 

943 A str-iterator over the decoded response content 

944 that handles both gzip, deflate, etc but also detects the content's 

945 string encoding. 

946 """ 

947 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

948 chunker = TextChunker(chunk_size=chunk_size) 

949 with request_context(request=self._request): 

950 async for byte_content in self.aiter_bytes(): 

951 text_content = decoder.decode(byte_content) 

952 for chunk in chunker.decode(text_content): 

953 yield chunk 

954 text_content = decoder.flush() 

955 for chunk in chunker.decode(text_content): 

956 yield chunk # pragma: no cover 

957 for chunk in chunker.flush(): 

958 yield chunk 

959 

960 async def aiter_lines(self) -> typing.AsyncIterator[str]: 

961 decoder = LineDecoder() 

962 with request_context(request=self._request): 

963 async for text in self.aiter_text(): 

964 for line in decoder.decode(text): 

965 yield line 

966 for line in decoder.flush(): 

967 yield line 

968 

969 async def aiter_raw( 

970 self, chunk_size: int | None = None 

971 ) -> typing.AsyncIterator[bytes]: 

972 """ 

973 A byte-iterator over the raw response content. 

974 """ 

975 if self.is_stream_consumed: 

976 raise StreamConsumed() 

977 if self.is_closed: 

978 raise StreamClosed() 

979 if not isinstance(self.stream, AsyncByteStream): 

980 raise RuntimeError("Attempted to call an async iterator on an sync stream.") 

981 

982 self.is_stream_consumed = True 

983 self._num_bytes_downloaded = 0 

984 chunker = ByteChunker(chunk_size=chunk_size) 

985 

986 with request_context(request=self._request): 

987 async for raw_stream_bytes in self.stream: 

988 self._num_bytes_downloaded += len(raw_stream_bytes) 

989 for chunk in chunker.decode(raw_stream_bytes): 

990 yield chunk 

991 

992 for chunk in chunker.flush(): 

993 yield chunk 

994 

995 await self.aclose() 

996 

997 async def aclose(self) -> None: 

998 """ 

999 Close the response and release the connection. 

1000 Automatically called if the response body is read to completion. 

1001 """ 

1002 if not isinstance(self.stream, AsyncByteStream): 

1003 raise RuntimeError("Attempted to call an async close on an sync stream.") 

1004 

1005 if not self.is_closed: 

1006 self.is_closed = True 

1007 with request_context(request=self._request): 

1008 await self.stream.aclose() 

1009 

1010 

1011class Cookies(typing.MutableMapping[str, str]): 

1012 """ 

1013 HTTP Cookies, as a mutable mapping. 

1014 """ 

1015 

1016 def __init__(self, cookies: CookieTypes | None = None) -> None: 

1017 if cookies is None or isinstance(cookies, dict): 

1018 self.jar = CookieJar() 

1019 if isinstance(cookies, dict): 

1020 for key, value in cookies.items(): 

1021 self.set(key, value) 

1022 elif isinstance(cookies, list): 

1023 self.jar = CookieJar() 

1024 for key, value in cookies: 

1025 self.set(key, value) 

1026 elif isinstance(cookies, Cookies): 

1027 self.jar = CookieJar() 

1028 for cookie in cookies.jar: 

1029 self.jar.set_cookie(cookie) 

1030 else: 

1031 self.jar = cookies 

1032 

1033 def extract_cookies(self, response: Response) -> None: 

1034 """ 

1035 Loads any cookies based on the response `Set-Cookie` headers. 

1036 """ 

1037 urllib_response = self._CookieCompatResponse(response) 

1038 urllib_request = self._CookieCompatRequest(response.request) 

1039 

1040 self.jar.extract_cookies(urllib_response, urllib_request) # type: ignore 

1041 

1042 def set_cookie_header(self, request: Request) -> None: 

1043 """ 

1044 Sets an appropriate 'Cookie:' HTTP header on the `Request`. 

1045 """ 

1046 urllib_request = self._CookieCompatRequest(request) 

1047 self.jar.add_cookie_header(urllib_request) 

1048 

1049 def set(self, name: str, value: str, domain: str = "", path: str = "/") -> None: 

1050 """ 

1051 Set a cookie value by name. May optionally include domain and path. 

1052 """ 

1053 kwargs = { 

1054 "version": 0, 

1055 "name": name, 

1056 "value": value, 

1057 "port": None, 

1058 "port_specified": False, 

1059 "domain": domain, 

1060 "domain_specified": bool(domain), 

1061 "domain_initial_dot": domain.startswith("."), 

1062 "path": path, 

1063 "path_specified": bool(path), 

1064 "secure": False, 

1065 "expires": None, 

1066 "discard": True, 

1067 "comment": None, 

1068 "comment_url": None, 

1069 "rest": {"HttpOnly": None}, 

1070 "rfc2109": False, 

1071 } 

1072 cookie = Cookie(**kwargs) # type: ignore 

1073 self.jar.set_cookie(cookie) 

1074 

1075 def get( # type: ignore 

1076 self, 

1077 name: str, 

1078 default: str | None = None, 

1079 domain: str | None = None, 

1080 path: str | None = None, 

1081 ) -> str | None: 

1082 """ 

1083 Get a cookie by name. May optionally include domain and path 

1084 in order to specify exactly which cookie to retrieve. 

1085 """ 

1086 value = None 

1087 for cookie in self.jar: 

1088 if cookie.name == name: 

1089 if domain is None or cookie.domain == domain: 

1090 if path is None or cookie.path == path: 

1091 if value is not None: 

1092 message = f"Multiple cookies exist with name={name}" 

1093 raise CookieConflict(message) 

1094 value = cookie.value 

1095 

1096 if value is None: 

1097 return default 

1098 return value 

1099 

1100 def delete( 

1101 self, 

1102 name: str, 

1103 domain: str | None = None, 

1104 path: str | None = None, 

1105 ) -> None: 

1106 """ 

1107 Delete a cookie by name. May optionally include domain and path 

1108 in order to specify exactly which cookie to delete. 

1109 """ 

1110 if domain is not None and path is not None: 

1111 return self.jar.clear(domain, path, name) 

1112 

1113 remove = [ 

1114 cookie 

1115 for cookie in self.jar 

1116 if cookie.name == name 

1117 and (domain is None or cookie.domain == domain) 

1118 and (path is None or cookie.path == path) 

1119 ] 

1120 

1121 for cookie in remove: 

1122 self.jar.clear(cookie.domain, cookie.path, cookie.name) 

1123 

1124 def clear(self, domain: str | None = None, path: str | None = None) -> None: 

1125 """ 

1126 Delete all cookies. Optionally include a domain and path in 

1127 order to only delete a subset of all the cookies. 

1128 """ 

1129 args = [] 

1130 if domain is not None: 

1131 args.append(domain) 

1132 if path is not None: 

1133 assert domain is not None 

1134 args.append(path) 

1135 self.jar.clear(*args) 

1136 

1137 def update(self, cookies: CookieTypes | None = None) -> None: # type: ignore 

1138 cookies = Cookies(cookies) 

1139 for cookie in cookies.jar: 

1140 self.jar.set_cookie(cookie) 

1141 

1142 def __setitem__(self, name: str, value: str) -> None: 

1143 return self.set(name, value) 

1144 

1145 def __getitem__(self, name: str) -> str: 

1146 value = self.get(name) 

1147 if value is None: 

1148 raise KeyError(name) 

1149 return value 

1150 

1151 def __delitem__(self, name: str) -> None: 

1152 return self.delete(name) 

1153 

1154 def __len__(self) -> int: 

1155 return len(self.jar) 

1156 

1157 def __iter__(self) -> typing.Iterator[str]: 

1158 return (cookie.name for cookie in self.jar) 

1159 

1160 def __bool__(self) -> bool: 

1161 for _ in self.jar: 

1162 return True 

1163 return False 

1164 

1165 def __repr__(self) -> str: 

1166 cookies_repr = ", ".join( 

1167 [ 

1168 f"<Cookie {cookie.name}={cookie.value} for {cookie.domain} />" 

1169 for cookie in self.jar 

1170 ] 

1171 ) 

1172 

1173 return f"<Cookies[{cookies_repr}]>" 

1174 

1175 class _CookieCompatRequest(urllib.request.Request): 

1176 """ 

1177 Wraps a `Request` instance up in a compatibility interface suitable 

1178 for use with `CookieJar` operations. 

1179 """ 

1180 

1181 def __init__(self, request: Request) -> None: 

1182 super().__init__( 

1183 url=str(request.url), 

1184 headers=dict(request.headers), 

1185 method=request.method, 

1186 ) 

1187 self.request = request 

1188 

1189 def add_unredirected_header(self, key: str, value: str) -> None: 

1190 super().add_unredirected_header(key, value) 

1191 self.request.headers[key] = value 

1192 

1193 class _CookieCompatResponse: 

1194 """ 

1195 Wraps a `Request` instance up in a compatibility interface suitable 

1196 for use with `CookieJar` operations. 

1197 """ 

1198 

1199 def __init__(self, response: Response) -> None: 

1200 self.response = response 

1201 

1202 def info(self) -> email.message.Message: 

1203 info = email.message.Message() 

1204 for key, value in self.response.headers.multi_items(): 

1205 # Note that setting `info[key]` here is an "append" operation, 

1206 # not a "replace" operation. 

1207 # https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__ 

1208 info[key] = value 

1209 return info