Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/httpchecksum.py: 25%

262 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14""" The interfaces in this module are not intended for public use. 

15 

16This module defines interfaces for applying checksums to HTTP requests within 

17the context of botocore. This involves both resolving the checksum to be used 

18based on client configuration and environment, as well as application of the 

19checksum to the request. 

20""" 

21import base64 

22import io 

23import logging 

24from binascii import crc32 

25from hashlib import sha1, sha256 

26 

27from botocore.compat import HAS_CRT 

28from botocore.exceptions import ( 

29 AwsChunkedWrapperError, 

30 FlexibleChecksumError, 

31 MissingDependencyException, 

32) 

33from botocore.response import StreamingBody 

34from botocore.utils import ( 

35 conditionally_calculate_md5, 

36 determine_content_length, 

37) 

38 

39if HAS_CRT: 

40 from awscrt import checksums as crt_checksums 

41else: 

42 crt_checksums = None 

43 

44logger = logging.getLogger(__name__) 

45 

46 

class BaseChecksum:
    """Shared plumbing for the checksum helpers defined in this module.

    The base ``update`` and ``digest`` do nothing and return ``None``;
    subclasses override them. ``b64digest`` and ``handle`` are built purely
    on top of those two hooks.
    """

    # Number of bytes requested per ``read`` call when draining a file object.
    _CHUNK_SIZE = 1024 * 1024

    def update(self, chunk):
        """Fold ``chunk`` into the running checksum. No-op in the base."""

    def digest(self):
        """Return the raw checksum bytes. The base class returns ``None``."""

    def b64digest(self):
        """Return ``digest()`` base64-encoded as an ASCII ``str``."""
        encoded = base64.b64encode(self.digest())
        return encoded.decode("ascii")

    def _handle_fileobj(self, fileobj):
        """Feed ``fileobj`` to ``update`` chunk by chunk, then rewind it.

        Reading starts at the current position and stops at the first read
        that returns ``b""``. The object is then seeked back to the position
        it had on entry.
        """
        origin = fileobj.tell()
        while True:
            block = fileobj.read(self._CHUNK_SIZE)
            if block == b"":
                break
            self.update(block)
        fileobj.seek(origin)

    def handle(self, body):
        """Checksum ``body`` and return the base64 digest string.

        ``bytes`` and ``bytearray`` bodies are passed to ``update`` in one
        call; anything else is handled as a file-like object.
        """
        if not isinstance(body, (bytes, bytearray)):
            self._handle_fileobj(body)
        else:
            self.update(body)
        return self.b64digest()

72 

73 

class Crc32Checksum(BaseChecksum):
    """CRC32 checksum computed with ``binascii.crc32``."""

    def __init__(self):
        # Running CRC value; zero until data has been supplied.
        self._int_crc32 = 0

    def update(self, chunk):
        """Extend the running CRC with ``chunk``, masked to 32 bits."""
        running = crc32(chunk, self._int_crc32)
        self._int_crc32 = running & 0xFFFFFFFF

    def digest(self):
        """Return the CRC as four bytes, most significant byte first."""
        return self._int_crc32.to_bytes(4, "big")

83 

84 

class CrtCrc32Checksum(BaseChecksum):
    """CRC32 checksum computed with ``crt_checksums.crc32``.

    Note: This class is only used if the CRT is available.
    """

    def __init__(self):
        # Running CRC value; zero until data has been supplied.
        self._int_crc32 = 0

    def update(self, chunk):
        """Extend the running CRC with ``chunk``, masked to 32 bits."""
        self._int_crc32 = (
            crt_checksums.crc32(chunk, self._int_crc32) & 0xFFFFFFFF
        )

    def digest(self):
        """Return the CRC as four bytes, most significant byte first."""
        return self._int_crc32.to_bytes(4, "big")

96 

97 

