# Coverage report residue: botocore/compress.py — 21% of 75 statements covered.
# Generated by coverage.py v7.3.2 at 2023-12-08 06:51 +0000.

# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
NOTE: All functions in this module are considered private and are
subject to abrupt breaking changes. Please do not use them directly.

"""

18 

19import io 

20import logging 

21from gzip import GzipFile 

22from gzip import compress as gzip_compress 

23 

24from botocore.compat import urlencode 

25from botocore.utils import determine_content_length 

26 

27logger = logging.getLogger(__name__) 

28 

29 

def maybe_compress_request(config, request_dict, operation_model):
    """Attempt to compress the request body using the modeled encodings.

    The first encoding from the operation model that has a registered
    encoder is applied; the body and Content-Encoding header in
    ``request_dict`` are updated in place. If no encoding is supported,
    the request is left untouched.
    """
    if not _should_compress_request(config, request_dict, operation_model):
        return
    for encoding in operation_model.request_compression['encodings']:
        encoder = COMPRESSION_MAPPING.get(encoding)
        if encoder is None:
            logger.debug('Unsupported compression encoding: %s', encoding)
            continue
        logger.debug('Compressing request with %s encoding.', encoding)
        request_dict['body'] = encoder(request_dict['body'])
        _set_compression_header(request_dict['headers'], encoding)
        return

42 

43 

def _should_compress_request(config, request_dict, operation_model):
    """Decide whether the request body is eligible for compression."""
    # Compression must not be disabled by config, is not supported for
    # sigv2-signed requests, and requires the operation to model it.
    if (
        config.disable_request_compression is True
        or config.signature_version == 'v2'
        or operation_model.request_compression is None
    ):
        return False
    if not _is_compressible_type(request_dict):
        logger.debug(
            'Body type %s does not support compression.',
            type(request_dict['body']),
        )
        return False
    # Streaming inputs are compressed regardless of size, unless the
    # service requires a known content length up front.
    if operation_model.has_streaming_input:
        streaming_metadata = operation_model.get_streaming_input().metadata
        return 'requiresLength' not in streaming_metadata
    # Non-streaming bodies are only worth compressing above the
    # configured minimum size.
    body_size = _get_body_size(request_dict['body'])
    return body_size >= config.request_min_compression_size_bytes

66 

67 

68def _is_compressible_type(request_dict): 

69 body = request_dict['body'] 

70 # Coerce dict to a format compatible with compression. 

71 if isinstance(body, dict): 

72 body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8') 

73 request_dict['body'] = body 

74 is_supported_type = isinstance(body, (str, bytes, bytearray)) 

75 return is_supported_type or hasattr(body, 'read') 

76 

77 

def _get_body_size(body):
    """Return the body's content length in bytes, or 0 when unknown.

    A 0 fallback means an unsized body will never meet the minimum
    compression threshold, i.e. compression is skipped.
    """
    size = determine_content_length(body)
    if size is not None:
        return size
    logger.debug(
        'Unable to get length of the request body: %s. '
        'Skipping compression.',
        body,
    )
    return 0

88 

89 

90def _gzip_compress_body(body): 

91 if isinstance(body, str): 

92 return gzip_compress(body.encode('utf-8')) 

93 elif isinstance(body, (bytes, bytearray)): 

94 return gzip_compress(body) 

95 elif hasattr(body, 'read'): 

96 if hasattr(body, 'seek') and hasattr(body, 'tell'): 

97 current_position = body.tell() 

98 compressed_obj = _gzip_compress_fileobj(body) 

99 body.seek(current_position) 

100 return compressed_obj 

101 return _gzip_compress_fileobj(body) 

102 

103 

104def _gzip_compress_fileobj(body): 

105 compressed_obj = io.BytesIO() 

106 with GzipFile(fileobj=compressed_obj, mode='wb') as gz: 

107 while True: 

108 chunk = body.read(8192) 

109 if not chunk: 

110 break 

111 if isinstance(chunk, str): 

112 chunk = chunk.encode('utf-8') 

113 gz.write(chunk) 

114 compressed_obj.seek(0) 

115 return compressed_obj 

116 

117 

118def _set_compression_header(headers, encoding): 

119 ce_header = headers.get('Content-Encoding') 

120 if ce_header is None: 

121 headers['Content-Encoding'] = encoding 

122 else: 

123 headers['Content-Encoding'] = f'{ce_header},{encoding}' 

124 

125 

# Registry of modeled encoding name -> encoder callable. Defined last so
# the encoder references resolve at import time.
COMPRESSION_MAPPING = {'gzip': _gzip_compress_body}