Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/compress.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13""" 

14NOTE: All functions in this module are considered private and are 

15subject to abrupt breaking changes. Please do not use them directly. 

16 

17""" 

18 

19import io 

20import logging 

21from gzip import GzipFile 

22from gzip import compress as gzip_compress 

23 

24from botocore.compat import urlencode 

25from botocore.useragent import register_feature_id 

26from botocore.utils import determine_content_length 

27 

28logger = logging.getLogger(__name__) 

29 

30 

def maybe_compress_request(config, request_dict, operation_model):
    """Compress the request body with the first supported modeled encoding.

    Nothing happens unless ``_should_compress_request`` approves the
    request. Otherwise the encodings listed in
    ``operation_model.request_compression['encodings']`` are tried in
    order; the first one present in ``COMPRESSION_MAPPING`` is used to
    replace ``request_dict['body']`` and is recorded in
    ``request_dict['headers']``. Encodings without a registered encoder
    are logged and skipped.
    """
    if not _should_compress_request(config, request_dict, operation_model):
        return

    for encoding in operation_model.request_compression['encodings']:
        encoder = COMPRESSION_MAPPING.get(encoding)
        if encoder is None:
            logger.debug('Unsupported compression encoding: %s', encoding)
            continue

        logger.debug('Compressing request with %s encoding.', encoding)
        request_dict['body'] = encoder(request_dict['body'])
        _set_compression_header(request_dict['headers'], encoding)
        # Only a single encoding is ever applied.
        return

43 

44 

45def _should_compress_request(config, request_dict, operation_model): 

46 if ( 

47 config.disable_request_compression is not True 

48 and config.signature_version != 'v2' 

49 and operation_model.request_compression is not None 

50 ): 

51 if not _is_compressible_type(request_dict): 

52 body_type = type(request_dict['body']) 

53 log_msg = 'Body type %s does not support compression.' 

54 logger.debug(log_msg, body_type) 

55 return False 

56 

57 if operation_model.has_streaming_input: 

58 streaming_input = operation_model.get_streaming_input() 

59 streaming_metadata = streaming_input.metadata 

60 return 'requiresLength' not in streaming_metadata 

61 

62 body_size = _get_body_size(request_dict['body']) 

63 min_size = config.request_min_compression_size_bytes 

64 return min_size <= body_size 

65 

66 return False 

67 

68 

69def _is_compressible_type(request_dict): 

70 body = request_dict['body'] 

71 # Coerce dict to a format compatible with compression. 

72 if isinstance(body, dict): 

73 body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8') 

74 request_dict['body'] = body 

75 is_supported_type = isinstance(body, (str, bytes, bytearray)) 

76 return is_supported_type or hasattr(body, 'read') 

77 

78 

def _get_body_size(body):
    """Return the length of ``body``, or 0 when it cannot be determined.

    The length comes from ``determine_content_length``. When that helper
    returns ``None`` a debug message is logged and 0 is returned.
    """
    length = determine_content_length(body)
    if length is not None:
        return length

    logger.debug(
        'Unable to get length of the request body: %s. '
        'Skipping compression.',
        body,
    )
    return 0

89 

90 

def _gzip_compress_body(body):
    """Gzip-compress ``body`` and return the compressed payload.

    ``str`` bodies are encoded as UTF-8 first. ``bytes`` and
    ``bytearray`` bodies are compressed directly; both cases return
    ``bytes``. Objects with a ``read`` method are compressed via
    ``_gzip_compress_fileobj``; when they also expose ``seek`` and
    ``tell`` their position is restored afterwards. Any other type
    yields ``None``.
    """
    register_feature_id('GZIP_REQUEST_COMPRESSION')

    if isinstance(body, (bytes, bytearray)):
        return gzip_compress(body)
    if isinstance(body, str):
        return gzip_compress(body.encode('utf-8'))
    if not hasattr(body, 'read'):
        return None

    is_seekable = hasattr(body, 'seek') and hasattr(body, 'tell')
    if not is_seekable:
        return _gzip_compress_fileobj(body)

    # Put the stream back where it was found once it has been consumed.
    start = body.tell()
    result = _gzip_compress_fileobj(body)
    body.seek(start)
    return result

104 

105 

106def _gzip_compress_fileobj(body): 

107 compressed_obj = io.BytesIO() 

108 with GzipFile(fileobj=compressed_obj, mode='wb') as gz: 

109 while True: 

110 chunk = body.read(8192) 

111 if not chunk: 

112 break 

113 if isinstance(chunk, str): 

114 chunk = chunk.encode('utf-8') 

115 gz.write(chunk) 

116 compressed_obj.seek(0) 

117 return compressed_obj 

118 

119 

120def _set_compression_header(headers, encoding): 

121 ce_header = headers.get('Content-Encoding') 

122 if ce_header is None: 

123 headers['Content-Encoding'] = encoding 

124 else: 

125 headers['Content-Encoding'] = f'{ce_header},{encoding}' 

126 

127 

# Maps an encoding name to the function that compresses a request body
# with that encoding.
COMPRESSION_MAPPING = {
    'gzip': _gzip_compress_body,
}