class CrtCrc32cChecksum(BaseChecksum):
    """CRC32C checksum computed with ``crt_checksums.crc32c``.

    Note: This class is only used if the CRT is available.
    """

    def __init__(self):
        # Running CRC value; zero until data has been supplied.
        self._int_crc32c = 0

    def update(self, chunk):
        """Extend the running CRC with ``chunk``, masked to 32 bits."""
        self._int_crc32c = (
            crt_checksums.crc32c(chunk, self._int_crc32c) & 0xFFFFFFFF
        )

    def digest(self):
        """Return the CRC as four bytes, most significant byte first."""
        return self._int_crc32c.to_bytes(4, "big")

109 

110 

class Sha1Checksum(BaseChecksum):
    """SHA-1 checksum backed by a ``hashlib.sha1`` object."""

    def __init__(self):
        # The hashlib object accumulates state across ``update`` calls.
        self._checksum = sha1()

    def update(self, chunk):
        """Feed ``chunk`` to the underlying hash object."""
        hasher = self._checksum
        hasher.update(chunk)

    def digest(self):
        """Return the raw digest bytes of the data supplied so far."""
        hasher = self._checksum
        return hasher.digest()

120 

121 

class Sha256Checksum(BaseChecksum):
    """SHA-256 checksum backed by a ``hashlib.sha256`` object."""

    def __init__(self):
        # The hashlib object accumulates state across ``update`` calls.
        self._checksum = sha256()

    def update(self, chunk):
        """Feed ``chunk`` to the underlying hash object."""
        hasher = self._checksum
        hasher.update(chunk)

    def digest(self):
        """Return the raw digest bytes of the data supplied so far."""
        hasher = self._checksum
        return hasher.digest()

131 

132 

class AwsChunkedWrapper:
    """File-like wrapper that re-encodes a raw stream as framed chunks.

    Each chunk read from ``raw`` is emitted as ``<hex length>\\r\\n<data>\\r\\n``.
    When a checksum class is supplied, every chunk is also fed to an
    instance of it and the final zero-length chunk carries a
    ``<checksum_name>:<base64 digest>`` trailer line.
    """

    # Chunk size used when the caller does not pass ``chunk_size``.
    _DEFAULT_CHUNK_SIZE = 1024 * 1024

    def __init__(
        self,
        raw,
        checksum_cls=None,
        checksum_name="x-amz-checksum",
        chunk_size=None,
    ):
        """Wrap ``raw``.

        :param raw: Object providing ``read(size)`` and ``seek(0)``.
        :param checksum_cls: Optional zero-argument callable producing an
            object with ``update`` and ``b64digest`` methods.
        :param checksum_name: Name written in front of the trailer value.
        :param chunk_size: Number of bytes requested from ``raw`` per
            chunk; ``None`` selects ``_DEFAULT_CHUNK_SIZE``.
        """
        self._raw = raw
        self._checksum_cls = checksum_cls
        self._checksum_name = checksum_name
        self._reset()

        self._chunk_size = (
            self._DEFAULT_CHUNK_SIZE if chunk_size is None else chunk_size
        )

    def _reset(self):
        """Drop buffered output and start over with a fresh checksum."""
        self._remaining = b""
        self._complete = False
        self._checksum = self._checksum_cls() if self._checksum_cls else None

    def seek(self, offset, whence=0):
        """Rewind to the start of the stream.

        :raises AwsChunkedWrapperError: For any target other than
            ``offset=0, whence=0``.
        """
        if offset != 0 or whence != 0:
            raise AwsChunkedWrapperError(
                error_msg="Can only seek to start of stream"
            )
        self._reset()
        self._raw.seek(0)

    def read(self, size=None):
        """Return up to ``size`` encoded bytes.

        ``None`` and non-positive sizes mean "read everything that is
        left". ``b""`` is returned once the stream is exhausted.
        """
        read_all = size is None or size <= 0

        # Underlying body is finished and the buffer is drained.
        if self._complete and not self._remaining:
            return b""

        # Keep encoding chunks until the buffer can satisfy the request
        # or the underlying body runs out.
        while not self._complete:
            if not read_all and size <= len(self._remaining):
                break
            self._remaining += self._make_chunk()

        cut = len(self._remaining) if read_all else size
        served, self._remaining = (
            self._remaining[:cut],
            self._remaining[cut:],
        )
        return served

    def _make_chunk(self):
        """Read one chunk from the raw stream and return it encoded."""
        # NOTE: Chunk size is not deterministic as read could return less.
        # This means we cannot know the content length of the encoded
        # aws-chunked stream ahead of time without ensuring a consistent
        # chunk size
        payload = self._raw.read(self._chunk_size)
        size_line = format(len(payload), "x").encode("ascii")
        self._complete = not payload

        hasher = self._checksum
        if hasher:
            hasher.update(payload)
            if self._complete:
                # Final chunk: zero length followed by the checksum trailer.
                trailer_name = self._checksum_name.encode("ascii")
                trailer_value = hasher.b64digest().encode("ascii")
                return b"0\r\n%s:%s\r\n\r\n" % (trailer_name, trailer_value)

        return b"%s\r\n%s\r\n" % (size_line, payload)

    def __iter__(self):
        """Yield encoded chunks until the raw stream is exhausted."""
        while not self._complete:
            yield self._make_chunk()

