Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/copies.py: 26%
76 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import copy
14import math
16from s3transfer.tasks import (
17 CompleteMultipartUploadTask,
18 CreateMultipartUploadTask,
19 SubmissionTask,
20 Task,
21)
22from s3transfer.utils import (
23 ChunksizeAdjuster,
24 calculate_range_parameter,
25 get_callbacks,
26 get_filtered_dict,
27)
class CopySubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a copy"""

    # Copy extra args mapped to the name each one is given in the
    # head_object request issued when the transfer size is unknown.
    EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
        'CopySourceIfMatch': 'IfMatch',
        'CopySourceIfModifiedSince': 'IfModifiedSince',
        'CopySourceIfNoneMatch': 'IfNoneMatch',
        'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
        'CopySourceSSECustomerKey': 'SSECustomerKey',
        'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
        'RequestPayer': 'RequestPayer',
        'ExpectedBucketOwner': 'ExpectedBucketOwner',
    }

    # Extra args that are forwarded to every CopyPartTask.
    UPLOAD_PART_COPY_ARGS = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    # Extra args that are removed before they are handed to the
    # CreateMultipartUploadTask.
    CREATE_MULTIPART_ARGS_BLACKLIST = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    # Extra args that are forwarded to the CompleteMultipartUploadTask.
    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _submit(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            # If a size was not provided figure out the size for the
            # user. Note that we will only use the client provided to
            # the TransferManager. If the object is outside of the region
            # of the client, they may have to provide the file size themselves
            # with a completely new client.
            call_args = transfer_future.meta.call_args
            head_object_request = (
                self._get_head_object_request_from_copy_source(
                    call_args.copy_source
                )
            )
            extra_args = call_args.extra_args

            # Map any values that may be used in the head object that is
            # used in the copy object
            for param, value in extra_args.items():
                if param in self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING:
                    head_object_request[
                        self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING[param]
                    ] = value

            response = call_args.source_client.head_object(
                **head_object_request
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )

        # If it is greater than threshold do a multipart copy, otherwise
        # do a regular copy object.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_copy_request(
                client, config, osutil, request_executor, transfer_future
            )
        else:
            self._submit_multipart_request(
                client, config, osutil, request_executor, transfer_future
            )

    def _submit_copy_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit a single CopyObjectTask as the final task of the transfer.

        Takes the same arguments as ``_submit``; ``config`` and ``osutil``
        are not used here.
        """
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Submit the request of a single copy.
        self._transfer_coordinator.submit(
            request_executor,
            CopyObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'copy_source': call_args.copy_source,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'size': transfer_future.meta.size,
                },
                is_final=True,
            ),
        )

    def _submit_multipart_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit the tasks that make up a multipart copy.

        One CreateMultipartUploadTask is submitted first, then one
        CopyPartTask per part, and finally a CompleteMultipartUploadTask
        (the final task) that waits on the upload id and all of the parts.

        Takes the same arguments as ``_submit``; ``osutil`` is not used
        here.
        """
        call_args = transfer_future.meta.call_args

        # Submit the request to create a multipart upload and make sure it
        # does not include any of the arguments used for copy part.
        create_multipart_extra_args = {
            param: val
            for param, val in call_args.extra_args.items()
            if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST
        }

        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Determine how many parts are needed based on filesize and
        # desired chunksize.
        total_size = transfer_future.meta.size
        part_size = config.multipart_chunksize
        adjuster = ChunksizeAdjuster()
        part_size = adjuster.adjust_chunksize(part_size, total_size)
        num_parts = int(math.ceil(total_size / float(part_size)))

        # Submit requests to upload the parts of the file.
        part_futures = []
        progress_callbacks = get_callbacks(transfer_future, 'progress')
        # The checksum algorithm of the multipart request is the same for
        # every part, so look it up once instead of once per part.
        checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm")

        for part_number in range(1, num_parts + 1):
            # A new dictionary is needed for every part because the
            # CopySourceRange set below differs from part to part.
            extra_part_args = self._extra_upload_part_args(
                call_args.extra_args
            )
            # The part number for upload part starts at 1 while the
            # range parameter starts at zero, so just subtract 1 off of
            # the part number
            part_index = part_number - 1
            extra_part_args['CopySourceRange'] = calculate_range_parameter(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            # Get the size of the part copy as well for the progress
            # callbacks.
            size = self._get_transfer_size(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    CopyPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'copy_source': call_args.copy_source,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                            'callbacks': progress_callbacks,
                            'size': size,
                            'checksum_algorithm': checksum_algorithm,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _get_head_object_request_from_copy_source(self, copy_source):
        """Return a shallow copy of ``copy_source`` to build a request from.

        :param copy_source: The copy source; must be a dictionary
        :raises TypeError: If ``copy_source`` is not a dictionary
        """
        if not isinstance(copy_source, dict):
            raise TypeError(
                'Expecting dictionary formatted: '
                '{"Bucket": bucket_name, "Key": key} '
                'but got %s or type %s.' % (copy_source, type(copy_source))
            )
        # Copy so that the parameters the caller adds to the request do
        # not end up in the copy source that was passed in.
        return copy.copy(copy_source)

    def _extra_upload_part_args(self, extra_args):
        """Filter ``extra_args`` using ``UPLOAD_PART_COPY_ARGS``."""
        # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
        # onto the upload_part_copy calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        """Filter ``extra_args`` using ``COMPLETE_MULTIPART_ARGS``."""
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_transfer_size(
        self, part_size, part_index, num_parts, total_transfer_size
    ):
        """Return the number of bytes covered by the part at ``part_index``.

        :param part_size: The size of every part but the last one
        :param part_index: The zero-based index of the part
        :param num_parts: The total number of parts
        :param total_transfer_size: The size of the whole transfer
        """
        if part_index == num_parts - 1:
            # The last part may be different in size then the rest of the
            # parts.
            return total_transfer_size - (part_index * part_size)
        return part_size
class CopyObjectTask(Task):
    """Task to do a nonmultipart copy"""

    def _main(
        self, client, copy_source, bucket, key, extra_args, callbacks, size
    ):
        """
        :param client: The client whose ``copy_object`` method is called
        :param copy_source: The CopySource parameter to use
        :param bucket: The name of the bucket to copy to
        :param key: The name of the key to copy to
        :param extra_args: A dictionary of any extra arguments that are
            passed as keyword arguments to ``copy_object``.
        :param callbacks: List of callbacks to call after copy
        :param size: The size of the transfer. This value is passed into
            the callbacks

        """
        client.copy_object(
            CopySource=copy_source,
            Bucket=bucket,
            Key=key,
            **extra_args,
        )
        # The whole object is copied by the single call above, so each
        # callback is told about the full size in one go.
        for notify_progress in callbacks:
            notify_progress(bytes_transferred=size)
class CopyPartTask(Task):
    """Task to upload a part in a multipart copy"""

    def _main(
        self,
        client,
        copy_source,
        bucket,
        key,
        upload_id,
        part_number,
        extra_args,
        callbacks,
        size,
        checksum_algorithm=None,
    ):
        """
        :param client: The client whose ``upload_part_copy`` method is
            called
        :param copy_source: The CopySource parameter to use
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that are
            passed as keyword arguments to ``upload_part_copy``.
        :param callbacks: List of callbacks to call after copy part
        :param size: The size of the transfer. This value is passed into
            the callbacks
        :param checksum_algorithm: The algorithm that was used to create the
            multipart upload

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload. If a checksum is in the response,
            it will also be included.
        """
        response = client.upload_part_copy(
            CopySource=copy_source,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            **extra_args,
        )
        for notify_progress in callbacks:
            notify_progress(bytes_transferred=size)

        copy_result = response['CopyPartResult']
        part_metadata = {
            'ETag': copy_result['ETag'],
            'PartNumber': part_number,
        }
        if not checksum_algorithm:
            return part_metadata

        # The checksum is stored under a member named after the upper-cased
        # algorithm; it is only copied over when the result contains it.
        checksum_member = f'Checksum{checksum_algorithm.upper()}'
        if checksum_member in copy_result:
            part_metadata[checksum_member] = copy_result[checksum_member]
        return part_metadata