Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/cachecontrol/serialize.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1# SPDX-FileCopyrightText: 2015 Eric Larson 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4from __future__ import annotations 

5 

6import io 

7from typing import IO, TYPE_CHECKING, Any, Mapping, cast 

8 

9from pip._vendor import msgpack 

10from pip._vendor.requests.structures import CaseInsensitiveDict 

11from pip._vendor.urllib3 import HTTPResponse 

12 

13if TYPE_CHECKING: 

14 from pip._vendor.requests import PreparedRequest 

15 

16 

17class Serializer: 

18 serde_version = "4" 

19 

20 def dumps( 

21 self, 

22 request: PreparedRequest, 

23 response: HTTPResponse, 

24 body: bytes | None = None, 

25 ) -> bytes: 

26 response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict( 

27 response.headers 

28 ) 

29 

30 if body is None: 

31 # When a body isn't passed in, we'll read the response. We 

32 # also update the response with a new file handler to be 

33 # sure it acts as though it was never read. 

34 body = response.read(decode_content=False) 

35 response._fp = io.BytesIO(body) # type: ignore[assignment] 

36 response.length_remaining = len(body) 

37 

38 data = { 

39 "response": { 

40 "body": body, # Empty bytestring if body is stored separately 

41 "headers": {str(k): str(v) for k, v in response.headers.items()}, 

42 "status": response.status, 

43 "version": response.version, 

44 "reason": str(response.reason), 

45 "decode_content": response.decode_content, 

46 } 

47 } 

48 

49 # Construct our vary headers 

50 data["vary"] = {} 

51 if "vary" in response_headers: 

52 varied_headers = response_headers["vary"].split(",") 

53 for header in varied_headers: 

54 header = str(header).strip() 

55 header_value = request.headers.get(header, None) 

56 if header_value is not None: 

57 header_value = str(header_value) 

58 data["vary"][header] = header_value 

59 

60 return b",".join([f"cc={self.serde_version}".encode(), self.serialize(data)]) 

61 

62 def serialize(self, data: dict[str, Any]) -> bytes: 

63 return cast(bytes, msgpack.dumps(data, use_bin_type=True)) 

64 

65 def loads( 

66 self, 

67 request: PreparedRequest, 

68 data: bytes, 

69 body_file: IO[bytes] | None = None, 

70 ) -> HTTPResponse | None: 

71 # Short circuit if we've been given an empty set of data 

72 if not data: 

73 return None 

74 

75 # Previous versions of this library supported other serialization 

76 # formats, but these have all been removed. 

77 if not data.startswith(f"cc={self.serde_version},".encode()): 

78 return None 

79 

80 data = data[5:] 

81 return self._loads_v4(request, data, body_file) 

82 

83 def prepare_response( 

84 self, 

85 request: PreparedRequest, 

86 cached: Mapping[str, Any], 

87 body_file: IO[bytes] | None = None, 

88 ) -> HTTPResponse | None: 

89 """Verify our vary headers match and construct a real urllib3 

90 HTTPResponse object. 

91 """ 

92 # Special case the '*' Vary value as it means we cannot actually 

93 # determine if the cached response is suitable for this request. 

94 # This case is also handled in the controller code when creating 

95 # a cache entry, but is left here for backwards compatibility. 

96 if "*" in cached.get("vary", {}): 

97 return None 

98 

99 # Ensure that the Vary headers for the cached response match our 

100 # request 

101 for header, value in cached.get("vary", {}).items(): 

102 if request.headers.get(header, None) != value: 

103 return None 

104 

105 body_raw = cached["response"].pop("body") 

106 

107 headers: CaseInsensitiveDict[str] = CaseInsensitiveDict( 

108 data=cached["response"]["headers"] 

109 ) 

110 if headers.get("transfer-encoding", "") == "chunked": 

111 headers.pop("transfer-encoding") 

112 

113 cached["response"]["headers"] = headers 

114 

115 try: 

116 body: IO[bytes] 

117 if body_file is None: 

118 body = io.BytesIO(body_raw) 

119 else: 

120 body = body_file 

121 except TypeError: 

122 # This can happen if cachecontrol serialized to v1 format (pickle) 

123 # using Python 2. A Python 2 str(byte string) will be unpickled as 

124 # a Python 3 str (unicode string), which will cause the above to 

125 # fail with: 

126 # 

127 # TypeError: 'str' does not support the buffer interface 

128 body = io.BytesIO(body_raw.encode("utf8")) 

129 

130 # Discard any `strict` parameter serialized by older version of cachecontrol. 

131 cached["response"].pop("strict", None) 

132 

133 return HTTPResponse(body=body, preload_content=False, **cached["response"]) 

134 

135 def _loads_v4( 

136 self, 

137 request: PreparedRequest, 

138 data: bytes, 

139 body_file: IO[bytes] | None = None, 

140 ) -> HTTPResponse | None: 

141 try: 

142 cached = msgpack.loads(data, raw=False) 

143 except ValueError: 

144 return None 

145 

146 return self.prepare_response(request, cached, body_file)