213 

214 

class StreamingChecksumBody(StreamingBody):
    """Streaming body that checks a checksum once it has been consumed."""

    def __init__(self, raw_stream, content_length, checksum, expected):
        """Wrap ``raw_stream``.

        :param raw_stream: Passed through to ``StreamingBody``.
        :param content_length: Passed through to ``StreamingBody``.
        :param checksum: Object with ``update``, ``digest`` and
            ``b64digest`` methods that accumulates the data read.
        :param expected: Base64 text of the checksum the data must match.
        """
        super().__init__(raw_stream, content_length)
        self._expected = expected
        self._checksum = checksum

    def read(self, amt=None):
        """Read from the stream, feeding the data to the checksum.

        Validation runs after a read-all call (``amt is None``) and after a
        positive-sized read that comes back empty.

        :raises FlexibleChecksumError: If validation runs and the checksum
            does not match the expected value.
        """
        data = super().read(amt=amt)
        self._checksum.update(data)
        if amt is None:
            self._validate_checksum()
        elif not data and amt > 0:
            self._validate_checksum()
        return data

    def _validate_checksum(self):
        """Raise ``FlexibleChecksumError`` unless the digest matches."""
        if self._checksum.digest() == base64.b64decode(self._expected):
            return
        error_msg = (
            f"Expected checksum {self._expected} did not match calculated "
            f"checksum: {self._checksum.b64digest()}"
        )
        raise FlexibleChecksumError(error_msg=error_msg)

235 

236 

def resolve_checksum_context(request, operation_model, params):
    """Resolve the request checksum settings, then the response ones.

    Both resolvers receive the same three arguments and record their
    results on ``request``.
    """
    resolvers = (
        resolve_request_checksum_algorithm,
        resolve_response_checksum_algorithms,
    )
    for resolver in resolvers:
        resolver(request, operation_model, params)

240 

241 

