Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_models.py: 23%

603 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import datetime 

2import email.message 

3import json as jsonlib 

4import typing 

5import urllib.request 

6from collections.abc import Mapping 

7from http.cookiejar import Cookie, CookieJar 

8 

9from ._content import ByteStream, UnattachedStream, encode_request, encode_response 

10from ._decoders import ( 

11 SUPPORTED_DECODERS, 

12 ByteChunker, 

13 ContentDecoder, 

14 IdentityDecoder, 

15 LineDecoder, 

16 MultiDecoder, 

17 TextChunker, 

18 TextDecoder, 

19) 

20from ._exceptions import ( 

21 CookieConflict, 

22 HTTPStatusError, 

23 RequestNotRead, 

24 ResponseNotRead, 

25 StreamClosed, 

26 StreamConsumed, 

27 request_context, 

28) 

29from ._multipart import get_multipart_boundary_from_content_type 

30from ._status_codes import codes 

31from ._types import ( 

32 AsyncByteStream, 

33 CookieTypes, 

34 HeaderTypes, 

35 QueryParamTypes, 

36 RequestContent, 

37 RequestData, 

38 RequestExtensions, 

39 RequestFiles, 

40 ResponseContent, 

41 ResponseExtensions, 

42 SyncByteStream, 

43) 

44from ._urls import URL 

45from ._utils import ( 

46 guess_json_utf, 

47 is_known_encoding, 

48 normalize_header_key, 

49 normalize_header_value, 

50 obfuscate_sensitive_headers, 

51 parse_content_type_charset, 

52 parse_header_links, 

53) 

54 

55 

56class Headers(typing.MutableMapping[str, str]): 

57 """ 

58 HTTP headers, as a case-insensitive multi-dict. 

59 """ 

60 

61 def __init__( 

62 self, 

63 headers: typing.Optional[HeaderTypes] = None, 

64 encoding: typing.Optional[str] = None, 

65 ) -> None: 

66 if headers is None: 

67 self._list = [] # type: typing.List[typing.Tuple[bytes, bytes, bytes]] 

68 elif isinstance(headers, Headers): 

69 self._list = list(headers._list) 

70 elif isinstance(headers, Mapping): 

71 self._list = [ 

72 ( 

73 normalize_header_key(k, lower=False, encoding=encoding), 

74 normalize_header_key(k, lower=True, encoding=encoding), 

75 normalize_header_value(v, encoding), 

76 ) 

77 for k, v in headers.items() 

78 ] 

79 else: 

80 self._list = [ 

81 ( 

82 normalize_header_key(k, lower=False, encoding=encoding), 

83 normalize_header_key(k, lower=True, encoding=encoding), 

84 normalize_header_value(v, encoding), 

85 ) 

86 for k, v in headers 

87 ] 

88 

89 self._encoding = encoding 

90 

91 @property 

92 def encoding(self) -> str: 

93 """ 

94 Header encoding is mandated as ascii, but we allow fallbacks to utf-8 

95 or iso-8859-1. 

96 """ 

97 if self._encoding is None: 

98 for encoding in ["ascii", "utf-8"]: 

99 for key, value in self.raw: 

100 try: 

101 key.decode(encoding) 

102 value.decode(encoding) 

103 except UnicodeDecodeError: 

104 break 

105 else: 

106 # The else block runs if 'break' did not occur, meaning 

107 # all values fitted the encoding. 

108 self._encoding = encoding 

109 break 

110 else: 

111 # The ISO-8859-1 encoding covers all 256 code points in a byte, 

112 # so will never raise decode errors. 

113 self._encoding = "iso-8859-1" 

114 return self._encoding 

115 

116 @encoding.setter 

117 def encoding(self, value: str) -> None: 

118 self._encoding = value 

119 

120 @property 

121 def raw(self) -> typing.List[typing.Tuple[bytes, bytes]]: 

122 """ 

123 Returns a list of the raw header items, as byte pairs. 

124 """ 

125 return [(raw_key, value) for raw_key, _, value in self._list] 

126 

127 def keys(self) -> typing.KeysView[str]: 

128 return {key.decode(self.encoding): None for _, key, value in self._list}.keys() 

129 

130 def values(self) -> typing.ValuesView[str]: 

131 values_dict: typing.Dict[str, str] = {} 

132 for _, key, value in self._list: 

133 str_key = key.decode(self.encoding) 

134 str_value = value.decode(self.encoding) 

135 if str_key in values_dict: 

136 values_dict[str_key] += f", {str_value}" 

137 else: 

138 values_dict[str_key] = str_value 

139 return values_dict.values() 

140 

141 def items(self) -> typing.ItemsView[str, str]: 

142 """ 

143 Return `(key, value)` items of headers. Concatenate headers 

144 into a single comma separated value when a key occurs multiple times. 

145 """ 

146 values_dict: typing.Dict[str, str] = {} 

147 for _, key, value in self._list: 

148 str_key = key.decode(self.encoding) 

149 str_value = value.decode(self.encoding) 

150 if str_key in values_dict: 

151 values_dict[str_key] += f", {str_value}" 

152 else: 

153 values_dict[str_key] = str_value 

154 return values_dict.items() 

155 

156 def multi_items(self) -> typing.List[typing.Tuple[str, str]]: 

