Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_models.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import codecs
4import datetime
5import email.message
6import json as jsonlib
7import re
8import typing
9import urllib.request
10from collections.abc import Mapping
11from http.cookiejar import Cookie, CookieJar
13from ._content import ByteStream, UnattachedStream, encode_request, encode_response
14from ._decoders import (
15 SUPPORTED_DECODERS,
16 ByteChunker,
17 ContentDecoder,
18 IdentityDecoder,
19 LineDecoder,
20 MultiDecoder,
21 TextChunker,
22 TextDecoder,
23)
24from ._exceptions import (
25 CookieConflict,
26 HTTPStatusError,
27 RequestNotRead,
28 ResponseNotRead,
29 StreamClosed,
30 StreamConsumed,
31 request_context,
32)
33from ._multipart import get_multipart_boundary_from_content_type
34from ._status_codes import codes
35from ._types import (
36 AsyncByteStream,
37 CookieTypes,
38 HeaderTypes,
39 QueryParamTypes,
40 RequestContent,
41 RequestData,
42 RequestExtensions,
43 RequestFiles,
44 ResponseContent,
45 ResponseExtensions,
46 SyncByteStream,
47)
48from ._urls import URL
49from ._utils import to_bytes_or_str, to_str
51__all__ = ["Cookies", "Headers", "Request", "Response"]
# Lower-cased names of headers whose values must not be shown verbatim.
# `_obfuscate_sensitive_headers` compares lower-cased header names against
# this set and masks the value of any match.
SENSITIVE_HEADERS = {
    "authorization",
    "proxy-authorization",
}
56def _is_known_encoding(encoding: str) -> bool:
57 """
58 Return `True` if `encoding` is a known codec.
59 """
60 try:
61 codecs.lookup(encoding)
62 except LookupError:
63 return False
64 return True
67def _normalize_header_key(key: str | bytes, encoding: str | None = None) -> bytes:
68 """
69 Coerce str/bytes into a strictly byte-wise HTTP header key.
70 """
71 return key if isinstance(key, bytes) else key.encode(encoding or "ascii")
74def _normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
75 """
76 Coerce str/bytes into a strictly byte-wise HTTP header value.
77 """
78 if isinstance(value, bytes):
79 return value
80 if not isinstance(value, str):
81 raise TypeError(f"Header value must be str or bytes, not {type(value)}")
82 return value.encode(encoding or "ascii")
85def _parse_content_type_charset(content_type: str) -> str | None:
86 # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
87 # See: https://peps.python.org/pep-0594/#cgi
88 msg = email.message.Message()
89 msg["content-type"] = content_type
90 return msg.get_content_charset(failobj=None)
93def _parse_header_links(value: str) -> list[dict[str, str]]:
94 """
95 Returns a list of parsed link headers, for more info see:
96 https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
97 The generic syntax of those is:
98 Link: < uri-reference >; param1=value1; param2="value2"
99 So for instance:
100 Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
101 would return
102 [
103 {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
104 {"url": "http://.../back.jpeg"},
105 ]
106 :param value: HTTP Link entity-header field
107 :return: list of parsed link headers
108 """
109 links: list[dict[str, str]] = []
110 replace_chars = " '\""
111 value = value.strip(replace_chars)
112 if not value:
113 return links
114 for val in re.split(", *<", value):
115 try:
116 url, params = val.split(";", 1)
117 except ValueError:
118 url, params = val, ""
119 link = {"url": url.strip("<> '\"")}
120 for param in params.split(";"):
121 try:
122 key, value = param.split("=")
123 except ValueError:
124 break
125 link[key.strip(replace_chars)] = value.strip(replace_chars)
126 links.append(link)
127 return links
def _obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield the given `(key, value)` header pairs, masking sensitive values.

    A pair whose lower-cased key is listed in `SENSITIVE_HEADERS` has its
    value replaced by "[secure]", converted to the same type (str or bytes)
    as the original value. All other pairs are yielded unchanged.
    """
    for name, header_value in items:
        is_sensitive = to_str(name.lower()) in SENSITIVE_HEADERS
        if is_sensitive:
            yield name, to_bytes_or_str("[secure]", match_type_of=header_value)
        else:
            yield name, header_value
class Headers(typing.MutableMapping[str, str]):
    """
    HTTP headers, as a case-insensitive multi-dict.

    Each header is stored internally as a `(raw_key, lowercased_key, value)`
    triple of bytes: the original key casing is kept for `.raw`, while all
    lookups compare against the lower-cased key.
    """

    def __init__(
        self,
        headers: HeaderTypes | None = None,
        encoding: str | None = None,
    ) -> None:
        """
        Build the header list from another `Headers` instance, a mapping,
        or an iterable of `(key, value)` pairs.

        `encoding` is used to encode str keys/values to bytes; when omitted
        they are encoded as ASCII and the encoding used for decoding is
        inferred lazily (see `.encoding`).
        """
        self._list = []  # type: typing.List[typing.Tuple[bytes, bytes, bytes]]

        if isinstance(headers, Headers):
            self._list = list(headers._list)
        elif headers is not None:
            pairs = headers.items() if isinstance(headers, Mapping) else headers
            for k, v in pairs:
                bytes_key = _normalize_header_key(k, encoding)
                bytes_value = _normalize_header_value(v, encoding)
                self._list.append((bytes_key, bytes_key.lower(), bytes_value))

        self._encoding = encoding

    @property
    def encoding(self) -> str:
        """
        Header encoding is mandated as ascii, but we allow fallbacks to utf-8
        or iso-8859-1.

        When no encoding was set explicitly, the first of ascii/utf-8 that
        decodes every key and value is chosen and remembered.
        """
        if self._encoding is None:
            for encoding in ["ascii", "utf-8"]:
                for key, value in self.raw:
                    try:
                        key.decode(encoding)
                        value.decode(encoding)
                    except UnicodeDecodeError:
                        break
                else:
                    # The else block runs if 'break' did not occur, meaning
                    # all values fitted the encoding.
                    self._encoding = encoding
                    break
            else:
                # The ISO-8859-1 encoding covers all 256 code points in a byte,
                # so will never raise decode errors.
                self._encoding = "iso-8859-1"
        return self._encoding

    @encoding.setter
    def encoding(self, value: str) -> None:
        self._encoding = value

    @property
    def raw(self) -> list[tuple[bytes, bytes]]:
        """
        Returns a list of the raw header items, as byte pairs.
        """
        return [(raw_key, value) for raw_key, _, value in self._list]

    def _merged_items(self) -> dict[str, str]:
        """
        Decode all headers into a dict keyed by lower-cased name, joining the
        values of repeated keys with ", " in order of occurrence.
        """
        merged: dict[str, str] = {}
        for _, key, value in self._list:
            str_key = key.decode(self.encoding)
            str_value = value.decode(self.encoding)
            if str_key in merged:
                merged[str_key] += f", {str_value}"
            else:
                merged[str_key] = str_value
        return merged

    def keys(self) -> typing.KeysView[str]:
        """
        Return the distinct lower-cased header names, in first-seen order.
        """
        return {key.decode(self.encoding): None for _, key, value in self._list}.keys()

    def values(self) -> typing.ValuesView[str]:
        """
        Return one value per distinct key. Concatenate headers into a single
        comma separated value when a key occurs multiple times.
        """
        return self._merged_items().values()

    def items(self) -> typing.ItemsView[str, str]:
        """
        Return `(key, value)` items of headers. Concatenate headers
        into a single comma separated value when a key occurs multiple times.
        """
        return self._merged_items().items()

    def multi_items(self) -> list[tuple[str, str]]:
        """
        Return a list of `(key, value)` pairs of headers. Allow multiple
        occurrences of the same key without concatenating into a single
        comma separated value.
        """
        return [
            (key.decode(self.encoding), value.decode(self.encoding))
            for _, key, value in self._list
        ]

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        """
        Return a header value. If multiple occurrences of the header occur
        then concatenate them together with commas.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def get_list(self, key: str, split_commas: bool = False) -> list[str]:
        """
        Return a list of all header values for a given key.
        If `split_commas=True` is passed, then any comma separated header
        values are split into multiple return strings.
        """
        get_header_key = key.lower().encode(self.encoding)

        # Stored lookup keys are already lower-cased, so compare directly.
        values = [
            item_value.decode(self.encoding)
            for _, item_key, item_value in self._list
            if item_key == get_header_key
        ]

        if not split_commas:
            return values

        split_values = []
        for value in values:
            split_values.extend([item.strip() for item in value.split(",")])
        return split_values

    def update(self, headers: HeaderTypes | None = None) -> None:  # type: ignore
        """
        Replace any existing headers that share a key with `headers`, then
        append all of the given headers.
        """
        headers = Headers(headers)
        for key in headers.keys():
            if key in self:
                self.pop(key)
        self._list.extend(headers._list)

    def copy(self) -> Headers:
        """
        Return a new `Headers` instance with the same items and encoding.
        """
        return Headers(self, encoding=self.encoding)

    def __getitem__(self, key: str) -> str:
        """
        Return a single header value.

        If there are multiple headers with the same key, then we concatenate
        them with commas. See: https://tools.ietf.org/html/rfc7230#section-3.2.2

        Raises `KeyError` if the header is not present.
        """
        normalized_key = key.lower().encode(self.encoding)

        items = [
            header_value.decode(self.encoding)
            for _, header_key, header_value in self._list
            if header_key == normalized_key
        ]

        if items:
            return ", ".join(items)

        raise KeyError(key)

    def __setitem__(self, key: str, value: str) -> None:
        """
        Set the header `key` to `value`, removing any duplicate entries.
        Retains insertion order.
        """
        # Uses the explicitly configured encoding if there is one, otherwise
        # utf-8, so that setting a value never depends on inferred encoding.
        set_key = key.encode(self._encoding or "utf-8")
        set_value = value.encode(self._encoding or "utf-8")
        lookup_key = set_key.lower()

        found_indexes = [
            idx
            for idx, (_, item_key, _) in enumerate(self._list)
            if item_key == lookup_key
        ]

        # Drop every occurrence but the first, back to front so that the
        # remaining indexes stay valid while deleting.
        for idx in reversed(found_indexes[1:]):
            del self._list[idx]

        if found_indexes:
            idx = found_indexes[0]
            self._list[idx] = (set_key, lookup_key, set_value)
        else:
            self._list.append((set_key, lookup_key, set_value))

    def __delitem__(self, key: str) -> None:
        """
        Remove the header `key`.

        Raises `KeyError` if the header is not present.
        """
        del_key = key.lower().encode(self.encoding)

        # Stored lookup keys are already lower-cased, so compare directly.
        pop_indexes = [
            idx
            for idx, (_, item_key, _) in enumerate(self._list)
            if item_key == del_key
        ]

        if not pop_indexes:
            raise KeyError(key)

        # Delete back to front so earlier indexes are not shifted.
        for idx in reversed(pop_indexes):
            del self._list[idx]

    def __contains__(self, key: typing.Any) -> bool:
        """
        Return `True` if a header with this name (case-insensitive) exists.
        """
        header_key = key.lower().encode(self.encoding)
        return any(item_key == header_key for _, item_key, _ in self._list)

    def __iter__(self) -> typing.Iterator[typing.Any]:
        return iter(self.keys())

    def __len__(self) -> int:
        """
        Return the number of stored header entries, counting repeated keys.
        """
        return len(self._list)

    def __eq__(self, other: typing.Any) -> bool:
        """
        Compare against anything `Headers(...)` accepts, ignoring header
        order and key casing. Objects that cannot be interpreted as headers
        compare unequal.
        """
        try:
            other_headers = Headers(other)
        except (TypeError, ValueError, AttributeError):
            # `other` is not header-like (not iterable, items are not pairs,
            # or keys/values are not str/bytes). An equality check must not
            # raise for such operands, so treat them as simply "not equal".
            return False

        self_list = [(key, value) for _, key, value in self._list]
        other_list = [(key, value) for _, key, value in other_headers._list]
        return sorted(self_list) == sorted(other_list)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__

        # Only mention the encoding when it is not the default ascii.
        encoding_str = ""
        if self.encoding != "ascii":
            encoding_str = f", encoding={self.encoding!r}"

        as_list = list(_obfuscate_sensitive_headers(self.multi_items()))
        as_dict = dict(as_list)

        # Show the compact dict form unless that would hide repeated keys.
        no_duplicate_keys = len(as_dict) == len(as_list)
        if no_duplicate_keys:
            return f"{class_name}({as_dict!r}{encoding_str})"
        return f"{class_name}({as_list!r}{encoding_str})"