def resolve_request_checksum_algorithm(
    request,
    operation_model,
    params,
    supported_algorithms=None,
):
    """Decide which checksum to apply to ``request`` and record it.

    The decision is stored under
    ``request["context"]["checksum"]["request_algorithm"]``. It is either a
    dict with ``algorithm``, ``in`` and ``name`` keys, or the string
    ``"conditional-md5"``. Nothing is stored when neither case applies or
    when the checksum header is already present on the request.

    :param request: Request dict; ``context``, ``headers`` and ``url`` are
        read as needed.
    :param operation_model: Object exposing ``http_checksum``,
        ``http_checksum_required`` and ``has_streaming_input``.
    :param params: Operation parameters supplied by the caller.
    :param supported_algorithms: Accepted algorithm names; ``None`` selects
        ``_SUPPORTED_CHECKSUM_ALGORITHMS``.
    :raises MissingDependencyException: If the algorithm is unsupported,
        the CRT is not available and the name is a CRT algorithm.
    :raises FlexibleChecksumError: For any other unsupported algorithm.
    """
    checksum_trait = operation_model.http_checksum
    member = checksum_trait.get("requestAlgorithmMember")

    if not (member and member in params):
        # No flexible checksum requested: apply the old http checksum
        # behavior via Content-MD5 when the operation demands a checksum.
        if operation_model.http_checksum_required or checksum_trait.get(
            "requestChecksumRequired"
        ):
            context = request["context"].setdefault("checksum", {})
            context["request_algorithm"] = "conditional-md5"
        return

    # The client has opted into flexible checksums and the request supports
    # it, so use that instead of checksum required.
    if supported_algorithms is None:
        supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS

    algorithm_name = params[member].lower()
    if algorithm_name not in supported_algorithms:
        if not HAS_CRT and algorithm_name in _CRT_CHECKSUM_ALGORITHMS:
            raise MissingDependencyException(
                msg=(
                    f"Using {algorithm_name.upper()} requires an "
                    "additional dependency. You will need to pip install "
                    "botocore[crt] before proceeding."
                )
            )
        raise FlexibleChecksumError(
            error_msg="Unsupported checksum algorithm: %s" % algorithm_name
        )

    # Operations with streaming input must support trailers. We only
    # support unsigned trailer checksums currently. As this disables
    # payload signing we'll only use trailers over TLS.
    use_trailer = operation_model.has_streaming_input and request[
        "url"
    ].startswith("https:")

    algorithm = {
        "algorithm": algorithm_name,
        "in": "trailer" if use_trailer else "header",
        "name": "x-amz-checksum-%s" % algorithm_name,
    }

    if algorithm["name"] in request["headers"]:
        # If the header is already set by the customer, skip calculation
        return

    context = request["context"].setdefault("checksum", {})
    context["request_algorithm"] = algorithm

298 

299 

def apply_request_checksum(request):
    """Apply the checksum recorded in the request context, if any.

    Reads ``request["context"]["checksum"]["request_algorithm"]`` and
    dispatches on it; returns without doing anything when it is missing
    or falsy.

    :raises FlexibleChecksumError: If the recorded algorithm's ``in``
        value is neither ``"header"`` nor ``"trailer"``.
    """
    context = request.get("context", {})
    algorithm = context.get("checksum", {}).get("request_algorithm")

    if not algorithm:
        return

    if algorithm == "conditional-md5":
        # Special case to handle the http checksum required trait
        conditionally_calculate_md5(request)
        return

    if algorithm["in"] == "header":
        _apply_request_header_checksum(request)
        return

    if algorithm["in"] == "trailer":
        _apply_request_trailer_checksum(request)
        return

    raise FlexibleChecksumError(
        error_msg="Unknown checksum variant: %s" % algorithm["in"]
    )

318 

319 

def _apply_request_header_checksum(request):
    """Compute the body checksum and store it as a request header.

    The header name and algorithm come from the ``request_algorithm``
    entry of the request's checksum context. A header that is already
    present is left untouched.
    """
    context = request.get("context", {})
    algorithm = context.get("checksum", {}).get("request_algorithm")
    header_name = algorithm["name"]
    headers = request["headers"]
    if header_name in headers:
        # If the header is already set by the customer, skip calculation
        return
    checksum = _CHECKSUM_CLS.get(algorithm["algorithm"])()
    headers[header_name] = checksum.handle(request["body"])

330 

331 