157 """ 

158 Return a list of `(key, value)` pairs of headers. Allow multiple 

159 occurrences of the same key without concatenating into a single 

160 comma separated value. 

161 """ 

162 return [ 

163 (key.decode(self.encoding), value.decode(self.encoding)) 

164 for _, key, value in self._list 

165 ] 

166 

167 def get(self, key: str, default: typing.Any = None) -> typing.Any: 

168 """ 

169 Return a header value. If multiple occurrences of the header occur 

170 then concatenate them together with commas. 

171 """ 

172 try: 

173 return self[key] 

174 except KeyError: 

175 return default 

176 

177 def get_list(self, key: str, split_commas: bool = False) -> typing.List[str]: 

178 """ 

179 Return a list of all header values for a given key. 

180 If `split_commas=True` is passed, then any comma separated header 

181 values are split into multiple return strings. 

182 """ 

183 get_header_key = key.lower().encode(self.encoding) 

184 

185 values = [ 

186 item_value.decode(self.encoding) 

187 for _, item_key, item_value in self._list 

188 if item_key.lower() == get_header_key 

189 ] 

190 

191 if not split_commas: 

192 return values 

193 

194 split_values = [] 

195 for value in values: 

196 split_values.extend([item.strip() for item in value.split(",")]) 

197 return split_values 

198 

199 def update(self, headers: typing.Optional[HeaderTypes] = None) -> None: # type: ignore 

200 headers = Headers(headers) 

201 for key in headers.keys(): 

202 if key in self: 

203 self.pop(key) 

204 self._list.extend(headers._list) 

205 

206 def copy(self) -> "Headers": 

207 return Headers(self, encoding=self.encoding) 

208 

209 def __getitem__(self, key: str) -> str: 

210 """ 

211 Return a single header value. 

212 

213 If there are multiple headers with the same key, then we concatenate 

214 them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2 

215 """ 

216 normalized_key = key.lower().encode(self.encoding) 

217 

218 items = [ 

219 header_value.decode(self.encoding) 

220 for _, header_key, header_value in self._list 

221 if header_key == normalized_key 

222 ] 

223 

224 if items: 

225 return ", ".join(items) 

226 

227 raise KeyError(key) 

228 

229 def __setitem__(self, key: str, value: str) -> None: 

230 """ 

231 Set the header `key` to `value`, removing any duplicate entries. 

232 Retains insertion order. 

233 """ 

234 set_key = key.encode(self._encoding or "utf-8") 

235 set_value = value.encode(self._encoding or "utf-8") 

236 lookup_key = set_key.lower() 

237 

238 found_indexes = [ 

239 idx 

240 for idx, (_, item_key, _) in enumerate(self._list) 

241 if item_key == lookup_key 

242 ] 

243 

244 for idx in reversed(found_indexes[1:]): 

245 del self._list[idx] 

246 

247 if found_indexes: 

248 idx = found_indexes[0] 

249 self._list[idx] = (set_key, lookup_key, set_value) 

250 else: 

251 self._list.append((set_key, lookup_key, set_value)) 

252 

253 def __delitem__(self, key: str) -> None: 

254 """ 

255 Remove the header `key`. 

256 """ 

257 del_key = key.lower().encode(self.encoding) 

258 

259 pop_indexes = [ 

260 idx 

261 for idx, (_, item_key, _) in enumerate(self._list) 

262 if item_key.lower() == del_key 

263 ] 

264 

265 if not pop_indexes: 

266 raise KeyError(key) 

267 

268 for idx in reversed(pop_indexes): 

269 del self._list[idx] 

270 

271 def __contains__(self, key: typing.Any) -> bool: 

272 header_key = key.lower().encode(self.encoding) 

273 return header_key in [key for _, key, _ in self._list] 

274 

275 def __iter__(self) -> typing.Iterator[typing.Any]: 

276 return iter(self.keys()) 

277 

278 def __len__(self) -> int: 

279 return len(self._list) 

280 

281 def __eq__(self, other: typing.Any) -> bool: 

282 try: 

283 other_headers = Headers(other) 

284 except ValueError: 

285 return False 

286 

287 self_list = [(key, value) for _, key, value in self._list] 

288 other_list = [(key, value) for _, key, value in other_headers._list] 

289 return sorted(self_list) == sorted(other_list) 

290 

291 def __repr__(self) -> str: 

292 class_name = self.__class__.__name__ 

293 

294 encoding_str = "" 

295 if self.encoding != "ascii": 

296 encoding_str = f", encoding={self.encoding!r}" 

297 

298 as_list = list(obfuscate_sensitive_headers(self.multi_items())) 

299 as_dict = dict(as_list) 

300 

301 no_duplicate_keys = len(as_dict) == len(as_list) 

302 if no_duplicate_keys: 

303 return f"{class_name}({as_dict!r}{encoding_str})" 

304 return f"{class_name}({as_list!r}{encoding_str})" 

305 

306 

307class Request: 

308 def __init__( 

309 self, 

310 method: typing.Union[str, bytes], 

311 url: typing.Union["URL", str], 

312 *, 

313 params: typing.Optional[QueryParamTypes] = None, 

314 headers: typing.Optional[HeaderTypes] = None, 

315 cookies: typing.Optional[CookieTypes] = None, 

316 content: typing.Optional[RequestContent] = None, 

317 data: typing.Optional[RequestData] = None, 

318 files: typing.Optional[RequestFiles] = None, 

319 json: typing.Optional[typing.Any] = None, 

320 stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None, 

321 extensions: typing.Optional[RequestExtensions] = None, 

322 ): 

