# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
NOTE: All functions in this module are considered private and are
subject to abrupt breaking changes. Please do not use them directly.

"""

import io
import logging
from gzip import GzipFile
from gzip import compress as gzip_compress

from botocore.compat import urlencode
from botocore.useragent import register_feature_id
from botocore.utils import determine_content_length

logger = logging.getLogger(__name__)


def maybe_compress_request(config, request_dict, operation_model):
    """Attempt to compress the request body using the modeled encodings."""
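    # Note: given a request_dict with 'body' and 'headers' entries (for
    # example, a hypothetical {'body': b'...', 'headers': {}}), a successful
    # compression replaces request_dict['body'] with the encoded bytes and
    # sets the Content-Encoding header; otherwise the request is unchanged.
    # Each modeled encoding is tried in order and the first supported one
    # is applied.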
    if _should_compress_request(config, request_dict, operation_model):
        for encoding in operation_model.request_compression['encodings']:
            encoder = COMPRESSION_MAPPING.get(encoding)
            if encoder is not None:
                logger.debug('Compressing request with %s encoding.', encoding)
                request_dict['body'] = encoder(request_dict['body'])
                _set_compression_header(request_dict['headers'], encoding)
                return
            else:
                logger.debug('Unsupported compression encoding: %s', encoding)


def _should_compress_request(config, request_dict, operation_model):
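    # Compression is only considered when it has not been disabled in the
    # client config, the request is not signed with signature version 2,
    # and the operation actually models request compression.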
    if (
        config.disable_request_compression is not True
        and config.signature_version != 'v2'
        and operation_model.request_compression is not None
    ):
        if not _is_compressible_type(request_dict):
            body_type = type(request_dict['body'])
            log_msg = 'Body type %s does not support compression.'
            logger.debug(log_msg, body_type)
            return False

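        # Streaming inputs are compressed regardless of size unless the
        # input shape requires a known content length ('requiresLength').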
        if operation_model.has_streaming_input:
            streaming_input = operation_model.get_streaming_input()
            streaming_metadata = streaming_input.metadata
            return 'requiresLength' not in streaming_metadata

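        # Non-streaming bodies are compressed only once they reach the
        # configured minimum size threshold.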
        body_size = _get_body_size(request_dict['body'])
        min_size = config.request_min_compression_size_bytes
        return min_size <= body_size

    return False


def _is_compressible_type(request_dict):
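    # str, bytes, bytearray, readable file-like objects, and dicts (after
    # being urlencoded) are the body types that can be compressed.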
    body = request_dict['body']
    # Coerce dict to a format compatible with compression.
    if isinstance(body, dict):
        body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8')
        request_dict['body'] = body
    is_supported_type = isinstance(body, (str, bytes, bytearray))
    return is_supported_type or hasattr(body, 'read')


def _get_body_size(body):
    size = determine_content_length(body)
    if size is None:
        logger.debug(
            'Unable to get length of the request body: %s. '
            'Skipping compression.',
            body,
        )
        size = 0
    return size


def _gzip_compress_body(body):
    register_feature_id('GZIP_REQUEST_COMPRESSION')
    if isinstance(body, str):
        return gzip_compress(body.encode('utf-8'))
    elif isinstance(body, (bytes, bytearray)):
        return gzip_compress(body)
    elif hasattr(body, 'read'):
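        # For seekable file-like objects, remember the current position and
        # restore it afterwards so the caller's stream is left where it was.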
        if hasattr(body, 'seek') and hasattr(body, 'tell'):
            current_position = body.tell()
            compressed_obj = _gzip_compress_fileobj(body)
            body.seek(current_position)
            return compressed_obj
        return _gzip_compress_fileobj(body)


def _gzip_compress_fileobj(body):
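    # Stream the body through GzipFile in 8 KiB chunks, collecting the
    # compressed output in an in-memory BytesIO buffer.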
    compressed_obj = io.BytesIO()
    with GzipFile(fileobj=compressed_obj, mode='wb') as gz:
        while True:
            chunk = body.read(8192)
            if not chunk:
                break
            if isinstance(chunk, str):
                chunk = chunk.encode('utf-8')
            gz.write(chunk)
    compressed_obj.seek(0)
    return compressed_obj


def _set_compression_header(headers, encoding):
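    # Append to any existing Content-Encoding value as a comma-separated
    # list rather than overwriting it.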
    ce_header = headers.get('Content-Encoding')
    if ce_header is None:
        headers['Content-Encoding'] = encoding
    else:
        headers['Content-Encoding'] = f'{ce_header},{encoding}'


COMPRESSION_MAPPING = {'gzip': _gzip_compress_body}