def _apply_request_trailer_checksum(request):
    """Arrange for the checksum to be sent as a trailer of a chunked body.

    Sets the ``Transfer-Encoding``, ``Content-Encoding`` and
    ``X-Amz-Trailer`` headers, adds ``X-Amz-Decoded-Content-Length`` when
    the body length can be determined, and replaces ``request["body"]``
    with an ``AwsChunkedWrapper``. Nothing is changed when the checksum
    header is already present.
    """
    context = request.get("context", {})
    algorithm = context.get("checksum", {}).get("request_algorithm")
    trailer_name = algorithm["name"]
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])

    headers = request["headers"]
    body = request["body"]

    if trailer_name in headers:
        # If the header is already set by the customer, skip calculation
        return

    headers["Transfer-Encoding"] = "chunked"
    if "Content-Encoding" not in headers:
        headers["Content-Encoding"] = "aws-chunked"
    else:
        # We need to preserve the existing content encoding and add
        # aws-chunked as a new content encoding.
        headers["Content-Encoding"] += ",aws-chunked"
    headers["X-Amz-Trailer"] = trailer_name

    decoded_length = determine_content_length(body)
    if decoded_length is not None:
        # Send the decoded content length if we can determine it. Some
        # services such as S3 may require the decoded content length
        headers["X-Amz-Decoded-Content-Length"] = str(decoded_length)

    # In-memory bodies are wrapped in a BytesIO before chunk encoding.
    raw = io.BytesIO(body) if isinstance(body, (bytes, bytearray)) else body

    request["body"] = AwsChunkedWrapper(
        raw,
        checksum_cls=checksum_cls,
        checksum_name=trailer_name,
    )

368 

369 

def resolve_response_checksum_algorithms(
    request, operation_model, params, supported_algorithms=None
):
    """Record which response checksum algorithms may be validated.

    When the operation's ``requestValidationModeMember`` is present in
    ``params``, the algorithms listed in the operation's
    ``responseAlgorithms`` that are also supported are stored, in
    ``_ALGORITHMS_PRIORITY_LIST`` order, under
    ``request["context"]["checksum"]["response_algorithms"]``. Otherwise
    the request is left untouched.

    :param supported_algorithms: Accepted algorithm names; ``None`` selects
        ``_SUPPORTED_CHECKSUM_ALGORITHMS``.
    """
    checksum_trait = operation_model.http_checksum
    mode_member = checksum_trait.get("requestValidationModeMember")
    if not (mode_member and mode_member in params):
        return

    if supported_algorithms is None:
        supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS

    # Algorithm names are compared in lower case.
    offered = {
        name.lower() for name in checksum_trait.get("responseAlgorithms", [])
    }
    usable = [
        name
        for name in _ALGORITHMS_PRIORITY_LIST
        if name in offered and name in supported_algorithms
    ]

    context = request["context"].setdefault("checksum", {})
    context["response_algorithms"] = usable

392 

393 