class Request:
    """
    An HTTP request: method, URL, headers, extensions and a body stream.
    """

    def __init__(
        self,
        method: str,
        url: URL | str,
        *,
        params: QueryParamTypes | None = None,
        headers: HeaderTypes | None = None,
        cookies: CookieTypes | None = None,
        content: RequestContent | None = None,
        data: RequestData | None = None,
        files: RequestFiles | None = None,
        json: typing.Any | None = None,
        stream: SyncByteStream | AsyncByteStream | None = None,
        extensions: RequestExtensions | None = None,
    ) -> None:
        """
        Build the request. When `stream` is omitted, the body is encoded
        from `content`/`data`/`files`/`json` and default headers are filled
        in; when `stream` is given it is used as-is.
        """
        self.method = method.upper()
        self.url = URL(url, params=params) if params is not None else URL(url)
        self.headers = Headers(headers)
        self.extensions = dict(extensions) if extensions is not None else {}

        if cookies:
            Cookies(cookies).set_cookie_header(self)

        if stream is not None:
            # There's an important distinction between `Request(content=...)`,
            # and `Request(stream=...)`.
            #
            # Using `content=...` implies automatically populated `Host` and content
            # headers, of either `Content-Length: ...` or `Transfer-Encoding: chunked`.
            #
            # Using `stream=...` will not automatically include *any*
            # auto-populated headers.
            #
            # As an end-user you don't really need `stream=...`. It's only
            # useful when:
            #
            # * Preserving the request stream when copying requests, eg for redirects.
            # * Creating request instances on the *server-side* of the transport API.
            self.stream = stream
            return

        # An explicit multipart Content-Type may carry the boundary to use.
        content_type: str | None = self.headers.get("content-type")
        raw_content_type = (
            content_type.encode(self.headers.encoding) if content_type else None
        )
        boundary = get_multipart_boundary_from_content_type(
            content_type=raw_content_type
        )
        default_headers, body_stream = encode_request(
            content=content,
            data=data,
            files=files,
            json=json,
            boundary=boundary,
        )
        self._prepare(default_headers)
        self.stream = body_stream
        # Load the request body, except for streaming content.
        if isinstance(body_stream, ByteStream):
            self.read()

    def _prepare(self, default_headers: dict[str, str]) -> None:
        """
        Apply `default_headers` without overriding existing ones, then put
        any auto-generated `Host` / `Content-Length` headers at the front.
        """
        for name, default_value in default_headers.items():
            # Ignore Transfer-Encoding if the Content-Length has been set explicitly.
            redundant = (
                name.lower() == "transfer-encoding"
                and "Content-Length" in self.headers
            )
            if not redundant:
                self.headers.setdefault(name, default_value)

        host_missing = "Host" not in self.headers
        length_missing = (
            "Content-Length" not in self.headers
            and "Transfer-Encoding" not in self.headers
        )

        auto_headers: list[tuple[bytes, bytes]] = []
        if host_missing and self.url.host:
            auto_headers.append((b"Host", self.url.netloc))
        if length_missing and self.method in ("POST", "PUT", "PATCH"):
            auto_headers.append((b"Content-Length", b"0"))

        self.headers = Headers(auto_headers + self.headers.raw)

    @property
    def content(self) -> bytes:
        """
        The request body as bytes. Raises `RequestNotRead` if the body has
        not been loaded yet.
        """
        if hasattr(self, "_content"):
            return self._content
        raise RequestNotRead()

    def read(self) -> bytes:
        """
        Read and return the request content.
        """
        if hasattr(self, "_content"):
            return self._content

        assert isinstance(self.stream, typing.Iterable)
        self._content = b"".join(self.stream)
        if not isinstance(self.stream, ByteStream):
            # If a streaming request has been read entirely into memory, then
            # we can replace the stream with a raw bytes implementation,
            # to ensure that any non-replayable streams can still be used.
            self.stream = ByteStream(self._content)
        return self._content

    async def aread(self) -> bytes:
        """
        Read and return the request content.
        """
        if hasattr(self, "_content"):
            return self._content

        assert isinstance(self.stream, typing.AsyncIterable)
        self._content = b"".join([part async for part in self.stream])
        if not isinstance(self.stream, ByteStream):
            # If a streaming request has been read entirely into memory, then
            # we can replace the stream with a raw bytes implementation,
            # to ensure that any non-replayable streams can still be used.
            self.stream = ByteStream(self._content)
        return self._content

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return f"<{class_name}({self.method!r}, {str(self.url)!r})>"

    def __getstate__(self) -> dict[str, typing.Any]:
        """
        Return the instance state for pickling, leaving out the
        `extensions` and `stream` attributes.
        """
        state = dict(self.__dict__)
        for excluded in ("extensions", "stream"):
            state.pop(excluded, None)
        return state

    def __setstate__(self, state: dict[str, typing.Any]) -> None:
        """
        Restore pickled state, with empty extensions and an unattached stream.
        """
        for name, value in state.items():
            setattr(self, name, value)
        self.extensions = {}
        self.stream = UnattachedStream()
