1# SPDX-FileCopyrightText: 2015 Eric Larson
2#
3# SPDX-License-Identifier: Apache-2.0
4from __future__ import annotations
5
6import io
7from typing import IO, TYPE_CHECKING, Any, Mapping, cast
8
9from pip._vendor import msgpack
10from pip._vendor.requests.structures import CaseInsensitiveDict
11from pip._vendor.urllib3 import HTTPResponse
12
13if TYPE_CHECKING:
14 from pip._vendor.requests import PreparedRequest
15
16
class Serializer:
    """Serialize HTTP responses to bytes for caching and load them back.

    The serialized form is the ASCII prefix ``cc=<serde_version>,`` followed
    by a msgpack document with two top-level keys: ``"response"`` (body,
    headers, status, version, reason, decode_content) and ``"vary"`` (the
    request's values for each header named in the response's ``Vary``
    header).
    """

    # Version tag written into, and required of, the ``cc=<version>,`` prefix.
    serde_version = "4"

    def dumps(
        self,
        request: PreparedRequest,
        response: HTTPResponse,
        body: bytes | None = None,
    ) -> bytes:
        """Return the serialized cache entry for ``response``.

        Args:
            request: The request that produced ``response``; its headers are
                consulted for every header named in the response's ``Vary``.
            response: The response to serialize.
            body: The raw body to store. When ``None`` the body is read from
                ``response`` and the response is re-armed with an in-memory
                file object so it can still be read by the caller.

        Returns:
            ``b"cc=<serde_version>,"`` followed by the msgpack payload.
        """
        response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
            response.headers
        )

        if body is None:
            # When a body isn't passed in, we'll read the response. We
            # also update the response with a new file handler to be
            # sure it acts as though it was never read.
            body = response.read(decode_content=False)
            response._fp = io.BytesIO(body)  # type: ignore[assignment]
            response.length_remaining = len(body)

        # Record the request's value (or None when absent) for every header
        # named in Vary, so a later request can be compared against it.
        vary: dict[str, str | None] = {}
        if "vary" in response_headers:
            for name in response_headers["vary"].split(","):
                name = str(name).strip()
                value = request.headers.get(name, None)
                vary[name] = None if value is None else str(value)

        # Key order ("response" then "vary") is kept stable so the serialized
        # bytes do not change.
        data: dict[str, Any] = {
            "response": {
                "body": body,  # Empty bytestring if body is stored separately
                "headers": {str(k): str(v) for k, v in response.headers.items()},
                "status": response.status,
                "version": response.version,
                "reason": str(response.reason),
                "decode_content": response.decode_content,
            },
            "vary": vary,
        }

        return b",".join([f"cc={self.serde_version}".encode(), self.serialize(data)])

    def serialize(self, data: dict[str, Any]) -> bytes:
        """Encode ``data`` as msgpack bytes (binary values use the bin type)."""
        return cast(bytes, msgpack.dumps(data, use_bin_type=True))

    def loads(
        self,
        request: PreparedRequest,
        data: bytes,
        body_file: IO[bytes] | None = None,
    ) -> HTTPResponse | None:
        """Rebuild a response from serialized ``data``.

        Args:
            request: The request the cached response would answer.
            data: Bytes previously produced by :meth:`dumps`.
            body_file: Optional file object to use as the response body
                instead of the body stored inside ``data``.

        Returns:
            The reconstructed response, or ``None`` when ``data`` is empty,
            does not carry the ``cc=<serde_version>,`` prefix, or is rejected
            while loading.
        """
        # Short circuit if we've been given an empty set of data
        if not data:
            return None

        # Previous versions of this library supported other serialization
        # formats, but these have all been removed.
        prefix = f"cc={self.serde_version},".encode()
        if not data.startswith(prefix):
            return None

        # Strip exactly the prefix that was matched rather than a fixed
        # number of bytes, so the slice stays correct whatever the length of
        # ``serde_version``.
        return self._loads_v4(request, data[len(prefix):], body_file)

    def prepare_response(
        self,
        request: PreparedRequest,
        cached: Mapping[str, Any],
        body_file: IO[bytes] | None = None,
    ) -> HTTPResponse | None:
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.

        Args:
            request: The request the cached response would answer.
            cached: The decoded cache entry (``"response"`` and, optionally,
                ``"vary"`` keys). It is not modified.
            body_file: Optional file object to use as the response body
                instead of the stored ``"body"`` value.

        Returns:
            The reconstructed response, or ``None`` when the stored ``vary``
            data contains ``"*"`` or does not match ``request``'s headers.
        """
        vary = cached.get("vary", {})

        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        # This case is also handled in the controller code when creating
        # a cache entry, but is left here for backwards compatibility.
        if "*" in vary:
            return None

        # Ensure that the Vary headers for the cached response match our
        # request
        for header, value in vary.items():
            if request.headers.get(header, None) != value:
                return None

        # Work on a shallow copy so the caller's mapping is left untouched.
        response_kwargs = dict(cached["response"])
        body_raw = response_kwargs.pop("body")

        headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
            data=response_kwargs["headers"]
        )
        if headers.get("transfer-encoding", "") == "chunked":
            headers.pop("transfer-encoding")
        response_kwargs["headers"] = headers

        body: IO[bytes]
        if body_file is not None:
            body = body_file
        else:
            try:
                body = io.BytesIO(body_raw)
            except TypeError:
                # This can happen if cachecontrol serialized to v1 format
                # (pickle) using Python 2. A Python 2 str(byte string) will
                # be unpickled as a Python 3 str (unicode string), which will
                # cause the above to fail with:
                #
                #     TypeError: 'str' does not support the buffer interface
                body = io.BytesIO(body_raw.encode("utf8"))

        # Discard any `strict` parameter serialized by older version of
        # cachecontrol.
        response_kwargs.pop("strict", None)

        return HTTPResponse(body=body, preload_content=False, **response_kwargs)

    def _loads_v4(
        self,
        request: PreparedRequest,
        data: bytes,
        body_file: IO[bytes] | None = None,
    ) -> HTTPResponse | None:
        """Decode a version-4 (msgpack) payload and build the response.

        Returns ``None`` when msgpack raises ``ValueError`` while decoding or
        when the decoded value is not a dict.
        """
        try:
            cached = msgpack.loads(data, raw=False)
        except ValueError:
            return None

        # A well-formed msgpack value that is not a map cannot be a cache
        # entry; treat it as a miss, like an undecodable payload.
        if not isinstance(cached, dict):
            return None

        return self.prepare_response(request, cached, body_file)