def handle_checksum_body(http_response, response, context, operation_model):
    """Validate the response body against a checksum header, if possible.

    The candidate algorithms come from
    ``context["checksum"]["response_algorithms"]`` and are tried in order.
    The first one whose ``x-amz-checksum-<algorithm>`` header is present
    and contains no ``-`` is used: ``response["body"]`` is replaced with a
    validating streaming body or with the validated bytes, and the
    algorithm is recorded under
    ``response["context"]["checksum"]["response_algorithm"]``.

    When no candidate is usable an info message is logged and the response
    is left unchanged. Nothing happens at all when no algorithms were
    resolved for the request.

    :param http_response: Raw HTTP response handed to the body handlers.
    :param response: Parsed response dict; ``headers`` is read, and
        ``body`` and ``context`` are updated on success.
    :param context: Request context dict holding the checksum settings.
    :param operation_model: Object exposing ``has_streaming_output``.
    """
    headers = response["headers"]
    checksum_context = context.get("checksum", {})
    algorithms = checksum_context.get("response_algorithms")

    if not algorithms:
        return

    for algorithm in algorithms:
        header_name = "x-amz-checksum-%s" % algorithm
        # If the header is not found, check the next algorithm
        if header_name not in headers:
            continue

        # If a - is in the checksum this is not valid Base64. S3 returns
        # checksums that include a -# suffix to indicate a checksum derived
        # from the hash of all part checksums. We cannot wrap this response
        if "-" in headers[header_name]:
            continue

        if operation_model.has_streaming_output:
            response["body"] = _handle_streaming_response(
                http_response, response, algorithm
            )
        else:
            response["body"] = _handle_bytes_response(
                http_response, response, algorithm
            )

        # Expose metadata that the checksum check actually occurred
        checksum_context = response["context"].get("checksum", {})
        checksum_context["response_algorithm"] = algorithm
        response["context"]["checksum"] = checksum_context
        return

    # Pass the algorithms as a logging argument so the message is only
    # formatted when an INFO handler is actually active.
    logger.info(
        "Skipping checksum validation. Response did not contain one of the "
        "following algorithms: %s.",
        algorithms,
    )

433 

434 

def _handle_streaming_response(http_response, response, algorithm):
    """Build a ``StreamingChecksumBody`` for a streaming response.

    The body wraps ``http_response.raw`` and is given a fresh checksum
    object for ``algorithm`` together with the expected value taken from
    the ``x-amz-checksum-<algorithm>`` response header.
    """
    checksum_factory = _CHECKSUM_CLS.get(algorithm)
    headers = response["headers"]
    return StreamingChecksumBody(
        http_response.raw,
        headers.get("content-length"),
        checksum_factory(),
        headers["x-amz-checksum-%s" % algorithm],
    )

444 

445 

def _handle_bytes_response(http_response, response, algorithm):
    """Validate a fully-read response body and return it.

    The checksum of ``http_response.content`` is compared with the decoded
    value of the ``x-amz-checksum-<algorithm>`` response header.

    :raises FlexibleChecksumError: If the two checksums differ.
    """
    payload = http_response.content
    header_name = "x-amz-checksum-%s" % algorithm
    checksum = _CHECKSUM_CLS.get(algorithm)()
    checksum.update(payload)
    expected = response["headers"][header_name]
    if checksum.digest() == base64.b64decode(expected):
        return payload

    error_msg = (
        "Expected checksum %s did not match calculated checksum: %s"
        % (
            expected,
            checksum.b64digest(),
        )
    )
    raise FlexibleChecksumError(error_msg=error_msg)

463 

464 

# Maps each algorithm name to the class that computes it. The entries below
# need no optional dependency; CRT-backed entries are merged in afterwards.
_CHECKSUM_CLS = {
    "crc32": Crc32Checksum,
    "sha1": Sha1Checksum,
    "sha256": Sha256Checksum,
}

# Algorithm names that get a "pip install botocore[crt]" hint when they are
# requested but unsupported and the CRT is not installed.
_CRT_CHECKSUM_ALGORITHMS = ["crc32", "crc32c"]

if HAS_CRT:
    # Use CRT checksum implementations if available
    _CRT_CHECKSUM_CLS = {
        "crc32": CrtCrc32Checksum,
        "crc32c": CrtCrc32cChecksum,
    }
    _CHECKSUM_CLS.update(_CRT_CHECKSUM_CLS)
    # Validate this list isn't out of sync with _CRT_CHECKSUM_CLS keys
    assert all(
        name in _CRT_CHECKSUM_ALGORITHMS for name in _CRT_CHECKSUM_CLS
    )

# Default set of algorithm names accepted by the resolve_* functions.
_SUPPORTED_CHECKSUM_ALGORITHMS = list(_CHECKSUM_CLS)

# Order in which response checksum algorithms are considered.
_ALGORITHMS_PRIORITY_LIST = ["crc32c", "crc32", "sha1", "sha256"]