323 self.method = ( 

324 method.decode("ascii").upper() 

325 if isinstance(method, bytes) 

326 else method.upper() 

327 ) 

328 self.url = URL(url) 

329 if params is not None: 

330 self.url = self.url.copy_merge_params(params=params) 

331 self.headers = Headers(headers) 

332 self.extensions = {} if extensions is None else extensions 

333 

334 if cookies: 

335 Cookies(cookies).set_cookie_header(self) 

336 

337 if stream is None: 

338 content_type: typing.Optional[str] = self.headers.get("content-type") 

339 headers, stream = encode_request( 

340 content=content, 

341 data=data, 

342 files=files, 

343 json=json, 

344 boundary=get_multipart_boundary_from_content_type( 

345 content_type=content_type.encode(self.headers.encoding) 

346 if content_type 

347 else None 

348 ), 

349 ) 

350 self._prepare(headers) 

351 self.stream = stream 

352 # Load the request body, except for streaming content. 

353 if isinstance(stream, ByteStream): 

354 self.read() 

355 else: 

356 # There's an important distinction between `Request(content=...)`, 

357 # and `Request(stream=...)`. 

358 # 

359 # Using `content=...` implies automatically populated `Host` and content 

360 # headers, of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

361 # 

362 # Using `stream=...` will not automatically include *any* auto-populated headers. 

363 # 

364 # As an end-user you don't really need `stream=...`. It's only 

365 # useful when: 

366 # 

367 # * Preserving the request stream when copying requests, eg for redirects. 

368 # * Creating request instances on the *server-side* of the transport API. 

369 self.stream = stream 

370 

371 def _prepare(self, default_headers: typing.Dict[str, str]) -> None: 

372 for key, value in default_headers.items(): 

373 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

374 if key.lower() == "transfer-encoding" and "Content-Length" in self.headers: 

375 continue 

376 self.headers.setdefault(key, value) 

377 

378 auto_headers: typing.List[typing.Tuple[bytes, bytes]] = [] 

379 

380 has_host = "Host" in self.headers 

381 has_content_length = ( 

382 "Content-Length" in self.headers or "Transfer-Encoding" in self.headers 

383 ) 

384 

385 if not has_host and self.url.host: 

386 auto_headers.append((b"Host", self.url.netloc)) 

387 if not has_content_length and self.method in ("POST", "PUT", "PATCH"): 

388 auto_headers.append((b"Content-Length", b"0")) 

389 

390 self.headers = Headers(auto_headers + self.headers.raw) 

391 

392 @property 

393 def content(self) -> bytes: 

394 if not hasattr(self, "_content"): 

395 raise RequestNotRead() 

396 return self._content 

397 

398 def read(self) -> bytes: 

399 """ 

400 Read and return the request content. 

401 """ 

402 if not hasattr(self, "_content"): 

403 assert isinstance(self.stream, typing.Iterable) 

404 self._content = b"".join(self.stream) 

405 if not isinstance(self.stream, ByteStream): 

406 # If a streaming request has been read entirely into memory, then 

407 # we can replace the stream with a raw bytes implementation, 

408 # to ensure that any non-replayable streams can still be used. 

409 self.stream = ByteStream(self._content) 

410 return self._content 

411 

412 async def aread(self) -> bytes: 

413 """ 

414 Read and return the request content. 

415 """ 

416 if not hasattr(self, "_content"): 

417 assert isinstance(self.stream, typing.AsyncIterable) 

418 self._content = b"".join([part async for part in self.stream]) 

419 if not isinstance(self.stream, ByteStream): 

420 # If a streaming request has been read entirely into memory, then 

421 # we can replace the stream with a raw bytes implementation, 

422 # to ensure that any non-replayable streams can still be used. 

423 self.stream = ByteStream(self._content) 

424 return self._content 

425 

426 def __repr__(self) -> str: 

427 class_name = self.__class__.__name__ 

428 url = str(self.url) 

429 return f"<{class_name}({self.method!r}, {url!r})>" 

430 

431 def __getstate__(self) -> typing.Dict[str, typing.Any]: 

432 return { 

433 name: value 

434 for name, value in self.__dict__.items() 

435 if name not in ["extensions", "stream"] 

436 } 

437 

438 def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None: 

439 for name, value in state.items(): 

440 setattr(self, name, value) 

441 self.extensions = {} 

442 self.stream = UnattachedStream() 

443 

444 

445class Response: 

446 def __init__( 

447 self, 

448 status_code: int, 

449 *, 

450 headers: typing.Optional[HeaderTypes] = None, 

451 content: typing.Optional[ResponseContent] = None, 

452 text: typing.Optional[str] = None, 

453 html: typing.Optional[str] = None, 

454 json: typing.Any = None, 

455 stream: typing.Union[SyncByteStream, AsyncByteStream, None] = None, 

456 request: typing.Optional[Request] = None, 

457 extensions: typing.Optional[ResponseExtensions] = None, 

458 history: typing.Optional[typing.List["Response"]] = None, 

459 default_encoding: typing.Union[str, typing.Callable[[bytes], str]] = "utf-8", 

460 ): 

