Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/copies.py: 26%

76 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import copy 

14import math 

15 

16from s3transfer.tasks import ( 

17 CompleteMultipartUploadTask, 

18 CreateMultipartUploadTask, 

19 SubmissionTask, 

20 Task, 

21) 

22from s3transfer.utils import ( 

23 ChunksizeAdjuster, 

24 calculate_range_parameter, 

25 get_callbacks, 

26 get_filtered_dict, 

27) 

28 

29 

class CopySubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute a copy"""

    # Copy extra args that are forwarded to the HeadObject request used to
    # look up the source size, keyed by copy arg name with the HeadObject
    # parameter name as the value.
    # NOTE(review): 'ExpectedBucketOwner' is forwarded unchanged to the
    # HeadObject call on the copy source -- confirm that is intended, since
    # for the copy itself it presumably describes the destination bucket.
    EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
        'CopySourceIfMatch': 'IfMatch',
        'CopySourceIfModifiedSince': 'IfModifiedSince',
        'CopySourceIfNoneMatch': 'IfNoneMatch',
        'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
        'CopySourceSSECustomerKey': 'SSECustomerKey',
        'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
        'RequestPayer': 'RequestPayer',
        'ExpectedBucketOwner': 'ExpectedBucketOwner',
    }

    # Extra args that are passed through to each upload_part_copy call.
    UPLOAD_PART_COPY_ARGS = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    # Extra args that are stripped before creating the multipart upload.
    CREATE_MULTIPART_ARGS_BLACKLIST = [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    # Extra args that are passed through when completing the multipart
    # upload.
    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _submit(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            # The size is looked up with a HeadObject call issued through
            # ``call_args.source_client``. If that client cannot reach the
            # source object, the caller has to provide the size up front.
            call_args = transfer_future.meta.call_args
            head_object_request = (
                self._get_head_object_request_from_copy_source(
                    call_args.copy_source
                )
            )

            # Carry over any copy arguments that have a HeadObject
            # counterpart, renamed to the HeadObject parameter name.
            head_args = self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING
            head_object_request.update(
                (head_args[param], value)
                for param, value in call_args.extra_args.items()
                if param in head_args
            )

            response = call_args.source_client.head_object(
                **head_object_request
            )
            transfer_future.meta.provide_transfer_size(
                response['ContentLength']
            )

        # Sizes below the multipart threshold are copied with a single
        # request; anything else is copied as a multipart copy.
        if transfer_future.meta.size < config.multipart_threshold:
            self._submit_copy_request(
                client, config, osutil, request_executor, transfer_future
            )
        else:
            self._submit_multipart_request(
                client, config, osutil, request_executor, transfer_future
            )

    def _submit_copy_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit a single CopyObjectTask as the final task of the transfer

        Takes the same arguments as ``_submit``.
        """
        call_args = transfer_future.meta.call_args

        # Get the needed progress callbacks for the task
        progress_callbacks = get_callbacks(transfer_future, 'progress')

        # Submit the request of a single copy.
        self._transfer_coordinator.submit(
            request_executor,
            CopyObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'copy_source': call_args.copy_source,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                    'callbacks': progress_callbacks,
                    'size': transfer_future.meta.size,
                },
                is_final=True,
            ),
        )

    def _submit_multipart_request(
        self, client, config, osutil, request_executor, transfer_future
    ):
        """Submit the create, copy-part and complete tasks of a multipart copy

        Takes the same arguments as ``_submit``. One CopyPartTask is
        submitted per part; the completion task is the final task and
        waits on the upload id and on every part future.
        """
        call_args = transfer_future.meta.call_args
        extra_args = call_args.extra_args
        total_size = transfer_future.meta.size

        # Submit the request to create a multipart upload and make sure it
        # does not include any of the arguments used for copy part.
        create_multipart_extra_args = {
            param: val
            for param, val in extra_args.items()
            if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST
        }

        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Determine how many parts are needed based on filesize and
        # desired chunksize.
        adjuster = ChunksizeAdjuster()
        part_size = adjuster.adjust_chunksize(
            config.multipart_chunksize, total_size
        )
        num_parts = math.ceil(total_size / part_size)

        progress_callbacks = get_callbacks(transfer_future, 'progress')
        # The checksum algorithm of the multipart request is the same for
        # every part, so look it up once rather than per part.
        checksum_algorithm = extra_args.get("ChecksumAlgorithm")

        # Submit requests to upload the parts of the file.
        part_futures = []
        for part_number in range(1, num_parts + 1):
            # The part number for upload part starts at 1 while the
            # range parameter starts at zero.
            part_index = part_number - 1

            # A fresh dict per part, because the range is added to it.
            extra_part_args = self._extra_upload_part_args(extra_args)
            extra_part_args['CopySourceRange'] = calculate_range_parameter(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            # Get the size of the part copy as well for the progress
            # callbacks.
            size = self._get_transfer_size(
                part_size,
                part_index,
                num_parts,
                total_size,
            )
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    CopyPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'copy_source': call_args.copy_source,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                            'callbacks': progress_callbacks,
                            'size': size,
                            'checksum_algorithm': checksum_algorithm,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _get_head_object_request_from_copy_source(self, copy_source):
        """Return a shallow copy of ``copy_source`` to build HeadObject args

        :param copy_source: The copy source; must be a dictionary
        :raises TypeError: If ``copy_source`` is not a dictionary
        """
        if not isinstance(copy_source, dict):
            raise TypeError(
                'Expecting dictionary formatted: '
                '{"Bucket": bucket_name, "Key": key} '
                f'but got {copy_source} of type {type(copy_source)}.'
            )
        # Copied so that adding HeadObject parameters does not modify the
        # caller's dictionary.
        return copy.copy(copy_source)

    def _extra_upload_part_args(self, extra_args):
        """Filter ``extra_args`` down to the keys in UPLOAD_PART_COPY_ARGS"""
        # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
        # onto the upload_part_copy calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        """Filter ``extra_args`` down to the keys in COMPLETE_MULTIPART_ARGS"""
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_transfer_size(
        self, part_size, part_index, num_parts, total_transfer_size
    ):
        """Return the size of the part at zero-based ``part_index``

        :param part_size: The size of every part except possibly the last
        :param part_index: The zero-based index of the part
        :param num_parts: The total number of parts
        :param total_transfer_size: The total size of the transfer
        """
        if part_index == num_parts - 1:
            # The last part may be different in size than the rest of the
            # parts: it is whatever remains after the preceding parts.
            return total_transfer_size - (part_index * part_size)
        return part_size

302 

303 

class CopyObjectTask(Task):
    """Task that copies an object with a single, non-multipart request"""

    def _main(
        self, client, copy_source, bucket, key, extra_args, callbacks, size
    ):
        """
        :param client: The client whose ``copy_object`` method is called
        :param copy_source: The value passed as the CopySource parameter
        :param bucket: The name of the bucket to copy to
        :param key: The name of the key to copy to
        :param extra_args: A dictionary of additional keyword arguments
            passed to ``copy_object``
        :param callbacks: Callables invoked once the copy call has
            returned
        :param size: The size of the transfer. Each callback receives it
            as ``bytes_transferred``
        """
        client.copy_object(
            CopySource=copy_source, Bucket=bucket, Key=key, **extra_args
        )
        # The copy is one request, so the full size is reported in one go.
        for notify_progress in callbacks:
            notify_progress(bytes_transferred=size)

327 

328 

class CopyPartTask(Task):
    """Task that copies a single part of a multipart copy"""

    def _main(
        self,
        client,
        copy_source,
        bucket,
        key,
        upload_id,
        part_number,
        extra_args,
        callbacks,
        size,
        checksum_algorithm=None,
    ):
        """
        :param client: The client whose ``upload_part_copy`` method is
            called
        :param copy_source: The value passed as the CopySource parameter
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the multipart upload
        :param part_number: The number identifying this part within the
            multipart upload
        :param extra_args: A dictionary of additional keyword arguments
            passed to ``upload_part_copy``
        :param callbacks: Callables invoked once the part copy call has
            returned
        :param size: The size of the part. Each callback receives it as
            ``bytes_transferred``
        :param checksum_algorithm: The checksum algorithm of the multipart
            upload, if any

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            When ``checksum_algorithm`` is given and the response carries
            the matching ``Checksum<ALGORITHM>`` member, that member is
            included as well. The dictionary can be appended to the list
            of parts used to complete the multipart upload.
        """
        response = client.upload_part_copy(
            CopySource=copy_source,
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            **extra_args,
        )
        # Progress is reported before the response is inspected.
        for notify_progress in callbacks:
            notify_progress(bytes_transferred=size)

        copy_result = response['CopyPartResult']
        part = {'ETag': copy_result['ETag'], 'PartNumber': part_number}
        if not checksum_algorithm:
            return part

        # The response member is named after the upper-cased algorithm.
        checksum_key = f'Checksum{checksum_algorithm.upper()}'
        if checksum_key in copy_result:
            part[checksum_key] = copy_result[checksum_key]
        return part