Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/copies.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

90 statements  

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import math 

15 

16from botocore.exceptions import ClientError 

17 

18from s3transfer.exceptions import S3CopyFailedError 

19from s3transfer.tasks import ( 

20 CompleteMultipartUploadTask, 

21 CreateMultipartUploadTask, 

22 SubmissionTask, 

23 Task, 

24) 

25from s3transfer.utils import ( 

26 ChunksizeAdjuster, 

27 calculate_range_parameter, 

28 get_callbacks, 

29 get_filtered_dict, 

30) 

31 

32 

class CopySubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a copy"""

    # Maps copy extra args to the corresponding HeadObject parameter name.
    # Used in _submit when the source object must be HEADed to discover its
    # size/ETag, so that request carries the matching conditions and keys.
    EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
        'CopySourceIfMatch': 'IfMatch',
        'CopySourceIfModifiedSince': 'IfModifiedSince',
        'CopySourceIfNoneMatch': 'IfNoneMatch',
        'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
        'CopySourceSSECustomerKey': 'SSECustomerKey',
        'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
        'RequestPayer': 'RequestPayer',
        'ExpectedBucketOwner': 'ExpectedBucketOwner',
    }

    # The only extra args forwarded to each UploadPartCopy request.
    UPLOAD_PART_COPY_ARGS = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    # Extra args stripped before CreateMultipartUpload; every other extra arg
    # is forwarded to it unchanged.
    CREATE_MULTIPART_ARGS_BLACKLIST = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    # The only extra args forwarded to CompleteMultipartUpload.
    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _submit(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        if (
            transfer_future.meta.size is None
            or transfer_future.meta.etag is None
        ):
            # If the size or the ETag was not provided, HEAD the source
            # object to discover them. Note that this uses the call's
            # source_client; if the object is outside that client's region,
            # the user may have to provide the size themselves or supply a
            # suitable source client.
            call_args = transfer_future.meta.call_args
            head_object_request = (
                self._get_head_object_request_from_copy_source(
                    call_args.copy_source
                )
            )

            # Carry over any copy args that have a HeadObject equivalent.
            for param, value in call_args.extra_args.items():
                head_param = self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING.get(param)
                if head_param is not None:
                    head_object_request[head_param] = value

            response = call_args.source_client.head_object(
                **head_object_request
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )
            # Provide an etag to ensure a stored object is not modified
            # during a multipart copy.
            transfer_future.meta.provide_object_etag(response.get('ETag'))

        # If it is greater than threshold do a multipart copy, otherwise
        # do a regular copy object.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_copy_request(
                client, config, osutil, request_executor, transfer_future
            )
        else:
            self._submit_multipart_request(
                client, config, osutil, request_executor, transfer_future
            )

    def _submit_copy_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit a single, final CopyObjectTask for the whole object."""
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Submit the request of a single copy.
        self._transfer_coordinator.submit(
            request_executor,
            CopyObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'copy_source': call_args.copy_source,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'size': transfer_future.meta.size,
                },
                is_final=True,
            ),
        )

    def _submit_multipart_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit create / per-part copy / complete tasks for a multipart copy.

        The part tasks and the complete task receive the upload id from the
        create task's future via ``pending_main_kwargs``.
        """
        call_args = transfer_future.meta.call_args
        total_size = transfer_future.meta.size
        source_etag = transfer_future.meta.etag

        # Submit the request to create a multipart upload and make sure it
        # does not include any of the arguments used for copy part.
        create_multipart_extra_args = {
            param: val
            for param, val in call_args.extra_args.items()
            if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST
        }

        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Determine how many parts are needed based on filesize and
        # desired chunksize.
        part_size = ChunksizeAdjuster().adjust_chunksize(
            config.multipart_chunksize, total_size
        )
        num_parts = math.ceil(total_size / part_size)

        progress_callbacks = get_callbacks(transfer_future, 'progress')
        # The checksum algorithm is the same for every part, so look it up
        # once rather than on each loop iteration.
        checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm")

        # Submit requests to upload the parts of the file.
        part_futures = []
        for part_number in range(1, num_parts + 1):
            # The part number for upload part starts at 1 while the
            # range parameter starts at zero.
            part_index = part_number - 1

            # A fresh filtered dict per part, since it is mutated below.
            extra_part_args = self._extra_upload_part_args(
                call_args.extra_args
            )
            extra_part_args['CopySourceRange'] = calculate_range_parameter(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            if source_etag is not None:
                extra_part_args['CopySourceIfMatch'] = source_etag

            # Get the size of the part copy as well for the progress
            # callbacks.
            size = self._get_transfer_size(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    CopyPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'copy_source': call_args.copy_source,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                            'callbacks': progress_callbacks,
                            'size': size,
                            'checksum_algorithm': checksum_algorithm,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _get_head_object_request_from_copy_source(self, copy_source):
        """Return a shallow copy of a dict copy source for use with HeadObject.

        :raises TypeError: If ``copy_source`` is not a dict.
        """
        if not isinstance(copy_source, dict):
            raise TypeError(
                'Expecting dictionary formatted: '
                '{"Bucket": bucket_name, "Key": key} '
                f'but got {copy_source} of type {type(copy_source)}.'
            )
        # Copy so the HeadObject params added by the caller do not mutate
        # the user's copy_source.
        return copy.copy(copy_source)

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
        # onto the upload_part_copy calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        # Only the args in COMPLETE_MULTIPART_ARGS are passed onto the
        # complete_multipart_upload call.
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_transfer_size(
        self, part_size, part_index, num_parts, total_transfer_size
    ):
        """Return the byte size of the part at ``part_index`` (0-based)."""
        if part_index == num_parts - 1:
            # The last part may be different in size than the rest of the
            # parts.
            return total_transfer_size - (part_index * part_size)
        return part_size

314 

315 

class CopyObjectTask(Task):
    """Task to do a nonmultipart copy"""

    def _main(
        self, client, copy_source, bucket, key, extra_args, callbacks, size
    ):
        """Copy the object with one CopyObject call, then report progress.

        :param client: The client used to issue the CopyObject request
        :param copy_source: Value passed through as the ``CopySource``
            parameter
        :param bucket: Destination bucket name
        :param key: Destination key name
        :param extra_args: Additional keyword arguments forwarded verbatim
            to ``copy_object``
        :param callbacks: Progress callbacks, each invoked once after the
            copy call returns
        :param size: Byte count reported to every callback as
            ``bytes_transferred``
        """
        client.copy_object(
            CopySource=copy_source,
            Bucket=bucket,
            Key=key,
            **extra_args,
        )
        # Progress is reported only after the copy call returns, and always
        # for the full size in a single step.
        for notify_progress in callbacks:
            notify_progress(bytes_transferred=size)

339 

340 

class CopyPartTask(Task):
    """Task to upload a part in a multipart copy"""

    def _main(
        self,
        client,
        copy_source,
        bucket,
        key,
        upload_id,
        part_number,
        extra_args,
        callbacks,
        size,
        checksum_algorithm=None,
    ):
        """
        :param client: The client to use when calling UploadPartCopy
        :param copy_source: The CopySource parameter to use
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        :param callbacks: List of callbacks to call after copy part
        :param size: The size of the transfer. This value is passed into
            the callbacks
        :param checksum_algorithm: The algorithm that was used to create the
            multipart upload

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload. If a checksum is in the response,
            it will also be included.

        :raises S3CopyFailedError: If UploadPartCopy fails with a
            ``PreconditionFailed`` error code. Any other ``ClientError`` is
            re-raised unchanged.
        """
        try:
            response = client.upload_part_copy(
                CopySource=copy_source,
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                **extra_args,
            )
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code')
            if error_code != "PreconditionFailed":
                # Re-raise untouched; do not index into copy_source here, so
                # a non-dict copy source cannot mask the original error.
                raise
            src_key = copy_source['Key']
            src_bucket = copy_source['Bucket']
            raise S3CopyFailedError(
                f'Contents of stored object "{src_key}" '
                f'in bucket "{src_bucket}" did not match '
                'expected ETag.'
            ) from e

        for callback in callbacks:
            callback(bytes_transferred=size)

        copy_part_result = response['CopyPartResult']
        part_metadata = {
            'ETag': copy_part_result['ETag'],
            'PartNumber': part_number,
        }
        if checksum_algorithm:
            # e.g. "crc32" -> "ChecksumCRC32", the member name in the result.
            checksum_member = f'Checksum{checksum_algorithm.upper()}'
            if checksum_member in copy_part_result:
                part_metadata[checksum_member] = copy_part_result[
                    checksum_member
                ]
        return part_metadata