Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/cachecontrol/serialize.py: 21%

89 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# SPDX-FileCopyrightText: 2015 Eric Larson 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4 

5import base64 

6import io 

7import json 

8import zlib 

9 

10from pip._vendor import msgpack 

11from pip._vendor.requests.structures import CaseInsensitiveDict 

12 

13from .compat import HTTPResponse, pickle, text_type 

14 

15 

def _b64_decode_bytes(b):
    """Return the bytes obtained by base64-decoding the text string *b*.

    *b* is encoded to ASCII first, so it must be a text string containing
    only ASCII characters.
    """
    ascii_payload = b.encode("ascii")
    return base64.b64decode(ascii_payload)

18 

19 

def _b64_decode_str(s):
    """Return the text obtained by base64-decoding *s* and reading it as UTF-8.

    *s* is a text string containing only ASCII (base64) characters.
    """
    raw = base64.b64decode(s.encode("ascii"))
    return raw.decode("utf8")

22 

23 

# Unique module-level sentinel object. It is not referenced anywhere else in
# the visible part of this module.
_default_body_read = object()

25 

26 

class Serializer(object):
    """Convert HTTP responses to versioned cache entries and back.

    An entry has the form ``b"cc=<N>,<payload>"``. ``dumps`` always writes
    version 4 (msgpack); ``loads`` reads the version tag and dispatches to
    the matching ``_loads_v<N>`` method. Every loader returns ``None`` to
    signal a cache miss.
    """

    def dumps(self, request, response, body=None):
        """Serialize *response* into a ``cc=4`` cache entry.

        :param request: the request that produced *response*; its
            ``headers`` mapping supplies the values of the headers named in
            the response's ``Vary`` header.
        :param response: the response to store. Its ``headers``, ``status``,
            ``version``, ``reason``, ``strict`` and ``decode_content``
            attributes are recorded.
        :param body: the raw body bytes to store. When ``None`` the body is
            read from *response*, and the response's file object is replaced
            so that it can still be read afterwards.
        :returns: ``b"cc=4,"`` followed by the msgpack-encoded entry.
        """
        response_headers = CaseInsensitiveDict(response.headers)

        if body is None:
            # When a body isn't passed in, we'll read the response. We
            # also update the response with a new file handler to be
            # sure it acts as though it was never read.
            body = response.read(decode_content=False)
            response._fp = io.BytesIO(body)

        # NOTE: This is all a bit weird, but it's really important that on
        #       Python 2.x these objects are unicode and not str, even when
        #       they contain only ascii. The problem here is that msgpack
        #       understands the difference between unicode and bytes and we
        #       have it set to differentiate between them, however Python 2
        #       doesn't know the difference. Forcing these to unicode will be
        #       enough to have msgpack know the difference.
        data = {
            u"response": {
                u"body": body,  # Empty bytestring if body is stored separately
                u"headers": {
                    text_type(k): text_type(v) for k, v in response.headers.items()
                },
                u"status": response.status,
                u"version": response.version,
                u"reason": text_type(response.reason),
                u"strict": response.strict,
                u"decode_content": response.decode_content,
            }
        }

        # Construct our vary headers: for each header named by the response's
        # Vary header, remember the value the request sent (None if absent).
        data[u"vary"] = {}
        if u"vary" in response_headers:
            varied_headers = response_headers[u"vary"].split(",")
            for header in varied_headers:
                header = text_type(header).strip()
                header_value = request.headers.get(header, None)
                if header_value is not None:
                    header_value = text_type(header_value)
                data[u"vary"][header] = header_value

        return b",".join([b"cc=4", msgpack.dumps(data, use_bin_type=True)])

    def loads(self, request, data, body_file=None):
        """Load a cache entry, returning a response object or ``None``.

        :param request: the current request, passed through to the loader.
        :param data: the serialized entry as bytes (may be empty or ``None``).
        :param body_file: optional file-like object passed through to the
            version-specific loader.
        :returns: whatever the version-specific loader returns, or ``None``
            when the entry is empty, has an unknown or unreadable version
            tag, or the loader treats it as a miss.
        """
        # Short circuit if we've been given an empty set of data
        if not data:
            return None

        # Determine what version of the serializer the data was serialized
        # with
        try:
            ver, data = data.split(b",", 1)
        except ValueError:
            ver = b"cc=0"

        # Make sure that our "ver" is actually a version and isn't a false
        # positive from a , being in the data stream.
        if ver[:3] != b"cc=":
            # Put the stream back together exactly as it was, including the
            # comma that the split above consumed.
            data = ver + b"," + data
            ver = b"cc=0"

        # Get the version number out of the cc=N
        try:
            ver = ver.split(b"=", 1)[-1].decode("ascii")
        except UnicodeDecodeError:
            # A version tag that is not ASCII cannot name any loader, so it
            # is handled like any other unknown version: a miss.
            return None

        # Dispatch to the actual load method for the given version
        try:
            return getattr(self, "_loads_v{}".format(ver))(request, data, body_file)

        except AttributeError:
            # This is a version we don't have a loads function for, so we'll
            # just treat it as a miss and return None.
            # Because the loader call itself sits inside this try, an
            # AttributeError raised while loading is also reported as a miss.
            return None

    def prepare_response(self, request, cached, body_file=None):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.

        :param request: the current request; its ``headers`` mapping is
            compared against the stored ``vary`` values.
        :param cached: the decoded entry, a dict with a ``"response"`` dict
            and an optional ``"vary"`` dict. It is modified in place: the
            body is popped and the headers are replaced.
        :param body_file: optional file-like object used as the response
            body instead of the stored body bytes.
        :returns: an ``HTTPResponse``, or ``None`` when the stored ``vary``
            values do not match *request*.
        """
        vary = cached.get("vary", {})

        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        # This case is also handled in the controller code when creating
        # a cache entry, but is left here for backwards compatibility.
        if "*" in vary:
            return None

        # Ensure that the Vary headers for the cached response match our
        # request
        for header, value in vary.items():
            if request.headers.get(header, None) != value:
                return None

        body_raw = cached["response"].pop("body")

        headers = CaseInsensitiveDict(data=cached["response"]["headers"])
        # NOTE(review): presumably dropped because the body is replayed from
        # an in-memory or file stream rather than a chunked socket - confirm.
        if headers.get("transfer-encoding", "") == "chunked":
            headers.pop("transfer-encoding")

        cached["response"]["headers"] = headers

        try:
            if body_file is None:
                body = io.BytesIO(body_raw)
            else:
                body = body_file
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode("utf8"))

        return HTTPResponse(body=body, preload_content=False, **cached["response"])

    def _loads_v0(self, request, data, body_file=None):
        """Treat a legacy (unversioned) entry as a miss; always returns None."""
        # The original legacy cache data. This doesn't contain enough
        # information to construct everything we need, so we'll treat this as
        # a miss.
        return None

    def _loads_v1(self, request, data, body_file=None):
        """Load a version 1 (pickle) entry; returns None on ``ValueError``."""
        # NOTE(security): pickle.loads can execute arbitrary code while
        # unpickling, so this is only safe when the cache storage is trusted.
        try:
            cached = pickle.loads(data)
        except ValueError:
            return None

        return self.prepare_response(request, cached, body_file)

    def _loads_v2(self, request, data, body_file=None):
        """Load a version 2 entry (zlib-compressed JSON with base64 fields).

        Returns None when the payload cannot be decompressed or parsed.
        """
        # Kept as an assert to preserve the original behaviour exactly; note
        # that it is skipped when Python runs with -O.
        assert body_file is None
        try:
            cached = json.loads(zlib.decompress(data).decode("utf8"))
        except (ValueError, zlib.error):
            return None

        # We need to decode the items that we've base64 encoded
        response = cached["response"]
        response["body"] = _b64_decode_bytes(response["body"])
        response["headers"] = {
            _b64_decode_str(k): _b64_decode_str(v)
            for k, v in response["headers"].items()
        }
        response["reason"] = _b64_decode_str(response["reason"])
        cached["vary"] = {
            _b64_decode_str(k): (_b64_decode_str(v) if v is not None else v)
            for k, v in cached["vary"].items()
        }

        return self.prepare_response(request, cached, body_file)

    def _loads_v3(self, request, data, body_file=None):
        """Treat a version 3 entry as a miss; always returns None."""
        # Due to Python 2 encoding issues, it's impossible to know for sure
        # exactly how to load v3 entries, thus we'll treat these as a miss so
        # that they get rewritten out as v4 entries.
        return None

    def _loads_v4(self, request, data, body_file=None):
        """Load a version 4 (msgpack) entry; returns None on ``ValueError``."""
        try:
            cached = msgpack.loads(data, raw=False)
        except ValueError:
            return None

        return self.prepare_response(request, cached, body_file)

187 except ValueError: 

188 return 

189 

190 return self.prepare_response(request, cached, body_file)