Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/compress.py: 21%
75 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13"""
14NOTE: All functions in this module are considered private and are
15subject to abrupt breaking changes. Please do not use them directly.
17"""
19import io
20import logging
21from gzip import GzipFile
22from gzip import compress as gzip_compress
24from botocore.compat import urlencode
25from botocore.utils import determine_content_length
27logger = logging.getLogger(__name__)
30def maybe_compress_request(config, request_dict, operation_model):
31 """Attempt to compress the request body using the modeled encodings."""
32 if _should_compress_request(config, request_dict, operation_model):
33 for encoding in operation_model.request_compression['encodings']:
34 encoder = COMPRESSION_MAPPING.get(encoding)
35 if encoder is not None:
36 logger.debug('Compressing request with %s encoding.', encoding)
37 request_dict['body'] = encoder(request_dict['body'])
38 _set_compression_header(request_dict['headers'], encoding)
39 return
40 else:
41 logger.debug('Unsupported compression encoding: %s', encoding)
44def _should_compress_request(config, request_dict, operation_model):
45 if (
46 config.disable_request_compression is not True
47 and config.signature_version != 'v2'
48 and operation_model.request_compression is not None
49 ):
50 if not _is_compressible_type(request_dict):
51 body_type = type(request_dict['body'])
52 log_msg = 'Body type %s does not support compression.'
53 logger.debug(log_msg, body_type)
54 return False
56 if operation_model.has_streaming_input:
57 streaming_input = operation_model.get_streaming_input()
58 streaming_metadata = streaming_input.metadata
59 return 'requiresLength' not in streaming_metadata
61 body_size = _get_body_size(request_dict['body'])
62 min_size = config.request_min_compression_size_bytes
63 return min_size <= body_size
65 return False
68def _is_compressible_type(request_dict):
69 body = request_dict['body']
70 # Coerce dict to a format compatible with compression.
71 if isinstance(body, dict):
72 body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8')
73 request_dict['body'] = body
74 is_supported_type = isinstance(body, (str, bytes, bytearray))
75 return is_supported_type or hasattr(body, 'read')
78def _get_body_size(body):
79 size = determine_content_length(body)
80 if size is None:
81 logger.debug(
82 'Unable to get length of the request body: %s. '
83 'Skipping compression.',
84 body,
85 )
86 size = 0
87 return size
90def _gzip_compress_body(body):
91 if isinstance(body, str):
92 return gzip_compress(body.encode('utf-8'))
93 elif isinstance(body, (bytes, bytearray)):
94 return gzip_compress(body)
95 elif hasattr(body, 'read'):
96 if hasattr(body, 'seek') and hasattr(body, 'tell'):
97 current_position = body.tell()
98 compressed_obj = _gzip_compress_fileobj(body)
99 body.seek(current_position)
100 return compressed_obj
101 return _gzip_compress_fileobj(body)
104def _gzip_compress_fileobj(body):
105 compressed_obj = io.BytesIO()
106 with GzipFile(fileobj=compressed_obj, mode='wb') as gz:
107 while True:
108 chunk = body.read(8192)
109 if not chunk:
110 break
111 if isinstance(chunk, str):
112 chunk = chunk.encode('utf-8')
113 gz.write(chunk)
114 compressed_obj.seek(0)
115 return compressed_obj
118def _set_compression_header(headers, encoding):
119 ce_header = headers.get('Content-Encoding')
120 if ce_header is None:
121 headers['Content-Encoding'] = encoding
122 else:
123 headers['Content-Encoding'] = f'{ce_header},{encoding}'
126COMPRESSION_MAPPING = {'gzip': _gzip_compress_body}