Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/httpchecksum.py: 25%
262 statements
coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

""" The interfaces in this module are not intended for public use.

This module defines interfaces for applying checksums to HTTP requests within
the context of botocore. This involves both resolving the checksum to be used
based on client configuration and environment, and applying the checksum to
the request.
"""
import base64
import io
import logging
from binascii import crc32
from hashlib import sha1, sha256

from botocore.compat import HAS_CRT
from botocore.exceptions import (
    AwsChunkedWrapperError,
    FlexibleChecksumError,
    MissingDependencyException,
)
from botocore.response import StreamingBody
from botocore.utils import (
    conditionally_calculate_md5,
    determine_content_length,
)

if HAS_CRT:
    from awscrt import checksums as crt_checksums
else:
    crt_checksums = None

logger = logging.getLogger(__name__)


class BaseChecksum:
    _CHUNK_SIZE = 1024 * 1024

    def update(self, chunk):
        pass

    def digest(self):
        pass

    def b64digest(self):
        bs = self.digest()
        return base64.b64encode(bs).decode("ascii")

    def _handle_fileobj(self, fileobj):
        start_position = fileobj.tell()
        for chunk in iter(lambda: fileobj.read(self._CHUNK_SIZE), b""):
            self.update(chunk)
        fileobj.seek(start_position)

    def handle(self, body):
        if isinstance(body, (bytes, bytearray)):
            self.update(body)
        else:
            self._handle_fileobj(body)
        return self.b64digest()
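
# A minimal usage sketch (comment only): every concrete checksum below shares
# this interface, so producing the base64-encoded digest of an in-memory
# payload or of a seekable file-like object looks the same, e.g.:
#
#     checksum = Sha256Checksum()
#     header_value = checksum.handle(b"payload bytes")
#
# For file-like bodies, handle() reads in _CHUNK_SIZE pieces and restores the
# original stream position afterwards.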


class Crc32Checksum(BaseChecksum):
    def __init__(self):
        self._int_crc32 = 0

    def update(self, chunk):
        self._int_crc32 = crc32(chunk, self._int_crc32) & 0xFFFFFFFF

    def digest(self):
        return self._int_crc32.to_bytes(4, byteorder="big")
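
# Note: binascii.crc32 takes the previous CRC as its second argument, which is
# what allows the incremental update() calls above; masking with 0xFFFFFFFF
# keeps the running value an unsigned 32-bit integer so digest() can encode it
# as exactly four big-endian bytes.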


class CrtCrc32Checksum(BaseChecksum):
    # Note: This class is only used if the CRT is available
    def __init__(self):
        self._int_crc32 = 0

    def update(self, chunk):
        new_checksum = crt_checksums.crc32(chunk, self._int_crc32)
        self._int_crc32 = new_checksum & 0xFFFFFFFF

    def digest(self):
        return self._int_crc32.to_bytes(4, byteorder="big")


class CrtCrc32cChecksum(BaseChecksum):
    # Note: This class is only used if the CRT is available
    def __init__(self):
        self._int_crc32c = 0

    def update(self, chunk):
        new_checksum = crt_checksums.crc32c(chunk, self._int_crc32c)
        self._int_crc32c = new_checksum & 0xFFFFFFFF

    def digest(self):
        return self._int_crc32c.to_bytes(4, byteorder="big")


class Sha1Checksum(BaseChecksum):
    def __init__(self):
        self._checksum = sha1()

    def update(self, chunk):
        self._checksum.update(chunk)

    def digest(self):
        return self._checksum.digest()


class Sha256Checksum(BaseChecksum):
    def __init__(self):
        self._checksum = sha256()

    def update(self, chunk):
        self._checksum.update(chunk)

    def digest(self):
        return self._checksum.digest()


class AwsChunkedWrapper:
    _DEFAULT_CHUNK_SIZE = 1024 * 1024

    def __init__(
        self,
        raw,
        checksum_cls=None,
        checksum_name="x-amz-checksum",
        chunk_size=None,
    ):
        self._raw = raw
        self._checksum_name = checksum_name
        self._checksum_cls = checksum_cls
        self._reset()

        if chunk_size is None:
            chunk_size = self._DEFAULT_CHUNK_SIZE
        self._chunk_size = chunk_size

    def _reset(self):
        self._remaining = b""
        self._complete = False
        self._checksum = None
        if self._checksum_cls:
            self._checksum = self._checksum_cls()

    def seek(self, offset, whence=0):
        if offset != 0 or whence != 0:
            raise AwsChunkedWrapperError(
                error_msg="Can only seek to start of stream"
            )
        self._reset()
        self._raw.seek(0)

    def read(self, size=None):
        # Normalize "read all" size values to None
        if size is not None and size <= 0:
            size = None

        # If the underlying body is done and we have nothing left then
        # end the stream
        if self._complete and not self._remaining:
            return b""

        # While we're not done and want more bytes
        want_more_bytes = size is None or size > len(self._remaining)
        while not self._complete and want_more_bytes:
            self._remaining += self._make_chunk()
            want_more_bytes = size is None or size > len(self._remaining)

        # If size was None, we want to return everything
        if size is None:
            size = len(self._remaining)

        # Return a chunk up to the size asked for
        to_return = self._remaining[:size]
        self._remaining = self._remaining[size:]
        return to_return

    def _make_chunk(self):
        # NOTE: Chunk size is not deterministic as read could return less. This
        # means we cannot know the content length of the encoded aws-chunked
        # stream ahead of time without ensuring a consistent chunk size
        raw_chunk = self._raw.read(self._chunk_size)
        hex_len = hex(len(raw_chunk))[2:].encode("ascii")
        self._complete = not raw_chunk

        if self._checksum:
            self._checksum.update(raw_chunk)

        if self._checksum and self._complete:
            name = self._checksum_name.encode("ascii")
            checksum = self._checksum.b64digest().encode("ascii")
            return b"0\r\n%s:%s\r\n\r\n" % (name, checksum)

        return b"%s\r\n%s\r\n" % (hex_len, raw_chunk)

    def __iter__(self):
        while not self._complete:
            yield self._make_chunk()
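
# A rough sketch of the aws-chunked framing this wrapper produces: each chunk
# is emitted as its length in hex, CRLF, the raw bytes, CRLF, and the final
# zero-length chunk carries the checksum as a trailer. For a small in-memory
# body the full stream would look like:
#
#     wrapped = AwsChunkedWrapper(
#         io.BytesIO(b"hello"),
#         checksum_cls=Crc32Checksum,
#         checksum_name="x-amz-checksum-crc32",
#     )
#     wrapped.read()
#     # b"5\r\nhello\r\n"
#     # b"0\r\nx-amz-checksum-crc32:<base64 crc32>\r\n\r\n"
#     # (returned as one concatenated bytes object when no size is given)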


class StreamingChecksumBody(StreamingBody):
    def __init__(self, raw_stream, content_length, checksum, expected):
        super().__init__(raw_stream, content_length)
        self._checksum = checksum
        self._expected = expected

    def read(self, amt=None):
        chunk = super().read(amt=amt)
        self._checksum.update(chunk)
        if amt is None or (not chunk and amt > 0):
            self._validate_checksum()
        return chunk

    def _validate_checksum(self):
        if self._checksum.digest() != base64.b64decode(self._expected):
            error_msg = (
                f"Expected checksum {self._expected} did not match calculated "
                f"checksum: {self._checksum.b64digest()}"
            )
            raise FlexibleChecksumError(error_msg=error_msg)
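
# Note: the wrapper above only validates once the caller has consumed the
# whole stream, either via a single read() with no amount or once a sized
# read() returns no more data; a stream that is abandoned part-way through is
# never checked.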


def resolve_checksum_context(request, operation_model, params):
    resolve_request_checksum_algorithm(request, operation_model, params)
    resolve_response_checksum_algorithms(request, operation_model, params)


def resolve_request_checksum_algorithm(
    request,
    operation_model,
    params,
    supported_algorithms=None,
):
    http_checksum = operation_model.http_checksum
    algorithm_member = http_checksum.get("requestAlgorithmMember")
    if algorithm_member and algorithm_member in params:
        # If the client has opted into using flexible checksums and the
        # request supports it, use that instead of checksum required
        if supported_algorithms is None:
            supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS

        algorithm_name = params[algorithm_member].lower()
        if algorithm_name not in supported_algorithms:
            if not HAS_CRT and algorithm_name in _CRT_CHECKSUM_ALGORITHMS:
                raise MissingDependencyException(
                    msg=(
                        f"Using {algorithm_name.upper()} requires an "
                        "additional dependency. You will need to pip install "
                        "botocore[crt] before proceeding."
                    )
                )
            raise FlexibleChecksumError(
                error_msg="Unsupported checksum algorithm: %s" % algorithm_name
            )

        location_type = "header"
        if operation_model.has_streaming_input:
            # Operations with streaming input must support trailers.
            if request["url"].startswith("https:"):
                # We only support unsigned trailer checksums currently. As this
                # disables payload signing we'll only use trailers over TLS.
                location_type = "trailer"

        algorithm = {
            "algorithm": algorithm_name,
            "in": location_type,
            "name": "x-amz-checksum-%s" % algorithm_name,
        }

        if algorithm["name"] in request["headers"]:
            # If the header is already set by the customer, skip calculation
            return

        checksum_context = request["context"].get("checksum", {})
        checksum_context["request_algorithm"] = algorithm
        request["context"]["checksum"] = checksum_context
    elif operation_model.http_checksum_required or http_checksum.get(
        "requestChecksumRequired"
    ):
        # Otherwise apply the old http checksum behavior via Content-MD5
        checksum_context = request["context"].get("checksum", {})
        checksum_context["request_algorithm"] = "conditional-md5"
        request["context"]["checksum"] = checksum_context
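
# Sketch of the context this resolution step leaves behind for
# apply_request_checksum() to act on, assuming a CRC32 request with streaming
# input sent over TLS:
#
#     request["context"]["checksum"] == {
#         "request_algorithm": {
#             "algorithm": "crc32",
#             "in": "trailer",
#             "name": "x-amz-checksum-crc32",
#         }
#     }
#
# or simply {"request_algorithm": "conditional-md5"} for the legacy
# Content-MD5 behavior.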


def apply_request_checksum(request):
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")

    if not algorithm:
        return

    if algorithm == "conditional-md5":
        # Special case to handle the http checksum required trait
        conditionally_calculate_md5(request)
    elif algorithm["in"] == "header":
        _apply_request_header_checksum(request)
    elif algorithm["in"] == "trailer":
        _apply_request_trailer_checksum(request)
    else:
        raise FlexibleChecksumError(
            error_msg="Unknown checksum variant: %s" % algorithm["in"]
        )


def _apply_request_header_checksum(request):
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    location_name = algorithm["name"]
    if location_name in request["headers"]:
        # If the header is already set by the customer, skip calculation
        return
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])
    digest = checksum_cls().handle(request["body"])
    request["headers"][location_name] = digest


def _apply_request_trailer_checksum(request):
    checksum_context = request.get("context", {}).get("checksum", {})
    algorithm = checksum_context.get("request_algorithm")
    location_name = algorithm["name"]
    checksum_cls = _CHECKSUM_CLS.get(algorithm["algorithm"])

    headers = request["headers"]
    body = request["body"]

    if location_name in headers:
        # If the header is already set by the customer, skip calculation
        return

    headers["Transfer-Encoding"] = "chunked"
    if "Content-Encoding" in headers:
        # We need to preserve the existing content encoding and add
        # aws-chunked as a new content encoding.
        headers["Content-Encoding"] += ",aws-chunked"
    else:
        headers["Content-Encoding"] = "aws-chunked"
    headers["X-Amz-Trailer"] = location_name

    content_length = determine_content_length(body)
    if content_length is not None:
        # Send the decoded content length if we can determine it. Some
        # services such as S3 may require the decoded content length
        headers["X-Amz-Decoded-Content-Length"] = str(content_length)

    if isinstance(body, (bytes, bytearray)):
        body = io.BytesIO(body)

    request["body"] = AwsChunkedWrapper(
        body,
        checksum_cls=checksum_cls,
        checksum_name=location_name,
    )
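
# Roughly, a request with a trailer checksum leaves this function with headers
# along the lines of:
#
#     Transfer-Encoding: chunked
#     Content-Encoding: aws-chunked          (appended if one already exists)
#     X-Amz-Trailer: x-amz-checksum-crc32
#     X-Amz-Decoded-Content-Length: <len>    (only when it can be determined)
#
# and with its body replaced by an AwsChunkedWrapper that appends the checksum
# as the trailing chunk.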


def resolve_response_checksum_algorithms(
    request, operation_model, params, supported_algorithms=None
):
    http_checksum = operation_model.http_checksum
    mode_member = http_checksum.get("requestValidationModeMember")
    if mode_member and mode_member in params:
        if supported_algorithms is None:
            supported_algorithms = _SUPPORTED_CHECKSUM_ALGORITHMS
        response_algorithms = {
            a.lower() for a in http_checksum.get("responseAlgorithms", [])
        }

        usable_algorithms = []
        for algorithm in _ALGORITHMS_PRIORITY_LIST:
            if algorithm not in response_algorithms:
                continue
            if algorithm in supported_algorithms:
                usable_algorithms.append(algorithm)

        checksum_context = request["context"].get("checksum", {})
        checksum_context["response_algorithms"] = usable_algorithms
        request["context"]["checksum"] = checksum_context
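
# Note: because the loop above iterates _ALGORITHMS_PRIORITY_LIST (defined at
# the bottom of this module), usable_algorithms preserves that priority order
# (crc32c, crc32, sha1, sha256), and handle_checksum_body() below validates
# against the first algorithm in that order for which the response carries a
# header.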


def handle_checksum_body(http_response, response, context, operation_model):
    headers = response["headers"]
    checksum_context = context.get("checksum", {})
    algorithms = checksum_context.get("response_algorithms")

    if not algorithms:
        return

    for algorithm in algorithms:
        header_name = "x-amz-checksum-%s" % algorithm
        # If the header is not found, check the next algorithm
        if header_name not in headers:
            continue

        # If a - is in the checksum this is not valid Base64. S3 returns
        # checksums that include a -# suffix to indicate a checksum derived
        # from the hash of all part checksums. We cannot wrap this response
        if "-" in headers[header_name]:
            continue

        if operation_model.has_streaming_output:
            response["body"] = _handle_streaming_response(
                http_response, response, algorithm
            )
        else:
            response["body"] = _handle_bytes_response(
                http_response, response, algorithm
            )

        # Expose metadata that the checksum check actually occurred
        checksum_context = response["context"].get("checksum", {})
        checksum_context["response_algorithm"] = algorithm
        response["context"]["checksum"] = checksum_context
        return

    logger.info(
        f'Skipping checksum validation. Response did not contain one of the '
        f'following algorithms: {algorithms}.'
    )
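
# Note: only one algorithm is ever validated per response, namely the first in
# priority order whose x-amz-checksum-* header is present and is plain Base64.
# Values containing a "-" (S3 multipart-style checksums) are skipped rather
# than treated as failures, and if nothing matches the response passes through
# unvalidated with only the log message above.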


def _handle_streaming_response(http_response, response, algorithm):
    checksum_cls = _CHECKSUM_CLS.get(algorithm)
    header_name = "x-amz-checksum-%s" % algorithm
    return StreamingChecksumBody(
        http_response.raw,
        response["headers"].get("content-length"),
        checksum_cls(),
        response["headers"][header_name],
    )


def _handle_bytes_response(http_response, response, algorithm):
    body = http_response.content
    header_name = "x-amz-checksum-%s" % algorithm
    checksum_cls = _CHECKSUM_CLS.get(algorithm)
    checksum = checksum_cls()
    checksum.update(body)
    expected = response["headers"][header_name]
    if checksum.digest() != base64.b64decode(expected):
        error_msg = (
            "Expected checksum %s did not match calculated checksum: %s"
            % (
                expected,
                checksum.b64digest(),
            )
        )
        raise FlexibleChecksumError(error_msg=error_msg)
    return body


_CHECKSUM_CLS = {
    "crc32": Crc32Checksum,
    "sha1": Sha1Checksum,
    "sha256": Sha256Checksum,
}
_CRT_CHECKSUM_ALGORITHMS = ["crc32", "crc32c"]
if HAS_CRT:
    # Use CRT checksum implementations if available
    _CRT_CHECKSUM_CLS = {
        "crc32": CrtCrc32Checksum,
        "crc32c": CrtCrc32cChecksum,
    }
    _CHECKSUM_CLS.update(_CRT_CHECKSUM_CLS)
    # Validate this list isn't out of sync with _CRT_CHECKSUM_CLS keys
    assert all(
        name in _CRT_CHECKSUM_ALGORITHMS for name in _CRT_CHECKSUM_CLS.keys()
    )
_SUPPORTED_CHECKSUM_ALGORITHMS = list(_CHECKSUM_CLS.keys())
_ALGORITHMS_PRIORITY_LIST = ['crc32c', 'crc32', 'sha1', 'sha256']