515class Response:
516 def __init__(
517 self,
518 status_code: int,
519 *,
520 headers: HeaderTypes | None = None,
521 content: ResponseContent | None = None,
522 text: str | None = None,
523 html: str | None = None,
524 json: typing.Any = None,
525 stream: SyncByteStream | AsyncByteStream | None = None,
526 request: Request | None = None,
527 extensions: ResponseExtensions | None = None,
528 history: list[Response] | None = None,
529 default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
530 ) -> None:
531 self.status_code = status_code
532 self.headers = Headers(headers)
534 self._request: Request | None = request
536 # When follow_redirects=False and a redirect is received,
537 # the client will set `response.next_request`.
538 self.next_request: Request | None = None
540 self.extensions = {} if extensions is None else dict(extensions)
541 self.history = [] if history is None else list(history)
543 self.is_closed = False
544 self.is_stream_consumed = False
546 self.default_encoding = default_encoding
548 if stream is None:
549 headers, stream = encode_response(content, text, html, json)
550 self._prepare(headers)
551 self.stream = stream
552 if isinstance(stream, ByteStream):
553 # Load the response body, except for streaming content.
554 self.read()
555 else:
556 # There's an important distinction between `Response(content=...)`,
557 # and `Response(stream=...)`.
558 #
559 # Using `content=...` implies automatically populated content headers,
560 # of either `Content-Length: ...` or `Transfer-Encoding: chunked`.
561 #
562 # Using `stream=...` will not automatically include any content headers.
563 #
564 # As an end-user you don't really need `stream=...`. It's only
565 # useful when creating response instances having received a stream
566 # from the transport API.
567 self.stream = stream
569 self._num_bytes_downloaded = 0
571 def _prepare(self, default_headers: dict[str, str]) -> None:
572 for key, value in default_headers.items():
573 # Ignore Transfer-Encoding if the Content-Length has been set explicitly.
574 if key.lower() == "transfer-encoding" and "content-length" in self.headers:
575 continue
576 self.headers.setdefault(key, value)
578 @property
579 def elapsed(self) -> datetime.timedelta:
580 """
581 Returns the time taken for the complete request/response
582 cycle to complete.
583 """
584 if not hasattr(self, "_elapsed"):
585 raise RuntimeError(
586 "'.elapsed' may only be accessed after the response "
587 "has been read or closed."
588 )
589 return self._elapsed
591 @elapsed.setter
592 def elapsed(self, elapsed: datetime.timedelta) -> None:
593 self._elapsed = elapsed
595 @property
596 def request(self) -> Request:
597 """
598 Returns the request instance associated to the current response.
599 """
600 if self._request is None:
601 raise RuntimeError(
602 "The request instance has not been set on this response."
603 )
604 return self._request
606 @request.setter
607 def request(self, value: Request) -> None:
608 self._request = value
610 @property
611 def http_version(self) -> str:
612 try:
613 http_version: bytes = self.extensions["http_version"]
614 except KeyError:
615 return "HTTP/1.1"
616 else:
617 return http_version.decode("ascii", errors="ignore")
619 @property
620 def reason_phrase(self) -> str:
621 try:
622 reason_phrase: bytes = self.extensions["reason_phrase"]
623 except KeyError:
624 return codes.get_reason_phrase(self.status_code)
625 else:
626 return reason_phrase.decode("ascii", errors="ignore")
628 @property
629 def url(self) -> URL:
630 """
631 Returns the URL for which the request was made.
632 """
633 return self.request.url
635 @property
636 def content(self) -> bytes:
637 if not hasattr(self, "_content"):
638 raise ResponseNotRead()
639 return self._content
641 @property
642 def text(self) -> str:
643 if not hasattr(self, "_text"):
644 content = self.content
645 if not content:
646 self._text = ""
647 else:
648 decoder = TextDecoder(encoding=self.encoding or "utf-8")
649 self._text = "".join([decoder.decode(self.content), decoder.flush()])
650 return self._text
652 @property
653 def encoding(self) -> str | None:
654 """
655 Return an encoding to use for decoding the byte content into text.
656 The priority for determining this is given by...
658 * `.encoding = <>` has been set explicitly.
659 * The encoding as specified by the charset parameter in the Content-Type header.
660 * The encoding as determined by `default_encoding`, which may either be
661 a string like "utf-8" indicating the encoding to use, or may be a callable
662 which enables charset autodetection.
663 """
664 if not hasattr(self, "_encoding"):
665 encoding = self.charset_encoding
666 if encoding is None or not _is_known_encoding(encoding):
667 if isinstance(self.default_encoding, str):
668 encoding = self.default_encoding
669 elif hasattr(self, "_content"):
670 encoding = self.default_encoding(self._content)
671 self._encoding = encoding or "utf-8"
672 return self._encoding
674 @encoding.setter
675 def encoding(self, value: str) -> None:
676 """
677 Set the encoding to use for decoding the byte content into text.
679 If the `text` attribute has been accessed, attempting to set the
680 encoding will throw a ValueError.
681 """
682 if hasattr(self, "_text"):
683 raise ValueError(
684 "Setting encoding after `text` has been accessed is not allowed."
685 )
686 self._encoding = value
688 @property
689 def charset_encoding(self) -> str | None:
690 """
691 Return the encoding, as specified by the Content-Type header.
692 """
693 content_type = self.headers.get("Content-Type")
694 if content_type is None:
695 return None
697 return _parse_content_type_charset(content_type)
699 def _get_content_decoder(self) -> ContentDecoder:
700 """
701 Returns a decoder instance which can be used to decode the raw byte
702 content, depending on the Content-Encoding used in the response.
703 """
704 if not hasattr(self, "_decoder"):
705 decoders: list[ContentDecoder] = []
706 values = self.headers.get_list("content-encoding", split_commas=True)
707 for value in values:
708 value = value.strip().lower()
709 try:
710 decoder_cls = SUPPORTED_DECODERS[value]
711 decoders.append(decoder_cls())
712 except KeyError:
713 continue
715 if len(decoders) == 1:
716 self._decoder = decoders[0]
717 elif len(decoders) > 1:
718 self._decoder = MultiDecoder(children=decoders)
719 else:
720 self._decoder = IdentityDecoder()
722 return self._decoder
724 @property
725 def is_informational(self) -> bool:
726 """
727 A property which is `True` for 1xx status codes, `False` otherwise.
728 """
729 return codes.is_informational(self.status_code)
731 @property
732 def is_success(self) -> bool:
733 """
734 A property which is `True` for 2xx status codes, `False` otherwise.
735 """
736 return codes.is_success(self.status_code)
738 @property
739 def is_redirect(self) -> bool:
740 """
741 A property which is `True` for 3xx status codes, `False` otherwise.
743 Note that not all responses with a 3xx status code indicate a URL redirect.
745 Use `response.has_redirect_location` to determine responses with a properly
746 formed URL redirection.
747 """
748 return codes.is_redirect(self.status_code)
750 @property
751 def is_client_error(self) -> bool:
752 """
753 A property which is `True` for 4xx status codes, `False` otherwise.
754 """
755 return codes.is_client_error(self.status_code)
757 @property
758 def is_server_error(self) -> bool:
759 """
760 A property which is `True` for 5xx status codes, `False` otherwise.
761 """
762 return codes.is_server_error(self.status_code)
764 @property
765 def is_error(self) -> bool:
766 """
767 A property which is `True` for 4xx and 5xx status codes, `False` otherwise.
768 """
769 return codes.is_error(self.status_code)
771 @property
772 def has_redirect_location(self) -> bool:
773 """
774 Returns True for 3xx responses with a properly formed URL redirection,
775 `False` otherwise.
776 """
777 return (
778 self.status_code
779 in (
780 # 301 (Cacheable redirect. Method may change to GET.)
781 codes.MOVED_PERMANENTLY,
782 # 302 (Uncacheable redirect. Method may change to GET.)
783 codes.FOUND,
784 # 303 (Client should make a GET or HEAD request.)
785 codes.SEE_OTHER,
786 # 307 (Equiv. 302, but retain method)
787 codes.TEMPORARY_REDIRECT,
788 # 308 (Equiv. 301, but retain method)
789 codes.PERMANENT_REDIRECT,
790 )
791 and "Location" in self.headers
792 )
794 def raise_for_status(self) -> Response:
795 """
796 Raise the `HTTPStatusError` if one occurred.
797 """
798 request = self._request
799 if request is None:
800 raise RuntimeError(
801 "Cannot call `raise_for_status` as the request "
802 "instance has not been set on this response."
803 )
805 if self.is_success:
806 return self
808 if self.has_redirect_location:
809 message = (
810 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
811 "Redirect location: '{0.headers[location]}'\n"
812 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
813 )
814 else:
815 message = (
816 "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
817 "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
818 )
820 status_class = self.status_code // 100
821 error_types = {
822 1: "Informational response",
823 3: "Redirect response",
824 4: "Client error",
825 5: "Server error",
826 }
827 error_type = error_types.get(status_class, "Invalid status code")
828 message = message.format(self, error_type=error_type)
829 raise HTTPStatusError(message, request=request, response=self)
831 def json(self, **kwargs: typing.Any) -> typing.Any:
832 return jsonlib.loads(self.content, **kwargs)
834 @property
835 def cookies(self) -> Cookies:
836 if not hasattr(self, "_cookies"):
837 self._cookies = Cookies()
838 self._cookies.extract_cookies(self)
839 return self._cookies
841 @property
842 def links(self) -> dict[str | None, dict[str, str]]:
843 """
844 Returns the parsed header links of the response, if any
845 """
846 header = self.headers.get("link")
847 if header is None:
848 return {}
850 return {
851 (link.get("rel") or link.get("url")): link
852 for link in _parse_header_links(header)
853 }
855 @property
856 def num_bytes_downloaded(self) -> int:
857 return self._num_bytes_downloaded
859 def __repr__(self) -> str:
860 return f"<Response [{self.status_code} {self.reason_phrase}]>"
862 def __getstate__(self) -> dict[str, typing.Any]:
863 return {
864 name: value
865 for name, value in self.__dict__.items()
866 if name not in ["extensions", "stream", "is_closed", "_decoder"]
867 }
869 def __setstate__(self, state: dict[str, typing.Any]) -> None:
870 for name, value in state.items():
871 setattr(self, name, value)
872 self.is_closed = True
873 self.extensions = {}
874 self.stream = UnattachedStream()
876 def read(self) -> bytes:
877 """
878 Read and return the response content.
879 """
880 if not hasattr(self, "_content"):
881 self._content = b"".join(self.iter_bytes())
882 return self._content
884 def iter_bytes(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
885 """
886 A byte-iterator over the decoded response content.
887 This allows us to handle gzip, deflate, brotli, and zstd encoded responses.
888 """
889 if hasattr(self, "_content"):
890 chunk_size = len(self._content) if chunk_size is None else chunk_size
891 for i in range(0, len(self._content), max(chunk_size, 1)):
892 yield self._content[i : i + chunk_size]
893 else:
894 decoder = self._get_content_decoder()
895 chunker = ByteChunker(chunk_size=chunk_size)
896 with request_context(request=self._request):
897 for raw_bytes in self.iter_raw():
898 decoded = decoder.decode(raw_bytes)
899 for chunk in chunker.decode(decoded):
900 yield chunk
901 decoded = decoder.flush()
902 for chunk in chunker.decode(decoded):
903 yield chunk # pragma: no cover
904 for chunk in chunker.flush():
905 yield chunk
907 def iter_text(self, chunk_size: int | None = None) -> typing.Iterator[str]:
908 """
909 A str-iterator over the decoded response content
910 that handles both gzip, deflate, etc but also detects the content's
911 string encoding.
912 """
913 decoder = TextDecoder(encoding=self.encoding or "utf-8")
914 chunker = TextChunker(chunk_size=chunk_size)
915 with request_context(request=self._request):
916 for byte_content in self.iter_bytes():
917 text_content = decoder.decode(byte_content)
918 for chunk in chunker.decode(text_content):
919 yield chunk
920 text_content = decoder.flush()
921 for chunk in chunker.decode(text_content):
922 yield chunk # pragma: no cover
923 for chunk in chunker.flush():
924 yield chunk
926 def iter_lines(self) -> typing.Iterator[str]:
927 decoder = LineDecoder()
928 with request_context(request=self._request):
929 for text in self.iter_text():
930 for line in decoder.decode(text):
931 yield line
932 for line in decoder.flush():
933 yield line
935 def iter_raw(self, chunk_size: int | None = None) -> typing.Iterator[bytes]:
936 """
937 A byte-iterator over the raw response content.
938 """
939 if self.is_stream_consumed:
940 raise StreamConsumed()
941 if self.is_closed:
942 raise StreamClosed()
943 if not isinstance(self.stream, SyncByteStream):
944 raise RuntimeError("Attempted to call a sync iterator on an async stream.")
946 self.is_stream_consumed = True
947 self._num_bytes_downloaded = 0
948 chunker = ByteChunker(chunk_size=chunk_size)
950 with request_context(request=self._request):
951 for raw_stream_bytes in self.stream:
952 self._num_bytes_downloaded += len(raw_stream_bytes)
953 for chunk in chunker.decode(raw_stream_bytes):
954 yield chunk
956 for chunk in chunker.flush():
957 yield chunk
959 self.close()
961 def close(self) -> None:
962 """
963 Close the response and release the connection.
964 Automatically called if the response body is read to completion.
965 """
966 if not isinstance(self.stream, SyncByteStream):
967 raise RuntimeError("Attempted to call an sync close on an async stream.")
969 if not self.is_closed:
970 self.is_closed = True
971 with request_context(request=self._request):
972 self.stream.close()
974 async def aread(self) -> bytes:
975 """
976 Read and return the response content.
977 """
978 if not hasattr(self, "_content"):
979 self._content = b"".join([part async for part in self.aiter_bytes()])
980 return self._content
982 async def aiter_bytes(
983 self, chunk_size: int | None = None
984 ) -> typing.AsyncIterator[bytes]:
985 """
986 A byte-iterator over the decoded response content.
987 This allows us to handle gzip, deflate, brotli, and zstd encoded responses.
988 """
989 if hasattr(self, "_content"):
990 chunk_size = len(self._content) if chunk_size is None else chunk_size
991 for i in range(0, len(self._content), max(chunk_size, 1)):
992 yield self._content[i : i + chunk_size]
993 else:
994 decoder = self._get_content_decoder()
995 chunker = ByteChunker(chunk_size=chunk_size)
996 with request_context(request=self._request):
997 async for raw_bytes in self.aiter_raw():
998 decoded = decoder.decode(raw_bytes)
999 for chunk in chunker.decode(decoded):
1000 yield chunk
1001 decoded = decoder.flush()
1002 for chunk in chunker.decode(decoded):
1003 yield chunk # pragma: no cover
1004 for chunk in chunker.flush():
1005 yield chunk
1007 async def aiter_text(
1008 self, chunk_size: int | None = None
1009 ) -> typing.AsyncIterator[str]:
1010 """
1011 A str-iterator over the decoded response content
1012 that handles both gzip, deflate, etc but also detects the content's
1013 string encoding.
1014 """
1015 decoder = TextDecoder(encoding=self.encoding or "utf-8")
1016 chunker = TextChunker(chunk_size=chunk_size)
1017 with request_context(request=self._request):
1018 async for byte_content in self.aiter_bytes():
1019 text_content = decoder.decode(byte_content)
1020 for chunk in chunker.decode(text_content):
1021 yield chunk
1022 text_content = decoder.flush()
1023 for chunk in chunker.decode(text_content):
1024 yield chunk # pragma: no cover
1025 for chunk in chunker.flush():
1026 yield chunk
async def aiter_lines(self) -> typing.AsyncIterator[str]:
    """
    A str-iterator over the response text, one line at a time.

    Text from `aiter_text()` is fed through a `LineDecoder`; whatever the
    decoder still holds once the text is exhausted is yielded at the end.
    """
    line_decoder = LineDecoder()
    with request_context(request=self._request):
        async for fragment in self.aiter_text():
            for complete_line in line_decoder.decode(fragment):
                yield complete_line
        for remaining_line in line_decoder.flush():
            yield remaining_line
async def aiter_raw(
    self, chunk_size: int | None = None
) -> typing.AsyncIterator[bytes]:
    """
    A byte-iterator over the raw response content.

    Raises `StreamConsumed` if the stream was already iterated,
    `StreamClosed` if the response is closed, and `RuntimeError` if the
    underlying stream is not an `AsyncByteStream`.

    Marks the stream as consumed, counts the bytes read in
    `_num_bytes_downloaded`, and closes the response once the stream has
    been read to completion.
    """
    if self.is_stream_consumed:
        raise StreamConsumed()
    if self.is_closed:
        raise StreamClosed()
    if not isinstance(self.stream, AsyncByteStream):
        raise RuntimeError("Attempted to call an async iterator on an sync stream.")

    self.is_stream_consumed = True
    self._num_bytes_downloaded = 0
    raw_chunker = ByteChunker(chunk_size=chunk_size)

    with request_context(request=self._request):
        async for incoming in self.stream:
            self._num_bytes_downloaded += len(incoming)
            for piece in raw_chunker.decode(incoming):
                yield piece

    # Emit whatever is still buffered below `chunk_size`.
    for piece in raw_chunker.flush():
        yield piece

    await self.aclose()
async def aclose(self) -> None:
    """
    Close the response and release the connection.
    Automatically called if the response body is read to completion.

    Raises `RuntimeError` if the underlying stream is not an
    `AsyncByteStream`. Calling this on an already-closed response does
    nothing.
    """
    if not isinstance(self.stream, AsyncByteStream):
        raise RuntimeError("Attempted to call an async close on an sync stream.")

    if self.is_closed:
        return

    # Flag the response as closed before awaiting the stream close.
    self.is_closed = True
    with request_context(request=self._request):
        await self.stream.aclose()
class Cookies(typing.MutableMapping[str, str]):
    """
    HTTP Cookies, as a mutable mapping.

    The cookies themselves live in `self.jar`, a `CookieJar`.
    """

    def __init__(self, cookies: CookieTypes | None = None) -> None:
        """
        Build the cookie store from `None`, a dict, a list of
        `(name, value)` pairs, another `Cookies` instance (its cookies are
        copied into a new jar), or any other object, which is used directly
        as the jar.
        """
        if cookies is None:
            self.jar = CookieJar()
        elif isinstance(cookies, dict):
            self.jar = CookieJar()
            for key, value in cookies.items():
                self.set(key, value)
        elif isinstance(cookies, list):
            self.jar = CookieJar()
            for key, value in cookies:
                self.set(key, value)
        elif isinstance(cookies, Cookies):
            self.jar = CookieJar()
            for existing in cookies.jar:
                self.jar.set_cookie(existing)
        else:
            self.jar = cookies

    def extract_cookies(self, response: Response) -> None:
        """
        Loads any cookies based on the response `Set-Cookie` headers.
        """
        compat_response = self._CookieCompatResponse(response)
        compat_request = self._CookieCompatRequest(response.request)

        self.jar.extract_cookies(compat_response, compat_request)  # type: ignore

    def set_cookie_header(self, request: Request) -> None:
        """
        Sets an appropriate 'Cookie:' HTTP header on the `Request`.
        """
        self.jar.add_cookie_header(self._CookieCompatRequest(request))

    def set(self, name: str, value: str, domain: str = "", path: str = "/") -> None:
        """
        Set a cookie value by name. May optionally include domain and path.
        """
        cookie = Cookie(  # type: ignore
            version=0,
            name=name,
            value=value,
            port=None,
            port_specified=False,
            domain=domain,
            domain_specified=bool(domain),
            domain_initial_dot=domain.startswith("."),
            path=path,
            path_specified=bool(path),
            secure=False,
            expires=None,
            discard=True,
            comment=None,
            comment_url=None,
            rest={"HttpOnly": None},
            rfc2109=False,
        )
        self.jar.set_cookie(cookie)

    def get(  # type: ignore
        self,
        name: str,
        default: str | None = None,
        domain: str | None = None,
        path: str | None = None,
    ) -> str | None:
        """
        Get a cookie by name. May optionally include domain and path
        in order to specify exactly which cookie to retrieve.

        Returns `default` when nothing matches, and raises `CookieConflict`
        when more than one cookie matches.
        """
        found = None
        for cookie in self.jar:
            if cookie.name != name:
                continue
            if domain is not None and cookie.domain != domain:
                continue
            if path is not None and cookie.path != path:
                continue
            if found is not None:
                message = f"Multiple cookies exist with name={name}"
                raise CookieConflict(message)
            found = cookie.value

        return default if found is None else found

    def delete(
        self,
        name: str,
        domain: str | None = None,
        path: str | None = None,
    ) -> None:
        """
        Delete a cookie by name. May optionally include domain and path
        in order to specify exactly which cookie to delete.
        """
        # With both domain and path given, the jar can remove the cookie
        # directly.
        if domain is not None and path is not None:
            return self.jar.clear(domain, path, name)

        # Collect the matches first, so the jar is not modified while it is
        # being iterated.
        matching = []
        for cookie in self.jar:
            if cookie.name != name:
                continue
            if domain is not None and cookie.domain != domain:
                continue
            if path is not None and cookie.path != path:
                continue
            matching.append(cookie)

        for cookie in matching:
            self.jar.clear(cookie.domain, cookie.path, cookie.name)

    def clear(self, domain: str | None = None, path: str | None = None) -> None:
        """
        Delete all cookies. Optionally include a domain and path in
        order to only delete a subset of all the cookies.

        A `path` may only be given together with a `domain`.
        """
        if path is not None:
            assert domain is not None
        # Only the arguments that were actually supplied are forwarded.
        supplied = [arg for arg in (domain, path) if arg is not None]
        self.jar.clear(*supplied)

    def update(self, cookies: CookieTypes | None = None) -> None:  # type: ignore
        """
        Add every cookie from `cookies` (any value accepted by the
        constructor) into this jar.
        """
        incoming = Cookies(cookies)
        for cookie in incoming.jar:
            self.jar.set_cookie(cookie)

    def __setitem__(self, name: str, value: str) -> None:
        self.set(name, value)

    def __getitem__(self, name: str) -> str:
        found = self.get(name)
        if found is not None:
            return found
        raise KeyError(name)

    def __delitem__(self, name: str) -> None:
        self.delete(name)

    def __len__(self) -> int:
        return len(self.jar)

    def __iter__(self) -> typing.Iterator[str]:
        return (cookie.name for cookie in self.jar)

    def __bool__(self) -> bool:
        # True as soon as the jar yields a single cookie.
        return any(True for _ in self.jar)

    def __repr__(self) -> str:
        cookies_repr = ", ".join(
            f"<Cookie {cookie.name}={cookie.value} for {cookie.domain} />"
            for cookie in self.jar
        )

        return f"<Cookies[{cookies_repr}]>"

    class _CookieCompatRequest(urllib.request.Request):
        """
        Wraps a `Request` instance up in a compatibility interface suitable
        for use with `CookieJar` operations.
        """

        def __init__(self, request: Request) -> None:
            target_url = str(request.url)
            header_map = dict(request.headers)
            super().__init__(
                url=target_url,
                headers=header_map,
                method=request.method,
            )
            self.request = request

        def add_unredirected_header(self, key: str, value: str) -> None:
            # Mirror the header onto the wrapped request as well.
            super().add_unredirected_header(key, value)
            self.request.headers[key] = value

    class _CookieCompatResponse:
        """
        Wraps a `Response` instance up in a compatibility interface suitable
        for use with `CookieJar` operations.
        """

        def __init__(self, response: Response) -> None:
            self.response = response

        def info(self) -> email.message.Message:
            message = email.message.Message()
            for key, value in self.response.headers.multi_items():
                # Note that setting `message[key]` here is an "append" operation,
                # not a "replace" operation.
                # https://docs.python.org/3/library/email.compat32-message.html#email.message.Message.__setitem__
                message[key] = value
            return message