461 self.status_code = status_code 

462 self.headers = Headers(headers) 

463 

464 self._request: typing.Optional[Request] = request 

465 

466 # When follow_redirects=False and a redirect is received, 

467 # the client will set `response.next_request`. 

468 self.next_request: typing.Optional[Request] = None 

469 

470 self.extensions = {} if extensions is None else extensions 

471 self.history = [] if history is None else list(history) 

472 

473 self.is_closed = False 

474 self.is_stream_consumed = False 

475 

476 self.default_encoding = default_encoding 

477 

478 if stream is None: 

479 headers, stream = encode_response(content, text, html, json) 

480 self._prepare(headers) 

481 self.stream = stream 

482 if isinstance(stream, ByteStream): 

483 # Load the response body, except for streaming content. 

484 self.read() 

485 else: 

486 # There's an important distinction between `Response(content=...)`, 

487 # and `Response(stream=...)`. 

488 # 

489 # Using `content=...` implies automatically populated content headers, 

490 # of either `Content-Length: ...` or `Transfer-Encoding: chunked`. 

491 # 

492 # Using `stream=...` will not automatically include any content headers. 

493 # 

494 # As an end-user you don't really need `stream=...`. It's only 

495 # useful when creating response instances having received a stream 

496 # from the transport API. 

497 self.stream = stream 

498 

499 self._num_bytes_downloaded = 0 

500 

501 def _prepare(self, default_headers: typing.Dict[str, str]) -> None: 

502 for key, value in default_headers.items(): 

503 # Ignore Transfer-Encoding if the Content-Length has been set explicitly. 

504 if key.lower() == "transfer-encoding" and "content-length" in self.headers: 

505 continue 

506 self.headers.setdefault(key, value) 

507 

508 @property 

509 def elapsed(self) -> datetime.timedelta: 

510 """ 

511 Returns the time taken for the complete request/response 

512 cycle to complete. 

513 """ 

514 if not hasattr(self, "_elapsed"): 

515 raise RuntimeError( 

516 "'.elapsed' may only be accessed after the response " 

517 "has been read or closed." 

518 ) 

519 return self._elapsed 

520 

521 @elapsed.setter 

522 def elapsed(self, elapsed: datetime.timedelta) -> None: 

523 self._elapsed = elapsed 

524 

525 @property 

526 def request(self) -> Request: 

527 """ 

528 Returns the request instance associated to the current response. 

529 """ 

530 if self._request is None: 

531 raise RuntimeError( 

532 "The request instance has not been set on this response." 

533 ) 

534 return self._request 

535 

536 @request.setter 

537 def request(self, value: Request) -> None: 

538 self._request = value 

539 

540 @property 

541 def http_version(self) -> str: 

542 try: 

543 http_version: bytes = self.extensions["http_version"] 

544 except KeyError: 

545 return "HTTP/1.1" 

546 else: 

547 return http_version.decode("ascii", errors="ignore") 

548 

549 @property 

550 def reason_phrase(self) -> str: 

551 try: 

552 reason_phrase: bytes = self.extensions["reason_phrase"] 

553 except KeyError: 

554 return codes.get_reason_phrase(self.status_code) 

555 else: 

556 return reason_phrase.decode("ascii", errors="ignore") 

557 

558 @property 

559 def url(self) -> URL: 

560 """ 

561 Returns the URL for which the request was made. 

562 """ 

563 return self.request.url 

564 

565 @property 

566 def content(self) -> bytes: 

567 if not hasattr(self, "_content"): 

568 raise ResponseNotRead() 

569 return self._content 

570 

571 @property 

572 def text(self) -> str: 

573 if not hasattr(self, "_text"): 

574 content = self.content 

575 if not content: 

576 self._text = "" 

577 else: 

578 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

579 self._text = "".join([decoder.decode(self.content), decoder.flush()]) 

580 return self._text 

581 

582 @property 

583 def encoding(self) -> typing.Optional[str]: 

584 """ 

585 Return an encoding to use for decoding the byte content into text. 

586 The priority for determining this is given by... 

587 

588 * `.encoding = <>` has been set explicitly. 

589 * The encoding as specified by the charset parameter in the Content-Type header. 

590 * The encoding as determined by `default_encoding`, which may either be 

591 a string like "utf-8" indicating the encoding to use, or may be a callable 

592 which enables charset autodetection. 

593 """ 

594 if not hasattr(self, "_encoding"): 

595 encoding = self.charset_encoding 

596 if encoding is None or not is_known_encoding(encoding): 

597 if isinstance(self.default_encoding, str): 

598 encoding = self.default_encoding 

599 elif hasattr(self, "_content"): 

600 encoding = self.default_encoding(self._content) 

601 self._encoding = encoding or "utf-8" 

602 return self._encoding 

603 

604 @encoding.setter 

605 def encoding(self, value: str) -> None: 

606 self._encoding = value 

607 

608 @property 

609 def charset_encoding(self) -> typing.Optional[str]: 

610 """ 

611 Return the encoding, as specified by the Content-Type header. 

612 """ 

613 content_type = self.headers.get("Content-Type") 

614 if content_type is None: 

615 return None 

616 

617 return parse_content_type_charset(content_type) 

618 

