Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/copies.py: 24%
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import math

from botocore.exceptions import ClientError

from s3transfer.exceptions import S3CopyFailedError
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    calculate_range_parameter,
    get_callbacks,
    get_filtered_dict,
)


class CopySubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a copy"""

    EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
        'CopySourceIfMatch': 'IfMatch',
        'CopySourceIfModifiedSince': 'IfModifiedSince',
        'CopySourceIfNoneMatch': 'IfNoneMatch',
        'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
        'CopySourceSSECustomerKey': 'SSECustomerKey',
        'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
        'RequestPayer': 'RequestPayer',
        'ExpectedBucketOwner': 'ExpectedBucketOwner',
    }
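    # Illustrative example (values are placeholders, not from the original
    # source): given extra_args = {'CopySourceIfMatch': '"abc123"',
    # 'MetadataDirective': 'REPLACE'}, the HeadObject request built in
    # _submit carries IfMatch='"abc123"', while MetadataDirective is dropped
    # because HeadObject has no equivalent parameter.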

    UPLOAD_PART_COPY_ARGS = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    CREATE_MULTIPART_ARGS_BLACKLIST = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]
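    # Illustrative summary (not from the original source): these class-level
    # collections route the user's extra args to the right API calls --
    # EXTRA_ARGS_TO_HEAD_ARGS_MAPPING feeds HeadObject, UPLOAD_PART_COPY_ARGS
    # feeds UploadPartCopy, CREATE_MULTIPART_ARGS_BLACKLIST is excluded from
    # CreateMultipartUpload, and COMPLETE_MULTIPART_ARGS feeds
    # CompleteMultipartUpload.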

    def _submit(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """
        :type client: botocore.client.BaseClient
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        if (
            transfer_future.meta.size is None
            or transfer_future.meta.etag is None
        ):
            # If a size or ETag was not provided, figure them out for the
            # user with a HeadObject call. Note that we will only use the
            # client provided to the TransferManager. If the object is
            # outside of the region of the client, they may have to provide
            # the file size themselves with a completely new client.
            call_args = transfer_future.meta.call_args
            head_object_request = (
                self._get_head_object_request_from_copy_source(
                    call_args.copy_source
                )
            )
            extra_args = call_args.extra_args

            # Map any extra args for the copy onto their equivalent
            # parameters for the HeadObject call.
            for param, value in extra_args.items():
                if param in self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING:
                    head_object_request[
                        self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING[param]
                    ] = value

            response = call_args.source_client.head_object(
                **head_object_request
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )
            # Provide an ETag to ensure the stored object is not modified
            # during a multipart copy.
            transfer_future.meta.provide_object_etag(response.get('ETag'))

        # If the transfer size is at least the multipart threshold, do a
        # multipart copy; otherwise do a single copy object.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_copy_request(
                client, config, osutil, request_executor, transfer_future
            )
        else:
            self._submit_multipart_request(
                client, config, osutil, request_executor, transfer_future
            )
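        # Illustrative note (the defaults here are an assumption taken from
        # s3transfer.manager.TransferConfig): with the default 8 MiB
        # multipart_threshold, a 5 MiB object is copied with a single
        # CopyObject call, while a 100 MiB object takes the multipart path.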

    def _submit_copy_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Submit the request for a single copy.
        self._transfer_coordinator.submit(
            request_executor,
            CopyObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'copy_source': call_args.copy_source,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'size': transfer_future.meta.size,
                },
                is_final=True,
            ),
        )

    def _submit_multipart_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        call_args = transfer_future.meta.call_args

        # Submit the request to create a multipart upload and make sure it
        # does not include any of the arguments used for copy part.
        create_multipart_extra_args = {}
        for param, val in call_args.extra_args.items():
            if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST:
                create_multipart_extra_args[param] = val

        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Determine how many parts are needed based on the transfer size and
        # desired chunksize.
        part_size = config.multipart_chunksize
        adjuster = ChunksizeAdjuster()
        part_size = adjuster.adjust_chunksize(
            part_size, transfer_future.meta.size
        )
        num_parts = int(
            math.ceil(transfer_future.meta.size / float(part_size))
        )
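        # Worked example (illustrative, assuming the default 8 MiB chunksize
        # is not adjusted): copying a 100 MiB (104857600-byte) object gives
        # ceil(104857600 / 8388608) = ceil(12.5) = 13 parts.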

        # Submit requests to upload the parts of the file.
        part_futures = []
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        for part_number in range(1, num_parts + 1):
            extra_part_args = self._extra_upload_part_args(
                call_args.extra_args
            )
            # The part number for upload part starts at 1 while the
            # range parameter starts at zero, so just subtract 1 from
            # the part number.
            extra_part_args['CopySourceRange'] = calculate_range_parameter(
                part_size,
                part_number - 1,
                num_parts,
                transfer_future.meta.size,
            )
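            # Example (illustrative): for the first part of an 8 MiB
            # chunksize copy, calculate_range_parameter yields
            # 'bytes=0-8388607'; the second part starts at byte 8388608.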
            if transfer_future.meta.etag is not None:
                extra_part_args['CopySourceIfMatch'] = (
                    transfer_future.meta.etag
                )
            # Get the size of the part copy as well for the progress
            # callbacks.
            size = self._get_transfer_size(
                part_size,
                part_number - 1,
                num_parts,
                transfer_future.meta.size,
            )
            # Get the checksum algorithm of the multipart request.
            checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm")
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    CopyPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'copy_source': call_args.copy_source,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                            'callbacks': progress_callbacks,
                            'size': size,
                            'checksum_algorithm': checksum_algorithm,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _get_head_object_request_from_copy_source(self, copy_source):
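        # Illustrative example (placeholder values, not from the original
        # source): a copy_source of {'Bucket': 'src-bucket', 'Key': 'obj',
        # 'VersionId': 'abc'} is shallow-copied and passed directly to
        # HeadObject, which accepts the same Bucket/Key/VersionId parameters.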
        if isinstance(copy_source, dict):
            return copy.copy(copy_source)
        else:
            raise TypeError(
                'Expecting dictionary formatted: '
                '{"Bucket": bucket_name, "Key": key} '
                f'but got {copy_source} or type {type(copy_source)}.'
            )

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
        # onto the upload_part_copy calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
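        # Illustrative note (an assumption about s3transfer.utils.
        # get_filtered_dict): the helper returns a new dict containing only
        # the whitelisted keys, e.g. {'RequestPayer': 'requester',
        # 'MetadataDirective': 'COPY'} filtered against
        # COMPLETE_MULTIPART_ARGS keeps only {'RequestPayer': 'requester'}.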
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_transfer_size(
        self, part_size, part_index, num_parts, total_transfer_size
    ):
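        # Worked example (illustrative): for a 104857600-byte (100 MiB) copy
        # split into 13 parts of 8388608 bytes (8 MiB), the final part
        # (index 12) is 104857600 - 12 * 8388608 = 4194304 bytes.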
        if part_index == num_parts - 1:
            # The last part may be different in size than the rest of the
            # parts.
            return total_transfer_size - (part_index * part_size)
        return part_size


class CopyObjectTask(Task):
    """Task to do a nonmultipart copy"""

    def _main(
        self, client, copy_source, bucket, key, extra_args, callbacks, size
    ):
        """
        :param client: The client to use when calling CopyObject
        :param copy_source: The CopySource parameter to use
        :param bucket: The name of the bucket to copy to
        :param key: The name of the key to copy to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the copy.
        :param callbacks: List of callbacks to call after copy
        :param size: The size of the transfer. This value is passed into
            the callbacks
        """
        client.copy_object(
            CopySource=copy_source, Bucket=bucket, Key=key, **extra_args
        )
        for callback in callbacks:
            callback(bytes_transferred=size)


class CopyPartTask(Task):
    """Task to upload a part in a multipart copy"""

    def _main(
        self,
        client,
        copy_source,
        bucket,
        key,
        upload_id,
        part_number,
        extra_args,
        callbacks,
        size,
        checksum_algorithm=None,
    ):
        """
        :param client: The client to use when calling UploadPartCopy
        :param copy_source: The CopySource parameter to use
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        :param callbacks: List of callbacks to call after copy part
        :param size: The size of the transfer. This value is passed into
            the callbacks
        :param checksum_algorithm: The algorithm that was used to create the
            multipart upload

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload. If a checksum is in the response,
            it will also be included.
        """
        try:
            response = client.upload_part_copy(
                CopySource=copy_source,
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                **extra_args,
            )
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code')
            src_key = copy_source['Key']
            src_bucket = copy_source['Bucket']
            if error_code == "PreconditionFailed":
                raise S3CopyFailedError(
                    f'Contents of stored object "{src_key}" '
                    f'in bucket "{src_bucket}" did not match '
                    'expected ETag.'
                )
            else:
                raise
        for callback in callbacks:
            callback(bytes_transferred=size)
        etag = response['CopyPartResult']['ETag']
        part_metadata = {'ETag': etag, 'PartNumber': part_number}
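        # Illustrative example (not from the original source): a
        # ChecksumAlgorithm of 'crc32' on the multipart upload maps to the
        # 'ChecksumCRC32' member of CopyPartResult below, which is carried
        # into the part metadata handed to CompleteMultipartUpload.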
        if checksum_algorithm:
            checksum_member = f'Checksum{checksum_algorithm.upper()}'
            if checksum_member in response['CopyPartResult']:
                part_metadata[checksum_member] = response['CopyPartResult'][
                    checksum_member
                ]
        return part_metadata
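

# Illustrative usage sketch (an editorial addition for context, not part of
# the original module): these tasks are normally driven by
# s3transfer.manager.TransferManager rather than instantiated directly.
# The bucket and key names below are placeholders.
#
#     import boto3
#     from s3transfer.manager import TransferManager
#
#     client = boto3.client('s3')
#     with TransferManager(client) as manager:
#         future = manager.copy(
#             copy_source={'Bucket': 'source-bucket', 'Key': 'source-key'},
#             bucket='dest-bucket',
#             key='dest-key',
#         )
#         future.result()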