619 def _get_content_decoder(self) -> ContentDecoder: 

620 """ 

621 Returns a decoder instance which can be used to decode the raw byte 

622 content, depending on the Content-Encoding used in the response. 

623 """ 

624 if not hasattr(self, "_decoder"): 

625 decoders: typing.List[ContentDecoder] = [] 

626 values = self.headers.get_list("content-encoding", split_commas=True) 

627 for value in values: 

628 value = value.strip().lower() 

629 try: 

630 decoder_cls = SUPPORTED_DECODERS[value] 

631 decoders.append(decoder_cls()) 

632 except KeyError: 

633 continue 

634 

635 if len(decoders) == 1: 

636 self._decoder = decoders[0] 

637 elif len(decoders) > 1: 

638 self._decoder = MultiDecoder(children=decoders) 

639 else: 

640 self._decoder = IdentityDecoder() 

641 

642 return self._decoder 

643 

644 @property 

645 def is_informational(self) -> bool: 

646 """ 

647 A property which is `True` for 1xx status codes, `False` otherwise. 

648 """ 

649 return codes.is_informational(self.status_code) 

650 

651 @property 

652 def is_success(self) -> bool: 

653 """ 

654 A property which is `True` for 2xx status codes, `False` otherwise. 

655 """ 

656 return codes.is_success(self.status_code) 

657 

658 @property 

659 def is_redirect(self) -> bool: 

660 """ 

661 A property which is `True` for 3xx status codes, `False` otherwise. 

662 

663 Note that not all responses with a 3xx status code indicate a URL redirect. 

664 

665 Use `response.has_redirect_location` to determine responses with a properly 

666 formed URL redirection. 

667 """ 

668 return codes.is_redirect(self.status_code) 

669 

670 @property 

671 def is_client_error(self) -> bool: 

672 """ 

673 A property which is `True` for 4xx status codes, `False` otherwise. 

674 """ 

675 return codes.is_client_error(self.status_code) 

676 

677 @property 

678 def is_server_error(self) -> bool: 

679 """ 

680 A property which is `True` for 5xx status codes, `False` otherwise. 

681 """ 

682 return codes.is_server_error(self.status_code) 

683 

684 @property 

685 def is_error(self) -> bool: 

686 """ 

687 A property which is `True` for 4xx and 5xx status codes, `False` otherwise. 

688 """ 

689 return codes.is_error(self.status_code) 

690 

691 @property 

692 def has_redirect_location(self) -> bool: 

693 """ 

694 Returns True for 3xx responses with a properly formed URL redirection, 

695 `False` otherwise. 

696 """ 

697 return ( 

698 self.status_code 

699 in ( 

700 # 301 (Cacheable redirect. Method may change to GET.) 

701 codes.MOVED_PERMANENTLY, 

702 # 302 (Uncacheable redirect. Method may change to GET.) 

703 codes.FOUND, 

704 # 303 (Client should make a GET or HEAD request.) 

705 codes.SEE_OTHER, 

706 # 307 (Equiv. 302, but retain method) 

707 codes.TEMPORARY_REDIRECT, 

708 # 308 (Equiv. 301, but retain method) 

709 codes.PERMANENT_REDIRECT, 

710 ) 

711 and "Location" in self.headers 

712 ) 

713 

714 def raise_for_status(self) -> None: 

715 """ 

716 Raise the `HTTPStatusError` if one occurred. 

717 """ 

718 request = self._request 

719 if request is None: 

720 raise RuntimeError( 

721 "Cannot call `raise_for_status` as the request " 

722 "instance has not been set on this response." 

723 ) 

724 

725 if self.is_success: 

726 return 

727 

728 if self.has_redirect_location: 

729 message = ( 

730 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

731 "Redirect location: '{0.headers[location]}'\n" 

732 "For more information check: https://httpstatuses.com/{0.status_code}" 

733 ) 

734 else: 

735 message = ( 

736 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n" 

737 "For more information check: https://httpstatuses.com/{0.status_code}" 

738 ) 

739 

740 status_class = self.status_code // 100 

741 error_types = { 

742 1: "Informational response", 

743 3: "Redirect response", 

744 4: "Client error", 

745 5: "Server error", 

746 } 

747 error_type = error_types.get(status_class, "Invalid status code") 

748 message = message.format(self, error_type=error_type) 

749 raise HTTPStatusError(message, request=request, response=self) 

750 

751 def json(self, **kwargs: typing.Any) -> typing.Any: 

752 if self.charset_encoding is None and self.content and len(self.content) > 3: 

753 encoding = guess_json_utf(self.content) 

754 if encoding is not None: 

755 return jsonlib.loads(self.content.decode(encoding), **kwargs) 

756 return jsonlib.loads(self.text, **kwargs) 

757 

758 @property 

759 def cookies(self) -> "Cookies": 

760 if not hasattr(self, "_cookies"): 

761 self._cookies = Cookies() 

762 self._cookies.extract_cookies(self) 

763 return self._cookies 

764 

765 @property 

766 def links(self) -> typing.Dict[typing.Optional[str], typing.Dict[str, str]]: 

767 """ 

768 Returns the parsed header links of the response, if any 

769 """ 

770 header = self.headers.get("link") 

771 ldict = {} 

772 if header: 

773 links = parse_header_links(header) 

774 for link in links: 

775 key = link.get("rel") or link.get("url") 

776 ldict[key] = link 

777 return ldict 

778 

779 @property 

780 def num_bytes_downloaded(self) -> int: 

781 return self._num_bytes_downloaded 

782 

783 def __repr__(self) -> str: 

784 return f"<Response [{self.status_code} {self.reason_phrase}]>" 

785 

786 def __getstate__(self) -> typing.Dict[str, typing.Any]: 

787 return { 

788 name: value 

789 for name, value in self.__dict__.items() 

790 if name not in ["extensions", "stream", "is_closed", "_decoder"] 

791 } 

792 

793 def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None: 

794 for name, value in state.items(): 

795 setattr(self, name, value) 

796 self.is_closed = True 

797 self.extensions = {} 

798 self.stream = UnattachedStream() 

799 

800 def read(self) -> bytes: 

801 """ 

802 Read and return the response content. 

803 """ 

804 if not hasattr(self, "_content"): 

805 self._content = b"".join(self.iter_bytes()) 

806 return self._content 

807 

808 def iter_bytes( 

809 self, chunk_size: typing.Optional[int] = None 

810 ) -> typing.Iterator[bytes]: 

811 """ 

812 A byte-iterator over the decoded response content. 

813 This allows us to handle gzip, deflate, and brotli encoded responses. 

814 """ 

815 if hasattr(self, "_content"): 

816 chunk_size = len(self._content) if chunk_size is None else chunk_size 

817 for i in range(0, len(self._content), max(chunk_size, 1)): 

818 yield self._content[i : i + chunk_size] 

819 else: 

820 decoder = self._get_content_decoder() 

821 chunker = ByteChunker(chunk_size=chunk_size) 

822 with request_context(request=self._request): 

823 for raw_bytes in self.iter_raw(): 

824 decoded = decoder.decode(raw_bytes) 

825 for chunk in chunker.decode(decoded): 

826 yield chunk 

827 decoded = decoder.flush() 

828 for chunk in chunker.decode(decoded): 

829 yield chunk # pragma: no cover 

830 for chunk in chunker.flush(): 

831 yield chunk 

832 

833 def iter_text( 

834 self, chunk_size: typing.Optional[int] = None 

835 ) -> typing.Iterator[str]: 

836 """ 

837 A str-iterator over the decoded response content 

838 that handles both gzip, deflate, etc but also detects the content's 

839 string encoding. 

840 """ 

841 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

842 chunker = TextChunker(chunk_size=chunk_size) 

843 with request_context(request=self._request): 

844 for byte_content in self.iter_bytes(): 

845 text_content = decoder.decode(byte_content) 

846 for chunk in chunker.decode(text_content): 

847 yield chunk 

848 text_content = decoder.flush() 

849 for chunk in chunker.decode(text_content): 

850 yield chunk 

851 for chunk in chunker.flush(): 

852 yield chunk 

853 

854 def iter_lines(self) -> typing.Iterator[str]: 

855 decoder = LineDecoder() 

856 with request_context(request=self._request): 

857 for text in self.iter_text(): 

858 for line in decoder.decode(text): 

859 yield line 

860 for line in decoder.flush(): 

861 yield line 

862 

863 def iter_raw( 

864 self, chunk_size: typing.Optional[int] = None 

865 ) -> typing.Iterator[bytes]: 

866 """ 

867 A byte-iterator over the raw response content. 

868 """ 

869 if self.is_stream_consumed: 

870 raise StreamConsumed() 

871 if self.is_closed: 

872 raise StreamClosed() 

873 if not isinstance(self.stream, SyncByteStream): 

874 raise RuntimeError("Attempted to call a sync iterator on an async stream.") 

875 

876 self.is_stream_consumed = True 

877 self._num_bytes_downloaded = 0 

878 chunker = ByteChunker(chunk_size=chunk_size) 

879 

880 with request_context(request=self._request): 

881 for raw_stream_bytes in self.stream: 

882 self._num_bytes_downloaded += len(raw_stream_bytes) 

883 for chunk in chunker.decode(raw_stream_bytes): 

884 yield chunk 

885 

886 for chunk in chunker.flush(): 

887 yield chunk 

888 

889 self.close() 

890 

891 def close(self) -> None: 

892 """ 

893 Close the response and release the connection. 

894 Automatically called if the response body is read to completion. 

895 """ 

896 if not isinstance(self.stream, SyncByteStream): 

897 raise RuntimeError("Attempted to call an sync close on an async stream.") 

898 

899 if not self.is_closed: 

900 self.is_closed = True 

901 with request_context(request=self._request): 

902 self.stream.close() 

903 

904 async def aread(self) -> bytes: 

905 """ 

906 Read and return the response content. 

907 """ 

908 if not hasattr(self, "_content"): 

909 self._content = b"".join([part async for part in self.aiter_bytes()]) 

910 return self._content 

911 

912 async def aiter_bytes( 

913 self, chunk_size: typing.Optional[int] = None 

914 ) -> typing.AsyncIterator[bytes]: 

915 """ 

916 A byte-iterator over the decoded response content. 

917 This allows us to handle gzip, deflate, and brotli encoded responses. 

918 """ 

919 if hasattr(self, "_content"): 

920 chunk_size = len(self._content) if chunk_size is None else chunk_size 

921 for i in range(0, len(self._content), max(chunk_size, 1)): 

922 yield self._content[i : i + chunk_size] 

923 else: 

924 decoder = self._get_content_decoder() 

925 chunker = ByteChunker(chunk_size=chunk_size) 

926 with request_context(request=self._request): 

927 async for raw_bytes in self.aiter_raw(): 

928 decoded = decoder.decode(raw_bytes) 

929 for chunk in chunker.decode(decoded): 

930 yield chunk 

931 decoded = decoder.flush() 

932 for chunk in chunker.decode(decoded): 

933 yield chunk # pragma: no cover 

934 for chunk in chunker.flush(): 

935 yield chunk 

936 

937 async def aiter_text( 

938 self, chunk_size: typing.Optional[int] = None 

939 ) -> typing.AsyncIterator[str]: 

940 """ 

941 A str-iterator over the decoded response content 

942 that handles both gzip, deflate, etc but also detects the content's 

943 string encoding. 

944 """ 

945 decoder = TextDecoder(encoding=self.encoding or "utf-8") 

946 chunker = TextChunker(chunk_size=chunk_size) 

947 with request_context(request=self._request): 

948 async for byte_content in self.aiter_bytes(): 

949 text_content = decoder.decode(byte_content) 

950 for chunk in chunker.decode(text_content): 

951 yield chunk 

952 text_content = decoder.flush() 

953 for chunk in chunker.decode(text_content): 

954 yield chunk 

955 for chunk in chunker.flush(): 

956 yield chunk 

957 

958 async def aiter_lines(self) -> typing.AsyncIterator[str]: 

959 decoder = LineDecoder() 

960 with request_context(request=self._request): 

961 async for text in self.aiter_text(): 

962 for line in decoder.decode(text): 

963 yield line 

964 for line in decoder.flush(): 

965 yield line 

966 

967 async def aiter_raw( 

968 self, chunk_size: typing.Optional[int] = None 

969 ) -> typing.AsyncIterator[bytes]: 

970 """ 

971 A byte-iterator over the raw response content. 

972 """ 

973 if self.is_stream_consumed: 

974 raise StreamConsumed() 

975 if self.is_closed: 

976 raise StreamClosed() 

977 if not isinstance(self.stream, AsyncByteStream): 

978 raise RuntimeError("Attempted to call an async iterator on an sync stream.") 

979 

980 self.is_stream_consumed = True 

981 self._num_bytes_downloaded = 0 

982 chunker = ByteChunker(chunk_size=chunk_size) 

983 

984 with request_context(request=self._request): 

985 async for raw_stream_bytes in self.stream: 

986 self._num_bytes_downloaded += len(raw_stream_bytes) 

987 for chunk in chunker.decode(raw_stream_bytes): 

988 yield chunk 

989 

990 for chunk in chunker.flush(): 

991 yield chunk 

992 

993 await self.aclose() 

994 

995 async def aclose(self) -> None: 

996 """ 

997 Close the response and release the connection. 

998 Automatically called if the response body is read to completion. 

999 """ 

1000 if not isinstance(self.stream, AsyncByteStream): 

1001 raise RuntimeError("Attempted to call an async close on an sync stream.") 

1002 

1003 if not self.is_closed: 

1004 self.is_closed = True 

1005 with request_context(request=self._request): 

1006 await self.stream.aclose() 

1007 

1008 

1009class Cookies(typing.MutableMapping[str, str]): 

1010 """ 

1011 HTTP Cookies, as a mutable mapping. 

1012 """ 

1013 

1014 def __init__(self, cookies: typing.Optional[CookieTypes] = None) -> None: 

1015 if cookies is None or isinstance(cookies, dict): 

1016 self.jar = CookieJar() 

1017 if isinstance(cookies, dict): 

1018 for key, value in cookies.items(): 

1019 self.set(key, value) 

1020 elif isinstance(cookies, list): 

1021 self.jar = CookieJar() 

1022 for key, value in cookies: 

1023 self.set(key, value) 

1024 elif isinstance(cookies, Cookies): 

1025 self.jar = CookieJar() 

1026 for cookie in cookies.jar: 

1027 self.jar.set_cookie(cookie) 

1028 else: 

1029 self.jar = cookies 

1030 

1031 def extract_cookies(self, response: Response) -> None: 

1032 """ 

1033 Loads any cookies based on the response `Set-Cookie` headers. 

1034 """ 

1035 urllib_response = self._CookieCompatResponse(response) 

1036 urllib_request = self._CookieCompatRequest(response.request) 

1037 

1038 self.jar.extract_cookies(urllib_response, urllib_request) # type: ignore 

1039 

1040 def set_cookie_header(self, request: Request) -> None: 

1041 """ 

1042 Sets an appropriate 'Cookie:' HTTP header on the `Request`. 

1043 """ 

1044 urllib_request = self._CookieCompatRequest(request) 

1045 self.jar.add_cookie_header(urllib_request) 

1046 

1047 def set(self, name: str, value: str, domain: str = "", path: str = "/") -> None: 

1048 """ 

1049 Set a cookie value by name. May optionally include domain and path. 

1050 """ 

1051 kwargs = { 

1052 "version": 0, 

1053 "name": name, 

1054 "value": value, 

1055 "port": None, 

1056 "port_specified": False, 

1057 "domain": domain, 

1058 "domain_specified": bool(domain), 

1059 "domain_initial_dot": domain.startswith("."), 

1060 "path": path, 

1061 "path_specified": bool(path), 

1062 "secure": False, 

1063 "expires": None, 

1064 "discard": True, 

1065 "comment": None, 

1066 "comment_url": None, 

1067 "rest": {"HttpOnly": None}, 

1068 "rfc2109": False, 

1069 } 

1070 cookie = Cookie(**kwargs) # type: ignore 

1071 self.jar.set_cookie(cookie) 

1072 

1073 def get( # type: ignore 

1074 self, 

1075 name: str, 

1076 default: typing.Optional[str] = None, 

1077 domain: typing.Optional[str] = None, 

1078 path: typing.Optional[str] = None, 

1079 ) -> typing.Optional[str]: 

1080 """ 

1081 Get a cookie by name. May optionally include domain and path 

1082 in order to specify exactly which cookie to retrieve. 

1083 """ 

1084 value = None 

1085 for cookie in self.jar: 

1086 if cookie.name == name: 

1087 if domain is None or cookie.domain == domain: 

1088 if path is None or cookie.path == path: 

1089 if value is not None: 

1090 message = f"Multiple cookies exist with name={name}" 

1091 raise CookieConflict(message) 

1092 value = cookie.value 

1093 

1094 if value is None: 

1095 return default 

1096 return value 

1097 

1098 def delete( 

1099 self, 

1100 name: str, 

1101 domain: typing.Optional[str] = None, 

1102 path: typing.Optional[str] = None, 

1103 ) -> None: 

1104 """ 

1105 Delete a cookie by name. May optionally include domain and path 

1106 in order to specify exactly which cookie to delete. 

1107 """ 

1108 if domain is not None and path is not None: 

1109 return self.jar.clear(domain, path, name) 

1110 

1111 remove = [ 

1112 cookie 

1113 for cookie in self.jar 

1114 if cookie.name == name 

1115 and (domain is None or cookie.domain == domain) 

1116 and (path is None or cookie.path == path) 

1117 ] 

1118 

1119 for cookie in remove: 

1120 self.jar.clear(cookie.domain, cookie.path, cookie.name) 

1121 

1122 def clear( 

1123 self, domain: typing.Optional[str] = None, path: typing.Optional[str] = None 

1124 ) -> None: 

1125 """ 

1126 Delete all cookies. Optionally include a domain and path in 

1127 order to only delete a subset of all the cookies. 

1128 """ 

1129 args = [] 

1130 if domain is not None: 

1131 args.append(domain) 

1132 if path is not None: 

1133 assert domain is not None 

1134 args.append(path) 

1135 self.jar.clear(*args) 

1136 

1137 def update(self, cookies: typing.Optional[CookieTypes] = None) -> None: # type: ignore 

1138 cookies = Cookies(cookies) 

1139 for cookie in cookies.jar: 

1140 self.jar.set_cookie(cookie) 

1141 

1142 def __setitem__(self, name: str, value: str) -> None: 

1143 return self.set(name, value) 

1144 

1145 def __getitem__(self, name: str) -> str: 

1146 value = self.get(name) 

1147 if value is None: 

1148 raise KeyError(name) 

1149 return value 

1150 

1151 def __delitem__(self, name: str) -> None: 

1152 return self.delete(name) 

1153 

1154 def __len__(self) -> int: 

1155 return len(self.jar) 

1156 

1157 def __iter__(self) -> typing.Iterator[str]: 

1158 return (cookie.name for cookie in self.jar) 

1159 

1160 def __bool__(self) -> bool: 

1161 for _ in self.jar: 

1162 return True 

1163 return False 

1164 

1165 def __repr__(self) -> str: 

1166 cookies_repr = ", ".join( 

1167 [ 

1168 f"<Cookie {cookie.name}={cookie.value} for {cookie.domain} />" 

1169 for cookie in self.jar 

1170 ] 

1171 ) 

1172 

1173 return f"<Cookies[{cookies_repr}]>" 

1174 

1175 class _CookieCompatRequest(urllib.request.Request): 

1176 """ 

1177 Wraps a `Request` instance up in a compatibility interface suitable 

1178 for use with `CookieJar` operations. 

1179 """ 

1180 

1181 def __init__(self, request: Request) -> None: 

1182 super().__init__( 

1183 url=str(request.url), 

1184 headers=dict(request.headers), 

1185 method=request.method, 

1186 ) 

1187 self.request = request 

1188 

1189 def add_unredirected_header(self, key: str, value: str) -> None: 

1190 super().add_unredirected_header(key, value) 

1191 self.request.headers[key] = value 

1192 

1193 class _CookieCompatResponse: 

1194 """ 

1195 Wraps a `Request` instance up in a compatibility interface suitable 

1196 for use with `CookieJar` operations. 

1197 """ 

1198 

1199 def __init__(self, response: Response): 

1200 self.response = response 

1201 

1202 def info(self) -> email.message.Message: 

1203 info = email.message.Message() 

1204 for key, value in self.response.headers.multi_items(): 

1205 # Note that setting `info[key]` here is an "append" operation, 

1206 # not a "replace" operation. 

1207 # https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__ 

1208 info[key] = value 

1209 return info